-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathtrending_movies.py
133 lines (115 loc) · 4.42 KB
/
trending_movies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
Add the top 10 trending movies to a channel on dizqueTV
Refreshes the channel (removes all existing programs, re-adds new items)
"""
from typing import List, Union
import argparse
import requests
from plexapi import server, video
from dizqueTV import API
# COMPLETE THESE SETTINGS
DIZQUETV_URL = "http://localhost:8000"
PLEX_URL = "http://localhost:32400"
PLEX_TOKEN = "thisisaplextoken"

# Make a Trakt application: https://trakt.tv/oauth/applications
# Use redirect URI: urn:ietf:wg:oauth:2.0:oob
TRAKT_CLIENT_ID = "traktclientid"
TRAKT_CLIENT_SECRET = "traktclientsecret"

# Command-line interface: the target channel can be picked by name and/or number.
parser = argparse.ArgumentParser()
parser.add_argument('-n',
                    '--channel_name',
                    type=str,
                    default="Trending Movies",
                    help="Name of channel to create/edit")
# nargs='?' keeps a bare "-c" (no value) accepted; it then falls back to the default (None).
parser.add_argument('-c',
                    '--channel_number',
                    nargs='?',
                    type=int,
                    default=None,
                    help="dizqueTV channel number to add the trending movies to.")
# Parsed at import time; the rest of the script reads args.channel_name / args.channel_number.
args = parser.parse_args()
class TraktConnection:
    """Minimal read-only client for the Trakt API (https://api.trakt.tv)."""

    def __init__(self, timeout: float = 10.0):
        """
        :param timeout: seconds to wait for Trakt to respond before the request is aborted
        """
        self.timeout = timeout

    def request(self, endpoint: str) -> Union[dict, list]:
        """
        Send a GET request to a Trakt API endpoint.

        :param endpoint: path appended to https://api.trakt.tv, e.g. '/movies/trending'
        :return: the decoded JSON body, or an empty dict if the response status is an error
        :raises requests.exceptions.RequestException: on connection problems or timeout
        """
        headers = {
            'Content-Type': 'application/json',
            'trakt-api-version': '2',
            'trakt-api-key': TRAKT_CLIENT_ID,
            # NOTE(review): Trakt's API docs describe pagination through the `page`/`limit`
            # query parameters; these request headers are presumably ignored - confirm
            # before relying on a limit of 20.
            'x-pagination-page': '1',
            'x-pagination-limit': '20'
        }
        # Without a timeout, requests waits indefinitely on an unresponsive server.
        res = requests.get(f'https://api.trakt.tv{endpoint}', headers=headers, timeout=self.timeout)
        if res.ok:
            return res.json()
        return {}

    def _get_trending(self, media_type: str, key: str) -> List[dict]:
        """Fetch '/<media_type>/trending' and return the object stored under `key` in each entry."""
        data = self.request(endpoint=f'/{media_type}/trending')
        # Skip malformed entries (non-dict, or missing/empty `key`) so callers never receive None.
        return [entry[key] for entry in data if isinstance(entry, dict) and entry.get(key)]

    def get_trending_movies(self) -> List[dict]:
        """
        :return: the 'movie' object of every entry returned by '/movies/trending'
        """
        return self._get_trending(media_type='movies', key='movie')

    def get_trending_shows(self) -> List[dict]:
        """
        :return: the 'show' object of every entry returned by '/shows/trending'
        """
        return self._get_trending(media_type='shows', key='show')
class Plex:
    """Holds a plexapi PlexServer connection and offers a movie lookup on it."""

    def __init__(self, url, token):
        """
        Connect to a Plex server.

        :param url: base URL of the Plex server
        :param token: Plex authentication token
        """
        self.url = url
        self.token = token
        self.server = server.PlexServer(url, token)

    def get_plex_movie(self, movie_name: str, year: Union[int, None] = None, section_name=None) -> Union[video.Movie, None]:
        """
        Find a movie on the Plex server whose title equals `movie_name` exactly.

        :param movie_name: title to search for; only an exact (case-sensitive) title match is returned
        :param year: if truthy, passed to the Plex search as an additional 'year' filter
        :param section_name: if truthy, only that library section is searched; otherwise the whole library
        :return: the first matching Movie among the search results, or None if there is none
        """
        search_kwargs = {'title': movie_name}
        if year:
            search_kwargs['year'] = year
        # Both the whole library and a single section expose search(**kwargs).
        if section_name:
            search_scope = self.server.library.section(section_name)
        else:
            search_scope = self.server.library
        for result in search_scope.search(**search_kwargs):
            # The search may return non-movie or partially matching items; keep exact movie hits only.
            if isinstance(result, video.Movie) and result.title == movie_name:
                return result
        return None
dtv = API(url=DIZQUETV_URL)
plex = Plex(url=PLEX_URL,
            token=PLEX_TOKEN)
trakt = TraktConnection()

this_channel = None

# try to get channel by number first
if args.channel_number:
    this_channel = dtv.get_channel(channel_number=args.channel_number)

# if failed, get channel by name
if not this_channel:
    for channel in dtv.channels:
        if channel.name == args.channel_name:
            this_channel = channel

# if still failed, make new channel (try with number, but handle error if can't)
if this_channel:
    print(f"Found and using {this_channel.name} on dizqueTV. This channel will be reset.")
else:
    if not args.channel_number:
        # default=0: with no channels on dizqueTV yet, start at 1 instead of max() raising ValueError
        args.channel_number = max(dtv.channel_numbers, default=0) + 1
    print("Could not find that channel. Creating it on dizqueTV...")
    this_channel = dtv.add_channel(name=args.channel_name, programs=[], number=args.channel_number, handle_errors=True)

# if still no channel, exit
if not this_channel:
    print("Could not create that channel. Exiting...")
    # SystemExit instead of exit(): the exit() helper is only injected by the site module
    raise SystemExit(1)

# Look up every trending Trakt movie on Plex and convert the hits to dizqueTV programs.
movies_to_add = []
trending_movies = trakt.get_trending_movies()
for trakt_movie in trending_movies:
    if not trakt_movie:
        # entry without movie data; nothing to search for
        continue
    title = trakt_movie.get('title')
    print(f"Searching for {title} on Plex...")
    matching_plex_movie = plex.get_plex_movie(movie_name=title, year=trakt_movie.get('year'))
    if not matching_plex_movie:
        print(f"Could not find {title} on Plex.")
        continue
    print(f"Found {matching_plex_movie.title} on Plex. Adding to dizqueTV...")
    dizquetv_movie = dtv.convert_plex_item_to_program(plex_item=matching_plex_movie, plex_server=plex.server)
    if dizquetv_movie:
        movies_to_add.append(dizquetv_movie)

# Only reset the channel when there is something to put on it, and report each failure.
if not movies_to_add:
    print("None of the trending movies were found on Plex. Leaving the channel unchanged.")
elif not this_channel.delete_all_programs():
    print("Could not remove the existing programs from the channel.")
elif not this_channel.add_programs(programs=movies_to_add):
    print("Could not add the trending movies to the channel.")
else:
    print("Complete!")