diff --git a/vod_recovery.py b/vod_recovery.py index bac2c50..d80e8f9 100644 --- a/vod_recovery.py +++ b/vod_recovery.py @@ -7,7 +7,7 @@ import subprocess import tkinter as tk import sys -from time import time +from time import time, sleep from shutil import rmtree, copyfileobj from datetime import datetime, timedelta from tkinter import filedialog @@ -24,7 +24,7 @@ import ffmpeg_downloader as ffdl -CURRENT_VERSION = "1.2.15" +CURRENT_VERSION = "1.3.0" SUPPORTED_FORMATS = [".mp4", ".mkv", ".mov", ".avi", ".ts"] @@ -32,14 +32,14 @@ def read_config_by_key(config_file, key): script_dir = os.path.dirname(os.path.realpath(__file__)) config_path = os.path.join(script_dir, "config", f"{config_file}.json") - with open(config_path, 'r', encoding="utf-8") as input_config_file: + with open(config_path, "r", encoding="utf-8") as input_config_file: config = json.load(input_config_file) return config.get(key, None) def get_default_video_format(): - default_video_format = read_config_by_key('settings', 'DEFAULT_VIDEO_FORMAT') + default_video_format = read_config_by_key("settings", "DEFAULT_VIDEO_FORMAT") if default_video_format in SUPPORTED_FORMATS: return default_video_format @@ -47,12 +47,36 @@ def get_default_video_format(): def get_default_directory(): - default_directory = read_config_by_key('settings', 'DEFAULT_DIRECTORY') - if os.name == 'nt' and default_directory: + default_directory = read_config_by_key("settings", "DEFAULT_DIRECTORY") + + if not os.path.exists(default_directory): + default_directory = "~/Downloads/" + + if os.name == "nt" and default_directory: default_directory = default_directory.replace("/", "\\") return os.path.expanduser(default_directory) +def get_default_downloader(): + try: + default_downloader = read_config_by_key("settings", "DEFAULT_DOWNLOADER") + if default_downloader in ["ffmpeg", "yt-dlp"]: + return default_downloader + return "ffmpeg" + except Exception: + return "ffmpeg" + + +def get_yt_dlp_custom_options(): + try: + custom_options = 
read_config_by_key("settings", "YT_DLP_OPTIONS") + if custom_options: + return custom_options.split() + return [] + except Exception: + return [] + + def print_main_menu(): default_video_format = get_default_video_format() or "mp4" menu_options = [ @@ -61,7 +85,7 @@ def print_main_menu(): f"3) Download VOD ({default_video_format.lstrip('.')})", "4) Unmute & Check M3U8 Availability", "5) Options", - "6) Exit" + "6) Exit", ] while True: print("\n".join(menu_options)) @@ -73,12 +97,13 @@ def print_main_menu(): except ValueError: print("\n✖ Invalid option! Please try again:\n") + def print_video_mode_menu(): vod_type_options = [ "1) Website Video Recovery", "2) Manual Recovery", "3) Bulk Video Recovery from SullyGnome CSV Export", - "4) Return" + "4) Return", ] while True: print("\n".join(vod_type_options)) @@ -95,7 +120,7 @@ def print_video_recovery_menu(): vod_recovery_options = [ "1) Website Video Recovery", "2) Manual Recovery", - "3) Return" + "3) Return", ] while True: print("\n".join(vod_recovery_options)) @@ -109,8 +134,13 @@ def print_video_recovery_menu(): def print_clip_type_menu(): - clip_type_options = ["1) Recover All Clips from a VOD", "2) Find Random Clips from a VOD", - "3) Download Clip from Twitch URL", "4) Bulk Recover Clips from SullyGnome CSV Export", "5) Return"] + clip_type_options = [ + "1) Recover All Clips from a VOD", + "2) Find Random Clips from a VOD", + "3) Download Clip from Twitch URL", + "4) Bulk Recover Clips from SullyGnome CSV Export", + "5) Return", + ] while True: print("\n".join(clip_type_options)) try: @@ -123,7 +153,11 @@ def print_clip_type_menu(): def print_clip_recovery_menu(): - clip_recovery_options = ["1) Website Clip Recovery", "2) Manual Clip Recovery", "3) Return"] + clip_recovery_options = [ + "1) Website Clip Recovery", + "2) Manual Clip Recovery", + "3) Return", + ] while True: print("\n".join(clip_recovery_options)) try: @@ -139,7 +173,7 @@ def print_bulk_clip_recovery_menu(): bulk_clip_recovery_options = [ "1) 
Single CSV File", "2) Multiple CSV Files", - "3) Return" + "3) Return", ] while True: print("\n".join(bulk_clip_recovery_options)) @@ -157,7 +191,7 @@ def print_clip_format_menu(): "1) Default Format ([VodID]-offset-[interval])", "2) Alternate Format (vod-[VodID]-offset-[interval])", "3) Legacy Format ([VodID]-index-[interval])", - "4) Return" + "4) Return", ] print() while True: @@ -179,7 +213,7 @@ def print_download_type_menu(): "1) From M3U8 Link", "2) From M3U8 File", "3) From Twitch URL (Only for VODs or Highlights still up on Twitch)", - "4) Return" + "4) Return", ] while True: print("\n".join(download_type_options)) @@ -196,7 +230,7 @@ def print_handle_m3u8_availability_menu(): handle_m3u8_availability_options = [ "1) Check if M3U8 file has muted segments", "2) Unmute & Remove invalid segments", - "3) Return" + "3) Return", ] while True: print("\n".join(handle_m3u8_availability_options)) @@ -210,14 +244,14 @@ def print_handle_m3u8_availability_menu(): def print_options_menu(): - options_menu = [ f"1) Set Default Video Format \033[94m({get_default_video_format().lstrip('.') or 'mp4'})\033[0m", f"2) Set Download Directory \033[94m({get_default_directory() or '~/Downloads/'})\033[0m", - "3) Check for Updates", - "4) Open settings.json File", - "5) Help", - "6) Return" + f"3) Set Default Downloader \033[94m({read_config_by_key('settings', 'DEFAULT_DOWNLOADER') or 'ffmpeg'})\033[0m", + "4) Check for Updates", + "5) Open settings.json File", + "6) Help", + "7) Return", ] while True: print("\n".join(options_menu)) @@ -231,7 +265,7 @@ def print_options_menu(): def print_get_m3u8_link_menu(): - m3u8_url = input("Enter M3U8 Link: ").strip(' "\'') + m3u8_url = input("Enter M3U8 Link: ").strip(" \"'") if m3u8_url.endswith(".m3u8"): return m3u8_url print("\n✖ Invalid M3U8 link! 
Please try again:\n") @@ -241,14 +275,14 @@ def print_get_m3u8_link_menu(): def get_websites_tracker_url(): while True: tracker_url = input("Enter Twitchtracker/Streamscharts/Sullygnome url: ").strip() - if re.match(r'^(https?:\/\/)?(www\.)?(twitchtracker\.com|streamscharts\.com|sullygnome\.com)\/.*', tracker_url): + if re.match(r"^(https?:\/\/)?(www\.)?(twitchtracker\.com|streamscharts\.com|sullygnome\.com)\/.*", tracker_url): return tracker_url print("\n✖ Invalid URL! Please enter a URL from Twitchtracker, Streamscharts, or Sullygnome.\n") def print_get_twitch_url_menu(): - twitch_url = input("Enter Twitch URL: ").strip(' "\'') + twitch_url = input("Enter Twitch URL: ").strip(" \"'") if "twitch.tv" in twitch_url: return twitch_url print("\n✖ Invalid Twitch URL! Please try again:\n") @@ -259,7 +293,7 @@ def get_twitch_or_tracker_url(): while True: url = input("Enter Twitchtracker/Streamscharts/Sullygnome/Twitch URL: ").strip() - if re.match(r'^(https?:\/\/)?(www\.)?(twitchtracker\.com|streamscharts\.com|sullygnome\.com|twitch\.tv)\/.*', url): + if re.match(r"^(https?:\/\/)?(www\.)?(twitchtracker\.com|streamscharts\.com|sullygnome\.com|twitch\.tv)\/.*", url): return url print("\n✖ Invalid URL! 
Please enter a URL from Twitchtracker, Streamscharts, Sullygnome, or Twitch.\n") @@ -267,7 +301,7 @@ def get_twitch_or_tracker_url(): def get_latest_version(): try: - res = requests.get("https://api.github.com/repos/MacielG1/VodRecovery/releases/latest", timeout=15) + res = requests.get("https://api.github.com/repos/MacielG1/VodRecovery/releases/latest", timeout=30) if res.status_code == 200: release_info = res.json() return release_info["tag_name"] @@ -294,46 +328,49 @@ def check_for_updates(): def sanitize_filename(filename, restricted=False): - - if filename == '': - return '' + if filename == "": + return "" def replace_insane(char): - if not restricted and char == '\n': - return '\0 ' + if not restricted and char == "\n": + return "\0 " elif not restricted and char in '"*:<>?|/\\': - return {'/': '\u29F8', '\\': '\u29f9'}.get(char, chr(ord(char) + 0xfee0)) - elif char == '?' or ord(char) < 32 or ord(char) == 127: - return '' + return {"/": "\u29f8", "\\": "\u29f9"}.get(char, chr(ord(char) + 0xFEE0)) + elif char == "?" 
or ord(char) < 32 or ord(char) == 127: + return "" elif char == '"': - return '' if restricted else '\'' - elif char == ':': - return '\0_\0-' if restricted else '\0 \0-' - elif char in '\\/|*<>': - return '\0_' - if restricted and (char in '!&\'()[]{}$;`^,#' or char.isspace() or ord(char) > 127): - return '\0_' + return "" if restricted else "'" + elif char == ":": + return "\0_\0-" if restricted else "\0 \0-" + elif char in "\\/|*<>": + return "\0_" + if restricted and ( + char in "!&'()[]{}$;`^,#" or char.isspace() or ord(char) > 127 + ): + return "\0_" return char if restricted: - filename = normalize('NFKC', filename) - filename = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), filename) - result = ''.join(map(replace_insane, filename)) - result = re.sub(r'(\0.)(?:(?=\1)..)+', r'\1', result) - strip_re = r'(?:\0.|[ _-])*' - result = re.sub(f'^\0.{strip_re}|{strip_re}\0.$', '', result) - result = result.replace('\0', '') or '_' - - while '__' in result: - result = result.replace('__', '_') - result = result.strip('_') - if restricted and result.startswith('-_'): + filename = normalize("NFKC", filename) + filename = re.sub( + r"[0-9]+(?::[0-9]+)+", lambda m: m.group(0).replace(":", "_"), filename + ) + result = "".join(map(replace_insane, filename)) + result = re.sub(r"(\0.)(?:(?=\1)..)+", r"\1", result) + strip_re = r"(?:\0.|[ _-])*" + result = re.sub(f"^\0.{strip_re}|{strip_re}\0.$", "", result) + result = result.replace("\0", "") or "_" + + while "__" in result: + result = result.replace("__", "_") + result = result.strip("_") + if restricted and result.startswith("-_"): result = result[2:] - if result.startswith('-'): - result = '_' + result[len('-'):] - result = result.lstrip('.') + if result.startswith("-"): + result = "_" + result[len("-") :] + result = result.lstrip(".") if not result: - result = '_' + result = "_" return result @@ -346,19 +383,19 @@ def read_config_file(config_file): def open_file(file_path): - if 
sys.platform.startswith('darwin'): - subprocess.call(('open', file_path)) - elif os.name == 'nt': - subprocess.Popen(['start', file_path], shell=True) - elif os.name == 'posix': - subprocess.call(('xdg-open', file_path)) + if sys.platform.startswith("darwin"): + subprocess.call(("open", file_path)) + elif os.name == "nt": + subprocess.Popen(["start", file_path], shell=True) + elif os.name == "posix": + subprocess.call(("xdg-open", file_path)) else: print(f"\nFile Location: {file_path}") def print_help(): try: - help_data = read_config_file('help') + help_data = read_config_file("help") print("\n--------------- Help Section ---------------") for menu, options in help_data.items(): print(f"\n{menu.replace('_', ' ').title()}:") @@ -379,7 +416,7 @@ def read_text_file(text_file_path): def write_text_file(input_text, destination_path): with open(destination_path, "a+", encoding="utf-8") as text_file: - text_file.write(input_text + '\n') + text_file.write(input_text + "\n") def write_m3u8_to_file(m3u8_link, destination_path): @@ -394,7 +431,7 @@ def read_csv_file(csv_file_path): def get_current_version(): - current_version = read_config_by_key('settings', 'CURRENT_VERSION') + current_version = read_config_by_key("settings", "CURRENT_VERSION") if current_version: return current_version else: @@ -417,16 +454,15 @@ def get_script_directory(): def return_user_agent(): script_dir = os.path.dirname(os.path.abspath(__file__)) - user_agents = read_text_file(os.path.join(script_dir, 'lib', 'user_agents.txt')) - header = { - 'user-agent': random.choice(user_agents) - } + user_agents = read_text_file(os.path.join(script_dir, "lib", "user_agents.txt")) + header = {"user-agent": random.choice(user_agents)} return header def calculate_epoch_timestamp(timestamp, seconds): try: epoch_timestamp = ((datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S") + timedelta(seconds=seconds)) - datetime(1970, 1, 1)).total_seconds() + return epoch_timestamp except ValueError: return None @@ -435,7 +471,7 
@@ def calculate_epoch_timestamp(timestamp, seconds): def calculate_days_since_broadcast(start_timestamp): if start_timestamp is None: return 0 - vod_age = datetime.today() - datetime.strptime(start_timestamp, '%Y-%m-%d %H:%M:%S') + vod_age = datetime.today() - datetime.strptime(start_timestamp, "%Y-%m-%d %H:%M:%S") return max(vod_age.days, 0) @@ -459,21 +495,23 @@ def parse_streamer_from_csv_filename(csv_filename): def parse_streamer_from_m3u8_link(m3u8_link): - indices = [i.start() for i in re.finditer('_', m3u8_link)] - streamer_name = m3u8_link[indices[0] + 1:indices[-2]] + indices = [i.start() for i in re.finditer("_", m3u8_link)] + streamer_name = m3u8_link[indices[0] + 1 : indices[-2]] return streamer_name def parse_video_id_from_m3u8_link(m3u8_link): - indices = [i.start() for i in re.finditer('_', m3u8_link)] - video_id = m3u8_link[indices[0] + len(parse_streamer_from_m3u8_link(m3u8_link)) + 2:indices[-1]] + indices = [i.start() for i in re.finditer("_", m3u8_link)] + video_id = m3u8_link[ + indices[0] + len(parse_streamer_from_m3u8_link(m3u8_link)) + 2 : indices[-1] + ] return video_id def parse_streamer_and_video_id_from_m3u8_link(m3u8_link): - indices = [i.start() for i in re.finditer('_', m3u8_link)] - streamer_name = m3u8_link[indices[0] + 1:indices[-2]] - video_id = m3u8_link[indices[0] + len(streamer_name) + 2:indices[-1]] + indices = [i.start() for i in re.finditer("_", m3u8_link)] + streamer_name = m3u8_link[indices[0] + 1 : indices[-2]] + video_id = m3u8_link[indices[0] + len(streamer_name) + 2 : indices[-1]] return f" - {streamer_name} [{video_id}]" @@ -522,7 +560,7 @@ def set_default_video_format(): script_dir = get_script_directory() config_file_path = os.path.join(script_dir, "config", "settings.json") try: - with open(config_file_path, 'r', encoding="utf-8") as config_file: + with open(config_file_path, "r", encoding="utf-8") as config_file: config_data = json.load(config_file) if not config_data: @@ -531,7 +569,7 @@ def 
set_default_video_format(): config_data["DEFAULT_VIDEO_FORMAT"] = selected_format - with open(config_file_path, 'w', encoding="utf-8") as config_file: + with open(config_file_path, "w", encoding="utf-8") as config_file: json.dump(config_data, config_file, indent=4) print(f"\n\033[92m\u2713 Default video format set to: {selected_format.lstrip('.')}\033[0m") @@ -546,10 +584,11 @@ def set_default_video_format(): def set_default_directory(): print("\nSelect the default directory") window = tk.Tk() - window.wm_attributes('-topmost', 1) + window.wm_attributes("-topmost", 1) window.withdraw() - file_path = filedialog.askdirectory(parent=window, initialdir=dir, - title="Select A Default Directory") + file_path = filedialog.askdirectory( + parent=window, initialdir=dir, title="Select A Default Directory" + ) if file_path: if not file_path.endswith("/"): @@ -558,11 +597,11 @@ def set_default_directory(): config_file_path = os.path.join(script_dir, "config", "settings.json") try: - with open(config_file_path, 'r', encoding="utf-8") as config_file: + with open(config_file_path, "r", encoding="utf-8") as config_file: config_data = json.load(config_file) config_data["DEFAULT_DIRECTORY"] = file_path - with open(config_file_path, 'w', encoding="utf-8") as config_file: + with open(config_file_path, "w", encoding="utf-8") as config_file: json.dump(config_data, config_file, indent=4) print(f"\n\033[92m\u2713 Default directory set to: {file_path}\033[0m") @@ -575,28 +614,62 @@ def set_default_directory(): window.destroy() +def set_default_downloader(): + # Choose between yt-dlp and ffmpeg + print("\nSelect the default downloader") + DOWNLOADERS = ["ffmpeg", "yt-dlp"] + for i, downloader_option in enumerate(DOWNLOADERS, start=1): + print(f"{i}) {downloader_option.lstrip('.')}") + + user_option = str(input("\nChoose a downloader: ")) + if user_option in [str(i) for i in range(1, len(DOWNLOADERS) + 1)]: + selected_downloader = DOWNLOADERS[int(user_option) - 1] + + if selected_downloader 
== "yt-dlp": + get_yt_dlp_path() + script_dir = get_script_directory() + config_file_path = os.path.join(script_dir, "config", "settings.json") + try: + with open(config_file_path, "r", encoding="utf-8") as config_file: + config_data = json.load(config_file) + + config_data["DEFAULT_DOWNLOADER"] = selected_downloader + with open(config_file_path, "w", encoding="utf-8") as config_file: + json.dump(config_data, config_file, indent=4) + + print(f"\n\033[92m\u2713 Default downloader set to: {selected_downloader}\033[0m") + + except (FileNotFoundError, json.JSONDecodeError) as error: + print(f"Error: {error}") + else: + print("\n✖ Invalid option! Please try again:\n") + return + + def get_m3u8_file_dialog(): window = tk.Tk() - window.wm_attributes('-topmost', 1) + window.wm_attributes("-topmost", 1) window.withdraw() directory = get_default_directory() - file_path = filedialog.askopenfilename(parent=window, - initialdir=directory, - title="Select A File", - filetypes=(("M3U8 files", "*.m3u8"), ("All files", "*"))) + file_path = filedialog.askopenfilename( + parent=window, + initialdir=directory, + title="Select A File", + filetypes=(("M3U8 files", "*.m3u8"), ("All files", "*")), + ) window.destroy() return file_path def parse_vod_filename(m3u8_video_filename): base = os.path.basename(m3u8_video_filename) - streamer_name, video_id = base.split('.m3u8', 1)[0].rsplit('_', 1) + streamer_name, video_id = base.split(".m3u8", 1)[0].rsplit("_", 1) return streamer_name, video_id def parse_vod_filename_with_Brackets(m3u8_video_filename): base = os.path.basename(m3u8_video_filename) - streamer_name, video_id = base.split('.m3u8', 1)[0].rsplit('_', 1) + streamer_name, video_id = base.split(".m3u8", 1)[0].rsplit("_", 1) return f" - {streamer_name} [{video_id}]" @@ -611,7 +684,7 @@ def generate_website_links(streamer_name, video_id, tracker_url=None): website_list = [ f"https://sullygnome.com/channel/{streamer_name}/stream/{video_id}", 
f"https://twitchtracker.com/{streamer_name}/streams/{video_id}", - f"https://streamscharts.com/channels/{streamer_name}/streams/{video_id}" + f"https://streamscharts.com/channels/{streamer_name}/streams/{video_id}", ] if tracker_url: website_list = [link for link in website_list if tracker_url not in link] @@ -623,7 +696,7 @@ def convert_url(url, target): patterns = { "sullygnome": "https://sullygnome.com/channel/{}/stream/{}", "twitchtracker": "https://twitchtracker.com/{}/streams/{}", - "streamscharts": "https://streamscharts.com/channels/{}/streams/{}" + "streamscharts": "https://streamscharts.com/channels/{}/streams/{}", } parsed_url = urlparse(url) streamer, video_id = None, None @@ -645,12 +718,11 @@ def convert_url(url, target): def extract_offset(clip_url): - clip_offset = re.search(r'(?:-offset|-index)-(\d+)', clip_url) + clip_offset = re.search(r"(?:-offset|-index)-(\d+)", clip_url) return clip_offset.group(1) def get_clip_format(video_id, offsets): - default_clip_list = [f"https://clips-media-assets2.twitch.tv/{video_id}-offset-{i}.mp4" for i in range(0, offsets, 2)] alternate_clip_list = [f"https://clips-media-assets2.twitch.tv/vod-{video_id}-offset-{i}.mp4" for i in range(0, offsets, 2)] legacy_clip_list = [f"https://clips-media-assets2.twitch.tv/{video_id}-index-{i:010}.mp4" for i in range(offsets)] @@ -658,7 +730,7 @@ def get_clip_format(video_id, offsets): clip_format_dict = { "1": default_clip_list, "2": alternate_clip_list, - "3": legacy_clip_list + "3": legacy_clip_list, } return clip_format_dict @@ -681,7 +753,7 @@ def get_random_clip_information(): while True: duration = get_time_input_HH_MM("Enter stream duration in (HH:MM) format: ") - hours, minutes = map(int, duration.split(':')) + hours, minutes = map(int, duration.split(":")) if hours >= 0 and minutes >= 0: break return video_id, hours, minutes @@ -704,7 +776,7 @@ def manual_clip_recover(): while True: duration = get_time_input_HH_MM("Enter stream duration in (HH:MM) format: ") - hours, 
minutes = map(int, duration.split(':')) + hours, minutes = map(int, duration.split(":")) if hours >= 0 and minutes >= 0: total_minutes = hours * 60 + minutes break @@ -775,7 +847,7 @@ def manual_vod_recover(): def handle_vod_recover(url, url_parser, datetime_parser, website_name): streamer, video_id = url_parser(url) - print(f"Checking {streamer} VOD Id: {video_id}") + print(f"Checking {streamer} VOD ID: {video_id}") stream_datetime, source_duration = datetime_parser(url) m3u8_link = vod_recover(streamer, video_id, stream_datetime, url) @@ -804,7 +876,7 @@ def website_vod_recover(): return handle_vod_recover(url, parse_twitchtracker_url, parse_datetime_twitchtracker, "Twitchtracker") if "sullygnome" in url: - new_tracker_url = re.sub(r'/\d+/', '/', url) + new_tracker_url = re.sub(r"/\d+/", "/", url) return handle_vod_recover(new_tracker_url, parse_sullygnome_url, parse_datetime_sullygnome, "Sullygnome") twitch_recover(url) @@ -821,80 +893,91 @@ def get_all_clip_urls(clip_format_dict, clip_format_list): return combined_clip_format_list -async def fetch_status(session, url): - try: - async with session.head(url, timeout=30) as response: - if response.status == 200: - return url - except Exception: - return None +async def fetch_status(session, url, retries=3, timeout=30): + for attempt in range(retries): + try: + async with session.get(url, timeout=timeout) as response: + if response.status == 200: + return url + except (aiohttp.ClientError, asyncio.TimeoutError): + if attempt < retries - 1: + await asyncio.sleep(2) + return None async def get_vod_urls(streamer_name, video_id, start_timestamp): m3u8_link_list = [] script_dir = get_script_directory() - domains = read_text_file(os.path.join(script_dir, 'lib', 'domains.txt')) + domains = read_text_file(os.path.join(script_dir, "lib", "domains.txt")) print("\nSearching for M3U8 URL...") for seconds in range(60): base_url = f"{streamer_name}_{video_id}_{int(calculate_epoch_timestamp(start_timestamp, seconds))}" - 
hashed_base_url = str(hashlib.sha1(base_url.encode('utf-8')).hexdigest())[:20] + hashed_base_url = str(hashlib.sha1(base_url.encode("utf-8")).hexdigest())[:20] for domain in domains: if domain.strip(): m3u8_link_list.append(f"{domain.strip()}{hashed_base_url}_{base_url}/chunked/index-dvr.m3u8") successful_url = None - first_url_printed = False - progress_message_printed = False async with aiohttp.ClientSession() as session: tasks = [fetch_status(session, url) for url in m3u8_link_list] - for index, task in enumerate(asyncio.as_completed(tasks)): + for index, task in enumerate(asyncio.as_completed(tasks), 1): url = await task if url: successful_url = url - if not first_url_printed: - if progress_message_printed: - print() - first_url_printed = True - print(f"\n\033[92m\u2713 Found URL: {successful_url}\033[0m") + + print(f"\n\n\033[92m\u2713 Found URL: {successful_url}\033[0m") break - if not progress_message_printed: - progress_message_printed = True - print(f"\rSearching {index + 1} out of {len(m3u8_link_list)} URLs", end='', flush=True) + + print(f"\rSearching {index} out of {len(m3u8_link_list)} URLs", end="", flush=True) - if not successful_url: - print("\nNo successful URL found!") return successful_url def return_supported_qualities(m3u8_link): - if m3u8_link is None: return None - always_best_quality = read_config_by_key('settings', 'ALWAYS_BEST_QUALITY') + always_best_quality = read_config_by_key("settings", "ALWAYS_BEST_QUALITY") - if always_best_quality is True: + if always_best_quality is True and "chunked" in m3u8_link: return m3u8_link - print("\nChecking for available qualities...\n") - resolutions = ["chunked", "1080p60", "1080p30", - "720p60", "720p30", "480p60", "480p30"] - request_list = [grequests.get(m3u8_link.replace("chunked", resolution)) for resolution in resolutions] - responses = grequests.map(request_list, size=100) - valid_resolutions = [resolution for resolution, response in zip(resolutions, responses) if response and 
response.status_code == 200] + print("\nChecking for available qualities...") + resolutions = [ + "chunked", + "1080p60", + "1080p30", + "720p60", + "720p30", + "480p60", + "480p30", + ] + request_list = [ + grequests.get(m3u8_link.replace("chunked", resolution)) + for resolution in resolutions + ] + responses = grequests.map(request_list) + valid_resolutions = [ + resolution + for resolution, response in zip(resolutions, responses) + if response and response.status_code == 200 + ] if not valid_resolutions: return None valid_resolutions.sort(key=resolutions.index) - print("Quality Options:") + if always_best_quality: + return m3u8_link.replace("chunked", valid_resolutions[0]) + + print("\nQuality Options:") for idx, resolution in enumerate(valid_resolutions, 1): - if 'chunked' in resolution: + if "chunked" in resolution: print(f"{idx}. {resolution.replace('chunked', 'Chunked (Best Quality)')}") else: print(f"{idx}. {resolution}") @@ -920,7 +1003,7 @@ def get_user_resolution_choice(m3u8_link, valid_resolutions): def parse_website_duration(duration_string): if isinstance(duration_string, list): - duration_string = ' '.join(duration_string) + duration_string = " ".join(duration_string) if not isinstance(duration_string, str): try: duration_string = str(duration_string) @@ -936,11 +1019,11 @@ def parse_website_duration(duration_string): except ValueError: return 0 - time_units = {'h': 0, 'm': 0} + time_units = {"h": 0, "m": 0} for value, unit in matches: time_units[unit[0].lower()] = int(value) - return calculate_broadcast_duration_in_minutes(time_units['h'], time_units['m']) + return calculate_broadcast_duration_in_minutes(time_units["h"], time_units["m"]) def handle_cloudflare(sb): @@ -955,7 +1038,7 @@ def handle_cloudflare(sb): def parse_streamscharts_duration_data(bs): - streamscharts_duration = bs.find_all('div', {'class': 'text-xs font-bold'})[3].text + streamscharts_duration = bs.find_all("div", {"class": "text-xs font-bold"})[3].text 
streamscharts_duration_in_minutes = parse_website_duration(streamscharts_duration) return streamscharts_duration_in_minutes @@ -965,24 +1048,23 @@ def parse_duration_streamscharts(streamscharts_url): # Method 1: Using requests response = requests.get(streamscharts_url, headers=return_user_agent(), timeout=10) if response.status_code == 200: - bs = BeautifulSoup(response.content, 'html.parser') + bs = BeautifulSoup(response.content, "html.parser") return parse_streamscharts_duration_data(bs) # Method 2: Using grequests retries = 10 - reqs = [grequests.get(streamscharts_url, headers=return_user_agent())for _ in range(retries)] + reqs = [grequests.get(streamscharts_url, headers=return_user_agent()) for _ in range(retries)] for response in grequests.imap(reqs, size=100): if response.status_code == 200: - bs = BeautifulSoup(response.content, 'html.parser') + bs = BeautifulSoup(response.content, "html.parser") return parse_streamscharts_duration_data(bs) # Method 3: Using Selenium print("Opening Streamcharts with browser...") with SB(uc=True) as sb: - sb.uc_open_with_reconnect(streamscharts_url, reconnect_time=3) handle_cloudflare(sb) - bs = BeautifulSoup(sb.driver.page_source, 'html.parser') + bs = BeautifulSoup(sb.driver.page_source, "html.parser") return parse_streamscharts_duration_data(bs) except Exception: @@ -995,7 +1077,7 @@ def parse_duration_streamscharts(streamscharts_url): def parse_twitchtracker_duration_data(bs): - twitchtracker_duration = bs.find_all('div', {'class': 'g-x-s-value'})[0].text + twitchtracker_duration = bs.find_all("div", {"class": "g-x-s-value"})[0].text twitchtracker_duration_in_minutes = parse_website_duration(twitchtracker_duration) return twitchtracker_duration_in_minutes @@ -1005,24 +1087,23 @@ def parse_duration_twitchtracker(twitchtracker_url, try_alternative=True): # Method 1: Using requests response = requests.get(twitchtracker_url, headers=return_user_agent(), timeout=10) if response.status_code == 200: - bs = 
BeautifulSoup(response.content, 'html.parser') + bs = BeautifulSoup(response.content, "html.parser") return parse_twitchtracker_duration_data(bs) # Method 2: Using grequests retries = 10 - reqs = [grequests.get(twitchtracker_url, headers=return_user_agent())for _ in range(retries)] + reqs = [grequests.get(twitchtracker_url, headers=return_user_agent()) for _ in range(retries)] for response in grequests.imap(reqs, size=100): if response.status_code == 200: - bs = BeautifulSoup(response.content, 'html.parser') + bs = BeautifulSoup(response.content, "html.parser") return parse_twitchtracker_duration_data(bs) # Method 3: Using Selenium print("Opening Twitchtracker with browser...") with SB(uc=True) as sb: - sb.uc_open_with_reconnect(twitchtracker_url, reconnect_time=3) handle_cloudflare(sb) - bs = BeautifulSoup(sb.driver.page_source, 'html.parser') + bs = BeautifulSoup(sb.driver.page_source, "html.parser") return parse_twitchtracker_duration_data(bs) except Exception: @@ -1036,7 +1117,7 @@ def parse_duration_twitchtracker(twitchtracker_url, try_alternative=True): def parse_sullygnome_duration_data(bs): - sullygnome_duration = bs.find_all('div', {'class': 'MiddleSubHeaderItemValue'})[7].text.split(",") + sullygnome_duration = bs.find_all("div", {"class": "MiddleSubHeaderItemValue"})[7].text.split(",") sullygnome_duration_in_minutes = parse_website_duration(sullygnome_duration) return sullygnome_duration_in_minutes @@ -1046,24 +1127,23 @@ def parse_duration_sullygnome(sullygnome_url): # Method 1: Using requests response = requests.get(sullygnome_url, headers=return_user_agent(), timeout=10) if response.status_code == 200: - bs = BeautifulSoup(response.content, 'html.parser') + bs = BeautifulSoup(response.content, "html.parser") return parse_sullygnome_duration_data(bs) # Method 2: Using grequests retries = 10 - reqs = [grequests.get(sullygnome_url, headers=return_user_agent())for _ in range(retries)] + reqs = [grequests.get(sullygnome_url, headers=return_user_agent()) 
for _ in range(retries)] for response in grequests.imap(reqs, size=10): if response.status_code == 200: - bs = BeautifulSoup(response.content, 'html.parser') + bs = BeautifulSoup(response.content, "html.parser") return parse_sullygnome_duration_data(bs) - # Method 3: Using Selenium + # Method 3: Using Selenium print("Opening Sullygnome with browser...") with SB(uc=True) as sb: - sb.uc_open_with_reconnect(sullygnome_url, reconnect_time=3) handle_cloudflare(sb) - bs = BeautifulSoup(sb.driver.page_source, 'html.parser') + bs = BeautifulSoup(sb.driver.page_source, "html.parser") return parse_sullygnome_duration_data(bs) except Exception: @@ -1076,10 +1156,15 @@ def parse_duration_sullygnome(sullygnome_url): def parse_streamscharts_datetime_data(bs): - stream_date = bs.find_all('time', {'class': 'ml-2 font-bold'})[0].text.strip().replace(",", "") + ":00" + stream_date = ( + bs.find_all("time", {"class": "ml-2 font-bold"})[0] + .text.strip() + .replace(",", "") + + ":00" + ) stream_datetime = datetime.strptime(stream_date, "%d %b %Y %H:%M:%S").strftime("%Y-%m-%d %H:%M:%S") - streamcharts_duration = bs.find_all('div', {'class': 'text-xs font-bold'})[3].text + streamcharts_duration = bs.find_all("div", {"class": "text-xs font-bold"})[3].text streamcharts_duration_in_minutes = parse_website_duration(streamcharts_duration) print(f"Datetime: {stream_datetime}") @@ -1091,29 +1176,28 @@ def parse_datetime_streamscharts(streamscharts_url): try: # Method 1: Using requests - response = requests.get(streamscharts_url, headers=return_user_agent(), timeout=10) + response = requests.get( + streamscharts_url, headers=return_user_agent(), timeout=10 + ) if response.status_code == 200: - - bs = BeautifulSoup(response.content, 'html.parser') + bs = BeautifulSoup(response.content, "html.parser") return parse_streamscharts_datetime_data(bs) # Method 2: Using grequests retries = 10 - reqs = [grequests.get(streamscharts_url, headers=return_user_agent())for _ in range(retries)] + reqs = 
[grequests.get(streamscharts_url, headers=return_user_agent()) for _ in range(retries)] for response in grequests.imap(reqs, size=100): if response.status_code == 200: - - bs = BeautifulSoup(response.content, 'html.parser') + bs = BeautifulSoup(response.content, "html.parser") return parse_streamscharts_datetime_data(bs) - # Method 3: Using Selenium + # Method 3: Using Selenium print("Opening Streamscharts with browser...") - - with SB(uc=True) as sb: + with SB(uc=True) as sb: sb.uc_open_with_reconnect(streamscharts_url, reconnect_time=3) handle_cloudflare(sb) - bs = BeautifulSoup(sb.driver.page_source, 'html.parser') + bs = BeautifulSoup(sb.driver.page_source, "html.parser") return parse_streamscharts_datetime_data(bs) @@ -1123,8 +1207,8 @@ def parse_datetime_streamscharts(streamscharts_url): def parse_twitchtracker_datetime_data(bs): - twitchtracker_datetime = bs.find_all('div', {'class': 'stream-timestamp-dt'})[0].text - twitchtracker_duration = bs.find_all('div', {'class': 'g-x-s-value'})[0].text + twitchtracker_datetime = bs.find_all("div", {"class": "stream-timestamp-dt"})[0].text + twitchtracker_duration = bs.find_all("div", {"class": "g-x-s-value"})[0].text twitchtracker_duration_in_minutes = parse_website_duration(twitchtracker_duration) print(f"Datetime: {twitchtracker_datetime}") @@ -1138,40 +1222,39 @@ def parse_datetime_twitchtracker(twitchtracker_url): # Method 1: Using requests response = requests.get(twitchtracker_url, headers=return_user_agent(), timeout=10) if response.status_code == 200: - - bs = BeautifulSoup(response.content, 'html.parser') + bs = BeautifulSoup(response.content, "html.parser") return parse_twitchtracker_datetime_data(bs) # Method 2: Using grequests retries = 10 - reqs = [grequests.get(twitchtracker_url, headers=return_user_agent())for _ in range(retries)] + reqs = [grequests.get(twitchtracker_url, headers=return_user_agent()) for _ in range(retries)] + for response in grequests.imap(reqs, size=100): if response.status_code == 
200: - - bs = BeautifulSoup(response.content, 'html.parser') + bs = BeautifulSoup(response.content, "html.parser") return parse_twitchtracker_datetime_data(bs) # Method 3: Using Selenium print("Opening Twitchtracker with browser...") with SB(uc=True) as sb: - sb.uc_open_with_reconnect(twitchtracker_url, reconnect_time=3) handle_cloudflare(sb) - bs = BeautifulSoup(sb.driver.page_source, 'html.parser') - description_meta = bs.find('meta', {'name': 'description'}) + bs = BeautifulSoup(sb.driver.page_source, "html.parser") + description_meta = bs.find("meta", {"name": "description"}) twitchtracker_datetime = None if description_meta: - description_content = description_meta.get('content') - match = re.search(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', description_content) + description_content = description_meta.get("content") + match = re.search(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}", description_content) if match: twitchtracker_datetime = match.group(0) print(f"Datetime: {twitchtracker_datetime}") - twitchtracker_duration = bs.find_all('div', {'class': 'g-x-s-value'})[0].text + twitchtracker_duration = bs.find_all("div", {"class": "g-x-s-value"})[0].text twitchtracker_duration_in_minutes = parse_website_duration(twitchtracker_duration) + return twitchtracker_datetime, twitchtracker_duration_in_minutes except Exception: pass @@ -1179,12 +1262,12 @@ def parse_datetime_twitchtracker(twitchtracker_url): def parse_sullygnome_datetime_data(bs): - stream_date = bs.find_all('div', {'class': 'MiddleSubHeaderItemValue'})[6].text + stream_date = bs.find_all("div", {"class": "MiddleSubHeaderItemValue"})[6].text modified_stream_date = remove_chars_from_ordinal_numbers(stream_date) formatted_stream_date = datetime.strptime(modified_stream_date, "%A %d %B %I:%M%p").strftime("%m-%d %H:%M:%S") sullygnome_datetime = str(datetime.now().year) + "-" + formatted_stream_date - sullygnome_duration = bs.find_all('div', {'class': 'MiddleSubHeaderItemValue'})[7].text.split(",") + 
sullygnome_duration = bs.find_all("div", {"class": "MiddleSubHeaderItemValue"})[7].text.split(",") sullygnome_duration_in_minutes = parse_website_duration(sullygnome_duration) print(f"Datetime: {sullygnome_datetime}") @@ -1198,24 +1281,24 @@ def parse_datetime_sullygnome(sullygnome_url): # Method 1: Using requests response = requests.get(sullygnome_url, headers=return_user_agent(), timeout=10) if response.status_code == 200: - bs = BeautifulSoup(response.content, 'html.parser') + bs = BeautifulSoup(response.content, "html.parser") return parse_sullygnome_datetime_data(bs) # Method 2: Using grequests retries = 10 - reqs = [grequests.get(sullygnome_url, headers=return_user_agent())for _ in range(retries)] + reqs = [grequests.get(sullygnome_url, headers=return_user_agent()) for _ in range(retries)] + for response in grequests.imap(reqs, size=100): if response.status_code == 200: - bs = BeautifulSoup(response.content, 'html.parser') + bs = BeautifulSoup(response.content, "html.parser") return parse_sullygnome_datetime_data(bs) # Method 3: Using Selenium print("Opening Sullygnome with browser...") with SB(uc=True) as sb: - sb.uc_open_with_reconnect(sullygnome_url, reconnect_time=3) handle_cloudflare(sb) - bs = BeautifulSoup(sb.driver.page_source, 'html.parser') + bs = BeautifulSoup(sb.driver.page_source, "html.parser") return parse_sullygnome_datetime_data(bs) except Exception: @@ -1226,6 +1309,7 @@ def parse_datetime_sullygnome(sullygnome_url): def unmute_vod(m3u8_link): counter = 0 video_filepath = get_vod_filepath(parse_streamer_from_m3u8_link(m3u8_link), parse_video_id_from_m3u8_link(m3u8_link)) + write_m3u8_to_file(m3u8_link, video_filepath) file_contents = read_text_file(video_filepath) if is_video_muted(m3u8_link): @@ -1255,16 +1339,20 @@ def unmute_vod(m3u8_link): def mark_invalid_segments_in_playlist(m3u8_link): print() unmute_vod(m3u8_link) - vod_file_path = get_vod_filepath(parse_streamer_from_m3u8_link(m3u8_link), parse_video_id_from_m3u8_link(m3u8_link)) + 
vod_file_path = get_vod_filepath(parse_streamer_from_m3u8_link(m3u8_link),parse_video_id_from_m3u8_link(m3u8_link)) + with open(vod_file_path, "r", encoding="utf-8") as f: lines = f.read().splitlines() + print("Checking for invalid segments...") segments = asyncio.run(validate_playlist_segments(get_all_playlist_segments(m3u8_link))) + if not segments: if "/highlight" not in m3u8_link: print("No segments are valid. Cannot generate M3U8! Returning to main menu.") os.remove(vod_file_path) return + playlist_segments = [segment for segment in segments if segment in lines] modified_playlist = [] for line in lines: @@ -1278,7 +1366,7 @@ def mark_invalid_segments_in_playlist(m3u8_link): modified_playlist.append(line) with open(vod_file_path, "w", encoding="utf-8") as f: f.write("\n".join(modified_playlist)) - input('Press Enter to continue...') + input("Press Enter to continue...") def return_m3u8_duration(m3u8_link): @@ -1295,20 +1383,19 @@ def return_m3u8_duration(m3u8_link): def process_m3u8_configuration(m3u8_link, skip_check=False): playlist_segments = get_all_playlist_segments(m3u8_link) - check_segments = read_config_by_key('settings', 'CHECK_SEGMENTS') and not skip_check + check_segments = read_config_by_key("settings", "CHECK_SEGMENTS") and not skip_check print() m3u8_source = None if is_video_muted(m3u8_link): print("Video contains muted segments") - if read_config_by_key('settings', 'UNMUTE_VIDEO'): + if read_config_by_key("settings", "UNMUTE_VIDEO"): unmute_vod(m3u8_link) - m3u8_source = get_vod_filepath(parse_streamer_from_m3u8_link(m3u8_link), parse_video_id_from_m3u8_link(m3u8_link)) + m3u8_source = get_vod_filepath(parse_streamer_from_m3u8_link(m3u8_link),parse_video_id_from_m3u8_link(m3u8_link),) else: # print("Video doesn't contain muted segments") m3u8_source = m3u8_link - os.remove(get_vod_filepath(parse_streamer_from_m3u8_link(m3u8_link), parse_video_id_from_m3u8_link(m3u8_link))) if check_segments: print("Checking valid segments...") @@ -1319,9 
+1406,11 @@ def process_m3u8_configuration(m3u8_link, skip_check=False): def get_all_playlist_segments(m3u8_link): counter = 0 segment_list = [] + video_file_path = get_vod_filepath(parse_streamer_from_m3u8_link(m3u8_link), parse_video_id_from_m3u8_link(m3u8_link)) write_m3u8_to_file(m3u8_link, video_file_path) file_contents = read_text_file(video_file_path) + with open(video_file_path, "w", encoding="utf-8") as video_file: for segment in file_contents: m3u8_link = m3u8_link.replace("index-dvr.m3u8", "") @@ -1364,7 +1453,6 @@ async def validate_playlist_segments(segments): return valid_segments - def vod_recover(streamer_name, video_id, timestamp, tracker_url=None): vod_age = calculate_days_since_broadcast(timestamp) @@ -1373,10 +1461,11 @@ def vod_recover(streamer_name, video_id, timestamp, tracker_url=None): vod_url = None if timestamp: vod_url = return_supported_qualities(asyncio.run(get_vod_urls(streamer_name, video_id, timestamp))) + if vod_url is None: alternate_websites = generate_website_links(streamer_name, video_id, tracker_url) - print("Unable to recover video! Trying alternate url sources...") + print("\nUnable to recover video! 
Trying alternate sources...") all_timestamps = [timestamp] # check if any of the alternate websites have a different timestamp, if so try to recover the video @@ -1392,11 +1481,15 @@ def vod_recover(streamer_name, video_id, timestamp, tracker_url=None): continue parsed_timestamp, _ = parse_datetime_sullygnome(website) - if parsed_timestamp and parsed_timestamp != timestamp and parsed_timestamp not in all_timestamps: + if (parsed_timestamp and parsed_timestamp != timestamp and parsed_timestamp not in all_timestamps): all_timestamps.append(parsed_timestamp) vod_url = return_supported_qualities(asyncio.run(get_vod_urls(streamer_name, video_id, parsed_timestamp))) if vod_url: return vod_url + if not parsed_timestamp: + print("\033[91m \n✖ Unable to get the datetime, try inputting the datetime manually, using the manual recovery option. \033[0m") + input("\nPress Enter to continue...") + run_vod_recover() if not vod_url: print("\033[91m \n✖ Unable to recover the video! \033[0m") input("\nPress Enter to continue...") @@ -1441,7 +1534,7 @@ def clip_recover(streamer, video_id, duration): for response in grequests.imap(rs, size=100): iteration_counter += 1 - print(f'\rSearching for clips... {iteration_counter} of {len(full_url_list)}', end=" ", flush=True) + print(f"\rSearching for clips... 
{iteration_counter} of {len(full_url_list)}", end=" ", flush=True) if response.status_code == 200: valid_counter += 1 valid_url_list.append(response.url) @@ -1453,9 +1546,9 @@ def clip_recover(streamer, video_id, duration): if valid_url_list: for url in valid_url_list: write_text_file(url, get_log_filepath(streamer, video_id)) - if read_config_by_key('settings', 'AUTO_DOWNLOAD_CLIPS') or input("\nDo you want to download the recovered clips (Y/N): ").upper() == "Y": + if (read_config_by_key("settings", "AUTO_DOWNLOAD_CLIPS") or input("\nDo you want to download the recovered clips (Y/N): ").upper() == "Y"): download_clips(get_default_directory(), streamer, video_id) - if read_config_by_key('settings', 'REMOVE_LOG_FILE'): + if read_config_by_key("settings", "REMOVE_LOG_FILE"): os.remove(get_log_filepath(streamer, video_id)) else: keep_log_option = input("Do you want to remove the log file? ") @@ -1467,10 +1560,11 @@ def clip_recover(streamer, video_id, duration): def get_and_validate_csv_filename(): window = tk.Tk() - window.wm_attributes('-topmost', 1) + window.wm_attributes("-topmost", 1) window.withdraw() file_path = filedialog.askopenfilename(parent=window, title="Select The CSV File", filetypes=(("CSV files", "*.csv"), ("all files", "*.*"))) + if not file_path: print("\nNo file selected! 
Returning to main menu.") return run_vod_recover() @@ -1491,7 +1585,7 @@ def parse_clip_csv_file(file_path): modified_stream_date = datetime.strptime(stream_date, "%A %d %B %Y %H:%M").strftime("%d-%B-%Y") video_id = line[2].partition("stream/")[2].replace('"', "") duration = line[3] - if video_id != '0': + if video_id != "0": max_clip_offset = calculate_max_clip_offset(int(duration)) vod_info_dict.update({video_id: (modified_stream_date, max_clip_offset)}) return vod_info_dict @@ -1568,7 +1662,7 @@ async def bulk_clip_recovery(): csv_file_path = get_and_validate_csv_filename() streamer_name = parse_streamer_from_csv_filename(csv_file_path) elif bulk_recovery_option == "2": - csv_directory = input("Enter the full path where the sullygnome csv files exist: ").replace('"', '') + csv_directory = input("Enter the full path where the sullygnome csv files exist: ").replace('"', "") streamer_name = input("Enter the streamer's name: ") merge_files = input("Do you want to merge the CSV files in the directory? 
(Y/N): ") if merge_files.upper() == "Y": @@ -1576,7 +1670,7 @@ async def bulk_clip_recovery(): csv_file_path = os.path.join(csv_directory, f"{streamer_name.title()}_MERGED.csv") else: csv_file_path = get_and_validate_csv_filename() - csv_file_path = csv_file_path.replace('"', '') + csv_file_path = csv_file_path.replace('"', "") elif bulk_recovery_option == "3": return run_vod_recover() @@ -1586,11 +1680,10 @@ async def bulk_clip_recovery(): async with aiohttp.ClientSession() as session: for video_id, values in stream_info_dict.items(): vod_counter += 1 - print( - f"\nProcessing Past Broadcast:\n" - f"Stream Date: {values[0].replace('-', ' ')}\n" - f"Vod ID: {video_id}\n" - f"Vod Number: {vod_counter} of {len(stream_info_dict)}\n") + print(f"\nProcessing Past Broadcast:\n" + f"Stream Date: {values[0].replace('-', ' ')}\n" + f"Vod ID: {video_id}\n" + f"Vod Number: {vod_counter} of {len(stream_info_dict)}\n") original_vod_url_list = get_all_clip_urls(get_clip_format(video_id, values[1]), clip_format) print("Searching...") @@ -1598,12 +1691,12 @@ async def bulk_clip_recovery(): for task in asyncio.as_completed(tasks): total_counter += 1 iteration_counter += 1 - print(f'\rSearching for clips... {iteration_counter} of {len(original_vod_url_list)}', end=" ", flush=True) + print(f"\rSearching for clips... {iteration_counter} of {len(original_vod_url_list)}", end=" ", flush=True) result = await task if result: valid_counter += 1 - print(f'\n\033[92m{valid_counter} Clip(s) Found\033[0m\n') + print(f"\n\033[92m{valid_counter} Clip(s) Found\033[0m\n") if valid_counter != 0: user_option = input("Do you want to download all clips recovered (Y/N)? 
") @@ -1637,11 +1730,10 @@ def download_clips(directory, streamer_name, video_id): print("\nDownloading clips...") for response in grequests.imap(reqs, size=15): if response.status_code == 200: - offset = extract_offset(response.url) file_name = f"{streamer_name.title()}_{video_id}_{offset}{get_default_video_format()}" try: - with open(os.path.join(download_directory, file_name), 'wb') as x: + with open(os.path.join(download_directory, file_name), "wb") as x: x.write(response.content) except ValueError: print(f"Failed to download... {response.url}") @@ -1652,7 +1744,18 @@ def download_clips(directory, streamer_name, video_id): def is_m3u8_longer_than_24_hours(url): - cmd = [get_ffprobe_path(), '-protocol_whitelist', 'file,http,https,tcp,tls', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', url] + cmd = [ + get_ffprobe_path(), + "-protocol_whitelist", + "file,http,https,tcp,tls", + "-v", + "error", + "-show_entries", + "format=duration", + "-of", + "default=noprint_wrappers=1:nokey=1", + url, + ] duration_seconds = float(subprocess.check_output(cmd)) return duration_seconds > 24 * 60 * 60 @@ -1664,29 +1767,29 @@ def download_segment(segment_url): def parse_m3u8_url(m3u8_url): response = requests.get(m3u8_url, timeout=30) - base_url = m3u8_url.rsplit('/', 1)[0] + base_url = m3u8_url.rsplit("/", 1)[0] segments = [] - for line in response.text.split('\n'): + for line in response.text.split("\n"): line = line.strip() - if line.endswith('.ts'): - segment_url = base_url + '/' + line + if line.endswith(".ts"): + segment_url = base_url + "/" + line segments.append(segment_url) return segments def parse_m3u8_file(m3u8_file): segments = [] - with open(m3u8_file, 'r', encoding='utf-8') as f: + with open(m3u8_file, "r", encoding="utf-8") as f: for line in f: line = line.strip() - if line.startswith('https://'): + if line.startswith("https://"): segments.append(line) return segments def time_to_timedelta(time_str): - hours, 
minutes, seconds = map(int, time_str.split(':')) + hours, minutes, seconds = map(int, time_str.split(":")) return timedelta(hours=hours, minutes=minutes, seconds=seconds) @@ -1702,7 +1805,7 @@ async def download_segment_async(session, segment): async def download_m3u8_segments_async(m3u8, start_time, end_time, output_file): # function used when the m3u8 file is longer than 24 hours - if m3u8.startswith(('http://', 'https://')): + if m3u8.startswith(("http://", "https://")): segments = parse_m3u8_url(m3u8) else: segments = parse_m3u8_file(m3u8) @@ -1711,7 +1814,7 @@ async def download_m3u8_segments_async(m3u8, start_time, end_time, output_file): end_time_seconds = end_time.total_seconds() def is_segment_in_range(segment_url): - segment_number = re.search(r'(\d+)\.', segment_url.split('/')[-1]) + segment_number = re.search(r"(\d+)\.", segment_url.split("/")[-1]) if segment_number is None: return False @@ -1735,7 +1838,7 @@ def is_segment_in_range(segment_url): if segment_content: segments_content.append(segment_content) completed_segments += 1 - print(f"Progress: {completed_segments}/{total_segments} segments downloaded", end='\r') + print(f"Progress: {completed_segments}/{total_segments} segments downloaded", end="\r") if not segments_content: print("No segments found within the specified time range.") @@ -1745,7 +1848,7 @@ def is_segment_in_range(segment_url): for segment_content in segments_content: temp_file.write(segment_content) - command = [get_ffmpeg_path(), '-i', temp_file.name, '-c', 'copy', output_file] + command = [get_ffmpeg_path(), "-i", temp_file.name, "-c", "copy", output_file] try: subprocess.run(command, check=True) except subprocess.CalledProcessError as err: @@ -1758,7 +1861,7 @@ def get_ffmpeg_path(): try: if os.path.exists(ffdl.ffmpeg_path): return ffdl.ffmpeg_path - if subprocess.run(["ffmpeg", "-version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True).returncode == 0: + if (subprocess.run(["ffmpeg", "-version"], 
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True).returncode == 0): return "ffmpeg" raise Exception except Exception: @@ -1769,118 +1872,206 @@ def get_ffprobe_path(): try: if os.path.exists(ffdl.ffprobe_path): return ffdl.ffprobe_path - elif subprocess.run(["ffprobe", "-version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True).returncode == 0: + elif (subprocess.run(["ffprobe", "-version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True).returncode == 0): return "ffprobe" except Exception: sys.exit("FFprobe not found! Please install FFmpeg correctly and try again.") +def get_yt_dlp_path(): + try: + if (subprocess.run(["yt-dlp", "--version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True).returncode == 0): + return "yt-dlp" + except Exception: + command = ["pip", "install", "yt-dlp", "--upgrade", "--quiet"] + try: + subprocess.run(command, check=True) + print("yt-dlp installed successfully!") + return "yt-dlp" + except Exception: + sys.exit("yt-dlp not installed! 
Please install yt-dlp and try again.") + + def download_m3u8_video_url(m3u8_link, output_filename): + output_path = os.path.join(get_default_directory(), output_filename) + + downloader = get_default_downloader() + + if downloader == "ffmpeg": + command = [ + get_ffmpeg_path(), + "-i", m3u8_link, + "-c", "copy", + # '-bsf:a', 'aac_adtstoasc', + "-y", os.path.join(get_default_directory(), output_filename), + ] + else: + command = [ + "yt-dlp", + m3u8_link, + "-o", output_path, + ] + custom_options = get_yt_dlp_custom_options() + if custom_options: + command.extend(custom_options) - command = [ - get_ffmpeg_path(), - '-i', m3u8_link, - '-c', 'copy', - # '-bsf:a', 'aac_adtstoasc', - '-y', - os.path.join(get_default_directory(), output_filename), - ] try: subprocess.run(command, shell=True, check=True) + return True except Exception: - subprocess.run(' '.join(command), shell=True, check=True) + try: + subprocess.run(" ".join(command), shell=True, check=True) + return True + except Exception: + return False def download_m3u8_video_url_slice(m3u8_link, output_filename, video_start_time, video_end_time): - is_longer_than_24h = is_m3u8_longer_than_24_hours(m3u8_link) if is_longer_than_24h: start_time = time_to_timedelta(video_start_time) end_time = time_to_timedelta(video_end_time) - return asyncio.run(download_m3u8_segments_async(m3u8_link, start_time, end_time, os.path.join(get_default_directory(), output_filename))) - - command = [ - get_ffmpeg_path(), - '-ss', video_start_time, - '-to', video_end_time, - '-i', m3u8_link, - '-c', 'copy', - # '-bsf:a', 'aac_adtstoasc', - '-y', - os.path.join(get_default_directory(), output_filename), - ] + return asyncio.run(download_m3u8_segments_async(m3u8_link, start_time, end_time, os.path.join(get_default_directory(), output_filename))) + + downloader = get_default_downloader() + + if downloader == "ffmpeg": + command = [ + get_ffmpeg_path(), + "-i", m3u8_link, + "-ss", video_start_time, + "-to", video_end_time, + "-c", "copy", + # 
'-bsf:a', 'aac_adtstoasc', + "-y", os.path.join(get_default_directory(), output_filename), + ] + else: + command = [ + "yt-dlp", + m3u8_link, + "-o", os.path.join(get_default_directory(), output_filename), + "--downloader", "ffmpeg", # using ffmpeg, because yt-dlp doesn't support trimming before downloading + "--downloader-args", f"ffmpeg_i:-ss {video_start_time} -to {video_end_time}", + ] + custom_options = get_yt_dlp_custom_options() + if custom_options: + command.extend(custom_options) try: subprocess.run(command, shell=True, check=True) + return True except Exception: - subprocess.run(' '.join(command), shell=True, check=True) + try: + subprocess.run(" ".join(command), shell=True, check=True) + return True + except Exception: + return False def download_m3u8_video_file(m3u8_file_path, output_filename): - - command = [ - get_ffmpeg_path(), - '-protocol_whitelist', 'file,http,https,tcp,tls', - '-i', m3u8_file_path, - '-c', 'copy', - # '-bsf:a', 'aac_adtstoasc', - os.path.join(get_default_directory(), output_filename), - ] + downloader = get_default_downloader() + + if downloader == "ffmpeg": + command = [ + get_ffmpeg_path(), + "-protocol_whitelist", "file,http,https,tcp,tls", + "-i", m3u8_file_path, + "-c", "copy", + # '-bsf:a', 'aac_adtstoasc', + os.path.join(get_default_directory(), output_filename), + ] + else: + m3u8_file_path = f"file:\\\\{m3u8_file_path}" + command = [ + "yt-dlp", + "--enable-file-urls", + m3u8_file_path, + "-o", os.path.join(get_default_directory(), output_filename), + ] + custom_options = get_yt_dlp_custom_options() + if custom_options: + command.extend(custom_options) try: subprocess.run(command, shell=True, check=True) + return True except Exception: - subprocess.run(' '.join(command), shell=True, check=True) + try: + subprocess.run(" ".join(command), shell=True, check=True) + return True + except Exception: + return False def download_m3u8_video_file_slice(m3u8_file_path, output_filename, video_start_time, video_end_time): + # Ensure 
the file exists before proceeding + if not os.path.exists(m3u8_file_path): + print(f"Error: The m3u8 file does not exist at {m3u8_file_path}") + return False is_longer_than_24h = is_m3u8_longer_than_24_hours(m3u8_file_path) if is_longer_than_24h: start_time = time_to_timedelta(video_start_time) end_time = time_to_timedelta(video_end_time) - return asyncio.run(download_m3u8_segments_async(m3u8_file_path, start_time, end_time, os.path.join(get_default_directory(), output_filename))) + return asyncio.run( + download_m3u8_segments_async( + m3u8_file_path, + start_time, + end_time, + os.path.join(get_default_directory(), output_filename), + ) + ) + + downloader = get_default_downloader() + + if downloader == "yt-dlp": + print("Using ffmpeg, because yt-dlp doesn't support trimming before downloading\n") command = [ get_ffmpeg_path(), - '-protocol_whitelist', 'file,http,https,tcp,tls', - - '-ss', video_start_time, - '-to', video_end_time, - '-i', m3u8_file_path, - '-c', 'copy', - # '-c:a', 'aac', - '-y', - os.path.join(get_default_directory(), output_filename), + "-protocol_whitelist", "file,http,https,tcp,tls", + "-i", m3u8_file_path, + "-ss", video_start_time, + "-to", video_end_time, + "-c", "copy", + "-y", os.path.join(get_default_directory(), output_filename), ] + try: subprocess.run(command, shell=True, check=True) + return True except Exception: - subprocess.run(' '.join(command), shell=True, check=True) + try: + subprocess.run(" ".join(command), shell=True, check=True) + return True + except Exception: + return False def get_VLC_Location(): try: - vlc_location = read_config_by_key('settings', 'VLC_LOCATION') + vlc_location = read_config_by_key("settings", "VLC_LOCATION") if vlc_location and os.path.isfile(vlc_location): return vlc_location - possible_locations = [ - f"{chr(i)}:/Program Files/VideoLAN/VLC/vlc.exe" for i in range(65, 91)] + [ - f"{chr(i)}:/Program Files (x86)/VideoLAN/VLC/vlc.exe" for i in range(65, 91)] + [ - 
"/Applications/VLC.app/Contents/MacOS/VLC", # macOS default - "/usr/bin/vlc", # Linux default - "/usr/local/bin/vlc" # Additional common location for Linux - ] + possible_locations = ( + [f"{chr(i)}:/Program Files/VideoLAN/VLC/vlc.exe" for i in range(65, 91)] + [ + f"{chr(i)}:/Program Files (x86)/VideoLAN/VLC/vlc.exe" for i in range(65, 91)] + + [ + "/Applications/VLC.app/Contents/MacOS/VLC", # macOS default + "/usr/bin/vlc", # Linux default + "/usr/local/bin/vlc", # Additional common location for Linux + ] + ) for location in possible_locations: if os.path.isfile(location): script_dir = get_script_directory() config_file_path = os.path.join(script_dir, "config", "settings.json") try: - with open(config_file_path, 'r', encoding="utf-8") as config_file: + with open(config_file_path, "r", encoding="utf-8") as config_file: config_data = json.load(config_file) config_data["VLC_LOCATION"] = location - with open(config_file_path, 'w', encoding="utf-8") as config_file: + with open(config_file_path, "w", encoding="utf-8") as config_file: json.dump(config_data, config_file, indent=4) except (FileNotFoundError, json.JSONDecodeError) as error: print(f"Error: {error}") @@ -1894,19 +2085,23 @@ def get_VLC_Location(): def handle_vod_url_normal(m3u8_source, title=None, stream_date=None): start = time() is_file = os.path.isfile(m3u8_source) - if is_file: vod_filename = get_filename_for_file_source(m3u8_source, title=title, stream_date=stream_date) print(f"\nDownloading Vod: {vod_filename}") - - download_m3u8_video_file(m3u8_source, vod_filename) + + success = download_m3u8_video_file(m3u8_source, vod_filename) + if not success: + return print(f"\n\033[91m\u2717 Failed to download Vod: {vod_filename}\033[0m\n") os.remove(m3u8_source) else: vod_filename = get_filename_for_url_source(m3u8_source, title=title, stream_date=stream_date) print(f"\nDownloading Vod: {vod_filename}") - download_m3u8_video_url(m3u8_source, vod_filename) + success = download_m3u8_video_url(m3u8_source, 
vod_filename) + if not success: + print(f"\n\033[91m\u2717 Failed to download Vod: {vod_filename}\033[0m\n") + return formatted_elapsed = str(timedelta(seconds=int(time() - start))).zfill(8) print(f"\n\033[92m\u2713 Vod downloaded to {os.path.join(get_default_directory(), vod_filename)} in {formatted_elapsed}\033[0m\n") @@ -1922,18 +2117,18 @@ def format_date(date_string): def get_filename_for_file_source(m3u8_source, title, stream_date): streamer_name, video_id = parse_vod_filename(m3u8_source) formatted_date = format_date(stream_date) if stream_date else None - + filename_parts = [streamer_name] - + if formatted_date: filename_parts.append(formatted_date) - + if title: filename_parts.append(sanitize_filename(title)) - + filename_parts.append(f"[{video_id}]") filename = " - ".join(filename_parts) + get_default_video_format() - + return filename @@ -1941,15 +2136,15 @@ def get_filename_for_url_source(m3u8_source, title, stream_date): streamer = parse_streamer_from_m3u8_link(m3u8_source) vod_id = parse_video_id_from_m3u8_link(m3u8_source) formatted_date = format_date(stream_date) if stream_date else None - + filename_parts = [streamer] - + if formatted_date: filename_parts.append(formatted_date) - + if title: filename_parts.append(sanitize_filename(title)) - + filename_parts.append(f"[{vod_id}]") filename = " - ".join(filename_parts) + get_default_video_format() @@ -1966,13 +2161,17 @@ def handle_vod_url_trim(m3u8_source, title=None, stream_date=None): is_file = os.path.isfile(m3u8_source) if is_file: vod_filename = get_filename_for_file_trim(m3u8_source, title, stream_date, raw_start_time, raw_end_time) - download_m3u8_video_file_slice(m3u8_source, vod_filename, vod_start_time, vod_end_time) + success = download_m3u8_video_file_slice(m3u8_source, vod_filename, vod_start_time, vod_end_time) + if not success: + return print(f"\n\033[91m\u2717 Failed to download Vod: {vod_filename}\033[0m\n") if os.path.isfile(m3u8_source): os.remove(m3u8_source) else: vod_filename = 
get_filename_for_url_trim(m3u8_source, title, stream_date, raw_start_time, raw_end_time) - download_m3u8_video_url_slice(m3u8_source, vod_filename, vod_start_time, vod_end_time) + success = download_m3u8_video_url_slice(m3u8_source, vod_filename, vod_start_time, vod_end_time) + if not success: + return print(f"\n\033[91m\u2717 Failed to download Vod: {vod_filename}\033[0m\n") print(f"\n\033[92m\u2713 Vod downloaded to {os.path.join(get_default_directory(), vod_filename)}\033[0m\n") @@ -1980,18 +2179,18 @@ def handle_vod_url_trim(m3u8_source, title=None, stream_date=None): def get_filename_for_file_trim(m3u8_source, title, stream_date, raw_start_time, raw_end_time): streamer_name, video_id = parse_vod_filename(m3u8_source) formatted_date = format_date(stream_date) if stream_date else None - + filename_parts = [streamer_name] - + if formatted_date: filename_parts.append(formatted_date) - + if title: filename_parts.append(sanitize_filename(title)) - + filename_parts.append(f"[{video_id}]") filename_parts.extend([raw_start_time, raw_end_time]) - + filename = " - ".join(filename_parts) + get_default_video_format() return filename @@ -2001,27 +2200,27 @@ def get_filename_for_url_trim(m3u8_source, title, stream_date, raw_start_time, r streamer = parse_streamer_from_m3u8_link(m3u8_source) vod_id = parse_video_id_from_m3u8_link(m3u8_source) formatted_date = format_date(stream_date) if stream_date else None - + filename_parts = [streamer] - + if formatted_date: filename_parts.append(formatted_date) - + if title: filename_parts.append(sanitize_filename(title)) - + filename_parts.append(f"[{vod_id}]") filename_parts.extend([raw_start_time, raw_end_time]) - + filename = " - ".join(filename_parts) + get_default_video_format() - + return filename def get_time_input_HH_MM_SS(prompt): while True: - time_input = input(prompt).strip().replace("'", "").replace('"', '') - if re.match(r'^(\d+):([0-5]\d):([0-5]\d)$', time_input): + time_input = input(prompt).strip().replace("'", 
"").replace('"', "") + if re.match(r"^(\d+):([0-5]\d):([0-5]\d)$", time_input): return time_input print("\nInvalid input format! Please enter the time in HH:MM:SS format.\n") @@ -2029,9 +2228,9 @@ def get_time_input_HH_MM_SS(prompt): def get_time_input_HH_MM(prompt): while True: - time_input = input(prompt).strip().replace("'", "").replace('"', '') + time_input = input(prompt).strip().replace("'", "").replace('"', "") - if re.match(r'^(\d+):([0-5]\d)$', time_input): + if re.match(r"^(\d+):([0-5]\d)$", time_input): return time_input print("\nInvalid input format! Please enter the time in HH:MM format.\n") @@ -2039,9 +2238,9 @@ def get_time_input_HH_MM(prompt): def get_time_input_YYYY_MM_DD_HH_MM_SS(prompt): while True: - time_input = input(prompt).strip().replace("'", "").replace('"', '') + time_input = input(prompt).strip().replace("'", "").replace('"', "") - if re.match(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$', time_input): + if re.match(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$", time_input): return time_input print("\nInvalid input format! 
Please enter the time in YYYY-MM-DD HH:MM:SS format.\n") @@ -2076,15 +2275,15 @@ def get_datetime_from_m3u8(m3u8_file): try: date = None total_seconds = 0 - date_pattern = re.compile(r'#ID3-EQUIV-TDTG:(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})') + date_pattern = re.compile(r"#ID3-EQUIV-TDTG:(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})") - with open(m3u8_file, 'r', encoding='utf-8') as f: + with open(m3u8_file, "r", encoding="utf-8") as f: for line in f: date_match = date_pattern.match(line) if date_match: date = date_match.group(1) - if line.startswith('#EXT-X-TWITCH-TOTAL-SECS:'): - total_seconds = int(float(line.split(':')[-1].strip())) + if line.startswith("#EXT-X-TWITCH-TOTAL-SECS:"): + total_seconds = int(float(line.split(":")[-1].strip())) if date is not None: date = datetime.strptime(date, "%Y-%m-%dT%H:%M:%S") adjusted_date = date - timedelta(seconds=total_seconds) @@ -2110,13 +2309,16 @@ def handle_file_download_menu(m3u8_file_path): if stream_date: output_filename = f"{streamer_name} - {stream_date} - [{video_id}]{get_default_video_format()}" else: - output_filename = f"{streamer_name} - [{video_id}]{get_default_video_format()}" + output_filename = (f"{streamer_name} - [{video_id}]{get_default_video_format()}") - download_m3u8_video_file(m3u8_file_path, output_filename) + success = download_m3u8_video_file(m3u8_file_path, output_filename) + if not success: + return print(f"\n\033[91m\u2717 Failed to download Vod: {output_filename}\033[0m\n") formatted_elapsed = str(timedelta(seconds=int(time() - start))).zfill(8) print(f"\n\033[92m\u2713 Vod downloaded to {os.path.join(get_default_directory(), output_filename)} in {formatted_elapsed}\033[0m\n") break + elif start_download == 2: vod_start_time = get_time_input_HH_MM_SS("Enter start time (HH:MM:SS): ") vod_end_time = get_time_input_HH_MM_SS("Enter end time (HH:MM:SS): ") @@ -2130,10 +2332,13 @@ def handle_file_download_menu(m3u8_file_path): else: vod_filename = f"{streamer_name} - [{video_id}] - {raw_start_time} - 
{raw_end_time}{get_default_video_format()}" - download_m3u8_video_file_slice(m3u8_file_path, vod_filename, vod_start_time, vod_end_time) + success = download_m3u8_video_file_slice(m3u8_file_path, vod_filename, vod_start_time, vod_end_time) + if not success: + return print(f"\n\033[91m\u2717 Failed to download Vod: {vod_filename}\033[0m\n") print(f"\n\033[92m\u2713 Vod downloaded to {os.path.join(get_default_directory(), vod_filename)}\033[0m\n") break + elif start_download == 3 and vlc_location: subprocess.Popen([vlc_location, m3u8_file_path.replace("/", "\\")]) elif start_download == exit_option: @@ -2157,7 +2362,7 @@ def print_confirm_download_menu(): def extract_id_from_url(url: str): - pattern = r'twitch\.tv/(?:[^\/]+\/)?(\d+)' + pattern = r"twitch\.tv/(?:[^\/]+\/)?(\d+)" match = re.search(pattern, url) if match: return match.group(1) @@ -2169,7 +2374,7 @@ def extract_id_from_url(url: str): def extract_slug_and_streamer_from_clip_url(url): try: - pattern = r'twitch\.tv/([^\/]+)/clip/([^\/?]+)' + pattern = r"twitch\.tv/([^\/]+)/clip/([^\/?]+)" match = re.search(pattern, url) if match: return match.group(1), match.group(2) @@ -2178,18 +2383,31 @@ def extract_slug_and_streamer_from_clip_url(url): sys.exit("\n✖ Invalid Twitch Clip URL! 
def fetch_twitch_data(vod_id, retries=3, delay=5):
    """Fetch VOD metadata from Twitch's public GQL endpoint, with retries.

    Posts a GraphQL query for the video's title, broadcast type, creation
    date, seek-previews URL and owner login. Non-200 responses and raised
    exceptions are both treated as retryable.

    Args:
        vod_id: Twitch VOD id interpolated into the GraphQL query.
        retries: Maximum number of attempts before giving up.
        delay: Seconds to sleep between attempts.

    Returns:
        The parsed JSON response (a dict) on HTTP 200, or None once every
        attempt has failed.
    """
    for attempt in range(retries):
        try:
            res = requests.post(
                "https://gql.twitch.tv/gql",
                json={
                    "query": f'query {{ video(id: "{vod_id}") {{ title, broadcastType, createdAt, seekPreviewsURL, owner {{ login }} }} }}'
                },
                headers={
                    "Client-Id": "kimne78kx3ncx6brgo4mv6wki5h1ko",
                    "Accept": "application/json",
                    "Content-Type": "application/json",
                },
                timeout=30,
            )
            if res.status_code == 200:
                return res.json()
        except Exception:
            # Network hiccups / malformed responses are retryable best-effort.
            pass

        # Bug fix: the previous version slept unconditionally, including after
        # the *final* failed attempt, delaying the None return by `delay`
        # seconds for no benefit. Only sleep when another attempt follows.
        if attempt < retries - 1:
            sleep(delay)

    return None
broadcast_type = vod_data["broadcastType"].lower() url = None if broadcast_type == "highlight": - url = f'https://{domain}/{vod_special_id}/chunked/highlight-{vod_id}.m3u8' + url = f"https://{domain}/{vod_special_id}/chunked/highlight-{vod_id}.m3u8" elif broadcast_type == "upload" and days_diff > 7: - url = f'https://{domain}/{vod_data["owner"]["login"]}/{vod_id}/{vod_special_id}/chunked/index-dvr.m3u8' + url = f"https://{domain}/{vod_data['owner']['login']}/{vod_id}/{vod_special_id}/chunked/index-dvr.m3u8" else: - url = f'https://{domain}/{vod_special_id}/chunked/index-dvr.m3u8' + url = f"https://{domain}/{vod_special_id}/chunked/index-dvr.m3u8" if url is not None: - response = requests.get(url, timeout=20) + response = requests.get(url, timeout=30) if response.status_code == 200: return url, vod_data["title"], vod_data["createdAt"] return response.url, None, None @@ -2240,12 +2457,12 @@ def twitch_recover(link=None): print("\n✖ Unable to find it! Try using one of the other websites.\n") input("Press Enter to continue...") return run_vod_recover() - + try: format_datetime = datetime.strptime(stream_datetime, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y-%m-%d %H:%M:%S") except Exception: format_datetime = None - + m3u8_url = return_supported_qualities(url) print(f"\n\033[92m\u2713 Found URL: {m3u8_url}\033[0m") @@ -2256,7 +2473,8 @@ def twitch_recover(link=None): def get_twitch_clip(clip_slug): url_endpoint = "https://gql.twitch.tv/gql" data = [ - {"operationName": "ClipsDownloadButton", + { + "operationName": "ClipsDownloadButton", "variables": { "slug": clip_slug, }, @@ -2266,17 +2484,22 @@ def get_twitch_clip(clip_slug): "sha256Hash": "6e465bb8446e2391644cf079851c0cb1b96928435a240f07ed4b240f0acc6f1b", } }, - }] + } + ] try: response_endpoint = requests.post(url_endpoint, json=data, headers={"Client-Id": "kimne78kx3ncx6brgo4mv6wki5h1ko"}, timeout=30) response = response_endpoint.json() - if 'error' in response or 'errors' in response: - raise 
Exception(response.get('message', 'Unable to get clip!')) + if "error" in response or "errors" in response: + raise Exception(response.get("message", "Unable to get clip!")) url = "" - playback_access_token = response[0]['data']['clip']['playbackAccessToken'] - url = response[0]['data']['clip']['videoQualities'][0]['sourceURL'] + '?sig=' + playback_access_token['signature'] + '&token=' + requests.utils.quote(playback_access_token['value']) + playback_access_token = response[0]["data"]["clip"]["playbackAccessToken"] + url = ( + response[0]["data"]["clip"]["videoQualities"][0]["sourceURL"] + + "?sig=" + playback_access_token["signature"] + + "&token=" + requests.utils.quote(playback_access_token["value"]) + ) except Exception: print("\n✖ Unable to get clip! Check the URL and try again.\n") input("Press Enter to continue...") @@ -2290,11 +2513,11 @@ def twitch_clip_downloader(clip_url, slug, streamer): try: response = requests.get(clip_url, stream=True, timeout=30) if response.status_code != 200: - raise Exception('Unable to download clip!') + raise Exception("Unable to download clip!") download_location = os.path.join(get_default_directory(), f"{streamer}-{slug}{get_default_video_format()}") start = time() - with open(os.path.join(get_default_directory(), download_location), 'wb') as file: + with open(os.path.join(get_default_directory(), download_location), "wb") as file: copyfileobj(response.raw, file) formatted_elapsed = str(timedelta(seconds=int(time() - start))).zfill(8) @@ -2302,7 +2525,7 @@ def twitch_clip_downloader(clip_url, slug, streamer): input("Press Enter to continue...") except Exception: - raise Exception('Unable to download clip!') + raise Exception("Unable to download clip!") def handle_twitch_clip(clip_url): @@ -2314,7 +2537,7 @@ def handle_twitch_clip(clip_url): def run_vod_recover(): print("\nWELCOME TO VOD RECOVERY!") - + menu = 0 while menu < 50: print() @@ -2404,8 +2627,10 @@ def run_vod_recover(): elif options_choice == 2: 
set_default_directory() elif options_choice == 3: - check_for_updates() + set_default_downloader() elif options_choice == 4: + check_for_updates() + elif options_choice == 5: script_dir = get_script_directory() config_file_path = os.path.join(script_dir, "config", "settings.json") if os.path.exists(config_file_path): @@ -2413,10 +2638,10 @@ def run_vod_recover(): input("\nPress Enter to continue...") else: print("File not found!") - elif options_choice == 5: + elif options_choice == 6: print_help() input("Press Enter to continue...") - elif options_choice == 6: + elif options_choice == 7: break elif menu == 6: print("\nExiting...\n") @@ -2425,7 +2650,7 @@ def run_vod_recover(): run_vod_recover() -if __name__ == '__main__': +if __name__ == "__main__": try: run_vod_recover() except Exception as e: