diff --git a/examples/Cargo.toml b/examples/Cargo.toml index e117507ae9..ce268572a6 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -100,6 +100,11 @@ path = "examples/z_pull.rs" name = "z_queryable" path = "examples/z_queryable.rs" +[[example]] +name = "z_queryable_shm" +path = "examples/z_queryable_shm.rs" +required-features = ["unstable", "shared-memory"] + [[example]] name = "z_storage" path = "examples/z_storage.rs" @@ -108,6 +113,11 @@ path = "examples/z_storage.rs" name = "z_get" path = "examples/z_get.rs" +[[example]] +name = "z_get_shm" +path = "examples/z_get_shm.rs" +required-features = ["unstable", "shared-memory"] + [[example]] name = "z_forward" path = "examples/z_forward.rs" diff --git a/examples/examples/z_get_shm.rs b/examples/examples/z_get_shm.rs new file mode 100644 index 0000000000..60015829aa --- /dev/null +++ b/examples/examples/z_get_shm.rs @@ -0,0 +1,157 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::Parser; +use std::time::Duration; +use zenoh::prelude::*; +use zenoh_examples::CommonArgs; + +const N: usize = 10; + +#[tokio::main] +async fn main() { + // initiate logging + zenoh_util::try_init_log_from_env(); + + let (mut config, selector, mut value, target, timeout) = parse_args(); + + // A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate + // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + config.transport.shared_memory.set_enabled(true).unwrap(); + + println!("Opening session..."); + let session = zenoh::open(config).await.unwrap(); + + println!("Creating POSIX SHM backend..."); + // Construct an SHM backend + let backend = { + // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. + // The initialisation of SHM backend is completely backend-specific and user is free to do + // anything reasonable here. This code is execuated at the provider's first use + + // Alignment for POSIX SHM provider + // All allocations will be aligned corresponding to this alignment - + // that means that the provider will be able to satisfy allocation layouts + // with alignment <= provider_alignment + let provider_alignment = AllocAlignment::default(); + + // Create layout for POSIX Provider's memory + let provider_layout = MemoryLayout::new(N * 1024, provider_alignment).unwrap(); + + PosixSharedMemoryProviderBackend::builder() + .with_layout(provider_layout) + .res() + .unwrap() + }; + + println!("Creating SHM Provider with POSIX backend..."); + // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID + let shared_memory_provider = SharedMemoryProviderBuilder::builder() + .protocol_id::() + .backend(backend) + .res(); + + println!("Allocating Shared Memory Buffer..."); + let layout = shared_memory_provider + .alloc_layout() + .size(1024) + .res() + .unwrap(); + + let mut sbuf = layout + .alloc() + .with_policy::>() + .res_async() + .await + .unwrap(); + + let content = value + .take() + .unwrap_or_else(|| "Get from SharedMemory Rust!".to_string()); + sbuf[0..content.len()].copy_from_slice(content.as_bytes()); + + println!("Sending Query '{selector}'..."); + let replies = session + .get(&selector) + .value(sbuf) + .target(target) + .timeout(timeout) + .await + .unwrap(); + + while let Ok(reply) = replies.recv_async().await { + match reply.result() { + Ok(sample) => { + print!(">> Received ('{}': ", sample.key_expr().as_str()); + match sample.payload().deserialize::<&zsliceshm>() { + Ok(payload) => println!("'{}')", String::from_utf8_lossy(payload),), + Err(e) => println!("'Not a SharedMemoryBuf: {:?}')", e), + } + } + Err(err) => { + let payload = err + .payload() + .deserialize::() + .unwrap_or_else(|e| format!("{}", e)); + println!(">> Received (ERROR: '{}')", payload); + } + } + } +} + +#[derive(clap::ValueEnum, Clone, Copy, Debug)] +#[value(rename_all = "SCREAMING_SNAKE_CASE")] +enum Qt { + BestMatching, + All, + AllComplete, +} + +#[derive(Parser, Clone, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/**")] + /// The selection of resources to query + selector: Selector<'static>, + /// The value to publish. + value: Option, + #[arg(short, long, default_value = "BEST_MATCHING")] + /// The target queryables of the query. + target: Qt, + #[arg(short = 'o', long, default_value = "10000")] + /// The query timeout in milliseconds. + timeout: u64, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> ( + Config, + Selector<'static>, + Option, + QueryTarget, + Duration, +) { + let args = Args::parse(); + ( + args.common.into(), + args.selector, + args.value, + match args.target { + Qt::BestMatching => QueryTarget::BestMatching, + Qt::All => QueryTarget::All, + Qt::AllComplete => QueryTarget::AllComplete, + }, + Duration::from_millis(args.timeout), + ) +} diff --git a/examples/examples/z_pub_shm.rs b/examples/examples/z_pub_shm.rs index 92d19b6b06..356737c3cd 100644 --- a/examples/examples/z_pub_shm.rs +++ b/examples/examples/z_pub_shm.rs @@ -16,7 +16,6 @@ use zenoh::prelude::*; use zenoh_examples::CommonArgs; const N: usize = 10; -const K: u32 = 3; #[tokio::main] async fn main() -> Result<(), ZError> { @@ -72,7 +71,9 @@ async fn main() -> Result<(), ZError> { .unwrap(); println!("Press CTRL-C to quit..."); - for idx in 0..(K * N as u32) { + for idx in 0..u32::MAX { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let mut sbuf = layout .alloc() .with_policy::>() diff --git a/examples/examples/z_queryable.rs b/examples/examples/z_queryable.rs index e24b8e80cb..dcdca82c09 100644 --- a/examples/examples/z_queryable.rs +++ b/examples/examples/z_queryable.rs @@ -20,7 +20,12 @@ async fn main() { // initiate logging zenoh_util::try_init_log_from_env(); - let (config, key_expr, value, complete) = parse_args(); + let (mut config, key_expr, value, complete) = parse_args(); + + // A probing procedure for shared memory is performed upon session opening. To enable `z_get_shm` to operate + // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + config.transport.shared_memory.set_enabled(true).unwrap(); println!("Opening session..."); let session = zenoh::open(config).await.unwrap(); diff --git a/examples/examples/z_queryable_shm.rs b/examples/examples/z_queryable_shm.rs new file mode 100644 index 0000000000..62fa7571d5 --- /dev/null +++ b/examples/examples/z_queryable_shm.rs @@ -0,0 +1,132 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::Parser; +use zenoh::prelude::*; +use zenoh_examples::CommonArgs; + +const N: usize = 10; + +#[tokio::main] +async fn main() { + // initiate logging + zenoh_util::try_init_log_from_env(); + + let (mut config, key_expr, value, complete) = parse_args(); + + // A probing procedure for shared memory is performed upon session opening. To enable `z_get_shm` to operate + // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + config.transport.shared_memory.set_enabled(true).unwrap(); + + println!("Opening session..."); + let session = zenoh::open(config).await.unwrap(); + + println!("Creating POSIX SHM backend..."); + // Construct an SHM backend + let backend = { + // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. + // The initialisation of SHM backend is completely backend-specific and user is free to do + // anything reasonable here. This code is execuated at the provider's first use + + // Alignment for POSIX SHM provider + // All allocations will be aligned corresponding to this alignment - + // that means that the provider will be able to satisfy allocation layouts + // with alignment <= provider_alignment + let provider_alignment = AllocAlignment::default(); + + // Create layout for POSIX Provider's memory + let provider_layout = MemoryLayout::new(N * 1024, provider_alignment).unwrap(); + + PosixSharedMemoryProviderBackend::builder() + .with_layout(provider_layout) + .res() + .unwrap() + }; + + println!("Creating SHM Provider with POSIX backend..."); + // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID + let shared_memory_provider = SharedMemoryProviderBuilder::builder() + .protocol_id::() + .backend(backend) + .res(); + + println!("Declaring Queryable on '{key_expr}'..."); + let queryable = session + .declare_queryable(&key_expr) + .complete(complete) + .await + .unwrap(); + + println!("Press CTRL-C to quit..."); + while let Ok(query) = queryable.recv_async().await { + print!( + ">> [Queryable] Received Query '{}' ('{}'", + query.selector(), + query.key_expr().as_str(), + ); + if let Some(payload) = query.payload() { + match payload.deserialize::<&zsliceshm>() { + Ok(payload) => print!(": '{}'", String::from_utf8_lossy(payload)), + Err(e) => print!(": 'Not a SharedMemoryBuf: {:?}'", e), + } + } + println!(")"); + + println!("Allocating Shared Memory Buffer..."); + let layout = shared_memory_provider + .alloc_layout() + .size(1024) + .res() + .unwrap(); + + let mut sbuf = layout + .alloc() + .with_policy::>() + .res_async() + .await + .unwrap(); + + sbuf[0..value.len()].copy_from_slice(value.as_bytes()); + + println!( + ">> [Queryable] Responding ('{}': '{}')", + key_expr.as_str(), + value, + ); + query + .reply(key_expr.clone(), sbuf) + .await + .unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}")); + } +} + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/zenoh-rs-queryable")] + /// The key expression matching queries to reply to. + key: KeyExpr<'static>, + #[arg(short, long, default_value = "Queryable from SharedMemory Rust!")] + /// The value to reply to queries. + value: String, + #[arg(long)] + /// Declare the queryable as complete w.r.t. the key expression. + complete: bool, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, KeyExpr<'static>, String, bool) { + let args = Args::parse(); + (args.common.into(), args.key, args.value, args.complete) +} diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index 5f5c77633f..a43b5c6cd0 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -36,18 +36,37 @@ async fn main() { println!("Press CTRL-C to quit..."); while let Ok(sample) = subscriber.recv_async().await { + print!( + ">> [Subscriber] Received {} ('{}': ", + sample.kind(), + sample.key_expr().as_str(), + ); match sample.payload().deserialize::<&zsliceshm>() { - Ok(payload) => println!( - ">> [Subscriber] Received {} ('{}': '{:02x?}')", - sample.kind(), - sample.key_expr().as_str(), - payload - ), - Err(e) => { - println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); - } + Ok(payload) => print!("'{}'", String::from_utf8_lossy(payload)), + Err(e) => print!("'Not a SharedMemoryBuf: {:?}'", e), } + println!(")"); } + + // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber + // // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it. + // // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content. + // + // use zenoh::shm::zsliceshmmut; + + // while let Ok(mut sample) = subscriber.recv_async().await { + // let kind = sample.kind(); + // let key_expr = sample.key_expr().to_string(); + // match sample.payload_mut().deserialize_mut::<&mut zsliceshmmut>() { + // Ok(payload) => println!( + // ">> [Subscriber] Received {} ('{}': '{:02x?}')", + // kind, key_expr, payload + // ), + // Err(e) => { + // println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); + // } + // } + // } } #[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index 311402b618..6c5053ee5f 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -89,6 +89,11 @@ impl Reply { self.result.as_ref() } + /// Gets the a mutable borrowed result of this `Reply`. Use [`Reply::into_result`] to take ownership of the result. + pub fn result_mut(&mut self) -> Result<&mut Sample, &mut Value> { + self.result.as_mut() + } + /// Converts this `Reply` into the its result. Use [`Reply::result`] it you don't want to take ownership. pub fn into_result(self) -> Result { self.result diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index c83b4b6081..cd3bebd72c 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -51,18 +51,11 @@ use { }; pub(crate) struct QueryInner { - /// The key expression of this Query. pub(crate) key_expr: KeyExpr<'static>, - /// This Query's selector parameters. pub(crate) parameters: Parameters<'static>, - /// This Query's body. - pub(crate) value: Option, - pub(crate) qid: RequestId, pub(crate) zid: ZenohId, pub(crate) primitives: Arc, - #[cfg(feature = "unstable")] - pub(crate) attachment: Option, } impl Drop for QueryInner { @@ -80,6 +73,9 @@ impl Drop for QueryInner { pub struct Query { pub(crate) inner: Arc, pub(crate) eid: EntityId, + pub(crate) value: Option, + #[cfg(feature = "unstable")] + pub(crate) attachment: Option, } impl Query { @@ -107,24 +103,43 @@ impl Query { /// This Query's value. #[inline(always)] pub fn value(&self) -> Option<&Value> { - self.inner.value.as_ref() + self.value.as_ref() + } + + /// This Query's value. + #[inline(always)] + pub fn value_mut(&mut self) -> Option<&mut Value> { + self.value.as_mut() } /// This Query's payload. #[inline(always)] pub fn payload(&self) -> Option<&ZBytes> { - self.inner.value.as_ref().map(|v| &v.payload) + self.value.as_ref().map(|v| &v.payload) + } + + /// This Query's payload. + #[inline(always)] + pub fn payload_mut(&mut self) -> Option<&mut ZBytes> { + self.value.as_mut().map(|v| &mut v.payload) } /// This Query's encoding. #[inline(always)] pub fn encoding(&self) -> Option<&Encoding> { - self.inner.value.as_ref().map(|v| &v.encoding) + self.value.as_ref().map(|v| &v.encoding) } + /// This Query's attachment. #[zenoh_macros::unstable] pub fn attachment(&self) -> Option<&ZBytes> { - self.inner.attachment.as_ref() + self.attachment.as_ref() + } + + /// This Query's attachment. + #[zenoh_macros::unstable] + pub fn attachment_mut(&mut self) -> Option<&mut ZBytes> { + self.attachment.as_mut() } /// Sends a reply in the form of [`Sample`] to this Query. diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index ca2354db85..11bfc92c0b 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -303,6 +303,12 @@ impl Sample { &self.payload } + /// Gets the payload of this Sample. + #[inline] + pub fn payload_mut(&mut self) -> &mut ZBytes { + &mut self.payload + } + /// Gets the kind of this Sample. #[inline] pub fn kind(&self) -> SampleKind { @@ -349,6 +355,13 @@ impl Sample { pub fn attachment(&self) -> Option<&ZBytes> { self.attachment.as_ref() } + + /// Gets the sample attachment: a map of key-value pairs, where each key and value are byte-slices. + #[zenoh_macros::unstable] + #[inline] + pub fn attachment_mut(&mut self) -> Option<&mut ZBytes> { + self.attachment.as_mut() + } } impl From for Value { diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index dea322419c..4a0eb024bc 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1801,10 +1801,6 @@ impl Session { let query_inner = Arc::new(QueryInner { key_expr, parameters: parameters.to_owned().into(), - value: body.map(|b| Value { - payload: b.payload.into(), - encoding: b.encoding.into(), - }), qid, zid, primitives: if local { @@ -1812,13 +1808,17 @@ impl Session { } else { primitives }, - #[cfg(feature = "unstable")] - attachment, }); for (eid, callback) in queryables { callback(Query { inner: query_inner.clone(), eid, + value: body.as_ref().map(|b| Value { + payload: b.payload.clone().into(), + encoding: b.encoding.clone().into(), + }), + #[cfg(feature = "unstable")] + attachment: attachment.clone(), }); } } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 9ea54b8d88..aa3d89422e 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -462,14 +462,14 @@ impl Primitives for AdminSpace { inner: Arc::new(QueryInner { key_expr: key_expr.clone(), parameters: query.parameters.into(), - value: query.ext_body.map(|b| Value::new(b.payload, b.encoding)), qid: msg.id, zid, primitives, - #[cfg(feature = "unstable")] - attachment: query.ext_attachment.map(Into::into), }), eid: self.queryable_id, + value: query.ext_body.map(|b| Value::new(b.payload, b.encoding)), + #[cfg(feature = "unstable")] + attachment: query.ext_attachment.map(Into::into), }; for (key, handler) in &self.handlers {