Skip to content

Commit

Permalink
Merge pull request #37 from kutu-dev/feat/refactorize-media-retrieval
Browse files Browse the repository at this point in the history
Clean and simplify downloading files
  • Loading branch information
kutu-dev authored Oct 7, 2023
2 parents b409342 + 29ad97f commit 9daaaf2
Showing 1 changed file with 41 additions and 19 deletions.
60 changes: 41 additions & 19 deletions src/knuckles/media_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,44 @@ def __init__(self, api: Api) -> None:
self.api = api

def _generate_url(self, endpoint: str, params: dict[str, Any]) -> str:
"""Using the PreparedRequest object of the Requests request package generates a
valid URL for any endpoint with a valid authentication parameter.
:param endpoint: The endpoint of the API to be used.
:type endpoint: str
:param params: Extra parameters to be added to the URL.
:type params: dict[str, Any]
:return: The generated url.
:rtype: str
"""

prepared_request = PreparedRequest()
prepared_request.prepare_url(
f"{self.api.url}/rest/{endpoint}", {**self.api.generate_params(), **params}
)

# Ignore the error caused by the url parameter of prepared_request
# Ignore the type error caused by the url parameter of prepared_request
# as the prepare_url method always set it to a string.
return prepared_request.url # type: ignore [return-value]

def _download_file(
self, response: Response, file_or_directory_path: Path, directory_filename: str
) -> Path:
response.raise_for_status()
def _download_file(self, response: Response, downloaded_file_path: Path) -> Path:
"""Downloads a file attached to a Response object.
if file_or_directory_path.is_dir():
download_path = Path(
file_or_directory_path,
directory_filename,
)
else:
download_path = file_or_directory_path
:param response: The response to get the download binary data.
:type response: Response
:param downloaded_file_path: The file path to save the downloaded file.
:type downloaded_file_path: Path
:return: The same path given in downloaded_file_path.
:rtype: Path
"""

with open(download_path, "wb") as f:
response.raise_for_status()

with open(downloaded_file_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)

return download_path
return downloaded_file_path

def stream(
self,
Expand Down Expand Up @@ -114,6 +125,9 @@ def download(self, id: str, file_or_directory_path: Path) -> Path:

response = self.api.raw_request("download", {"id": id})

if not file_or_directory_path.is_dir():
return self._download_file(response, file_or_directory_path)

filename = response.headers["Content-Disposition"].split("filename=")[1].strip()

# Remove leading quote char
Expand All @@ -124,7 +138,7 @@ def download(self, id: str, file_or_directory_path: Path) -> Path:
if filename[-1] == '"':
filename = filename[:-1]

return self._download_file(response, file_or_directory_path, filename)
return self._download_file(response, file_or_directory_path / filename)

def hls(
self,
Expand Down Expand Up @@ -180,6 +194,9 @@ def get_captions(
{"id": id, "format": subtitles_file_format.value},
)

if not file_or_directory_path.is_dir():
return self._download_file(response, file_or_directory_path)

mime_type = response.headers["content-type"].partition(";")[0].strip()

# As application/x-subrip is not a valid MIME TYPE so a manual check is done
Expand All @@ -190,7 +207,7 @@ def get_captions(

filename = id + file_extension if file_extension else id

return self._download_file(response, file_or_directory_path, filename)
return self._download_file(response, file_or_directory_path / filename)

def get_cover_art(
self, id: str, file_or_directory_path: Path, size: int | None = None
Expand All @@ -212,13 +229,16 @@ def get_cover_art(

response = self.api.raw_request("getCoverArt", {"id": id, "size": size})

if not file_or_directory_path.is_dir():
return self._download_file(response, file_or_directory_path)

file_extension = guess_extension(
response.headers["content-type"].partition(";")[0].strip()
)

filename = id + file_extension if file_extension else id

return self._download_file(response, file_or_directory_path, filename)
return self._download_file(response, file_or_directory_path / filename)

def get_lyrics(self) -> None:
...
Expand All @@ -238,12 +258,14 @@ def get_avatar(self, username: str, file_or_directory_path: Path) -> Path:
"""

response = self.api.raw_request("getAvatar", {"username": username})
response.raise_for_status()

if not file_or_directory_path.is_dir():
return self._download_file(response, file_or_directory_path)

file_extension = guess_extension(
response.headers["content-type"].partition(";")[0].strip()
)

filename = username + file_extension if file_extension else username

return self._download_file(response, file_or_directory_path, filename)
return self._download_file(response, file_or_directory_path / filename)

0 comments on commit 9daaaf2

Please sign in to comment.