Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix track enqueuing #1747

Merged
merged 2 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions music_assistant/common/models/player_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,27 @@ class PlayerQueue(DataClassDictMixin):
flow_mode: bool = False
resume_pos: int = 0
flow_mode_stream_log: list[PlayLogEntry] = field(default_factory=list)
next_track_enqueued: str | None = None

@property
def corrected_elapsed_time(self) -> float:
"""Return the corrected/realtime elapsed time."""
return self.elapsed_time + (time.time() - self.elapsed_time_last_updated)

def __post_serialize__(self, d: dict[Any, Any]) -> dict[Any, Any]:
"""Execute action(s) on serialization."""
d.pop("flow_mode_stream_log", None)
d.pop("enqueued_media_items", None)
d.pop("next_track_enqueued", None)
return d

def to_cache(self) -> dict[str, Any]:
"""Return the dict that is suitable for storing into the cache db."""
d = self.to_dict()
d.pop("current_item", None)
d.pop("next_item", None)
d.pop("index_in_buffer", None)
d.pop("flow_mode", None)
d.pop("flow_mode_stream_log", None)
d.pop("enqueued_media_items", None)
return d

@classmethod
Expand All @@ -75,6 +81,7 @@ def from_cache(cls, d: dict[Any, Any]) -> Self:
d.pop("next_item", None)
d.pop("index_in_buffer", None)
d.pop("flow_mode", None)
d.pop("flow_mode_stream_log", None)
d.pop("enqueued_media_items", None)
d.pop("next_track_enqueued", None)
d.pop("flow_mode_stream_log", None)
return cls.from_dict(d)
51 changes: 34 additions & 17 deletions music_assistant/server/controllers/player_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import asyncio
import random
import time
from contextlib import suppress
from typing import TYPE_CHECKING, Any, TypedDict

from music_assistant.common.helpers.util import get_changed_keys
Expand Down Expand Up @@ -288,9 +287,6 @@ def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
return # no change
queue.repeat_mode = repeat_mode
self.signal_update(queue_id)
# ensure that we trigger enqueue next if repeat mode changed (if needed/supported)
task_id = f"enqueue_next_{queue_id}"
self.mass.call_later(5, self._enqueue_next, queue, queue.current_index, task_id=task_id)

@api_command("player_queues/play_media")
async def play_media(
Expand Down Expand Up @@ -735,6 +731,7 @@ async def play_index(
queue.flow_mode = await self.mass.config.get_player_config_value(queue_id, CONF_FLOW_MODE)
next_index = self._get_next_index(queue_id, index, allow_repeat=False)
queue.current_item = queue_item
queue.next_track_enqueued = None
self.signal_update(queue_id)

# work out if we are playing an album and if we should prefer album loudness
Expand Down Expand Up @@ -923,7 +920,11 @@ def on_player_update(
# and has an item loaded so we are able to resume it
queue.state = player.state or PlayerState.IDLE
queue.current_item = self.get_item(queue_id, queue.current_index)
queue.next_item = self._get_next_item(queue_id)
queue.next_item = (
self.get_item(queue_id, queue.next_track_enqueued)
if queue.next_track_enqueued
else self._get_next_item(queue_id, queue.current_index)
)

# correct elapsed time when seeking
if (
Expand All @@ -935,6 +936,15 @@ def on_player_update(
):
queue.elapsed_time += queue.current_item.streamdetails.seek_position

# enqueue next track if needed
if (
queue.state == PlayerState.PLAYING
and queue.next_item is not None
and not queue.next_track_enqueued
and queue.corrected_elapsed_time > 2
):
self._check_enqueue_next(queue)

# basic throttle: do not send state changed events if queue did not actually change
prev_state = self._prev_states.get(
queue_id,
Expand Down Expand Up @@ -1002,12 +1012,17 @@ def on_player_update(
object_id=queue_item.media_item.uri,
data=round(seconds_streamed, 2),
)

if end_of_queue_reached:
# end of queue reached, clear items
self.mass.call_later(
5, self._check_clear_queue, queue, task_id=f"clear_queue_{queue_id}"
)

# clear 'next track enqueued' flag if new track is loaded
if prev_state["current_index"] != new_state["current_index"]:
queue.next_track_enqueued = None

# watch dynamic radio items refill if needed
elif "current_index" in changed_keys:
if (
Expand Down Expand Up @@ -1135,11 +1150,6 @@ def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
if queue.flow_mode:
return # nothing to do when flow mode is active
self.signal_update(queue_id)
# enqueue the next track as soon as the player reports
# it has started buffering the given queue item
if not queue.flow_mode:
task_id = f"enqueue_next_{queue_id}"
self.mass.call_later(5, self._enqueue_next, queue, item_id, task_id=task_id)

# Main queue manipulation methods

Expand Down Expand Up @@ -1180,6 +1190,7 @@ def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
self._queue_items[queue_id] = queue_items
self._queues[queue_id].items = len(self._queue_items[queue_id])
self.signal_update(queue_id, True)
self._queues[queue_id].next_track_enqueued = None

# Helper methods

Expand Down Expand Up @@ -1376,19 +1387,25 @@ async def _fill_radio_tracks(self, queue_id: str) -> None:
insert_at_index=len(self._queue_items[queue_id]) + 1,
)

async def _enqueue_next(self, queue: PlayerQueue, current_index: int | str) -> None:
"""Enqueue the next item in the queue."""
def _check_enqueue_next(self, queue: PlayerQueue) -> None:
"""Enqueue the next item in the queue (if needed)."""
if queue.flow_mode:
return
if isinstance(current_index, str):
current_index = self.index_by_id(queue.queue_id, current_index)
with suppress(QueueEmpty):
next_item = await self.load_next_item(queue.queue_id, current_index)
if queue.next_item is None:
return
if queue.next_track_enqueued == queue.next_item.queue_item_id:
return

async def _enqueue_next():
next_item = await self.load_next_item(queue.queue_id, queue.current_index)
queue.next_track_enqueued = next_item.queue_item_id
await self.mass.players.enqueue_next_media(
player_id=queue.queue_id,
media=self.player_media_from_queue_item(next_item, queue.flow_mode),
media=self.player_media_from_queue_item(next_item, False),
)

self.mass.create_task(_enqueue_next())

async def _get_radio_tracks(
self, queue_id: str, is_initial_radio_mode: bool = False
) -> list[Track]:
Expand Down