Skip to content

Commit

Permalink
Query/Reply shared memory examples
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Apr 26, 2024
1 parent f08ea12 commit 12376c0
Show file tree
Hide file tree
Showing 10 changed files with 357 additions and 31 deletions.
10 changes: 10 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ path = "examples/z_pull.rs"
name = "z_queryable"
path = "examples/z_queryable.rs"

[[example]]
name = "z_queryable_shm"
path = "examples/z_queryable_shm.rs"
required-features = ["unstable", "shared-memory"]

[[example]]
name = "z_storage"
path = "examples/z_storage.rs"
Expand All @@ -108,6 +113,11 @@ path = "examples/z_storage.rs"
name = "z_get"
path = "examples/z_get.rs"

[[example]]
name = "z_get_shm"
path = "examples/z_get_shm.rs"
required-features = ["unstable", "shared-memory"]

[[example]]
name = "z_forward"
path = "examples/z_forward.rs"
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main() {
// // By default get receives replies from a FIFO.
// // Uncomment this line to use a ring channel instead.
// // More information on the ring channel are available in the z_pull example.
.with(zenoh::handlers::RingChannel::default())
// .with(zenoh::handlers::RingChannel::default())
.value(value)
.target(target)
.timeout(timeout)
Expand Down
158 changes: 158 additions & 0 deletions examples/examples/z_get_shm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use std::time::Duration;
use zenoh::prelude::r#async::*;
use zenoh_examples::CommonArgs;

const N: usize = 10;

#[tokio::main]
async fn main() {
// initiate logging
zenoh_util::try_init_log_from_env();

let (mut config, selector, mut value, target, timeout) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();

println!("Creating POSIX SHM backend...");
// Construct an SHM backend
let backend = {
// NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API.
// The initialisation of SHM backend is completely backend-specific and user is free to do
// anything reasonable here. This code is execuated at the provider's first use

// Alignment for POSIX SHM provider
// All allocations will be aligned corresponding to this alignment -
// that means that the provider will be able to satisfy allocation layouts
// with alignment <= provider_alignment
let provider_alignment = AllocAlignment::default();

// Create layout for POSIX Provider's memory
let provider_layout = MemoryLayout::new(N * 1024, provider_alignment).unwrap();

PosixSharedMemoryProviderBackend::builder()
.with_layout(provider_layout)
.res()
.unwrap()
};

println!("Creating SHM Provider with POSIX backend...");
// Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID
let shared_memory_provider = SharedMemoryProviderBuilder::builder()
.protocol_id::<POSIX_PROTOCOL_ID>()
.backend(backend)
.res();

println!("Allocating Shared Memory Buffer...");
let layout = shared_memory_provider
.alloc_layout()
.size(1024)
.res()
.unwrap();

let mut sbuf = layout
.alloc()
.with_policy::<BlockOn<GarbageCollect>>()
.res_async()
.await
.unwrap();

let content = value
.take()
.unwrap_or_else(|| "Get from SharedMemory Rust!".to_string());
sbuf[0..content.len()].copy_from_slice(content.as_bytes());

println!("Sending Query '{selector}'...");
let replies = session
.get(&selector)
.value(sbuf)
.target(target)
.timeout(timeout)
.res()
.await
.unwrap();

while let Ok(reply) = replies.recv_async().await {
match reply.result() {
Ok(sample) => {
print!(">> Received ('{}': ", sample.key_expr().as_str());
match sample.payload().deserialize::<&zsliceshm>() {
Ok(payload) => println!("'{}')", String::from_utf8_lossy(payload),),
Err(e) => println!("'Not a SharedMemoryBuf: {:?}')", e),
}
}
Err(err) => {
let payload = err
.payload()
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(">> Received (ERROR: '{}')", payload);
}
}
}
}

#[derive(clap::ValueEnum, Clone, Copy, Debug)]
#[value(rename_all = "SCREAMING_SNAKE_CASE")]
enum Qt {
BestMatching,
All,
AllComplete,
}

#[derive(Parser, Clone, Debug)]
struct Args {
#[arg(short, long, default_value = "demo/example/**")]
/// The selection of resources to query
selector: Selector<'static>,
/// The value to publish.
value: Option<String>,
#[arg(short, long, default_value = "BEST_MATCHING")]
/// The target queryables of the query.
target: Qt,
#[arg(short = 'o', long, default_value = "10000")]
/// The query timeout in milliseconds.
timeout: u64,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (
Config,
Selector<'static>,
Option<String>,
QueryTarget,
Duration,
) {
let args = Args::parse();
(
args.common.into(),
args.selector,
args.value,
match args.target {
Qt::BestMatching => QueryTarget::BestMatching,
Qt::All => QueryTarget::All,
Qt::AllComplete => QueryTarget::AllComplete,
},
Duration::from_millis(args.timeout),
)
}
7 changes: 6 additions & 1 deletion examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ async fn main() {
// initiate logging
zenoh_util::try_init_log_from_env();

let (config, key_expr, value, complete) = parse_args();
let (mut config, key_expr, value, complete) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_get_shm` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
Expand Down
134 changes: 134 additions & 0 deletions examples/examples/z_queryable_shm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::prelude::r#async::*;
use zenoh_examples::CommonArgs;

const N: usize = 10;

#[tokio::main]
async fn main() {
// initiate logging
zenoh_util::try_init_log_from_env();

let (mut config, key_expr, value, complete) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_get_shm` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();

println!("Creating POSIX SHM backend...");
// Construct an SHM backend
let backend = {
// NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API.
// The initialisation of SHM backend is completely backend-specific and user is free to do
// anything reasonable here. This code is execuated at the provider's first use

// Alignment for POSIX SHM provider
// All allocations will be aligned corresponding to this alignment -
// that means that the provider will be able to satisfy allocation layouts
// with alignment <= provider_alignment
let provider_alignment = AllocAlignment::default();

// Create layout for POSIX Provider's memory
let provider_layout = MemoryLayout::new(N * 1024, provider_alignment).unwrap();

PosixSharedMemoryProviderBackend::builder()
.with_layout(provider_layout)
.res()
.unwrap()
};

println!("Creating SHM Provider with POSIX backend...");
// Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID
let shared_memory_provider = SharedMemoryProviderBuilder::builder()
.protocol_id::<POSIX_PROTOCOL_ID>()
.backend(backend)
.res();

println!("Declaring Queryable on '{key_expr}'...");
let queryable = session
.declare_queryable(&key_expr)
.complete(complete)
.res()
.await
.unwrap();

println!("Press CTRL-C to quit...");
while let Ok(query) = queryable.recv_async().await {
print!(
">> [Queryable] Received Query '{}' ('{}'",
query.selector(),
query.key_expr().as_str(),
);
if let Some(payload) = query.payload() {
match payload.deserialize::<&zsliceshm>() {
Ok(payload) => print!(": '{}'", String::from_utf8_lossy(payload)),
Err(e) => print!(": 'Not a SharedMemoryBuf: {:?}'", e),
}
}
println!(")");

println!("Allocating Shared Memory Buffer...");
let layout = shared_memory_provider
.alloc_layout()
.size(1024)
.res()
.unwrap();

let mut sbuf = layout
.alloc()
.with_policy::<BlockOn<GarbageCollect>>()
.res_async()
.await
.unwrap();

sbuf[0..value.len()].copy_from_slice(value.as_bytes());

println!(
">> [Queryable] Responding ('{}': '{}')",
key_expr.as_str(),
value,
);
query
.reply(key_expr.clone(), sbuf)
.res()
.await
.unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}"));
}
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "demo/example/zenoh-rs-queryable")]
/// The key expression matching queries to reply to.
key: KeyExpr<'static>,
#[arg(short, long, default_value = "Queryable from SharedMemory Rust!")]
/// The value to reply to queries.
value: String,
#[arg(long)]
/// Declare the queryable as complete w.r.t. the key expression.
complete: bool,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, KeyExpr<'static>, String, bool) {
let args = Args::parse();
(args.common.into(), args.key, args.value, args.complete)
}
17 changes: 8 additions & 9 deletions examples/examples/z_sub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,16 @@ async fn main() {

println!("Press CTRL-C to quit...");
while let Ok(sample) = subscriber.recv_async().await {
print!(
">> [Subscriber] Received {} ('{}': ",
sample.kind(),
sample.key_expr().as_str(),
);
match sample.payload().deserialize::<&zsliceshm>() {
Ok(payload) => println!(
">> [Subscriber] Received {} ('{}': '{:02x?}')",
sample.kind(),
sample.key_expr().as_str(),
payload
),
Err(e) => {
println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e);
}
Ok(payload) => print!("'{}'", String::from_utf8_lossy(payload)),
Err(e) => print!("'Not a SharedMemoryBuf: {:?}'", e),
}
println!(")");
}

// // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber
Expand Down
5 changes: 5 additions & 0 deletions zenoh/src/api/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ impl Reply {
self.result.as_ref()
}

/// Gets the a mutable borrowed result of this `Reply`. Use [`Reply::into_result`] to take ownership of the result.
pub fn result_mut(&mut self) -> Result<&mut Sample, &mut Value> {
self.result.as_mut()
}

/// Converts this `Reply` into the its result. Use [`Reply::result`] it you don't want to take ownership.
pub fn into_result(self) -> Result<Sample, Value> {
self.result
Expand Down
Loading

0 comments on commit 12376c0

Please sign in to comment.