From 0fe15d62ad3b01248ee7cc0a0239532ddff762df Mon Sep 17 00:00:00 2001 From: Austin Hicks Date: Fri, 9 Feb 2024 18:08:46 -0800 Subject: [PATCH] Streaming: implement seeking in StreamingSourcePlayerNode --- crates/synthizer/src/command.rs | 2 +- crates/synthizer/src/common_commands.rs | 8 ++- crates/synthizer/src/lib.rs | 2 +- .../src/nodes/sample_source_player.rs | 61 +++++++++++++++++-- .../src/sample_sources/execution/buffered.rs | 10 +++ .../sample_sources/execution/cross_thread.rs | 15 ++++- .../src/sample_sources/execution/driver.rs | 7 +++ .../src/sample_sources/execution/executor.rs | 7 +++ .../src/sample_sources/execution/reader.rs | 2 +- .../src/sample_sources/execution/resampler.rs | 5 ++ 10 files changed, 109 insertions(+), 10 deletions(-) diff --git a/crates/synthizer/src/command.rs b/crates/synthizer/src/command.rs index fff662f..d9926e6 100644 --- a/crates/synthizer/src/command.rs +++ b/crates/synthizer/src/command.rs @@ -50,7 +50,7 @@ mod cmdkind { use super::*; use crate::common_commands::*; - variant!(pub(crate) CommandKind, ServerCommand, PropertyCommand, SetLoopConfigCommand); + variant!(pub(crate) CommandKind, ServerCommand, PropertyCommand, SetLoopConfigCommand, SeekCommand); } pub(crate) use cmdkind::*; diff --git a/crates/synthizer/src/common_commands.rs b/crates/synthizer/src/common_commands.rs index 7b72dff..2ec5d02 100644 --- a/crates/synthizer/src/common_commands.rs +++ b/crates/synthizer/src/common_commands.rs @@ -1,6 +1,12 @@ //! This module contains a bunch of commands used by more than one node. use crate::LoopSpec; -/// Command representing configuriation of loops. +/// Command representing configuration of loops. #[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Hash)] pub(crate) struct SetLoopConfigCommand(pub(crate) LoopSpec); + +/// Command representing a seek to a given position in samples. +/// +/// Seeks are in the samplerate of the underlying source. +#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Hash)] +pub(crate) struct SeekCommand(pub(crate) u64); diff --git a/crates/synthizer/src/lib.rs b/crates/synthizer/src/lib.rs index 1cca5d1..97a3270 100644 --- a/crates/synthizer/src/lib.rs +++ b/crates/synthizer/src/lib.rs @@ -35,6 +35,6 @@ mod worker_pool; pub use channel_format::*; pub use config::SR; pub use db::DbExt; -pub use error::Result; +pub use error::{Error, Result}; pub use loop_spec::*; pub use server::Server; diff --git a/crates/synthizer/src/nodes/sample_source_player.rs b/crates/synthizer/src/nodes/sample_source_player.rs index 5dfa1d8..24f923f 100644 --- a/crates/synthizer/src/nodes/sample_source_player.rs +++ b/crates/synthizer/src/nodes/sample_source_player.rs @@ -1,6 +1,3 @@ -use std::borrow::Cow; -use std::sync::Arc; - use crate::command::{CommandSender, Port}; use crate::common_commands::*; use crate::config::BLOCK_SIZE; @@ -12,6 +9,9 @@ use crate::properties::*; use crate::sample_sources::{execution::Executor, Descriptor as SDescriptor, SampleSource}; use crate::server::Server; use crate::unique_id::UniqueId; +use std::borrow::Cow; +use std::sync::Arc; +use std::time::Duration; pub(crate) struct SampleSourcePlayerNodeAt { executor: Executor, @@ -77,7 +77,6 @@ impl NodeAt for SampleSourcePlayerNodeAt { let dest_slice = &mut tmp[..BLOCK_SIZE * chans]; let frames_done = self.executor.read_block(dest_slice).unwrap_or(0) as usize; - // Note that the implementation gives us already uninterleaved blocks. match &mut context.outputs.output { OD::Block(o) => { @@ -104,6 +103,12 @@ impl NodeAt for SampleSourcePlayerNodeAt { cmd.take_call(|cmd: SetLoopConfigCommand| { self.executor.config_looping(cmd.0); }) + .or_else(|x| { + x.take_call::(|seek| { + let _ = self.executor.seek(seek.0); + // We can't log just yet. We need to do a small logging framework first. + }) + }) } } @@ -156,6 +161,54 @@ impl SampleSourcePlayerNode { .send_command_node(SetLoopConfigCommand(specification))?; Ok(()) } + + /// Seek to a given position in the underlying source given as a sample in the sampling rate of the source. + /// + /// This desugars to a direct seek call on the source. Note that this function returning `Ok` doesn't mean the seek + /// went through, only that Synthizer believes that the seek is to a valid position. The actual seek happens later + /// on another thread. + pub fn seek_sample(&self, new_pos: u64) -> Result<()> { + use crate::sample_sources::SeekSupport; + + // We validate here because this is the only place that happens on a thread controlled by the user, so we + // unfortunately can't abstract this into our internal module much. + match self.descriptor.seek_support { + SeekSupport::None => { + return Err(crate::Error::new_validation( + "Seeking is not supported for the underlying source", + )) + } + SeekSupport::ToBeginning => { + if new_pos != 0 { + return Err(crate::Error::new_validation( + "This source only supports seeking to time 0", + )); + } + } + SeekSupport::Imprecise | SeekSupport::SampleAccurate => { + if let Some(max) = self.descriptor.duration { + if new_pos >= max { + return Err(crate::Error::new_validation( + "Attempt to seek past the end of the source", + )); + } + } + } + }; + + self.internal_handle + .send_command_node(SeekCommand(new_pos))?; + Ok(()) + } + + /// Convenience function to seek to a given duration in seconds. + /// + /// This function works by converting the duration passed in to samples, then calling [Self::seek_sample] for you. + pub fn seek(&self, dur: Duration) -> Result<()> { + let secs = dur.as_secs_f64(); + let samples = secs * self.descriptor.sample_rate.get() as f64; + self.seek_sample(samples as u64) + } } impl super::NodeHandleSealed for SampleSourcePlayerNode { diff --git a/crates/synthizer/src/sample_sources/execution/buffered.rs b/crates/synthizer/src/sample_sources/execution/buffered.rs index fa12e7d..0aa5a4a 100644 --- a/crates/synthizer/src/sample_sources/execution/buffered.rs +++ b/crates/synthizer/src/sample_sources/execution/buffered.rs @@ -122,4 +122,14 @@ impl BufferedSourceReader { pub(crate) fn config_looping(&mut self, spec: LoopSpec) { self.reader.config_looping(spec); } + + /// Try to seek the underlying source. + /// + /// if successful, this clears the buffer. The underlying driver knows how to end sources forever on seek errors, + /// so we needn't handle that here. + pub(crate) fn seek(&mut self, new_pos: u64) -> Result<(), SampleSourceError> { + self.reader.seek(new_pos)?; + self.buffer.reset(); + Ok(()) + } } diff --git a/crates/synthizer/src/sample_sources/execution/cross_thread.rs b/crates/synthizer/src/sample_sources/execution/cross_thread.rs index 42d1633..42aac39 100644 --- a/crates/synthizer/src/sample_sources/execution/cross_thread.rs +++ b/crates/synthizer/src/sample_sources/execution/cross_thread.rs @@ -61,11 +61,11 @@ fn merge_opts(first: Option, second: Option) -> Option { } /// A patch to apply to the background thread. -/// -/// TODO: seeking is next, we're handling loops first. + #[derive(Clone, Debug, Default)] struct ConfigPatch { loop_spec: Option, + seek_to: Option, } impl ConfigPatch { @@ -74,6 +74,7 @@ impl ConfigPatch { fn merge_newer(self, newer: ConfigPatch) -> Self { Self { loop_spec: merge_opts(self.loop_spec, newer.loop_spec), + seek_to: merge_opts(self.seek_to, newer.seek_to), } } } @@ -130,6 +131,11 @@ impl Task { if let Some(loop_spec) = patch.loop_spec { self.driver.config_looping(loop_spec); } + + if let Some(seek_to) = patch.seek_to { + self.driver.seek(seek_to)?; + } + Ok(()) } @@ -281,6 +287,11 @@ impl BackgroundSourceHandle { self.get_next_patch_mut().loop_spec = Some(loop_spec); } + pub(super) fn seek(&mut self, new_pos: u64) -> Result<(), SampleSourceError> { + self.get_next_patch_mut().seek_to = Some(new_pos); + Ok(()) + } + pub(crate) fn descriptor(&self) -> &Descriptor { &self.descriptor } diff --git a/crates/synthizer/src/sample_sources/execution/driver.rs b/crates/synthizer/src/sample_sources/execution/driver.rs index c983b0c..b9517b2 100644 --- a/crates/synthizer/src/sample_sources/execution/driver.rs +++ b/crates/synthizer/src/sample_sources/execution/driver.rs @@ -119,4 +119,11 @@ impl Driver { SampleSourceDriverKind::Resampled(x) => x.config_looping(spec), } } + + pub(crate) fn seek(&mut self, new_pos: u64) -> Result<(), SampleSourceError> { + match &mut self.kind { + SampleSourceDriverKind::Buffered(b) => b.seek(new_pos), + SampleSourceDriverKind::Resampled(r) => r.seek(new_pos), + } + } } diff --git a/crates/synthizer/src/sample_sources/execution/executor.rs b/crates/synthizer/src/sample_sources/execution/executor.rs index 944dc2c..58a9552 100644 --- a/crates/synthizer/src/sample_sources/execution/executor.rs +++ b/crates/synthizer/src/sample_sources/execution/executor.rs @@ -51,6 +51,13 @@ impl Executor { } } + pub(crate) fn seek(&mut self, new_pos: u64) -> Result<(), SampleSourceError> { + match &mut self.location { + ExecutionLocation::Inline(d) => d.seek(new_pos), + ExecutionLocation::CrossThread(c) => c.seek(new_pos), + } + } + pub(crate) fn descriptor(&self) -> &Descriptor { match self.location { ExecutionLocation::Inline(ref d) => d.descriptor(), diff --git a/crates/synthizer/src/sample_sources/execution/reader.rs b/crates/synthizer/src/sample_sources/execution/reader.rs index a1be05f..6fb40c6 100644 --- a/crates/synthizer/src/sample_sources/execution/reader.rs +++ b/crates/synthizer/src/sample_sources/execution/reader.rs @@ -137,7 +137,7 @@ impl SourceReader { } /// seek to the given sample, capped above by the descriptor's eof if any. - fn seek(&mut self, new_pos: u64) -> Result<(), SampleSourceError> { + pub(crate) fn seek(&mut self, new_pos: u64) -> Result<(), SampleSourceError> { let new_pos = self .descriptor .duration diff --git a/crates/synthizer/src/sample_sources/execution/resampler.rs b/crates/synthizer/src/sample_sources/execution/resampler.rs index dccbff4..b24c139 100644 --- a/crates/synthizer/src/sample_sources/execution/resampler.rs +++ b/crates/synthizer/src/sample_sources/execution/resampler.rs @@ -167,4 +167,9 @@ impl Resampler { pub(crate) fn config_looping(&mut self, spec: crate::LoopSpec) { self.reader.config_looping(spec); } + + /// Seek the source to a new position where `new_pos` is in the sampling rate of the source. + pub(crate) fn seek(&mut self, new_pos: u64) -> Result<(), SampleSourceError> { + self.reader.seek(new_pos) + } }