Skip to content

Commit

Permalink
Small cleanup tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt committed Oct 24, 2024
1 parent 9cc0d25 commit 30e5bea
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 150 deletions.
14 changes: 6 additions & 8 deletions music_assistant/server/controllers/music.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
MediaItemType,
SearchResults,
)
from music_assistant.common.models.provider import SyncTask
from music_assistant.common.models.provider import ProviderInstance, SyncTask
from music_assistant.constants import (
DB_TABLE_ALBUM_ARTISTS,
DB_TABLE_ALBUM_TRACKS,
Expand Down Expand Up @@ -192,7 +192,7 @@ def start_sync(
for provider in self.providers:
if provider.instance_id not in providers:
continue
self._start_provider_sync(provider.instance_id, media_types)
self._start_provider_sync(provider, media_types)

@api_command("music/synctasks")
def get_running_sync_tasks(self) -> list[SyncTask]:
Expand Down Expand Up @@ -794,31 +794,29 @@ def get_unique_providers(self) -> set[str]:
return instances

def _start_provider_sync(
self, provider_instance: str, media_types: tuple[MediaType, ...]
self, provider: ProviderInstance, media_types: tuple[MediaType, ...]
) -> None:
"""Start sync task on provider and track progress."""
# check if we're not already running a sync task for this provider/mediatype
for sync_task in self.in_progress_syncs:
if sync_task.provider_instance != provider_instance:
if sync_task.provider_instance != provider.instance_id:
continue
for media_type in media_types:
if media_type in sync_task.media_types:
self.logger.debug(
"Skip sync task for %s because another task is already in progress",
provider_instance,
provider.name,
)
return

provider = self.mass.get_provider(provider_instance)

async def run_sync() -> None:
# Wrap the provider sync into a lock to prevent
# race conditions when multiple providers are syncing at the same time.
async with self._sync_lock:
await provider.sync_library(media_types)
# precache playlist tracks
if MediaType.PLAYLIST in media_types:
for playlist in await self.playlists.library_items(provider=provider_instance):
for playlist in await self.playlists.library_items(provider=provider.instance_id):
async for _ in self.playlists.tracks(playlist.item_id, playlist.provider):
pass

Expand Down
272 changes: 131 additions & 141 deletions music_assistant/server/controllers/players.py
Original file line number Diff line number Diff line change
Expand Up @@ -838,16 +838,6 @@ def update(
)
self._prev_states[player_id] = new_state

if "available" in changed_values and not player.available:
# ensure a player that became available is no longer synced
if player.synced_to:
self.mass.create_task(self.cmd_unsync(player_id))
if player.group_childs:
for group_child_id in player.group_childs:
self.mass.create_task(self.cmd_power(group_child_id))
if player.active_group:
self.mass.create_task(self.cmd_power(player.active_group, False))

if not player.enabled and not force_update:
# ignore updates for disabled players
return
Expand Down Expand Up @@ -915,6 +905,101 @@ def get_announcement_volume(self, player_id: str, volume_override: int | None) -
# ensure the result is an integer
return None if volume_level is None else int(volume_level)

def iter_group_members(
self,
group_player: Player,
only_powered: bool = False,
only_playing: bool = False,
active_only: bool = False,
exclude_self: bool = True,
) -> Iterator[Player]:
"""Get (child) players attached to a group player or syncgroup."""
for child_id in list(group_player.group_childs):
if child_player := self.get(child_id, False):
if not child_player.available or not child_player.enabled:
continue
if not (not only_powered or child_player.powered):
continue
if not (not active_only or child_player.active_group == group_player.player_id):
continue
if exclude_self and child_player.player_id == group_player.player_id:
continue
if not (
not only_playing
or child_player.state in (PlayerState.PLAYING, PlayerState.PAUSED)
):
continue
yield child_player

async def wait_for_state(
self,
player: Player,
wanted_state: PlayerState,
timeout: float = 60.0,
minimal_time: float = 0,
) -> None:
"""Wait for the given player to reach the given state."""
start_timestamp = time.time()
self.logger.debug(
"Waiting for player %s to reach state %s", player.display_name, wanted_state
)
try:
async with asyncio.timeout(timeout):
while player.state != wanted_state:
await asyncio.sleep(0.1)

except TimeoutError:
self.logger.debug(
"Player %s did not reach state %s within the timeout of %s seconds",
player.display_name,
wanted_state,
timeout,
)
elapsed_time = round(time.time() - start_timestamp, 2)
if elapsed_time < minimal_time:
self.logger.debug(
"Player %s reached state %s too soon (%s vs %s seconds) - add fallback sleep...",
player.display_name,
wanted_state,
elapsed_time,
minimal_time,
)
await asyncio.sleep(minimal_time - elapsed_time)
else:
self.logger.debug(
"Player %s reached state %s within %s seconds",
player.display_name,
wanted_state,
elapsed_time,
)

async def on_player_config_change(self, config: PlayerConfig, changed_keys: set[str]) -> None:
"""Call (by config manager) when the configuration of a player changes."""
player_disabled = "enabled" in changed_keys and not config.enabled
# signal player provider that the config changed
if player_provider := self.mass.get_provider(config.provider):
with suppress(PlayerUnavailableError):
await player_provider.on_player_config_change(config, changed_keys)
if not (player := self.get(config.player_id)):
return
if player_disabled:
# edge case: ensure that the player is powered off if the player gets disabled
await self.cmd_power(config.player_id, False)
player.available = False
# if the player was playing, restart playback
elif not player_disabled and player.state == PlayerState.PLAYING:
self.mass.call_later(1, self.mass.player_queues.resume, player.active_source)
# check for group memberships that need to be updated
if player_disabled and player.active_group and player_provider:
# try to remove from the group
group_player = self.get(player.active_group)
with suppress(UnsupportedFeaturedException, PlayerCommandFailed):
await player_provider.set_members(
player.active_group,
[x for x in group_player.group_childs if x != player.player_id],
)
player.enabled = config.enabled

def _get_player_with_redirect(self, player_id: str) -> Player:
"""Get player with check if playback related command should be redirected."""
player = self.get(player_id, True)
Expand Down Expand Up @@ -981,96 +1066,6 @@ def _get_group_volume_level(self, player: Player) -> int:
group_volume = group_volume / active_players
return int(group_volume)

def iter_group_members(
self,
group_player: Player,
only_powered: bool = False,
only_playing: bool = False,
active_only: bool = False,
exclude_self: bool = True,
) -> Iterator[Player]:
"""Get (child) players attached to a group player or syncgroup."""
for child_id in list(group_player.group_childs):
if child_player := self.get(child_id, False):
if not child_player.available or not child_player.enabled:
continue
if not (not only_powered or child_player.powered):
continue
if not (not active_only or child_player.active_group == group_player.player_id):
continue
if exclude_self and child_player.player_id == group_player.player_id:
continue
if not (
not only_playing
or child_player.state in (PlayerState.PLAYING, PlayerState.PAUSED)
):
continue
yield child_player

async def _poll_players(self) -> None:
"""Background task that polls players for updates."""
while True:
for player in list(self._players.values()):
player_id = player.player_id
# if the player is playing, update elapsed time every tick
# to ensure the queue has accurate details
player_playing = (
player.active_source == player.player_id and player.state == PlayerState.PLAYING
)
if player_playing:
self.mass.loop.call_soon(self.update, player_id)
# Poll player;
if not player.needs_poll:
continue
if (self.mass.loop.time() - player.last_poll) < player.poll_interval:
continue
player.last_poll = self.mass.loop.time()
if player_prov := self.get_player_provider(player_id):
try:
await player_prov.poll_player(player_id)
except PlayerUnavailableError:
player.available = False
player.state = PlayerState.IDLE
player.powered = False
except Exception as err:
self.logger.warning(
"Error while requesting latest state from player %s: %s",
player.display_name,
str(err),
exc_info=err if self.logger.isEnabledFor(10) else None,
)
finally:
# always update player state
self.mass.loop.call_soon(self.update, player_id)
await asyncio.sleep(1)

async def on_player_config_change(self, config: PlayerConfig, changed_keys: set[str]) -> None:
"""Call (by config manager) when the configuration of a player changes."""
player_disabled = "enabled" in changed_keys and not config.enabled
# signal player provider that the config changed
if player_provider := self.mass.get_provider(config.provider):
with suppress(PlayerUnavailableError):
await player_provider.on_player_config_change(config, changed_keys)
if not (player := self.get(config.player_id)):
return
if player_disabled:
# edge case: ensure that the player is powered off if the player gets disabled
await self.cmd_power(config.player_id, False)
player.available = False
# if the player was playing, restart playback
elif not player_disabled and player.state == PlayerState.PLAYING:
self.mass.call_later(1, self.mass.player_queues.resume, player.active_source)
# check for group memberships that need to be updated
if player_disabled and player.active_group and player_provider:
# try to remove from the group
group_player = self.get(player.active_group)
with suppress(UnsupportedFeaturedException, PlayerCommandFailed):
await player_provider.set_members(
player.active_group,
[x for x in group_player.group_childs if x != player.player_id],
)
player.enabled = config.enabled

async def _play_announcement(
self,
player: Player,
Expand Down Expand Up @@ -1193,44 +1188,39 @@ async def _play_announcement(
self.logger.warning("Can not resume %s on %s", prev_item_id, player.display_name)
# TODO !!

async def wait_for_state(
self,
player: Player,
wanted_state: PlayerState,
timeout: float = 60.0,
minimal_time: float = 0,
) -> None:
"""Wait for the given player to reach the given state."""
start_timestamp = time.time()
self.logger.debug(
"Waiting for player %s to reach state %s", player.display_name, wanted_state
)
try:
async with asyncio.timeout(timeout):
while player.state != wanted_state:
await asyncio.sleep(0.1)

except TimeoutError:
self.logger.debug(
"Player %s did not reach state %s within the timeout of %s seconds",
player.display_name,
wanted_state,
timeout,
)
elapsed_time = round(time.time() - start_timestamp, 2)
if elapsed_time < minimal_time:
self.logger.debug(
"Player %s reached state %s too soon (%s vs %s seconds) - add fallback sleep...",
player.display_name,
wanted_state,
elapsed_time,
minimal_time,
)
await asyncio.sleep(minimal_time - elapsed_time)
else:
self.logger.debug(
"Player %s reached state %s within %s seconds",
player.display_name,
wanted_state,
elapsed_time,
)
async def _poll_players(self) -> None:
"""Background task that polls players for updates."""
while True:
for player in list(self._players.values()):
player_id = player.player_id
# if the player is playing, update elapsed time every tick
# to ensure the queue has accurate details
player_playing = (
player.active_source == player.player_id and player.state == PlayerState.PLAYING
)
if player_playing:
self.mass.loop.call_soon(self.update, player_id)
# Poll player;
if not player.needs_poll:
continue
if (self.mass.loop.time() - player.last_poll) < player.poll_interval:
continue
player.last_poll = self.mass.loop.time()
if player_prov := self.get_player_provider(player_id):
try:
await player_prov.poll_player(player_id)
except PlayerUnavailableError:
player.available = False
player.state = PlayerState.IDLE
player.powered = False
except Exception as err:
self.logger.warning(
"Error while requesting latest state from player %s: %s",
player.display_name,
str(err),
exc_info=err if self.logger.isEnabledFor(10) else None,
)
finally:
# always update player state
self.mass.loop.call_soon(self.update, player_id)
await asyncio.sleep(1)
8 changes: 7 additions & 1 deletion music_assistant/server/providers/sonos/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from music_assistant.common.models.errors import PlayerCommandFailed
from music_assistant.common.models.player import DeviceInfo, PlayerMedia
from music_assistant.constants import MASS_LOGO_ONLINE, VERBOSE_LOG_LEVEL
from music_assistant.server.helpers.tags import parse_tags
from music_assistant.server.models.player_provider import PlayerProvider

from .const import CONF_AIRPLAY_MODE
Expand Down Expand Up @@ -296,7 +297,12 @@ async def play_announcement(
await sonos_player.client.player.play_audio_clip(
announcement.uri, volume_level, name="Announcement"
)
# TODO: Wait until the announcement is finished playing
# Wait until the announcement is finished playing
# This is helpful for people who want to play announcements in a sequence
# yeah we can also setup a subscription on the sonos player for this, but this is easier
media_info = await parse_tags(announcement.uri)
duration = media_info.duration or 10
await asyncio.sleep(duration)

async def _setup_player(self, player_id: str, name: str, info: AsyncServiceInfo) -> None:
"""Handle setup of a new player that is discovered using mdns."""
Expand Down

0 comments on commit 30e5bea

Please sign in to comment.