Skip to content

Commit

Permalink
WIP on SHM
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Jan 11, 2024
1 parent 26e0ac5 commit 0c1e385
Show file tree
Hide file tree
Showing 22 changed files with 1,148 additions and 167 deletions.
9 changes: 9 additions & 0 deletions commons/zenoh-shm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ categories = { workspace = true }
description = "Internal crate for zenoh."
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
test = []

[dependencies]
bincode = { workspace = true }
log = { workspace = true }
Expand All @@ -38,3 +41,9 @@ zenoh-result = { workspace = true }
rand = { workspace = true }
lazy_static = { workspace = true }
num-traits = { workspace = true }
thread-priority = "0.15.1"
lockfree = "0.5.1"

[dev-dependencies]
zenoh-shm = { workspace = true, features = ["test"] }
libc = { workspace = true }
4 changes: 2 additions & 2 deletions commons/zenoh-shm/src/header/chunk_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use std::sync::atomic::{AtomicBool, AtomicU32};
pub struct ChunkHeaderType {
/*
todo: We don't really need 32 bits here, but access to 16-bit felds with 1 byte alignment is less performant on most of the platforms.
We need to bench and select the reasonable integer sizes here when we have an implementation to bench
We need to bench and select reasonable integer sizes here once we have an implementation to bench
*/
pub refcount: AtomicU32,
pub watchdog_invalidated: AtomicBool,
pub generation: AtomicU32,
pub watchdog_flag: AtomicBool,
}
4 changes: 2 additions & 2 deletions commons/zenoh-shm/src/header/descriptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct HeaderDescriptor {
impl From<&OwnedHeaderDescriptor> for HeaderDescriptor {
fn from(item: &OwnedHeaderDescriptor) -> Self {
let id = item.segment.array.id();
let index = unsafe { item.segment.array.index(item.header) } as HeaderIndex;
let index = unsafe { item.segment.array.index(item.header) };

Self { id, index }
}
Expand All @@ -44,7 +44,7 @@ unsafe impl Send for OwnedHeaderDescriptor {}
unsafe impl Sync for OwnedHeaderDescriptor {}

impl OwnedHeaderDescriptor {
pub fn new(segment: Arc<HeaderSegment>, header: *const ChunkHeaderType) -> Self {
pub(crate) fn new(segment: Arc<HeaderSegment>, header: *const ChunkHeaderType) -> Self {
Self { segment, header }
}

Expand Down
13 changes: 8 additions & 5 deletions commons/zenoh-shm/src/header/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
// ZettaScale Zenoh Team, <[email protected]>
//

pub mod allocated_descriptor;
pub mod chunk_header;
pub mod descriptor;
pub mod segment;
pub mod storage;
pub mod subscription;

tested_crate_module!(storage);
tested_crate_module!(subscription);

pub(crate) mod allocated_descriptor;
pub(crate) mod chunk_header;

mod segment;
8 changes: 4 additions & 4 deletions commons/zenoh-shm/src/header/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ impl Storage {
.refcount
.store(1, std::sync::atomic::Ordering::SeqCst);
header
.watchdog_flag
.store(true, std::sync::atomic::Ordering::SeqCst);
.watchdog_invalidated
.store(false, std::sync::atomic::Ordering::SeqCst);

Ok(AllocatedHeaderDescriptor { descriptor })
}

pub(crate) fn reclaim_header(&self, header: OwnedHeaderDescriptor) {
// header deallocated - increment it's generation to invalidate any existing references, if any
pub fn reclaim_header(&self, header: OwnedHeaderDescriptor) {
// header deallocated - increment it's generation to invalidate any existing references
header
.header()
.generation
Expand Down
87 changes: 72 additions & 15 deletions commons/zenoh-shm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,51 @@ use std::{
},
};
use watchdog::{
allocated_watchdog::AllocatedWatchdog,
confirmator::{ConfirmedDescriptor, GLOBAL_CONFIRMATOR},
descriptor::Descriptor,
storage::GLOBAL_STORAGE,
validator::GLOBAL_VALIDATOR,
};
use zenoh_buffers::ZSliceBuffer;
use zenoh_result::{zerror, ShmError, ZResult};
use zenoh_result::{bail, zerror, ShmError, ZResult};

#[macro_export]
macro_rules! tested_module {
($module:ident) => {
#[cfg(feature = "test")]
pub mod $module;
#[cfg(not(feature = "test"))]
mod $module;
};
}

#[macro_export]
macro_rules! tested_crate_module {
($module:ident) => {
#[cfg(feature = "test")]
pub mod $module;
#[cfg(not(feature = "test"))]
pub(crate) mod $module;
};
}

#[macro_export]
macro_rules! test_helpers_module {
() => {
#[cfg(feature = "test")]
pub mod test_helpers;
};
}

pub mod header;
mod posix_shm;
mod segment;
pub mod watchdog;

tested_module!(posix_shm);
tested_module!(segment);

test_helpers_module!();

const MIN_FREE_CHUNK_SIZE: usize = 1_024;

fn align_addr_at(addr: usize, align: usize) -> usize {
Expand Down Expand Up @@ -265,7 +298,11 @@ impl SharedMemoryReader {
info: info.clone(),
watchdog: Arc::new(GLOBAL_CONFIRMATOR.add(&info.watchdog_descriptor)?),
};
Ok(shmb)
// Validate buffer's generation
match shmb.is_generation_valid() {
true => Ok(shmb),
false => bail!(""),
}
}
None => {
let e = zerror!("Unable to find shared memory segment: {}", info.segment_id);
Expand Down Expand Up @@ -302,6 +339,7 @@ impl fmt::Debug for SharedMemoryReader {
pub struct BusyChunk {
chunk: Chunk,
header: AllocatedHeaderDescriptor,
_watchdog: AllocatedWatchdog,
}

/// A shared memory segment manager.
Expand Down Expand Up @@ -352,17 +390,35 @@ impl SharedMemoryManager {
}

fn free_chunk_map_to_shmbuf(&self, chunk: &Chunk) -> ZResult<(SharedMemoryBuf, BusyChunk)> {
// allocate shared header
let allocated_header = GLOBAL_HEADER_STORAGE.allocate_header()?;
let header = allocated_header.descriptor.clone();

let watchdog = GLOBAL_STORAGE.allocate_watchdog(header.clone())?;
// allocate watchdog
let allocated_watchdog = GLOBAL_STORAGE.allocate_watchdog()?;
let descriptor = Descriptor::from(&allocated_watchdog.descriptor);

// add watchdog to confirmator
let confirmed_watchdog = GLOBAL_CONFIRMATOR.add(&descriptor)?;

// add watchdog to validator
let c_header = header.clone();
GLOBAL_VALIDATOR.add(
allocated_watchdog.descriptor.clone(),
Box::new(move || {
c_header
.header()
.watchdog_invalidated
.store(true, Ordering::SeqCst);
}),
);

let info = SharedMemoryBufInfo::new(
chunk.offset,
chunk.size,
self.own_segment.segment.id,
0,
Descriptor::from(&watchdog.owned),
descriptor,
HeaderDescriptor::from(&header),
header.header().generation.load(Ordering::SeqCst),
);
Expand All @@ -371,12 +427,13 @@ impl SharedMemoryManager {
buf: AtomicPtr::<u8>::new(chunk.base_addr),
len: chunk.size,
info,
watchdog: Arc::new(watchdog),
watchdog: Arc::new(confirmed_watchdog),
};

let busy_chunk = BusyChunk {
chunk: *chunk,
header: allocated_header,
_watchdog: allocated_watchdog,
};

Ok((shmb, busy_chunk))
Expand Down Expand Up @@ -412,7 +469,9 @@ impl SharedMemoryManager {
match self.free_chunk_map_to_shmbuf(&chunk) {
Ok(val) => Ok(val),
Err(_) => {
// no free watchdogs, try to free some by collecting the garbage
// no free watchdogs or headers, try to free some by collecting the garbage
println!("No free watchdogs or headers, trying to reclaim...");
log::trace!("No free watchdogs or headers, trying to reclaim...");
self.garbage_collect();
self.free_chunk_map_to_shmbuf(&chunk)
}
Expand Down Expand Up @@ -453,13 +512,11 @@ impl SharedMemoryManager {
}

fn is_free_chunk(chunk: &BusyChunk) -> bool {
let rc = chunk
.header
.descriptor
.header()
.refcount
.load(Ordering::SeqCst);
rc == 0
let header = chunk.header.descriptor.header();
if header.refcount.load(Ordering::SeqCst) != 0 {
return header.watchdog_invalidated.load(Ordering::SeqCst);
}
true
}

fn try_merge_adjacent_chunks(a: &Chunk, b: &Chunk) -> Option<Chunk> {
Expand Down
6 changes: 6 additions & 0 deletions commons/zenoh-shm/src/posix_shm/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use zenoh_result::{bail, ZResult};
use super::segment::Segment;

/// An SHM segment that is intended to be an array of elements of some certain type
#[derive(Debug)]
pub struct ArrayInSHM<ID, Elem, ElemIndex> {
inner: Segment<ID>,
_phantom: PhantomData<(Elem, ElemIndex)>,
Expand Down Expand Up @@ -66,6 +67,11 @@ where
pub unsafe fn elem(&self, index: ElemIndex) -> *const Elem {
(self.inner.shmem.as_ptr() as *const Elem).add(index.as_())
}

#[cfg(feature = "test")]
pub unsafe fn elem_mut(&mut self, index: ElemIndex) -> *mut Elem {
(self.inner.shmem.as_ptr() as *mut Elem).add(index.as_())
}

pub unsafe fn index(&self, elem: *const Elem) -> ElemIndex {
elem.offset_from(self.inner.shmem.as_ptr() as *const Elem)
Expand Down
4 changes: 2 additions & 2 deletions commons/zenoh-shm/src/posix_shm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
// ZettaScale Zenoh Team, <[email protected]>
//

pub mod array;
pub mod segment;
tested_crate_module!(array);
tested_crate_module!(segment);
14 changes: 13 additions & 1 deletion commons/zenoh-shm/src/posix_shm/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use std::fmt::Display;
use std::fmt::{Debug, Display};

use rand::Rng;
use shared_memory::{Shmem, ShmemConf, ShmemError};
Expand All @@ -26,6 +26,18 @@ pub struct Segment<ID> {
pub id: ID,
}

impl<ID> Debug for Segment<ID>
where
ID: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Segment")
.field("shmem", &self.shmem.as_ptr())
.field("id", &self.id)
.finish()
}
}

impl<ID> Segment<ID>
where
rand::distributions::Standard: rand::distributions::Distribution<ID>,
Expand Down
54 changes: 54 additions & 0 deletions commons/zenoh-shm/src/test_helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use zenoh_result::ZResult;

pub const TEST_SEGMENT_PREFIX: &str = "test";

pub fn validate_memory(mem1: &mut [u8], mem2: &[u8]) {
assert!(mem1.len() == mem2.len());
for cycle in 0..255u8 {
// sequentially fill segment1 with values checking segment2 having these changes
for i in 0..mem1.len() {
mem1[i] = cycle;
assert!(mem2[i] == cycle);
}

// check the whole segment2 having proper values
for i in mem2 {
assert!(*i == cycle);
}
}
}

pub fn execute_concurrent<TaskFun>(concurrent_tasks: usize, iterations: usize, task_fun: TaskFun)
where
TaskFun: Fn(usize, usize) -> ZResult<()> + Clone + Send + Sync + 'static,
{
let mut tasks = vec![];
for task_index in 0..concurrent_tasks {
let c_task_fun = task_fun.clone();
let task_handle = std::thread::spawn(move || {
for iteration in 0..iterations {
if let Err(e) = c_task_fun(task_index, iteration) {
panic!("task {task_index}: iteration {iteration}: {e}")
}
}
});
tasks.push(task_handle);
}
for task in tasks {
task.join().expect("Error joining thread!");
}
}
35 changes: 35 additions & 0 deletions commons/zenoh-shm/src/watchdog/allocated_watchdog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use super::{descriptor::OwnedDescriptor, storage::GLOBAL_STORAGE, validator::GLOBAL_VALIDATOR};

#[derive(Debug)]
pub struct AllocatedWatchdog {
pub descriptor: OwnedDescriptor,
}

impl AllocatedWatchdog {
pub(crate) fn new(descriptor: OwnedDescriptor) -> Self {
// reset descriptor on allocation
descriptor.validate();
Self { descriptor }
}
}

impl Drop for AllocatedWatchdog {
fn drop(&mut self) {
GLOBAL_VALIDATOR.remove(self.descriptor.clone());
GLOBAL_STORAGE.free_watchdog(self.descriptor.clone());
}
}
Loading

0 comments on commit 0c1e385

Please sign in to comment.