def youtube_download(video_url, progress_queue: LifoQueue, file_path_with_name):
    """
    Download ``video_url`` with yt-dlp and store it at ``file_path_with_name``.

    The module-level ``ydl_opts`` dictionary is used as the base configuration
    but is never modified: a per-call dictionary is built from it so that the
    output template and progress hook of one download cannot leak into the next.

    Args:
        video_url: URL handed to yt-dlp.
        progress_queue: queue bound as the first argument of ``yt_dlp_hook``,
            which yt-dlp then calls as its progress hook.
        file_path_with_name: value used as yt-dlp's ``outtmpl`` option.

    Returns:
        Whatever ``YoutubeDL.download`` returns for the single URL.
    """
    logging.debug(f"Downloading {video_url} to {file_path_with_name}")

    # Per-call options: base settings plus the two values specific to this download.
    download_options = {
        **ydl_opts,
        "outtmpl": file_path_with_name,
        "progress_hooks": [functools.partial(yt_dlp_hook, progress_queue)],
    }

    with yt_dlp.YoutubeDL(download_options) as ydl:
        return ydl.download(url_list=[video_url])
class YoutubeDownloader(VideoDownloader):
    """Downloads a YouTube video (video + audio) to disk through yt-dlp."""

    @staticmethod
    def download_video_from_link(url: str, path: str | None = None) -> VIDEO_RETURN_TYPE:
        """
        Download the video behind ``url`` and return it as a one-element list.

        Args:
            url: link (or search text, because ``default_search`` is ``auto``)
                passed to yt-dlp.
            path: directory to download into; defaults to ``downloads/youtube``.
                The directory is created when missing.

        Returns:
            ``[VideoFile]`` pointing at ``<path>/<video id>.mp4`` with the video
            title as caption, or an empty list when yt-dlp returned no
            information, the result has no id, or the expected file is not on
            disk after the download.
        """
        if path is None:
            path = os.path.join("downloads", "youtube")

        os.makedirs(path, exist_ok=True)

        custom_options = {
            # Only video streams whose reported size is below the configured limit.
            'format': f'bestvideo[filesize<{MAX_VIDEO_DOWNLOAD_SIZE}M]+bestaudio',
            "outtmpl": os.path.join(path, "%(id)s.mp4"),
            'noplaylist': True,
            'default_search': 'auto',
            'nooverwrites': True,
            'quiet': True,
        }

        with yt_dlp.YoutubeDL(custom_options) as ydl:
            ydt = ydl.extract_info(url, download=True)

        if ydt is None:
            return []

        # Results may be wrapped in an "entries" list; that list can be
        # missing, None or empty, so never index it blindly.
        entries = ydt.get("entries")
        first_entry = entries[0] if entries else None
        info = first_entry or ydt

        video_id = info.get("id")
        if video_id is None:
            return []

        file_path = os.path.join(path, f"{video_id}.mp4")
        # NOTE(review): when yt-dlp merges streams the final container may not
        # be mp4 -- confirm whether 'merge_output_format' should be set.
        if not os.path.exists(file_path):
            logging.warning("Expected downloaded file %s does not exist for url: %s", file_path, url)
            return []

        return [VideoFile(file_path, info.get("title"))]
this on was: {message.content} by {message.author.mention}", ephemeral=True) - @tree.context_menu(name="Mesajı_Sabitle") async def pin_message(interaction: discord.Interaction, message: discord.Message): @@ -289,17 +284,19 @@ async def pin_message(interaction: discord.Interaction, message: discord.Message f"{message.author.mention} adlı kişinin; **{message.content}** mesajı sabitlendi", ephemeral=True) -@tree.context_menu(name="Mesajdaki_Linki_Çal") -async def find_and_play(interaction: discord.Interaction, message: discord.Message): +@app_commands.allowed_installs(guilds=True, users=True) +@app_commands.allowed_contexts(guilds=True, dms=True, private_channels=True) +@tree.context_menu(name="Linkteki_Videoyu_Indir") +async def download_video_link(interaction: discord.Interaction, message: discord.Message): + content = message.content + await download_video_command(interaction, content) + +@app_commands.allowed_installs(guilds=False, users=True) +@app_commands.allowed_contexts(guilds=True, dms=True, private_channels=True) +@tree.context_menu(name="Linkteki_Videoyu_Gizlice_Indir") +async def download_video_link_hidden(interaction: discord.Interaction, message: discord.Message): content = message.content - watch_link = "https://www.youtube.com/watch?v=" - links = content.split(watch_link) - if len(links) > 1: # we found a link - logging.debug(f"Found a link in the message {content} the link is {links[1].split(' ')[0]}") - await vc_cmds.play(interaction, watch_link + links[1].split(' ')[0]) - return - # we didn't find a link - await interaction.response.send_message("Mesajda bir link bulamadım", ephemeral=True) + await download_video_command(interaction, content, is_ephemeral=True) @tree.command(name="ping", description="Botun pingini gösterir") @@ -307,20 +304,11 @@ async def ping(interaction: discord.Interaction): await interaction.response.send_message(f"Pong: {round(discord_client.latency * 1000)}ms") -@tree.command(name="twitter-indir", description="Twitter'dan 
async def download_video_command(interaction: discord.Interaction, url: str, is_ephemeral: bool = False):
    """
    Download the video(s) referenced by ``url`` and send them as a follow-up.

    The downloader is chosen with ``get_downloader``; when none matches, an
    ephemeral "unsupported link" reply is sent and nothing is downloaded.
    Otherwise the interaction is deferred, the files are downloaded and
    converted to discord files, and a single follow-up message carrying all of
    them is sent.  The message text is the captions joined with " + ", or a
    generic "Video(s) Downloaded" text when no attachment has a caption.

    Args:
        interaction: the interaction to answer.
        url: text containing the link; it is passed unchanged to the downloader.
        is_ephemeral: whether the deferred response and the final message are
            only visible to the invoking user.

    Raises:
        Exception: anything raised while downloading or converting is re-raised
            after the user has been told that something went wrong.
    """
    # TODO: add better error handling than just catching all exceptions
    # NOTE(review): download_video_from_link is called synchronously inside
    # this coroutine -- confirm whether it should run in a worker thread.
    downloader = get_downloader(url)
    if downloader is None:
        await interaction.response.send_message("Bu link desteklenmiyor", ephemeral=True)
        logging.info("Found an unsupported link: %s", url)
        return

    await interaction.response.defer(ephemeral=is_ephemeral)

    try:
        attachments = downloader.download_video_from_link(url)
        discord_files = convert_paths_to_discord_files([attachment.path for attachment in attachments])
    except Exception:
        await interaction.followup.send("Bir şey ters gitti... lütfen tekrar deneyin", ephemeral=True)
        raise  # re-raise the exception so we can see what went wrong

    if not attachments:
        await interaction.followup.send(
            "Videoyu Bulamadım, lütfen daha sonra tekrar deneyin ya da hatayı bildirin", ephemeral=True
        )
        return

    captions = [attachment.caption for attachment in attachments if attachment.caption]
    content = " + ".join(captions) or f"Video{'s' if len(attachments) > 1 else ''} Downloaded"

    # Discord rejects messages whose content is longer than 2000 characters;
    # long captions would otherwise make the final send fail after a
    # successful download.
    max_content_length = 2000
    if len(content) > max_content_length:
        content = content[: max_content_length - 3] + "..."

    await interaction.followup.send(content, files=discord_files, ephemeral=is_ephemeral)
Downloads Videos from a url + if path is None, the default path is downloads/{website_name} + + if the download fails, it returns an empty list + """ + logging.error( + "VideoDownloader download_url interface was directly called, this should not happen! url was: %s for path: %s", + url, + path, + ) + return [] diff --git a/src/downloading_system.py b/src/downloading_system.py new file mode 100644 index 0000000..83611ea --- /dev/null +++ b/src/downloading_system.py @@ -0,0 +1,32 @@ +import re +from typing import Type + +from src.Youtube import YoutubeDownloader +from src.downloader import VideoDownloader +from src.instagram import InstagramDownloader +from src.twitter import TwitterDownloader + +_URL_PARSE_REGEX = re.compile(r"\b((?:https?://)?(?:(?:www\.)?(?:[\da-z\.-]+)\.(?:[a-z]{2,6})|(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)|(?:(?:[0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])))(?::[0-9]{1,4}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])?(?:/[\w\.-]*)*/?)\b") # NOSONAR + +_TWITTER_REGEX = re.compile(r"\b(?:https?:\/\/)?(?:www\.)?(?:twitter\.com\/|t\.co\/|x\.com\/)\S*") +_INSTAGRAM_REGEX = re.compile(r"\b(?:https?:\/\/)?(?:www\.)?(?:instagram\.com\/|instagr\.am\/)\S*") +_YOUTUBE_REGEX = 
def get_downloader(url: str) -> Type[VideoDownloader] | None:
    """
    Return the downloader class responsible for ``url``.

    The text is first matched as-is against the Twitter, Instagram and YouTube
    patterns (in that order).  When none matches, every URL-like token found
    in the text by ``_URL_PARSE_REGEX`` is tried in order of appearance, so a
    supported link is still recognised when it is surrounded by other text or
    preceded by an unsupported URL-like token.

    Args:
        url: a link, or free text that contains one.

    Returns:
        The matching ``VideoDownloader`` subclass, or ``None`` when no
        supported link is found.
    """
    supported_sites = (
        (_TWITTER_REGEX, TwitterDownloader),
        (_INSTAGRAM_REGEX, InstagramDownloader),
        (_YOUTUBE_REGEX, YoutubeDownloader),
    )

    def _match_site(candidate: str) -> Type[VideoDownloader] | None:
        # Patterns are anchored at the start of the candidate (``match``).
        for pattern, downloader in supported_sites:
            if pattern.match(candidate):
                return downloader
        return None

    direct_hit = _match_site(url)
    if direct_hit is not None:
        return direct_hit

    # try to extract the url from the text in case there is extra text
    for found in _URL_PARSE_REGEX.finditer(url):
        extracted_hit = _match_site(found.group(0))
        if extracted_hit is not None:
            return extracted_hit
    return None
class InstagramDownloader(VideoDownloader):
    """Downloads the video(s) of an Instagram post through instaloader."""

    @staticmethod
    def download_video_from_link(url: str, path: str | None = None) -> VIDEO_RETURN_TYPE:
        """
        Download every video of the post behind ``url``.

        Args:
            url: link to the Instagram post.
            path: directory to download into; defaults to
                ``downloads/instagram``.  The directory is created when missing.

        Returns:
            One ``VideoFile`` per video that exists on disk after the download,
            each captioned with the post caption.  Empty when the post cannot
            be resolved or contains no video.
        """
        if path is None:
            path = os.path.join("downloads", "instagram")

        os.makedirs(path, exist_ok=True)

        post = _get_post_from_url(url)
        if post is None:
            return []

        downloader.filename_pattern = "{shortcode}"

        if post.typename == "GraphSidecar":
            # instaloader suffixes sidecar files with the 1-based position of
            # the slide inside the post, counting pictures as well, so the
            # index must come from the unfiltered list.
            expected_paths = [
                os.path.join(path, f"{post.shortcode}_{position}.mp4")
                for position, is_video in enumerate(post.get_is_videos(), start=1)
                if is_video
            ]
        else:
            expected_paths = [os.path.join(path, f"{post.shortcode}.mp4")]

        # Download the post once, and only when at least one file is missing.
        if not all(os.path.exists(file_path) for file_path in expected_paths):
            downloader.download_post(post, Path(path))  # type: ignore

        # Only hand back files that really exist (e.g. a picture-only post
        # produces no mp4 at all).
        return [
            VideoFile(file_path, post.caption)
            for file_path in expected_paths
            if os.path.exists(file_path)
        ]
class TwitterDownloader(VideoDownloader):
    """Downloads the videos attached to a tweet by scraping the twitsave page."""

    @staticmethod
    def download_video_from_link(
        url: str, path: str | None = None
    ) -> VIDEO_RETURN_TYPE:
        """
        Download every video attached to the tweet behind ``url``.

        The page at ``API_URL_START + url`` is fetched and parsed; each
        ``div.origin-top-right`` element is treated as one attachment and its
        first link is downloaded with ``_download_video_from_link``.

        Args:
            url: link to the tweet.
            path: directory passed on to ``_download_video_from_link``.

        Returns:
            One ``VideoFile`` (without caption) per successfully downloaded
            attachment; empty when the request fails or nothing is found.
        """
        attachment_list: VIDEO_RETURN_TYPE = []
        try:
            response = requests.get(API_URL_START + url, timeout=30)
        except requests.exceptions.RequestException as e:
            logging.error("Error while downloading tweet: %s", str(e))
            return attachment_list

        if not response.ok:
            # Parsing continues; this only records why nothing may be found.
            logging.warning("Unexpected status %s while fetching tweet info for URL: %s", response.status_code, url)

        data = bs4.BeautifulSoup(response.text, "html.parser")

        download_buttons: list[bs4.element.Tag] = data.find_all(
            "div", class_="origin-top-right"
        )

        for index, button in enumerate(download_buttons):
            highest_quality_url_button = button.find("a")

            if not isinstance(highest_quality_url_button, bs4.element.Tag):
                logging.warning("No highest quality url button found at index %d URL: %s", index, url)
                continue

            # Highest quality video url button
            highest_quality_url = highest_quality_url_button.get("href")

            if not isinstance(highest_quality_url, str):
                logging.error("No highest quality url found at index %d URL: %s", index, url)
                continue

            tweet_id = get_tweet_id(url)
            base_name = tweet_id if tweet_id is not None else get_filename_from_data(data)

            # add index to filename to avoid overwriting; the f-string keeps
            # this working whether the base name is a str or an int
            filename = f"{base_name}_{index}"

            attachment = _download_video_from_link(
                highest_quality_url, filename=filename, path=path
            )

            if attachment is None:
                continue

            attachment_list.append(VideoFile(attachment))

        return attachment_list