From 9284388c466b3ad5822e826bd612ccb847bd5eba Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 26 Apr 2024 12:00:32 +0200 Subject: [PATCH 1/5] Add payload_mut to sample for zsliceshmmut deserialization --- examples/examples/z_sub_shm.rs | 20 ++++++++++++++++++++ zenoh/src/sample/mod.rs | 6 ++++++ 2 files changed, 26 insertions(+) diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index aa3967becd..35fb80d833 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -49,6 +49,26 @@ async fn main() { } } } + + // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber + // // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it. + // // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content. + // + // use zenoh::shm::slice::zsliceshmmut::zsliceshmmut; + + // while let Ok(mut sample) = subscriber.recv_async().await { + // let kind = sample.kind(); + // let key_expr = sample.key_expr().to_string(); + // match sample.payload_mut().deserialize_mut::<&mut zsliceshmmut>() { + // Ok(payload) => println!( + // ">> [Subscriber] Received {} ('{}': '{:02x?}')", + // kind, key_expr, payload + // ), + // Err(e) => { + // println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); + // } + // } + // } } #[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] diff --git a/zenoh/src/sample/mod.rs b/zenoh/src/sample/mod.rs index b5dbd727ec..0c1180fb8f 100644 --- a/zenoh/src/sample/mod.rs +++ b/zenoh/src/sample/mod.rs @@ -308,6 +308,12 @@ impl Sample { &self.payload } + /// Gets the payload of this Sample. + #[inline] + pub fn payload_mut(&mut self) -> &mut ZBytes { + &mut self.payload + } + /// Gets the kind of this Sample. #[inline] pub fn kind(&self) -> SampleKind { From 0983d58ef38128566f9b18b26046b09c73d1ab3c Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 26 Apr 2024 16:26:11 +0200 Subject: [PATCH 2/5] Improve SHM examples --- examples/examples/z_pub_shm.rs | 5 ++-- examples/examples/z_sub_shm.rs | 50 +++++++++++++++++----------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/examples/examples/z_pub_shm.rs b/examples/examples/z_pub_shm.rs index 8287509f1b..0dce88b8e7 100644 --- a/examples/examples/z_pub_shm.rs +++ b/examples/examples/z_pub_shm.rs @@ -25,7 +25,6 @@ use zenoh::shm::provider::types::MemoryLayout; use zenoh_examples::CommonArgs; const N: usize = 10; -const K: u32 = 3; #[tokio::main] async fn main() -> Result<(), zenoh::Error> { @@ -81,7 +80,9 @@ async fn main() -> Result<(), zenoh::Error> { .unwrap(); println!("Press CTRL-C to quit..."); - for idx in 0..(K * N as u32) { + for idx in 0..u32::MAX { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let mut sbuf = layout .alloc() .with_policy::>() diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index 35fb80d833..319d8ecf90 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -36,39 +36,39 @@ async fn main() { let subscriber = session.declare_subscriber(&key_expr).res().await.unwrap(); println!("Press CTRL-C to quit..."); - while let Ok(sample) = subscriber.recv_async().await { - match sample.payload().deserialize::<&zsliceshm>() { - Ok(payload) => println!( - ">> [Subscriber] Received {} ('{}': '{:02x?}')", - sample.kind(), - sample.key_expr().as_str(), - payload - ), - Err(e) => { - println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); - } - } - } - - // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber - // // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it. - // // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content. - // - // use zenoh::shm::slice::zsliceshmmut::zsliceshmmut; - - // while let Ok(mut sample) = subscriber.recv_async().await { - // let kind = sample.kind(); - // let key_expr = sample.key_expr().to_string(); - // match sample.payload_mut().deserialize_mut::<&mut zsliceshmmut>() { + // while let Ok(sample) = subscriber.recv_async().await { + // match sample.payload().deserialize::<&zsliceshm>() { // Ok(payload) => println!( // ">> [Subscriber] Received {} ('{}': '{:02x?}')", - // kind, key_expr, payload + // sample.kind(), + // sample.key_expr().as_str(), + // payload // ), // Err(e) => { // println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); // } // } // } + + // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber + // // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it. + // // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content. + // + use zenoh::shm::slice::zsliceshmmut::zsliceshmmut; + + while let Ok(mut sample) = subscriber.recv_async().await { + let kind = sample.kind(); + let key_expr = sample.key_expr().to_string(); + match sample.payload_mut().deserialize_mut::<&mut zsliceshmmut>() { + Ok(payload) => println!( + ">> [Subscriber] Received {} ('{}': '{:02x?}')", + kind, key_expr, payload + ), + Err(e) => { + println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); + } + } + } } #[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] From f08ea12c2760d42c122a228f565c9463b7df0ccb Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 26 Apr 2024 16:36:32 +0200 Subject: [PATCH 3/5] Fix merge --- examples/examples/z_sub_shm.rs | 51 +++++++++++++++++----------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index fdb3204ac9..45180f598b 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -14,6 +14,7 @@ use clap::Parser; use zenoh::config::Config; use zenoh::prelude::r#async::*; +use zenoh::shm::zsliceshm; use zenoh_examples::CommonArgs; #[tokio::main] @@ -35,39 +36,39 @@ async fn main() { let subscriber = session.declare_subscriber(&key_expr).res().await.unwrap(); println!("Press CTRL-C to quit..."); - // while let Ok(sample) = subscriber.recv_async().await { - // match sample.payload().deserialize::<&zsliceshm>() { - // Ok(payload) => println!( - // ">> [Subscriber] Received {} ('{}': '{:02x?}')", - // sample.kind(), - // sample.key_expr().as_str(), - // payload - // ), - // Err(e) => { - // println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); - // } - // } - // } - - // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber - // // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it. - // // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content. - // - use zenoh::shm::slice::zsliceshmmut::zsliceshmmut; - - while let Ok(mut sample) = subscriber.recv_async().await { - let kind = sample.kind(); - let key_expr = sample.key_expr().to_string(); - match sample.payload_mut().deserialize_mut::<&mut zsliceshmmut>() { + while let Ok(sample) = subscriber.recv_async().await { + match sample.payload().deserialize::<&zsliceshm>() { Ok(payload) => println!( ">> [Subscriber] Received {} ('{}': '{:02x?}')", - kind, key_expr, payload + sample.kind(), + sample.key_expr().as_str(), + payload ), Err(e) => { println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); } } } + + // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber + // // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it. + // // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content. + // + // use zenoh::shm::zsliceshmmut; + + // while let Ok(mut sample) = subscriber.recv_async().await { + // let kind = sample.kind(); + // let key_expr = sample.key_expr().to_string(); + // match sample.payload_mut().deserialize_mut::<&mut zsliceshmmut>() { + // Ok(payload) => println!( + // ">> [Subscriber] Received {} ('{}': '{:02x?}')", + // kind, key_expr, payload + // ), + // Err(e) => { + // println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); + // } + // } + // } } #[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] From 12376c0d968b60bc49da5643ca3ac6c470edb1ad Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 26 Apr 2024 18:03:09 +0200 Subject: [PATCH 4/5] Query/Reply shared memory examples --- examples/Cargo.toml | 10 ++ examples/examples/z_get.rs | 2 +- examples/examples/z_get_shm.rs | 158 +++++++++++++++++++++++++++ examples/examples/z_queryable.rs | 7 +- examples/examples/z_queryable_shm.rs | 134 +++++++++++++++++++++++ examples/examples/z_sub_shm.rs | 17 ++- zenoh/src/api/query.rs | 5 + zenoh/src/api/queryable.rs | 37 +++++-- zenoh/src/api/session.rs | 12 +- zenoh/src/net/runtime/adminspace.rs | 6 +- 10 files changed, 357 insertions(+), 31 deletions(-) create mode 100644 examples/examples/z_get_shm.rs create mode 100644 examples/examples/z_queryable_shm.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index e117507ae9..ce268572a6 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -100,6 +100,11 @@ path = "examples/z_pull.rs" name = "z_queryable" path = "examples/z_queryable.rs" +[[example]] +name = "z_queryable_shm" +path = "examples/z_queryable_shm.rs" +required-features = ["unstable", "shared-memory"] + [[example]] name = "z_storage" path = "examples/z_storage.rs" @@ -108,6 +113,11 @@ path = "examples/z_storage.rs" name = "z_get" path = "examples/z_get.rs" +[[example]] +name = "z_get_shm" +path = "examples/z_get_shm.rs" +required-features = ["unstable", "shared-memory"] + [[example]] name = "z_forward" path = "examples/z_forward.rs" diff --git a/examples/examples/z_get.rs b/examples/examples/z_get.rs index 56693d9fa1..76add34286 100644 --- a/examples/examples/z_get.rs +++ b/examples/examples/z_get.rs @@ -32,7 +32,7 @@ async fn main() { // // By default get receives replies from a FIFO. // // Uncomment this line to use a ring channel instead. // // More information on the ring channel are available in the z_pull example. - .with(zenoh::handlers::RingChannel::default()) + // .with(zenoh::handlers::RingChannel::default()) .value(value) .target(target) .timeout(timeout) diff --git a/examples/examples/z_get_shm.rs b/examples/examples/z_get_shm.rs new file mode 100644 index 0000000000..c5f766f0f2 --- /dev/null +++ b/examples/examples/z_get_shm.rs @@ -0,0 +1,158 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::Parser; +use std::time::Duration; +use zenoh::prelude::r#async::*; +use zenoh_examples::CommonArgs; + +const N: usize = 10; + +#[tokio::main] +async fn main() { + // initiate logging + zenoh_util::try_init_log_from_env(); + + let (mut config, selector, mut value, target, timeout) = parse_args(); + + // A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate + // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + config.transport.shared_memory.set_enabled(true).unwrap(); + + println!("Opening session..."); + let session = zenoh::open(config).res().await.unwrap(); + + println!("Creating POSIX SHM backend..."); + // Construct an SHM backend + let backend = { + // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. + // The initialisation of SHM backend is completely backend-specific and user is free to do + // anything reasonable here. This code is execuated at the provider's first use + + // Alignment for POSIX SHM provider + // All allocations will be aligned corresponding to this alignment - + // that means that the provider will be able to satisfy allocation layouts + // with alignment <= provider_alignment + let provider_alignment = AllocAlignment::default(); + + // Create layout for POSIX Provider's memory + let provider_layout = MemoryLayout::new(N * 1024, provider_alignment).unwrap(); + + PosixSharedMemoryProviderBackend::builder() + .with_layout(provider_layout) + .res() + .unwrap() + }; + + println!("Creating SHM Provider with POSIX backend..."); + // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID + let shared_memory_provider = SharedMemoryProviderBuilder::builder() + .protocol_id::() + .backend(backend) + .res(); + + println!("Allocating Shared Memory Buffer..."); + let layout = shared_memory_provider + .alloc_layout() + .size(1024) + .res() + .unwrap(); + + let mut sbuf = layout + .alloc() + .with_policy::>() + .res_async() + .await + .unwrap(); + + let content = value + .take() + .unwrap_or_else(|| "Get from SharedMemory Rust!".to_string()); + sbuf[0..content.len()].copy_from_slice(content.as_bytes()); + + println!("Sending Query '{selector}'..."); + let replies = session + .get(&selector) + .value(sbuf) + .target(target) + .timeout(timeout) + .res() + .await + .unwrap(); + + while let Ok(reply) = replies.recv_async().await { + match reply.result() { + Ok(sample) => { + print!(">> Received ('{}': ", sample.key_expr().as_str()); + match sample.payload().deserialize::<&zsliceshm>() { + Ok(payload) => println!("'{}')", String::from_utf8_lossy(payload),), + Err(e) => println!("'Not a SharedMemoryBuf: {:?}')", e), + } + } + Err(err) => { + let payload = err + .payload() + .deserialize::() + .unwrap_or_else(|e| format!("{}", e)); + println!(">> Received (ERROR: '{}')", payload); + } + } + } +} + +#[derive(clap::ValueEnum, Clone, Copy, Debug)] +#[value(rename_all = "SCREAMING_SNAKE_CASE")] +enum Qt { + BestMatching, + All, + AllComplete, +} + +#[derive(Parser, Clone, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/**")] + /// The selection of resources to query + selector: Selector<'static>, + /// The value to publish. + value: Option, + #[arg(short, long, default_value = "BEST_MATCHING")] + /// The target queryables of the query. + target: Qt, + #[arg(short = 'o', long, default_value = "10000")] + /// The query timeout in milliseconds. + timeout: u64, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> ( + Config, + Selector<'static>, + Option, + QueryTarget, + Duration, +) { + let args = Args::parse(); + ( + args.common.into(), + args.selector, + args.value, + match args.target { + Qt::BestMatching => QueryTarget::BestMatching, + Qt::All => QueryTarget::All, + Qt::AllComplete => QueryTarget::AllComplete, + }, + Duration::from_millis(args.timeout), + ) +} diff --git a/examples/examples/z_queryable.rs b/examples/examples/z_queryable.rs index 47f70c30c3..8407f9f66f 100644 --- a/examples/examples/z_queryable.rs +++ b/examples/examples/z_queryable.rs @@ -20,7 +20,12 @@ async fn main() { // initiate logging zenoh_util::try_init_log_from_env(); - let (config, key_expr, value, complete) = parse_args(); + let (mut config, key_expr, value, complete) = parse_args(); + + // A probing procedure for shared memory is performed upon session opening. To enable `z_get_shm` to operate + // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + config.transport.shared_memory.set_enabled(true).unwrap(); println!("Opening session..."); let session = zenoh::open(config).res().await.unwrap(); diff --git a/examples/examples/z_queryable_shm.rs b/examples/examples/z_queryable_shm.rs new file mode 100644 index 0000000000..f689e15b51 --- /dev/null +++ b/examples/examples/z_queryable_shm.rs @@ -0,0 +1,134 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::Parser; +use zenoh::prelude::r#async::*; +use zenoh_examples::CommonArgs; + +const N: usize = 10; + +#[tokio::main] +async fn main() { + // initiate logging + zenoh_util::try_init_log_from_env(); + + let (mut config, key_expr, value, complete) = parse_args(); + + // A probing procedure for shared memory is performed upon session opening. To enable `z_get_shm` to operate + // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + config.transport.shared_memory.set_enabled(true).unwrap(); + + println!("Opening session..."); + let session = zenoh::open(config).res().await.unwrap(); + + println!("Creating POSIX SHM backend..."); + // Construct an SHM backend + let backend = { + // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. + // The initialisation of SHM backend is completely backend-specific and user is free to do + // anything reasonable here. This code is execuated at the provider's first use + + // Alignment for POSIX SHM provider + // All allocations will be aligned corresponding to this alignment - + // that means that the provider will be able to satisfy allocation layouts + // with alignment <= provider_alignment + let provider_alignment = AllocAlignment::default(); + + // Create layout for POSIX Provider's memory + let provider_layout = MemoryLayout::new(N * 1024, provider_alignment).unwrap(); + + PosixSharedMemoryProviderBackend::builder() + .with_layout(provider_layout) + .res() + .unwrap() + }; + + println!("Creating SHM Provider with POSIX backend..."); + // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID + let shared_memory_provider = SharedMemoryProviderBuilder::builder() + .protocol_id::() + .backend(backend) + .res(); + + println!("Declaring Queryable on '{key_expr}'..."); + let queryable = session + .declare_queryable(&key_expr) + .complete(complete) + .res() + .await + .unwrap(); + + println!("Press CTRL-C to quit..."); + while let Ok(query) = queryable.recv_async().await { + print!( + ">> [Queryable] Received Query '{}' ('{}'", + query.selector(), + query.key_expr().as_str(), + ); + if let Some(payload) = query.payload() { + match payload.deserialize::<&zsliceshm>() { + Ok(payload) => print!(": '{}'", String::from_utf8_lossy(payload)), + Err(e) => print!(": 'Not a SharedMemoryBuf: {:?}'", e), + } + } + println!(")"); + + println!("Allocating Shared Memory Buffer..."); + let layout = shared_memory_provider + .alloc_layout() + .size(1024) + .res() + .unwrap(); + + let mut sbuf = layout + .alloc() + .with_policy::>() + .res_async() + .await + .unwrap(); + + sbuf[0..value.len()].copy_from_slice(value.as_bytes()); + + println!( + ">> [Queryable] Responding ('{}': '{}')", + key_expr.as_str(), + value, + ); + query + .reply(key_expr.clone(), sbuf) + .res() + .await + .unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}")); + } +} + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/zenoh-rs-queryable")] + /// The key expression matching queries to reply to. + key: KeyExpr<'static>, + #[arg(short, long, default_value = "Queryable from SharedMemory Rust!")] + /// The value to reply to queries. + value: String, + #[arg(long)] + /// Declare the queryable as complete w.r.t. the key expression. + complete: bool, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, KeyExpr<'static>, String, bool) { + let args = Args::parse(); + (args.common.into(), args.key, args.value, args.complete) +} diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index 45180f598b..2e0f5bf910 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -37,17 +37,16 @@ async fn main() { println!("Press CTRL-C to quit..."); while let Ok(sample) = subscriber.recv_async().await { + print!( + ">> [Subscriber] Received {} ('{}': ", + sample.kind(), + sample.key_expr().as_str(), + ); match sample.payload().deserialize::<&zsliceshm>() { - Ok(payload) => println!( - ">> [Subscriber] Received {} ('{}': '{:02x?}')", - sample.kind(), - sample.key_expr().as_str(), - payload - ), - Err(e) => { - println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); - } + Ok(payload) => print!("'{}'", String::from_utf8_lossy(payload)), + Err(e) => print!("'Not a SharedMemoryBuf: {:?}'", e), } + println!(")"); } // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index 1cb4078ee6..d95a1bd417 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -88,6 +88,11 @@ impl Reply { self.result.as_ref() } + /// Gets the a mutable borrowed result of this `Reply`. Use [`Reply::into_result`] to take ownership of the result. + pub fn result_mut(&mut self) -> Result<&mut Sample, &mut Value> { + self.result.as_mut() + } + /// Converts this `Reply` into the its result. Use [`Reply::result`] it you don't want to take ownership. pub fn into_result(self) -> Result { self.result diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index dc13468181..53fea80b10 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -50,18 +50,11 @@ use { }; pub(crate) struct QueryInner { - /// The key expression of this Query. pub(crate) key_expr: KeyExpr<'static>, - /// This Query's selector parameters. pub(crate) parameters: Parameters<'static>, - /// This Query's body. - pub(crate) value: Option, - pub(crate) qid: RequestId, pub(crate) zid: ZenohId, pub(crate) primitives: Arc, - #[cfg(feature = "unstable")] - pub(crate) attachment: Option, } impl Drop for QueryInner { @@ -79,6 +72,9 @@ impl Drop for QueryInner { pub struct Query { pub(crate) inner: Arc, pub(crate) eid: EntityId, + pub(crate) value: Option, + #[cfg(feature = "unstable")] + pub(crate) attachment: Option, } impl Query { @@ -106,24 +102,43 @@ impl Query { /// This Query's value. #[inline(always)] pub fn value(&self) -> Option<&Value> { - self.inner.value.as_ref() + self.value.as_ref() + } + + /// This Query's value. + #[inline(always)] + pub fn value_mut(&mut self) -> Option<&mut Value> { + self.value.as_mut() } /// This Query's payload. #[inline(always)] pub fn payload(&self) -> Option<&ZBytes> { - self.inner.value.as_ref().map(|v| &v.payload) + self.value.as_ref().map(|v| &v.payload) + } + + /// This Query's payload. + #[inline(always)] + pub fn payload_mut(&mut self) -> Option<&mut ZBytes> { + self.value.as_mut().map(|v| &mut v.payload) } /// This Query's encoding. #[inline(always)] pub fn encoding(&self) -> Option<&Encoding> { - self.inner.value.as_ref().map(|v| &v.encoding) + self.value.as_ref().map(|v| &v.encoding) } + /// This Query's attachment. #[zenoh_macros::unstable] pub fn attachment(&self) -> Option<&ZBytes> { - self.inner.attachment.as_ref() + self.attachment.as_ref() + } + + /// This Query's attachment. + #[zenoh_macros::unstable] + pub fn attachment_mut(&mut self) -> Option<&mut ZBytes> { + self.attachment.as_mut() } /// Sends a reply in the form of [`Sample`] to this Query. diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 01fc345c3b..eb70129e55 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1806,10 +1806,6 @@ impl Session { let query_inner = Arc::new(QueryInner { key_expr, parameters: parameters.to_owned().into(), - value: body.map(|b| Value { - payload: b.payload.into(), - encoding: b.encoding.into(), - }), qid, zid, primitives: if local { @@ -1817,13 +1813,17 @@ impl Session { } else { primitives }, - #[cfg(feature = "unstable")] - attachment, }); for (eid, callback) in queryables { callback(Query { inner: query_inner.clone(), eid, + value: body.as_ref().map(|b| Value { + payload: b.payload.clone().into(), + encoding: b.encoding.clone().into(), + }), + #[cfg(feature = "unstable")] + attachment: attachment.clone(), }); } } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index ea084c453b..c13e64f71f 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -462,14 +462,14 @@ impl Primitives for AdminSpace { inner: Arc::new(QueryInner { key_expr: key_expr.clone(), parameters: query.parameters.into(), - value: query.ext_body.map(|b| Value::new(b.payload, b.encoding)), qid: msg.id, zid, primitives, - #[cfg(feature = "unstable")] - attachment: query.ext_attachment.map(Into::into), }), eid: self.queryable_id, + value: query.ext_body.map(|b| Value::new(b.payload, b.encoding)), + #[cfg(feature = "unstable")] + attachment: query.ext_attachment.map(Into::into), }; for (key, handler) in &self.handlers { From c3f993da4baf385adb04d17f911790ca0becec41 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 29 Apr 2024 18:44:09 +0200 Subject: [PATCH 5/5] Add attachment_mut to Sample --- zenoh/src/api/sample.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 365c2a7728..11bfc92c0b 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -355,6 +355,13 @@ impl Sample { pub fn attachment(&self) -> Option<&ZBytes> { self.attachment.as_ref() } + + /// Gets the sample attachment: a map of key-value pairs, where each key and value are byte-slices. + #[zenoh_macros::unstable] + #[inline] + pub fn attachment_mut(&mut self) -> Option<&mut ZBytes> { + self.attachment.as_mut() + } } impl From for Value {