From 0c1e38558c8643611a581a0ca1f180ca49fcaef7 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 11 Jan 2024 12:31:14 +0300 Subject: [PATCH] WIP on SHM --- commons/zenoh-shm/Cargo.toml | 9 + commons/zenoh-shm/src/header/chunk_header.rs | 4 +- commons/zenoh-shm/src/header/descriptor.rs | 4 +- commons/zenoh-shm/src/header/mod.rs | 13 +- commons/zenoh-shm/src/header/storage.rs | 8 +- commons/zenoh-shm/src/lib.rs | 87 +++++-- commons/zenoh-shm/src/posix_shm/array.rs | 6 + commons/zenoh-shm/src/posix_shm/mod.rs | 4 +- commons/zenoh-shm/src/posix_shm/segment.rs | 14 +- commons/zenoh-shm/src/test_helpers.rs | 54 ++++ .../src/watchdog/allocated_watchdog.rs | 35 +++ commons/zenoh-shm/src/watchdog/confirmator.rs | 203 ++++++++++----- commons/zenoh-shm/src/watchdog/descriptor.rs | 27 +- commons/zenoh-shm/src/watchdog/mod.rs | 10 +- commons/zenoh-shm/src/watchdog/segment.rs | 1 + commons/zenoh-shm/src/watchdog/storage.rs | 48 ++-- commons/zenoh-shm/src/watchdog/validator.rs | 133 +++++++--- commons/zenoh-shm/tests/header.rs | 126 ++++++++++ commons/zenoh-shm/tests/posix_array.rs | 157 ++++++++++++ commons/zenoh-shm/tests/posix_segment.rs | 132 ++++++++++ commons/zenoh-shm/tests/watchdog.rs | 236 ++++++++++++++++++ zenoh/tests/shm.rs | 4 +- 22 files changed, 1148 insertions(+), 167 deletions(-) create mode 100644 commons/zenoh-shm/src/test_helpers.rs create mode 100644 commons/zenoh-shm/src/watchdog/allocated_watchdog.rs create mode 100644 commons/zenoh-shm/tests/header.rs create mode 100644 commons/zenoh-shm/tests/posix_array.rs create mode 100644 commons/zenoh-shm/tests/posix_segment.rs create mode 100644 commons/zenoh-shm/tests/watchdog.rs diff --git a/commons/zenoh-shm/Cargo.toml b/commons/zenoh-shm/Cargo.toml index eb22a25d2a..cd6a126986 100644 --- a/commons/zenoh-shm/Cargo.toml +++ b/commons/zenoh-shm/Cargo.toml @@ -28,6 +28,9 @@ categories = { workspace = true } description = "Internal crate for zenoh." 
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +test = [] + [dependencies] bincode = { workspace = true } log = { workspace = true } @@ -38,3 +41,9 @@ zenoh-result = { workspace = true } rand = { workspace = true } lazy_static = { workspace = true } num-traits = { workspace = true } +thread-priority = "0.15.1" +lockfree = "0.5.1" + +[dev-dependencies] +zenoh-shm = { workspace = true, features = ["test"] } +libc = { workspace = true } diff --git a/commons/zenoh-shm/src/header/chunk_header.rs b/commons/zenoh-shm/src/header/chunk_header.rs index 37b6f48141..8977f0ce68 100644 --- a/commons/zenoh-shm/src/header/chunk_header.rs +++ b/commons/zenoh-shm/src/header/chunk_header.rs @@ -20,9 +20,9 @@ use std::sync::atomic::{AtomicBool, AtomicU32}; pub struct ChunkHeaderType { /* todo: We don't really need 32 bits here, but access to 16-bit felds with 1 byte alignment is less performant on most of the platforms. - We need to bench and select the reasonable integer sizes here when we have an implementation to bench + We need to bench and select reasonable integer sizes here once we have an implementation to bench */ pub refcount: AtomicU32, + pub watchdog_invalidated: AtomicBool, pub generation: AtomicU32, - pub watchdog_flag: AtomicBool, } diff --git a/commons/zenoh-shm/src/header/descriptor.rs b/commons/zenoh-shm/src/header/descriptor.rs index 2e839642fa..7700eb90c6 100644 --- a/commons/zenoh-shm/src/header/descriptor.rs +++ b/commons/zenoh-shm/src/header/descriptor.rs @@ -28,7 +28,7 @@ pub struct HeaderDescriptor { impl From<&OwnedHeaderDescriptor> for HeaderDescriptor { fn from(item: &OwnedHeaderDescriptor) -> Self { let id = item.segment.array.id(); - let index = unsafe { item.segment.array.index(item.header) } as HeaderIndex; + let index = unsafe { item.segment.array.index(item.header) }; Self { id, index } } @@ -44,7 +44,7 @@ unsafe impl Send for OwnedHeaderDescriptor {} unsafe impl Sync for 
OwnedHeaderDescriptor {} impl OwnedHeaderDescriptor { - pub fn new(segment: Arc, header: *const ChunkHeaderType) -> Self { + pub(crate) fn new(segment: Arc, header: *const ChunkHeaderType) -> Self { Self { segment, header } } diff --git a/commons/zenoh-shm/src/header/mod.rs b/commons/zenoh-shm/src/header/mod.rs index ec6992d065..84acc86e87 100644 --- a/commons/zenoh-shm/src/header/mod.rs +++ b/commons/zenoh-shm/src/header/mod.rs @@ -12,9 +12,12 @@ // ZettaScale Zenoh Team, // -pub mod allocated_descriptor; -pub mod chunk_header; pub mod descriptor; -pub mod segment; -pub mod storage; -pub mod subscription; + +tested_crate_module!(storage); +tested_crate_module!(subscription); + +pub(crate) mod allocated_descriptor; +pub(crate) mod chunk_header; + +mod segment; diff --git a/commons/zenoh-shm/src/header/storage.rs b/commons/zenoh-shm/src/header/storage.rs index d142bb0d08..3daa002923 100644 --- a/commons/zenoh-shm/src/header/storage.rs +++ b/commons/zenoh-shm/src/header/storage.rs @@ -70,14 +70,14 @@ impl Storage { .refcount .store(1, std::sync::atomic::Ordering::SeqCst); header - .watchdog_flag - .store(true, std::sync::atomic::Ordering::SeqCst); + .watchdog_invalidated + .store(false, std::sync::atomic::Ordering::SeqCst); Ok(AllocatedHeaderDescriptor { descriptor }) } - pub(crate) fn reclaim_header(&self, header: OwnedHeaderDescriptor) { - // header deallocated - increment it's generation to invalidate any existing references, if any + pub fn reclaim_header(&self, header: OwnedHeaderDescriptor) { + // header deallocated - increment it's generation to invalidate any existing references header .header() .generation diff --git a/commons/zenoh-shm/src/lib.rs b/commons/zenoh-shm/src/lib.rs index c0fe672e54..de51559673 100644 --- a/commons/zenoh-shm/src/lib.rs +++ b/commons/zenoh-shm/src/lib.rs @@ -30,18 +30,51 @@ use std::{ }, }; use watchdog::{ + allocated_watchdog::AllocatedWatchdog, confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR}, descriptor::Descriptor, 
storage::GLOBAL_STORAGE, + validator::GLOBAL_VALIDATOR, }; use zenoh_buffers::ZSliceBuffer; -use zenoh_result::{zerror, ShmError, ZResult}; +use zenoh_result::{bail, zerror, ShmError, ZResult}; + +#[macro_export] +macro_rules! tested_module { + ($module:ident) => { + #[cfg(feature = "test")] + pub mod $module; + #[cfg(not(feature = "test"))] + mod $module; + }; +} + +#[macro_export] +macro_rules! tested_crate_module { + ($module:ident) => { + #[cfg(feature = "test")] + pub mod $module; + #[cfg(not(feature = "test"))] + pub(crate) mod $module; + }; +} + +#[macro_export] +macro_rules! test_helpers_module { + () => { + #[cfg(feature = "test")] + pub mod test_helpers; + }; +} pub mod header; -mod posix_shm; -mod segment; pub mod watchdog; +tested_module!(posix_shm); +tested_module!(segment); + +test_helpers_module!(); + const MIN_FREE_CHUNK_SIZE: usize = 1_024; fn align_addr_at(addr: usize, align: usize) -> usize { @@ -265,7 +298,11 @@ impl SharedMemoryReader { info: info.clone(), watchdog: Arc::new(GLOBAL_CONFIRMATOR.add(&info.watchdog_descriptor)?), }; - Ok(shmb) + // Validate buffer's generation + match shmb.is_generation_valid() { + true => Ok(shmb), + false => bail!(""), + } } None => { let e = zerror!("Unable to find shared memory segment: {}", info.segment_id); @@ -302,6 +339,7 @@ impl fmt::Debug for SharedMemoryReader { pub struct BusyChunk { chunk: Chunk, header: AllocatedHeaderDescriptor, + _watchdog: AllocatedWatchdog, } /// A shared memory segment manager. 
@@ -352,17 +390,35 @@ impl SharedMemoryManager { } fn free_chunk_map_to_shmbuf(&self, chunk: &Chunk) -> ZResult<(SharedMemoryBuf, BusyChunk)> { + // allocate shared header let allocated_header = GLOBAL_HEADER_STORAGE.allocate_header()?; let header = allocated_header.descriptor.clone(); - let watchdog = GLOBAL_STORAGE.allocate_watchdog(header.clone())?; + // allocate watchdog + let allocated_watchdog = GLOBAL_STORAGE.allocate_watchdog()?; + let descriptor = Descriptor::from(&allocated_watchdog.descriptor); + + // add watchdog to confirmator + let confirmed_watchdog = GLOBAL_CONFIRMATOR.add(&descriptor)?; + + // add watchdog to validator + let c_header = header.clone(); + GLOBAL_VALIDATOR.add( + allocated_watchdog.descriptor.clone(), + Box::new(move || { + c_header + .header() + .watchdog_invalidated + .store(true, Ordering::SeqCst); + }), + ); let info = SharedMemoryBufInfo::new( chunk.offset, chunk.size, self.own_segment.segment.id, 0, - Descriptor::from(&watchdog.owned), + descriptor, HeaderDescriptor::from(&header), header.header().generation.load(Ordering::SeqCst), ); @@ -371,12 +427,13 @@ impl SharedMemoryManager { buf: AtomicPtr::::new(chunk.base_addr), len: chunk.size, info, - watchdog: Arc::new(watchdog), + watchdog: Arc::new(confirmed_watchdog), }; let busy_chunk = BusyChunk { chunk: *chunk, header: allocated_header, + _watchdog: allocated_watchdog, }; Ok((shmb, busy_chunk)) @@ -412,7 +469,9 @@ impl SharedMemoryManager { match self.free_chunk_map_to_shmbuf(&chunk) { Ok(val) => Ok(val), Err(_) => { - // no free watchdogs, try to free some by collecting the garbage + // no free watchdogs or headers, try to free some by collecting the garbage + println!("No free watchdogs or headers, trying to reclaim..."); + log::trace!("No free watchdogs or headers, trying to reclaim..."); self.garbage_collect(); self.free_chunk_map_to_shmbuf(&chunk) } @@ -453,13 +512,11 @@ impl SharedMemoryManager { } fn is_free_chunk(chunk: &BusyChunk) -> bool { - let rc = chunk - .header 
- .descriptor - .header() - .refcount - .load(Ordering::SeqCst); - rc == 0 + let header = chunk.header.descriptor.header(); + if header.refcount.load(Ordering::SeqCst) != 0 { + return header.watchdog_invalidated.load(Ordering::SeqCst); + } + true } fn try_merge_adjacent_chunks(a: &Chunk, b: &Chunk) -> Option { diff --git a/commons/zenoh-shm/src/posix_shm/array.rs b/commons/zenoh-shm/src/posix_shm/array.rs index 155bd166ea..ac05f5fe2a 100644 --- a/commons/zenoh-shm/src/posix_shm/array.rs +++ b/commons/zenoh-shm/src/posix_shm/array.rs @@ -20,6 +20,7 @@ use zenoh_result::{bail, ZResult}; use super::segment::Segment; /// An SHM segment that is intended to be an array of elements of some certain type +#[derive(Debug)] pub struct ArrayInSHM { inner: Segment, _phantom: PhantomData<(Elem, ElemIndex)>, @@ -66,6 +67,11 @@ where pub unsafe fn elem(&self, index: ElemIndex) -> *const Elem { (self.inner.shmem.as_ptr() as *const Elem).add(index.as_()) } + + #[cfg(feature = "test")] + pub unsafe fn elem_mut(&mut self, index: ElemIndex) -> *mut Elem { + (self.inner.shmem.as_ptr() as *mut Elem).add(index.as_()) + } pub unsafe fn index(&self, elem: *const Elem) -> ElemIndex { elem.offset_from(self.inner.shmem.as_ptr() as *const Elem) diff --git a/commons/zenoh-shm/src/posix_shm/mod.rs b/commons/zenoh-shm/src/posix_shm/mod.rs index 2dbdabe7cc..025143e306 100644 --- a/commons/zenoh-shm/src/posix_shm/mod.rs +++ b/commons/zenoh-shm/src/posix_shm/mod.rs @@ -12,5 +12,5 @@ // ZettaScale Zenoh Team, // -pub mod array; -pub mod segment; +tested_crate_module!(array); +tested_crate_module!(segment); diff --git a/commons/zenoh-shm/src/posix_shm/segment.rs b/commons/zenoh-shm/src/posix_shm/segment.rs index 5e77f9f2bd..70038831a4 100644 --- a/commons/zenoh-shm/src/posix_shm/segment.rs +++ b/commons/zenoh-shm/src/posix_shm/segment.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // -use std::fmt::Display; +use std::fmt::{Debug, Display}; use rand::Rng; use shared_memory::{Shmem, ShmemConf, 
ShmemError}; @@ -26,6 +26,18 @@ pub struct Segment { pub id: ID, } +impl Debug for Segment +where + ID: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Segment") + .field("shmem", &self.shmem.as_ptr()) + .field("id", &self.id) + .finish() + } +} + impl Segment where rand::distributions::Standard: rand::distributions::Distribution, diff --git a/commons/zenoh-shm/src/test_helpers.rs b/commons/zenoh-shm/src/test_helpers.rs new file mode 100644 index 0000000000..0b678bcabc --- /dev/null +++ b/commons/zenoh-shm/src/test_helpers.rs @@ -0,0 +1,54 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use zenoh_result::ZResult; + +pub const TEST_SEGMENT_PREFIX: &str = "test"; + +pub fn validate_memory(mem1: &mut [u8], mem2: &[u8]) { + assert!(mem1.len() == mem2.len()); + for cycle in 0..255u8 { + // sequentially fill segment1 with values checking segment2 having these changes + for i in 0..mem1.len() { + mem1[i] = cycle; + assert!(mem2[i] == cycle); + } + + // check the whole segment2 having proper values + for i in mem2 { + assert!(*i == cycle); + } + } +} + +pub fn execute_concurrent(concurrent_tasks: usize, iterations: usize, task_fun: TaskFun) +where + TaskFun: Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static, +{ + let mut tasks = vec![]; + for task_index in 0..concurrent_tasks { + let c_task_fun = task_fun.clone(); + let task_handle = std::thread::spawn(move || { + for iteration in 0..iterations { + if let Err(e) = c_task_fun(task_index, iteration) { + panic!("task {task_index}: iteration {iteration}: {e}") 
+ } + } + }); + tasks.push(task_handle); + } + for task in tasks { + task.join().expect("Error joining thread!"); + } +} diff --git a/commons/zenoh-shm/src/watchdog/allocated_watchdog.rs b/commons/zenoh-shm/src/watchdog/allocated_watchdog.rs new file mode 100644 index 0000000000..45917d5bdc --- /dev/null +++ b/commons/zenoh-shm/src/watchdog/allocated_watchdog.rs @@ -0,0 +1,35 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use super::{descriptor::OwnedDescriptor, storage::GLOBAL_STORAGE, validator::GLOBAL_VALIDATOR}; + +#[derive(Debug)] +pub struct AllocatedWatchdog { + pub descriptor: OwnedDescriptor, +} + +impl AllocatedWatchdog { + pub(crate) fn new(descriptor: OwnedDescriptor) -> Self { + // reset descriptor on allocation + descriptor.validate(); + Self { descriptor } + } +} + +impl Drop for AllocatedWatchdog { + fn drop(&mut self) { + GLOBAL_VALIDATOR.remove(self.descriptor.clone()); + GLOBAL_STORAGE.free_watchdog(self.descriptor.clone()); + } +} diff --git a/commons/zenoh-shm/src/watchdog/confirmator.rs b/commons/zenoh-shm/src/watchdog/confirmator.rs index 21c5173e31..284af7bb42 100644 --- a/commons/zenoh-shm/src/watchdog/confirmator.rs +++ b/commons/zenoh-shm/src/watchdog/confirmator.rs @@ -14,13 +14,13 @@ use std::{ collections::BTreeMap, - sync::{atomic::AtomicBool, Arc, Mutex}, - thread::{self}, + sync::{atomic::AtomicBool, Arc, RwLock}, time::Duration, }; use lazy_static::lazy_static; -use log::error; +use log::warn; +use thread_priority::*; use zenoh_result::{zerror, ZResult}; use super::{ @@ -35,42 +35,86 @@ lazy_static! 
{ pub struct ConfirmedDescriptor { pub owned: OwnedDescriptor, - confirmed: Arc>>, + confirmed: Arc, } impl Drop for ConfirmedDescriptor { fn drop(&mut self) { - match self.confirmed.lock() { - Ok(mut guard) => match guard.entry(self.owned.clone()) { - std::collections::btree_map::Entry::Occupied(mut e) => { - let val = e.get_mut(); - if *val == 1 { - e.remove(); - } else { - *val -= 1; - } - } - std::collections::btree_map::Entry::Vacant(_) => error!("Watchdog not found!"), - }, - Err(e) => error!("{e}"), - } + self.confirmed.remove(self.owned.clone()); } } impl ConfirmedDescriptor { fn new( owned: OwnedDescriptor, - confirmed: Arc>>, + confirmed: Arc, ) -> Self { + owned.confirm(); + confirmed.add(owned.clone()); Self { owned, confirmed } } } +#[derive(PartialEq)] +enum Transaction { + Add, + Remove, +} + +struct ConfirmedSegment { + segment: Arc, + transactions: lockfree::queue::Queue<(Transaction, OwnedDescriptor)>, +} + +impl ConfirmedSegment { + fn new(segment: Arc) -> Self { + Self { + segment, + transactions: lockfree::queue::Queue::default(), + } + } + + fn add(&self, descriptor: OwnedDescriptor) { + self.transactions.push( (Transaction::Add, descriptor)); + } + + fn remove(&self, descriptor: OwnedDescriptor) { + self.transactions.push( (Transaction::Remove, descriptor)); + } + + fn collect_transactions(&self, watchdogs: &mut BTreeMap) { + while let Some((transaction, descriptor)) = self.transactions.pop() { + // collect transactions + match watchdogs.entry(descriptor) { + std::collections::btree_map::Entry::Vacant(vacant) => { + #[cfg(feature = "test")] + assert!( transaction == Transaction::Add ); + vacant.insert(1); + }, + std::collections::btree_map::Entry::Occupied(mut occupied) => match transaction { + Transaction::Add => { + *occupied.get_mut() += 1; + }, + Transaction::Remove => { + if *occupied.get() == 1 { + occupied.remove(); + } else { + *occupied.get_mut() -= 1; + } + }, + }, + } + } + } +} +unsafe impl Send for ConfirmedSegment {} +unsafe 
impl Sync for ConfirmedSegment {} + // todo: optimize confirmation by packing descriptors AND linked table together // todo: think about linked table cleanup pub struct WatchdogConfirmator { - linked_table: Mutex>>, - confirmed: Arc>>, + confirmed: RwLock>>, + segment_transactions: Arc>>, running: Arc, } @@ -83,67 +127,104 @@ impl Drop for WatchdogConfirmator { impl WatchdogConfirmator { fn new(interval: Duration) -> Self { - let confirmed = Arc::new(Mutex::new(BTreeMap::::default())); + let segment_transactions = Arc::>>::default(); let running = Arc::new(AtomicBool::new(true)); - let c_confirmed = confirmed.clone(); + let c_segment_transactions = segment_transactions.clone(); let c_running = running.clone(); - let _ = thread::spawn(move || { - while c_running.load(std::sync::atomic::Ordering::Relaxed) { - let guard = c_confirmed.lock().unwrap(); - for (descriptor, _count) in guard.iter() { - descriptor.confirm(); - } - drop(guard); - std::thread::sleep(interval); - } - }); + + let _ = ThreadBuilder::default() + .name("Watchdog Confirmator thread") + //.policy(ThreadSchedulePolicy::Realtime(RealtimeThreadSchedulePolicy::Deadline)) + //.priority(ThreadPriority::Deadline { runtime: Duration::from_micros(100), deadline: interval, period: interval, flags: DeadlineFlags::default() }) + //.policy(ThreadSchedulePolicy::Realtime(RealtimeThreadSchedulePolicy::Fifo)) + .priority(ThreadPriority::Crossplatform(ThreadPriorityValue::try_from(48).unwrap())) + .spawn(move |result| { + if let Err(e) = result { + let ret = std::io::Error::last_os_error().to_string(); + panic!("Watchdog Confirmator: error setting thread priority: {:?}, will continue operating with default priority...", e); + panic!(""); + } + + let mut segments: Vec<(Arc, BTreeMap)> = vec![]; + while c_running.load(std::sync::atomic::Ordering::Relaxed) { + let cycle_start = std::time::Instant::now(); + + // add new segments + while let Some(new_segment) = c_segment_transactions.as_ref().pop() { + 
segments.push((new_segment, BTreeMap::default())); + } + + // collect all existing transactions + for (segment, watchdogs) in &mut segments { + segment.collect_transactions(watchdogs); + } + + // confirm all tracked watchdogs + for (_, watchdogs) in &segments { + for watchdog in watchdogs { + watchdog.0.confirm(); + } + } + + // sleep for next iteration + let elapsed = cycle_start.elapsed(); + if elapsed < interval { + let sleep_interval = interval - elapsed; + std::thread::sleep(sleep_interval); + } else { + warn!("Watchdog confirmation timer overrun!"); + #[cfg(feature = "test")] + panic!("Watchdog confirmation timer overrun!"); + } + } + }); Self { - linked_table: Mutex::default(), - confirmed, + confirmed: RwLock::default(), + segment_transactions, running, } } - pub fn add_owned(&self, watchdog: OwnedDescriptor) -> ZResult { - watchdog.confirm(); - let mut guard = self.confirmed.lock().map_err(|e| zerror!("{e}"))?; - match guard.entry(watchdog.clone()) { - std::collections::btree_map::Entry::Vacant(vacant) => { - vacant.insert(1); - } - std::collections::btree_map::Entry::Occupied(mut occupied) => { - *occupied.get_mut() += 1; - } - } - drop(guard); - Ok(ConfirmedDescriptor::new(watchdog, self.confirmed.clone())) + pub fn add_owned(&self, descriptor: &OwnedDescriptor) -> ZResult { + self.add(&Descriptor::from(descriptor)) } pub fn add(&self, descriptor: &Descriptor) -> ZResult { - let watchdog = self.link(descriptor)?; - self.add_owned(watchdog) - } + let guard = self.confirmed.read().map_err(|e| zerror!("{e}"))?; + if let Some(segment) = guard.get(&descriptor.id) { + return self.link(descriptor, segment); + } + drop(guard); - fn link(&self, descriptor: &Descriptor) -> ZResult { - let mut guard = self.linked_table.lock().map_err(|e| zerror!("{e}"))?; + let segment = Arc::new(Segment::open(descriptor.id)?); + let confirmed_segment = Arc::new(ConfirmedSegment::new(segment)); + let confirmed_descriptoir = self.link(descriptor, &confirmed_segment); - let segment = 
match guard.entry(descriptor.id) { + let mut guard = self.confirmed.write().map_err(|e| zerror!("{e}"))?; + match guard.entry(descriptor.id) { std::collections::btree_map::Entry::Vacant(vacant) => { - let segment = Arc::new(Segment::open(descriptor.id)?); - vacant.insert(segment.clone()); - segment + vacant.insert(confirmed_segment.clone()); + self.segment_transactions.push(confirmed_segment); + confirmed_descriptoir } - std::collections::btree_map::Entry::Occupied(occupied) => occupied.get().clone(), - }; + std::collections::btree_map::Entry::Occupied(occupied) => { + self.link(descriptor, occupied.get()) + } + } + } + + fn link(&self, descriptor: &Descriptor, segment: &Arc) -> ZResult { let index = descriptor.index_and_bitpos >> 6; let bitpos = descriptor.index_and_bitpos & 0x3f; - let atomic = unsafe { segment.array.elem(index) }; + let atomic = unsafe { segment.segment.array.elem(index) }; let mask = 1u64 << bitpos; - Ok(OwnedDescriptor::new(segment, atomic, mask)) + let owned = OwnedDescriptor::new(segment.segment.clone(), atomic, mask); + let confirmed = ConfirmedDescriptor::new(owned, segment.clone()); + Ok(confirmed) } } diff --git a/commons/zenoh-shm/src/watchdog/descriptor.rs b/commons/zenoh-shm/src/watchdog/descriptor.rs index 757dd470ae..866ea31a33 100644 --- a/commons/zenoh-shm/src/watchdog/descriptor.rs +++ b/commons/zenoh-shm/src/watchdog/descriptor.rs @@ -12,7 +12,10 @@ // ZettaScale Zenoh Team, // -use std::sync::{atomic::AtomicU64, Arc}; +use std::{ + hash::Hash, + sync::{atomic::AtomicU64, Arc}, +}; use super::segment::Segment; @@ -45,18 +48,25 @@ impl From<&OwnedDescriptor> for Descriptor { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct OwnedDescriptor { segment: Arc, - atomic: *const AtomicU64, - mask: u64, + pub atomic: *const AtomicU64, + pub mask: u64, } unsafe impl Send for OwnedDescriptor {} unsafe impl Sync for OwnedDescriptor {} +impl Hash for OwnedDescriptor { + fn hash(&self, state: &mut H) { + self.atomic.hash(state); + 
self.mask.hash(state); + } +} + impl OwnedDescriptor { - pub fn new(segment: Arc, atomic: *const AtomicU64, mask: u64) -> Self { + pub(crate) fn new(segment: Arc, atomic: *const AtomicU64, mask: u64) -> Self { Self { segment, atomic, @@ -70,11 +80,16 @@ impl OwnedDescriptor { }; } - pub fn validate(&self) -> u64 { + pub(crate) fn validate(&self) -> u64 { unsafe { (*self.atomic).fetch_and(!self.mask, std::sync::atomic::Ordering::SeqCst) & self.mask } } + + #[cfg(feature = "test")] + pub fn test_validate(&self) -> u64 { + self.validate() + } } impl Ord for OwnedDescriptor { diff --git a/commons/zenoh-shm/src/watchdog/mod.rs b/commons/zenoh-shm/src/watchdog/mod.rs index 468a4ea653..22278d6632 100644 --- a/commons/zenoh-shm/src/watchdog/mod.rs +++ b/commons/zenoh-shm/src/watchdog/mod.rs @@ -12,8 +12,12 @@ // ZettaScale Zenoh Team, // -pub mod confirmator; pub mod descriptor; + +tested_crate_module!(storage); +tested_crate_module!(validator); +tested_crate_module!(confirmator); + +pub(crate) mod allocated_watchdog; + mod segment; -pub mod storage; -mod validator; diff --git a/commons/zenoh-shm/src/watchdog/segment.rs b/commons/zenoh-shm/src/watchdog/segment.rs index ddcfe9c250..d0c222d560 100644 --- a/commons/zenoh-shm/src/watchdog/segment.rs +++ b/commons/zenoh-shm/src/watchdog/segment.rs @@ -22,6 +22,7 @@ use super::descriptor::SegmentID; const WATCHDOG_SEGMENT_PREFIX: &str = "watchdog"; +#[derive(Debug)] pub struct Segment { pub array: ArrayInSHM, } diff --git a/commons/zenoh-shm/src/watchdog/storage.rs b/commons/zenoh-shm/src/watchdog/storage.rs index 701c7557e1..04d178272b 100644 --- a/commons/zenoh-shm/src/watchdog/storage.rs +++ b/commons/zenoh-shm/src/watchdog/storage.rs @@ -15,34 +15,25 @@ use lazy_static::lazy_static; use std::{ collections::BTreeSet, sync::{Arc, Mutex}, - time::Duration, }; use zenoh_result::{zerror, ZResult}; -use crate::header::descriptor::OwnedHeaderDescriptor; - -use super::{ - confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR}, - 
descriptor::OwnedDescriptor, - segment::Segment, - validator::WatchdogValidator, -}; +use super::{allocated_watchdog::AllocatedWatchdog, descriptor::OwnedDescriptor, segment::Segment}; lazy_static! { - pub static ref GLOBAL_STORAGE: Storage = Storage::new(512, Duration::from_millis(100)).unwrap(); + pub static ref GLOBAL_STORAGE: Storage = Storage::new(65536).unwrap(); } pub struct Storage { available: Arc>>, - validator: WatchdogValidator, } // todo: expand and shrink Storage when needed // OR // support multiple descrptor assignment (allow multiple buffers to be assigned to the same watchdog) impl Storage { - pub fn new(initial_watchdog_count: usize, watchdog_interval: Duration) -> ZResult { + pub fn new(initial_watchdog_count: usize) -> ZResult { let segment = Arc::new(Segment::create(initial_watchdog_count)?); let mut initially_available = BTreeSet::default(); @@ -53,34 +44,33 @@ impl Storage { for bit in 0..64 { let mask = 1u64 << bit; let descriptor = OwnedDescriptor::new(segment.clone(), atomic, mask); - initially_available.insert(descriptor); + let _new_insert = initially_available.insert(descriptor); + #[cfg(feature = "test")] + assert!(_new_insert); } } - let available = Arc::new(Mutex::new(initially_available)); - - let c_available = available.clone(); - let validator = WatchdogValidator::new(watchdog_interval, move |descriptor| { - if let Ok(mut guard) = c_available.lock() { - let _ = guard.insert(descriptor); - } - }); - Ok(Self { - available, - validator, + available: Arc::new(Mutex::new(initially_available)), }) } - pub fn allocate_watchdog(&self, header: OwnedHeaderDescriptor) -> ZResult { + pub fn allocate_watchdog(&self) -> ZResult { let mut guard = self.available.lock().map_err(|e| zerror!("{e}"))?; let popped = guard.pop_first(); drop(guard); - let watchdog = popped.ok_or_else(|| zerror!("no free watchdogs available"))?; - let confirmed = GLOBAL_CONFIRMATOR.add_owned(watchdog.clone())?; - self.validator.add(watchdog, header)?; + let allocated = 
+ AllocatedWatchdog::new(popped.ok_or_else(|| zerror!("no free watchdogs available"))?); + + Ok(allocated) + } - Ok(confirmed) + pub(crate) fn free_watchdog(&self, descriptor: OwnedDescriptor) { + if let Ok(mut guard) = self.available.lock() { + let _new_insert = guard.insert(descriptor); + #[cfg(feature = "test")] + assert!(_new_insert); + } } } diff --git a/commons/zenoh-shm/src/watchdog/validator.rs b/commons/zenoh-shm/src/watchdog/validator.rs index b2bd9682a0..243674ee1c 100644 --- a/commons/zenoh-shm/src/watchdog/validator.rs +++ b/commons/zenoh-shm/src/watchdog/validator.rs @@ -16,21 +16,63 @@ use std::{ collections::BTreeMap, sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, + Arc }, - thread::{self}, - time::Duration, + time::{Duration, SystemTime}, }; -use zenoh_result::{bail, zerror, ZResult}; +use lazy_static::lazy_static; -use crate::header::descriptor::OwnedHeaderDescriptor; +use log::{error, warn}; +use thread_priority::{ThreadBuilderExt, ThreadPriority}; use super::descriptor::OwnedDescriptor; +pub(super) type InvalidateCallback = Box; + +lazy_static! 
{ + pub static ref GLOBAL_VALIDATOR: WatchdogValidator = + WatchdogValidator::new(Duration::from_millis(100)); +} + +enum Transaction { + Add(InvalidateCallback), + Remove, +} + +#[derive(Default)] +struct ValidatedStorage { + transactions: lockfree::queue::Queue<(Transaction, OwnedDescriptor)>, +} + +impl ValidatedStorage { + fn add(&self, descriptor: OwnedDescriptor, on_invalidated: InvalidateCallback) { + self.transactions.push( (Transaction::Add(on_invalidated), descriptor)); + } + + fn remove(&self, descriptor: OwnedDescriptor) { + self.transactions.push( (Transaction::Remove, descriptor)); + } + + fn collect_transactions(&self, storage: &mut BTreeMap) { + while let Some((transaction, descriptor)) = self.transactions.pop() { + match transaction { + Transaction::Add(on_invalidated) => { + let _old = storage.insert(descriptor, on_invalidated); + #[cfg(feature = "test")] + assert!(_old.is_none()); + } + Transaction::Remove => { + let _ = storage.remove(&descriptor); + } + } + } + } +} + // todo: optimize validation by packing descriptors pub struct WatchdogValidator { - watched: Arc>>, + storage: Arc, running: Arc, } @@ -42,42 +84,61 @@ impl Drop for WatchdogValidator { } impl WatchdogValidator { - pub fn new(interval: Duration, on_dropped: F) -> Self - where - F: Fn(OwnedDescriptor) + Send + 'static, - { - let watched = Arc::new(Mutex::new( - BTreeMap::::default(), - )); + pub fn new(interval: Duration) -> Self { + let storage = Arc::new(ValidatedStorage::default()); let running = Arc::new(AtomicBool::new(true)); - let c_watched = watched.clone(); + let c_storage = storage.clone(); let c_running = running.clone(); - let _ = thread::spawn(move || { - while c_running.load(Ordering::Relaxed) { - let mut guard = c_watched.lock().unwrap(); - guard.retain(|watchdog, header| { - let old_val = watchdog.validate(); - if old_val == 0 { - header.header().watchdog_flag.store(false, Ordering::SeqCst); - on_dropped(watchdog.clone()); // todo: get rid of .clone() - return 
false; + let _ = std::thread::Builder::new() + .name("Watchdog Validator thread".to_owned()) + .spawn_with_priority(ThreadPriority::Min, move |result| { + if let Err(e) = result { + error!("Watchdog Validator: error setting thread priority: {:?}, will continue operating with default priority...", e); + panic!(""); + } + + let mut watchdogs = BTreeMap::default(); + while c_running.load(Ordering::Relaxed) { + let cycle_start = std::time::Instant::now(); + + c_storage.collect_transactions(&mut watchdogs); + + // sleep for next iteration + let elapsed = cycle_start.elapsed(); + if elapsed < interval { + let sleep_interval = interval - elapsed; + std::thread::sleep(sleep_interval); + } else { + warn!("Watchdog validation timer overrun!"); + #[cfg(feature = "test")] + panic!("Watchdog validation timer overrun!"); } - true - }); - drop(guard); - std::thread::sleep(interval); - } - }); - Self { watched, running } + watchdogs + .retain(|watchdog, on_invalidated| { + let old_val = watchdog.validate(); + if old_val == 0 { + on_invalidated(); + return false; + } + true + }); + } + }); + + Self { storage, running } } - pub fn add(&self, watchdog: OwnedDescriptor, header: OwnedHeaderDescriptor) -> ZResult<()> { - let mut guard = self.watched.lock().map_err(|e| zerror!("{e}"))?; - if guard.insert(watchdog, header).is_none() { - return Ok(()); - } - bail!("Watchdog already exists!") + pub fn add( + &self, + watchdog: OwnedDescriptor, + on_invalidated: InvalidateCallback, + ) { + self.storage.add(watchdog, on_invalidated); + } + + pub fn remove(&self, watchdog: OwnedDescriptor) { + self.storage.remove(watchdog); } } diff --git a/commons/zenoh-shm/tests/header.rs b/commons/zenoh-shm/tests/header.rs new file mode 100644 index 0000000000..2e9786e65d --- /dev/null +++ b/commons/zenoh-shm/tests/header.rs @@ -0,0 +1,126 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public 
License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::sync::atomic::Ordering::Relaxed; + +use rand::Rng; +use zenoh_result::ZResult; +use zenoh_shm::{header::{ + descriptor::HeaderDescriptor, storage::GLOBAL_HEADER_STORAGE, + subscription::GLOBAL_HEADER_SUBSCRIPTION, +}, test_helpers::execute_concurrent}; + +fn header_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { + |_task_index: usize, _iteration: usize| -> ZResult<()> { + let _allocated_header = GLOBAL_HEADER_STORAGE.allocate_header()?; + Ok(()) + } +} + +#[test] +fn header_alloc() { + execute_concurrent(1, 10000, header_alloc_fn()); +} + +#[test] +fn header_alloc_concurrent() { + execute_concurrent(100, 10000, header_alloc_fn()); +} + +fn header_link_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { + |_task_index: usize, _iteration: usize| { + let allocated_header = GLOBAL_HEADER_STORAGE.allocate_header()?; + let descr = HeaderDescriptor::from(&allocated_header.descriptor); + let _linked_header = GLOBAL_HEADER_SUBSCRIPTION.link(&descr)?; + Ok(()) + } +} + +#[test] +fn header_link() { + execute_concurrent(1, 10000, header_link_fn()); +} + +#[test] +fn header_link_concurrent() { + execute_concurrent(100, 10000, header_link_fn()); +} + +fn header_link_failure_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { + |_task_index: usize, _iteration: usize| { + let allocated_header = GLOBAL_HEADER_STORAGE.allocate_header()?; + let descr = HeaderDescriptor::from(&allocated_header.descriptor); + drop(allocated_header); + + // Some comments on this behaviour... 
+ // Even though the allocated_header is dropped, its SHM segment still exists in GLOBAL_HEADER_STORAGE, + // so there is no way to detect that header is "deallocated" and the code below succeeds. The invalidation + // functionality is implemented at a higher level by means of the generation mechanism and protects from both header + // and watchdog link-to-deallocated issues. This generation mechanism depends on the behaviour below, so + // everything is fair :) + let _linked_header = GLOBAL_HEADER_SUBSCRIPTION.link(&descr)?; + Ok(()) + } +} + +#[test] +fn header_link_failure() { + execute_concurrent(1, 10000, header_link_failure_fn()); +} + +#[test] +fn header_link_failure_concurrent() { + execute_concurrent(100, 10000, header_link_failure_fn()); +} + +fn header_check_memory_fn(parallel_tasks: usize, iterations: usize) { + let task_fun = |_task_index: usize, _iteration: usize| -> ZResult<()> { + let allocated_header = GLOBAL_HEADER_STORAGE.allocate_header()?; + let descr = HeaderDescriptor::from(&allocated_header.descriptor); + let linked_header = GLOBAL_HEADER_SUBSCRIPTION.link(&descr)?; + + let mut rng = rand::thread_rng(); + let allocated = allocated_header.descriptor.header(); + let linked = linked_header.header(); + for _ in 0..100 { + let gen = rng.gen(); + allocated.generation.store(gen, Relaxed); + assert_eq!(gen, linked.generation.load(Relaxed)); + + let rc = rng.gen(); + allocated.refcount.store(rc, Relaxed); + assert_eq!(rc, linked.refcount.load(Relaxed)); + + let watchdog_inv = rng.gen(); + allocated.watchdog_invalidated.store(watchdog_inv, Relaxed); + assert_eq!(watchdog_inv, linked.watchdog_invalidated.load(Relaxed)); + + assert_eq!(gen, linked.generation.load(Relaxed)); + assert_eq!(rc, linked.refcount.load(Relaxed)); + assert_eq!(watchdog_inv, linked.watchdog_invalidated.load(Relaxed)); + } + Ok(()) + }; + execute_concurrent(parallel_tasks, iterations, task_fun); +} + +#[test] +fn header_check_memory() { + header_check_memory_fn(1, 10000); +} + +#[test] 
+fn header_check_memory_concurrent() { + header_check_memory_fn(100, 1000); +} diff --git a/commons/zenoh-shm/tests/posix_array.rs b/commons/zenoh-shm/tests/posix_array.rs new file mode 100644 index 0000000000..3590936e85 --- /dev/null +++ b/commons/zenoh-shm/tests/posix_array.rs @@ -0,0 +1,157 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{fmt::Debug, mem::size_of}; + +use num_traits::{AsPrimitive, PrimInt, Unsigned}; +use zenoh_shm::{posix_shm::array::ArrayInSHM, test_helpers::TEST_SEGMENT_PREFIX}; + +type TestElemID = u32; + +#[derive(Debug)] +struct TestElem { + value: u32, +} + +impl TestElem { + fn fill(&mut self, counter: &mut u32) { + self.value = *counter; + *counter += 1; + } + + fn validate(&self, counter: &mut u32) { + assert_eq!(self.value, *counter); + *counter += 1; + } +} + +fn validate_array( + array1: &mut ArrayInSHM, + array2: &ArrayInSHM, + expected_elem_count: usize, +) where + ElemIndex: Unsigned + PrimInt + 'static + AsPrimitive, + isize: AsPrimitive, + usize: AsPrimitive, +{ + assert!(array1.elem_count() == expected_elem_count); + assert!(array2.elem_count() == expected_elem_count); + + let mut fill_ctr = 0; + let mut validate_ctr = 0; + + // first of all, fill and validate elements sequentially + for i in 0..array1.elem_count() { + unsafe { + let elem1 = &mut *array1.elem_mut(i.as_()); + let elem2 = &*array2.elem(i.as_()); + + elem1.fill(&mut fill_ctr); + elem2.validate(&mut validate_ctr); + } + } + + // then fill all the elements... 
+ for i in 0..array1.elem_count() { + unsafe { + let elem1 = &mut *array1.elem_mut(i.as_()); + elem1.fill(&mut fill_ctr); + } + } + + // ...and validate all the elements + for i in 0..array2.elem_count() { + unsafe { + let elem2 = &*array2.elem(i.as_()); + elem2.validate(&mut validate_ctr); + } + } +} + +fn test_array() +where + ElemIndex: Unsigned + PrimInt + 'static + AsPrimitive, + isize: AsPrimitive, + usize: AsPrimitive, +{ + // Estimate elem count to test + // NOTE: for index sizes <= 16 bit we use the whole index range to test, + // and for bigger indexes we use limited index range + let elem_count = { + match size_of::() > size_of::() { + true => 100, + false => ElemIndex::max_value().as_() + 1, + } + }; + + let mut new_arr: ArrayInSHM = + ArrayInSHM::create(elem_count, TEST_SEGMENT_PREFIX).expect("error creating new array!"); + + let opened_arr: ArrayInSHM<_, TestElem, ElemIndex> = + ArrayInSHM::open(new_arr.id(), TEST_SEGMENT_PREFIX).expect("error opening existing array!"); + + validate_array(&mut new_arr, &opened_arr, elem_count); +} + +/// MEMORY CHECKS /// + +#[test] +fn arr_u8_index_memory_test() { + test_array::(); +} + +#[test] +fn arr_u16_index_memory_test() { + test_array::(); +} + +#[test] +fn arr_u32_index_memory_test() { + test_array::(); +} + +/// ELEM COUNT CHECKS /// + +fn test_invalid_elem_index() +where + ElemIndex: Unsigned + PrimInt + 'static + AsPrimitive + Debug, + isize: AsPrimitive, + usize: AsPrimitive, +{ + let invalid_elem_count = ElemIndex::max_value().as_() + 2; + + let _ = ArrayInSHM::::create( + invalid_elem_count, + TEST_SEGMENT_PREFIX, + ) + .expect_err( + format!("must fail: element count {invalid_elem_count} is out of range for ElemIndex!") + .as_str(), + ); +} + +#[test] +fn arr_u8_index_invalid_elem_count() { + test_invalid_elem_index::(); +} + +#[test] +fn arr_u16_index_invalid_elem_count() { + test_invalid_elem_index::(); +} + +#[test] +fn arr_u32_index_invalid_elem_count() { + test_invalid_elem_index::(); +} diff 
--git a/commons/zenoh-shm/tests/posix_segment.rs b/commons/zenoh-shm/tests/posix_segment.rs new file mode 100644 index 0000000000..6f2daa188c --- /dev/null +++ b/commons/zenoh-shm/tests/posix_segment.rs @@ -0,0 +1,132 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{fmt::Display, slice}; + +use zenoh_shm::{ + posix_shm::segment::Segment, + test_helpers::{validate_memory, TEST_SEGMENT_PREFIX}, +}; + +fn validate_segment(segment1: &Segment, segment2: &Segment) { + assert!(segment1.shmem.len() == segment2.shmem.len()); + + let ptr1 = segment1.shmem.as_ptr(); + let ptr2 = segment2.shmem.as_ptr(); + + let slice1 = unsafe { slice::from_raw_parts_mut(ptr1, segment1.shmem.len()) }; + let slice2 = unsafe { slice::from_raw_parts(ptr2, segment2.shmem.len()) }; + + validate_memory(slice1, slice2); +} + +fn test_segment() +where + rand::distributions::Standard: rand::distributions::Distribution, + ID: Copy + Clone + Display, +{ + let new_segment: Segment = + Segment::create(900, TEST_SEGMENT_PREFIX).expect("error creating new segment"); + + let opened_segment_instance_1 = Segment::open(new_segment.id, TEST_SEGMENT_PREFIX) + .expect("error opening existing segment!"); + + validate_segment(&new_segment, &opened_segment_instance_1); + + let opened_segment_instance_2 = Segment::open(new_segment.id, TEST_SEGMENT_PREFIX) + .expect("error opening existing segment!"); + + validate_segment(&new_segment, &opened_segment_instance_1); + validate_segment(&new_segment, &opened_segment_instance_2); + + drop(opened_segment_instance_1); + validate_segment(&new_segment, 
&opened_segment_instance_2); +} + +/// UNSIGNED /// + +#[test] +fn segment_u8_id() { + test_segment::() +} + +#[test] +fn segment_u16_id() { + test_segment::() +} + +#[test] +fn segment_u32_id() { + test_segment::() +} + +#[test] +fn segment_u64_id() { + test_segment::() +} + +#[test] +fn segment_u128_id() { + test_segment::() +} + +/// SIGNED /// + +#[test] +fn segment_i8_id() { + test_segment::() +} + +#[test] +fn segment_i16_id() { + test_segment::() +} + +#[test] +fn segment_i32_id() { + test_segment::() +} + +#[test] +fn segment_i64_id() { + test_segment::() +} + +#[test] +fn segment_i128_id() { + test_segment::() +} + +/// Behaviour checks /// + +#[test] +fn segment_open() { + let new_segment: Segment = + Segment::create(900, TEST_SEGMENT_PREFIX).expect("error creating new segment"); + + let _opened_segment = Segment::open(new_segment.id, TEST_SEGMENT_PREFIX) + .expect("error opening existing segment!"); +} + +#[test] +fn segment_open_error() { + let id = { + let new_segment: Segment = + Segment::create(900, TEST_SEGMENT_PREFIX).expect("error creating new segment"); + new_segment.id + }; + + let _opened_segment = Segment::open(id, TEST_SEGMENT_PREFIX) + .expect_err("must fail: opened not existing segment!"); +} diff --git a/commons/zenoh-shm/tests/watchdog.rs b/commons/zenoh-shm/tests/watchdog.rs new file mode 100644 index 0000000000..85b2c4b198 --- /dev/null +++ b/commons/zenoh-shm/tests/watchdog.rs @@ -0,0 +1,236 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{ + sync::{atomic::AtomicBool, Arc}, + time::Duration, +}; + +use zenoh_result::{bail, ZResult}; +use zenoh_shm::{ + test_helpers::execute_concurrent, + watchdog::{ + confirmator::GLOBAL_CONFIRMATOR, storage::GLOBAL_STORAGE, validator::GLOBAL_VALIDATOR, + }, +}; + +const VALIDATION_PERIOD: Duration = Duration::from_millis(100); + +fn watchdog_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { + |_task_index: usize, _iteration: usize| -> ZResult<()> { + let _allocated = GLOBAL_STORAGE.allocate_watchdog()?; + Ok(()) + } +} + +#[test] +fn watchdog_alloc() { + execute_concurrent(1, 10000, watchdog_alloc_fn()); +} + +#[test] +fn watchdog_alloc_concurrent() { + execute_concurrent(1000, 10000, watchdog_alloc_fn()); +} + +fn watchdog_confirmed_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { + |_task_index: usize, _iteration: usize| -> ZResult<()> { + let allocated = GLOBAL_STORAGE.allocate_watchdog()?; + let confirmed = GLOBAL_CONFIRMATOR.add_owned(&allocated.descriptor)?; + + // check that the confirmed watchdog stays valid + for i in 0..10 { + std::thread::sleep(VALIDATION_PERIOD); + let valid = confirmed.owned.test_validate() != 0; + if !valid { + bail!("Invalid watchdog, iteration {i}"); + } + } + Ok(()) + } +} + +#[test] +fn watchdog_confirmed() { + execute_concurrent(1, 10, watchdog_confirmed_fn()); +} + +#[test] +fn watchdog_confirmed_concurrent() { + execute_concurrent(1000, 10, watchdog_confirmed_fn()); +} + +//#[test] +//fn watchdog_confirmed_dangling() { +// let allocated = GLOBAL_STORAGE +// .allocate_watchdog() +// .expect("error allocating watchdog!"); +// let confirmed = GLOBAL_CONFIRMATOR +// .add_owned(&allocated.descriptor) +// .expect("error adding watchdog to confirmator!"); +// drop(allocated); +// +// // confirm dangling (not allocated) watchdog +// for _ in 0..10 { +// 
std::thread::sleep(VALIDATION_PERIOD); +// confirmed.owned.confirm(); +// } +//} + +fn watchdog_validated_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { + |_task_index: usize, _iteration: usize| -> ZResult<()> { + let allocated = GLOBAL_STORAGE.allocate_watchdog()?; + let confirmed = GLOBAL_CONFIRMATOR.add_owned(&allocated.descriptor)?; + + let valid = Arc::new(AtomicBool::new(true)); + { + let c_valid = valid.clone(); + GLOBAL_VALIDATOR.add( + allocated.descriptor.clone(), + Box::new(move || { + c_valid.store(false, std::sync::atomic::Ordering::SeqCst); + }), + ); + } + + // check that the watchdog stays valid as it is confirmed + for i in 0..10 { + std::thread::sleep(VALIDATION_PERIOD); + if !valid.load(std::sync::atomic::Ordering::SeqCst) { + bail!("Invalid watchdog, iteration {i}"); + } + } + + // Worst-case timings: + // validation: |___________|___________|___________|___________| + // confirmation: __|_____|_____|_____|_____| + // drop(confirmed): ^ + // It means that the worst-case latency for the watchdog to become invalid is VALIDATION_PERIOD*2 + + // check that the watchdog becomes invalid once we stop its confirmation + drop(confirmed); + std::thread::sleep(VALIDATION_PERIOD * 3 + VALIDATION_PERIOD / 2); + assert!(!valid.load(std::sync::atomic::Ordering::SeqCst)); + + Ok(()) + } +} + +#[test] +fn watchdog_validated() { + execute_concurrent(1, 10, watchdog_validated_fn()); +} + +#[test] +fn watchdog_validated_concurrent() { + execute_concurrent(1000, 10, watchdog_validated_fn()); +} + +fn watchdog_validated_invalid_without_confirmator_fn( +) -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { + |_task_index: usize, _iteration: usize| -> ZResult<()> { + let allocated = GLOBAL_STORAGE + .allocate_watchdog() + .expect("error allocating watchdog!"); + + let valid = Arc::new(AtomicBool::new(true)); + { + let c_valid = valid.clone(); + GLOBAL_VALIDATOR.add( + allocated.descriptor.clone(), + Box::new(move 
|| { + c_valid.store(false, std::sync::atomic::Ordering::SeqCst); + }), + ); + } + + assert!(allocated.descriptor.test_validate() == 0); + + // check that the watchdog becomes invalid because we do not confirm it + std::thread::sleep(VALIDATION_PERIOD * 2 + VALIDATION_PERIOD / 2); + assert!(!valid.load(std::sync::atomic::Ordering::SeqCst)); + Ok(()) + } +} + +#[test] +fn watchdog_validated_invalid_without_confirmator() { + execute_concurrent(1, 10, watchdog_validated_invalid_without_confirmator_fn()); +} + +#[test] +fn watchdog_validated_invalid_without_confirmator_concurrent() { + execute_concurrent( + 1000, + 10, + watchdog_validated_invalid_without_confirmator_fn(), + ); +} + +fn watchdog_validated_additional_confirmation_fn( +) -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static { + |_task_index: usize, _iteration: usize| -> ZResult<()> { + let allocated = GLOBAL_STORAGE + .allocate_watchdog() + .expect("error allocating watchdog!"); + let confirmed = GLOBAL_CONFIRMATOR + .add_owned(&allocated.descriptor) + .expect("error adding watchdog to confirmator!"); + + let allow_invalid = Arc::new(AtomicBool::new(false)); + { + let c_allow_invalid = allow_invalid.clone(); + GLOBAL_VALIDATOR.add( + allocated.descriptor.clone(), + Box::new(move || { + assert!(c_allow_invalid.load(std::sync::atomic::Ordering::SeqCst)); + c_allow_invalid.store(false, std::sync::atomic::Ordering::SeqCst); + }), + ); + } + + // make additional confirmations + for _ in 0..100 { + std::thread::sleep(VALIDATION_PERIOD / 10); + confirmed.owned.confirm(); + } + + // check that the watchdog stays valid as we stop additional confirmation + std::thread::sleep(VALIDATION_PERIOD * 10); + + // Worst-case timings: + // validation: |___________|___________|___________|___________| + // confirmation: __|_____|_____|_____|_____| + // drop(confirmed): ^ + // It means that the worst-case latency for the watchdog to become invalid is VALIDATION_PERIOD*2 + + // check that the watchdog 
becomes invalid once we stop its regular confirmation + drop(confirmed); + allow_invalid.store(true, std::sync::atomic::Ordering::SeqCst); + std::thread::sleep(VALIDATION_PERIOD * 2 + VALIDATION_PERIOD / 2); + // check that the invalidation event happened! + assert!(!allow_invalid.load(std::sync::atomic::Ordering::SeqCst)); + Ok(()) + } +} + +#[test] +fn watchdog_validated_additional_confirmation() { + execute_concurrent(1, 10, watchdog_validated_additional_confirmation_fn()); +} + +#[test] +fn watchdog_validated_additional_confirmation_concurrent() { + execute_concurrent(1000, 10, watchdog_validated_additional_confirmation_fn()); +} diff --git a/zenoh/tests/shm.rs b/zenoh/tests/shm.rs index 4fc3af4934..a7cbb3d777 100644 --- a/zenoh/tests/shm.rs +++ b/zenoh/tests/shm.rs @@ -122,7 +122,7 @@ mod tests { // Put data println!("[PS][03b] Putting on peer02 session. {MSG_COUNT} msgs of {size} bytes."); - for _ in 0..msg_count { + for c in 0..msg_count { // Create the message to send let sbuf = ztimeout!(async { loop { @@ -132,12 +132,14 @@ mod tests { } } }); + println!("{c} created"); ztimeout!(peer02 .put(&key_expr, sbuf) .congestion_control(CongestionControl::Block) .res_async()) .unwrap(); + println!("{c} putted"); } // wat for all messages received