Skip to content

Commit

Permalink
Streaming: implement seeking in StreamingSourcePlayerNode
Browse files Browse the repository at this point in the history
  • Loading branch information
ahicks92 committed Feb 10, 2024
1 parent 519e6c1 commit 0fe15d6
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 10 deletions.
2 changes: 1 addition & 1 deletion crates/synthizer/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ mod cmdkind {
use super::*;
use crate::common_commands::*;

variant!(pub(crate) CommandKind, ServerCommand, PropertyCommand, SetLoopConfigCommand);
variant!(pub(crate) CommandKind, ServerCommand, PropertyCommand, SetLoopConfigCommand, SeekCommand);
}
pub(crate) use cmdkind::*;

Expand Down
8 changes: 7 additions & 1 deletion crates/synthizer/src/common_commands.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
//! This module contains a bunch of commands used by more than one node.
use crate::LoopSpec;

/// Command representing configuriation of loops.
/// Command representing configuration of loops.
#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Hash)]
pub(crate) struct SetLoopConfigCommand(pub(crate) LoopSpec);

/// Command representing a seek to a given position in samples.
///
/// Seeks are in the samplerate of the underlying source.
#[derive(Copy, Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Hash)]
pub(crate) struct SeekCommand(pub(crate) u64);
2 changes: 1 addition & 1 deletion crates/synthizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ mod worker_pool;
pub use channel_format::*;
pub use config::SR;
pub use db::DbExt;
pub use error::Result;
pub use error::{Error, Result};
pub use loop_spec::*;
pub use server::Server;
61 changes: 57 additions & 4 deletions crates/synthizer/src/nodes/sample_source_player.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use std::borrow::Cow;
use std::sync::Arc;

use crate::command::{CommandSender, Port};
use crate::common_commands::*;
use crate::config::BLOCK_SIZE;
Expand All @@ -12,6 +9,9 @@ use crate::properties::*;
use crate::sample_sources::{execution::Executor, Descriptor as SDescriptor, SampleSource};
use crate::server::Server;
use crate::unique_id::UniqueId;
use std::borrow::Cow;
use std::sync::Arc;
use std::time::Duration;

pub(crate) struct SampleSourcePlayerNodeAt {
executor: Executor,
Expand Down Expand Up @@ -77,7 +77,6 @@ impl NodeAt for SampleSourcePlayerNodeAt {
let dest_slice = &mut tmp[..BLOCK_SIZE * chans];
let frames_done = self.executor.read_block(dest_slice).unwrap_or(0) as usize;


// Note that the implementation gives us already uninterleaved blocks.
match &mut context.outputs.output {
OD::Block(o) => {
Expand All @@ -104,6 +103,12 @@ impl NodeAt for SampleSourcePlayerNodeAt {
cmd.take_call(|cmd: SetLoopConfigCommand| {
self.executor.config_looping(cmd.0);
})
.or_else(|x| {
x.take_call::<SeekCommand>(|seek| {
let _ = self.executor.seek(seek.0);
// We can't log just yet. We need to do a small logging framework first.
})
})
}
}

Expand Down Expand Up @@ -156,6 +161,54 @@ impl SampleSourcePlayerNode {
.send_command_node(SetLoopConfigCommand(specification))?;
Ok(())
}

/// Seek to a given position in the underlying source given as a sample in the sampling rate of the source.
///
/// This desugars to a direct seek call on the source. Note that this function returning `Ok` doesn't mean the seek
/// went through, only that Synthizer believes that the seek is to a valid position. The actual seek happens later
/// on another thread.
pub fn seek_sample(&self, new_pos: u64) -> Result<()> {
use crate::sample_sources::SeekSupport;

// We validate here because this is the only place that happens on a thread controlled by the user, so we
// unfortunately can't abstract this into our internal module much.
match self.descriptor.seek_support {
SeekSupport::None => {
return Err(crate::Error::new_validation(
"Seeking is not supported for the underlying source",
))
}
SeekSupport::ToBeginning => {
if new_pos != 0 {
return Err(crate::Error::new_validation(
"This source only supports seeking to time 0",
));
}
}
SeekSupport::Imprecise | SeekSupport::SampleAccurate => {
if let Some(max) = self.descriptor.duration {
if new_pos >= max {
return Err(crate::Error::new_validation(
"Attempt to seek past the end of the source",
));
}
}
}
};

self.internal_handle
.send_command_node(SeekCommand(new_pos))?;
Ok(())
}

/// Convenience function to seek to a given duration in seconds.
///
/// This function works by converting the duration passed in to samples, then calling [Self::seek_sample] for you.
pub fn seek(&self, dur: Duration) -> Result<()> {
let secs = dur.as_secs_f64();
let samples = secs * self.descriptor.sample_rate.get() as f64;
self.seek_sample(samples as u64)
}
}

impl super::NodeHandleSealed for SampleSourcePlayerNode {
Expand Down
10 changes: 10 additions & 0 deletions crates/synthizer/src/sample_sources/execution/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,14 @@ impl BufferedSourceReader {
pub(crate) fn config_looping(&mut self, spec: LoopSpec) {
self.reader.config_looping(spec);
}

/// Try to seek the underlying source.
///
/// if successful, this clears the buffer. The underlying driver knows how to end sources forever on seek errors,
/// so we needn't handle that here.
pub(crate) fn seek(&mut self, new_pos: u64) -> Result<(), SampleSourceError> {
self.reader.seek(new_pos)?;
self.buffer.reset();
Ok(())
}
}
15 changes: 13 additions & 2 deletions crates/synthizer/src/sample_sources/execution/cross_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ fn merge_opts<T>(first: Option<T>, second: Option<T>) -> Option<T> {
}

/// A patch to apply to the background thread.
///
/// TODO: seeking is next, we're handling loops first.
#[derive(Clone, Debug, Default)]
struct ConfigPatch {
loop_spec: Option<LoopSpec>,
seek_to: Option<u64>,
}

impl ConfigPatch {
Expand All @@ -74,6 +74,7 @@ impl ConfigPatch {
fn merge_newer(self, newer: ConfigPatch) -> Self {
Self {
loop_spec: merge_opts(self.loop_spec, newer.loop_spec),
seek_to: merge_opts(self.seek_to, newer.seek_to),
}
}
}
Expand Down Expand Up @@ -130,6 +131,11 @@ impl Task {
if let Some(loop_spec) = patch.loop_spec {
self.driver.config_looping(loop_spec);
}

if let Some(seek_to) = patch.seek_to {
self.driver.seek(seek_to)?;
}

Ok(())
}

Expand Down Expand Up @@ -281,6 +287,11 @@ impl BackgroundSourceHandle {
self.get_next_patch_mut().loop_spec = Some(loop_spec);
}

pub(super) fn seek(&mut self, new_pos: u64) -> Result<(), SampleSourceError> {
self.get_next_patch_mut().seek_to = Some(new_pos);
Ok(())
}

pub(crate) fn descriptor(&self) -> &Descriptor {
&self.descriptor
}
Expand Down
7 changes: 7 additions & 0 deletions crates/synthizer/src/sample_sources/execution/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,11 @@ impl Driver {
SampleSourceDriverKind::Resampled(x) => x.config_looping(spec),
}
}

pub(crate) fn seek(&mut self, new_pos: u64) -> Result<(), SampleSourceError> {
match &mut self.kind {
SampleSourceDriverKind::Buffered(b) => b.seek(new_pos),
SampleSourceDriverKind::Resampled(r) => r.seek(new_pos),
}
}
}
7 changes: 7 additions & 0 deletions crates/synthizer/src/sample_sources/execution/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ impl Executor {
}
}

pub(crate) fn seek(&mut self, new_pos: u64) -> Result<(), SampleSourceError> {
match &mut self.location {
ExecutionLocation::Inline(d) => d.seek(new_pos),
ExecutionLocation::CrossThread(c) => c.seek(new_pos),
}
}

pub(crate) fn descriptor(&self) -> &Descriptor {
match self.location {
ExecutionLocation::Inline(ref d) => d.descriptor(),
Expand Down
2 changes: 1 addition & 1 deletion crates/synthizer/src/sample_sources/execution/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl SourceReader {
}

/// seek to the given sample, capped above by the descriptor's eof if any.
fn seek(&mut self, new_pos: u64) -> Result<(), SampleSourceError> {
pub(crate) fn seek(&mut self, new_pos: u64) -> Result<(), SampleSourceError> {
let new_pos = self
.descriptor
.duration
Expand Down
5 changes: 5 additions & 0 deletions crates/synthizer/src/sample_sources/execution/resampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,9 @@ impl Resampler {
pub(crate) fn config_looping(&mut self, spec: crate::LoopSpec) {
self.reader.config_looping(spec);
}

/// Seek the source to a new position where `new_pos` is in the sampling rate of the source.
pub(crate) fn seek(&mut self, new_pos: u64) -> Result<(), SampleSourceError> {
self.reader.seek(new_pos)
}
}

0 comments on commit 0fe15d6

Please sign in to comment.