diff --git a/src/knuckles/media_retrieval.py b/src/knuckles/media_retrieval.py index f8d7194..15b17a1 100644 --- a/src/knuckles/media_retrieval.py +++ b/src/knuckles/media_retrieval.py @@ -2,6 +2,7 @@ from pathlib import Path from typing import Any +from requests import Response from requests.models import PreparedRequest from .api import Api @@ -26,6 +27,25 @@ def _generate_url(self, endpoint: str, params: dict[str, Any]) -> str: # as the prepare_url method always set it to a string. return prepared_request.url # type: ignore [return-value] + def _download_file( + self, response: Response, file_or_directory_path: Path, directory_filename: str + ) -> Path: + response.raise_for_status() + + if file_or_directory_path.is_dir(): + download_path = Path( + file_or_directory_path, + directory_filename, + ) + else: + download_path = file_or_directory_path + + with open(download_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + return download_path + def stream(self, id: str) -> str: """Returns a valid url for streaming the requested song @@ -51,33 +71,18 @@ def download(self, id: str, file_or_directory_path: Path) -> Path: """ response = self.api.raw_request("download", {"id": id}) - response.raise_for_status() - - if file_or_directory_path.is_dir(): - filename = ( - response.headers["Content-Disposition"].split("filename=")[1].strip() - ) - # Remove leading quote char - if filename[0] == '"': - filename = filename[1:] + filename = response.headers["Content-Disposition"].split("filename=")[1].strip() - # Remove trailing quote char - if filename[-1] == '"': - filename = filename[:-1] - - download_path = Path( - file_or_directory_path, - filename, - ) - else: - download_path = file_or_directory_path + # Remove leading quote char + if filename[0] == '"': + filename = filename[1:] - with open(download_path, "wb") as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) + # Remove trailing quote char + if filename[-1] == '"': + filename = filename[:-1] - return download_path + return self._download_file(response, file_or_directory_path, filename) def hls(self, id: str) -> str: """Returns a valid url for streaming the requested song with hls.m3u8 @@ -93,7 +98,9 @@ def hls(self, id: str) -> str: def get_captions(self) -> None: ... - def get_cover_art(self, id: str, file_or_directory_path: Path, size: int) -> Path: + def get_cover_art( + self, id: str, file_or_directory_path: Path, size: int | None = None + ) -> Path: """Calls the "getCoverArt" endpoint of the API. :param id: The id of the cover art to download. @@ -110,27 +117,14 @@ def get_cover_art(self, id: str, file_or_directory_path: Path, size: int) -> Pat """ response = self.api.raw_request("getCoverArt", {"id": id, "size": size}) - response.raise_for_status() - - if file_or_directory_path.is_dir(): - file_extension = guess_extension( - response.headers["content-type"].partition(";")[0].strip() - ) - filename = id + file_extension if file_extension else id - - download_path = Path( - file_or_directory_path, - filename, - ) - else: - download_path = file_or_directory_path + file_extension = guess_extension( + response.headers["content-type"].partition(";")[0].strip() + ) - with open(download_path, "wb") as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) + filename = id + file_extension if file_extension else id - return download_path + return self._download_file(response, file_or_directory_path, filename) def get_lyrics(self) -> None: ... @@ -152,22 +146,10 @@ def get_avatar(self, username: str, file_or_directory_path: Path) -> Path: response = self.api.raw_request("getAvatar", {"username": username}) response.raise_for_status() - if file_or_directory_path.is_dir(): - file_extension = guess_extension( - response.headers["content-type"].partition(";")[0].strip() - ) - - filename = username + file_extension if file_extension else username - - download_path = Path( - file_or_directory_path, - filename, - ) - else: - download_path = file_or_directory_path + file_extension = guess_extension( + response.headers["content-type"].partition(";")[0].strip() + ) - with open(download_path, "wb") as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) + filename = username + file_extension if file_extension else username - return download_path + return self._download_file(response, file_or_directory_path, filename) diff --git a/tests/api/test_media_retrieval.py b/tests/api/test_media_retrieval.py index f8295c6..214e963 100644 --- a/tests/api/test_media_retrieval.py +++ b/tests/api/test_media_retrieval.py @@ -2,11 +2,11 @@ from pathlib import Path from typing import Any - +from responses import Response import responses from knuckles import Subsonic -from tests.mocks.media_retrieval import MockDownload +from tests.mocks.media_retrieval import FileMetadata def test_stream(subsonic: Subsonic, song: dict[str, Any]) -> None: @@ -18,58 +18,44 @@ def test_stream(subsonic: Subsonic, song: dict[str, Any]) -> None: @responses.activate def test_download_with_a_given_filename( + subsonic: Subsonic, + mock_download: Response, tmp_path: Path, - output_song_filename: str, placeholder_data: str, - mock_download_file: MockDownload, - subsonic: Subsonic, song: dict[str, Any], - song_content_type: str, + download_metadata: FileMetadata, ) -> None: - responses.add( - mock_download_file("download", {"id": song["id"]}, tmp_path, song_content_type) - ) + responses.add(mock_download) download_path = subsonic.media_retrieval.download( - song["id"], tmp_path / output_song_filename + song["id"], tmp_path / download_metadata.output_filename ) # Check if the file data has been unaltered - with open(tmp_path / output_song_filename, "r") as file: + with open(tmp_path / download_metadata.output_filename, "r") as file: assert placeholder_data == file.read() - assert download_path == tmp_path / output_song_filename + assert download_path == tmp_path / download_metadata.output_filename @responses.activate def test_download_without_a_given_filename( + subsonic: Subsonic, + mock_download: Response, tmp_path: Path, - default_song_filename: str, placeholder_data: str, - mock_download_file: MockDownload, - subsonic: Subsonic, song: dict[str, Any], - song_content_type: str, + download_metadata: FileMetadata, ) -> None: - responses.add( - mock_download_file( - "download", - {"id": song["id"]}, - tmp_path, - song_content_type, - headers={ - "Content-Disposition": f'attachment; filename="{default_song_filename}"' - }, - ) - ) + responses.add(mock_download) download_path = subsonic.media_retrieval.download(song["id"], tmp_path) # Check if the file data has been unaltered - with open(tmp_path / default_song_filename, "r") as file: + with open(tmp_path / download_metadata.default_filename, "r") as file: assert placeholder_data == file.read() - assert download_path == tmp_path / default_song_filename + assert download_path == tmp_path / download_metadata.default_filename def test_hls(subsonic: Subsonic, song: dict[str, Any]) -> None: @@ -81,113 +67,87 @@ def test_hls(subsonic: Subsonic, song: dict[str, Any]) -> None: @responses.activate def test_get_cover_art_with_a_given_filename( + subsonic: Subsonic, + mock_cover_art: Response, tmp_path: Path, - output_cover_art_filename: str, placeholder_data: str, - mock_download_file: MockDownload, - subsonic: Subsonic, song: dict[str, Any], - cover_art_content_type: str, + cover_art_metadata: FileMetadata, cover_art_size: int, ) -> None: - responses.add( - mock_download_file( - "getCoverArt", - {"id": song["coverArt"], "size": cover_art_size}, - tmp_path, - cover_art_content_type, - ) - ) + responses.add(mock_cover_art) download_path = subsonic.media_retrieval.get_cover_art( - song["coverArt"], tmp_path / output_cover_art_filename, cover_art_size + song["coverArt"], tmp_path / cover_art_metadata.output_filename, cover_art_size ) # Check if the file data has been unaltered - with open(tmp_path / output_cover_art_filename, "r") as file: + with open(tmp_path / cover_art_metadata.output_filename, "r") as file: assert placeholder_data == file.read() - assert download_path == tmp_path / output_cover_art_filename + assert download_path == tmp_path / cover_art_metadata.output_filename @responses.activate def test_get_cover_art_without_a_given_filename( + subsonic: Subsonic, + mock_cover_art: Response, tmp_path: Path, - default_cover_art_filename: str, placeholder_data: str, - mock_download_file: MockDownload, - subsonic: Subsonic, song: dict[str, Any], - cover_art_content_type: str, cover_art_size: int, + cover_art_metadata: FileMetadata, ) -> None: - responses.add( - mock_download_file( - "getCoverArt", - {"id": song["coverArt"], "size": cover_art_size}, - tmp_path, - cover_art_content_type, - ) - ) + responses.add(mock_cover_art) download_path = subsonic.media_retrieval.get_cover_art( song["coverArt"], tmp_path, cover_art_size ) # Check if the file data has been unaltered - with open(tmp_path / default_cover_art_filename, "r") as file: + with open(tmp_path / cover_art_metadata.default_filename, "r") as file: assert placeholder_data == file.read() - assert download_path == tmp_path / default_cover_art_filename + assert download_path == tmp_path / cover_art_metadata.default_filename @responses.activate def test_get_avatar_with_a_given_filename( + subsonic: Subsonic, + mock_avatar: Response, tmp_path: Path, - output_avatar_filename: str, placeholder_data: str, - mock_download_file: MockDownload, - subsonic: Subsonic, username: str, - avatar_content_type: str, + avatar_metadata: FileMetadata, ) -> None: - responses.add( - mock_download_file( - "getAvatar", {"username": username}, tmp_path, avatar_content_type - ) - ) + responses.add(mock_avatar) download_path = subsonic.media_retrieval.get_avatar( - username, tmp_path / output_avatar_filename + username, tmp_path / avatar_metadata.output_filename ) # Check if the file data has been unaltered - with open(tmp_path / output_avatar_filename, "r") as file: + with open(tmp_path / avatar_metadata.output_filename, "r") as file: assert placeholder_data == file.read() - assert download_path == tmp_path / output_avatar_filename + assert download_path == tmp_path / avatar_metadata.output_filename @responses.activate def test_get_avatar_without_a_given_filename( + subsonic: Subsonic, + mock_avatar: Response, tmp_path: Path, - default_avatar_filename: str, placeholder_data: str, - mock_download_file: MockDownload, - subsonic: Subsonic, username: str, - avatar_content_type: str, + avatar_metadata: FileMetadata, ) -> None: - responses.add( - mock_download_file( - "getAvatar", {"username": username}, tmp_path, avatar_content_type - ) - ) + responses.add(mock_avatar) download_path = subsonic.media_retrieval.get_avatar(username, tmp_path) # Check if the file data has been unaltered - with open(tmp_path / default_avatar_filename, "r") as file: + with open(tmp_path / avatar_metadata.default_filename, "r") as file: assert placeholder_data == file.read() - assert download_path == tmp_path / default_avatar_filename + assert download_path == tmp_path / avatar_metadata.default_filename diff --git a/tests/mocks/media_retrieval.py b/tests/mocks/media_retrieval.py index e30895c..05834b1 100644 --- a/tests/mocks/media_retrieval.py +++ b/tests/mocks/media_retrieval.py @@ -1,5 +1,6 @@ +from dataclasses import dataclass from pathlib import Path -from typing import Protocol, Any +from typing import Any, Protocol import pytest from responses import Response @@ -7,66 +8,86 @@ from tests.conftest import MockGenerator +@pytest.fixture() +def placeholder_data() -> str: + return "Lorem Ipsum" + + class MockDownload(Protocol): def __call__( self, endpoint: str, extra_params: dict[str, Any], - temp_dir_path: Path, content_type: str, headers: dict[str, str] = {}, ) -> Response: ... -@pytest.fixture() -def placeholder_data() -> str: - return "Lorem Ipsum" - - @pytest.fixture -def default_song_filename() -> str: - return "default.wav" - - -@pytest.fixture -def output_song_filename() -> str: - return "output.wav" - +def mock_download_file_generator( + placeholder_data: str, mock_generator: MockGenerator, tmp_path: Path +) -> MockDownload: + def inner( + endpoint: str, + extra_params: dict[str, Any], + content_type: str, + headers: dict[str, str] = {}, + ): + fake_file = tmp_path / "file.mock" + fake_file.touch() -@pytest.fixture -def song_content_type() -> str: - return "audio/wav" + with open(fake_file, "w") as file: + file.write(placeholder_data) + with open(fake_file, "r") as file: + return mock_generator( + endpoint, + extra_params, + headers=headers, + content_type=content_type, + body=file.read(), + ) -@pytest.fixture -def default_avatar_filename(username: str) -> str: - return f"{username}.png" + return inner -@pytest.fixture -def output_avatar_filename() -> str: - return "output.png" +@dataclass +class FileMetadata: + # The default filename that Knuckles should use if no custom one is provided + default_filename: str + # The custom filename given to Knuckles + output_filename: str -@pytest.fixture -def avatar_content_type() -> str: - return "image/png" + content_type: str @pytest.fixture -def default_cover_art_filename(song: dict[str, Any]) -> str: - return f"{song['coverArt']}.png" +def download_metadata() -> FileMetadata: + return FileMetadata("default.wav", "output.wav", "audio/wav") @pytest.fixture -def output_cover_art_filename() -> str: - return "output.png" +def mock_download( + mock_download_file_generator: MockDownload, + song: dict[str, Any], + download_metadata: FileMetadata, +) -> Response: + return mock_download_file_generator( + "download", + {"id": song["id"]}, + download_metadata.content_type, + headers={ + "Content-Disposition": "attachment; " + + f'filename="{download_metadata.default_filename}"' + }, + ) @pytest.fixture -def cover_art_content_type() -> str: - return "image/png" +def cover_art_metadata(song: dict[str, Any]) -> FileMetadata: + return FileMetadata(f"{song['coverArt']}.png", "output.png", "image/png") @pytest.fixture @@ -75,30 +96,32 @@ def cover_art_size() -> int: @pytest.fixture -def mock_download_file( - placeholder_data: str, - mock_generator: MockGenerator, -) -> MockDownload: - def inner( - endpoint: str, - extra_params: dict[str, Any], - temp_dir_path: Path, - content_type: str, - headers: dict[str, str] = {}, - ): - fake_file = temp_dir_path / "file.mock" - fake_file.touch() +def mock_cover_art( + mock_download_file_generator: MockDownload, + song: dict[str, Any], + cover_art_metadata: FileMetadata, + cover_art_size: int, +) -> Response: + return mock_download_file_generator( + "getCoverArt", + {"id": song["coverArt"], "size": cover_art_size}, + cover_art_metadata.content_type, + ) - with open(fake_file, "w") as file: - file.write(placeholder_data) - with open(fake_file, "r") as file: - return mock_generator( - endpoint, - extra_params, - headers=headers, - content_type=content_type, - body=file.read(), - ) +@pytest.fixture +def avatar_metadata(username: str) -> FileMetadata: + return FileMetadata(f"{username}.png", "output.png", "image/png") - return inner + +@pytest.fixture +def mock_avatar( + mock_download_file_generator: MockDownload, + username: str, + avatar_metadata: FileMetadata, +) -> Response: + return mock_download_file_generator( + "getAvatar", + {"username": username}, + avatar_metadata.content_type, + )