Skip to content

Commit

Permalink
Merge branch 'tidal_aac'
Browse files Browse the repository at this point in the history
  • Loading branch information
nathom committed Apr 10, 2021
2 parents 43663ef + f8dc9d2 commit 9301757
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 41 deletions.
69 changes: 46 additions & 23 deletions streamrip/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
import click
from mutagen.flac import FLAC, Picture
from mutagen.id3 import APIC, ID3, ID3NoHeaderError
from mutagen.mp4 import MP4, MP4Cover
from pathvalidate import sanitize_filename, sanitize_filepath

from . import converter
from .clients import ClientInterface
from .constants import (
ALBUM_KEYS,
EXT,
FLAC_MAX_BLOCKSIZE,
FOLDER_FORMAT,
TRACK_FORMAT,
Expand All @@ -37,6 +37,7 @@
safe_get,
tidal_cover_url,
tqdm_download,
ext,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -315,7 +316,7 @@ def format_final_path(self) -> str:
filename = clean_format(self.file_format, formatter)
self.final_path = (
os.path.join(self.folder, filename)[:250].strip()
+ EXT[self.quality] # file extension dict
+ ext(self.quality, self.client.source)
)

logger.debug("Formatted path: %s", self.final_path)
Expand Down Expand Up @@ -366,7 +367,7 @@ def from_api(cls, item: dict, client: ClientInterface):
def tag(
self,
album_meta: dict = None,
cover: Union[Picture, APIC] = None,
cover: Union[Picture, APIC, MP4Cover] = None,
embed_cover: bool = True,
):
"""Tag the track using the stored metadata.
Expand Down Expand Up @@ -403,22 +404,28 @@ def tag(
logger.debug("Tagging file with %s container", self.container)
audio = FLAC(self.final_path)
elif self.quality <= 1:
self.container = "MP3"
if self.client.source == 'tidal':
self.container = 'AAC'
audio = MP4(self.final_path)
else:
self.container = 'MP3'
try:
audio = ID3(self.final_path)
except ID3NoHeaderError:
audio = ID3()

logger.debug("Tagging file with %s container", self.container)
try:
audio = ID3(self.final_path)
except ID3NoHeaderError:
audio = ID3()
else:
raise InvalidQuality(f'Invalid quality: "{self.quality}"')

# automatically generate key, value pairs based on container
for k, v in self.meta.tags(self.container):
tags = self.meta.tags(self.container)
for k, v in tags:
audio[k] = v

if embed_cover and cover is None:
assert hasattr(self, "cover_path")
cover = Tracklist.get_cover_obj(self.cover_path, self.quality)
cover = Tracklist.get_cover_obj(self.cover_path, self.quality, self.client.source)

if isinstance(audio, FLAC):
if embed_cover:
Expand All @@ -428,6 +435,9 @@ def tag(
if embed_cover:
audio.add(cover)
audio.save(self.final_path, "v2_version=3")
elif isinstance(audio, MP4):
audio['covr'] = [cover]
audio.save()
else:
raise ValueError(f"Unknown container type: {audio}")

Expand Down Expand Up @@ -606,7 +616,7 @@ def from_api(cls, item: dict, client: ClientInterface):
return cls(client=client, **info)

@staticmethod
def get_cover_obj(cover_path: str, quality: int) -> Union[Picture, APIC]:
def get_cover_obj(cover_path: str, quality: int, source: str) -> Union[Picture, APIC]:
"""Given the path to an image and a quality id, return an initialized
cover object that can be used for every track in the album.
Expand All @@ -616,25 +626,38 @@ def get_cover_obj(cover_path: str, quality: int) -> Union[Picture, APIC]:
:type quality: int
:rtype: Union[Picture, APIC]
"""
cover_type = {0: APIC, 1: APIC, 2: Picture, 3: Picture, 4: Picture}
def flac_mp3_cover_obj(cover):
cover_obj = cover()
cover_obj.type = 3
cover_obj.mime = "image/jpeg"
with open(cover_path, "rb") as img:
cover_obj.data = img.read()

return cover_obj

if quality > 1:
cover = Picture
elif source == 'tidal':
cover = MP4Cover
else:
cover = APIC

cover = cover_type.get(quality)
if cover is Picture:
size_ = os.path.getsize(cover_path)
if size_ > FLAC_MAX_BLOCKSIZE:
raise TooLargeCoverArt(
f"Not suitable for Picture embed: {size_ / 10 ** 6} MB"
)
elif cover is None:
raise InvalidQuality(f"Quality {quality} not allowed")
return flac_mp3_cover_obj(cover)

elif cover is APIC:
return flac_mp3_cover_obj(cover)

cover_obj = cover()
cover_obj.type = 3
cover_obj.mime = "image/jpeg"
with open(cover_path, "rb") as img:
cover_obj.data = img.read()
elif cover is MP4Cover:
with open(cover_path, 'rb') as img:
return cover(img.read(), imageformat=MP4Cover.FORMAT_JPEG)

return cover_obj
raise InvalidQuality(f"Quality {quality} not allowed")

def download_message(self):
click.secho(
Expand Down Expand Up @@ -913,7 +936,7 @@ def download(
if (
self.cover_urls.get(download_cover_size, embed_cover_size)
!= embed_cover_size
or os.path.size(cover_path) > FLAC_MAX_BLOCKSIZE
or os.path.getsize(cover_path) > FLAC_MAX_BLOCKSIZE
):
# download cover at another resolution but don't use for embed
embed_cover_path = cover_path.replace(".jpg", "_embed.jpg")
Expand All @@ -926,7 +949,7 @@ def download(

embed_cover = kwargs.get("embed_cover", True) # embed by default
if self.client.source != "deezer" and embed_cover:
cover = self.get_cover_obj(embed_cover_path, quality)
cover = self.get_cover_obj(embed_cover_path, quality, self.client.source)

download_args = {
"quality": quality,
Expand Down
48 changes: 30 additions & 18 deletions streamrip/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,12 @@ def add_album_meta(self, resp: dict):
"""
if self.__source == "qobuz":
self.album = resp.get("title")
self.tracktotal = str(resp.get("tracks_count", 1))
self.tracktotal = resp.get("tracks_count", 1)
self.genre = resp.get("genres_list", [])
self.date = resp.get("release_date_original") or resp.get("release_date")
if self.date:
self.year = self.date[:4]

self.copyright = resp.get("copyright")
self.albumartist = safe_get(resp, "artist", "name")
self.label = resp.get("label")
Expand All @@ -117,8 +120,11 @@ def add_album_meta(self, resp: dict):
self.tracktotal = resp.get("numberOfTracks")
# genre not returned by API
self.date = resp.get("releaseDate")
if self.date:
self.year = self.date[:4]

self.copyright = resp.get("copyright")
self.albumartist = resp.get("artist", {}).get("name")
self.albumartist = safe_get(resp, 'artist', 'name')
self.disctotal = resp.get("numberOfVolumes")
self.isrc = resp.get("isrc")
self.explicit = resp.get("explicit", False)
Expand All @@ -127,9 +133,9 @@ def add_album_meta(self, resp: dict):
elif self.__source == "deezer":
self.album = resp.get("title")
self.tracktotal = resp.get("track_total")
self.genre = resp.get("genres", {}).get("data")
self.genre = safe_get(resp, 'genres', 'data')
self.date = resp.get("release_date")
self.albumartist = resp.get("artist", {}).get("name")
self.albumartist = safe_get(resp, 'artist', 'name')
self.label = resp.get("label")
# either 0 or 1
self.explicit = bool(resp.get("parental_warning"))
Expand All @@ -140,8 +146,8 @@ def add_album_meta(self, resp: dict):
raise ValueError(self.__source)

def add_track_meta(self, track: dict):
"""Parse the metadata from a track dict returned by the
Qobuz API.
"""Parse the metadata from a track dict returned by an
API.
:param track:
"""
Expand All @@ -150,25 +156,23 @@ def add_track_meta(self, track: dict):
self._mod_title(track.get("version"), track.get("work"))
self.composer = track.get("composer", {}).get("name")

self.tracknumber = f"{int(track.get('track_number', 1)):02}"
self.discnumber = str(track.get("media_number", 1))
try:
self.artist = track["performer"]["name"]
except KeyError:
if hasattr(self, "albumartist"):
self.artist = self.albumartist
self.tracknumber = track.get('track_number', 1)
self.discnumber = track.get("media_number", 1)
self.artist = safe_get(track, 'performer', 'name')
if self.artist is None:
self.artist = self.get('albumartist')

elif self.__source == "tidal":
self.title = track.get("title").strip()
self._mod_title(track.get("version"), None)
self.tracknumber = f"{int(track.get('trackNumber', 1)):02}"
self.discnumber = str(track.get("volumeNumber"))
self.tracknumber = track.get('trackNumber', 1)
self.discnumber = track.get("volumeNumber")
self.artist = track.get("artist", {}).get("name")

elif self.__source == "deezer":
self.title = track.get("title").strip()
self._mod_title(track.get("version"), None)
self.tracknumber = f"{int(track.get('track_position', 1)):02}"
self.tracknumber = track.get('track_position', 1)
self.discnumber = track.get("disk_number")
self.artist = track.get("artist", {}).get("name")

Expand Down Expand Up @@ -364,14 +368,22 @@ def __gen_mp3_tags(self) -> Tuple[str, str]:
if text is not None and v is not None:
yield (v.__name__, v(encoding=3, text=text))

def __mp4_tags(self) -> Tuple[str, str]:
def __gen_mp4_tags(self) -> Tuple[str, Union[str, int, tuple]]:
"""Generate key, value pairs to tag ALAC or AAC files in
an MP4 container.
:rtype: Tuple[str, str]
"""
for k, v in MP4_KEY.items():
return (v, getattr(self, k))
if k == "tracknumber":
text = [(self.tracknumber, self.tracktotal)]
elif k == 'discnumber':
text = [(self.discnumber, self.get('disctotal', 1))]
else:
text = getattr(self, k)

if v is not None and text is not None:
yield (v, text)

def __setitem__(self, key, val):
"""Dict-like access for tags.
Expand Down
10 changes: 10 additions & 0 deletions streamrip/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,13 @@ def decrypt_mqa_file(in_path, out_path, encryption_key):
dec_bytes = decryptor.decrypt(enc_file.read())
with open(out_path, "wb") as dec_file:
dec_file.write(dec_bytes)


def ext(quality: int, source: str):
if quality <= 1:
if source == 'tidal':
return '.m4a'
else:
return '.mp3'
else:
return '.flac'

0 comments on commit 9301757

Please sign in to comment.