Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add multi device DSP support #1839

Merged
merged 10 commits into from
Jan 9, 2025
14 changes: 14 additions & 0 deletions music_assistant/controllers/players.py
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,20 @@ def update(
if len(changed_values) == 0 and not force_update:
return

# handle DSP reload when player is grouped or ungrouped
prev_is_grouped = bool(prev_state.get("synced_to")) or bool(prev_state.get("group_childs"))
new_is_grouped = bool(new_state.get("synced_to")) or bool(new_state.get("group_childs"))

if prev_is_grouped != new_is_grouped:
dsp_config = self.mass.config.get_player_dsp_config(player_id)
supports_multi_device_dsp = PlayerFeature.MULTI_DEVICE_DSP in player.supported_features
if dsp_config.enabled and not supports_multi_device_dsp:
# We now know that that the player was grouped or ungrouped,
# the player has a custom DSP enabled, but the player provider does
# not support multi-device DSP.
# So we need to reload the DSP configuration.
self.mass.create_task(self.mass.players.on_player_dsp_change(player_id))

if changed_values.keys() != {"elapsed_time"} or force_update:
# ignore elapsed_time only changes
self.mass.signal_event(EventType.PLAYER_UPDATED, object_id=player_id, data=player)
Expand Down
8 changes: 8 additions & 0 deletions music_assistant/helpers/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from music_assistant_models.enums import (
ContentType,
MediaType,
PlayerFeature,
StreamType,
VolumeNormalizationMode,
)
Expand Down Expand Up @@ -836,6 +837,13 @@ def get_player_filter_params(

dsp = mass.config.get_player_dsp_config(player_id)

if player := mass.players.get(player_id):
is_grouped = bool(player.synced_to) or bool(player.group_childs)
if is_grouped and PlayerFeature.MULTI_DEVICE_DSP not in player.supported_features:
# We can not correctly apply DSP to a grouped player without multi-device DSP support,
# so we disable it.
dsp.enabled = False
marcelveldt marked this conversation as resolved.
Show resolved Hide resolved

if dsp.enabled:
# Apply input gain
if dsp.input_gain != 0:
Expand Down
5 changes: 3 additions & 2 deletions music_assistant/providers/airplay/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ async def play_media(
# special case: UGP stream
ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
ugp_stream = ugp_provider.ugp_streams[media.queue_id]
input_format = ugp_stream.output_format
audio_source = ugp_stream.subscribe()
input_format = ugp_stream.base_pcm_format
audio_source = ugp_stream.subscribe_raw()
elif media.queue_id and media.queue_item_id:
# regular queue (flow) stream request
input_format = AIRPLAY_FLOW_PCM_FORMAT
Expand Down Expand Up @@ -516,6 +516,7 @@ async def _setup_player(
supported_features={
PlayerFeature.PAUSE,
PlayerFeature.SET_MEMBERS,
PlayerFeature.MULTI_DEVICE_DSP,
PlayerFeature.VOLUME_SET,
},
volume_level=volume,
Expand Down
28 changes: 25 additions & 3 deletions music_assistant/providers/player_group/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
create_sample_rates_config_entry,
)
from music_assistant.controllers.streams import DEFAULT_STREAM_HEADERS
from music_assistant.helpers.audio import get_player_filter_params
from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
from music_assistant.helpers.util import TaskManager
from music_assistant.models.player_provider import PlayerProvider
Expand Down Expand Up @@ -453,7 +454,9 @@ async def play_media(
)

# start the stream task
self.ugp_streams[player_id] = UGPStream(audio_source=audio_source, audio_format=UGP_FORMAT)
self.ugp_streams[player_id] = UGPStream(
audio_source=audio_source, audio_format=UGP_FORMAT, base_pcm_format=UGP_FORMAT
)
base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.mp3"

# set the state optimistically
Expand Down Expand Up @@ -659,7 +662,11 @@ async def _register_group_player(
self, group_player_id: str, group_type: str, name: str, members: Iterable[str]
) -> Player:
"""Register a syncgroup player."""
player_features = {PlayerFeature.POWER, PlayerFeature.VOLUME_SET}
player_features = {
PlayerFeature.POWER,
PlayerFeature.VOLUME_SET,
PlayerFeature.MULTI_DEVICE_DSP,
}

if not (self.mass.players.get(x) for x in members):
raise PlayerUnavailableError("One or more members are not available!")
Expand Down Expand Up @@ -825,6 +832,10 @@ async def _serve_ugp_stream(self, request: web.Request) -> web.Response:
ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1]
child_player_id = request.query.get("player_id") # optional!

# Right now we default to MP3 output format, since it's the most compatible
# TODO: use the player's preferred output format
output_format = AudioFormat(content_type=ContentType.MP3)

if not (ugp_player := self.mass.players.get(ugp_player_id)):
raise web.HTTPNotFound(reason=f"Unknown UGP player: {ugp_player_id}")

Expand Down Expand Up @@ -860,7 +871,18 @@ async def _serve_ugp_stream(self, request: web.Request) -> web.Response:
ugp_player.display_name,
child_player_id or request.remote,
)
async for chunk in stream.subscribe():

# Generate filter params for the player specific DSP settings
filter_params = None
if child_player_id:
filter_params = get_player_filter_params(
self.mass, child_player_id, stream.input_format
)

async for chunk in stream.get_stream(
output_format,
filter_params=filter_params,
):
try:
await resp.write(chunk)
except (ConnectionError, ConnectionResetError):
Expand Down
31 changes: 25 additions & 6 deletions music_assistant/providers/player_group/ugp_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@

import asyncio
from collections.abc import AsyncGenerator, Awaitable, Callable
from typing import TYPE_CHECKING

from music_assistant_models.enums import ContentType
from music_assistant_models.media_items import AudioFormat
if TYPE_CHECKING:
from music_assistant_models.media_items import AudioFormat

from music_assistant.helpers.audio import get_ffmpeg_stream
from music_assistant.helpers.util import empty_queue
Expand All @@ -31,11 +32,12 @@ def __init__(
self,
audio_source: AsyncGenerator[bytes, None],
audio_format: AudioFormat,
base_pcm_format: AudioFormat,
) -> None:
"""Initialize UGP Stream."""
self.audio_source = audio_source
self.input_format = audio_format
self.output_format = AudioFormat(content_type=ContentType.MP3)
self.base_pcm_format = base_pcm_format
self.subscribers: list[Callable[[bytes], Awaitable]] = []
self._task: asyncio.Task | None = None
self._done: asyncio.Event = asyncio.Event()
Expand All @@ -53,8 +55,12 @@ async def stop(self) -> None:
self._task.cancel()
self._done.set()

async def subscribe(self) -> AsyncGenerator[bytes, None]:
"""Subscribe to the raw/unaltered audio stream."""
async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
"""
Subscribe to the raw/unaltered audio stream.

The returned stream has the format `self.base_pcm_format`.
"""
# start the runner as soon as the (first) client connects
if not self._task:
self._task = asyncio.create_task(self._runner())
Expand All @@ -71,13 +77,26 @@ async def subscribe(self) -> AsyncGenerator[bytes, None]:
empty_queue(queue)
del queue

async def get_stream(
self, output_format: AudioFormat, filter_params: list[str] | None = None
) -> AsyncGenerator[bytes, None]:
"""Subscribe to the client specific audio stream."""
# start the runner as soon as the (first) client connects
async for chunk in get_ffmpeg_stream(
audio_input=self.subscribe_raw(),
input_format=self.base_pcm_format,
output_format=output_format,
filter_params=filter_params,
):
yield chunk

async def _runner(self) -> None:
"""Run the stream for the given audio source."""
await asyncio.sleep(0.25) # small delay to allow subscribers to connect
async for chunk in get_ffmpeg_stream(
audio_input=self.audio_source,
input_format=self.input_format,
output_format=self.output_format,
output_format=self.base_pcm_format,
# we don't allow the player to buffer too much ahead so we use readrate limiting
extra_input_args=["-readrate", "1.1", "-readrate_initial_burst", "10"],
):
Expand Down
4 changes: 3 additions & 1 deletion music_assistant/providers/slimproto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ async def play_media(
# special case: UGP stream
ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
ugp_stream = ugp_provider.ugp_streams[media.queue_id]
audio_source = ugp_stream.subscribe()
# Filter is later applied in MultiClientStream
audio_source = ugp_stream.get_stream(master_audio_format, filter_params=None)
elif media.queue_id and media.queue_item_id:
# regular queue stream request
audio_source = self.mass.streams.get_flow_stream(
Expand Down Expand Up @@ -644,6 +645,7 @@ async def _handle_player_update(self, slimplayer: SlimClient) -> None:
supported_features={
PlayerFeature.POWER,
PlayerFeature.SET_MEMBERS,
PlayerFeature.MULTI_DEVICE_DSP,
PlayerFeature.VOLUME_SET,
PlayerFeature.PAUSE,
PlayerFeature.VOLUME_MUTE,
Expand Down
4 changes: 2 additions & 2 deletions music_assistant/providers/snapcast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,8 @@ async def play_media(self, player_id: str, media: PlayerMedia) -> None:
# special case: UGP stream
ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
ugp_stream = ugp_provider.ugp_streams[media.queue_id]
input_format = ugp_stream.output_format
audio_source = ugp_stream.subscribe()
input_format = ugp_stream.base_pcm_format
audio_source = ugp_stream.subscribe_raw()
elif media.queue_id and media.queue_item_id:
# regular queue (flow) stream request
input_format = DEFAULT_SNAPCAST_PCM_FORMAT
Expand Down
Loading