From 29ad97fda90bacd6293259a6da77603d99cf47a5 Mon Sep 17 00:00:00 2001 From: Kutu Date: Sat, 7 Oct 2023 19:19:00 +0200 Subject: [PATCH] Clean and simplify downloading files --- src/knuckles/media_retrieval.py | 60 ++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/src/knuckles/media_retrieval.py b/src/knuckles/media_retrieval.py index 2fceed8..e8a69fc 100644 --- a/src/knuckles/media_retrieval.py +++ b/src/knuckles/media_retrieval.py @@ -24,33 +24,44 @@ def __init__(self, api: Api) -> None: self.api = api def _generate_url(self, endpoint: str, params: dict[str, Any]) -> str: + """Using the PreparedRequest object of the Requests request package generates a + valid URL for any endpoint with a valid authentication parameter. + + :param endpoint: The endpoint of the API to be used. + :type endpoint: str + :param params: Extra parameters to be added to the URL. + :type params: dict[str, Any] + :return: The generated url. + :rtype: str + """ + prepared_request = PreparedRequest() prepared_request.prepare_url( f"{self.api.url}/rest/{endpoint}", {**self.api.generate_params(), **params} ) - # Ignore the error caused by the url parameter of prepared_request + # Ignore the type error caused by the url parameter of prepared_request # as the prepare_url method always set it to a string. return prepared_request.url # type: ignore [return-value] - def _download_file( - self, response: Response, file_or_directory_path: Path, directory_filename: str - ) -> Path: - response.raise_for_status() + def _download_file(self, response: Response, downloaded_file_path: Path) -> Path: + """Downloads a file attached to a Response object. - if file_or_directory_path.is_dir(): - download_path = Path( - file_or_directory_path, - directory_filename, - ) - else: - download_path = file_or_directory_path + :param response: The response to get the download binary data. + :type response: Response + :param downloaded_file_path: The file path to save the downloaded file. + :type downloaded_file_path: Path + :return: The same path given in downloaded_file_path. + :rtype: Path + """ - with open(download_path, "wb") as f: + response.raise_for_status() + + with open(downloaded_file_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) - return download_path + return downloaded_file_path def stream( self, @@ -114,6 +125,9 @@ def download(self, id: str, file_or_directory_path: Path) -> Path: response = self.api.raw_request("download", {"id": id}) + if not file_or_directory_path.is_dir(): + return self._download_file(response, file_or_directory_path) + filename = response.headers["Content-Disposition"].split("filename=")[1].strip() # Remove leading quote char @@ -124,7 +138,7 @@ def download(self, id: str, file_or_directory_path: Path) -> Path: if filename[-1] == '"': filename = filename[:-1] - return self._download_file(response, file_or_directory_path, filename) + return self._download_file(response, file_or_directory_path / filename) def hls( self, @@ -180,6 +194,9 @@ def get_captions( {"id": id, "format": subtitles_file_format.value}, ) + if not file_or_directory_path.is_dir(): + return self._download_file(response, file_or_directory_path) + mime_type = response.headers["content-type"].partition(";")[0].strip() # As application/x-subrip is not a valid MIME TYPE so a manual check is done @@ -190,7 +207,7 @@ def get_captions( filename = id + file_extension if file_extension else id - return self._download_file(response, file_or_directory_path, filename) + return self._download_file(response, file_or_directory_path / filename) def get_cover_art( self, id: str, file_or_directory_path: Path, size: int | None = None @@ -212,13 +229,16 @@ def get_cover_art( response = self.api.raw_request("getCoverArt", {"id": id, "size": size}) + if not file_or_directory_path.is_dir(): + return self._download_file(response, file_or_directory_path) + file_extension = guess_extension( response.headers["content-type"].partition(";")[0].strip() ) filename = id + file_extension if file_extension else id - return self._download_file(response, file_or_directory_path, filename) + return self._download_file(response, file_or_directory_path / filename) def get_lyrics(self) -> None: ... @@ -238,7 +258,9 @@ def get_avatar(self, username: str, file_or_directory_path: Path) -> Path: """ response = self.api.raw_request("getAvatar", {"username": username}) - response.raise_for_status() + + if not file_or_directory_path.is_dir(): + return self._download_file(response, file_or_directory_path) file_extension = guess_extension( response.headers["content-type"].partition(";")[0].strip() @@ -246,4 +268,4 @@ def get_avatar(self, username: str, file_or_directory_path: Path) -> Path: filename = username + file_extension if file_extension else username - return self._download_file(response, file_or_directory_path, filename) + return self._download_file(response, file_or_directory_path / filename)