Skip to content

Commit

Permalink
Merge branch 'attachment' of github.com:eclipse-zenoh/zenoh into atta…
Browse files Browse the repository at this point in the history
…chment
  • Loading branch information
Mallets committed Dec 5, 2023
2 parents b65b817 + 2fa04c6 commit 76e355c
Show file tree
Hide file tree
Showing 28 changed files with 1,269 additions and 83 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ jobs:
CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse
ASYNC_STD_THREAD_COUNT: 4

- name: Check for feature leaks
if: ${{ matrix.os == 'ubuntu-latest' }}
uses: actions-rs/cargo@v1
with:
command: nextest
args: run -p zenohd --no-default-features

- name: Run doctests
uses: actions-rs/cargo@v1
with:
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ zenoh-link-unixpipe = { version = "0.11.0-dev", path = "io/zenoh-links/zenoh-lin
zenoh-link-serial = { version = "0.11.0-dev", path = "io/zenoh-links/zenoh-link-serial" }
zenoh-link = { version = "0.11.0-dev", path = "io/zenoh-link" }
zenoh-link-commons = { version = "0.11.0-dev", path = "io/zenoh-link-commons" }
zenoh = { version = "0.11.0-dev", path = "zenoh" }
zenoh = { version = "0.11.0-dev", path = "zenoh", default-features = false }

[profile.dev]
debug = true
Expand Down
17 changes: 13 additions & 4 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -623,9 +623,15 @@ impl Config {

impl std::fmt::Display for Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut json = serde_json::to_value(self).unwrap();
sift_privates(&mut json);
write!(f, "{json}")
serde_json::to_value(self)
.map(|mut json| {
sift_privates(&mut json);
write!(f, "{json}")
})
.map_err(|e| {
_ = write!(f, "{e:?}");
fmt::Error
})?
}
}

Expand Down Expand Up @@ -1030,9 +1036,12 @@ impl<'a> serde::Deserialize<'a> for PluginsConfig {
})
}
}

impl std::fmt::Debug for PluginsConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", &self.values)
let mut values: Value = self.values.clone();
sift_privates(&mut values);
write!(f, "{:?}", values)
}
}

Expand Down
10 changes: 9 additions & 1 deletion commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,15 @@ impl NetworkMessage {

impl fmt::Display for NetworkMessage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(self, f)
use NetworkBody::*;
match &self.body {
OAM(_) => write!(f, "OAM"),
Push(_) => write!(f, "Push"),
Request(_) => write!(f, "Request"),
Response(_) => write!(f, "Response"),
ResponseFinal(_) => write!(f, "ResponseFinal"),
Declare(_) => write!(f, "Declare"),
}
}
}

Expand Down
30 changes: 30 additions & 0 deletions commons/zenoh-protocol/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub mod keepalive;
pub mod oam;
pub mod open;

use core::fmt;

pub use close::Close;
pub use fragment::{Fragment, FragmentHeader};
pub use frame::{Frame, FrameHeader};
Expand Down Expand Up @@ -207,6 +209,34 @@ impl From<Join> for TransportMessage {
}
}

impl fmt::Display for TransportMessage {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
use TransportBody::*;
match &self.body {
OAM(_) => write!(f, "OAM"),
InitSyn(_) => write!(f, "InitSyn"),
InitAck(_) => write!(f, "InitAck"),
OpenSyn(_) => write!(f, "OpenSyn"),
OpenAck(_) => write!(f, "OpenAck"),
Close(_) => write!(f, "Close"),
KeepAlive(_) => write!(f, "KeepAlive"),
Frame(m) => {
write!(f, "Frame[")?;
let mut netmsgs = m.payload.iter().peekable();
while let Some(m) = netmsgs.next() {
m.fmt(f)?;
if netmsgs.peek().is_some() {
write!(f, ", ")?;
}
}
write!(f, "]")
}
Fragment(_) => write!(f, "Fragment"),
Join(_) => write!(f, "Join"),
}
}
}

pub mod ext {
use crate::{common::ZExtZ64, core::Priority};

Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ default = ["std"]
async-std = { workspace = true, features = ["default", "unstable"] }
async-trait = { workspace = true }
clap = { workspace = true }
const_format = { workspace = true }
flume = { workspace = true }
futures = { workspace = true }
hex = { workspace = true, features = ["default"] }
Expand Down
12 changes: 12 additions & 0 deletions commons/zenoh-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ extern crate alloc;
#[cfg_attr(feature = "std", macro_use)]
extern crate lazy_static;

#[macro_export]
macro_rules! concat_enabled_features {
(prefix = $prefix:literal, features = [$($feature:literal),*]) => {
{
use const_format::concatcp;
concatcp!("" $(,
if cfg!(feature = $feature) { concatcp!(" ", concatcp!($prefix, "/", $feature)) } else { "" }
)*)
}
};
}

#[deprecated = "This module is now a separate crate. Use the `zenoh_core` crate directly for shorter compile-times. You may disable this re-export by disabling `zenoh-util`'s default features."]
pub use zenoh_core as core;

Expand Down
10 changes: 6 additions & 4 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,14 @@
z_storage -k demo/**
```

### z_pub_shm & z_sub_shm
### z_pub_shm & z_sub

A pub/sub example involving the shared-memory feature.
Note that on subscriber side, the same `z_sub` example than for non-shared-memory example is used.

Typical Subscriber usage:
```bash
z_sub_shm
z_sub
```

Typical Publisher usage:
Expand Down Expand Up @@ -188,16 +189,17 @@
z_ping 1024
```

### z_pub_shm_thr & z_sub_shm_thr
### z_pub_shm_thr & z_sub_thr

Pub/Sub throughput test involving the shared-memory feature.
This example allows performing throughput measurements between a publisher performing
put operations with the shared-memory feature and a subscriber receiving notifications
of those puts.
Note that on subscriber side, the same `z_sub_thr` example than for non-shared-memory example is used.

Typical Subscriber usage:
```bash
z_sub_shm_thr
z_sub_thr
```

Typical Publisher usage:
Expand Down
9 changes: 7 additions & 2 deletions examples/examples/z_pub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ async fn main() -> Result<(), zenoh::Error> {
// Initiate logging
env_logger::init();

let (config, path, value) = parse_args();
let (mut config, path, value) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
Expand All @@ -39,6 +44,7 @@ async fn main() -> Result<(), zenoh::Error> {
let publisher = session.declare_publisher(&path).res().await.unwrap();

for idx in 0..(K * N as u32) {
sleep(Duration::from_secs(1)).await;
let mut sbuf = match shm.alloc(1024) {
Ok(buf) => buf,
Err(_) => {
Expand Down Expand Up @@ -88,7 +94,6 @@ async fn main() -> Result<(), zenoh::Error> {
let defrag = shm.defragment();
println!("De-framented {defrag} bytes");
}
// sleep(Duration::from_millis(100)).await;
// Dropping the SharedMemoryBuf means to free it.
drop(sbuf);
}
Expand Down
7 changes: 6 additions & 1 deletion examples/examples/z_pub_shm_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ use zenoh::shm::SharedMemoryManager;
async fn main() {
// initiate logging
env_logger::init();
let (config, sm_size, size) = parse_args();
let (mut config, sm_size, size) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm_thr` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

let z = zenoh::open(config).res().await.unwrap();
let id = z.zid();
Expand Down
7 changes: 6 additions & 1 deletion examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ async fn main() {
// Initiate logging
env_logger::init();

let (config, key_expr) = parse_args();
let (mut config, key_expr) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
Expand Down
7 changes: 6 additions & 1 deletion examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ fn main() {
// initiate logging
env_logger::init();

let (config, m, n) = parse_args();
let (mut config, m, n) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm_thr` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

let session = zenoh::open(config).res().unwrap();

Expand Down
27 changes: 15 additions & 12 deletions io/zenoh-link/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)
use std::collections::HashMap;
use std::sync::Arc;
use zenoh_config::Config;
use zenoh_result::{bail, ZResult};

Expand Down Expand Up @@ -206,23 +205,27 @@ impl LinkManagerBuilderUnicast {
pub fn make(_manager: NewLinkChannelSender, protocol: &str) -> ZResult<LinkManagerUnicast> {
match protocol {
#[cfg(feature = "transport_tcp")]
TCP_LOCATOR_PREFIX => Ok(Arc::new(LinkManagerUnicastTcp::new(_manager))),
TCP_LOCATOR_PREFIX => Ok(std::sync::Arc::new(LinkManagerUnicastTcp::new(_manager))),
#[cfg(feature = "transport_udp")]
UDP_LOCATOR_PREFIX => Ok(Arc::new(LinkManagerUnicastUdp::new(_manager))),
UDP_LOCATOR_PREFIX => Ok(std::sync::Arc::new(LinkManagerUnicastUdp::new(_manager))),
#[cfg(feature = "transport_tls")]
TLS_LOCATOR_PREFIX => Ok(Arc::new(LinkManagerUnicastTls::new(_manager))),
TLS_LOCATOR_PREFIX => Ok(std::sync::Arc::new(LinkManagerUnicastTls::new(_manager))),
#[cfg(feature = "transport_quic")]
QUIC_LOCATOR_PREFIX => Ok(Arc::new(LinkManagerUnicastQuic::new(_manager))),
QUIC_LOCATOR_PREFIX => Ok(std::sync::Arc::new(LinkManagerUnicastQuic::new(_manager))),
#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))]
UNIXSOCKSTREAM_LOCATOR_PREFIX => {
Ok(Arc::new(LinkManagerUnicastUnixSocketStream::new(_manager)))
}
UNIXSOCKSTREAM_LOCATOR_PREFIX => Ok(std::sync::Arc::new(
LinkManagerUnicastUnixSocketStream::new(_manager),
)),
#[cfg(feature = "transport_ws")]
WS_LOCATOR_PREFIX => Ok(Arc::new(LinkManagerUnicastWs::new(_manager))),
WS_LOCATOR_PREFIX => Ok(std::sync::Arc::new(LinkManagerUnicastWs::new(_manager))),
#[cfg(feature = "transport_serial")]
SERIAL_LOCATOR_PREFIX => Ok(Arc::new(LinkManagerUnicastSerial::new(_manager))),
SERIAL_LOCATOR_PREFIX => {
Ok(std::sync::Arc::new(LinkManagerUnicastSerial::new(_manager)))
}
#[cfg(feature = "transport_unixpipe")]
UNIXPIPE_LOCATOR_PREFIX => Ok(Arc::new(LinkManagerUnicastPipe::new(_manager))),
UNIXPIPE_LOCATOR_PREFIX => {
Ok(std::sync::Arc::new(LinkManagerUnicastPipe::new(_manager)))
}
_ => bail!("Unicast not supported for {} protocol", protocol),
}
}
Expand All @@ -238,7 +241,7 @@ impl LinkManagerBuilderMulticast {
pub fn make(protocol: &str) -> ZResult<LinkManagerMulticast> {
match protocol {
#[cfg(feature = "transport_udp")]
UDP_LOCATOR_PREFIX => Ok(Arc::new(LinkManagerMulticastUdp)),
UDP_LOCATOR_PREFIX => Ok(std::sync::Arc::new(LinkManagerMulticastUdp)),
_ => bail!("Multicast not supported for {} protocol", protocol),
}
}
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ flume = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
serde = { workspace = true, features = ["default"] }
zenoh = { workspace = true, features = ["unstable"] }
zenoh = { workspace = true, features = ["unstable"], default-features = false }
zenoh-core = { workspace = true }
zenoh-macros = { workspace = true }
zenoh-result = { workspace = true }
Expand Down
Loading

0 comments on commit 76e355c

Please sign in to comment.