diff --git a/commons/zenoh-shm/src/posix_shm/array.rs b/commons/zenoh-shm/src/posix_shm/array.rs index ac05f5fe2a..0018012bd3 100644 --- a/commons/zenoh-shm/src/posix_shm/array.rs +++ b/commons/zenoh-shm/src/posix_shm/array.rs @@ -64,17 +64,35 @@ where self.inner.shmem.len() / size_of::() } + /// # Safety + /// Retrieves const element by it's index. This is safe if the index doesn't go out of underlying array. + /// Additional assert to check the index validity is added for "test" feature pub unsafe fn elem(&self, index: ElemIndex) -> *const Elem { + #[cfg(feature = "test")] + assert!(self.inner.shmem.len() > index.as_() * size_of::()); (self.inner.shmem.as_ptr() as *const Elem).add(index.as_()) } - + #[cfg(feature = "test")] + /// # Safety + /// Retrieves mut element by it's index. This is safe if the index doesn't go out of underlying array. + /// Additional assert to check the index validity is added for "test" feature pub unsafe fn elem_mut(&mut self, index: ElemIndex) -> *mut Elem { + #[cfg(feature = "test")] + assert!(self.inner.shmem.len() > index.as_() * size_of::()); (self.inner.shmem.as_ptr() as *mut Elem).add(index.as_()) } + /// # Safety + /// Calculates element's index. This is safe if the element belongs to underlying array. + /// Additional assert is added for "test" feature pub unsafe fn index(&self, elem: *const Elem) -> ElemIndex { - elem.offset_from(self.inner.shmem.as_ptr() as *const Elem) - .as_() + let index = elem.offset_from(self.inner.shmem.as_ptr() as *const Elem); + #[cfg(feature = "test")] + { + assert!(index >= 0); + assert!(self.inner.shmem.len() > index as usize * size_of::()); + } + index.as_() } } diff --git a/commons/zenoh-shm/src/watchdog/confirmator.rs b/commons/zenoh-shm/src/watchdog/confirmator.rs index 284af7bb42..2d1b30f29f 100644 --- a/commons/zenoh-shm/src/watchdog/confirmator.rs +++ b/commons/zenoh-shm/src/watchdog/confirmator.rs @@ -134,16 +134,16 @@ impl WatchdogConfirmator { let c_running = running.clone(); let _ = ThreadBuilder::default() - .name("Watchdog Confirmator thread") + .name("Watchdog Confirmator") //.policy(ThreadSchedulePolicy::Realtime(RealtimeThreadSchedulePolicy::Deadline)) - //.priority(ThreadPriority::Deadline { runtime: Duration::from_micros(100), deadline: interval, period: interval, flags: DeadlineFlags::default() }) + //.priority(ThreadPriority::Deadline { runtime: interval, deadline: interval, period: interval, flags: DeadlineFlags::default() }) //.policy(ThreadSchedulePolicy::Realtime(RealtimeThreadSchedulePolicy::Fifo)) - .priority(ThreadPriority::Crossplatform(ThreadPriorityValue::try_from(48).unwrap())) + //.priority(ThreadPriority::Crossplatform(ThreadPriorityValue::try_from(48).unwrap())) .spawn(move |result| { if let Err(e) = result { - let ret = std::io::Error::last_os_error().to_string(); + //let ret = std::io::Error::last_os_error().to_string(); panic!("Watchdog Confirmator: error setting thread priority: {:?}, will continue operating with default priority...", e); - panic!(""); + //panic!(""); } let mut segments: Vec<(Arc, BTreeMap)> = vec![]; diff --git a/commons/zenoh-shm/src/watchdog/validator.rs b/commons/zenoh-shm/src/watchdog/validator.rs index 243674ee1c..96222c2cd6 100644 --- a/commons/zenoh-shm/src/watchdog/validator.rs +++ b/commons/zenoh-shm/src/watchdog/validator.rs @@ -18,13 +18,13 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc }, - time::{Duration, SystemTime}, + time::Duration, }; use lazy_static::lazy_static; use log::{error, warn}; -use thread_priority::{ThreadBuilderExt, ThreadPriority}; +use thread_priority::ThreadBuilder; use super::descriptor::OwnedDescriptor; @@ -90,9 +90,9 @@ impl WatchdogValidator { let c_storage = storage.clone(); let c_running = running.clone(); - let _ = std::thread::Builder::new() - .name("Watchdog Validator thread".to_owned()) - .spawn_with_priority(ThreadPriority::Min, move |result| { + let _ = ThreadBuilder::default() + .name("Watchdog Validator".to_owned()) + .spawn(move |result| { if let Err(e) = result { error!("Watchdog Validator: error setting thread priority: {:?}, will continue operating with default priority...", e); panic!(""); diff --git a/commons/zenoh-shm/tests/header.rs b/commons/zenoh-shm/tests/header.rs index 2e9786e65d..c5f83d8fb1 100644 --- a/commons/zenoh-shm/tests/header.rs +++ b/commons/zenoh-shm/tests/header.rs @@ -16,10 +16,13 @@ use std::sync::atomic::Ordering::Relaxed; use rand::Rng; use zenoh_result::ZResult; -use zenoh_shm::{header::{ - descriptor::HeaderDescriptor, storage::GLOBAL_HEADER_STORAGE, - subscription::GLOBAL_HEADER_SUBSCRIPTION, -}, test_helpers::execute_concurrent}; +use zenoh_shm::{ + header::{ + descriptor::HeaderDescriptor, storage::GLOBAL_HEADER_STORAGE, + subscription::GLOBAL_HEADER_SUBSCRIPTION, + }, + test_helpers::execute_concurrent, +}; fn header_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { |_task_index: usize, _iteration: usize| -> ZResult<()> { @@ -57,7 +60,8 @@ fn header_link_concurrent() { execute_concurrent(100, 10000, header_link_fn()); } -fn header_link_failure_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { +fn header_link_failure_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static +{ |_task_index: usize, _iteration: usize| { let allocated_header = GLOBAL_HEADER_STORAGE.allocate_header()?; let descr = HeaderDescriptor::from(&allocated_header.descriptor); diff --git a/commons/zenoh-shm/tests/watchdog.rs b/commons/zenoh-shm/tests/watchdog.rs index 85b2c4b198..6cdb2818af 100644 --- a/commons/zenoh-shm/tests/watchdog.rs +++ b/commons/zenoh-shm/tests/watchdog.rs @@ -71,6 +71,9 @@ fn watchdog_confirmed_concurrent() { execute_concurrent(1000, 10, watchdog_confirmed_fn()); } +// todo: confirmation to dangling watchdog actually writes to potentially-existing +// other watchdog instance from other test running in the same process and changes it's behaviour, +// so we cannot run dangling test in parallel with anything else //#[test] //fn watchdog_confirmed_dangling() { // let allocated = GLOBAL_STORAGE