Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More explicit error handling; bump to v0.2.14 #76

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "common"
authors = ["Alexander Clausen <[email protected]>"]
license = "MIT"
version = "0.2.13"
version = "0.2.14"
edition = "2021"
rust-version = "1.71"

Expand Down Expand Up @@ -34,3 +34,6 @@ criterion = "0.5.1"
[[bench]]
name = "casting"
harness = false

[lints.rust]
unused_must_use = "deny"
33 changes: 21 additions & 12 deletions common/src/frame_stack.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Debug;

use ipc_test::{SharedSlabAllocator, SlotForWriting};
use ipc_test::{slab::ShmError, SharedSlabAllocator, SlotForWriting};
use log::{error, warn};
use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
Expand Down Expand Up @@ -45,6 +45,9 @@ pub enum FrameStackWriteError {

#[error("too small")]
TooSmall,

#[error("SHM access error: {0}")]
ShmAccessError(#[from] ShmError),
}

impl From<FrameStackWriteError> for PyErr {
Expand All @@ -56,6 +59,7 @@ impl From<FrameStackWriteError> for PyErr {
FrameStackWriteError::TooSmall => {
PyValueError::new_err("frame stack too small to handle single frame")
}
FrameStackWriteError::ShmAccessError(e) => PyValueError::new_err(e.to_string()),
}
}
}
Expand All @@ -64,6 +68,9 @@ impl From<FrameStackWriteError> for PyErr {
pub enum SplitError<M: FrameMeta> {
#[error("shm full")]
ShmFull(FrameStackHandle<M>),

#[error("shm access error: {0}")]
AccessError(#[from] ShmError),
}

pub struct FrameStackForWriting<M>
Expand Down Expand Up @@ -160,7 +167,7 @@ where
) -> Result<FrameStackHandle<M>, FrameStackWriteError> {
if self.is_empty() {
let slot_info = shm.writing_done(self.slot);
shm.free_idx(slot_info.slot_idx);
shm.free_idx(slot_info.slot_idx)?;
return Err(FrameStackWriteError::Empty);
}

Expand All @@ -180,7 +187,7 @@ where
) -> Result<(), FrameStackWriteError> {
if self.is_empty() {
let slot_info = shm.writing_done(self.slot);
shm.free_idx(slot_info.slot_idx);
shm.free_idx(slot_info.slot_idx)?;
Ok(())
} else {
Err(FrameStackWriteError::NonEmpty)
Expand Down Expand Up @@ -296,15 +303,17 @@ mod inner {
let mut slot_left = match shm.try_get_mut() {
Ok(s) => s,
Err(ShmError::NoSlotAvailable) => return Err(SplitError::ShmFull(self)),
Err(e @ ShmError::MutexError(_)) => return Err(e.into()),
};
let mut slot_right = match shm.try_get_mut() {
Ok(s) => s,
Err(ShmError::NoSlotAvailable) => {
// don't leak the left slot!
let l = shm.writing_done(slot_left);
shm.free_idx(l.slot_idx);
shm.free_idx(l.slot_idx)?;
return Err(SplitError::ShmFull(self));
}
Err(e @ ShmError::MutexError(_)) => return Err(e.into()),
};

let slice_left = slot_left.as_slice_mut();
Expand All @@ -316,7 +325,7 @@ mod inner {
let left = shm.writing_done(slot_left);
let right = shm.writing_done(slot_right);

shm.free_idx(self.slot.slot_idx);
shm.free_idx(self.slot.slot_idx)?;

(left, right)
};
Expand Down Expand Up @@ -351,8 +360,8 @@ mod inner {
f(&slot_r)
}

pub fn free_slot(self, shm: &mut SharedSlabAllocator) {
shm.free_idx(self.slot.slot_idx);
pub fn free_slot(self, shm: &mut SharedSlabAllocator) -> Result<(), ShmError> {
shm.free_idx(self.slot.slot_idx)
}
}

Expand Down Expand Up @@ -493,7 +502,7 @@ impl<'b, M: FrameMeta> Drop for WriteGuard<'b, M> {
match frame_stack.writing_done(self.shm) {
Ok(frame_stack) => {
warn!("discarding non-empty frame stack as result of previous errors");
frame_stack.free_slot(self.shm);
let _ = frame_stack.free_slot(self.shm);
}
Err(e) => error!("WriteGuard::drop failed: {e:?}"),
}
Expand Down Expand Up @@ -616,12 +625,12 @@ mod tests {
assert_eq!(a.offsets.len() + b.offsets.len(), 2);

// when the split is done, there should be one free shm slot:
assert_eq!(shm.num_slots_free(), 1);
assert_eq!(shm.num_slots_free().unwrap(), 1);

// and we can free them again:
shm.free_idx(a.slot.slot_idx);
shm.free_idx(b.slot.slot_idx);
shm.free_idx(a.slot.slot_idx).unwrap();
shm.free_idx(b.slot.slot_idx).unwrap();

assert_eq!(shm.num_slots_free(), 3);
assert_eq!(shm.num_slots_free().unwrap(), 3);
}
}
10 changes: 8 additions & 2 deletions common/src/generic_cam_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::fmt::Debug;

use ipc_test::{slab::SlabInitError, SharedSlabAllocator};
use ipc_test::{
slab::{ShmError, SlabInitError},
SharedSlabAllocator,
};
use multiversion::multiversion;
use ndarray::ArrayViewMut3;
use num::cast::AsPrimitive;
Expand All @@ -18,6 +21,9 @@ pub enum CamClientError {
error: SlabInitError,
},

#[error("failed to access SHM: {0}")]
ShmError(#[from] ShmError),

#[error("operation on closed client")]
Closed,

Expand Down Expand Up @@ -166,7 +172,7 @@ where
M: FrameMeta,
{
let shm = self.get_shm_mut()?;
handle.free_slot(shm);
handle.free_slot(shm)?;
Ok(())
}

Expand Down
22 changes: 15 additions & 7 deletions common/src/generic_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use std::{
time::{Duration, Instant},
};

use ipc_test::{slab::SlabInitError, SharedSlabAllocator};
use ipc_test::{
slab::{ShmError, SlabInitError},
SharedSlabAllocator,
};
use log::{debug, info, trace, warn};
use stats::Stats;

Expand Down Expand Up @@ -53,6 +56,9 @@ pub enum ConnectionError {
#[error("could not connect to SHM area: {0}")]
ShmConnectError(#[from] SlabInitError),

#[error("could not access SHM")]
ShmAccessError(#[from] ShmError),

#[error("background thread is dead")]
Disconnected,

Expand Down Expand Up @@ -117,6 +123,7 @@ where
frame_stack = old_frame_stack;
continue;
}
Err(SplitError::AccessError(e)) => return Err(e.into()),
};
}
}
Expand Down Expand Up @@ -314,13 +321,13 @@ where
return Err(ConnectionError::FatalError(error))
}
ReceiverMsg::FrameStack { frame_stack } => {
frame_stack.free_slot(&mut self.shm);
frame_stack.free_slot(&mut self.shm)?;
return Err(ConnectionError::UnexpectedMessage(
"ReceiverMsg::FrameStack in wait_for_arm".to_owned(),
));
}
ReceiverMsg::Finished { frame_stack } => {
frame_stack.free_slot(&mut self.shm);
frame_stack.free_slot(&mut self.shm)?;
return Err(ConnectionError::UnexpectedMessage(
"ReceiverMsg::Finished in wait_for_arm".to_owned(),
));
Expand Down Expand Up @@ -406,11 +413,11 @@ where
match res {
ReceiverMsg::FrameStack { frame_stack } => {
trace!("wait_for_status: ignoring received FrameStackHandle");
frame_stack.free_slot(&mut self.shm);
frame_stack.free_slot(&mut self.shm)?;
}
ReceiverMsg::Finished { frame_stack } => {
warn!("wait_for_status: ignoring FrameStackHandle received in ReceiverMsg::Finished message");
frame_stack.free_slot(&mut self.shm);
frame_stack.free_slot(&mut self.shm)?;
}
ReceiverMsg::FatalError { error } => {
return Err(ConnectionError::FatalError(error));
Expand Down Expand Up @@ -568,12 +575,13 @@ where
self.wait_for_status(ConnectionStatus::Idle, *timeout, periodic_callback)
}

pub fn log_shm_stats(&self) {
pub fn log_shm_stats(&self) -> Result<(), ConnectionError> {
let shm = &self.shm;
let free = shm.num_slots_free();
let free = shm.num_slots_free()?;
let total = shm.num_slots_total();
self.stats.log_stats();
info!("shm stats free/total: {}/{}", free, total);
Ok(())
}

pub fn reset_stats(&mut self) {
Expand Down
8 changes: 6 additions & 2 deletions common/src/py_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ macro_rules! impl_py_connection {
let _trace_guard = span_from_py(py, &format!("{}::close", stringify!($name)))?;

if let Some(mut conn_impl) = self.conn_impl.take() {
conn_impl.log_shm_stats();
conn_impl
.log_shm_stats()
.map_err(|e| PyConnectionError::new_err(e.to_string()))?;
conn_impl.reset_stats();
conn_impl.close();
Ok(())
Expand Down Expand Up @@ -261,7 +263,9 @@ macro_rules! impl_py_connection {

pub fn log_shm_stats(&self) -> PyResult<()> {
let conn_impl = self.get_conn()?;
conn_impl.log_shm_stats();
conn_impl
.log_shm_stats()
.map_err(|e| PyConnectionError::new_err(e.to_string()))?;
Ok(())
}
}
Expand Down
3 changes: 3 additions & 0 deletions ipc_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ sendfd = "0.4.3"

[target.'cfg(not(windows))'.dependencies]
nix = { version = "0.29.0", features = ["poll"] }

[lints.rust]
unused_must_use = "deny"
4 changes: 2 additions & 2 deletions ipc_test/examples/consumer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ fn main() {
// some additional "work":
//std::thread::sleep(Duration::from_micros(1));

ssa.free_idx(slot_info.slot_idx);
ssa.free_idx(slot_info.slot_idx).unwrap();

sum += sum_part.0 as f64;
bytes_processed += SLOT_SIZE_BYTES;

if t0.elapsed() > Duration::from_secs(1) {
let slots_free = ssa.num_slots_free();
let slots_free = ssa.num_slots_free().unwrap();
println!(
"idx: {idx:5}, sum: {sum_part}, throughput: {:7.2} MiB/s, slots free: {slots_free}",
bytes_processed as f32 / 1024.0 / 1024.0
Expand Down
2 changes: 1 addition & 1 deletion ipc_test/examples/producer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ fn handle_connection(
}

println!("done sending {} items", send_num_items);
while ssa.num_slots_free() < ssa.num_slots_total() {
while ssa.num_slots_free().unwrap() < ssa.num_slots_total() {
thread::sleep(Duration::from_millis(100));
}
println!("done!")
Expand Down
2 changes: 1 addition & 1 deletion ipc_test/src/backend_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl SharedMemory {
impl Drop for SharedMemory {
fn drop(&mut self) {
if self.is_owner {
remove_file(&self.handle_path).unwrap();
let _ = remove_file(&self.handle_path);
}
}
}
Expand Down
Loading
Loading