From de1cf6b09000c6b44f82c3c51fb147ba41ab906e Mon Sep 17 00:00:00 2001 From: Kutu Date: Sat, 30 Sep 2023 21:00:30 +0200 Subject: [PATCH] Refactorize media retrieval methods --- src/knuckles/media_retrieval.py | 96 +++++++++++++-------------------- 1 file changed, 38 insertions(+), 58 deletions(-) diff --git a/src/knuckles/media_retrieval.py b/src/knuckles/media_retrieval.py index 8d0060e..15b17a1 100644 --- a/src/knuckles/media_retrieval.py +++ b/src/knuckles/media_retrieval.py @@ -2,6 +2,7 @@ from pathlib import Path from typing import Any +from requests import Response from requests.models import PreparedRequest from .api import Api @@ -26,6 +27,25 @@ def _generate_url(self, endpoint: str, params: dict[str, Any]) -> str: # as the prepare_url method always set it to a string. return prepared_request.url # type: ignore [return-value] + def _download_file( + self, response: Response, file_or_directory_path: Path, directory_filename: str + ) -> Path: + response.raise_for_status() + + if file_or_directory_path.is_dir(): + download_path = Path( + file_or_directory_path, + directory_filename, + ) + else: + download_path = file_or_directory_path + + with open(download_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + return download_path + def stream(self, id: str) -> str: """Returns a valid url for streaming the requested song @@ -51,33 +71,18 @@ def download(self, id: str, file_or_directory_path: Path) -> Path: """ response = self.api.raw_request("download", {"id": id}) - response.raise_for_status() - if file_or_directory_path.is_dir(): - filename = ( - response.headers["Content-Disposition"].split("filename=")[1].strip() - ) + filename = response.headers["Content-Disposition"].split("filename=")[1].strip() - # Remove leading quote char - if filename[0] == '"': - filename = filename[1:] + # Remove leading quote char + if filename[0] == '"': + filename = filename[1:] - # Remove trailing quote char - if filename[-1] == '"': - filename = filename[:-1] + # Remove trailing quote char + if filename[-1] == '"': + filename = filename[:-1] - download_path = Path( - file_or_directory_path, - filename, - ) - else: - download_path = file_or_directory_path - - with open(download_path, "wb") as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - - return download_path + return self._download_file(response, file_or_directory_path, filename) def hls(self, id: str) -> str: """Returns a valid url for streaming the requested song with hls.m3u8 @@ -112,27 +117,14 @@ def get_cover_art( """ response = self.api.raw_request("getCoverArt", {"id": id, "size": size}) - response.raise_for_status() - - if file_or_directory_path.is_dir(): - file_extension = guess_extension( - response.headers["content-type"].partition(";")[0].strip() - ) - filename = id + file_extension if file_extension else id - - download_path = Path( - file_or_directory_path, - filename, - ) - else: - download_path = file_or_directory_path + file_extension = guess_extension( + response.headers["content-type"].partition(";")[0].strip() + ) - with open(download_path, "wb") as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) + filename = id + file_extension if file_extension else id - return download_path + return self._download_file(response, file_or_directory_path, filename) def get_lyrics(self) -> None: ... @@ -154,22 +146,10 @@ def get_avatar(self, username: str, file_or_directory_path: Path) -> Path: response = self.api.raw_request("getAvatar", {"username": username}) response.raise_for_status() - if file_or_directory_path.is_dir(): - file_extension = guess_extension( - response.headers["content-type"].partition(";")[0].strip() - ) - - filename = username + file_extension if file_extension else username - - download_path = Path( - file_or_directory_path, - filename, - ) - else: - download_path = file_or_directory_path + file_extension = guess_extension( + response.headers["content-type"].partition(";")[0].strip() + ) - with open(download_path, "wb") as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) + filename = username + file_extension if file_extension else username - return download_path + return self._download_file(response, file_or_directory_path, filename)