diff --git a/.config/nextest.toml b/.config/nextest.toml index 0e20150e7f..aa2c3ac37b 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -1,11 +1,16 @@ +# By default, retry a few times until pass the test within the specified timeout [profile.default] -retries = 2 +retries = 4 +slow-timeout = { period = "60s", terminate-after = 2 } +# Run the following tests exclusively with longer timeout [[profile.default.overrides]] filter = """ test(=zenoh_session_unicast) | test(=zenoh_session_multicast) | +test(=transport_tcp_intermittent) | +test(=transport_tcp_intermittent_for_lowlatency_transport) | test(=three_node_combination) """ threads-required = 'num-cpus' -slow-timeout = { period = "60s", terminate-after = 3 } +slow-timeout = { period = "60s", terminate-after = 6 } diff --git a/Cargo.lock b/Cargo.lock index 83819f136c..570482eaa5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -115,15 +115,16 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.3" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" dependencies = [ "cfg-if 1.0.0", "getrandom 0.2.10", "once_cell", "serde", "version_check", + "zerocopy", ] [[package]] @@ -4745,6 +4746,7 @@ dependencies = [ name = "zenoh-keyexpr" version = "0.11.0-dev" dependencies = [ + "ahash", "criterion", "hashbrown 0.14.0", "keyed-set", @@ -5219,6 +5221,26 @@ dependencies = [ "zenoh_backend_traits", ] +[[package]] +name = "zerocopy" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.33", +] + [[package]] name = "zeroize" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index d45542ae81..e4da707ab4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ description = "Zenoh: Zero Overhead Pub/sub, Store/Query and Compute." # (https://github.com/rust-lang/cargo/issues/11329) [workspace.dependencies] aes = "0.8.2" +ahash = "0.8.7" anyhow = { version = "1.0.69", default-features = false } # Default features are disabled due to usage in no_std crates async-executor = "1.5.0" async-global-executor = "2.3.1" diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index bde1b8fd03..68dbaebdeb 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -202,10 +202,10 @@ /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress. /// Higher values lead to a more aggressive batching but it will introduce additional latency. backoff: 100, - // Number of threads dedicated to transmission - // By default, the number of threads is calculated as follows: 1 + ((#cores - 1) / 4) - // threads: 4, }, + // Number of threads dedicated to transmission + // By default, the number of threads is calculated as follows: 1 + ((#cores - 1) / 4) + // threads: 4, }, /// Configure the zenoh RX parameters of a link rx: { diff --git a/README.md b/README.md index 9efe11dbdd..5bce835c29 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,8 @@ Zenoh's router is built as `target/release/zenohd`. All the examples are built i **Routed tests:** +> **Windows users**: to properly execute the commands below in PowerShell you need to escape `"` characters as `\"`. + - **put/store/get** - run the Zenoh router with a memory storage: `./target/release/zenohd --cfg='plugins/storage_manager/storages/demo:{key_expr:"demo/example/**",volume:"memory"}'` diff --git a/commons/zenoh-keyexpr/Cargo.toml b/commons/zenoh-keyexpr/Cargo.toml index 920db98696..41456af1ec 100644 --- a/commons/zenoh-keyexpr/Cargo.toml +++ b/commons/zenoh-keyexpr/Cargo.toml @@ -40,6 +40,7 @@ hashbrown = { workspace = true } # NOTE: May cause problems when testing no_std stuff. Check this tool: https://docs.rs/crate/cargo-no-dev-deps/0.1.0 [dev-dependencies] +ahash = { workspace = true } criterion = { workspace = true } lazy_static = { workspace = true } rand = { workspace = true, features = ["default"] } diff --git a/commons/zenoh-keyexpr/benches/keyexpr_tree.rs b/commons/zenoh-keyexpr/benches/keyexpr_tree.rs index f9151fdea1..4047e3cf5c 100644 --- a/commons/zenoh-keyexpr/benches/keyexpr_tree.rs +++ b/commons/zenoh-keyexpr/benches/keyexpr_tree.rs @@ -46,9 +46,11 @@ fn main() { let mut intersections = Averager::default(); let results = Benchmarker::benchmark(|b| { let keys = KeySet::generate(total, wildness, no_double_stars); - let mut ketree: KeBoxTree<_> = KeBoxTree::new(); - let mut vectree: KeBoxTree<_, bool, VecSetProvider> = KeBoxTree::new(); - let mut hashtree: KeBoxTree<_, bool, HashMapProvider> = KeBoxTree::new(); + let mut ketree = KeBoxTree::new(); + let mut vectree: KeBoxTree<_, bool, VecSetProvider> = KeBoxTree::default(); + let mut hashtree: KeBoxTree<_, bool, HashMapProvider> = KeBoxTree::default(); + let mut ahashtree: KeBoxTree<_, bool, HashMapProvider> = + KeBoxTree::default(); let (kearctree, mut token): (KeArcTree, _) = KeArcTree::new().unwrap(); let mut map = HashMap::new(); for key in keys.iter() { @@ -58,6 +60,7 @@ fn main() { }); b.run_once("vectree_insert", || vectree.insert(key, 0)); b.run_once("hashtree_insert", || hashtree.insert(key, 0)); + b.run_once("ahashtree_insert", || ahashtree.insert(key, 0)); b.run_once("hashmap_insert", || map.insert(key.to_owned(), 0)); } for key in keys.iter() { @@ -65,6 +68,7 @@ fn main() { b.run_once("kearctree_fetch", || kearctree.node(&token, key)); b.run_once("vectree_fetch", || vectree.node(key)); b.run_once("hashtree_fetch", || hashtree.node(key)); + b.run_once("ahashtree_fetch", || ahashtree.node(key)); b.run_once("hashmap_fetch", || map.get(key)); } for key in keys.iter() { @@ -81,6 +85,9 @@ fn main() { b.run_once("hashtree_intersect", || { hashtree.intersecting_nodes(key).count() }); + b.run_once("ahashtree_intersect", || { + ahashtree.intersecting_nodes(key).count() + }); b.run_once("hashmap_intersect", || { map.iter().filter(|(k, _)| key.intersects(k)).count() }); @@ -92,6 +99,9 @@ fn main() { }); b.run_once("vectree_include", || vectree.included_nodes(key).count()); b.run_once("hashtree_include", || hashtree.included_nodes(key).count()); + b.run_once("ahashtree_include", || { + ahashtree.included_nodes(key).count() + }); b.run_once("hashmap_include", || { map.iter().filter(|(k, _)| key.includes(k)).count() }); @@ -102,21 +112,25 @@ fn main() { "kearctree_insert", "vectree_insert", "hashtree_insert", + "ahashtree_insert", "hashmap_insert", "ketree_fetch", "kearctree_fetch", "vectree_fetch", "hashtree_fetch", + "ahashtree_fetch", "hashmap_fetch", "ketree_intersect", "kearctree_intersect", "vectree_intersect", "hashtree_intersect", + "ahashtree_intersect", "hashmap_intersect", "ketree_include", "kearctree_include", "vectree_include", "hashtree_include", + "ahashtree_include", "hashmap_include", ] { let b = results.benches.get(name).unwrap(); diff --git a/commons/zenoh-keyexpr/src/key_expr/fuzzer.rs b/commons/zenoh-keyexpr/src/key_expr/fuzzer.rs index 1405dc4802..869b7b63a1 100644 --- a/commons/zenoh-keyexpr/src/key_expr/fuzzer.rs +++ b/commons/zenoh-keyexpr/src/key_expr/fuzzer.rs @@ -15,7 +15,10 @@ use super::OwnedKeyExpr; fn random_chunk(rng: &'_ mut impl rand::Rng) -> impl Iterator + '_ { let n = rng.gen_range(1..3); - (0..n).map(move |_| rng.sample(rand::distributions::Uniform::from(b'a'..b'c'))) + rng.gen_bool(0.05) + .then_some(b'@') + .into_iter() + .chain((0..n).map(move |_| rng.sample(rand::distributions::Uniform::from(b'a'..b'c')))) } fn make(ke: &mut Vec, rng: &mut impl rand::Rng) { diff --git a/commons/zenoh-keyexpr/src/keyexpr_tree/arc_tree.rs b/commons/zenoh-keyexpr/src/keyexpr_tree/arc_tree.rs index 4571ab050e..c2a7ff5375 100644 --- a/commons/zenoh-keyexpr/src/keyexpr_tree/arc_tree.rs +++ b/commons/zenoh-keyexpr/src/keyexpr_tree/arc_tree.rs @@ -56,7 +56,8 @@ fn ketree_borrow_mut<'a, T, Token: TokenTrait>( /// A shared KeTree. /// /// The tree and its nodes have shared ownership, while their mutability is managed through the `Token`. -/// The `(node, &token)` tuple implements [`core::ops::Deref`], while `(node, &mut token)` implements [`core::ops::DerefMut`]. +/// +/// Most of its methods are declared in the [`ITokenKeyExprTree`] trait. pub struct KeArcTree< Weight, Token: TokenTrait = DefaultToken, @@ -82,6 +83,14 @@ impl< > KeArcTree { /// Constructs the KeArcTree, returning it and its token, unless constructing the Token failed. + /// + /// # Type inference papercut + /// Despite some of `KeArcTree`'s generic parameters having default values, those are only taken into + /// account by the compiler when a type is named with some parameters omitted, and not when a type is + /// infered with the same parameters unconstrained. + /// + /// The simplest way to resolve this is to eventually assign to tree part of the return value + /// to a variable or field whose type is named `KeArcTree<_>` (the `Weight` parameter can generally be infered). pub fn new() -> Result<(Self, DefaultToken), ::ConstructionError> { let token = DefaultToken::new()?; Ok((Self::with_token(&token), token)) @@ -329,6 +338,57 @@ where IterOrOption::Opt(self.node_mut(token, key).map(Into::into)) } } + + type IncluderItem = Self::Node; + type Includer = IterOrOption< + TokenPacker< + Includer< + 'a, + Children, + Arc, Wildness, Children, Token>, Token>>, + Weight, + >, + &'a Token, + >, + Self::IncluderItem, + >; + fn nodes_including(&'a self, token: &'a Token, key: &'a keyexpr) -> Self::Includer { + let inner = ketree_borrow(&self.inner, token); + if inner.wildness.get() || key.is_wild() { + IterOrOption::Iter(TokenPacker { + iter: Includer::new(&inner.children, key), + token, + }) + } else { + IterOrOption::Opt(self.node(token, key)) + } + } + type IncluderItemMut = Self::TreeIterItemMut; + type IncluderMut = IterOrOption< + TokenPacker< + Includer< + 'a, + Children, + Arc, Wildness, Children, Token>, Token>>, + Weight, + >, + &'a mut Token, + >, + Self::IncluderItemMut, + >; + fn nodes_including_mut(&'a self, token: &'a mut Token, key: &'a keyexpr) -> Self::IncluderMut { + let inner = ketree_borrow(&self.inner, token); + if inner.wildness.get() || key.is_wild() { + unsafe { + IterOrOption::Iter(TokenPacker { + iter: Includer::new(core::mem::transmute(&inner.children), key), + token, + }) + } + } else { + IterOrOption::Opt(self.node_mut(token, key).map(Into::into)) + } + } type PruneNode = KeArcTreeNode, Wildness, Children, Token>; fn prune_where bool>( diff --git a/commons/zenoh-keyexpr/src/keyexpr_tree/box_tree.rs b/commons/zenoh-keyexpr/src/keyexpr_tree/box_tree.rs index 39efd3af75..5aa23e78ac 100644 --- a/commons/zenoh-keyexpr/src/keyexpr_tree/box_tree.rs +++ b/commons/zenoh-keyexpr/src/keyexpr_tree/box_tree.rs @@ -27,6 +27,8 @@ use super::impls::KeyedSetProvider; use super::support::IterOrOption; /// A fully owned KeTree. +/// +/// Note that most of `KeBoxTree`'s methods are declared in the [`IKeyExprTree`] and [`IKeyExprTreeMut`] traits. #[repr(C)] pub struct KeBoxTree< Weight, @@ -37,17 +39,13 @@ pub struct KeBoxTree< wildness: Wildness, } -impl< - Weight, - Wildness: IWildness, - Children: IChildrenProvider>>, - > KeBoxTree +impl KeBoxTree +where + DefaultChildrenProvider: + IChildrenProvider>>, { pub fn new() -> Self { - KeBoxTree { - children: Default::default(), - wildness: Wildness::non_wild(), - } + Default::default() } } impl< @@ -57,7 +55,10 @@ impl< > Default for KeBoxTree { fn default() -> Self { - Self::new() + KeBoxTree { + children: Default::default(), + wildness: Wildness::non_wild(), + } } } @@ -117,6 +118,20 @@ where IterOrOption::Opt(node) } } + + type IncluderItem = ::Item; + type Includer = IterOrOption< + Includer<'a, Children, Box>, Weight>, + &'a Self::Node, + >; + fn nodes_including(&'a self, ke: &'a keyexpr) -> Self::Includer { + if self.wildness.get() || ke.is_wild() { + Includer::new(&self.children, ke).into() + } else { + let node = self.node(ke); + IterOrOption::Opt(node) + } + } } impl< 'a, @@ -218,6 +233,19 @@ where IterOrOption::Opt(node) } } + type IncluderItemMut = ::Item; + type IncluderMut = IterOrOption< + IncluderMut<'a, Children, Box>, Weight>, + &'a mut Self::Node, + >; + fn nodes_including_mut(&'a mut self, ke: &'a keyexpr) -> Self::IncluderMut { + if self.wildness.get() || ke.is_wild() { + IncluderMut::new(&mut self.children, ke).into() + } else { + let node = self.node_mut(ke); + IterOrOption::Opt(node) + } + } fn prune_where bool>(&mut self, mut predicate: F) { let mut wild = false; diff --git a/commons/zenoh-keyexpr/src/keyexpr_tree/iters/includer.rs b/commons/zenoh-keyexpr/src/keyexpr_tree/iters/includer.rs new file mode 100644 index 0000000000..a22d0804b1 --- /dev/null +++ b/commons/zenoh-keyexpr/src/keyexpr_tree/iters/includer.rs @@ -0,0 +1,337 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::keyexpr_tree::*; +use alloc::vec::Vec; + +struct StackFrame<'a, Children: IChildrenProvider, Node: UIKeyExprTreeNode, Weight> +where + Children::Assoc: IChildren + 'a, + >::Node: 'a, +{ + iterator: >::Iter<'a>, + start: usize, + end: usize, + _marker: core::marker::PhantomData, +} +pub struct Includer<'a, Children: IChildrenProvider, Node: UIKeyExprTreeNode, Weight> +where + Children::Assoc: IChildren + 'a, +{ + key: &'a keyexpr, + ke_indices: Vec, + iterators: Vec>, +} + +impl<'a, Children: IChildrenProvider, Node: UIKeyExprTreeNode, Weight> + Includer<'a, Children, Node, Weight> +where + Children::Assoc: IChildren + 'a, +{ + pub(crate) fn new(children: &'a Children::Assoc, key: &'a keyexpr) -> Self { + let mut ke_indices = Vec::with_capacity(32); + ke_indices.push(0); + let mut iterators = Vec::with_capacity(16); + iterators.push(StackFrame { + iterator: children.children(), + start: 0, + end: 1, + _marker: Default::default(), + }); + Self { + key, + ke_indices, + iterators, + } + } +} + +impl< + 'a, + Children: IChildrenProvider, + Node: UIKeyExprTreeNode + 'a, + Weight, + > Iterator for Includer<'a, Children, Node, Weight> +where + Children::Assoc: IChildren + 'a, +{ + type Item = &'a Node; + fn next(&mut self) -> Option { + loop { + let StackFrame { + iterator, + start, + end, + _marker, + } = self.iterators.last_mut()?; + match iterator.next() { + Some(node) => { + let mut node_matches = false; + let new_start = *end; + let mut new_end = *end; + macro_rules! push { + ($index: expr) => { + let index = $index; + if new_end == new_start + || self.ke_indices[new_start..new_end] + .iter() + .rev() + .all(|c| *c < index) + { + self.ke_indices.push(index); + new_end += 1; + } + }; + } + let chunk = node.chunk(); + unsafe { node.as_node().__keyexpr() }; + let chunk_is_super = chunk == "**"; + if chunk_is_super { + let mut latest_idx = usize::MAX; + 'outer: for i in *start..*end { + let mut kec_start = self.ke_indices[i]; + if kec_start == self.key.len() { + node_matches = true; + break; + } + if latest_idx <= kec_start && latest_idx != usize::MAX { + continue; + } + loop { + push!(kec_start); + latest_idx = kec_start; + let key = &self.key.as_bytes()[kec_start..]; + if key[0] == b'@' { + break; + } + match key.iter().position(|&c| c == b'/') { + Some(kec_end) => kec_start += kec_end + 1, + None => { + node_matches = true; + break 'outer; + } + } + } + } + } else { + for i in *start..*end { + let kec_start = self.ke_indices[i]; + if kec_start == self.key.len() { + break; + } + let key = &self.key.as_bytes()[kec_start..]; + unsafe { keyexpr::from_slice_unchecked(key) }; + match key.iter().position(|&c| c == b'/') { + Some(kec_end) => { + let subkey = + unsafe { keyexpr::from_slice_unchecked(&key[..kec_end]) }; + if chunk.includes(subkey) { + push!(kec_start + kec_end + 1); + } + } + None => { + let key = unsafe { keyexpr::from_slice_unchecked(key) }; + if chunk.includes(key) { + push!(self.key.len()); + node_matches = true; + } + } + } + } + } + if new_end > new_start { + let iterator = unsafe { node.as_node().__children() }.children(); + self.iterators.push(StackFrame { + iterator, + start: new_start, + end: new_end, + _marker: Default::default(), + }) + } + if node_matches { + return Some(node.as_node()); + } + } + None => { + if let Some(StackFrame { start, .. }) = self.iterators.pop() { + self.ke_indices.truncate(start); + } + } + } + } + } +} +struct StackFrameMut<'a, Children: IChildrenProvider, Node: UIKeyExprTreeNode, Weight> +where + Children::Assoc: IChildren + 'a, + >::Node: 'a, +{ + iterator: >::IterMut<'a>, + start: usize, + end: usize, + _marker: core::marker::PhantomData, +} + +pub struct IncluderMut< + 'a, + Children: IChildrenProvider, + Node: UIKeyExprTreeNode, + Weight, +> where + Children::Assoc: IChildren + 'a, +{ + key: &'a keyexpr, + ke_indices: Vec, + iterators: Vec>, +} + +impl<'a, Children: IChildrenProvider, Node: UIKeyExprTreeNode, Weight> + IncluderMut<'a, Children, Node, Weight> +where + Children::Assoc: IChildren + 'a, +{ + pub(crate) fn new(children: &'a mut Children::Assoc, key: &'a keyexpr) -> Self { + let mut ke_indices = Vec::with_capacity(32); + ke_indices.push(0); + let mut iterators = Vec::with_capacity(16); + iterators.push(StackFrameMut { + iterator: children.children_mut(), + start: 0, + end: 1, + _marker: Default::default(), + }); + Self { + key, + ke_indices, + iterators, + } + } +} + +impl< + 'a, + Children: IChildrenProvider, + Node: IKeyExprTreeNodeMut + 'a, + Weight, + > Iterator for IncluderMut<'a, Children, Node, Weight> +where + Children::Assoc: IChildren + 'a, +{ + type Item = &'a mut >::Node; + fn next(&mut self) -> Option { + loop { + let StackFrameMut { + iterator, + start, + end, + _marker, + } = self.iterators.last_mut()?; + match iterator.next() { + Some(node) => { + let mut node_matches = false; + let new_start = *end; + let mut new_end = *end; + macro_rules! push { + ($index: expr) => { + let index = $index; + if new_end == new_start + || self.ke_indices[new_start..new_end] + .iter() + .rev() + .all(|c| *c < index) + { + self.ke_indices.push(index); + new_end += 1; + } + }; + } + let chunk = node.chunk(); + let chunk_is_super = chunk == "**"; + if chunk_is_super { + let mut latest_idx = usize::MAX; + 'outer: for i in *start..*end { + let mut kec_start = self.ke_indices[i]; + if kec_start == self.key.len() { + node_matches = true; + break; + } + if latest_idx <= kec_start && latest_idx != usize::MAX { + continue; + } + loop { + push!(kec_start); + latest_idx = kec_start; + let key = &self.key.as_bytes()[kec_start..]; + if key[0] == b'@' { + break; + } + match key.iter().position(|&c| c == b'/') { + Some(kec_end) => kec_start += kec_end + 1, + None => { + node_matches = true; + break 'outer; + } + } + } + } + } else { + for i in *start..*end { + let kec_start = self.ke_indices[i]; + if kec_start == self.key.len() { + break; + } + let key = &self.key.as_bytes()[kec_start..]; + unsafe { keyexpr::from_slice_unchecked(key) }; + match key.iter().position(|&c| c == b'/') { + Some(kec_end) => { + let subkey = + unsafe { keyexpr::from_slice_unchecked(&key[..kec_end]) }; + if chunk.includes(subkey) { + push!(kec_start + kec_end + 1); + } + } + None => { + let key = unsafe { keyexpr::from_slice_unchecked(key) }; + if chunk.includes(key) { + push!(self.key.len()); + node_matches = true; + } + } + } + } + } + if new_end > new_start { + let iterator = unsafe { &mut *(node.as_node_mut() as *mut Node) } + .children_mut() + .children_mut(); + self.iterators.push(StackFrameMut { + iterator, + start: new_start, + end: new_end, + _marker: Default::default(), + }) + } + if node_matches { + return Some(node); + } + } + None => { + if let Some(StackFrameMut { start, .. }) = self.iterators.pop() { + self.ke_indices.truncate(start); + } + } + } + } + } +} diff --git a/commons/zenoh-keyexpr/src/keyexpr_tree/iters/inclusion.rs b/commons/zenoh-keyexpr/src/keyexpr_tree/iters/inclusion.rs index 86b20de9f1..bd875be1b9 100644 --- a/commons/zenoh-keyexpr/src/keyexpr_tree/iters/inclusion.rs +++ b/commons/zenoh-keyexpr/src/keyexpr_tree/iters/inclusion.rs @@ -96,6 +96,7 @@ where }; } let chunk = node.chunk(); + let chunk_is_verbatim = chunk.as_bytes()[0] == b'@'; for i in *start..*end { let kec_start = self.ke_indices[i]; if kec_start == self.key.len() { @@ -107,8 +108,10 @@ where let subkey = unsafe { keyexpr::from_slice_unchecked(&key[..kec_end]) }; if unlikely(subkey == "**") { - push!(kec_start); - push!(kec_start + kec_end + 1); + if !chunk_is_verbatim { + push!(kec_start); + push!(kec_start + kec_end + 1); + } let post_key = &key[kec_end + 1..]; match post_key.iter().position(|&c| c == b'/') { Some(sec_end) => { @@ -133,7 +136,7 @@ where } None => { let key = unsafe { keyexpr::from_slice_unchecked(key) }; - if unlikely(key == "**") { + if unlikely(key == "**") && chunk.as_bytes()[0] != b'@' { push!(kec_start); node_matches = true; } else if key.includes(chunk) { @@ -256,6 +259,7 @@ where }; } let chunk = node.chunk(); + let chunk_is_verbatim = chunk.as_bytes()[0] == b'@'; for i in *start..*end { let kec_start = self.ke_indices[i]; if kec_start == self.key.len() { @@ -267,8 +271,10 @@ where let subkey = unsafe { keyexpr::from_slice_unchecked(&key[..kec_end]) }; if unlikely(subkey == "**") { - push!(kec_start); - push!(kec_start + kec_end + 1); + if !chunk_is_verbatim { + push!(kec_start); + push!(kec_start + kec_end + 1); + } let post_key = &key[kec_end + 1..]; match post_key.iter().position(|&c| c == b'/') { Some(sec_end) => { @@ -293,7 +299,7 @@ where } None => { let key = unsafe { keyexpr::from_slice_unchecked(key) }; - if unlikely(key == "**") { + if unlikely(key == "**") && chunk.as_bytes()[0] != b'@' { push!(kec_start); node_matches = true; } else if key.includes(chunk) { diff --git a/commons/zenoh-keyexpr/src/keyexpr_tree/iters/intersection.rs b/commons/zenoh-keyexpr/src/keyexpr_tree/iters/intersection.rs index 8acffecaf3..e46305adbf 100644 --- a/commons/zenoh-keyexpr/src/keyexpr_tree/iters/intersection.rs +++ b/commons/zenoh-keyexpr/src/keyexpr_tree/iters/intersection.rs @@ -95,14 +95,25 @@ where }; } let chunk = node.chunk(); + let chunk_is_verbatim = chunk.as_bytes()[0] == b'@'; if unlikely(chunk.as_bytes() == b"**") { - // If the current node is `**`, it is guaranteed to match + // If the current node is `**`, it is guaranteed to match... node_matches = true; // and may consume any number of chunks from the KE push!(self.ke_indices[*start]); - for i in self.ke_indices[*start]..self.key.len() { - if self.key.as_bytes()[i] == b'/' { - push!(i + 1); + if self.key.len() != self.ke_indices[*start] { + if self.key.as_bytes()[self.ke_indices[*start]] != b'@' { + for i in self.ke_indices[*start]..self.key.len() { + if self.key.as_bytes()[i] == b'/' { + push!(i + 1); + if self.key.as_bytes()[i + 1] == b'@' { + node_matches = false; // ...unless the KE contains a verbatim chunk. + break; + } + } + } + } else { + node_matches = false; } } } else { @@ -121,9 +132,11 @@ where let subkey = unsafe { keyexpr::from_slice_unchecked(&key[..kec_end]) }; if unlikely(subkey.as_bytes() == b"**") { - // If the chunk is `**`: - // children will have to process it again - push!(kec_start); + if !chunk_is_verbatim { + // If the query chunk is `**`: + // children will have to process it again + push!(kec_start); + } // and we need to process this chunk as if the `**` wasn't there, // but with the knowledge that the next chunk won't be `**`. let post_key = &key[kec_end + 1..]; @@ -144,6 +157,7 @@ where } .intersects(chunk) { + push!(self.key.len()); node_matches = true; } } @@ -155,7 +169,7 @@ where None => { // If it's the last chunk of the query, check whether it's `**` let key = unsafe { keyexpr::from_slice_unchecked(key) }; - if unlikely(key.as_bytes() == b"**") { + if unlikely(key.as_bytes() == b"**") && !chunk_is_verbatim { // If yes, it automatically matches, and must be reused from now on for iteration. push!(kec_start); node_matches = true; @@ -274,40 +288,57 @@ where macro_rules! push { ($index: expr) => { let index = $index; - if new_end == new_start - || self.ke_indices[new_start..new_end] - .iter() - .rev() - .all(|c| *c < index) - { + if new_end == new_start || self.ke_indices[new_end - 1] < index { self.ke_indices.push(index); new_end += 1; } }; } let chunk = node.chunk(); - if unlikely(chunk == "**") { + let chunk_is_verbatim = chunk.as_bytes()[0] == b'@'; + if unlikely(chunk.as_bytes() == b"**") { + // If the current node is `**`, it is guaranteed to match... node_matches = true; + // and may consume any number of chunks from the KE push!(self.ke_indices[*start]); - for i in self.ke_indices[*start]..self.key.len() { - if self.key.as_bytes()[i] == b'/' { - push!(i + 1); + if self.key.len() != self.ke_indices[*start] { + if self.key.as_bytes()[self.ke_indices[*start]] != b'@' { + for i in self.ke_indices[*start]..self.key.len() { + if self.key.as_bytes()[i] == b'/' { + push!(i + 1); + if self.key.as_bytes()[i + 1] == b'@' { + node_matches = false; // ...unless the KE contains a verbatim chunk. + break; + } + } + } + } else { + node_matches = false; } } } else { + // The current node is not `**` + // For all candidate chunks of the KE for i in *start..*end { + // construct that chunk, while checking whether or not it's the last one let kec_start = self.ke_indices[i]; - if kec_start == self.key.len() { + if unlikely(kec_start == self.key.len()) { break; } let key = &self.key.as_bytes()[kec_start..]; match key.iter().position(|&c| c == b'/') { Some(kec_end) => { + // If we aren't in the last chunk let subkey = unsafe { keyexpr::from_slice_unchecked(&key[..kec_end]) }; - if unlikely(subkey == "**") { - push!(kec_start); - push!(kec_start + kec_end + 1); + if unlikely(subkey.as_bytes() == b"**") { + if !chunk_is_verbatim { + // If the query chunk is `**`: + // children will have to process it again + push!(kec_start); + } + // and we need to process this chunk as if the `**` wasn't there, + // but with the knowledge that the next chunk won't be `**`. let post_key = &key[kec_end + 1..]; match post_key.iter().position(|&c| c == b'/') { Some(sec_end) => { @@ -326,6 +357,7 @@ where } .intersects(chunk) { + push!(self.key.len()); node_matches = true; } } @@ -335,11 +367,15 @@ where } } None => { + // If it's the last chunk of the query, check whether it's `**` let key = unsafe { keyexpr::from_slice_unchecked(key) }; - if unlikely(key == "**") { + if unlikely(key.as_bytes() == b"**") && !chunk_is_verbatim { + // If yes, it automatically matches, and must be reused from now on for iteration. push!(kec_start); node_matches = true; } else if chunk.intersects(key) { + // else, if it intersects with the chunk, make sure the children of the node + // are searched for `**` push!(self.key.len()); node_matches = true; } diff --git a/commons/zenoh-keyexpr/src/keyexpr_tree/iters/mod.rs b/commons/zenoh-keyexpr/src/keyexpr_tree/iters/mod.rs index 3550ed0431..19545dbdd7 100644 --- a/commons/zenoh-keyexpr/src/keyexpr_tree/iters/mod.rs +++ b/commons/zenoh-keyexpr/src/keyexpr_tree/iters/mod.rs @@ -18,3 +18,5 @@ mod intersection; pub use intersection::{Intersection, IntersectionMut}; mod inclusion; pub use inclusion::{Inclusion, InclusionMut}; +mod includer; +pub use includer::{Includer, IncluderMut}; diff --git a/commons/zenoh-keyexpr/src/keyexpr_tree/mod.rs b/commons/zenoh-keyexpr/src/keyexpr_tree/mod.rs index cc17278e38..e2833a912f 100644 --- a/commons/zenoh-keyexpr/src/keyexpr_tree/mod.rs +++ b/commons/zenoh-keyexpr/src/keyexpr_tree/mod.rs @@ -13,6 +13,51 @@ // //! KeTrees are specialized data structures to work with sets of values addressed by key expressions. +//! +//! Think of it like HashMaps that were specifically designed to allow iterating over key expressions related to a given query +//! in a fast and semantically correct way. +//! +//! # KeTrees +//! Key Expression Trees (KeTrees) are trees that split any KE inserted into them into its chunks. +//! +//! For example, if you take an empty KeTree, and insert the `(a/b, 0)`, `(a/b/c, 1)`, `(a/b/d, 2)`, `(a/e, 3)` and (a/@f, 4) pairs into it, +//! the tree will be as follows: +//! ```text +//! a --- e = 3 +//! |- b = 0 --- c = 1 +//! | |- d = 2 +//! |- @f = 4 +//! ``` +//! +//! Note that the `a` node exists, despite no value being assigned to it. If you iterate over all nodes, the iterator may yield +//! `(a, None), (a/e, Some(3)), (a/b, Some(0)), (a/b/c, Some(1)), (a/b/d, Some(2)), (a/@f, Some(4))`. +//! +//! # Ownership +//! KeTrees come in two flavours: +//! - [`KeBoxTree`] is the easier flavour. Much like a HashMap, it uniquely owns all of its nodes and data. +//! - [`KeArcTree`] allows the shared ownership of nodes, allowing you to store subsections of the tree elsewhere +//! without worrying about lifetimes. +//! +//! # Usage +//! KeTrees were designed to maximize code reuse. As such, their core properties are reflected through the [`IKeyExprTree`] and [`IKeyExprTreeMut`] traits. +//! +//! KeTrees are made up of node, where nodes may or may not have a value (called `weight`) associated with them. To access these weighs, as well as other +//! properties of a node, you can go throught the [`IKeyExprTreeNode`] and [`IKeyExprTreeNodeMut`] traits. +//! +//! # Iterators +//! KeTrees provide iterators for the following operations: +//! - Iterating on all nodes ([`IKeyExprTree::tree_iter`]/[`IKeyExprTreeMut::tree_iter_mut`]) +//! - Iterating on key-value pairs in the KeTree ([`IKeyExprTreeExt::key_value_pairs`]) +//! - Iterating on nodes whose KE intersects with a queried KE ([`IKeyExprTree::intersecting_nodes`], [`IKeyExprTreeMut::intersecting_nodes_mut`]) +//! - Iterating on nodes whose KE are included by a queried KE ([`IKeyExprTree::included_nodes`], [`IKeyExprTreeMut::included_nodes_mut`]) +//! - Iterating on nodes whose KE includes a queried KE ([`IKeyExprTree::nodes_including`], [`IKeyExprTreeMut::nodes_including_mut`]) +//! +//! While the order of iteration is not guaranteed, a node will never be yielded before one of its parents if both are to appear. +//! For example, iterating on nodes that intersect with `**` in the previously defined tree is guaranteed to yield one of the following sequences: +//! - `(a, None), (a/e, Some(3)), (a/b, Some(0)), (a/b/c, Some(1)), (a/b/d, Some(2))` +//! - `(a, None), (a/e, Some(3)), (a/b, Some(0)), (a/b/d, Some(2)), (a/b/c, Some(1))` +//! - `(a, None), (a/b, Some(0)), (a/b/c, Some(1)), (a/b/d, Some(2)), (a/e, Some(3))` +//! - `(a, None), (a/b, Some(0)), (a/b/d, Some(2)), (a/b/c, Some(1)), (a/e, Some(3))` use crate::{keyexpr, OwnedKeyExpr}; /// Allows importing all of the KeTree traits at once. @@ -21,7 +66,7 @@ pub use traits::*; /// An implementation of a KeTree with shared-ownership of nodes, using [`token_cell`] to allow safe access to the tree's data. /// -/// This implementation allows sharing references to members of the KeTree. +/// This implementation allows sharing ownership of members of the KeTree. pub mod arc_tree; pub use arc_tree::{DefaultToken, KeArcTree}; /// An implementation of a KeTree that owns all of its nodes. diff --git a/commons/zenoh-keyexpr/src/keyexpr_tree/test.rs b/commons/zenoh-keyexpr/src/keyexpr_tree/test.rs index 933c14c354..fc2372a67b 100644 --- a/commons/zenoh-keyexpr/src/keyexpr_tree/test.rs +++ b/commons/zenoh-keyexpr/src/keyexpr_tree/test.rs @@ -106,7 +106,7 @@ fn into_ke(s: &str) -> &keyexpr { keyexpr::new(s).unwrap() } -fn test_keyset>(keys: &[K]) { +fn test_keyset + Debug>(keys: &[K]) { let mut tree = KeBoxTree::new(); let mut map = HashMap::new(); for (v, k) in keys.iter().map(|k| k.deref()).enumerate() { @@ -116,6 +116,7 @@ fn test_keyset>(keys: &[K]) { assert_eq!(node.weight(), map.get(&node.keyexpr()).unwrap().as_ref()); } for target in keys { + let target = target.deref(); let mut expected = HashMap::new(); for (k, v) in &map { if target.intersects(k) { @@ -126,12 +127,24 @@ fn test_keyset>(keys: &[K]) { for node in tree.intersecting_nodes(target) { let ke = node.keyexpr(); let weight = node.weight(); - assert_eq!(expected.remove(&ke).unwrap().as_ref(), weight) + assert_eq!( + expected + .remove(&ke) + .unwrap_or_else(|| panic!("Couldn't find {ke} in {target}'s expected output")) + .as_ref(), + weight + ) } for node in tree.intersecting_nodes_mut(target) { let ke = node.keyexpr(); let weight = node.weight(); - assert_eq!(exclone.remove(&ke).unwrap().as_ref(), weight) + assert_eq!( + exclone + .remove(&ke) + .unwrap_or_else(|| panic!("Couldn't find {ke} in {target}'s expected output")) + .as_ref(), + weight + ) } assert!( expected.is_empty(), @@ -154,12 +167,24 @@ fn test_keyset>(keys: &[K]) { for node in tree.included_nodes(target) { let ke = node.keyexpr(); let weight = node.weight(); - assert_eq!(expected.remove(&ke).unwrap().as_ref(), weight) + assert_eq!( + expected + .remove(&ke) + .unwrap_or_else(|| panic!("Couldn't find {ke} in {target}'s expected output")) + .as_ref(), + weight + ) } for node in tree.included_nodes_mut(target) { let ke = node.keyexpr(); let weight = node.weight(); - assert_eq!(exclone.remove(&ke).unwrap().as_ref(), weight) + assert_eq!( + exclone + .remove(&ke) + .unwrap_or_else(|| panic!("Couldn't find {ke} in {target}'s expected output")) + .as_ref(), + weight + ) } assert!( expected.is_empty(), @@ -169,7 +194,47 @@ fn test_keyset>(keys: &[K]) { ); assert!( exclone.is_empty(), - "MISSING MUTABLE INTERSECTS FOR {}: {:?}", + "MISSING MUTABLE INCLUDES FOR {}: {:?}", + target.deref(), + &exclone + ); + for (k, v) in &map { + if k.includes(target) { + assert!(expected.insert(k, v).is_none()); + } + } + exclone = expected.clone(); + for node in tree.nodes_including(dbg!(target)) { + let ke = node.keyexpr(); + let weight = node.weight(); + assert_eq!( + expected + .remove(dbg!(&ke)) + .unwrap_or_else(|| panic!("Couldn't find {ke} in {target}'s expected output")) + .as_ref(), + weight + ) + } + for node in tree.nodes_including_mut(target) { + let ke = node.keyexpr(); + let weight = node.weight(); + assert_eq!( + exclone + .remove(&ke) + .unwrap_or_else(|| panic!("Couldn't find {ke} in {target}'s expected output")) + .as_ref(), + weight + ) + } + assert!( + expected.is_empty(), + "MISSING INCLUDES FOR {}: {:?}", + target.deref(), + &expected + ); + assert!( + exclone.is_empty(), + "MISSING MUTABLE INCLUDES FOR {}: {:?}", target.deref(), &exclone ); @@ -181,7 +246,7 @@ fn test_keyset>(keys: &[K]) { } fn test_keyset_vec>(keys: &[K]) { - let mut tree = KeBoxTree::new(); + let mut tree = KeBoxTree::default(); let mut map = HashMap::new(); for (v, k) in keys.iter().map(|k| k.deref()).enumerate() { insert_vecset(&mut tree, &mut map, k, v); @@ -190,6 +255,7 @@ fn test_keyset_vec>(keys: &[K]) { assert_eq!(node.weight(), map.get(&node.keyexpr()).unwrap().as_ref()); } for target in keys { + let target = target.deref(); let mut expected = HashMap::new(); for (k, v) in &map { if target.intersects(k) { @@ -200,12 +266,24 @@ fn test_keyset_vec>(keys: &[K]) { for node in tree.intersecting_nodes(target) { let ke = node.keyexpr(); let weight = node.weight(); - assert_eq!(expected.remove(&ke).unwrap().as_ref(), weight) + assert_eq!( + expected + .remove(&ke) + .unwrap_or_else(|| panic!("Couldn't find {ke} in {target}'s expected output")) + .as_ref(), + weight + ) } for node in tree.intersecting_nodes_mut(target) { let ke = node.keyexpr(); let weight = node.weight(); - assert_eq!(exclone.remove(&ke).unwrap().as_ref(), weight) + assert_eq!( + exclone + .remove(&ke) + .unwrap_or_else(|| panic!("Couldn't find {ke} in {target}'s expected output")) + .as_ref(), + weight + ) } assert!( expected.is_empty(), @@ -228,12 +306,24 @@ fn test_keyset_vec>(keys: &[K]) { for node in tree.included_nodes(target) { let ke = node.keyexpr(); let weight = node.weight(); - assert_eq!(expected.remove(&ke).unwrap().as_ref(), weight) + assert_eq!( + expected + .remove(&ke) + .unwrap_or_else(|| panic!("Couldn't find {ke} in {target}'s expected output")) + .as_ref(), + weight + ) } for node in tree.included_nodes_mut(target) { let ke = node.keyexpr(); let weight = node.weight(); - assert_eq!(exclone.remove(&ke).unwrap().as_ref(), weight) + assert_eq!( + exclone + .remove(&ke) + .unwrap_or_else(|| panic!("Couldn't find {ke} in {target}'s expected output")) + .as_ref(), + weight + ) } assert!( expected.is_empty(), @@ -243,7 +333,7 @@ fn test_keyset_vec>(keys: &[K]) { ); assert!( exclone.is_empty(), - "MISSING MUTABLE INTERSECTS FOR {}: {:?}", + "MISSING MUTABLE INCLUDES FOR {}: {:?}", target.deref(), &exclone ); @@ -264,6 +354,7 @@ fn test_keyarctree>(keys: &[K]) { assert_eq!(node.weight(), map.get(&node.keyexpr()).unwrap().as_ref()); } for target in keys { + let target = target.deref(); let mut expected = HashMap::new(); for (k, v) in &map { if target.intersects(k) { @@ -273,7 +364,13 @@ fn test_keyarctree>(keys: &[K]) { for node in tree.0.intersecting_nodes(&tree.1, target) { let ke = node.keyexpr(); let weight = node.weight(); - assert_eq!(expected.remove(&ke).unwrap().as_ref(), weight) + assert_eq!( + expected + .remove(&ke) + .unwrap_or_else(|| panic!("Couldn't find {ke} in {target}'s expected output")) + .as_ref(), + weight + ) } assert!( expected.is_empty(), @@ -289,7 +386,13 @@ fn test_keyarctree>(keys: &[K]) { for node in tree.0.included_nodes(&tree.1, target) { let ke = node.keyexpr(); let weight = node.weight(); - assert_eq!(expected.remove(&ke).unwrap().as_ref(), weight) + assert_eq!( + expected + .remove(&ke) + .unwrap_or_else(|| panic!("Couldn't find {ke} in {target}'s expected output")) + .as_ref(), + weight + ) } assert!( expected.is_empty(), @@ -306,7 +409,7 @@ fn test_keyarctree>(keys: &[K]) { #[test] fn keyed_set_tree() { - let keys: [&keyexpr; 8] = [ + let keys: [&keyexpr; 16] = [ "a/b/**/c/**", "a/b/c", "a/b/c", @@ -315,6 +418,14 @@ fn keyed_set_tree() { "**/d", "d/b/c", "**/b/c", + "**/@c/**", + "**", + "**/@c", + "@c/**", + "@c/a", + "a/@c", + "b/a$*a/b/bb", + "**/b$*/bb", ] .map(into_ke); test_keyset(&keys); diff --git a/commons/zenoh-keyexpr/src/keyexpr_tree/traits/mod.rs b/commons/zenoh-keyexpr/src/keyexpr_tree/traits/mod.rs index a1b0f3aead..dd06cf14b8 100644 --- a/commons/zenoh-keyexpr/src/keyexpr_tree/traits/mod.rs +++ b/commons/zenoh-keyexpr/src/keyexpr_tree/traits/mod.rs @@ -16,103 +16,360 @@ use crate::{keyexpr, OwnedKeyExpr}; use alloc::boxed::Box; pub mod default_impls; -/// The basic immutable methods of all all KeTrees +/// The basic immutable methods of all KeTrees. pub trait IKeyExprTree<'a, Weight> { + /// The type of a given node in the KeTree. + /// + /// The methods of nodes are exposed in the [`IKeyExprTreeNode`] and [`IKeyExprTreeNodeMut`] traits type Node: IKeyExprTreeNodeMut; - /// Accesses the node at `key` if it exists, treating KEs as if they were litteral keys. + + /// Accesses the node at `key` if it exists, treating KEs as if they were completely verbatim keys. + /// + /// Returns `None` if `key` is not present in the KeTree. fn node(&'a self, key: &keyexpr) -> Option<&Self::Node>; + + /// Returns a reference to the weight of the node at `key` if it exists. + fn weight_at(&'a self, key: &keyexpr) -> Option<&'a Weight> { + self.node(key) + .and_then(>::weight) + } + type TreeIterItem; type TreeIter: Iterator; - /// Iterates over the whole tree, including nodes with no weight. + + /// Iterates over the whole tree, including nodes with no weight. + /// + /// [`IKeyExprTree::key_value_pairs`] provides an iterator over all key-value pairs in the tree. fn tree_iter(&'a self) -> Self::TreeIter; + + /// Iterates through weighted nodes, yielding their KE and Weight. + #[allow(clippy::type_complexity)] + fn key_value_pairs( + &'a self, + ) -> core::iter::FilterMap< + Self::TreeIter, + fn(Self::TreeIterItem) -> Option<(OwnedKeyExpr, &'a Weight)>, + > + where + Self::TreeIterItem: AsNode>, + { + self.tree_iter().filter_map(|node| { + unsafe { core::mem::transmute::<_, Option<&Weight>>(node.as_node().weight()) } + .map(|w| (node.as_node().keyexpr(), w)) + }) + } + type IntersectionItem; type Intersection: Iterator; + /// Iterates over all nodes of the tree whose KE intersects with the given `key`. /// /// Note that nodes without a `Weight` will also be yielded by the iterator. + /// + /// You can obtain an iterator over key-value pairs using `iter.filter_map(|node| node.weight.map(|w| (node.keyexpr(), w)))`, + /// keep in mind that the full keyexpr of nodes is not stored in them by default, but computed using the tree: + /// if you need to get a node's key often, inserting its keyexpr in the `Weight` could be a good idea. fn intersecting_nodes(&'a self, key: &'a keyexpr) -> Self::Intersection; + + /// Returns an iterator over the KEs contained in the tree that intersect with `key` + fn intersecting_keys( + &'a self, + key: &'a keyexpr, + ) -> Keys + where + Self::IntersectionItem: AsNode, + Self::Node: IKeyExprTreeNode, + { + self.intersecting_nodes(key) + .filter_map(filter_map_weighted_node_to_key) + } + type InclusionItem; type Inclusion: Iterator; + /// Iterates over all nodes of the tree whose KE is included by the given `key`. /// /// Note that nodes without a `Weight` will also be yielded by the iterator. + /// + /// You can obtain an iterator over key-value pairs using `iter.filter_map(|node| node.weight.map(|w| (node.keyexpr(), w)))`, + /// keep in mind that the full keyexpr of nodes is not stored in them by default, but computed using the tree: + /// if you need to get a node's key often, inserting its keyexpr in the `Weight` could be a good idea. fn included_nodes(&'a self, key: &'a keyexpr) -> Self::Inclusion; + + /// Returns an iterator over the KEs contained in the tree that are included by `key` + fn included_keys(&'a self, key: &'a keyexpr) -> Keys + where + Self::InclusionItem: AsNode, + Self::Node: IKeyExprTreeNode, + { + self.included_nodes(key) + .filter_map(filter_map_weighted_node_to_key) + } + + type IncluderItem; + type Includer: Iterator; + + /// Iterates over all nodes of the tree whose KE includes the given `key`. + /// + /// Note that nodes without a `Weight` will also be yielded by the iterator. + /// + /// You can obtain an iterator over key-value pairs using `iter.filter_map(|node| node.weight.map(|w| (node.keyexpr(), w)))`, + /// keep in mind that the full keyexpr of nodes is not stored in them by default, but computed using the tree: + /// if you need to get a node's key often, inserting its keyexpr in the `Weight` could be a good idea. + fn nodes_including(&'a self, key: &'a keyexpr) -> Self::Includer; + + /// Returns an iterator over the KEs contained in the tree that include `key` + fn keys_including(&'a self, key: &'a keyexpr) -> Keys + where + Self::IncluderItem: AsNode, + Self::Node: IKeyExprTreeNode, + { + self.nodes_including(key) + .filter_map(filter_map_weighted_node_to_key) + } } -/// The basic mutable methods of all all KeTrees +/// The basic mutable methods of all KeTrees. pub trait IKeyExprTreeMut<'a, Weight>: IKeyExprTree<'a, Weight> { - /// Mutably accesses the node at `key` if it exists, treating KEs as if they were litteral keys. - fn node_mut<'b>(&'b mut self, key: &keyexpr) -> Option<&'b mut Self::Node>; - /// Clears the weight of the node at `key`. + /// Mutably accesses the node at `key` if it exists, treating KEs as if they were completely verbatim keys. /// - /// To actually destroy nodes, [`IKeyExprTreeMut::prune_where`] or [`IKeyExprTreeExtMut::prune`] must be called. - fn remove(&mut self, key: &keyexpr) -> Option; + /// Returns `None` if `key` is not present. Use [`IKeyExprTreeMut::node_mut_or_create`] if you wish to construct the node if it doesn't exist. + fn node_mut<'b>(&'b mut self, key: &keyexpr) -> Option<&'b mut Self::Node>; + + /// Returns a mutable reference to the weight of the node at `key`. + fn weight_at_mut(&'a mut self, key: &keyexpr) -> Option<&'a mut Weight> { + self.node_mut(key) + .and_then(>::weight_mut) + } + /// Mutably accesses the node at `key`, creating it if necessary. fn node_mut_or_create<'b>(&'b mut self, key: &keyexpr) -> &'b mut Self::Node; + + /// Inserts a weight at `key`, returning the previous weight if it existed. + fn insert(&mut self, key: &keyexpr, weight: Weight) -> Option { + self.node_mut_or_create(key).insert_weight(weight) + } + + /// Clears the weight of the node at `key`, but doesn't actually destroy the node. + /// + /// To actually destroy nodes, [`IKeyExprTreeMut::prune_where`] or [`IKeyExprTreeMut::prune`] must be called. + fn remove(&mut self, key: &keyexpr) -> Option; + type TreeIterItemMut; type TreeIterMut: Iterator; - /// Iterates over the whole tree, including nodes with no weight. + + /// Iterates over the whole tree, including nodes with no weight. fn tree_iter_mut(&'a mut self) -> Self::TreeIterMut; + type IntersectionItemMut; type IntersectionMut: Iterator; + /// Iterates over all nodes of the tree whose KE intersects with the given `key`. /// /// Note that nodes without a `Weight` will also be yielded by the iterator. fn intersecting_nodes_mut(&'a mut self, key: &'a keyexpr) -> Self::IntersectionMut; type InclusionItemMut; type InclusionMut: Iterator; + /// Iterates over all nodes of the tree whose KE is included by the given `key`. /// /// Note that nodes without a `Weight` will also be yielded by the iterator. fn included_nodes_mut(&'a mut self, key: &'a keyexpr) -> Self::InclusionMut; + type IncluderItemMut; + type IncluderMut: Iterator; + + /// Iterates over all nodes of the tree whose KE includes the given `key`. + /// + /// Note that nodes without a `Weight` will also be yielded by the iterator. + fn nodes_including_mut(&'a mut self, key: &'a keyexpr) -> Self::IncluderMut; /// Prunes node from the tree where the predicate returns `true`. /// /// Note that nodes that still have children will not be pruned. fn prune_where bool>(&mut self, predicate: F); + + /// Prunes empty nodes from the tree, unless they have at least one non-empty descendent. + fn prune(&mut self) { + self.prune_where(|node| node.weight().is_none()) + } } /// The basic operations of a KeTree when a Token is necessary to acess data. pub trait ITokenKeyExprTree<'a, Weight, Token> { + /// An immutable guard to a node of the tree. type Node: IKeyExprTreeNode; + // A mutable guard to a node of the tree. type NodeMut: IKeyExprTreeNodeMut; + + /// Accesses the node at `key` if it exists, treating KEs as if they were completely verbatim keys. + /// + /// Returns `None` if `key` is not present in the KeTree. fn node(&'a self, token: &'a Token, key: &keyexpr) -> Option; + + /// Mutably accesses the node at `key` if it exists, treating KEs as if they were completely verbatim keys. + /// + /// Returns `None` if `key` is not present. Use [`IKeyExprTreeMut::node_mut_or_create`] if you wish to construct the node if it doesn't exist. fn node_mut(&'a self, token: &'a mut Token, key: &keyexpr) -> Option; + + /// Mutably accesses the node at `key`, creating it if necessary. fn node_or_create(&'a self, token: &'a mut Token, key: &keyexpr) -> Self::NodeMut; + + /// Inserts a weight at `key`, returning the previous weight if it existed. + fn insert(&'a self, token: &'a mut Token, at: &keyexpr, weight: Weight) -> Option { + self.node_or_create(token, at).insert_weight(weight) + } + + /// Clears the weight of the node at `key`, but doesn't actually destroy the node. + /// + /// To actually destroy nodes, [`ITokenKeyExprTree::prune_where`] or [`ITokenKeyExprTree::prune`] must be called. + fn remove(&'a mut self, token: &'a mut Token, key: &keyexpr) -> Option { + self.node_mut(token, key) + .and_then(|mut node| node.take_weight()) + } + type TreeIterItem; type TreeIter: Iterator; + + /// Iterates over the whole tree, including nodes with no weight. fn tree_iter(&'a self, token: &'a Token) -> Self::TreeIter; + type TreeIterItemMut; type TreeIterMut: Iterator; + + /// Iterates over the whole tree, including nodes with no weight. fn tree_iter_mut(&'a self, token: &'a mut Token) -> Self::TreeIterMut; + type IntersectionItem; type Intersection: Iterator; + + /// Iterates over all nodes of the tree whose KE intersects with the given `key`. + /// + /// Note that nodes without a `Weight` will also be yielded by the iterator. + /// + /// You can obtain an iterator over key-value pairs using `iter.filter_map(|node| node.weight.map(|w| (node.keyexpr(), w)))`, + /// keep in mind that the full keyexpr of nodes is not stored in them by default, but computed using the tree: + /// if you need to get a node's key often, inserting its keyexpr in the `Weight` could be a good idea. fn intersecting_nodes(&'a self, token: &'a Token, key: &'a keyexpr) -> Self::Intersection; + + /// Returns an iterator over the KEs contained in the tree that intersect with `key` + fn intersecting_keys( + &'a self, + token: &'a Token, + key: &'a keyexpr, + ) -> Keys + where + Self::IntersectionItem: AsNode, + Self::Node: IKeyExprTreeNode, + { + self.intersecting_nodes(token, key) + .filter_map(filter_map_weighted_node_to_key) + } + type IntersectionItemMut; type IntersectionMut: Iterator; + + /// Iterates over all nodes of the tree whose KE intersects with the given `key`. + /// + /// Note that nodes without a `Weight` will also be yielded by the iterator. fn intersecting_nodes_mut( &'a self, token: &'a mut Token, key: &'a keyexpr, ) -> Self::IntersectionMut; + type InclusionItem; type Inclusion: Iterator; + + /// Iterates over all nodes of the tree whose KE is included by the given `key`. + /// + /// Note that nodes without a `Weight` will also be yielded by the iterator. fn included_nodes(&'a self, token: &'a Token, key: &'a keyexpr) -> Self::Inclusion; + + /// Returns an iterator over the KEs contained in the tree that are included by `key` + fn included_keys( + &'a self, + token: &'a Token, + key: &'a keyexpr, + ) -> Keys + where + Self::InclusionItem: AsNode, + Self::Node: IKeyExprTreeNode, + { + self.included_nodes(token, key) + .filter_map(filter_map_weighted_node_to_key) + } + type InclusionItemMut; type InclusionMut: Iterator; + + /// Iterates over all nodes of the tree whose KE is included by the given `key`. + /// + /// Note that nodes without a `Weight` will also be yielded by the iterator. fn included_nodes_mut(&'a self, token: &'a mut Token, key: &'a keyexpr) -> Self::InclusionMut; + + type IncluderItem; + type Includer: Iterator; + + /// Iterates over all nodes of the tree whose KE includes the given `key`. + /// + /// Note that nodes without a `Weight` will also be yielded by the iterator. + fn nodes_including(&'a self, token: &'a Token, key: &'a keyexpr) -> Self::Includer; + + /// Returns an iterator over the KEs contained in the tree that include `key` + fn keys_including( + &'a self, + token: &'a Token, + key: &'a keyexpr, + ) -> Keys + where + Self::IncluderItem: AsNode, + Self::Node: IKeyExprTreeNode, + { + self.nodes_including(token, key) + .filter_map(filter_map_weighted_node_to_key) + } + + type IncluderItemMut; + type IncluderMut: Iterator; + + /// Iterates over all nodes of the tree whose KE includes the given `key`. + /// + /// Note that nodes without a `Weight` will also be yielded by the iterator. + fn nodes_including_mut(&'a self, token: &'a mut Token, key: &'a keyexpr) -> Self::IncluderMut; + type PruneNode: IKeyExprTreeNodeMut; + fn prune_where bool>(&self, token: &mut Token, predicate: F); + fn prune(&self, token: &mut Token) { + self.prune_where(token, |node| node.weight().is_none()) + } } +/// The non-mutating methods of a KeTree node. pub trait IKeyExprTreeNode: UIKeyExprTreeNode { + /// Access the parent node if it exists (the node isn't the first chunk of a key-expression). fn parent(&self) -> Option<&Self::Parent> { unsafe { self.__parent() } } + /// Compute this node's full key expression. + /// + /// Note that KeTrees don't normally store each node's full key expression. + /// If you need to repeatedly access a node's full key expression, it is suggested + /// to store that key expression as part of the node's `Weight` (it's optional value). fn keyexpr(&self) -> OwnedKeyExpr { unsafe { self.__keyexpr() } } + + /// Access the node's weight (or value). + /// + /// Weights can be assigned to a node through many of [`IKeyExprTreeNodeMut`]'s methods, as well as through [`IKeyExprTreeMut::insert`]. + /// + /// Nodes may not have a value in any of the following cases: + /// - The node is a parent to other nodes, but was never assigned a weight itself (or that weight has been removed). + /// - The node is a leaf of the KeTree whose value was [`IKeyExprTreeMut::remove`]d, but [`IKeyExprTreeMut::prune`] hasn't been called yet. fn weight(&self) -> Option<&Weight> { unsafe { self.__weight() } } + + /// Access a node's children. fn children(&self) -> &Self::Children { unsafe { self.__children() } } @@ -129,17 +386,33 @@ pub trait UIKeyExprTreeNode { unsafe fn __children(&self) -> &Self::Children; } +/// The mutating methods of a KeTree node. pub trait IKeyExprTreeNodeMut: IKeyExprTreeNode { + /// Mutably access the parent node if it exists (the node isn't the first chunk of a key-expression). fn parent_mut(&mut self) -> Option<&mut Self::Parent>; + + /// Mutably access the node's weight. fn weight_mut(&mut self) -> Option<&mut Weight>; + + /// Remove the node's weight. fn take_weight(&mut self) -> Option; + + /// Assign a weight to the node, returning the previous weight if the node had one. fn insert_weight(&mut self, weight: Weight) -> Option; + + /// Mutably access the node's children. fn children_mut(&mut self) -> &mut Self::Children; } + +/// Nodes from a token-locked tree need a token to obtain read/write permissions. +/// +/// This trait allows tokenizing a node, allowing to use [`IKeyExprTreeNode`] and [`IKeyExprTreeNodeMut`]'s methods on it. pub trait ITokenKeyExprTreeNode<'a, Weight, Token> { type Tokenized: IKeyExprTreeNode; + /// Wrap the node with the an immutable reference to the token to allow immutable access to its contents. fn tokenize(&'a self, token: &'a Token) -> Self::Tokenized; type TokenizedMut: IKeyExprTreeNodeMut; + /// Wrap the node with a mutable reference to the token to allow mutable access to its contents fn tokenize_mut(&'a self, token: &'a mut Token) -> Self::TokenizedMut; } impl<'a, T: 'a, Weight, Token: 'a> ITokenKeyExprTreeNode<'a, Weight, Token> for T @@ -233,89 +506,9 @@ pub trait AsNodeMut: AsNode { } type Keys = core::iter::FilterMap Option>; -fn filter_map_weighted_node_to_key, I: AsNode, W>( +fn filter_map_weighted_node_to_key, I: AsNode, W>( item: I, ) -> Option { let node: &N = item.as_node(); node.weight().is_some().then(|| node.keyexpr()) } - -/// Extension methods for KeTrees -pub trait IKeyExprTreeExt<'a, Weight>: IKeyExprTree<'a, Weight> { - /// Returns a reference to the weight of the node at `key` - fn weight_at(&'a self, key: &keyexpr) -> Option<&'a Weight> { - self.node(key) - .and_then(>::weight) - } - /// Returns an iterator over the KEs contained in the tree that intersect with `key` - fn intersecting_keys( - &'a self, - key: &'a keyexpr, - ) -> Keys - where - Self::IntersectionItem: AsNode, - Self::Node: IKeyExprTreeNode, - { - self.intersecting_nodes(key) - .filter_map(filter_map_weighted_node_to_key) - } - /// Returns an iterator over the KEs contained in the tree that are included by `key` - fn included_keys(&'a self, key: &'a keyexpr) -> Keys - where - Self::InclusionItem: AsNode, - Self::Node: IKeyExprTreeNode, - { - self.included_nodes(key) - .filter_map(filter_map_weighted_node_to_key) - } - /// Iterates through weighted nodes, yielding their KE and Weight. - #[allow(clippy::type_complexity)] - fn key_value_pairs( - &'a self, - ) -> core::iter::FilterMap< - Self::TreeIter, - fn(Self::TreeIterItem) -> Option<(OwnedKeyExpr, &'a Weight)>, - > - where - Self::TreeIterItem: AsNode>, - { - self.tree_iter().filter_map(|node| { - unsafe { core::mem::transmute::<_, Option<&Weight>>(node.as_node().weight()) } - .map(|w| (node.as_node().keyexpr(), w)) - }) - } -} - -/// Extension methods for mutable KeTrees. -pub trait IKeyExprTreeExtMut<'a, Weight>: IKeyExprTreeMut<'a, Weight> { - /// Returns a mutable reference to the weight of the node at `key`. - fn weight_at_mut(&'a mut self, key: &keyexpr) -> Option<&'a mut Weight> { - self.node_mut(key) - .and_then(>::weight_mut) - } - /// Inserts a weight at `key`, returning the previous weight if it existed. - fn insert(&mut self, key: &keyexpr, weight: Weight) -> Option { - self.node_mut_or_create(key).insert_weight(weight) - } - /// Prunes empty nodes from the tree, unless they have at least one non-empty descendent. - fn prune(&mut self) { - self.prune_where(|node| node.weight().is_none()) - } -} - -impl<'a, Weight, T: IKeyExprTree<'a, Weight>> IKeyExprTreeExt<'a, Weight> for T {} -impl<'a, Weight, T: IKeyExprTreeMut<'a, Weight>> IKeyExprTreeExtMut<'a, Weight> for T {} - -pub trait ITokenKeyExprTreeExt<'a, Weight, Token>: ITokenKeyExprTree<'a, Weight, Token> { - fn insert(&'a self, token: &'a mut Token, at: &keyexpr, weight: Weight) -> Option { - self.node_or_create(token, at).insert_weight(weight) - } - - fn prune(&self, token: &mut Token) { - self.prune_where(token, |node| node.weight().is_none()) - } -} -impl<'a, Weight, Token, T: ITokenKeyExprTree<'a, Weight, Token>> - ITokenKeyExprTreeExt<'a, Weight, Token> for T -{ -} diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index a7a5480bec..a260f3f22b 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -48,31 +48,7 @@ pub fn get_interface(name: &str) -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_INET.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret) - } + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { @@ -149,33 +125,9 @@ pub fn get_local_addresses() -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut result = vec![]; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_UNSPEC.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret) - } + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_UNSPEC)?; + let mut result = vec![]; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref(); @@ -248,33 +200,9 @@ pub fn get_unicast_addresses_of_interface(name: &str) -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut addrs = vec![]; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_INET.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret); - } + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?; + let mut addrs = vec![]; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { if name == ffi::pstr_to_string(iface.AdapterName) @@ -311,31 +239,7 @@ pub fn get_index_of_interface(addr: IpAddr) -> ZResult { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_INET.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret) - } + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { @@ -355,6 +259,57 @@ pub fn get_index_of_interface(addr: IpAddr) -> ZResult { } } +pub fn get_interface_names_by_addr(addr: IpAddr) -> ZResult> { + #[cfg(unix)] + { + if addr.is_unspecified() { + Ok(pnet_datalink::interfaces() + .iter() + .map(|iface| iface.name.clone()) + .collect::>()) + } else { + Ok(pnet_datalink::interfaces() + .iter() + .filter(|iface| iface.ips.iter().any(|ipnet| ipnet.ip() == addr)) + .map(|iface| iface.name.clone()) + .collect::>()) + } + } + #[cfg(windows)] + { + let mut result = vec![]; + unsafe { + use crate::ffi; + use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; + + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_UNSPEC)?; + + if addr.is_unspecified() { + let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); + while let Some(iface) = next_iface { + result.push(ffi::pstr_to_string(iface.AdapterName)); + next_iface = iface.Next.as_ref(); + } + } else { + let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); + while let Some(iface) = next_iface { + let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref(); + while let Some(ucast_addr) = next_ucast_addr { + if let Ok(ifaddr) = ffi::win::sockaddr_to_addr(ucast_addr.Address) { + if ifaddr.ip() == addr { + result.push(ffi::pstr_to_string(iface.AdapterName)); + } + } + next_ucast_addr = ucast_addr.Next.as_ref(); + } + next_iface = iface.Next.as_ref(); + } + } + } + Ok(result) + } +} + pub fn get_ipv4_ipaddrs() -> Vec { get_local_addresses() .unwrap_or_else(|_| vec![]) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 896aba9762..4833be3963 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -58,21 +58,6 @@ rand = { workspace = true, features = ["default"] } [build-dependencies] rustc_version = { workspace = true } -[package.metadata.deb] -name = "zenohd" -maintainer = "zenoh-dev@eclipse.org" -copyright = "2022 ZettaScale Technology" -section = "net" -license-file = ["../LICENSE", "0"] -depends = "$auto" -maintainer-scripts = "zenoh/.deb" -assets = [ - # binary - ["target/release/zenohd", "/usr/bin/", "755"], - # service - [".service/zenohd.service", "/lib/systemd/system/zenohd.service", "644"], -] - [[example]] name = "z_scout" path = "examples/z_scout.rs" diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index a3d3722fc5..d16f03049c 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -24,7 +24,7 @@ mod multicast; pub mod tls; mod unicast; -use alloc::{borrow::ToOwned, boxed::Box, string::String}; +use alloc::{borrow::ToOwned, boxed::Box, string::String, vec, vec::Vec}; use async_trait::async_trait; use core::{cmp::PartialEq, fmt, hash::Hash}; pub use multicast::*; @@ -44,6 +44,7 @@ pub struct Link { pub mtu: u16, pub is_reliable: bool, pub is_streamed: bool, + pub interfaces: Vec, } #[async_trait] @@ -71,6 +72,7 @@ impl From<&LinkUnicast> for Link { mtu: link.get_mtu(), is_reliable: link.is_reliable(), is_streamed: link.is_streamed(), + interfaces: link.get_interface_names(), } } } @@ -90,6 +92,7 @@ impl From<&LinkMulticast> for Link { mtu: link.get_mtu(), is_reliable: link.is_reliable(), is_streamed: false, + interfaces: vec![], } } } diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index 0eabb7437d..939f1ba5e1 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -use alloc::{boxed::Box, sync::Arc, vec::Vec}; +use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec}; use async_trait::async_trait; use core::{ fmt, @@ -45,6 +45,7 @@ pub trait LinkUnicastTrait: Send + Sync { fn get_dst(&self) -> &Locator; fn is_reliable(&self) -> bool; fn is_streamed(&self) -> bool; + fn get_interface_names(&self) -> Vec; async fn write(&self, buffer: &[u8]) -> ZResult; async fn write_all(&self, buffer: &[u8]) -> ZResult<()>; async fn read(&self, buffer: &mut [u8]) -> ZResult; diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index 31b32d37da..5dc85a1155 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -139,6 +139,13 @@ impl LinkUnicastTrait for LinkUnicastQuic { *QUIC_DEFAULT_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + // @TODO: Not supported for now + log::debug!("The get_interface_names for LinkUnicastQuic is not supported"); + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { true diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 11ef5bebd6..a22d1e9b03 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -180,6 +180,13 @@ impl LinkUnicastTrait for LinkUnicastSerial { *SERIAL_DEFAULT_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + // @TODO: Not supported for now + log::debug!("The get_interface_names for LinkUnicastSerial is not supported"); + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { false diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 269cf5a09f..d0ddda7e42 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -152,6 +152,28 @@ impl LinkUnicastTrait for LinkUnicastTcp { *TCP_DEFAULT_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + match zenoh_util::net::get_interface_names_by_addr(self.src_addr.ip()) { + Ok(interfaces) => { + log::trace!( + "get_interface_names for {:?}: {:?}", + self.src_addr.ip(), + interfaces + ); + interfaces + } + Err(e) => { + log::debug!( + "get_interface_names for {:?} failed: {:?}", + self.src_addr.ip(), + e + ); + vec![] + } + } + } + #[inline(always)] fn is_reliable(&self) -> bool { true diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 291f8ba4f2..c332153c59 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -185,6 +185,13 @@ impl LinkUnicastTrait for LinkUnicastTls { *TLS_DEFAULT_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + // @TODO: Not supported for now + log::debug!("The get_interface_names for LinkUnicastTls is not supported"); + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { true diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index f959936cf7..bd83995069 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -205,6 +205,28 @@ impl LinkUnicastTrait for LinkUnicastUdp { *UDP_DEFAULT_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + match zenoh_util::net::get_interface_names_by_addr(self.src_addr.ip()) { + Ok(interfaces) => { + log::trace!( + "get_interface_names for {:?}: {:?}", + self.src_addr.ip(), + interfaces + ); + interfaces + } + Err(e) => { + log::debug!( + "get_interface_names for {:?} failed: {:?}", + self.src_addr.ip(), + e + ); + vec![] + } + } + } + #[inline(always)] fn is_reliable(&self) -> bool { false diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index fa470e7010..634bfff190 100644 --- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -503,6 +503,13 @@ impl LinkUnicastTrait for UnicastPipe { LINUX_PIPE_MAX_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + // @TODO: Not supported for now + log::debug!("The get_interface_names for UnicastPipe is not supported"); + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { true diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index eed62a38b6..6c3e95ae89 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -122,6 +122,13 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream { *UNIXSOCKSTREAM_DEFAULT_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + // @TODO: Not supported for now + log::debug!("The get_interface_names for LinkUnicastUnixSocketStream is not supported"); + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { true diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 44232ec346..6e1c676d74 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -203,6 +203,13 @@ impl LinkUnicastTrait for LinkUnicastWs { *WS_DEFAULT_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + // @TODO: Not supported for now + log::debug!("The get_interface_names for LinkUnicastWs is not supported"); + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { true diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index 84f592d899..115ed1e8d9 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -30,10 +30,8 @@ use zenoh_backend_traits::config::{GarbageCollectionConfig, StorageConfig}; use zenoh_backend_traits::{Capability, History, Persistence, StorageInsertionResult, StoredData}; use zenoh_keyexpr::key_expr::OwnedKeyExpr; use zenoh_keyexpr::keyexpr_tree::impls::KeyedSetProvider; -use zenoh_keyexpr::keyexpr_tree::IKeyExprTreeMut; -use zenoh_keyexpr::keyexpr_tree::{ - support::NonWild, support::UnknownWildness, IKeyExprTreeExt, IKeyExprTreeExtMut, KeBoxTree, -}; +use zenoh_keyexpr::keyexpr_tree::{support::NonWild, support::UnknownWildness, KeBoxTree}; +use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeMut}; use zenoh_result::bail; use zenoh_util::{zenoh_home, Timed, TimedEvent, Timer}; @@ -85,8 +83,8 @@ impl StorageService { strip_prefix: config.strip_prefix, storage: Mutex::new(store_intercept.storage), capability: store_intercept.capability, - tombstones: Arc::new(RwLock::new(KeBoxTree::new())), - wildcard_updates: Arc::new(RwLock::new(KeBoxTree::new())), + tombstones: Arc::new(RwLock::new(KeBoxTree::default())), + wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())), in_interceptor: store_intercept.in_interceptor, out_interceptor: store_intercept.out_interceptor, replication, diff --git a/zenoh-ext/examples/z_pub_cache.rs b/zenoh-ext/examples/z_pub_cache.rs index af4da5b9f2..e4266f3234 100644 --- a/zenoh-ext/examples/z_pub_cache.rs +++ b/zenoh-ext/examples/z_pub_cache.rs @@ -22,7 +22,7 @@ async fn main() { // Initiate logging env_logger::init(); - let (config, key_expr, value, history, prefix) = parse_args(); + let (config, key_expr, value, history, prefix, complete) = parse_args(); println!("Opening session..."); let session = zenoh::open(config).res().await.unwrap(); @@ -30,7 +30,8 @@ async fn main() { println!("Declaring PublicationCache on {}", &key_expr); let mut publication_cache_builder = session .declare_publication_cache(&key_expr) - .history(history); + .history(history) + .queryable_complete(complete); if let Some(prefix) = prefix { publication_cache_builder = publication_cache_builder.queryable_prefix(prefix); } @@ -44,7 +45,7 @@ async fn main() { } } -fn parse_args() -> (Config, String, String, usize, Option) { +fn parse_args() -> (Config, String, String, usize, Option, bool) { let args = Command::new("zenoh-ext pub cache example") .arg( arg!(-m --mode [MODE] "The zenoh session mode (peer by default)") @@ -58,11 +59,12 @@ fn parse_args() -> (Config, String, String, usize, Option) { ) .arg(arg!(-v --value [VALUE] "The value to publish.").default_value("Pub from Rust!")) .arg( - arg!(-h --history [SIZE] "The number of publications to keep in cache") + arg!(-i --history [SIZE] "The number of publications to keep in cache") .default_value("1"), ) .arg(arg!(-x --prefix [STRING] "An optional queryable prefix")) .arg(arg!(-c --config [FILE] "A configuration file.")) + .arg(arg!(-o --complete "Set `complete` option to true. This means that this queryable is ulitmate data source, no need to scan other queryables.")) .arg(arg!(--"no-multicast-scouting" "Disable the multicast-based scouting mechanism.")) .get_matches(); @@ -100,6 +102,7 @@ fn parse_args() -> (Config, String, String, usize, Option) { let value = args.get_one::("value").unwrap().to_string(); let history: usize = args.get_one::("history").unwrap().parse().unwrap(); let prefix = args.get_one::("prefix").map(|s| (*s).to_owned()); + let complete = args.get_flag("complete"); - (config, key_expr, value, history, prefix) + (config, key_expr, value, history, prefix, complete) } diff --git a/zenoh-ext/src/lib.rs b/zenoh-ext/src/lib.rs index dca488ba80..7440d80a53 100644 --- a/zenoh-ext/src/lib.rs +++ b/zenoh-ext/src/lib.rs @@ -20,7 +20,7 @@ pub use publication_cache::{PublicationCache, PublicationCacheBuilder}; pub use querying_subscriber::{ FetchingSubscriber, FetchingSubscriberBuilder, QueryingSubscriberBuilder, }; -pub use session_ext::{ArcSessionExt, SessionExt}; +pub use session_ext::SessionExt; pub use subscriber_ext::SubscriberBuilderExt; pub use subscriber_ext::SubscriberForward; diff --git a/zenoh-ext/src/publication_cache.rs b/zenoh-ext/src/publication_cache.rs index 5d9cb3cee9..2efc09e99d 100644 --- a/zenoh-ext/src/publication_cache.rs +++ b/zenoh-ext/src/publication_cache.rs @@ -29,7 +29,8 @@ pub struct PublicationCacheBuilder<'a, 'b, 'c> { session: SessionRef<'a>, pub_key_expr: ZResult>, queryable_prefix: Option>>, - queryable_origin: Locality, + queryable_origin: Option, + complete: Option, history: usize, resources_limit: Option, } @@ -43,7 +44,8 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> { session, pub_key_expr, queryable_prefix: None, - queryable_origin: Locality::default(), + queryable_origin: None, + complete: None, history: 1, resources_limit: None, } @@ -64,7 +66,13 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> { #[zenoh_macros::unstable] #[inline] pub fn queryable_allowed_origin(mut self, origin: Locality) -> Self { - self.queryable_origin = origin; + self.queryable_origin = Some(origin); + self + } + + /// Set completeness option for the queryable. + pub fn queryable_complete(mut self, complete: bool) -> Self { + self.complete = Some(complete); self } @@ -134,28 +142,21 @@ impl<'a> PublicationCache<'a> { } // declare the local subscriber that will store the local publications - let (local_sub, queryable) = match conf.session.clone() { - SessionRef::Borrow(session) => ( - session - .declare_subscriber(&key_expr) - .allowed_origin(Locality::SessionLocal) - .res_sync()?, - session - .declare_queryable(&queryable_key_expr) - .allowed_origin(conf.queryable_origin) - .res_sync()?, - ), - SessionRef::Shared(session) => ( - session - .declare_subscriber(&key_expr) - .allowed_origin(Locality::SessionLocal) - .res_sync()?, - session - .declare_queryable(&queryable_key_expr) - .allowed_origin(conf.queryable_origin) - .res_sync()?, - ), - }; + let local_sub = conf + .session + .declare_subscriber(&key_expr) + .allowed_origin(Locality::SessionLocal) + .res_sync()?; + + // declare the queryable which returns the cached publications + let mut queryable = conf.session.declare_queryable(&queryable_key_expr); + if let Some(origin) = conf.queryable_origin { + queryable = queryable.allowed_origin(origin); + } + if let Some(complete) = conf.complete { + queryable = queryable.complete(complete); + } + let queryable = queryable.res_sync()?; // take local ownership of stuff to be moved into task let sub_recv = local_sub.receiver.clone(); diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index ed9b723d6b..978d348da1 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -681,33 +681,20 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> { // register fetch handler let handler = register_handler(state.clone(), callback.clone()); // declare subscriber - let subscriber = match conf.session.clone() { - SessionRef::Borrow(session) => match conf.key_space.into() { - crate::KeySpace::User => session - .declare_subscriber(&key_expr) - .callback(sub_callback) - .reliability(conf.reliability) - .allowed_origin(conf.origin) - .res_sync()?, - crate::KeySpace::Liveliness => session - .liveliness() - .declare_subscriber(&key_expr) - .callback(sub_callback) - .res_sync()?, - }, - SessionRef::Shared(session) => match conf.key_space.into() { - crate::KeySpace::User => session - .declare_subscriber(&key_expr) - .callback(sub_callback) - .reliability(conf.reliability) - .allowed_origin(conf.origin) - .res_sync()?, - crate::KeySpace::Liveliness => session - .liveliness() - .declare_subscriber(&key_expr) - .callback(sub_callback) - .res_sync()?, - }, + let subscriber = match conf.key_space.into() { + crate::KeySpace::User => conf + .session + .declare_subscriber(&key_expr) + .callback(sub_callback) + .reliability(conf.reliability) + .allowed_origin(conf.origin) + .res_sync()?, + crate::KeySpace::Liveliness => conf + .session + .liveliness() + .declare_subscriber(&key_expr) + .callback(sub_callback) + .res_sync()?, }; let fetch_subscriber = FetchingSubscriber { diff --git a/zenoh-ext/src/session_ext.rs b/zenoh-ext/src/session_ext.rs index 3bd0f70183..73fbd7dfc4 100644 --- a/zenoh-ext/src/session_ext.rs +++ b/zenoh-ext/src/session_ext.rs @@ -18,68 +18,50 @@ use zenoh::prelude::KeyExpr; use zenoh::{Session, SessionRef}; /// Some extensions to the [`zenoh::Session`](zenoh::Session) -pub trait SessionExt { - type PublicationCacheBuilder<'a, 'b, 'c> - where - Self: 'a; - fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>( - &'a self, +pub trait SessionExt<'s, 'a> { + fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( + &'s self, pub_key_expr: TryIntoKeyExpr, - ) -> Self::PublicationCacheBuilder<'a, 'b, 'c> + ) -> PublicationCacheBuilder<'a, 'b, 'c> where TryIntoKeyExpr: TryInto>, >>::Error: Into; } -impl SessionExt for Session { - type PublicationCacheBuilder<'a, 'b, 'c> = PublicationCacheBuilder<'a, 'b, 'c>; - fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>( - &'a self, +impl<'s, 'a> SessionExt<'s, 'a> for SessionRef<'a> { + fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( + &'s self, pub_key_expr: TryIntoKeyExpr, ) -> PublicationCacheBuilder<'a, 'b, 'c> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublicationCacheBuilder::new( - SessionRef::Borrow(self), - pub_key_expr.try_into().map_err(Into::into), - ) + PublicationCacheBuilder::new(self.clone(), pub_key_expr.try_into().map_err(Into::into)) } } -impl SessionExt for T { - type PublicationCacheBuilder<'a, 'b, 'c> = PublicationCacheBuilder<'static, 'b, 'c>; - fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>( +impl<'a> SessionExt<'a, 'a> for Session { + fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( &'a self, pub_key_expr: TryIntoKeyExpr, - ) -> Self::PublicationCacheBuilder<'a, 'b, 'c> + ) -> PublicationCacheBuilder<'a, 'b, 'c> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { - ArcSessionExt::declare_publication_cache(self, pub_key_expr) + SessionRef::Borrow(self).declare_publication_cache(pub_key_expr) } } -pub trait ArcSessionExt { - fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( - &self, - pub_key_expr: TryIntoKeyExpr, - ) -> PublicationCacheBuilder<'static, 'b, 'c> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; -} - -impl ArcSessionExt for Arc { +impl<'s> SessionExt<'s, 'static> for Arc { /// Examples: /// ``` /// # #[tokio::main] /// # async fn main() { /// use zenoh::prelude::r#async::*; /// use zenoh::config::ModeDependentValue::Unique; - /// use zenoh_ext::ArcSessionExt; + /// use zenoh_ext::SessionExt; /// /// let mut config = config::default(); /// config.timestamping.set_enabled(Some(Unique(true))); @@ -91,16 +73,13 @@ impl ArcSessionExt for Arc { /// # } /// ``` fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( - &self, + &'s self, pub_key_expr: TryIntoKeyExpr, ) -> PublicationCacheBuilder<'static, 'b, 'c> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublicationCacheBuilder::new( - SessionRef::Shared(self.clone()), - pub_key_expr.try_into().map_err(Into::into), - ) + SessionRef::Shared(self.clone()).declare_publication_cache(pub_key_expr) } } diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index 274b600024..e239a316a1 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -28,6 +28,7 @@ use zenoh_config::unwrap_or_default; use zenoh_config::Config; use zenoh_protocol::core::{ExprId, WhatAmI, ZenohId}; use zenoh_protocol::network::Mapping; +use zenoh_result::ZResult; // use zenoh_collections::Timer; use zenoh_sync::get_mut_unchecked; @@ -76,7 +77,12 @@ pub struct Tables { } impl Tables { - pub fn new(zid: ZenohId, whatami: WhatAmI, hlc: Option>, config: &Config) -> Self { + pub fn new( + zid: ZenohId, + whatami: WhatAmI, + hlc: Option>, + config: &Config, + ) -> ZResult { let drop_future_timestamp = unwrap_or_default!(config.timestamping().drop_future_timestamp()); let router_peers_failover_brokering = @@ -84,7 +90,7 @@ impl Tables { // let queries_default_timeout = // Duration::from_millis(unwrap_or_default!(config.queries_default_timeout())); let hat_code = hat::new_hat(whatami, config); - Tables { + Ok(Tables { zid, whatami, face_counter: 0, @@ -96,11 +102,11 @@ impl Tables { faces: HashMap::new(), mcast_groups: vec![], mcast_faces: vec![], - interceptors: interceptor_factories(config), + interceptors: interceptor_factories(config)?, pull_caches_lock: Mutex::new(()), hat: hat_code.new_tables(router_peers_failover_brokering), hat_code: hat_code.into(), - } + }) } #[doc(hidden)] diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 35a10557dc..662cf5b7bd 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -316,17 +316,26 @@ impl HatQueriesTrait for HatCode { let mres = mres.upgrade().unwrap(); let complete = DEFAULT_INCLUDER.includes(mres.expr().as_bytes(), key_expr.as_bytes()); for (sid, context) in &mres.session_ctxs { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); - if let Some(qabl_info) = context.qabl.as_ref() { - route.push(QueryTargetQabl { - direction: (context.face.clone(), key_expr.to_owned(), NodeId::default()), - complete: if complete { - qabl_info.complete as u64 - } else { - 0 - }, - distance: 0.5, - }); + if match tables.whatami { + WhatAmI::Router => context.face.whatami != WhatAmI::Router, + _ => source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client, + } { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); + if let Some(qabl_info) = context.qabl.as_ref() { + route.push(QueryTargetQabl { + direction: ( + context.face.clone(), + key_expr.to_owned(), + NodeId::default(), + ), + complete: if complete { + qabl_info.complete as u64 + } else { + 0 + }, + distance: 0.5, + }); + } } } } diff --git a/zenoh/src/net/routing/interceptor/mod.rs b/zenoh/src/net/routing/interceptor/mod.rs index 7503580405..0efe472fd3 100644 --- a/zenoh/src/net/routing/interceptor/mod.rs +++ b/zenoh/src/net/routing/interceptor/mod.rs @@ -20,6 +20,7 @@ use super::RoutingContext; use zenoh_config::Config; use zenoh_protocol::network::NetworkMessage; +use zenoh_result::ZResult; use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; pub(crate) trait InterceptorTrait { @@ -44,11 +45,11 @@ pub(crate) trait InterceptorFactoryTrait { pub(crate) type InterceptorFactory = Box; -pub(crate) fn interceptor_factories(_config: &Config) -> Vec { +pub(crate) fn interceptor_factories(_config: &Config) -> ZResult> { // Add interceptors here // @TODO build the list of intercetors with the correct order from the config // vec![Box::new(LoggerInterceptor {})] - vec![] + Ok(vec![]) } pub(crate) struct InterceptorsChain { diff --git a/zenoh/src/net/routing/router.rs b/zenoh/src/net/routing/router.rs index 26c9d36185..ba0249af1b 100644 --- a/zenoh/src/net/routing/router.rs +++ b/zenoh/src/net/routing/router.rs @@ -45,15 +45,20 @@ pub struct Router { } impl Router { - pub fn new(zid: ZenohId, whatami: WhatAmI, hlc: Option>, config: &Config) -> Self { - Router { + pub fn new( + zid: ZenohId, + whatami: WhatAmI, + hlc: Option>, + config: &Config, + ) -> ZResult { + Ok(Router { // whatami, tables: Arc::new(TablesLock { - tables: RwLock::new(Tables::new(zid, whatami, hlc, config)), + tables: RwLock::new(Tables::new(zid, whatami, hlc, config)?), ctrl_lock: Mutex::new(hat::new_hat(whatami, config)), queries_lock: RwLock::new(()), }), - } + }) } #[allow(clippy::too_many_arguments)] diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index b06649717c..282c45f66c 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -94,7 +94,7 @@ impl Runtime { let hlc = (*unwrap_or_default!(config.timestamping().enabled().get(whatami))) .then(|| Arc::new(HLCBuilder::new().with_id(uhlc::ID::from(&zid)).build())); - let router = Arc::new(Router::new(zid, whatami, hlc.clone(), &config)); + let router = Arc::new(Router::new(zid, whatami, hlc.clone(), &config)?); let handler = Arc::new(RuntimeTransportEventHandler { runtime: std::sync::RwLock::new(None), diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 363803f682..ddcdc0084e 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -38,7 +38,8 @@ fn base_test() { WhatAmI::Client, Some(Arc::new(HLC::default())), &config, - ); + ) + .unwrap(); let tables = router.tables.clone(); let primitives = Arc::new(DummyPrimitives {}); @@ -133,7 +134,8 @@ fn match_test() { WhatAmI::Client, Some(Arc::new(HLC::default())), &config, - ); + ) + .unwrap(); let tables = router.tables.clone(); let primitives = Arc::new(DummyPrimitives {}); @@ -172,7 +174,8 @@ fn clean_test() { WhatAmI::Client, Some(Arc::new(HLC::default())), &config, - ); + ) + .unwrap(); let tables = router.tables.clone(); let primitives = Arc::new(DummyPrimitives {}); @@ -478,7 +481,8 @@ fn client_test() { WhatAmI::Client, Some(Arc::new(HLC::default())), &config, - ); + ) + .unwrap(); let tables = router.tables.clone(); let sub_info = SubscriberInfo { diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 5f197e161a..f947fff400 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -283,6 +283,69 @@ pub enum SessionRef<'a> { Shared(Arc), } +impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { + fn declare_subscriber<'b, TryIntoKeyExpr>( + &'s self, + key_expr: TryIntoKeyExpr, + ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + SubscriberBuilder { + session: self.clone(), + key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), + reliability: Reliability::default(), + mode: PushMode, + origin: Locality::default(), + handler: DefaultHandler, + } + } + fn declare_queryable<'b, TryIntoKeyExpr>( + &'s self, + key_expr: TryIntoKeyExpr, + ) -> QueryableBuilder<'a, 'b, DefaultHandler> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + QueryableBuilder { + session: self.clone(), + key_expr: key_expr.try_into().map_err(Into::into), + complete: false, + origin: Locality::default(), + handler: DefaultHandler, + } + } + fn declare_publisher<'b, TryIntoKeyExpr>( + &'s self, + key_expr: TryIntoKeyExpr, + ) -> PublisherBuilder<'a, 'b> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + PublisherBuilder { + session: self.clone(), + key_expr: key_expr.try_into().map_err(Into::into), + congestion_control: CongestionControl::default(), + priority: Priority::default(), + destination: Locality::default(), + } + } + #[zenoh_macros::unstable] + fn liveliness(&'s self) -> Liveliness<'a> { + Liveliness { + session: self.clone(), + } + } + fn info(&'s self) -> SessionInfo<'a> { + SessionInfo { + session: self.clone(), + } + } +} + impl Deref for SessionRef<'_> { type Target = Session; @@ -503,45 +566,13 @@ impl Session { pub fn config(&self) -> &Notifier { self.runtime.config() } +} - /// Get informations about the zenoh [`Session`](Session). - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let info = session.info(); - /// # } - /// ``` - pub fn info(&self) -> SessionInfo { - SessionInfo { - session: SessionRef::Borrow(self), - } +impl<'a> SessionDeclarations<'a, 'a> for Session { + fn info(&self) -> SessionInfo { + SessionRef::Borrow(self).info() } - - /// Create a [`Subscriber`](Subscriber) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression to subscribe to - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let subscriber = session.declare_subscriber("key/expression").res().await.unwrap(); - /// while let Ok(sample) = subscriber.recv_async().await { - /// println!("Received: {:?}", sample); - /// } - /// # } - /// ``` - pub fn declare_subscriber<'a, 'b, TryIntoKeyExpr>( + fn declare_subscriber<'b, TryIntoKeyExpr>( &'a self, key_expr: TryIntoKeyExpr, ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> @@ -549,40 +580,9 @@ impl Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - SubscriberBuilder { - session: SessionRef::Borrow(self), - key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), - reliability: Reliability::default(), - mode: PushMode, - origin: Locality::default(), - handler: DefaultHandler, - } + SessionRef::Borrow(self).declare_subscriber(key_expr) } - - /// Create a [`Queryable`](Queryable) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression matching the queries the - /// [`Queryable`](Queryable) will reply to - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let queryable = session.declare_queryable("key/expression").res().await.unwrap(); - /// while let Ok(query) = queryable.recv_async().await { - /// query.reply(Ok(Sample::try_from( - /// "key/expression", - /// "value", - /// ).unwrap())).res().await.unwrap(); - /// } - /// # } - /// ``` - pub fn declare_queryable<'a, 'b, TryIntoKeyExpr>( + fn declare_queryable<'b, TryIntoKeyExpr>( &'a self, key_expr: TryIntoKeyExpr, ) -> QueryableBuilder<'a, 'b, DefaultHandler> @@ -590,36 +590,9 @@ impl Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - QueryableBuilder { - session: SessionRef::Borrow(self), - key_expr: key_expr.try_into().map_err(Into::into), - complete: false, - origin: Locality::default(), - handler: DefaultHandler, - } + SessionRef::Borrow(self).declare_queryable(key_expr) } - - /// Create a [`Publisher`](crate::publication::Publisher) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression matching resources to write - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let publisher = session.declare_publisher("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// publisher.put("value").res().await.unwrap(); - /// # } - /// ``` - pub fn declare_publisher<'a, 'b, TryIntoKeyExpr>( + fn declare_publisher<'b, TryIntoKeyExpr>( &'a self, key_expr: TryIntoKeyExpr, ) -> PublisherBuilder<'a, 'b> @@ -627,15 +600,15 @@ impl Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublisherBuilder { - session: SessionRef::Borrow(self), - key_expr: key_expr.try_into().map_err(Into::into), - congestion_control: CongestionControl::default(), - priority: Priority::default(), - destination: Locality::default(), - } + SessionRef::Borrow(self).declare_publisher(key_expr) + } + #[zenoh_macros::unstable] + fn liveliness(&'a self) -> Liveliness { + SessionRef::Borrow(self).liveliness() } +} +impl Session { /// Informs Zenoh that you intend to use `key_expr` multiple times and that it should optimize its transmission. /// /// The returned `KeyExpr`'s internal structure may differ from what you would have obtained through a simple @@ -820,30 +793,6 @@ impl Session { handler: DefaultHandler, } } - - /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let liveliness = session - /// .liveliness() - /// .declare_token("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// # } - /// ``` - #[zenoh_macros::unstable] - pub fn liveliness(&self) -> Liveliness { - Liveliness { - session: SessionRef::Borrow(self), - } - } } impl Session { @@ -1988,7 +1937,7 @@ impl Session { } } -impl SessionDeclarations for Arc { +impl<'s> SessionDeclarations<'s, 'static> for Arc { /// Create a [`Subscriber`](Subscriber) for the given key expression. /// /// # Arguments @@ -2014,7 +1963,7 @@ impl SessionDeclarations for Arc { /// # } /// ``` fn declare_subscriber<'b, TryIntoKeyExpr>( - &self, + &'s self, key_expr: TryIntoKeyExpr, ) -> SubscriberBuilder<'static, 'b, PushMode, DefaultHandler> where @@ -2060,7 +2009,7 @@ impl SessionDeclarations for Arc { /// # } /// ``` fn declare_queryable<'b, TryIntoKeyExpr>( - &self, + &'s self, key_expr: TryIntoKeyExpr, ) -> QueryableBuilder<'static, 'b, DefaultHandler> where @@ -2097,7 +2046,7 @@ impl SessionDeclarations for Arc { /// # } /// ``` fn declare_publisher<'b, TryIntoKeyExpr>( - &self, + &'s self, key_expr: TryIntoKeyExpr, ) -> PublisherBuilder<'static, 'b> where @@ -2131,11 +2080,17 @@ impl SessionDeclarations for Arc { /// # } /// ``` #[zenoh_macros::unstable] - fn liveliness(&self) -> Liveliness<'static> { + fn liveliness(&'s self) -> Liveliness<'static> { Liveliness { session: SessionRef::Shared(self.clone()), } } + + fn info(&'s self) -> SessionInfo<'static> { + SessionInfo { + session: SessionRef::Shared(self.clone()), + } + } } impl Primitives for Session { @@ -2555,14 +2510,14 @@ impl fmt::Debug for Session { } } -/// Functions to create zenoh entities with `'static` lifetime. +/// Functions to create zenoh entities /// /// This trait contains functions to create zenoh entities like /// [`Subscriber`](crate::subscriber::Subscriber), and -/// [`Queryable`](crate::queryable::Queryable) with a `'static` lifetime. -/// This is useful to move zenoh entities to several threads and tasks. +/// [`Queryable`](crate::queryable::Queryable) /// -/// This trait is implemented for `Arc`. +/// This trait is implemented by [`Session`](crate::session::Session) itself and +/// by wrappers [`SessionRef`](crate::session::SessionRef) and [`Arc`](crate::session::Arc) /// /// # Examples /// ```no_run @@ -2582,7 +2537,7 @@ impl fmt::Debug for Session { /// }).await; /// # } /// ``` -pub trait SessionDeclarations { +pub trait SessionDeclarations<'s, 'a> { /// Create a [`Subscriber`](crate::subscriber::Subscriber) for the given key expression. /// /// # Arguments @@ -2607,13 +2562,13 @@ pub trait SessionDeclarations { /// }).await; /// # } /// ``` - fn declare_subscriber<'a, TryIntoKeyExpr>( - &self, + fn declare_subscriber<'b, TryIntoKeyExpr>( + &'s self, key_expr: TryIntoKeyExpr, - ) -> SubscriberBuilder<'static, 'a, PushMode, DefaultHandler> + ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; + TryIntoKeyExpr: TryInto>, + >>::Error: Into; /// Create a [`Queryable`](crate::queryable::Queryable) for the given key expression. /// @@ -2643,13 +2598,13 @@ pub trait SessionDeclarations { /// }).await; /// # } /// ``` - fn declare_queryable<'a, TryIntoKeyExpr>( - &self, + fn declare_queryable<'b, TryIntoKeyExpr>( + &'s self, key_expr: TryIntoKeyExpr, - ) -> QueryableBuilder<'static, 'a, DefaultHandler> + ) -> QueryableBuilder<'a, 'b, DefaultHandler> where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; + TryIntoKeyExpr: TryInto>, + >>::Error: Into; /// Create a [`Publisher`](crate::publication::Publisher) for the given key expression. /// @@ -2671,13 +2626,13 @@ pub trait SessionDeclarations { /// publisher.put("value").res().await.unwrap(); /// # } /// ``` - fn declare_publisher<'a, TryIntoKeyExpr>( - &self, + fn declare_publisher<'b, TryIntoKeyExpr>( + &'s self, key_expr: TryIntoKeyExpr, - ) -> PublisherBuilder<'static, 'a> + ) -> PublisherBuilder<'a, 'b> where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; + TryIntoKeyExpr: TryInto>, + >>::Error: Into; /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. /// @@ -2697,7 +2652,19 @@ pub trait SessionDeclarations { /// # } /// ``` #[zenoh_macros::unstable] - fn liveliness(&self) -> Liveliness<'static>; + fn liveliness(&'s self) -> Liveliness<'a>; + /// Get informations about the zenoh [`Session`](Session). + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap(); + /// let info = session.info(); + /// # }) + /// ``` + fn info(&'s self) -> SessionInfo<'a>; } impl crate::net::primitives::EPrimitives for Session { diff --git a/zenoh/tests/unicity.rs b/zenoh/tests/unicity.rs new file mode 100644 index 0000000000..c325792c36 --- /dev/null +++ b/zenoh/tests/unicity.rs @@ -0,0 +1,274 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use zenoh::prelude::r#async::*; +use zenoh_core::ztimeout; +use tokio::runtime::Handle; + +const TIMEOUT: Duration = Duration::from_secs(60); +const SLEEP: Duration = Duration::from_secs(1); + +const MSG_SIZE: [usize; 2] = [1_024, 100_000]; + + +async fn open_p2p_sessions() -> (Session, Session, Session) { + // Open the sessions + let mut config = config::peer(); + config.listen.endpoints = vec!["tcp/127.0.0.1:27447".parse().unwrap()]; + config.scouting.multicast.set_enabled(Some(false)).unwrap(); + println!("[ ][01a] Opening s01 session"); + let s01 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + + let mut config = config::peer(); + config.listen.endpoints = vec!["tcp/127.0.0.1:27448".parse().unwrap()]; + config.connect.endpoints = vec!["tcp/127.0.0.1:27447".parse().unwrap()]; + config.scouting.multicast.set_enabled(Some(false)).unwrap(); + println!("[ ][02a] Opening s02 session"); + let s02 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + + let mut config = config::peer(); + config.connect.endpoints = vec![ + "tcp/127.0.0.1:27447".parse().unwrap(), + "tcp/127.0.0.1:27448".parse().unwrap(), + ]; + config.scouting.multicast.set_enabled(Some(false)).unwrap(); + println!("[ ][03a] Opening s03 session"); + let s03 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + + (s01, s02, s03) +} + +async fn open_router_session() -> Session { + // Open the sessions + let mut config = config::default(); + config.set_mode(Some(WhatAmI::Router)).unwrap(); + config.listen.endpoints = vec!["tcp/127.0.0.1:37447".parse().unwrap()]; + config.scouting.multicast.set_enabled(Some(false)).unwrap(); + println!("[ ][00a] Opening router session"); + ztimeout!(zenoh::open(config).res_async()).unwrap() +} + +async fn close_router_session(s: Session) { + println!("[ ][01d] Closing router session"); + ztimeout!(s.close().res_async()).unwrap(); +} + +async fn open_client_sessions() -> (Session, Session, Session) { + // Open the sessions + let config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); + println!("[ ][01a] Opening s01 session"); + let s01 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + + let config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); + println!("[ ][02a] Opening s02 session"); + let s02 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + + let config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); + println!("[ ][03a] Opening s03 session"); + let s03 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + + (s01, s02, s03) +} + +async fn close_sessions(s01: Session, s02: Session, s03: Session) { + println!("[ ][01d] Closing s01 session"); + ztimeout!(s01.close().res_async()).unwrap(); + println!("[ ][02d] Closing s02 session"); + ztimeout!(s02.close().res_async()).unwrap(); + println!("[ ][03d] Closing s03 session"); + ztimeout!(s03.close().res_async()).unwrap(); +} + +async fn test_unicity_pubsub(s01: &Session, s02: &Session, s03: &Session) { + let key_expr = "test/unicity"; + let msg_count = 1; + let msgs1 = Arc::new(AtomicUsize::new(0)); + let msgs2 = Arc::new(AtomicUsize::new(0)); + + for size in MSG_SIZE { + msgs1.store(0, Ordering::Relaxed); + msgs2.store(0, Ordering::Relaxed); + + // Subscribe to data + println!("[PS][01b] Subscribing on s01 session"); + let c_msgs1 = msgs1.clone(); + let sub1 = ztimeout!(s01 + .declare_subscriber(key_expr) + .callback(move |sample| { + assert_eq!(sample.value.payload.len(), size); + c_msgs1.fetch_add(1, Ordering::Relaxed); + }) + .res_async()) + .unwrap(); + + // Subscribe to data + println!("[PS][02b] Subscribing on s02 session"); + let c_msgs2 = msgs2.clone(); + let sub2 = ztimeout!(s02 + .declare_subscriber(key_expr) + .callback(move |sample| { + assert_eq!(sample.value.payload.len(), size); + c_msgs2.fetch_add(1, Ordering::Relaxed); + }) + .res_async()) + .unwrap(); + + // Wait for the declaration to propagate + tokio::time::sleep(SLEEP).await; + + // Put data + println!("[PS][03b] Putting on s03 session. {msg_count} msgs of {size} bytes."); + for _ in 0..msg_count { + ztimeout!(s03 + .put(key_expr, vec![0u8; size]) + .congestion_control(CongestionControl::Block) + .res_async()) + .unwrap(); + } + + ztimeout!(async { + loop { + let cnt1 = msgs1.load(Ordering::Relaxed); + let cnt2 = msgs2.load(Ordering::Relaxed); + println!("[PS][01b] Received {cnt1}/{msg_count}."); + println!("[PS][02b] Received {cnt2}/{msg_count}."); + if cnt1 < msg_count || cnt2 < msg_count { + tokio::time::sleep(SLEEP).await; + } else { + break; + } + } + }); + + tokio::time::sleep(SLEEP).await; + + let cnt1 = msgs1.load(Ordering::Relaxed); + println!("[QR][01c] Got on s01 session. {cnt1}/{msg_count} msgs."); + assert_eq!(cnt1, msg_count); + let cnt2 = msgs1.load(Ordering::Relaxed); + println!("[QR][02c] Got on s02 session. {cnt2}/{msg_count} msgs."); + assert_eq!(cnt2, msg_count); + + println!("[PS][02b] Unsubscribing on s02 session"); + ztimeout!(sub2.undeclare().res_async()).unwrap(); + + println!("[PS][01b] Unsubscribing on s01 session"); + ztimeout!(sub1.undeclare().res_async()).unwrap(); + + // Wait for the declaration to propagate + tokio::time::sleep(SLEEP).await; + } +} + +async fn test_unicity_qryrep(s01: &Session, s02: &Session, s03: &Session) { + let key_expr = "test/unicity"; + let msg_count = 1; + let msgs1 = Arc::new(AtomicUsize::new(0)); + let msgs2 = Arc::new(AtomicUsize::new(0)); + + for size in MSG_SIZE { + msgs1.store(0, Ordering::Relaxed); + msgs2.store(0, Ordering::Relaxed); + + // Queryable to data + println!("[QR][01c] Queryable on s01 session"); + let c_msgs1 = msgs1.clone(); + let qbl1 = ztimeout!(s01 + .declare_queryable(key_expr) + .callback(move |sample| { + c_msgs1.fetch_add(1, Ordering::Relaxed); + let rep = Sample::try_from(key_expr, vec![0u8; size]).unwrap(); + tokio::task::block_in_place(move || { + Handle::current().block_on(async move { + ztimeout!(sample.reply(Ok(rep)).res_async()).unwrap() + }); + }); + }) + .res_async()) + .unwrap(); + + // Queryable to data + println!("[QR][02c] Queryable on s02 session"); + let c_msgs2 = msgs2.clone(); + let qbl2 = ztimeout!(s02 + .declare_queryable(key_expr) + .callback(move |sample| { + c_msgs2.fetch_add(1, Ordering::Relaxed); + let rep = Sample::try_from(key_expr, vec![0u8; size]).unwrap(); + tokio::task::block_in_place(move || { + Handle::current().block_on(async move { + ztimeout!(sample.reply(Ok(rep)).res_async()).unwrap() + }); + }); + }) + .res_async()) + .unwrap(); + + // Wait for the declaration to propagate + tokio::time::sleep(SLEEP).await; + + // Get data + println!("[QR][03c] Getting on s03 session. {msg_count} msgs."); + let mut cnt = 0; + for _ in 0..msg_count { + let rs = ztimeout!(s03.get(key_expr).res_async()).unwrap(); + while let Ok(s) = ztimeout!(rs.recv_async()) { + assert_eq!(s.sample.unwrap().value.payload.len(), size); + cnt += 1; + } + } + let cnt1 = msgs1.load(Ordering::Relaxed); + println!("[QR][01c] Got on s01 session. {cnt1}/{msg_count} msgs."); + assert_eq!(cnt1, msg_count); + let cnt2 = msgs1.load(Ordering::Relaxed); + println!("[QR][02c] Got on s02 session. {cnt2}/{msg_count} msgs."); + assert_eq!(cnt2, msg_count); + println!("[QR][03c] Got on s03 session. {cnt}/{msg_count} msgs."); + assert_eq!(cnt, msg_count); + + println!("[PS][01c] Unqueryable on s01 session"); + ztimeout!(qbl1.undeclare().res_async()).unwrap(); + + println!("[PS][02c] Unqueryable on s02 session"); + ztimeout!(qbl2.undeclare().res_async()).unwrap(); + + // Wait for the declaration to propagate + tokio::time::sleep(SLEEP).await; + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn zenoh_unicity_p2p() { + let _ = env_logger::try_init(); + + let (s01, s02, s03) = open_p2p_sessions().await; + test_unicity_pubsub(&s01, &s02, &s03).await; + test_unicity_qryrep(&s01, &s02, &s03).await; + close_sessions(s01, s02, s03).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn zenoh_unicity_brokered() { + let _ = env_logger::try_init(); + let r = open_router_session().await; + + let (s01, s02, s03) = open_client_sessions().await; + test_unicity_pubsub(&s01, &s02, &s03).await; + test_unicity_qryrep(&s01, &s02, &s03).await; + close_sessions(s01, s02, s03).await; + + close_router_session(r).await; +}