Skip to content

Commit

Permalink
Implement more introspection commands and CreateUploadPlaybackStream
Browse files Browse the repository at this point in the history
  • Loading branch information
colinmarc committed Jan 4, 2024
1 parent 9a53a87 commit 9410043
Show file tree
Hide file tree
Showing 12 changed files with 341 additions and 41 deletions.
69 changes: 52 additions & 17 deletions src/protocol/command.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
//! Commands are the top-level IPC structure used in the protocol.

use std::io::{BufRead, Write};
use std::{
ffi::CString,
io::{BufRead, Write},
};

mod auth;
mod client_info;
mod lookup;
mod module_info;
mod playback_stream;
mod playback_stream_events;
Expand All @@ -15,11 +19,14 @@ mod sink_info;
mod sink_input_info;
mod source_info;
mod source_output_info;
mod stat;
mod subscribe;
mod timing_info;
mod upload_stream;

pub use auth::*;
pub use client_info::*;
pub use lookup::*;
pub use module_info::*;
pub use playback_stream::*;
pub use playback_stream_events::*;
Expand All @@ -31,8 +38,10 @@ pub use sink_info::*;
pub use sink_input_info::*;
pub use source_info::*;
pub use source_output_info::*;
pub use stat::*;
pub use subscribe::*;
pub use timing_info::*;
pub use upload_stream::*;

use super::{serde::*, ProtocolError, PulseError};

Expand Down Expand Up @@ -208,6 +217,8 @@ pub trait CommandReply: TagStructRead + TagStructWrite {}
pub enum Command {
/// An error reply to some other command.
Error(PulseError),
Timeout,
Exit,

/// A reply to some other command. If this is returned by [`Command::read_tag_prefixed`], the payload has yet to be read.
Reply,
Expand All @@ -226,8 +237,12 @@ pub enum Command {
DrainPlaybackStream(u32),
GetPlaybackLatency(LatencyParams),
GetRecordLatency(LatencyParams),
CreateUploadStream(UploadStreamParams),
DeleteUploadStream(u32),
FinishUploadStream(u32),

/// So-called introspection commands, to read back the state of the server.
Stat,
GetServerInfo,
GetSinkInfo(GetSinkInfo),
GetSinkInfoList,
Expand All @@ -244,6 +259,8 @@ pub enum Command {
GetSampleInfo(u32),
GetSampleInfoList,
Subscribe(SubscriptionMask),
LookupSink(CString),
LookupSource(CString),

Request(Request),
Overflow(u32),
Expand All @@ -270,22 +287,22 @@ impl Command {
CommandTag::Timeout => Err(ProtocolError::Timeout),
CommandTag::Reply => Ok(Command::Reply),

CommandTag::Exit => Err(ProtocolError::Unimplemented(command)),
CommandTag::Exit => Ok(Command::Exit),
CommandTag::Auth => Ok(Command::Auth(ts.read()?)),
CommandTag::SetClientName => Ok(Command::SetClientName(ts.read()?)),

CommandTag::CreatePlaybackStream => Ok(Command::CreatePlaybackStream(ts.read()?)),
CommandTag::DeletePlaybackStream => Ok(Command::DeletePlaybackStream(ts.read_u32()?)),
CommandTag::CreateRecordStream => Ok(Command::CreateRecordStream(ts.read()?)),
CommandTag::DeleteRecordStream => Ok(Command::DeleteRecordStream(ts.read_u32()?)),
CommandTag::LookupSink => Err(ProtocolError::Unimplemented(command)),
CommandTag::LookupSource => Err(ProtocolError::Unimplemented(command)),
CommandTag::LookupSink => Ok(Command::LookupSink(ts.read_string_non_null()?)),
CommandTag::LookupSource => Ok(Command::LookupSource(ts.read_string_non_null()?)),
CommandTag::DrainPlaybackStream => Ok(Command::DrainPlaybackStream(ts.read_u32()?)),
CommandTag::Stat => Err(ProtocolError::Unimplemented(command)),
CommandTag::GetPlaybackLatency => Err(ProtocolError::Unimplemented(command)),
CommandTag::CreateUploadStream => Err(ProtocolError::Unimplemented(command)),
CommandTag::DeleteUploadStream => Err(ProtocolError::Unimplemented(command)),
CommandTag::FinishUploadStream => Err(ProtocolError::Unimplemented(command)),
CommandTag::Stat => Ok(Command::Stat),
CommandTag::GetPlaybackLatency => Ok(Command::GetPlaybackLatency(ts.read()?)),
CommandTag::CreateUploadStream => Ok(Command::CreateUploadStream(ts.read()?)),
CommandTag::DeleteUploadStream => Ok(Command::DeleteUploadStream(ts.read_u32()?)),
CommandTag::FinishUploadStream => Ok(Command::FinishUploadStream(ts.read_u32()?)),
CommandTag::PlaySample => Err(ProtocolError::Unimplemented(command)),
CommandTag::RemoveSample => Err(ProtocolError::Unimplemented(command)),

Expand Down Expand Up @@ -406,6 +423,8 @@ impl Command {
pub fn tag(&self) -> CommandTag {
match self {
Command::Error(_) => CommandTag::Error,
Command::Timeout => CommandTag::Timeout,
Command::Exit => CommandTag::Exit,
Command::Reply => CommandTag::Reply,

Command::Auth(_) => CommandTag::Auth,
Expand All @@ -417,7 +436,11 @@ impl Command {
Command::DrainPlaybackStream(_) => CommandTag::DrainPlaybackStream,
Command::GetPlaybackLatency(_) => CommandTag::GetPlaybackLatency,
Command::GetRecordLatency(_) => CommandTag::GetRecordLatency,
Command::CreateUploadStream(_) => CommandTag::CreateUploadStream,
Command::DeleteUploadStream(_) => CommandTag::DeleteUploadStream,
Command::FinishUploadStream(_) => CommandTag::FinishUploadStream,

Command::Stat => CommandTag::Stat,
Command::GetServerInfo => CommandTag::GetServerInfo,
Command::GetSinkInfo(_) => CommandTag::GetSinkInfo,
Command::GetSinkInfoList => CommandTag::GetSinkInfoList,
Expand All @@ -436,6 +459,9 @@ impl Command {
Command::GetSampleInfoList => CommandTag::GetSampleInfoList,
Command::Subscribe(_) => CommandTag::Subscribe,
Command::SubscribeEvent(_) => CommandTag::SubscribeEvent,
Command::LookupSink(_) => CommandTag::LookupSink,
Command::LookupSource(_) => CommandTag::LookupSource,

Command::Request(_) => CommandTag::Request,
Command::Overflow(_) => CommandTag::Overflow,
Command::Underflow(_) => CommandTag::Underflow,
Expand All @@ -455,7 +481,9 @@ impl TagStructWrite for Command {
) -> Result<(), ProtocolError> {
match self {
Command::Error(e) => w.write_u32(*e as u32),
Command::Timeout => Ok(()),
Command::Reply => Ok(()),
Command::Exit => Ok(()),

Command::Auth(ref p) => w.write(p),
Command::SetClientName(ref p) => w.write(p),
Expand All @@ -466,31 +494,38 @@ impl TagStructWrite for Command {
Command::DrainPlaybackStream(chan) => w.write_u32(*chan),
Command::GetPlaybackLatency(ref p) => w.write(p),
Command::GetRecordLatency(ref p) => w.write(p),
Command::CreateUploadStream(ref p) => w.write(p),
Command::DeleteUploadStream(chan) => w.write_u32(*chan),
Command::FinishUploadStream(chan) => w.write_u32(*chan),

Command::Stat => Ok(()),
Command::GetServerInfo => Ok(()),
Command::GetSinkInfo(ref p) => w.write(p),
Command::GetSinkInfoList => Ok(()),
Command::GetSourceInfo(ref p) => w.write(p),
Command::GetSourceInfoList => Ok(()),
Command::GetModuleInfo(id) => w.write_u32(*id),
Command::GetModuleInfoList => Ok(()),
Command::GetClientInfo(id) => w.write_u32(*id),
Command::GetClientInfoList => Ok(()),
Command::GetSinkInputInfo(id) => w.write_u32(*id),
Command::GetSinkInputInfoList => Ok(()),
Command::GetSourceOutputInfo(id) => w.write_u32(*id),
Command::GetSourceOutputInfoList => Ok(()),
Command::GetSampleInfo(id) => w.write_u32(*id),
Command::GetSampleInfoList => Ok(()),
Command::Subscribe(mask) => w.write(mask),
Command::SubscribeEvent(ref p) => w.write(p),
Command::LookupSink(ref p) => w.write_string(Some(p)),
Command::LookupSource(ref p) => w.write_string(Some(p)),

Command::Request(ref p) => w.write(p),
Command::Overflow(chan) => w.write_u32(*chan),
Command::Underflow(ref p) => w.write(p),
Command::PlaybackStreamKilled(chan) => w.write_u32(*chan),
Command::RecordStreamKilled(chan) => w.write_u32(*chan),
Command::Started(chan) => w.write_u32(*chan),
Command::PlaybackBufferAttrChanged(ref p) => w.write(p),
Command::GetServerInfo
| Command::GetSinkInfoList
| Command::GetSourceInfoList
| Command::GetModuleInfoList
| Command::GetClientInfoList
| Command::GetSinkInputInfoList
| Command::GetSourceOutputInfoList
| Command::GetSampleInfoList => Ok(()),
}
}
}
4 changes: 1 addition & 3 deletions src/protocol/command/client_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ impl TagStructRead for ClientInfo {
fn read(ts: &mut TagStructReader<'_>, _protocol_version: u16) -> Result<Self, ProtocolError> {
Ok(Self {
index: ts.read_u32()?,
name: ts
.read_string()?
.ok_or_else(|| ProtocolError::Invalid("null client name".into()))?,
name: ts.read_string_non_null()?,
owner_module_index: ts.read_index()?,
driver: ts.read_string()?,
props: ts.read()?,
Expand Down
26 changes: 26 additions & 0 deletions src/protocol/command/lookup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::protocol::{serde::*, ProtocolError};

use super::CommandReply;

/// The server response to [`super::Command::LookupSource`] and [`super::Command::LookupSink`].
#[derive(Default, Debug, Copy, Clone, Eq, PartialEq)]
pub struct LookupReply(u32);

impl CommandReply for LookupReply {}

impl TagStructRead for LookupReply {
fn read(ts: &mut TagStructReader<'_>, _protocol_version: u16) -> Result<Self, ProtocolError> {
Ok(Self(ts.read_u32()?))
}
}

impl TagStructWrite for LookupReply {
fn write(
&self,
w: &mut TagStructWriter<'_>,
_protocol_version: u16,
) -> Result<(), ProtocolError> {
w.write_u32(self.0)?;
Ok(())
}
}
4 changes: 1 addition & 3 deletions src/protocol/command/module_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ impl TagStructRead for ModuleInfo {
fn read(ts: &mut TagStructReader<'_>, protocol_version: u16) -> Result<Self, ProtocolError> {
Ok(Self {
index: ts.read_u32()?,
name: ts
.read_string()?
.ok_or_else(|| ProtocolError::Invalid("null module name".into()))?,
name: ts.read_string_non_null()?,
argument: ts.read_string()?,
n_used: ts.read_index()?,
auto_unload: if protocol_version < 15 {
Expand Down
4 changes: 1 addition & 3 deletions src/protocol/command/sample_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ impl CommandReply for SampleInfo {}
impl TagStructRead for SampleInfo {
fn read(ts: &mut TagStructReader<'_>, _protocol_version: u16) -> Result<Self, ProtocolError> {
let index = ts.read_u32()?;
let name = ts
.read_string()?
.ok_or_else(|| ProtocolError::Invalid("null sample name".into()))?;
let name = ts.read_string_non_null()?;
let cvolume = ts.read()?;
let duration = ts.read_usec()?;
let sample_spec = ts.read()?;
Expand Down
4 changes: 1 addition & 3 deletions src/protocol/command/sink_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,7 @@ impl TagStructRead for SinkInfo {
index: ts
.read_index()?
.ok_or_else(|| ProtocolError::Invalid("invalid sink index".into()))?,
name: ts
.read_string()?
.ok_or_else(|| ProtocolError::Invalid("null sink name".into()))?,
name: ts.read_string_non_null()?,
description: ts.read_string()?,
sample_spec: ts.read()?,
channel_map: ts.read()?,
Expand Down
8 changes: 2 additions & 6 deletions src/protocol/command/sink_input_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ impl TagStructRead for SinkInputInfo {
fn read(ts: &mut TagStructReader<'_>, protocol_version: u16) -> Result<Self, ProtocolError> {
let mut input_info = Self {
index: ts.read_u32()?,
name: ts
.read_string()?
.ok_or_else(|| ProtocolError::Invalid("null sink input name".into()))?,
name: ts.read_string_non_null()?,
owner_module_index: ts.read_index()?,
client_index: ts.read_index()?,
sink_index: ts.read_u32()?,
Expand Down Expand Up @@ -224,11 +222,9 @@ mod integration_tests {
protocol_version,
)?;

let (seq, info) = read_reply_message::<SinkInputInfo>(&mut sock)?;
let (seq, _) = read_reply_message::<SinkInputInfo>(&mut sock)?;
assert_eq!(seq, 1);

assert_eq!(info, info_list[0]);

Ok(())
}
}
4 changes: 1 addition & 3 deletions src/protocol/command/source_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,7 @@ impl TagStructRead for SourceInfo {
index: ts
.read_index()?
.ok_or_else(|| ProtocolError::Invalid("invalid source index".into()))?,
name: ts
.read_string()?
.ok_or_else(|| ProtocolError::Invalid("null source name".into()))?,
name: ts.read_string_non_null()?,
description: ts.read_string()?,
sample_spec: ts.read()?,
channel_map: ts.read()?,
Expand Down
4 changes: 1 addition & 3 deletions src/protocol/command/source_output_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ impl TagStructRead for SourceOutputInfo {
fn read(ts: &mut TagStructReader<'_>, protocol_version: u16) -> Result<Self, ProtocolError> {
let mut output_info = Self {
index: ts.read_u32()?,
name: ts
.read_string()?
.ok_or_else(|| ProtocolError::Invalid("null source output name".into()))?,
name: ts.read_string_non_null()?,
owner_module_index: ts.read_index()?,
client_index: ts.read_index()?,
source_index: ts.read_u32()?,
Expand Down
88 changes: 88 additions & 0 deletions src/protocol/command/stat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use crate::protocol::{serde::*, ProtocolError};

use super::CommandReply;

/// A reply to the [`super::Command::Stat`] command.
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub struct StatInfo {
/// The number of currently allocated memory blocks.
pub memblock_total: u32,
/// The current total size of all allocated memory blocks.
pub memblock_total_size: u32,
/// The number of memblocks allocated over the lifetime of the daemon.
pub memblock_allocated: u32,
/// The total size of all memblocks allocated over the lifetime of the daemon.
pub memblock_allocated_size: u32,
/// The total size of all sample cache entries.
pub sample_cache_size: u32,
}

impl CommandReply for StatInfo {}

impl TagStructRead for StatInfo {
fn read(ts: &mut TagStructReader<'_>, _protocol_version: u16) -> Result<Self, ProtocolError> {
Ok(Self {
memblock_total: ts.read_u32()?,
memblock_total_size: ts.read_u32()?,
memblock_allocated: ts.read_u32()?,
memblock_allocated_size: ts.read_u32()?,
sample_cache_size: ts.read_u32()?,
})
}
}

impl TagStructWrite for StatInfo {
fn write(
&self,
w: &mut TagStructWriter<'_>,
_protocol_version: u16,
) -> Result<(), ProtocolError> {
w.write_u32(self.memblock_total)?;
w.write_u32(self.memblock_total_size)?;
w.write_u32(self.memblock_allocated)?;
w.write_u32(self.memblock_allocated_size)?;
w.write_u32(self.sample_cache_size)?;
Ok(())
}
}

#[cfg(test)]
mod tests {
use crate::protocol::test_util::test_serde;

use super::*;

#[test]
fn stat_serde() -> anyhow::Result<()> {
let info = StatInfo {
memblock_total: 1,
memblock_total_size: 2,
memblock_allocated: 3,
memblock_allocated_size: 4,
sample_cache_size: 5,
};

test_serde(&info)
}
}

#[cfg(test)]
#[cfg(feature = "_integration-tests")]
mod integration_tests {
use crate::{integration_test_util::connect_and_init, protocol::*};

#[test]
fn stat() -> Result<(), Box<dyn std::error::Error>> {
let (mut sock, protocol_version) = connect_and_init()?;

write_command_message(sock.get_mut(), 0, Command::Stat, protocol_version)?;
let (_, info) = read_reply_message::<StatInfo>(&mut sock)?;

assert!(info.memblock_total > 0);
assert!(info.memblock_total_size > 0);
assert!(info.memblock_allocated > 0);
assert!(info.memblock_allocated_size > 0);

Ok(())
}
}
Loading

0 comments on commit 9410043

Please sign in to comment.