From 3d6a6e8c1d0334be8c1d3ef16a87095f6bebdb83 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Tue, 23 Apr 2024 14:06:00 +0300 Subject: [PATCH 01/17] [skip ci] SHM Payload API example and test --- examples/Cargo.toml | 7 +- examples/examples/z_payload_shm.rs | 101 +++++++++++++++++++++++++++++ zenoh/tests/payload.rs | 51 +++++---------- 3 files changed, 124 insertions(+), 35 deletions(-) create mode 100644 examples/examples/z_payload_shm.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index e117507ae9..b240d06723 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -156,4 +156,9 @@ path = "examples/z_pong.rs" [[example]] name = "z_alloc_shm" path = "examples/z_alloc_shm.rs" -required-features = ["unstable", "shared-memory"] \ No newline at end of file +required-features = ["unstable", "shared-memory"] + +[[example]] +name = "z_payload_shm" +path = "examples/z_payload_shm.rs" +required-features = ["unstable", "shared-memory"] diff --git a/examples/examples/z_payload_shm.rs b/examples/examples/z_payload_shm.rs new file mode 100644 index 0000000000..3b03b80502 --- /dev/null +++ b/examples/examples/z_payload_shm.rs @@ -0,0 +1,101 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use zenoh::shm::slice::zsliceshm::{zsliceshm, ZSliceShm}; +use zenoh::shm::slice::zsliceshmmut::{zsliceshmmut, ZSliceShmMut}; +use zenoh::{ + bytes::ZBytes, + shm::{ + protocol_implementations::posix::{ + posix_shared_memory_provider_backend::PosixSharedMemoryProviderBackend, + protocol_id::POSIX_PROTOCOL_ID, + }, + provider::shared_memory_provider::SharedMemoryProviderBuilder, + }, +}; + +fn main() { + // create an SHM backend... + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(4096) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() + .protocol_id::() + .backend(backend) + .res(); + + // Prepare a layout for allocations + let layout = provider.alloc_layout().size(1024).res().unwrap(); + + // allocate an SHM buffer (ZSliceShmMut) + let mut owned_shm_buf_mut = layout.alloc().res().unwrap(); + + // mutable and immutable API + let _data: &[u8] = &owned_shm_buf_mut; + let _data_mut: &mut [u8] = &mut owned_shm_buf_mut; + + // convert into immutable owned buffer (ZSliceShmMut -> ZSlceShm) + let owned_shm_buf: ZSliceShm = owned_shm_buf_mut.into(); + + // immutable API + let _data: &[u8] = &owned_shm_buf; + + // convert again into mutable owned buffer (ZSliceShm -> ZSlceShmMut) + let mut owned_shm_buf_mut: ZSliceShmMut = owned_shm_buf.try_into().unwrap(); + + // mutable and immutable API + let _data: &[u8] = &owned_shm_buf_mut; + let _data_mut: &mut [u8] = &mut owned_shm_buf_mut; + + // build a ZBytes from an SHM buffer (ZSliceShmMut -> ZBytes) + let mut payload: ZBytes = owned_shm_buf_mut.into(); + + // branch to illustrate immutable access to SHM data + { + // deserialize ZBytes as an immutably borrowed zsliceshm (ZBytes -> &zsliceshm) + let borrowed_shm_buf: &zsliceshm = payload.deserialize().unwrap(); + + // immutable API + let _data: &[u8] = borrowed_shm_buf; + + // construct owned buffer from borrowed type (&zsliceshm -> ZSliceShm) + let owned = borrowed_shm_buf.to_owned(); + + // immutable API + let _data: &[u8] = &owned; + + // try to construct mutable ZSliceShmMut (ZSliceShm -> ZSliceShmMut) + let owned_mut: Result = owned.try_into(); + // the attempt fails because ZSliceShm has two existing references ('owned' and inside 'payload') + assert!(owned_mut.is_err()) + } + + // branch to illustrate mutable access to SHM data + { + // deserialize ZBytes as mutably borrowed zsliceshm (ZBytes -> &mut zsliceshm) + let borrowed_shm_buf: &mut zsliceshm = payload.deserialize_mut().unwrap(); + + // immutable API + let _data: &[u8] = borrowed_shm_buf; + + // convert zsliceshm to zsliceshmmut (&mut zsliceshm -> &mut zsliceshmmut) + let borrowed_shm_buf_mut: &mut zsliceshmmut = borrowed_shm_buf.try_into().unwrap(); + + // mutable and immutable API + let _data: &[u8] = borrowed_shm_buf_mut; + let _data_mut: &mut [u8] = borrowed_shm_buf_mut; + } +} diff --git a/zenoh/tests/payload.rs b/zenoh/tests/payload.rs index d9910bedf5..1bcbf33ef4 100644 --- a/zenoh/tests/payload.rs +++ b/zenoh/tests/payload.rs @@ -43,55 +43,38 @@ fn shm_payload_single_buf() { // Prepare a layout for allocations let layout = provider.alloc_layout().size(1024).res().unwrap(); - // allocate an SHM buffer - let mut owned_shm_buf_mut = layout.alloc().res().unwrap(); + // allocate an SHM buffer (ZSliceShmMut) + let owned_shm_buf_mut = layout.alloc().res().unwrap(); - // get data - let _data: &[u8] = &owned_shm_buf_mut; - let _data_mut: &mut [u8] = &mut owned_shm_buf_mut; - - // convert into immutable owned buffer + // convert into immutable owned buffer (ZSliceShmMut -> ZSlceShm) let owned_shm_buf: ZSliceShm = owned_shm_buf_mut.into(); - // get data - let _data: &[u8] = &owned_shm_buf; - - // convert again into mutable owned buffer - let mut owned_shm_buf_mut: ZSliceShmMut = owned_shm_buf.try_into().unwrap(); - - // get data - let _data: &[u8] = &owned_shm_buf_mut; - let _data_mut: &mut [u8] = &mut owned_shm_buf_mut; + // convert again into mutable owned buffer (ZSliceShm -> ZSlceShmMut) + let owned_shm_buf_mut: ZSliceShmMut = owned_shm_buf.try_into().unwrap(); - // build a ZBytes from an SHM buffer + // build a ZBytes from an SHM buffer (ZSliceShmMut -> ZBytes) let mut payload: ZBytes = owned_shm_buf_mut.into(); + // branch to illustrate immutable access to SHM data { - // deserialize ZBytes as borrowed zsliceshm + // deserialize ZBytes as an immutably borrowed zsliceshm (ZBytes -> &zsliceshm) let borrowed_shm_buf: &zsliceshm = payload.deserialize().unwrap(); - // get data - let _data: &[u8] = borrowed_shm_buf; - - // construct owned buffer from borrowed type + // construct owned buffer from borrowed type (&zsliceshm -> ZSliceShm) let owned = borrowed_shm_buf.to_owned(); - // get data - let _data: &[u8] = &owned; + // try to construct mutable ZSliceShmMut (ZSliceShm -> ZSliceShmMut) + let owned_mut: Result = owned.try_into(); + // the attempt fails because ZSliceShm has two existing references ('owned' and inside 'payload') + assert!(owned_mut.is_err()) } + // branch to illustrate mutable access to SHM data { - // deserialize ZBytes as mutably borrowed zsliceshm + // deserialize ZBytes as mutably borrowed zsliceshm (ZBytes -> &mut zsliceshm) let borrowed_shm_buf: &mut zsliceshm = payload.deserialize_mut().unwrap(); - // get data - let _data: &[u8] = borrowed_shm_buf; - - // convert zsliceshm to zsliceshmmut - let borrowed_shm_buf_mut: &mut zsliceshmmut = borrowed_shm_buf.try_into().unwrap(); - - // get data - let _data: &[u8] = borrowed_shm_buf_mut; - let _data_mut: &mut [u8] = borrowed_shm_buf_mut; + // convert zsliceshm to zsliceshmmut (&mut zsliceshm -> &mut zsliceshmmut) + let _borrowed_shm_buf_mut: &mut zsliceshmmut = borrowed_shm_buf.try_into().unwrap(); } } From 9284388c466b3ad5822e826bd612ccb847bd5eba Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 26 Apr 2024 12:00:32 +0200 Subject: [PATCH 02/17] Add payload_mut to sample for zsliceshmmut deserialization --- examples/examples/z_sub_shm.rs | 20 ++++++++++++++++++++ zenoh/src/sample/mod.rs | 6 ++++++ 2 files changed, 26 insertions(+) diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index aa3967becd..35fb80d833 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -49,6 +49,26 @@ async fn main() { } } } + + // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber + // // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it. + // // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content. + // + // use zenoh::shm::slice::zsliceshmmut::zsliceshmmut; + + // while let Ok(mut sample) = subscriber.recv_async().await { + // let kind = sample.kind(); + // let key_expr = sample.key_expr().to_string(); + // match sample.payload_mut().deserialize_mut::<&mut zsliceshmmut>() { + // Ok(payload) => println!( + // ">> [Subscriber] Received {} ('{}': '{:02x?}')", + // kind, key_expr, payload + // ), + // Err(e) => { + // println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); + // } + // } + // } } #[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] diff --git a/zenoh/src/sample/mod.rs b/zenoh/src/sample/mod.rs index b5dbd727ec..0c1180fb8f 100644 --- a/zenoh/src/sample/mod.rs +++ b/zenoh/src/sample/mod.rs @@ -308,6 +308,12 @@ impl Sample { &self.payload } + /// Gets the payload of this Sample. + #[inline] + pub fn payload_mut(&mut self) -> &mut ZBytes { + &mut self.payload + } + /// Gets the kind of this Sample. #[inline] pub fn kind(&self) -> SampleKind { From 0983d58ef38128566f9b18b26046b09c73d1ab3c Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 26 Apr 2024 16:26:11 +0200 Subject: [PATCH 03/17] Improve SHM examples --- examples/examples/z_pub_shm.rs | 5 ++-- examples/examples/z_sub_shm.rs | 50 +++++++++++++++++----------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/examples/examples/z_pub_shm.rs b/examples/examples/z_pub_shm.rs index 8287509f1b..0dce88b8e7 100644 --- a/examples/examples/z_pub_shm.rs +++ b/examples/examples/z_pub_shm.rs @@ -25,7 +25,6 @@ use zenoh::shm::provider::types::MemoryLayout; use zenoh_examples::CommonArgs; const N: usize = 10; -const K: u32 = 3; #[tokio::main] async fn main() -> Result<(), zenoh::Error> { @@ -81,7 +80,9 @@ async fn main() -> Result<(), zenoh::Error> { .unwrap(); println!("Press CTRL-C to quit..."); - for idx in 0..(K * N as u32) { + for idx in 0..u32::MAX { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let mut sbuf = layout .alloc() .with_policy::>() diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index 35fb80d833..319d8ecf90 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -36,39 +36,39 @@ async fn main() { let subscriber = session.declare_subscriber(&key_expr).res().await.unwrap(); println!("Press CTRL-C to quit..."); - while let Ok(sample) = subscriber.recv_async().await { - match sample.payload().deserialize::<&zsliceshm>() { - Ok(payload) => println!( - ">> [Subscriber] Received {} ('{}': '{:02x?}')", - sample.kind(), - sample.key_expr().as_str(), - payload - ), - Err(e) => { - println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); - } - } - } - - // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber - // // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it. - // // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content. - // - // use zenoh::shm::slice::zsliceshmmut::zsliceshmmut; - - // while let Ok(mut sample) = subscriber.recv_async().await { - // let kind = sample.kind(); - // let key_expr = sample.key_expr().to_string(); - // match sample.payload_mut().deserialize_mut::<&mut zsliceshmmut>() { + // while let Ok(sample) = subscriber.recv_async().await { + // match sample.payload().deserialize::<&zsliceshm>() { // Ok(payload) => println!( // ">> [Subscriber] Received {} ('{}': '{:02x?}')", - // kind, key_expr, payload + // sample.kind(), + // sample.key_expr().as_str(), + // payload // ), // Err(e) => { // println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); // } // } // } + + // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber + // // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it. + // // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content. + // + use zenoh::shm::slice::zsliceshmmut::zsliceshmmut; + + while let Ok(mut sample) = subscriber.recv_async().await { + let kind = sample.kind(); + let key_expr = sample.key_expr().to_string(); + match sample.payload_mut().deserialize_mut::<&mut zsliceshmmut>() { + Ok(payload) => println!( + ">> [Subscriber] Received {} ('{}': '{:02x?}')", + kind, key_expr, payload + ), + Err(e) => { + println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); + } + } + } } #[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] From f08ea12c2760d42c122a228f565c9463b7df0ccb Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 26 Apr 2024 16:36:32 +0200 Subject: [PATCH 04/17] Fix merge --- examples/examples/z_sub_shm.rs | 51 +++++++++++++++++----------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index fdb3204ac9..45180f598b 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -14,6 +14,7 @@ use clap::Parser; use zenoh::config::Config; use zenoh::prelude::r#async::*; +use zenoh::shm::zsliceshm; use zenoh_examples::CommonArgs; #[tokio::main] @@ -35,39 +36,39 @@ async fn main() { let subscriber = session.declare_subscriber(&key_expr).res().await.unwrap(); println!("Press CTRL-C to quit..."); - // while let Ok(sample) = subscriber.recv_async().await { - // match sample.payload().deserialize::<&zsliceshm>() { - // Ok(payload) => println!( - // ">> [Subscriber] Received {} ('{}': '{:02x?}')", - // sample.kind(), - // sample.key_expr().as_str(), - // payload - // ), - // Err(e) => { - // println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); - // } - // } - // } - - // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber - // // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it. - // // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content. - // - use zenoh::shm::slice::zsliceshmmut::zsliceshmmut; - - while let Ok(mut sample) = subscriber.recv_async().await { - let kind = sample.kind(); - let key_expr = sample.key_expr().to_string(); - match sample.payload_mut().deserialize_mut::<&mut zsliceshmmut>() { + while let Ok(sample) = subscriber.recv_async().await { + match sample.payload().deserialize::<&zsliceshm>() { Ok(payload) => println!( ">> [Subscriber] Received {} ('{}': '{:02x?}')", - kind, key_expr, payload + sample.kind(), + sample.key_expr().as_str(), + payload ), Err(e) => { println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); } } } + + // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber + // // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it. + // // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content. + // + // use zenoh::shm::zsliceshmmut; + + // while let Ok(mut sample) = subscriber.recv_async().await { + // let kind = sample.kind(); + // let key_expr = sample.key_expr().to_string(); + // match sample.payload_mut().deserialize_mut::<&mut zsliceshmmut>() { + // Ok(payload) => println!( + // ">> [Subscriber] Received {} ('{}': '{:02x?}')", + // kind, key_expr, payload + // ), + // Err(e) => { + // println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); + // } + // } + // } } #[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] From 12376c0d968b60bc49da5643ca3ac6c470edb1ad Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 26 Apr 2024 18:03:09 +0200 Subject: [PATCH 05/17] Query/Reply shared memory examples --- examples/Cargo.toml | 10 ++ examples/examples/z_get.rs | 2 +- examples/examples/z_get_shm.rs | 158 +++++++++++++++++++++++++++ examples/examples/z_queryable.rs | 7 +- examples/examples/z_queryable_shm.rs | 134 +++++++++++++++++++++++ examples/examples/z_sub_shm.rs | 17 ++- zenoh/src/api/query.rs | 5 + zenoh/src/api/queryable.rs | 37 +++++-- zenoh/src/api/session.rs | 12 +- zenoh/src/net/runtime/adminspace.rs | 6 +- 10 files changed, 357 insertions(+), 31 deletions(-) create mode 100644 examples/examples/z_get_shm.rs create mode 100644 examples/examples/z_queryable_shm.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index e117507ae9..ce268572a6 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -100,6 +100,11 @@ path = "examples/z_pull.rs" name = "z_queryable" path = "examples/z_queryable.rs" +[[example]] +name = "z_queryable_shm" +path = "examples/z_queryable_shm.rs" +required-features = ["unstable", "shared-memory"] + [[example]] name = "z_storage" path = "examples/z_storage.rs" @@ -108,6 +113,11 @@ path = "examples/z_storage.rs" name = "z_get" path = "examples/z_get.rs" +[[example]] +name = "z_get_shm" +path = "examples/z_get_shm.rs" +required-features = ["unstable", "shared-memory"] + [[example]] name = "z_forward" path = "examples/z_forward.rs" diff --git a/examples/examples/z_get.rs b/examples/examples/z_get.rs index 56693d9fa1..76add34286 100644 --- a/examples/examples/z_get.rs +++ b/examples/examples/z_get.rs @@ -32,7 +32,7 @@ async fn main() { // // By default get receives replies from a FIFO. // // Uncomment this line to use a ring channel instead. // // More information on the ring channel are available in the z_pull example. - .with(zenoh::handlers::RingChannel::default()) + // .with(zenoh::handlers::RingChannel::default()) .value(value) .target(target) .timeout(timeout) diff --git a/examples/examples/z_get_shm.rs b/examples/examples/z_get_shm.rs new file mode 100644 index 0000000000..c5f766f0f2 --- /dev/null +++ b/examples/examples/z_get_shm.rs @@ -0,0 +1,158 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::Parser; +use std::time::Duration; +use zenoh::prelude::r#async::*; +use zenoh_examples::CommonArgs; + +const N: usize = 10; + +#[tokio::main] +async fn main() { + // initiate logging + zenoh_util::try_init_log_from_env(); + + let (mut config, selector, mut value, target, timeout) = parse_args(); + + // A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate + // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + config.transport.shared_memory.set_enabled(true).unwrap(); + + println!("Opening session..."); + let session = zenoh::open(config).res().await.unwrap(); + + println!("Creating POSIX SHM backend..."); + // Construct an SHM backend + let backend = { + // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. + // The initialisation of SHM backend is completely backend-specific and user is free to do + // anything reasonable here. This code is execuated at the provider's first use + + // Alignment for POSIX SHM provider + // All allocations will be aligned corresponding to this alignment - + // that means that the provider will be able to satisfy allocation layouts + // with alignment <= provider_alignment + let provider_alignment = AllocAlignment::default(); + + // Create layout for POSIX Provider's memory + let provider_layout = MemoryLayout::new(N * 1024, provider_alignment).unwrap(); + + PosixSharedMemoryProviderBackend::builder() + .with_layout(provider_layout) + .res() + .unwrap() + }; + + println!("Creating SHM Provider with POSIX backend..."); + // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID + let shared_memory_provider = SharedMemoryProviderBuilder::builder() + .protocol_id::() + .backend(backend) + .res(); + + println!("Allocating Shared Memory Buffer..."); + let layout = shared_memory_provider + .alloc_layout() + .size(1024) + .res() + .unwrap(); + + let mut sbuf = layout + .alloc() + .with_policy::>() + .res_async() + .await + .unwrap(); + + let content = value + .take() + .unwrap_or_else(|| "Get from SharedMemory Rust!".to_string()); + sbuf[0..content.len()].copy_from_slice(content.as_bytes()); + + println!("Sending Query '{selector}'..."); + let replies = session + .get(&selector) + .value(sbuf) + .target(target) + .timeout(timeout) + .res() + .await + .unwrap(); + + while let Ok(reply) = replies.recv_async().await { + match reply.result() { + Ok(sample) => { + print!(">> Received ('{}': ", sample.key_expr().as_str()); + match sample.payload().deserialize::<&zsliceshm>() { + Ok(payload) => println!("'{}')", String::from_utf8_lossy(payload),), + Err(e) => println!("'Not a SharedMemoryBuf: {:?}')", e), + } + } + Err(err) => { + let payload = err + .payload() + .deserialize::() + .unwrap_or_else(|e| format!("{}", e)); + println!(">> Received (ERROR: '{}')", payload); + } + } + } +} + +#[derive(clap::ValueEnum, Clone, Copy, Debug)] +#[value(rename_all = "SCREAMING_SNAKE_CASE")] +enum Qt { + BestMatching, + All, + AllComplete, +} + +#[derive(Parser, Clone, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/**")] + /// The selection of resources to query + selector: Selector<'static>, + /// The value to publish. + value: Option, + #[arg(short, long, default_value = "BEST_MATCHING")] + /// The target queryables of the query. + target: Qt, + #[arg(short = 'o', long, default_value = "10000")] + /// The query timeout in milliseconds. + timeout: u64, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> ( + Config, + Selector<'static>, + Option, + QueryTarget, + Duration, +) { + let args = Args::parse(); + ( + args.common.into(), + args.selector, + args.value, + match args.target { + Qt::BestMatching => QueryTarget::BestMatching, + Qt::All => QueryTarget::All, + Qt::AllComplete => QueryTarget::AllComplete, + }, + Duration::from_millis(args.timeout), + ) +} diff --git a/examples/examples/z_queryable.rs b/examples/examples/z_queryable.rs index 47f70c30c3..8407f9f66f 100644 --- a/examples/examples/z_queryable.rs +++ b/examples/examples/z_queryable.rs @@ -20,7 +20,12 @@ async fn main() { // initiate logging zenoh_util::try_init_log_from_env(); - let (config, key_expr, value, complete) = parse_args(); + let (mut config, key_expr, value, complete) = parse_args(); + + // A probing procedure for shared memory is performed upon session opening. To enable `z_get_shm` to operate + // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + config.transport.shared_memory.set_enabled(true).unwrap(); println!("Opening session..."); let session = zenoh::open(config).res().await.unwrap(); diff --git a/examples/examples/z_queryable_shm.rs b/examples/examples/z_queryable_shm.rs new file mode 100644 index 0000000000..f689e15b51 --- /dev/null +++ b/examples/examples/z_queryable_shm.rs @@ -0,0 +1,134 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::Parser; +use zenoh::prelude::r#async::*; +use zenoh_examples::CommonArgs; + +const N: usize = 10; + +#[tokio::main] +async fn main() { + // initiate logging + zenoh_util::try_init_log_from_env(); + + let (mut config, key_expr, value, complete) = parse_args(); + + // A probing procedure for shared memory is performed upon session opening. To enable `z_get_shm` to operate + // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + config.transport.shared_memory.set_enabled(true).unwrap(); + + println!("Opening session..."); + let session = zenoh::open(config).res().await.unwrap(); + + println!("Creating POSIX SHM backend..."); + // Construct an SHM backend + let backend = { + // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. + // The initialisation of SHM backend is completely backend-specific and user is free to do + // anything reasonable here. This code is execuated at the provider's first use + + // Alignment for POSIX SHM provider + // All allocations will be aligned corresponding to this alignment - + // that means that the provider will be able to satisfy allocation layouts + // with alignment <= provider_alignment + let provider_alignment = AllocAlignment::default(); + + // Create layout for POSIX Provider's memory + let provider_layout = MemoryLayout::new(N * 1024, provider_alignment).unwrap(); + + PosixSharedMemoryProviderBackend::builder() + .with_layout(provider_layout) + .res() + .unwrap() + }; + + println!("Creating SHM Provider with POSIX backend..."); + // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID + let shared_memory_provider = SharedMemoryProviderBuilder::builder() + .protocol_id::() + .backend(backend) + .res(); + + println!("Declaring Queryable on '{key_expr}'..."); + let queryable = session + .declare_queryable(&key_expr) + .complete(complete) + .res() + .await + .unwrap(); + + println!("Press CTRL-C to quit..."); + while let Ok(query) = queryable.recv_async().await { + print!( + ">> [Queryable] Received Query '{}' ('{}'", + query.selector(), + query.key_expr().as_str(), + ); + if let Some(payload) = query.payload() { + match payload.deserialize::<&zsliceshm>() { + Ok(payload) => print!(": '{}'", String::from_utf8_lossy(payload)), + Err(e) => print!(": 'Not a SharedMemoryBuf: {:?}'", e), + } + } + println!(")"); + + println!("Allocating Shared Memory Buffer..."); + let layout = shared_memory_provider + .alloc_layout() + .size(1024) + .res() + .unwrap(); + + let mut sbuf = layout + .alloc() + .with_policy::>() + .res_async() + .await + .unwrap(); + + sbuf[0..value.len()].copy_from_slice(value.as_bytes()); + + println!( + ">> [Queryable] Responding ('{}': '{}')", + key_expr.as_str(), + value, + ); + query + .reply(key_expr.clone(), sbuf) + .res() + .await + .unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}")); + } +} + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/zenoh-rs-queryable")] + /// The key expression matching queries to reply to. + key: KeyExpr<'static>, + #[arg(short, long, default_value = "Queryable from SharedMemory Rust!")] + /// The value to reply to queries. + value: String, + #[arg(long)] + /// Declare the queryable as complete w.r.t. the key expression. + complete: bool, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, KeyExpr<'static>, String, bool) { + let args = Args::parse(); + (args.common.into(), args.key, args.value, args.complete) +} diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index 45180f598b..2e0f5bf910 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -37,17 +37,16 @@ async fn main() { println!("Press CTRL-C to quit..."); while let Ok(sample) = subscriber.recv_async().await { + print!( + ">> [Subscriber] Received {} ('{}': ", + sample.kind(), + sample.key_expr().as_str(), + ); match sample.payload().deserialize::<&zsliceshm>() { - Ok(payload) => println!( - ">> [Subscriber] Received {} ('{}': '{:02x?}')", - sample.kind(), - sample.key_expr().as_str(), - payload - ), - Err(e) => { - println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); - } + Ok(payload) => print!("'{}'", String::from_utf8_lossy(payload)), + Err(e) => print!("'Not a SharedMemoryBuf: {:?}'", e), } + println!(")"); } // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index 1cb4078ee6..d95a1bd417 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -88,6 +88,11 @@ impl Reply { self.result.as_ref() } + /// Gets the a mutable borrowed result of this `Reply`. Use [`Reply::into_result`] to take ownership of the result. + pub fn result_mut(&mut self) -> Result<&mut Sample, &mut Value> { + self.result.as_mut() + } + /// Converts this `Reply` into the its result. Use [`Reply::result`] it you don't want to take ownership. pub fn into_result(self) -> Result { self.result diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index dc13468181..53fea80b10 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -50,18 +50,11 @@ use { }; pub(crate) struct QueryInner { - /// The key expression of this Query. pub(crate) key_expr: KeyExpr<'static>, - /// This Query's selector parameters. pub(crate) parameters: Parameters<'static>, - /// This Query's body. - pub(crate) value: Option, - pub(crate) qid: RequestId, pub(crate) zid: ZenohId, pub(crate) primitives: Arc, - #[cfg(feature = "unstable")] - pub(crate) attachment: Option, } impl Drop for QueryInner { @@ -79,6 +72,9 @@ impl Drop for QueryInner { pub struct Query { pub(crate) inner: Arc, pub(crate) eid: EntityId, + pub(crate) value: Option, + #[cfg(feature = "unstable")] + pub(crate) attachment: Option, } impl Query { @@ -106,24 +102,43 @@ impl Query { /// This Query's value. #[inline(always)] pub fn value(&self) -> Option<&Value> { - self.inner.value.as_ref() + self.value.as_ref() + } + + /// This Query's value. + #[inline(always)] + pub fn value_mut(&mut self) -> Option<&mut Value> { + self.value.as_mut() } /// This Query's payload. #[inline(always)] pub fn payload(&self) -> Option<&ZBytes> { - self.inner.value.as_ref().map(|v| &v.payload) + self.value.as_ref().map(|v| &v.payload) + } + + /// This Query's payload. + #[inline(always)] + pub fn payload_mut(&mut self) -> Option<&mut ZBytes> { + self.value.as_mut().map(|v| &mut v.payload) } /// This Query's encoding. #[inline(always)] pub fn encoding(&self) -> Option<&Encoding> { - self.inner.value.as_ref().map(|v| &v.encoding) + self.value.as_ref().map(|v| &v.encoding) } + /// This Query's attachment. #[zenoh_macros::unstable] pub fn attachment(&self) -> Option<&ZBytes> { - self.inner.attachment.as_ref() + self.attachment.as_ref() + } + + /// This Query's attachment. + #[zenoh_macros::unstable] + pub fn attachment_mut(&mut self) -> Option<&mut ZBytes> { + self.attachment.as_mut() } /// Sends a reply in the form of [`Sample`] to this Query. diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 01fc345c3b..eb70129e55 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1806,10 +1806,6 @@ impl Session { let query_inner = Arc::new(QueryInner { key_expr, parameters: parameters.to_owned().into(), - value: body.map(|b| Value { - payload: b.payload.into(), - encoding: b.encoding.into(), - }), qid, zid, primitives: if local { @@ -1817,13 +1813,17 @@ impl Session { } else { primitives }, - #[cfg(feature = "unstable")] - attachment, }); for (eid, callback) in queryables { callback(Query { inner: query_inner.clone(), eid, + value: body.as_ref().map(|b| Value { + payload: b.payload.clone().into(), + encoding: b.encoding.clone().into(), + }), + #[cfg(feature = "unstable")] + attachment: attachment.clone(), }); } } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index ea084c453b..c13e64f71f 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -462,14 +462,14 @@ impl Primitives for AdminSpace { inner: Arc::new(QueryInner { key_expr: key_expr.clone(), parameters: query.parameters.into(), - value: query.ext_body.map(|b| Value::new(b.payload, b.encoding)), qid: msg.id, zid, primitives, - #[cfg(feature = "unstable")] - attachment: query.ext_attachment.map(Into::into), }), eid: self.queryable_id, + value: query.ext_body.map(|b| Value::new(b.payload, b.encoding)), + #[cfg(feature = "unstable")] + attachment: query.ext_attachment.map(Into::into), }; for (key, handler) in &self.handlers { From 2dbc20f1f1010ee0e6cdf4fe457ea0ac7c07f9f0 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Sat, 27 Apr 2024 11:57:33 +0300 Subject: [PATCH 06/17] rename payload tests to bytes tests --- zenoh/tests/{payload.rs => bytes.rs} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename zenoh/tests/{payload.rs => bytes.rs} (98%) diff --git a/zenoh/tests/payload.rs b/zenoh/tests/bytes.rs similarity index 98% rename from zenoh/tests/payload.rs rename to zenoh/tests/bytes.rs index 44daadf18c..41e6d14c6e 100644 --- a/zenoh/tests/payload.rs +++ b/zenoh/tests/bytes.rs @@ -14,7 +14,7 @@ #[test] #[cfg(all(feature = "shared-memory", feature = "unstable"))] -fn shm_payload_single_buf() { +fn shm_bytes_single_buf() { use zenoh::prelude::r#async::*; // create an SHM backend... From 6f8f6b745b062cb0b66123e36b3125d2a5a2c780 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Sat, 27 Apr 2024 12:23:22 +0300 Subject: [PATCH 07/17] - fix API exports - fix z_payload_shm example --- examples/examples/z_payload_shm.rs | 9 ++------- zenoh/src/lib.rs | 1 + 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/examples/examples/z_payload_shm.rs b/examples/examples/z_payload_shm.rs index 3b03b80502..4bf45381de 100644 --- a/examples/examples/z_payload_shm.rs +++ b/examples/examples/z_payload_shm.rs @@ -11,16 +11,11 @@ // Contributors: // ZettaScale Zenoh Team, // -use zenoh::shm::slice::zsliceshm::{zsliceshm, ZSliceShm}; -use zenoh::shm::slice::zsliceshmmut::{zsliceshmmut, ZSliceShmMut}; use zenoh::{ bytes::ZBytes, shm::{ - protocol_implementations::posix::{ - posix_shared_memory_provider_backend::PosixSharedMemoryProviderBackend, - protocol_id::POSIX_PROTOCOL_ID, - }, - provider::shared_memory_provider::SharedMemoryProviderBuilder, + zsliceshm, zsliceshmmut, PosixSharedMemoryProviderBackend, SharedMemoryProviderBuilder, + ZSliceShm, ZSliceShmMut, POSIX_PROTOCOL_ID, }, }; diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 3c011e2439..2a238ea875 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -371,6 +371,7 @@ pub mod shm { pub use zenoh_shm::api::slice::zsliceshmmut::{zsliceshmmut, ZSliceShmMut}; pub use zenoh_shm::api::{ protocol_implementations::posix::{ + posix_shared_memory_client::PosixSharedMemoryClient, posix_shared_memory_provider_backend::PosixSharedMemoryProviderBackend, protocol_id::POSIX_PROTOCOL_ID, }, From c3f993da4baf385adb04d17f911790ca0becec41 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 29 Apr 2024 18:44:09 +0200 Subject: [PATCH 08/17] Add attachment_mut to Sample --- zenoh/src/api/sample.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 365c2a7728..11bfc92c0b 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -355,6 +355,13 @@ impl Sample { pub fn attachment(&self) -> Option<&ZBytes> { self.attachment.as_ref() } + + /// Gets the sample attachment: a map of key-value pairs, where each key and value are byte-slices. + #[zenoh_macros::unstable] + #[inline] + pub fn attachment_mut(&mut self) -> Option<&mut ZBytes> { + self.attachment.as_mut() + } } impl From for Value { From d4218fccddf6e5b52539082dd823a19817b3ef44 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Tue, 30 Apr 2024 13:04:05 +0300 Subject: [PATCH 09/17] [skip ci] fix SHM exports in new api export mechanism --- zenoh/src/lib.rs | 39 ++++++++++++++++++++++++++------------- zenoh/src/prelude.rs | 4 ++-- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 2a238ea875..b8be3d905c 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -362,19 +362,32 @@ pub mod internal { #[cfg(all(feature = "unstable", feature = "shared-memory"))] pub mod shm { - pub use zenoh_shm::api::client_storage::SharedMemoryClientStorage; - pub use zenoh_shm::api::provider::shared_memory_provider::{BlockOn, GarbageCollect}; - pub use zenoh_shm::api::provider::shared_memory_provider::{Deallocate, Defragment}; - pub use zenoh_shm::api::provider::types::AllocAlignment; - pub use zenoh_shm::api::provider::types::MemoryLayout; - pub use zenoh_shm::api::slice::zsliceshm::{zsliceshm, ZSliceShm}; - pub use zenoh_shm::api::slice::zsliceshmmut::{zsliceshmmut, ZSliceShmMut}; - pub use zenoh_shm::api::{ - protocol_implementations::posix::{ - posix_shared_memory_client::PosixSharedMemoryClient, - posix_shared_memory_provider_backend::PosixSharedMemoryProviderBackend, - protocol_id::POSIX_PROTOCOL_ID, + pub use zenoh_shm::api::client::{ + shared_memory_client::SharedMemoryClient, shared_memory_segment::SharedMemorySegment, + }; + pub use zenoh_shm::api::client_storage::{SharedMemoryClientStorage, GLOBAL_CLIENT_STORAGE}; + pub use zenoh_shm::api::common::types::{ChunkID, ProtocolID, SegmentID}; + pub use zenoh_shm::api::protocol_implementations::posix::{ + posix_shared_memory_client::PosixSharedMemoryClient, + posix_shared_memory_provider_backend::{ + LayoutedPosixSharedMemoryProviderBackendBuilder, PosixSharedMemoryProviderBackend, + PosixSharedMemoryProviderBackendBuilder, }, - provider::shared_memory_provider::SharedMemoryProviderBuilder, + protocol_id::POSIX_PROTOCOL_ID, + }; + pub use zenoh_shm::api::provider::shared_memory_provider::{ + AllocBuilder, AllocLayout, AllocLayoutAlignedBuilder, AllocLayoutBuilder, + AllocLayoutSizedBuilder, AllocPolicy, AsyncAllocPolicy, BlockOn, DeallocEldest, + DeallocOptimal, DeallocYoungest, Deallocate, Defragment, DynamicProtocolID, + ForceDeallocPolicy, GarbageCollect, JustAlloc, ProtocolIDSource, SharedMemoryProvider, + SharedMemoryProviderBuilder, SharedMemoryProviderBuilderBackendID, + SharedMemoryProviderBuilderID, StaticProtocolID, + }; + pub use zenoh_shm::api::provider::types::{ + AllocAlignment, BufAllocResult, ChunkAllocResult, MemoryLayout, ZAllocError, + }; + pub use zenoh_shm::api::slice::{ + zsliceshm::{zsliceshm, ZSliceShm}, + zsliceshmmut::{zsliceshmmut, ZSliceShmMut}, }; } diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs index 17286ddeea..ac60f16c89 100644 --- a/zenoh/src/prelude.rs +++ b/zenoh/src/prelude.rs @@ -51,7 +51,7 @@ pub(crate) mod flat { pub use crate::scouting::*; pub use crate::selector::*; pub use crate::session::*; - #[cfg(feature = "shared-memory")] + #[cfg(all(feature = "unstable", feature = "shared-memory"))] pub use crate::shm::*; pub use crate::subscriber::*; pub use crate::time::*; @@ -74,7 +74,7 @@ pub(crate) mod mods { pub use crate::scouting; pub use crate::selector; pub use crate::session; - #[cfg(feature = "shared-memory")] + #[cfg(all(feature = "unstable", feature = "shared-memory"))] pub use crate::shm; pub use crate::subscriber; pub use crate::time; From bca2fd74ca5bdc132e194ef62d56429580d852bc Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Tue, 30 Apr 2024 18:10:38 +0300 Subject: [PATCH 10/17] Massive renaming for ZSliceShm and ZSliceShmMut --- .../src/api/{slice => buffer}/mod.rs | 4 +- .../src/api/{slice => buffer}/traits.rs | 0 .../{slice/zsliceshm.rs => buffer/zshm.rs} | 76 ++++++++-------- .../zsliceshmmut.rs => buffer/zshmmut.rs} | 86 +++++++++---------- commons/zenoh-shm/src/api/mod.rs | 2 +- .../api/provider/shared_memory_provider.rs | 12 +-- commons/zenoh-shm/src/api/provider/types.rs | 4 +- examples/examples/z_get_shm.rs | 2 +- examples/examples/z_payload_shm.rs | 36 ++++---- examples/examples/z_ping_shm.rs | 2 +- examples/examples/z_queryable_shm.rs | 2 +- examples/examples/z_sub_shm.rs | 6 +- zenoh/src/api/bytes.rs | 48 +++++------ zenoh/src/api/encoding.rs | 6 +- zenoh/src/lib.rs | 8 +- zenoh/tests/bytes.rs | 32 +++---- 16 files changed, 163 insertions(+), 163 deletions(-) rename commons/zenoh-shm/src/api/{slice => buffer}/mod.rs (92%) rename commons/zenoh-shm/src/api/{slice => buffer}/traits.rs (100%) rename commons/zenoh-shm/src/api/{slice/zsliceshm.rs => buffer/zshm.rs} (59%) rename commons/zenoh-shm/src/api/{slice/zsliceshmmut.rs => buffer/zshmmut.rs} (59%) diff --git a/commons/zenoh-shm/src/api/slice/mod.rs b/commons/zenoh-shm/src/api/buffer/mod.rs similarity index 92% rename from commons/zenoh-shm/src/api/slice/mod.rs rename to commons/zenoh-shm/src/api/buffer/mod.rs index 59c793f94a..8a3e040da9 100644 --- a/commons/zenoh-shm/src/api/slice/mod.rs +++ b/commons/zenoh-shm/src/api/buffer/mod.rs @@ -13,5 +13,5 @@ // pub mod traits; -pub mod zsliceshm; -pub mod zsliceshmmut; +pub mod zshm; +pub mod zshmmut; diff --git a/commons/zenoh-shm/src/api/slice/traits.rs b/commons/zenoh-shm/src/api/buffer/traits.rs similarity index 100% rename from commons/zenoh-shm/src/api/slice/traits.rs rename to commons/zenoh-shm/src/api/buffer/traits.rs diff --git a/commons/zenoh-shm/src/api/slice/zsliceshm.rs b/commons/zenoh-shm/src/api/buffer/zshm.rs similarity index 59% rename from commons/zenoh-shm/src/api/slice/zsliceshm.rs rename to commons/zenoh-shm/src/api/buffer/zshm.rs index 86f4395ebb..e7cf2a3197 100644 --- a/commons/zenoh-shm/src/api/slice/zsliceshm.rs +++ b/commons/zenoh-shm/src/api/buffer/zshm.rs @@ -22,43 +22,43 @@ use zenoh_buffers::{ZBuf, ZSlice}; use crate::SharedMemoryBuf; -use super::{traits::SHMBuf, zsliceshmmut::zsliceshmmut}; +use super::{traits::SHMBuf, zshmmut::zshmmut}; -/// An immutable SHM slice +/// An immutable SHM buffer #[zenoh_macros::unstable_doc] #[repr(transparent)] #[derive(Clone, Debug, PartialEq, Eq)] -pub struct ZSliceShm(pub(crate) SharedMemoryBuf); +pub struct ZShm(pub(crate) SharedMemoryBuf); -impl SHMBuf for ZSliceShm { +impl SHMBuf for ZShm { fn is_valid(&self) -> bool { self.0.is_valid() } } -impl PartialEq<&zsliceshm> for ZSliceShm { - fn eq(&self, other: &&zsliceshm) -> bool { +impl PartialEq<&zshm> for ZShm { + fn eq(&self, other: &&zshm) -> bool { self.0 == other.0 .0 } } -impl Borrow for ZSliceShm { - fn borrow(&self) -> &zsliceshm { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] +impl Borrow for ZShm { + fn borrow(&self) -> &zshm { + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } -impl BorrowMut for ZSliceShm { - fn borrow_mut(&mut self) -> &mut zsliceshm { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] +impl BorrowMut for ZShm { + fn borrow_mut(&mut self) -> &mut zshm { + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } -impl Deref for ZSliceShm { +impl Deref for ZShm { type Target = [u8]; fn deref(&self) -> &Self::Target { @@ -66,37 +66,37 @@ impl Deref for ZSliceShm { } } -impl AsRef<[u8]> for ZSliceShm { +impl AsRef<[u8]> for ZShm { fn as_ref(&self) -> &[u8] { self } } -impl From for ZSliceShm { +impl From for ZShm { fn from(value: SharedMemoryBuf) -> Self { Self(value) } } -impl From for ZSlice { - fn from(value: ZSliceShm) -> Self { +impl From for ZSlice { + fn from(value: ZShm) -> Self { value.0.into() } } -impl From for ZBuf { - fn from(value: ZSliceShm) -> Self { +impl From for ZBuf { + fn from(value: ZShm) -> Self { value.0.into() } } -impl TryFrom<&mut ZSliceShm> for &mut zsliceshmmut { +impl TryFrom<&mut ZShm> for &mut zshmmut { type Error = (); - fn try_from(value: &mut ZSliceShm) -> Result { + fn try_from(value: &mut ZShm) -> Result { match value.0.is_unique() && value.0.is_valid() { true => { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction Ok(unsafe { core::mem::transmute(value) }) } @@ -105,64 +105,64 @@ impl TryFrom<&mut ZSliceShm> for &mut zsliceshmmut { } } -/// A borrowed immutable SHM slice +/// A borrowed immutable SHM buffer #[zenoh_macros::unstable_doc] #[derive(Debug, PartialEq, Eq)] #[allow(non_camel_case_types)] #[repr(transparent)] -pub struct zsliceshm(ZSliceShm); +pub struct zshm(ZShm); -impl ToOwned for zsliceshm { - type Owned = ZSliceShm; +impl ToOwned for zshm { + type Owned = ZShm; fn to_owned(&self) -> Self::Owned { self.0.clone() } } -impl PartialEq for &zsliceshm { - fn eq(&self, other: &ZSliceShm) -> bool { +impl PartialEq for &zshm { + fn eq(&self, other: &ZShm) -> bool { self.0 .0 == other.0 } } -impl Deref for zsliceshm { - type Target = ZSliceShm; +impl Deref for zshm { + type Target = ZShm; fn deref(&self) -> &Self::Target { &self.0 } } -impl DerefMut for zsliceshm { +impl DerefMut for zshm { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } -impl From<&SharedMemoryBuf> for &zsliceshm { +impl From<&SharedMemoryBuf> for &zshm { fn from(value: &SharedMemoryBuf) -> Self { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(value) } } } -impl From<&mut SharedMemoryBuf> for &mut zsliceshm { +impl From<&mut SharedMemoryBuf> for &mut zshm { fn from(value: &mut SharedMemoryBuf) -> Self { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(value) } } } -impl TryFrom<&mut zsliceshm> for &mut zsliceshmmut { +impl TryFrom<&mut zshm> for &mut zshmmut { type Error = (); - fn try_from(value: &mut zsliceshm) -> Result { + fn try_from(value: &mut zshm) -> Result { match value.0 .0.is_unique() && value.0 .0.is_valid() { true => { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction Ok(unsafe { core::mem::transmute(value) }) } diff --git a/commons/zenoh-shm/src/api/slice/zsliceshmmut.rs b/commons/zenoh-shm/src/api/buffer/zshmmut.rs similarity index 59% rename from commons/zenoh-shm/src/api/slice/zsliceshmmut.rs rename to commons/zenoh-shm/src/api/buffer/zshmmut.rs index 62823785da..e40c9c77f1 100644 --- a/commons/zenoh-shm/src/api/slice/zsliceshmmut.rs +++ b/commons/zenoh-shm/src/api/buffer/zshmmut.rs @@ -21,36 +21,36 @@ use crate::SharedMemoryBuf; use super::{ traits::{SHMBuf, SHMBufMut}, - zsliceshm::{zsliceshm, ZSliceShm}, + zshm::{zshm, ZShm}, }; -/// A mutable SHM slice +/// A mutable SHM buffer #[zenoh_macros::unstable_doc] #[derive(Debug, PartialEq, Eq)] #[repr(transparent)] -pub struct ZSliceShmMut(SharedMemoryBuf); +pub struct ZShmMut(SharedMemoryBuf); -impl SHMBuf for ZSliceShmMut { +impl SHMBuf for ZShmMut { fn is_valid(&self) -> bool { self.0.is_valid() } } -impl SHMBufMut for ZSliceShmMut {} +impl SHMBufMut for ZShmMut {} -impl ZSliceShmMut { +impl ZShmMut { pub(crate) unsafe fn new_unchecked(data: SharedMemoryBuf) -> Self { Self(data) } } -impl PartialEq for &ZSliceShmMut { - fn eq(&self, other: &zsliceshmmut) -> bool { +impl PartialEq for &ZShmMut { + fn eq(&self, other: &zshmmut) -> bool { self.0 == other.0 .0 } } -impl TryFrom for ZSliceShmMut { +impl TryFrom for ZShmMut { type Error = SharedMemoryBuf; fn try_from(value: SharedMemoryBuf) -> Result { @@ -61,10 +61,10 @@ impl TryFrom for ZSliceShmMut { } } -impl TryFrom for ZSliceShmMut { - type Error = ZSliceShm; +impl TryFrom for ZShmMut { + type Error = ZShm; - fn try_from(value: ZSliceShm) -> Result { + fn try_from(value: ZShm) -> Result { match value.0.is_unique() && value.0.is_valid() { true => Ok(Self(value.0)), false => Err(value), @@ -72,39 +72,39 @@ impl TryFrom for ZSliceShmMut { } } -impl Borrow for ZSliceShmMut { - fn borrow(&self) -> &zsliceshm { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] +impl Borrow for ZShmMut { + fn borrow(&self) -> &zshm { + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } -impl BorrowMut for ZSliceShmMut { - fn borrow_mut(&mut self) -> &mut zsliceshm { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] +impl BorrowMut for ZShmMut { + fn borrow_mut(&mut self) -> &mut zshm { + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } -impl Borrow for ZSliceShmMut { - fn borrow(&self) -> &zsliceshmmut { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] +impl Borrow for ZShmMut { + fn borrow(&self) -> &zshmmut { + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } -impl BorrowMut for ZSliceShmMut { - fn borrow_mut(&mut self) -> &mut zsliceshmmut { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] +impl BorrowMut for ZShmMut { + fn borrow_mut(&mut self) -> &mut zshmmut { + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } -impl Deref for ZSliceShmMut { +impl Deref for ZShmMut { type Target = [u8]; fn deref(&self) -> &Self::Target { @@ -112,75 +112,75 @@ impl Deref for ZSliceShmMut { } } -impl DerefMut for ZSliceShmMut { +impl DerefMut for ZShmMut { fn deref_mut(&mut self) -> &mut Self::Target { self.0.as_mut() } } -impl AsRef<[u8]> for ZSliceShmMut { +impl AsRef<[u8]> for ZShmMut { fn as_ref(&self) -> &[u8] { self } } -impl AsMut<[u8]> for ZSliceShmMut { +impl AsMut<[u8]> for ZShmMut { fn as_mut(&mut self) -> &mut [u8] { self } } -impl From for ZSliceShm { - fn from(value: ZSliceShmMut) -> Self { +impl From for ZShm { + fn from(value: ZShmMut) -> Self { value.0.into() } } -impl From for ZSlice { - fn from(value: ZSliceShmMut) -> Self { +impl From for ZSlice { + fn from(value: ZShmMut) -> Self { value.0.into() } } -impl From for ZBuf { - fn from(value: ZSliceShmMut) -> Self { +impl From for ZBuf { + fn from(value: ZShmMut) -> Self { value.0.into() } } -/// A borrowed mutable SHM slice +/// A borrowed mutable SHM buffer #[zenoh_macros::unstable_doc] #[derive(Debug, PartialEq, Eq)] #[allow(non_camel_case_types)] #[repr(transparent)] -pub struct zsliceshmmut(ZSliceShmMut); +pub struct zshmmut(ZShmMut); -impl PartialEq for &zsliceshmmut { - fn eq(&self, other: &ZSliceShmMut) -> bool { +impl PartialEq for &zshmmut { + fn eq(&self, other: &ZShmMut) -> bool { self.0 .0 == other.0 } } -impl Deref for zsliceshmmut { - type Target = ZSliceShmMut; +impl Deref for zshmmut { + type Target = ZShmMut; fn deref(&self) -> &Self::Target { &self.0 } } -impl DerefMut for zsliceshmmut { +impl DerefMut for zshmmut { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } -impl TryFrom<&mut SharedMemoryBuf> for &mut zsliceshmmut { +impl TryFrom<&mut SharedMemoryBuf> for &mut zshmmut { type Error = (); fn try_from(value: &mut SharedMemoryBuf) -> Result { match value.is_unique() && value.is_valid() { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction true => Ok(unsafe { core::mem::transmute(value) }), false => Err(()), diff --git a/commons/zenoh-shm/src/api/mod.rs b/commons/zenoh-shm/src/api/mod.rs index 08a5678fa8..a87188da29 100644 --- a/commons/zenoh-shm/src/api/mod.rs +++ b/commons/zenoh-shm/src/api/mod.rs @@ -12,9 +12,9 @@ // ZettaScale Zenoh Team, // +pub mod buffer; pub mod client; pub mod client_storage; pub mod common; pub mod protocol_implementations; pub mod provider; -pub mod slice; diff --git a/commons/zenoh-shm/src/api/provider/shared_memory_provider.rs b/commons/zenoh-shm/src/api/provider/shared_memory_provider.rs index c3b8128300..82a4789738 100644 --- a/commons/zenoh-shm/src/api/provider/shared_memory_provider.rs +++ b/commons/zenoh-shm/src/api/provider/shared_memory_provider.rs @@ -23,7 +23,7 @@ use async_trait::async_trait; use zenoh_result::ZResult; use crate::{ - api::{common::types::ProtocolID, slice::zsliceshmmut::ZSliceShmMut}, + api::{buffer::zshmmut::ZShmMut, common::types::ProtocolID}, header::{ allocated_descriptor::AllocatedHeaderDescriptor, descriptor::HeaderDescriptor, storage::GLOBAL_HEADER_STORAGE, @@ -713,11 +713,11 @@ where self.backend.defragment() } - /// Map externally-allocated chunk into ZSliceShmMut. + /// Map externally-allocated chunk into ZShmMut. /// This method is designed to be used with push data sources. /// Remember that chunk's len may be >= len! #[zenoh_macros::unstable_doc] - pub fn map(&self, chunk: AllocatedChunk, len: usize) -> ZResult { + pub fn map(&self, chunk: AllocatedChunk, len: usize) -> ZResult { // allocate resources for SHM buffer let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?; @@ -729,7 +729,7 @@ where allocated_watchdog, confirmed_watchdog, ); - Ok(unsafe { ZSliceShmMut::new_unchecked(wrapped) }) + Ok(unsafe { ZShmMut::new_unchecked(wrapped) }) } /// Try to collect free chunks. @@ -806,7 +806,7 @@ where allocated_watchdog, confirmed_watchdog, ); - Ok(unsafe { ZSliceShmMut::new_unchecked(wrapped) }) + Ok(unsafe { ZShmMut::new_unchecked(wrapped) }) } fn alloc_resources() -> ZResult<( @@ -911,6 +911,6 @@ where allocated_watchdog, confirmed_watchdog, ); - Ok(unsafe { ZSliceShmMut::new_unchecked(wrapped) }) + Ok(unsafe { ZShmMut::new_unchecked(wrapped) }) } } diff --git a/commons/zenoh-shm/src/api/provider/types.rs b/commons/zenoh-shm/src/api/provider/types.rs index 662482f567..b7f1ad2de6 100644 --- a/commons/zenoh-shm/src/api/provider/types.rs +++ b/commons/zenoh-shm/src/api/provider/types.rs @@ -16,7 +16,7 @@ use std::fmt::Display; use zenoh_result::{bail, ZResult}; -use crate::api::slice::zsliceshmmut::ZSliceShmMut; +use crate::api::buffer::zshmmut::ZShmMut; use super::chunk::AllocatedChunk; @@ -170,4 +170,4 @@ pub type ChunkAllocResult = Result; /// SHM buffer allocation result #[zenoh_macros::unstable_doc] -pub type BufAllocResult = Result; +pub type BufAllocResult = Result; diff --git a/examples/examples/z_get_shm.rs b/examples/examples/z_get_shm.rs index 60015829aa..2773348fd0 100644 --- a/examples/examples/z_get_shm.rs +++ b/examples/examples/z_get_shm.rs @@ -94,7 +94,7 @@ async fn main() { match reply.result() { Ok(sample) => { print!(">> Received ('{}': ", sample.key_expr().as_str()); - match sample.payload().deserialize::<&zsliceshm>() { + match sample.payload().deserialize::<&zshm>() { Ok(payload) => println!("'{}')", String::from_utf8_lossy(payload),), Err(e) => println!("'Not a SharedMemoryBuf: {:?}')", e), } diff --git a/examples/examples/z_payload_shm.rs b/examples/examples/z_payload_shm.rs index 4bf45381de..d9ab4e1f82 100644 --- a/examples/examples/z_payload_shm.rs +++ b/examples/examples/z_payload_shm.rs @@ -14,8 +14,8 @@ use zenoh::{ bytes::ZBytes, shm::{ - zsliceshm, zsliceshmmut, PosixSharedMemoryProviderBackend, SharedMemoryProviderBuilder, - ZSliceShm, ZSliceShmMut, POSIX_PROTOCOL_ID, + zshm, zshmmut, PosixSharedMemoryProviderBackend, SharedMemoryProviderBuilder, ZShm, + ZShmMut, POSIX_PROTOCOL_ID, }, }; @@ -35,59 +35,59 @@ fn main() { // Prepare a layout for allocations let layout = provider.alloc_layout().size(1024).res().unwrap(); - // allocate an SHM buffer (ZSliceShmMut) + // allocate an SHM buffer (ZShmMut) let mut owned_shm_buf_mut = layout.alloc().res().unwrap(); // mutable and immutable API let _data: &[u8] = &owned_shm_buf_mut; let _data_mut: &mut [u8] = &mut owned_shm_buf_mut; - // convert into immutable owned buffer (ZSliceShmMut -> ZSlceShm) - let owned_shm_buf: ZSliceShm = owned_shm_buf_mut.into(); + // convert into immutable owned buffer (ZShmMut -> ZSlceShm) + let owned_shm_buf: ZShm = owned_shm_buf_mut.into(); // immutable API let _data: &[u8] = &owned_shm_buf; - // convert again into mutable owned buffer (ZSliceShm -> ZSlceShmMut) - let mut owned_shm_buf_mut: ZSliceShmMut = owned_shm_buf.try_into().unwrap(); + // convert again into mutable owned buffer (ZShm -> ZSlceShmMut) + let mut owned_shm_buf_mut: ZShmMut = owned_shm_buf.try_into().unwrap(); // mutable and immutable API let _data: &[u8] = &owned_shm_buf_mut; let _data_mut: &mut [u8] = &mut owned_shm_buf_mut; - // build a ZBytes from an SHM buffer (ZSliceShmMut -> ZBytes) + // build a ZBytes from an SHM buffer (ZShmMut -> ZBytes) let mut payload: ZBytes = owned_shm_buf_mut.into(); // branch to illustrate immutable access to SHM data { - // deserialize ZBytes as an immutably borrowed zsliceshm (ZBytes -> &zsliceshm) - let borrowed_shm_buf: &zsliceshm = payload.deserialize().unwrap(); + // deserialize ZBytes as an immutably borrowed zshm (ZBytes -> &zshm) + let borrowed_shm_buf: &zshm = payload.deserialize().unwrap(); // immutable API let _data: &[u8] = borrowed_shm_buf; - // construct owned buffer from borrowed type (&zsliceshm -> ZSliceShm) + // construct owned buffer from borrowed type (&zshm -> ZShm) let owned = borrowed_shm_buf.to_owned(); // immutable API let _data: &[u8] = &owned; - // try to construct mutable ZSliceShmMut (ZSliceShm -> ZSliceShmMut) - let owned_mut: Result = owned.try_into(); - // the attempt fails because ZSliceShm has two existing references ('owned' and inside 'payload') + // try to construct mutable ZShmMut (ZShm -> ZShmMut) + let owned_mut: Result = owned.try_into(); + // the attempt fails because ZShm has two existing references ('owned' and inside 'payload') assert!(owned_mut.is_err()) } // branch to illustrate mutable access to SHM data { - // deserialize ZBytes as mutably borrowed zsliceshm (ZBytes -> &mut zsliceshm) - let borrowed_shm_buf: &mut zsliceshm = payload.deserialize_mut().unwrap(); + // deserialize ZBytes as mutably borrowed zshm (ZBytes -> &mut zshm) + let borrowed_shm_buf: &mut zshm = payload.deserialize_mut().unwrap(); // immutable API let _data: &[u8] = borrowed_shm_buf; - // convert zsliceshm to zsliceshmmut (&mut zsliceshm -> &mut zsliceshmmut) - let borrowed_shm_buf_mut: &mut zsliceshmmut = borrowed_shm_buf.try_into().unwrap(); + // convert zshm to zshmmut (&mut zshm -> &mut zshmmut) + let borrowed_shm_buf_mut: &mut zshmmut = borrowed_shm_buf.try_into().unwrap(); // mutable and immutable API let _data: &[u8] = borrowed_shm_buf_mut; diff --git a/examples/examples/z_ping_shm.rs b/examples/examples/z_ping_shm.rs index 7a7bd61580..372967f6e8 100644 --- a/examples/examples/z_ping_shm.rs +++ b/examples/examples/z_ping_shm.rs @@ -80,7 +80,7 @@ fn main() { .res() .unwrap(); - // convert ZSliceShmMut into ZSlice as ZSliceShmMut does not support Clone + // convert ZShmMut into ZSlice as ZShmMut does not support Clone let buf: ZSlice = buf.into(); // -- warmup -- diff --git a/examples/examples/z_queryable_shm.rs b/examples/examples/z_queryable_shm.rs index 62fa7571d5..49939dcb0a 100644 --- a/examples/examples/z_queryable_shm.rs +++ b/examples/examples/z_queryable_shm.rs @@ -76,7 +76,7 @@ async fn main() { query.key_expr().as_str(), ); if let Some(payload) = query.payload() { - match payload.deserialize::<&zsliceshm>() { + match payload.deserialize::<&zshm>() { Ok(payload) => print!(": '{}'", String::from_utf8_lossy(payload)), Err(e) => print!(": 'Not a SharedMemoryBuf: {:?}'", e), } diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index a43b5c6cd0..a7e96c2b75 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -41,7 +41,7 @@ async fn main() { sample.kind(), sample.key_expr().as_str(), ); - match sample.payload().deserialize::<&zsliceshm>() { + match sample.payload().deserialize::<&zshm>() { Ok(payload) => print!("'{}'", String::from_utf8_lossy(payload)), Err(e) => print!("'Not a SharedMemoryBuf: {:?}'", e), } @@ -52,12 +52,12 @@ async fn main() { // // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it. // // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content. // - // use zenoh::shm::zsliceshmmut; + // use zenoh::shm::zshmmut; // while let Ok(mut sample) = subscriber.recv_async().await { // let kind = sample.kind(); // let key_expr = sample.key_expr().to_string(); - // match sample.payload_mut().deserialize_mut::<&mut zsliceshmmut>() { + // match sample.payload_mut().deserialize_mut::<&mut zshmmut>() { // Ok(payload) => println!( // ">> [Subscriber] Received {} ('{}': '{:02x?}')", // kind, key_expr, payload diff --git a/zenoh/src/api/bytes.rs b/zenoh/src/api/bytes.rs index c36136ef81..ce88b2bdbe 100644 --- a/zenoh/src/api/bytes.rs +++ b/zenoh/src/api/bytes.rs @@ -30,9 +30,9 @@ use zenoh_protocol::{core::Properties, zenoh::ext::AttachmentType}; use zenoh_result::{ZError, ZResult}; #[cfg(all(feature = "shared-memory", feature = "unstable"))] use zenoh_shm::{ - api::slice::{ - zsliceshm::{zsliceshm, ZSliceShm}, - zsliceshmmut::{zsliceshmmut, ZSliceShmMut}, + api::buffer::{ + zshm::{zshm, ZShm}, + zshmmut::{zshmmut, ZShmMut}, }, SharedMemoryBuf, }; @@ -1524,47 +1524,47 @@ impl TryFrom<&mut ZBytes> for serde_pickle::Value { // Shared memory conversion #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl Serialize for ZSerde { +impl Serialize for ZSerde { type Output = ZBytes; - fn serialize(self, t: ZSliceShm) -> Self::Output { + fn serialize(self, t: ZShm) -> Self::Output { let slice: ZSlice = t.into(); ZBytes::new(slice) } } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl From for ZBytes { - fn from(t: ZSliceShm) -> Self { +impl From for ZBytes { + fn from(t: ZShm) -> Self { ZSerde.serialize(t) } } // Shared memory conversion #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl Serialize for ZSerde { +impl Serialize for ZSerde { type Output = ZBytes; - fn serialize(self, t: ZSliceShmMut) -> Self::Output { + fn serialize(self, t: ZShmMut) -> Self::Output { let slice: ZSlice = t.into(); ZBytes::new(slice) } } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl From for ZBytes { - fn from(t: ZSliceShmMut) -> Self { +impl From for ZBytes { + fn from(t: ZShmMut) -> Self { ZSerde.serialize(t) } } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl<'a> Deserialize<'a, &'a zsliceshm> for ZSerde { +impl<'a> Deserialize<'a, &'a zshm> for ZSerde { type Input = &'a ZBytes; type Error = ZDeserializeError; - fn deserialize(self, v: Self::Input) -> Result<&'a zsliceshm, Self::Error> { - // A ZSliceShm is expected to have only one slice + fn deserialize(self, v: Self::Input) -> Result<&'a zshm, Self::Error> { + // A ZShm is expected to have only one slice let mut zslices = v.0.zslices(); if let Some(zs) = zslices.next() { if let Some(shmb) = zs.downcast_ref::() { @@ -1576,7 +1576,7 @@ impl<'a> Deserialize<'a, &'a zsliceshm> for ZSerde { } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl<'a> TryFrom<&'a ZBytes> for &'a zsliceshm { +impl<'a> TryFrom<&'a ZBytes> for &'a zshm { type Error = ZDeserializeError; fn try_from(value: &'a ZBytes) -> Result { @@ -1585,7 +1585,7 @@ impl<'a> TryFrom<&'a ZBytes> for &'a zsliceshm { } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl<'a> TryFrom<&'a mut ZBytes> for &'a mut zsliceshm { +impl<'a> TryFrom<&'a mut ZBytes> for &'a mut zshm { type Error = ZDeserializeError; fn try_from(value: &'a mut ZBytes) -> Result { @@ -1594,11 +1594,11 @@ impl<'a> TryFrom<&'a mut ZBytes> for &'a mut zsliceshm { } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl<'a> Deserialize<'a, &'a mut zsliceshm> for ZSerde { +impl<'a> Deserialize<'a, &'a mut zshm> for ZSerde { type Input = &'a mut ZBytes; type Error = ZDeserializeError; - fn deserialize(self, v: Self::Input) -> Result<&'a mut zsliceshm, Self::Error> { + fn deserialize(self, v: Self::Input) -> Result<&'a mut zshm, Self::Error> { // A ZSliceShmBorrowMut is expected to have only one slice let mut zslices = v.0.zslices_mut(); if let Some(zs) = zslices.next() { @@ -1611,11 +1611,11 @@ impl<'a> Deserialize<'a, &'a mut zsliceshm> for ZSerde { } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl<'a> Deserialize<'a, &'a mut zsliceshmmut> for ZSerde { +impl<'a> Deserialize<'a, &'a mut zshmmut> for ZSerde { type Input = &'a mut ZBytes; type Error = ZDeserializeError; - fn deserialize(self, v: Self::Input) -> Result<&'a mut zsliceshmmut, Self::Error> { + fn deserialize(self, v: Self::Input) -> Result<&'a mut zshmmut, Self::Error> { // A ZSliceShmBorrowMut is expected to have only one slice let mut zslices = v.0.zslices_mut(); if let Some(zs) = zslices.next() { @@ -1628,7 +1628,7 @@ impl<'a> Deserialize<'a, &'a mut zsliceshmmut> for ZSerde { } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl<'a> TryFrom<&'a mut ZBytes> for &'a mut zsliceshmmut { +impl<'a> TryFrom<&'a mut ZBytes> for &'a mut zshmmut { type Error = ZDeserializeError; fn try_from(value: &'a mut ZBytes) -> Result { @@ -1838,7 +1838,7 @@ mod tests { protocol_id::POSIX_PROTOCOL_ID, }, provider::shared_memory_provider::SharedMemoryProviderBuilder, - slice::zsliceshm::{zsliceshm, ZSliceShm}, + slice::zshm::{zshm, ZShm}, }; const NUM: usize = 1_000; @@ -1964,9 +1964,9 @@ mod tests { let mutable_shm_buf = layout.alloc().res().unwrap(); // convert to immutable SHM buffer - let immutable_shm_buf: ZSliceShm = mutable_shm_buf.into(); + let immutable_shm_buf: ZShm = mutable_shm_buf.into(); - serialize_deserialize!(&zsliceshm, immutable_shm_buf); + serialize_deserialize!(&zshm, immutable_shm_buf); } // Properties diff --git a/zenoh/src/api/encoding.rs b/zenoh/src/api/encoding.rs index 6c08303612..791bdbd3ea 100644 --- a/zenoh/src/api/encoding.rs +++ b/zenoh/src/api/encoding.rs @@ -17,7 +17,7 @@ use std::{borrow::Cow, convert::Infallible, fmt, str::FromStr}; use zenoh_buffers::{ZBuf, ZSlice}; use zenoh_protocol::core::EncodingId; #[cfg(feature = "shared-memory")] -use zenoh_shm::api::slice::{zsliceshm::ZSliceShm, zsliceshmmut::ZSliceShmMut}; +use zenoh_shm::api::buffer::{zshm::ZShm, zshmmut::ZShmMut}; /// Default encoding values used by Zenoh. /// @@ -835,10 +835,10 @@ impl EncodingMapping for serde_pickle::Value { // - Zenoh SHM #[cfg(feature = "shared-memory")] -impl EncodingMapping for ZSliceShm { +impl EncodingMapping for ZShm { const ENCODING: Encoding = Encoding::ZENOH_BYTES; } #[cfg(feature = "shared-memory")] -impl EncodingMapping for ZSliceShmMut { +impl EncodingMapping for ZShmMut { const ENCODING: Encoding = Encoding::ZENOH_BYTES; } diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 92901c54b3..6e6f7bae64 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -417,6 +417,10 @@ pub mod internal { #[cfg(all(feature = "unstable", feature = "shared-memory"))] pub mod shm { + pub use zenoh_shm::api::buffer::{ + zshm::{zshm, ZShm}, + zshmmut::{zshmmut, ZShmMut}, + }; pub use zenoh_shm::api::client::{ shared_memory_client::SharedMemoryClient, shared_memory_segment::SharedMemorySegment, }; @@ -441,8 +445,4 @@ pub mod shm { pub use zenoh_shm::api::provider::types::{ AllocAlignment, BufAllocResult, ChunkAllocResult, MemoryLayout, ZAllocError, }; - pub use zenoh_shm::api::slice::{ - zsliceshm::{zsliceshm, ZSliceShm}, - zsliceshmmut::{zsliceshmmut, ZSliceShmMut}, - }; } diff --git a/zenoh/tests/bytes.rs b/zenoh/tests/bytes.rs index 41e6d14c6e..f8eb11bf63 100644 --- a/zenoh/tests/bytes.rs +++ b/zenoh/tests/bytes.rs @@ -32,38 +32,38 @@ fn shm_bytes_single_buf() { // Prepare a layout for allocations let layout = provider.alloc_layout().size(1024).res().unwrap(); - // allocate an SHM buffer (ZSliceShmMut) + // allocate an SHM buffer (ZShmMut) let owned_shm_buf_mut = layout.alloc().res().unwrap(); - // convert into immutable owned buffer (ZSliceShmMut -> ZSlceShm) - let owned_shm_buf: ZSliceShm = owned_shm_buf_mut.into(); + // convert into immutable owned buffer (ZShmMut -> ZSlceShm) + let owned_shm_buf: ZShm = owned_shm_buf_mut.into(); - // convert again into mutable owned buffer (ZSliceShm -> ZSlceShmMut) - let owned_shm_buf_mut: ZSliceShmMut = owned_shm_buf.try_into().unwrap(); + // convert again into mutable owned buffer (ZShm -> ZSlceShmMut) + let owned_shm_buf_mut: ZShmMut = owned_shm_buf.try_into().unwrap(); - // build a ZBytes from an SHM buffer (ZSliceShmMut -> ZBytes) + // build a ZBytes from an SHM buffer (ZShmMut -> ZBytes) let mut payload: ZBytes = owned_shm_buf_mut.into(); // branch to illustrate immutable access to SHM data { - // deserialize ZBytes as an immutably borrowed zsliceshm (ZBytes -> &zsliceshm) - let borrowed_shm_buf: &zsliceshm = payload.deserialize().unwrap(); + // deserialize ZBytes as an immutably borrowed zshm (ZBytes -> &zshm) + let borrowed_shm_buf: &zshm = payload.deserialize().unwrap(); - // construct owned buffer from borrowed type (&zsliceshm -> ZSliceShm) + // construct owned buffer from borrowed type (&zshm -> ZShm) let owned = borrowed_shm_buf.to_owned(); - // try to construct mutable ZSliceShmMut (ZSliceShm -> ZSliceShmMut) - let owned_mut: Result = owned.try_into(); - // the attempt fails because ZSliceShm has two existing references ('owned' and inside 'payload') + // try to construct mutable ZShmMut (ZShm -> ZShmMut) + let owned_mut: Result = owned.try_into(); + // the attempt fails because ZShm has two existing references ('owned' and inside 'payload') assert!(owned_mut.is_err()) } // branch to illustrate mutable access to SHM data { - // deserialize ZBytes as mutably borrowed zsliceshm (ZBytes -> &mut zsliceshm) - let borrowed_shm_buf: &mut zsliceshm = payload.deserialize_mut().unwrap(); + // deserialize ZBytes as mutably borrowed zshm (ZBytes -> &mut zshm) + let borrowed_shm_buf: &mut zshm = payload.deserialize_mut().unwrap(); - // convert zsliceshm to zsliceshmmut (&mut zsliceshm -> &mut zsliceshmmut) - let _borrowed_shm_buf_mut: &mut zsliceshmmut = borrowed_shm_buf.try_into().unwrap(); + // convert zshm to zshmmut (&mut zshm -> &mut zshmmut) + let _borrowed_shm_buf_mut: &mut zshmmut = borrowed_shm_buf.try_into().unwrap(); } } From ccb960dc7814206e68692898cadeb49189eac133 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Tue, 30 Apr 2024 18:40:12 +0300 Subject: [PATCH 11/17] fix ci --- zenoh/src/api/bytes.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/api/bytes.rs b/zenoh/src/api/bytes.rs index ce88b2bdbe..874f37ba8c 100644 --- a/zenoh/src/api/bytes.rs +++ b/zenoh/src/api/bytes.rs @@ -1838,7 +1838,7 @@ mod tests { protocol_id::POSIX_PROTOCOL_ID, }, provider::shared_memory_provider::SharedMemoryProviderBuilder, - slice::zshm::{zshm, ZShm}, + buffer::zshm::{zshm, ZShm}, }; const NUM: usize = 1_000; From bd5a0da5fa1ad2536d06a72740907b79978ee8cc Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Tue, 30 Apr 2024 18:41:55 +0300 Subject: [PATCH 12/17] [skip ci] z_payload_shm -> z_bytes_shm --- examples/Cargo.toml | 4 ++-- examples/examples/{z_payload_shm.rs => z_bytes_shm.rs} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename examples/examples/{z_payload_shm.rs => z_bytes_shm.rs} (100%) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 602c3833db..263653028a 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -169,6 +169,6 @@ path = "examples/z_alloc_shm.rs" required-features = ["unstable", "shared-memory"] [[example]] -name = "z_payload_shm" -path = "examples/z_payload_shm.rs" +name = "z_bytes_shm" +path = "examples/z_bytes_shm.rs" required-features = ["unstable", "shared-memory"] diff --git a/examples/examples/z_payload_shm.rs b/examples/examples/z_bytes_shm.rs similarity index 100% rename from examples/examples/z_payload_shm.rs rename to examples/examples/z_bytes_shm.rs From c4dfd101701527e9e1441f59f867aa49e529cc6e Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 2 May 2024 12:06:34 +0300 Subject: [PATCH 13/17] Polish SHM examples --- examples/Cargo.toml | 5 +++ examples/examples/z_alloc_shm.rs | 32 +++++------------ examples/examples/z_bytes_shm.rs | 17 ++++++--- examples/examples/z_get_shm.rs | 44 ++++++++--------------- examples/examples/z_ping_shm.rs | 37 +++++++------------ examples/examples/z_posix_shm_provider.rs | 44 +++++++++++++++++++++++ examples/examples/z_pub_shm.rs | 43 +++++++--------------- examples/examples/z_pub_shm_thr.rs | 37 +++++++------------ examples/examples/z_queryable_shm.rs | 44 ++++++++--------------- zenoh/src/api/bytes.rs | 2 +- 10 files changed, 140 insertions(+), 165 deletions(-) create mode 100644 examples/examples/z_posix_shm_provider.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 263653028a..90281ae558 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -172,3 +172,8 @@ required-features = ["unstable", "shared-memory"] name = "z_bytes_shm" path = "examples/z_bytes_shm.rs" required-features = ["unstable", "shared-memory"] + +[[example]] +name = "z_posix_shm_provider" +path = "examples/z_posix_shm_provider.rs" +required-features = ["unstable", "shared-memory"] diff --git a/examples/examples/z_alloc_shm.rs b/examples/examples/z_alloc_shm.rs index acff39379c..a01de8d2fa 100644 --- a/examples/examples/z_alloc_shm.rs +++ b/examples/examples/z_alloc_shm.rs @@ -21,29 +21,15 @@ async fn main() { } async fn run() -> ZResult<()> { - // Construct an SHM backend - let backend = { - // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. - // The initialisation of SHM backend is completely backend-specific and user is free to do - // anything reasonable here. This code is execuated at the provider's first use - - // Alignment for POSIX SHM provider - // All allocations will be aligned corresponding to this alignment - - // that means that the provider will be able to satisfy allocation layouts - // with alignment <= provider_alignment - let provider_alignment = AllocAlignment::default(); - - // Create layout for POSIX Provider's memory - let provider_layout = MemoryLayout::new(65536, provider_alignment).unwrap(); - - PosixSharedMemoryProviderBackend::builder() - .with_layout(provider_layout) - .res() - .unwrap() - }; - - // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID - let shared_memory_provider = SharedMemoryProviderBuilder::builder() + // create an SHM backend... + // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(65536) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); diff --git a/examples/examples/z_bytes_shm.rs b/examples/examples/z_bytes_shm.rs index d9ab4e1f82..5c582e56e6 100644 --- a/examples/examples/z_bytes_shm.rs +++ b/examples/examples/z_bytes_shm.rs @@ -21,6 +21,7 @@ use zenoh::{ fn main() { // create an SHM backend... + // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs let backend = PosixSharedMemoryProviderBackend::builder() .with_size(4096) .unwrap() @@ -32,11 +33,17 @@ fn main() { .backend(backend) .res(); - // Prepare a layout for allocations - let layout = provider.alloc_layout().size(1024).res().unwrap(); - - // allocate an SHM buffer (ZShmMut) - let mut owned_shm_buf_mut = layout.alloc().res().unwrap(); + // Allocate an SHM buffer + // NOTE: For allocation API please check z_alloc_shm.rs example + // NOTE: For buf's API please check z_bytes_shm.rs example + let mut owned_shm_buf_mut = provider + .alloc_layout() + .size(1024) + .res() + .unwrap() + .alloc() + .res() + .unwrap(); // mutable and immutable API let _data: &[u8] = &owned_shm_buf_mut; diff --git a/examples/examples/z_get_shm.rs b/examples/examples/z_get_shm.rs index 2773348fd0..7466f6eabc 100644 --- a/examples/examples/z_get_shm.rs +++ b/examples/examples/z_get_shm.rs @@ -33,43 +33,29 @@ async fn main() { println!("Opening session..."); let session = zenoh::open(config).await.unwrap(); - println!("Creating POSIX SHM backend..."); - // Construct an SHM backend - let backend = { - // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. - // The initialisation of SHM backend is completely backend-specific and user is free to do - // anything reasonable here. This code is execuated at the provider's first use - - // Alignment for POSIX SHM provider - // All allocations will be aligned corresponding to this alignment - - // that means that the provider will be able to satisfy allocation layouts - // with alignment <= provider_alignment - let provider_alignment = AllocAlignment::default(); - - // Create layout for POSIX Provider's memory - let provider_layout = MemoryLayout::new(N * 1024, provider_alignment).unwrap(); - - PosixSharedMemoryProviderBackend::builder() - .with_layout(provider_layout) - .res() - .unwrap() - }; - - println!("Creating SHM Provider with POSIX backend..."); - // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID - let shared_memory_provider = SharedMemoryProviderBuilder::builder() + println!("Creating POSIX SHM provider..."); + // create an SHM backend... + // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(N * 1024) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); + // Allocate an SHM buffer + // NOTE: For allocation API please check z_alloc_shm.rs example + // NOTE: For buf's API please check z_bytes_shm.rs example println!("Allocating Shared Memory Buffer..."); - let layout = shared_memory_provider + let mut sbuf = provider .alloc_layout() .size(1024) .res() - .unwrap(); - - let mut sbuf = layout + .unwrap() .alloc() .with_policy::>() .res_async() diff --git a/examples/examples/z_ping_shm.rs b/examples/examples/z_ping_shm.rs index 372967f6e8..f19c4274a4 100644 --- a/examples/examples/z_ping_shm.rs +++ b/examples/examples/z_ping_shm.rs @@ -44,34 +44,23 @@ fn main() { let mut samples = Vec::with_capacity(n); - // Construct an SHM backend - let backend = { - // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. - // The initialisation of SHM backend is completely backend-specific and user is free to do - // anything reasonable here. This code is execuated at the provider's first use - - // Alignment for POSIX SHM provider - // All allocations will be aligned corresponding to this alignment - - // that means that the provider will be able to satisfy allocation layouts - // with alignment <= provider_alignment - let provider_alignment = AllocAlignment::default(); - - // Create layout for POSIX Provider's memory - let provider_layout = MemoryLayout::new(size, provider_alignment).unwrap(); - - PosixSharedMemoryProviderBackend::builder() - .with_layout(provider_layout) - .res() - .unwrap() - }; - - // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID - let shared_memory_provider = SharedMemoryProviderBuilder::builder() + // create an SHM backend... + // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(size) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); - let buf = shared_memory_provider + // Allocate an SHM buffer + // NOTE: For allocation API please check z_alloc_shm.rs example + // NOTE: For buf's API please check z_bytes_shm.rs example + let mut buf = provider .alloc_layout() .size(size) .res() diff --git a/examples/examples/z_posix_shm_provider.rs b/examples/examples/z_posix_shm_provider.rs new file mode 100644 index 0000000000..cdf502bc61 --- /dev/null +++ b/examples/examples/z_posix_shm_provider.rs @@ -0,0 +1,44 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use zenoh::prelude::*; + +fn main() { + // Construct an SHM backend + let backend = { + // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. + + // Total amount of shared memory to allocate + let size = 4096; + + // An alignment for POSIX SHM provider + // Due to internal optimization, all allocations will be aligned corresponding to this alignment, + // so the provider will be able to satisfy allocation layouts with alignment <= provider_alignment + let provider_alignment = AllocAlignment::default(); + + // A layout for POSIX Provider's memory + let provider_layout = MemoryLayout::new(size, provider_alignment).unwrap(); + + // Build a provider backend + PosixSharedMemoryProviderBackend::builder() + .with_layout(provider_layout) + .res() + .unwrap() + }; + + // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID + let _shared_memory_provider = SharedMemoryProviderBuilder::builder() + .protocol_id::() + .backend(backend) + .res(); +} diff --git a/examples/examples/z_pub_shm.rs b/examples/examples/z_pub_shm.rs index 356737c3cd..d2a87a59cc 100644 --- a/examples/examples/z_pub_shm.rs +++ b/examples/examples/z_pub_shm.rs @@ -32,48 +32,31 @@ async fn main() -> Result<(), ZError> { println!("Opening session..."); let session = zenoh::open(config).await.unwrap(); - println!("Creating POSIX SHM backend..."); - // Construct an SHM backend - let backend = { - // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. - // The initialisation of SHM backend is completely backend-specific and user is free to do - // anything reasonable here. This code is execuated at the provider's first use - - // Alignment for POSIX SHM provider - // All allocations will be aligned corresponding to this alignment - - // that means that the provider will be able to satisfy allocation layouts - // with alignment <= provider_alignment - let provider_alignment = AllocAlignment::default(); - - // Create layout for POSIX Provider's memory - let provider_layout = MemoryLayout::new(N * 1024, provider_alignment).unwrap(); - - PosixSharedMemoryProviderBackend::builder() - .with_layout(provider_layout) - .res() - .unwrap() - }; - - println!("Creating SHM Provider with POSIX backend..."); - // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID - let shared_memory_provider = SharedMemoryProviderBuilder::builder() + println!("Creating POSIX SHM provider..."); + // create an SHM backend... + // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(N * 1024) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); let publisher = session.declare_publisher(&path).await.unwrap(); + // Create allocation layout for series of similar allocations println!("Allocating Shared Memory Buffer..."); - let layout = shared_memory_provider - .alloc_layout() - .size(1024) - .res() - .unwrap(); + let layout = provider.alloc_layout().size(1024).res().unwrap(); println!("Press CTRL-C to quit..."); for idx in 0..u32::MAX { tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // Allocate particular SHM buffer using pre-created layout let mut sbuf = layout .alloc() .with_policy::>() diff --git a/examples/examples/z_pub_shm_thr.rs b/examples/examples/z_pub_shm_thr.rs index 0b94304321..0d44fbe6ee 100644 --- a/examples/examples/z_pub_shm_thr.rs +++ b/examples/examples/z_pub_shm_thr.rs @@ -28,34 +28,23 @@ async fn main() { let z = zenoh::open(config).await.unwrap(); - // Construct an SHM backend - let backend = { - // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. - // The initialisation of SHM backend is completely backend-specific and user is free to do - // anything reasonable here. This code is execuated at the provider's first use - - // Alignment for POSIX SHM provider - // All allocations will be aligned corresponding to this alignment - - // that means that the provider will be able to satisfy allocation layouts - // with alignment <= provider_alignment - let provider_alignment = AllocAlignment::default(); - - // Create layout for POSIX Provider's memory - let provider_layout = MemoryLayout::new(sm_size, provider_alignment).unwrap(); - - PosixSharedMemoryProviderBackend::builder() - .with_layout(provider_layout) - .res() - .unwrap() - }; - - // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID - let shared_memory_provider = SharedMemoryProviderBuilder::builder() + // create an SHM backend... + // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(sm_size) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); - let mut buf = shared_memory_provider + // Allocate an SHM buffer + // NOTE: For allocation API please check z_alloc_shm.rs example + // NOTE: For buf's API please check z_bytes_shm.rs example + let mut buf = provider .alloc_layout() .size(size) .res() diff --git a/examples/examples/z_queryable_shm.rs b/examples/examples/z_queryable_shm.rs index 49939dcb0a..ed2320d2c5 100644 --- a/examples/examples/z_queryable_shm.rs +++ b/examples/examples/z_queryable_shm.rs @@ -32,31 +32,16 @@ async fn main() { println!("Opening session..."); let session = zenoh::open(config).await.unwrap(); - println!("Creating POSIX SHM backend..."); - // Construct an SHM backend - let backend = { - // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. - // The initialisation of SHM backend is completely backend-specific and user is free to do - // anything reasonable here. This code is execuated at the provider's first use - - // Alignment for POSIX SHM provider - // All allocations will be aligned corresponding to this alignment - - // that means that the provider will be able to satisfy allocation layouts - // with alignment <= provider_alignment - let provider_alignment = AllocAlignment::default(); - - // Create layout for POSIX Provider's memory - let provider_layout = MemoryLayout::new(N * 1024, provider_alignment).unwrap(); - - PosixSharedMemoryProviderBackend::builder() - .with_layout(provider_layout) - .res() - .unwrap() - }; - - println!("Creating SHM Provider with POSIX backend..."); - // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID - let shared_memory_provider = SharedMemoryProviderBuilder::builder() + println!("Creating POSIX SHM provider..."); + // create an SHM backend... + // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(N * 1024) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); @@ -83,14 +68,15 @@ async fn main() { } println!(")"); + // Allocate an SHM buffer + // NOTE: For allocation API please check z_alloc_shm.rs example + // NOTE: For buf's API please check z_bytes_shm.rs example println!("Allocating Shared Memory Buffer..."); - let layout = shared_memory_provider + let mut sbuf = provider .alloc_layout() .size(1024) .res() - .unwrap(); - - let mut sbuf = layout + .unwrap() .alloc() .with_policy::>() .res_async() diff --git a/zenoh/src/api/bytes.rs b/zenoh/src/api/bytes.rs index 874f37ba8c..8a53d5ba34 100644 --- a/zenoh/src/api/bytes.rs +++ b/zenoh/src/api/bytes.rs @@ -1833,12 +1833,12 @@ mod tests { #[cfg(all(feature = "shared-memory", feature = "unstable"))] use zenoh_shm::api::{ + buffer::zshm::{zshm, ZShm}, protocol_implementations::posix::{ posix_shared_memory_provider_backend::PosixSharedMemoryProviderBackend, protocol_id::POSIX_PROTOCOL_ID, }, provider::shared_memory_provider::SharedMemoryProviderBuilder, - buffer::zshm::{zshm, ZShm}, }; const NUM: usize = 1_000; From 446fa2fc28770f92cb44d456dc9638faf4263761 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 2 May 2024 12:15:20 +0300 Subject: [PATCH 14/17] fix lints --- zenoh/tests/bytes.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/tests/bytes.rs b/zenoh/tests/bytes.rs index f8eb11bf63..6de12ab63f 100644 --- a/zenoh/tests/bytes.rs +++ b/zenoh/tests/bytes.rs @@ -15,7 +15,7 @@ #[test] #[cfg(all(feature = "shared-memory", feature = "unstable"))] fn shm_bytes_single_buf() { - use zenoh::prelude::r#async::*; + use zenoh::prelude::*; // create an SHM backend... let backend = PosixSharedMemoryProviderBackend::builder() From 74b444efa1a636ed7f2c7099a53914a6f7da98ee Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 2 May 2024 12:59:35 +0300 Subject: [PATCH 15/17] fix lint --- examples/examples/z_ping_shm.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/examples/z_ping_shm.rs b/examples/examples/z_ping_shm.rs index f19c4274a4..354f11d789 100644 --- a/examples/examples/z_ping_shm.rs +++ b/examples/examples/z_ping_shm.rs @@ -60,7 +60,7 @@ fn main() { // Allocate an SHM buffer // NOTE: For allocation API please check z_alloc_shm.rs example // NOTE: For buf's API please check z_bytes_shm.rs example - let mut buf = provider + let buf = provider .alloc_layout() .size(size) .res() From 77654a0b16da29716faa311f3f8a4040b8338bf0 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Fri, 3 May 2024 15:17:18 +0300 Subject: [PATCH 16/17] fix after merge --- examples/examples/z_alloc_shm.rs | 4 +-- examples/examples/z_get_shm.rs | 3 +- zenoh/src/lib.rs | 55 +++++++++++++++++--------------- 3 files changed, 33 insertions(+), 29 deletions(-) diff --git a/examples/examples/z_alloc_shm.rs b/examples/examples/z_alloc_shm.rs index a01de8d2fa..2db5e5a44e 100644 --- a/examples/examples/z_alloc_shm.rs +++ b/examples/examples/z_alloc_shm.rs @@ -40,14 +40,14 @@ async fn run() -> ZResult<()> { // This layout is reusable and can handle series of similar allocations let buffer_layout = { // OPTION 1: Simple (default) configuration: - let simple_layout = shared_memory_provider + let simple_layout = provider .alloc_layout() .size(512) .res() .unwrap(); // OPTION 2: Comprehensive configuration: - let _comprehensive_layout = shared_memory_provider + let _comprehensive_layout = provider .alloc_layout() .size(512) .alignment(AllocAlignment::new(2)) diff --git a/examples/examples/z_get_shm.rs b/examples/examples/z_get_shm.rs index 7466f6eabc..39caf3a101 100644 --- a/examples/examples/z_get_shm.rs +++ b/examples/examples/z_get_shm.rs @@ -11,8 +11,9 @@ // Contributors: // ZettaScale Zenoh Team, // -use clap::Parser; use std::time::Duration; + +use clap::Parser; use zenoh::prelude::*; use zenoh_examples::CommonArgs; diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index e6cf1e437a..caf961984b 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -378,32 +378,35 @@ pub mod internal { #[cfg(all(feature = "unstable", feature = "shared-memory"))] pub mod shm { - pub use zenoh_shm::api::buffer::{ - zshm::{zshm, ZShm}, - zshmmut::{zshmmut, ZShmMut}, - }; - pub use zenoh_shm::api::client::{ - shared_memory_client::SharedMemoryClient, shared_memory_segment::SharedMemorySegment, - }; - pub use zenoh_shm::api::client_storage::{SharedMemoryClientStorage, GLOBAL_CLIENT_STORAGE}; - pub use zenoh_shm::api::common::types::{ChunkID, ProtocolID, SegmentID}; - pub use zenoh_shm::api::protocol_implementations::posix::{ - posix_shared_memory_client::PosixSharedMemoryClient, - posix_shared_memory_provider_backend::{ - LayoutedPosixSharedMemoryProviderBackendBuilder, PosixSharedMemoryProviderBackend, - PosixSharedMemoryProviderBackendBuilder, + pub use zenoh_shm::api::{ + buffer::{ + zshm::{zshm, ZShm}, + zshmmut::{zshmmut, ZShmMut}, + }, + client::{ + shared_memory_client::SharedMemoryClient, shared_memory_segment::SharedMemorySegment, + }, + client_storage::{SharedMemoryClientStorage, GLOBAL_CLIENT_STORAGE}, + common::types::{ChunkID, ProtocolID, SegmentID}, + protocol_implementations::posix::{ + posix_shared_memory_client::PosixSharedMemoryClient, + posix_shared_memory_provider_backend::{ + LayoutedPosixSharedMemoryProviderBackendBuilder, PosixSharedMemoryProviderBackend, + PosixSharedMemoryProviderBackendBuilder, + }, + protocol_id::POSIX_PROTOCOL_ID, + }, + provider::{ + shared_memory_provider::{ + AllocBuilder, AllocLayout, AllocLayoutAlignedBuilder, AllocLayoutBuilder, + AllocLayoutSizedBuilder, AllocPolicy, AsyncAllocPolicy, BlockOn, DeallocEldest, + DeallocOptimal, DeallocYoungest, Deallocate, Defragment, DynamicProtocolID, + ForceDeallocPolicy, GarbageCollect, JustAlloc, ProtocolIDSource, + SharedMemoryProvider, SharedMemoryProviderBuilder, + SharedMemoryProviderBuilderBackendID, SharedMemoryProviderBuilderID, + StaticProtocolID, + }, + types::{AllocAlignment, BufAllocResult, ChunkAllocResult, MemoryLayout, ZAllocError}, }, - protocol_id::POSIX_PROTOCOL_ID, - }; - pub use zenoh_shm::api::provider::shared_memory_provider::{ - AllocBuilder, AllocLayout, AllocLayoutAlignedBuilder, AllocLayoutBuilder, - AllocLayoutSizedBuilder, AllocPolicy, AsyncAllocPolicy, BlockOn, DeallocEldest, - DeallocOptimal, DeallocYoungest, Deallocate, Defragment, DynamicProtocolID, - ForceDeallocPolicy, GarbageCollect, JustAlloc, ProtocolIDSource, SharedMemoryProvider, - SharedMemoryProviderBuilder, SharedMemoryProviderBuilderBackendID, - SharedMemoryProviderBuilderID, StaticProtocolID, - }; - pub use zenoh_shm::api::provider::types::{ - AllocAlignment, BufAllocResult, ChunkAllocResult, MemoryLayout, ZAllocError, }; } From 9fb1e1926139b6d297cb1d0cf347ea0d62a890d2 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Fri, 3 May 2024 15:20:47 +0300 Subject: [PATCH 17/17] Update z_alloc_shm.rs --- examples/examples/z_alloc_shm.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/examples/examples/z_alloc_shm.rs b/examples/examples/z_alloc_shm.rs index 2db5e5a44e..93df5d821d 100644 --- a/examples/examples/z_alloc_shm.rs +++ b/examples/examples/z_alloc_shm.rs @@ -40,11 +40,7 @@ async fn run() -> ZResult<()> { // This layout is reusable and can handle series of similar allocations let buffer_layout = { // OPTION 1: Simple (default) configuration: - let simple_layout = provider - .alloc_layout() - .size(512) - .res() - .unwrap(); + let simple_layout = provider.alloc_layout().size(512).res().unwrap(); // OPTION 2: Comprehensive configuration: let _comprehensive_layout = provider