From 7b6ca9cb4f285b428d15dc5b118e22cea034103f Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Tue, 5 Mar 2024 09:24:02 +0100 Subject: [PATCH 1/6] fix: Set `publish = false` for zenoh-backend-example (#787) --- plugins/zenoh-backend-example/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/zenoh-backend-example/Cargo.toml b/plugins/zenoh-backend-example/Cargo.toml index eac0e0d803..dc6c4ea1ea 100644 --- a/plugins/zenoh-backend-example/Cargo.toml +++ b/plugins/zenoh-backend-example/Cargo.toml @@ -17,6 +17,7 @@ name = "zenoh-backend-example" version = { workspace = true } authors = { workspace = true } edition = { workspace = true } +publish = false [features] default = ["no_mangle"] From dffa0a27efd0955ac26f11907803f518e4f54d7c Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Tue, 5 Mar 2024 09:24:17 +0100 Subject: [PATCH 2/6] fix: Set `fail-fast = false` in sync-lockfiles workflow (#780) --- .github/workflows/sync-lockfiles.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/sync-lockfiles.yml b/.github/workflows/sync-lockfiles.yml index 04bdfe7b3a..f8ac82c1f5 100644 --- a/.github/workflows/sync-lockfiles.yml +++ b/.github/workflows/sync-lockfiles.yml @@ -30,6 +30,7 @@ jobs: needs: fetch runs-on: ubuntu-latest strategy: + fail-fast: false matrix: dependant: - zenoh-c From 8730335dbacdb7f1d773a4797c32d28eeccbf268 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Tue, 5 Mar 2024 09:25:32 +0100 Subject: [PATCH 3/6] fix: Remove unused dependencies (#761) --- Cargo.lock | 100 ------------------ Cargo.toml | 1 - commons/zenoh-codec/Cargo.toml | 1 - commons/zenoh-macros/Cargo.toml | 2 - commons/zenoh-protocol/Cargo.toml | 4 - commons/zenoh-shm/Cargo.toml | 1 - commons/zenoh-sync/Cargo.toml | 1 - commons/zenoh-util/Cargo.toml | 6 -- io/zenoh-link-commons/Cargo.toml | 2 - io/zenoh-links/zenoh-link-serial/Cargo.toml | 1 - io/zenoh-transport/Cargo.toml | 1 - plugins/zenoh-backend-example/Cargo.toml | 2 - plugins/zenoh-plugin-example/Cargo.toml | 1 - .../zenoh-plugin-storage-manager/Cargo.toml | 1 - zenoh/Cargo.toml | 1 - zenohd/Cargo.toml | 1 - 16 files changed, 126 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fa9de7e800..2e33b82584 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1451,12 +1451,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "glob" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" - [[package]] name = "gloo-timers" version = "0.2.6" @@ -1531,12 +1525,6 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" -[[package]] -name = "hex" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" - [[package]] name = "hkdf" version = "0.10.0" @@ -2287,12 +2275,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "panic-message" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "384e52fd8fbd4cbe3c317e8216260c21a0f9134de108cea8a4dd4e7e152c472d" - [[package]] name = "parking" version = "2.1.0" @@ -2495,20 +2477,6 @@ dependencies = [ "plotters-backend", ] -[[package]] -name = "pnet" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "130c5b738eeda2dc5796fe2671e49027e6935e817ab51b930a36ec9e6a206a64" -dependencies = [ - "ipnetwork", - "pnet_base", - "pnet_datalink", - "pnet_packet", - "pnet_sys", - "pnet_transport", -] - [[package]] name = "pnet_base" version = "0.34.0" @@ -2531,39 +2499,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "pnet_macros" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "688b17499eee04a0408aca0aa5cba5fc86401d7216de8a63fdf7a4c227871804" -dependencies = [ - "proc-macro2", - "quote", - "regex", - "syn 2.0.33", -] - -[[package]] -name = "pnet_macros_support" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eea925b72f4bd37f8eab0f221bbe4c78b63498350c983ffa9dd4bcde7e030f56" -dependencies = [ - "pnet_base", -] - -[[package]] -name = "pnet_packet" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9a005825396b7fe7a38a8e288dbc342d5034dac80c15212436424fef8ea90ba" -dependencies = [ - "glob", - "pnet_base", - "pnet_macros", - "pnet_macros_support", -] - [[package]] name = "pnet_sys" version = "0.34.0" @@ -2574,18 +2509,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "pnet_transport" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2637e14d7de974ee2f74393afccbc8704f3e54e6eb31488715e72481d1662cc3" -dependencies = [ - "libc", - "pnet_base", - "pnet_packet", - "pnet_sys", -] - [[package]] name = "polling" version = "2.8.0" @@ -4575,7 +4498,6 @@ dependencies = [ "form_urlencoded", "futures", "git-version", - "hex", "lazy_static", "log", "ordered-float", @@ -4615,9 +4537,7 @@ version = "0.11.0-dev" dependencies = [ "async-std", "async-trait", - "clap", "const_format", - "env_logger", "futures", "git-version", "log", @@ -4648,7 +4568,6 @@ dependencies = [ "rand 0.8.5", "serde", "uhlc", - "uuid", "zenoh-buffers", "zenoh-protocol", "zenoh-shm", @@ -4780,9 +4699,7 @@ dependencies = [ "async-trait", "flume", "log", - "lz4_flex", "serde", - "typenum", "zenoh-buffers", "zenoh-codec", "zenoh-core", @@ -4829,7 +4746,6 @@ dependencies = [ "uuid", "z-serial", "zenoh-collections", - "zenoh-config", "zenoh-core", "zenoh-link-commons", "zenoh-protocol", @@ -4958,9 +4874,7 @@ version = "0.11.0-dev" dependencies = [ "proc-macro2", "quote", - "rustc_version 0.4.0", "syn 2.0.33", - "unzip-n", "zenoh-keyexpr", ] @@ -4969,7 +4883,6 @@ name = "zenoh-plugin-example" version = "0.11.0-dev" dependencies = [ "async-std", - "clap", "const_format", "env_logger", "futures", @@ -5018,7 +4931,6 @@ dependencies = [ "async-global-executor", "async-std", "async-trait", - "clap", "const_format", "crc", "derive-new", @@ -5064,12 +4976,10 @@ name = "zenoh-protocol" version = "0.11.0-dev" dependencies = [ "const_format", - "hex", "lazy_static", "rand 0.8.5", "serde", "uhlc", - "uuid", "zenoh-buffers", "zenoh-keyexpr", "zenoh-result", @@ -5086,7 +4996,6 @@ dependencies = [ name = "zenoh-shm" version = "0.11.0-dev" dependencies = [ - "bincode", "log", "serde", "shared_memory", @@ -5100,7 +5009,6 @@ version = "0.11.0-dev" dependencies = [ "async-std", "event-listener 4.0.0", - "flume", "futures", "tokio", "zenoh-buffers", @@ -5120,7 +5028,6 @@ dependencies = [ "flume", "log", "lz4_flex", - "panic-message", "paste", "rand 0.8.5", "ringbuffer-spsc", @@ -5147,23 +5054,17 @@ version = "0.11.0-dev" dependencies = [ "async-std", "async-trait", - "clap", - "const_format", "flume", - "futures", - "hex", "home", "humantime", "lazy_static", "libc", "libloading", "log", - "pnet", "pnet_datalink", "shellexpand", "winapi", "zenoh-core", - "zenoh-protocol", "zenoh-result", ] @@ -5198,7 +5099,6 @@ dependencies = [ "rand 0.8.5", "rustc_version 0.4.0", "zenoh", - "zenoh_backend_traits", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9af42deeae..32662e5bf7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -148,7 +148,6 @@ tide = "0.16.0" token-cell = { version = "1.4.2", default-features = false } tokio = { version = "1.26.0", default-features = false } # Default features are disabled due to some crates' requirements tokio-tungstenite = "0.20" -typenum = "1.16.0" uhlc = { version = "0.6.0", default-features = false } # Default features are disabled due to usage in no_std crates unzip-n = "0.1.2" url = "2.3.1" diff --git a/commons/zenoh-codec/Cargo.toml b/commons/zenoh-codec/Cargo.toml index 3e103a7b1a..72f507a596 100644 --- a/commons/zenoh-codec/Cargo.toml +++ b/commons/zenoh-codec/Cargo.toml @@ -56,7 +56,6 @@ zenoh-shm = { workspace = true, optional = true } criterion = { workspace = true } env_logger = { workspace = true } rand = { workspace = true, features = ["default"] } -uuid = { workspace = true, features = ["default"] } zenoh-protocol = { workspace = true, features = ["test"] } [[bench]] diff --git a/commons/zenoh-macros/Cargo.toml b/commons/zenoh-macros/Cargo.toml index 245e52f237..7d06482e48 100644 --- a/commons/zenoh-macros/Cargo.toml +++ b/commons/zenoh-macros/Cargo.toml @@ -27,9 +27,7 @@ description = "Internal crate for zenoh." [dependencies] proc-macro2 = { workspace = true } quote = { workspace = true } -rustc_version = { workspace = true } syn = { workspace = true, features = ["full"] } -unzip-n = { workspace = true } zenoh-keyexpr = { workspace = true, features = ["std"] } [lib] diff --git a/commons/zenoh-protocol/Cargo.toml b/commons/zenoh-protocol/Cargo.toml index 07c11cb2fc..93c92ee33f 100644 --- a/commons/zenoh-protocol/Cargo.toml +++ b/commons/zenoh-protocol/Cargo.toml @@ -26,12 +26,10 @@ description = "Internal crate for zenoh." [features] default = ["std"] std = [ - "hex/std", "rand?/std", "rand?/std_rng", "serde/std", "uhlc/std", - "uuid/std", "zenoh-keyexpr/std", "zenoh-result/std", ] @@ -42,11 +40,9 @@ complete_n = [] [dependencies] const_format = { workspace = true } -hex = { workspace = true, features = ["alloc"] } rand = { workspace = true, features = ["alloc", "getrandom"], optional = true } serde = { workspace = true, features = ["alloc"] } uhlc = { workspace = true, default-features = false } -uuid = { workspace = true } # Needs a getrandom::getrandom() custom implementation on embedded (in root crate) zenoh-buffers = { workspace = true, default-features = false } zenoh-keyexpr = { workspace = true } zenoh-result = { workspace = true } diff --git a/commons/zenoh-shm/Cargo.toml b/commons/zenoh-shm/Cargo.toml index dd680fb2fc..ccf23a8911 100644 --- a/commons/zenoh-shm/Cargo.toml +++ b/commons/zenoh-shm/Cargo.toml @@ -29,7 +29,6 @@ description = "Internal crate for zenoh." # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -bincode = { workspace = true } log = { workspace = true } serde = { workspace = true, features = ["default"] } shared_memory = { workspace = true } diff --git a/commons/zenoh-sync/Cargo.toml b/commons/zenoh-sync/Cargo.toml index 8a1ec9c90f..b660dd371a 100644 --- a/commons/zenoh-sync/Cargo.toml +++ b/commons/zenoh-sync/Cargo.toml @@ -31,7 +31,6 @@ description = "Internal crate for zenoh." [dependencies] async-std = { workspace = true, features = ["default", "unstable"] } event-listener = { workspace = true } -flume = { workspace = true } futures = { workspace = true } tokio = { workspace = true, features = ["default", "sync"] } zenoh-buffers = { workspace = true } diff --git a/commons/zenoh-util/Cargo.toml b/commons/zenoh-util/Cargo.toml index 803645fb8a..256f53d33a 100644 --- a/commons/zenoh-util/Cargo.toml +++ b/commons/zenoh-util/Cargo.toml @@ -38,11 +38,7 @@ default = ["std"] [dependencies] async-std = { workspace = true, features = ["default", "unstable"] } async-trait = { workspace = true } -clap = { workspace = true } -const_format = { workspace = true } flume = { workspace = true } -futures = { workspace = true } -hex = { workspace = true, features = ["default"] } home = { workspace = true } humantime = { workspace = true } lazy_static = { workspace = true } @@ -50,7 +46,6 @@ libloading = { workspace = true } log = { workspace = true } shellexpand = { workspace = true } zenoh-core = { workspace = true } -zenoh-protocol = { workspace = true, features = ["default"] } zenoh-result = { workspace = true, features = ["default"] } [target.'cfg(windows)'.dependencies] @@ -58,7 +53,6 @@ winapi = { workspace = true } [target.'cfg(unix)'.dependencies] libc = { workspace = true } -pnet = { workspace = true } pnet_datalink = { workspace = true } [dev-dependencies] diff --git a/io/zenoh-link-commons/Cargo.toml b/io/zenoh-link-commons/Cargo.toml index 019067de56..29bb0eabfc 100644 --- a/io/zenoh-link-commons/Cargo.toml +++ b/io/zenoh-link-commons/Cargo.toml @@ -32,10 +32,8 @@ async-std = { workspace = true } zenoh-sync = { workspace = true } async-trait = { workspace = true } flume = { workspace = true } -lz4_flex = { workspace = true } log = { workspace = true } serde = { workspace = true, features = ["default"] } -typenum = { workspace = true } zenoh-buffers = { workspace = true } zenoh-codec = { workspace = true } zenoh-core = { workspace = true } diff --git a/io/zenoh-links/zenoh-link-serial/Cargo.toml b/io/zenoh-links/zenoh-link-serial/Cargo.toml index aedf3eb849..a48755e328 100644 --- a/io/zenoh-links/zenoh-link-serial/Cargo.toml +++ b/io/zenoh-links/zenoh-link-serial/Cargo.toml @@ -40,7 +40,6 @@ tokio = { workspace = true, features = ["io-std", "macros", "net", "rt-multi-thr uuid = { workspace = true, default-features = true } z-serial = { workspace = true } zenoh-collections = { workspace = true } -zenoh-config = { workspace = true } zenoh-core = { workspace = true } zenoh-link-commons = { workspace = true } zenoh-protocol = { workspace = true } diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index 0921f4e1ee..0d9d494606 100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -77,5 +77,4 @@ zenoh-util = { workspace = true } [dev-dependencies] env_logger = { workspace = true } -panic-message = { workspace = true } zenoh-protocol = { workspace = true, features = ["test"] } diff --git a/plugins/zenoh-backend-example/Cargo.toml b/plugins/zenoh-backend-example/Cargo.toml index dc6c4ea1ea..f230cc73f1 100644 --- a/plugins/zenoh-backend-example/Cargo.toml +++ b/plugins/zenoh-backend-example/Cargo.toml @@ -30,9 +30,7 @@ crate-type = ["cdylib"] [dependencies] async-std = { workspace = true, features = ["default"] } -clap = { workspace = true } const_format = { workspace = true } -env_logger = { workspace = true } futures = { workspace = true } git-version = { workspace = true } log = { workspace = true } diff --git a/plugins/zenoh-plugin-example/Cargo.toml b/plugins/zenoh-plugin-example/Cargo.toml index 6d49826238..3f65cb9839 100644 --- a/plugins/zenoh-plugin-example/Cargo.toml +++ b/plugins/zenoh-plugin-example/Cargo.toml @@ -36,7 +36,6 @@ crate-type = ["cdylib"] [dependencies] async-std = { workspace = true, features = ["default"] } const_format = { workspace = true } -clap = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } git-version = { workspace = true } diff --git a/plugins/zenoh-plugin-storage-manager/Cargo.toml b/plugins/zenoh-plugin-storage-manager/Cargo.toml index 87215d45eb..afb8b066e8 100644 --- a/plugins/zenoh-plugin-storage-manager/Cargo.toml +++ b/plugins/zenoh-plugin-storage-manager/Cargo.toml @@ -34,7 +34,6 @@ crate-type = ["cdylib", "rlib"] [dependencies] async-std = { workspace = true, features = ["default"] } async-trait = { workspace = true } -clap = { workspace = true } crc = { workspace = true } const_format = { workspace = true } derive-new = { workspace = true } diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index fb929e9893..11ecfad1bf 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -74,7 +74,6 @@ flume = { workspace = true } form_urlencoded = { workspace = true } futures = { workspace = true } git-version = { workspace = true } -hex = { workspace = true, features = ["default"] } lazy_static = { workspace = true } log = { workspace = true } ordered-float = { workspace = true } diff --git a/zenohd/Cargo.toml b/zenohd/Cargo.toml index 39f17d5965..892d3df21c 100644 --- a/zenohd/Cargo.toml +++ b/zenohd/Cargo.toml @@ -39,7 +39,6 @@ json5 = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } zenoh = { workspace = true, features = ["unstable"] } -zenoh_backend_traits = { workspace = true } [dev-dependencies] rand = { workspace = true, features = ["default"] } From ac45542548547d3f86527565e0b0dc1cfac02195 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Tue, 5 Mar 2024 12:35:24 +0100 Subject: [PATCH 4/6] fix: Use `clippy` from Stable Rust (#766) * chore: Apply clippy lints from Rust 1.76 * fix: Use clippy from Stable Rust in CI and Release workflows * chore: Cargo fmt run * chore: Apply Clippy lints from Rust 1.76 (manual) --- .github/workflows/ci.yml | 6 ++-- .github/workflows/release.yml | 6 ++-- commons/zenoh-core/src/lib.rs | 2 +- .../zenoh-keyexpr/src/key_expr/format/mod.rs | 5 +-- commons/zenoh-keyexpr/src/key_expr/include.rs | 6 ++-- commons/zenoh-protocol/src/core/endpoint.rs | 12 +++---- commons/zenoh-protocol/src/core/resolution.rs | 8 ++--- commons/zenoh-protocol/src/core/whatami.rs | 6 ++-- .../zenoh-link-udp/src/multicast.rs | 4 +-- io/zenoh-transport/src/multicast/mod.rs | 19 +++++++---- io/zenoh-transport/src/multicast/transport.rs | 4 +-- plugins/zenoh-backend-traits/src/config.rs | 14 ++++---- .../src/replica/mod.rs | 2 +- .../net/routing/hat/linkstate_peer/network.rs | 32 +++++++++---------- zenoh/src/net/routing/hat/p2p_peer/gossip.rs | 30 ++++++++--------- zenoh/src/net/routing/hat/router/network.rs | 32 +++++++++---------- zenoh/src/session.rs | 9 ++---- zenoh/tests/matching.rs | 2 +- 18 files changed, 100 insertions(+), 99 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f87077b516..51efbf1fd8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,14 +47,14 @@ jobs: run: cargo fmt --check - name: Clippy - run: cargo clippy --all-targets -- --deny warnings + run: cargo +stable clippy --all-targets -- --deny warnings - name: Clippy unstable targets - run: cargo clippy --all-targets --features unstable -- --deny warnings + run: cargo +stable clippy --all-targets --features unstable -- --deny warnings - name: Clippy all features if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macOS-latest' }} - run: cargo clippy --all-targets --all-features -- --deny warnings + run: cargo +stable clippy --all-targets --all-features -- --deny warnings - name: Install generic no_std target # Generic no_std target architecture is x86_64-unknown-none diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index d8bd218ae8..982a474956 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -72,13 +72,13 @@ jobs: CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse - name: Clippy check - run: cargo clippy --all-targets --features=${{ github.event.inputs.features}} -- --deny warnings + run: cargo +stable clippy --all-targets --features=${{ github.event.inputs.features}} -- --deny warnings - name: Clippy unstable check - run: cargo clippy --all-targets -- --deny warnings + run: cargo +stable clippy --all-targets -- --deny warnings - name: Clippy all features - run: cargo clippy --all-targets --all-features -- --deny warnings + run: cargo +stable clippy --all-targets --all-features -- --deny warnings - name: Environment setup id: env diff --git a/commons/zenoh-core/src/lib.rs b/commons/zenoh-core/src/lib.rs index 1380680260..f3ba5fd499 100644 --- a/commons/zenoh-core/src/lib.rs +++ b/commons/zenoh-core/src/lib.rs @@ -19,7 +19,7 @@ //! [Click here for Zenoh's documentation](../zenoh/index.html) pub use lazy_static::lazy_static; pub mod macros; -pub use macros::*; + use std::future::{Future, Ready}; // Re-exports after moving ZError/ZResult to zenoh-result diff --git a/commons/zenoh-keyexpr/src/key_expr/format/mod.rs b/commons/zenoh-keyexpr/src/key_expr/format/mod.rs index 9a39fbeee1..3a03d8a515 100644 --- a/commons/zenoh-keyexpr/src/key_expr/format/mod.rs +++ b/commons/zenoh-keyexpr/src/key_expr/format/mod.rs @@ -509,7 +509,7 @@ impl<'s, Storage: IKeFormatStorage<'s>> KeFormatter<'s, Storage> { let pattern = segments[i].spec.pattern(); let start = self.buffer.len(); write!(&mut self.buffer, "{value}").unwrap(); // Writing on `&mut String` should be infallible. - match (|| { + let mut set_value = || { let end = self.buffer.len(); if start == end { if !pattern.is_double_wild() { @@ -529,7 +529,8 @@ impl<'s, Storage: IKeFormatStorage<'s>> KeFormatter<'s, Storage> { NonMaxU32::new(end.try_into().map_err(|_| ())?).ok_or(())?, )); Ok(()) - })() { + }; + match set_value() { Ok(()) => Ok(self), Err(()) => { self.buffer.truncate(start); diff --git a/commons/zenoh-keyexpr/src/key_expr/include.rs b/commons/zenoh-keyexpr/src/key_expr/include.rs index f58d5b6e0e..ca9efaee2d 100644 --- a/commons/zenoh-keyexpr/src/key_expr/include.rs +++ b/commons/zenoh-keyexpr/src/key_expr/include.rs @@ -35,7 +35,7 @@ pub struct LTRIncluder; impl Includer<&[u8], &[u8]> for LTRIncluder { fn includes(&self, mut left: &[u8], mut right: &[u8]) -> bool { loop { - let (lchunk, lrest) = left.split_once(&DELIMITER); + let (lchunk, lrest) = Split::split_once(left, &DELIMITER); let lempty = lrest.is_empty(); if lchunk == DOUBLE_WILD { if (lempty && !right.has_verbatim()) || (!lempty && self.includes(lrest, right)) { @@ -44,12 +44,12 @@ impl Includer<&[u8], &[u8]> for LTRIncluder { if unsafe { right.has_direct_verbatim_non_empty() } { return false; } - right = right.split_once(&DELIMITER).1; + right = Split::split_once(right, &DELIMITER).1; if right.is_empty() { return false; } } else { - let (rchunk, rrest) = right.split_once(&DELIMITER); + let (rchunk, rrest) = Split::split_once(right, &DELIMITER); if rchunk.is_empty() || rchunk == DOUBLE_WILD || !self.non_double_wild_chunk_includes(lchunk, rchunk) diff --git a/commons/zenoh-protocol/src/core/endpoint.rs b/commons/zenoh-protocol/src/core/endpoint.rs index e596b78bde..5e921345e4 100644 --- a/commons/zenoh-protocol/src/core/endpoint.rs +++ b/commons/zenoh-protocol/src/core/endpoint.rs @@ -85,7 +85,7 @@ impl Parameters { } } - pub fn iter(s: &str) -> impl Iterator + DoubleEndedIterator { + pub fn iter(s: &str) -> impl DoubleEndedIterator { s.split(LIST_SEPARATOR).filter_map(|prop| { if prop.is_empty() { None @@ -99,7 +99,7 @@ impl Parameters { Self::iter(s).find(|x| x.0 == k).map(|x| x.1) } - pub fn values<'s>(s: &'s str, k: &str) -> impl Iterator + DoubleEndedIterator { + pub fn values<'s>(s: &'s str, k: &str) -> impl DoubleEndedIterator { match Self::get(s, k) { Some(v) => v.split(VALUE_SEPARATOR), None => { @@ -277,7 +277,7 @@ impl<'a> Metadata<'a> { self.as_str().is_empty() } - pub fn iter(&'a self) -> impl Iterator + DoubleEndedIterator { + pub fn iter(&'a self) -> impl DoubleEndedIterator { Parameters::iter(self.0) } @@ -285,7 +285,7 @@ impl<'a> Metadata<'a> { Parameters::get(self.0, k) } - pub fn values(&'a self, k: &str) -> impl Iterator + DoubleEndedIterator { + pub fn values(&'a self, k: &str) -> impl DoubleEndedIterator { Parameters::values(self.0, k) } } @@ -394,7 +394,7 @@ impl<'a> Config<'a> { self.as_str().is_empty() } - pub fn iter(&'a self) -> impl Iterator + DoubleEndedIterator { + pub fn iter(&'a self) -> impl DoubleEndedIterator { Parameters::iter(self.0) } @@ -402,7 +402,7 @@ impl<'a> Config<'a> { Parameters::get(self.0, k) } - pub fn values(&'a self, k: &str) -> impl Iterator + DoubleEndedIterator { + pub fn values(&'a self, k: &str) -> impl DoubleEndedIterator { Parameters::values(self.0, k) } } diff --git a/commons/zenoh-protocol/src/core/resolution.rs b/commons/zenoh-protocol/src/core/resolution.rs index a174ecdc9d..093fd33bb4 100644 --- a/commons/zenoh-protocol/src/core/resolution.rs +++ b/commons/zenoh-protocol/src/core/resolution.rs @@ -27,10 +27,10 @@ pub enum Bits { } impl Bits { - const S8: &str = "8bit"; - const S16: &str = "16bit"; - const S32: &str = "32bit"; - const S64: &str = "64bit"; + const S8: &'static str = "8bit"; + const S16: &'static str = "16bit"; + const S32: &'static str = "32bit"; + const S64: &'static str = "64bit"; pub const fn bits(&self) -> u32 { match self { diff --git a/commons/zenoh-protocol/src/core/whatami.rs b/commons/zenoh-protocol/src/core/whatami.rs index faeb4712e0..6aacb0d356 100644 --- a/commons/zenoh-protocol/src/core/whatami.rs +++ b/commons/zenoh-protocol/src/core/whatami.rs @@ -25,9 +25,9 @@ pub enum WhatAmI { } impl WhatAmI { - const STR_R: &str = "router"; - const STR_P: &str = "peer"; - const STR_C: &str = "client"; + const STR_R: &'static str = "router"; + const STR_P: &'static str = "peer"; + const STR_C: &'static str = "client"; const U8_R: u8 = Self::Router as u8; const U8_P: u8 = Self::Peer as u8; diff --git a/io/zenoh-links/zenoh-link-udp/src/multicast.rs b/io/zenoh-links/zenoh-link-udp/src/multicast.rs index 838bb8acd5..497120ed0d 100644 --- a/io/zenoh-links/zenoh-link-udp/src/multicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/multicast.rs @@ -174,7 +174,7 @@ impl LinkManagerMulticastUdp { }) .take(1) .collect::>() - .get(0) + .first() .copied(), }; } @@ -194,7 +194,7 @@ impl LinkManagerMulticastUdp { }) .take(1) .collect::>() - .get(0) + .first() .copied(); match iface { diff --git a/io/zenoh-transport/src/multicast/mod.rs b/io/zenoh-transport/src/multicast/mod.rs index 3ce0856df3..daf9b069ff 100644 --- a/io/zenoh-transport/src/multicast/mod.rs +++ b/io/zenoh-transport/src/multicast/mod.rs @@ -28,7 +28,7 @@ pub use manager::{ TransportManagerParamsMulticast, }; use std::{ - fmt, + fmt::{self, Write}, sync::{Arc, Weak}, }; use transport::TransportMulticastInner; @@ -147,12 +147,17 @@ impl fmt::Debug for TransportMulticast { match self.get_transport() { Ok(transport) => { let is_shm = zcondfeat!("shared-memory", transport.is_shm(), false); - let peers: String = zread!(transport.peers) - .iter() - .map(|(l, p)| { - format!("(locator: {}, zid: {}, whatami: {})", l, p.zid, p.whatami) - }) - .collect(); + let peers: String = + zread!(transport.peers) + .iter() + .fold(String::new(), |mut output, (l, p)| { + let _ = write!( + output, + "(locator: {}, zid: {}, whatami: {})", + l, p.zid, p.whatami + ); + output + }); f.debug_struct("Transport Multicast") .field("sn_resolution", &transport.get_sn_resolution()) diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index d5a1da14d4..b8aa41b253 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -115,7 +115,7 @@ impl TransportMulticastInner { ) -> ZResult { let mut priority_tx = vec![]; if (config.initial_sns.len() != 1) != (config.initial_sns.len() != Priority::NUM) { - for (_, sn) in config.initial_sns.iter().enumerate() { + for sn in config.initial_sns.iter() { let tct = TransportPriorityTx::make(config.sn_resolution)?; tct.sync(*sn)?; priority_tx.push(tct); @@ -359,7 +359,7 @@ impl TransportMulticastInner { .into_boxed_slice(); let mut priority_rx = Vec::with_capacity(next_sns.len()); - for (_, sn) in next_sns.iter().enumerate() { + for sn in next_sns.iter() { let tprx = TransportPriorityRx::make( join.resolution.get(Field::FrameSN), self.manager.config.defrag_buff_size, diff --git a/plugins/zenoh-backend-traits/src/config.rs b/plugins/zenoh-backend-traits/src/config.rs index dbcfa420b3..d3ddbd43cc 100644 --- a/plugins/zenoh-backend-traits/src/config.rs +++ b/plugins/zenoh-backend-traits/src/config.rs @@ -200,11 +200,11 @@ impl + AsRef, V: AsObject> TryFrom<(S, &V)> for PluginConfi storages, rest: value .into_iter() - .filter_map(|(k, v)| { - (!["__required__", "backend_search_dirs", "volumes", "storages"] - .contains(&k.as_str())) - .then(|| (k.clone(), v.clone())) + .filter(|&(k, _v)| { + !["__required__", "backend_search_dirs", "volumes", "storages"] + .contains(&k.as_str()) }) + .map(|(k, v)| (k.clone(), v.clone())) .collect(), }) } @@ -313,10 +313,8 @@ impl VolumeConfig { required, rest: config .iter() - .filter_map(|(k, v)| { - (!["__path__", "__required__"].contains(&k.as_str())) - .then(|| (k.clone(), v.clone())) - }) + .filter(|&(k, _v)| (!["__path__", "__required__"].contains(&k.as_str()))) + .map(|(k, v)| (k.clone(), v.clone())) .collect(), }) } diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs index 1dc9df9262..b743a70451 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs @@ -40,7 +40,7 @@ pub mod storage; pub use align_queryable::AlignQueryable; pub use aligner::Aligner; pub use digest::{Digest, DigestConfig, EraType, LogEntry}; -pub use snapshotter::{ReplicationInfo, Snapshotter}; +pub use snapshotter::Snapshotter; pub use storage::{ReplicationService, StorageService}; const ERA: &str = "era"; diff --git a/zenoh/src/net/routing/hat/linkstate_peer/network.rs b/zenoh/src/net/routing/hat/linkstate_peer/network.rs index ac610a808b..ecd535eb86 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/network.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/network.rs @@ -773,26 +773,26 @@ impl Network { let idxs = self .graph .node_indices() - .filter_map(|idx| { - (self.full_linkstate + .filter(|&idx| { + self.full_linkstate || self.gossip_multihop || self.links.values().any(|link| link.zid == zid) || (self.router_peers_failover_brokering && idx == self.idx - && whatami == WhatAmI::Router)) - .then(|| { - ( - idx, - Details { - zid: true, - locators: self.propagate_locators(idx), - links: self.full_linkstate - || (self.router_peers_failover_brokering - && idx == self.idx - && whatami == WhatAmI::Router), - }, - ) - }) + && whatami == WhatAmI::Router) + }) + .map(|idx| { + ( + idx, + Details { + zid: true, + locators: self.propagate_locators(idx), + links: self.full_linkstate + || (self.router_peers_failover_brokering + && idx == self.idx + && whatami == WhatAmI::Router), + }, + ) }) .collect(); self.send_on_link(idxs, &transport); diff --git a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs index ae3fda51a7..c413107f85 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs @@ -509,24 +509,24 @@ impl Network { let idxs = self .graph .node_indices() - .filter_map(|idx| { - (self.gossip_multihop + .filter(|&idx| { + self.gossip_multihop || self.links.values().any(|link| link.zid == zid) || (self.router_peers_failover_brokering && idx == self.idx - && whatami == WhatAmI::Router)) - .then(|| { - ( - idx, - Details { - zid: true, - locators: self.propagate_locators(idx), - links: (self.router_peers_failover_brokering - && idx == self.idx - && whatami == WhatAmI::Router), - }, - ) - }) + && whatami == WhatAmI::Router) + }) + .map(|idx| { + ( + idx, + Details { + zid: true, + locators: self.propagate_locators(idx), + links: (self.router_peers_failover_brokering + && idx == self.idx + && whatami == WhatAmI::Router), + }, + ) }) .collect(); self.send_on_link(idxs, &transport); diff --git a/zenoh/src/net/routing/hat/router/network.rs b/zenoh/src/net/routing/hat/router/network.rs index ccc8a55850..aa1209b7ed 100644 --- a/zenoh/src/net/routing/hat/router/network.rs +++ b/zenoh/src/net/routing/hat/router/network.rs @@ -778,26 +778,26 @@ impl Network { let idxs = self .graph .node_indices() - .filter_map(|idx| { - (self.full_linkstate + .filter(|&idx| { + self.full_linkstate || self.gossip_multihop || self.links.values().any(|link| link.zid == zid) || (self.router_peers_failover_brokering && idx == self.idx - && whatami == WhatAmI::Router)) - .then(|| { - ( - idx, - Details { - zid: true, - locators: self.propagate_locators(idx), - links: self.full_linkstate - || (self.router_peers_failover_brokering - && idx == self.idx - && whatami == WhatAmI::Router), - }, - ) - }) + && whatami == WhatAmI::Router) + }) + .map(|idx| { + ( + idx, + Details { + zid: true, + locators: self.propagate_locators(idx), + links: self.full_linkstate + || (self.router_peers_failover_brokering + && idx == self.idx + && whatami == WhatAmI::Router), + }, + ) }) .collect(); self.send_on_link(idxs, &transport); diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 374320bde9..7900a3add8 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -799,7 +799,7 @@ impl Session { } #[allow(clippy::new_ret_no_self)] - pub(super) fn new(config: Config) -> impl Resolve> + Send { + pub(super) fn new(config: Config) -> impl Resolve> { ResolveFuture::new(async move { log::debug!("Config: {:?}", &config); let aggregated_subscribers = config.aggregation().subscribers().clone(); @@ -827,10 +827,7 @@ impl Session { }) } - pub(crate) fn declare_prefix<'a>( - &'a self, - prefix: &'a str, - ) -> impl Resolve + Send + 'a { + pub(crate) fn declare_prefix<'a>(&'a self, prefix: &'a str) -> impl Resolve + 'a { ResolveClosure::new(move || { trace!("declare_prefix({:?})", prefix); let mut state = zwrite!(self.state); @@ -888,7 +885,7 @@ impl Session { pub(crate) fn declare_publication_intent<'a>( &'a self, _key_expr: KeyExpr<'a>, - ) -> impl Resolve> + Send + 'a { + ) -> impl Resolve> + 'a { ResolveClosure::new(move || { // log::trace!("declare_publication({:?})", key_expr); // let mut state = zwrite!(self.state); diff --git a/zenoh/tests/matching.rs b/zenoh/tests/matching.rs index 92c4deba57..f36bf5481b 100644 --- a/zenoh/tests/matching.rs +++ b/zenoh/tests/matching.rs @@ -35,7 +35,7 @@ async fn create_session_pair(locator: &str) -> (Session, Session) { config.scouting.multicast.set_enabled(Some(false)).unwrap(); config .listen - .set_endpoints(vec![locator.clone().parse().unwrap()]) + .set_endpoints(vec![locator.parse().unwrap()]) .unwrap(); config }; From c80124af25aa5c8712e7d9b7ef0392c6247207d3 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 5 Mar 2024 14:27:04 +0100 Subject: [PATCH 5/6] Implement get_interface_names for serial link (#772) --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- io/zenoh-links/zenoh-link-serial/src/unicast.rs | 15 ++++++++++++--- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e33b82584..5ab14fff53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4472,9 +4472,9 @@ dependencies = [ [[package]] name = "z-serial" -version = "0.2.1" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a74ab200b318928231fa62809ca06397f2790d29ffb58d9cbbc7d517e93a6b17" +checksum = "f113597c6b880587004169f14bc010e4b440981ab2ad669779d3654f9b1c4af1" dependencies = [ "cobs", "futures", diff --git a/Cargo.toml b/Cargo.toml index 32662e5bf7..214f33145f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -159,7 +159,7 @@ validated_struct = "2.1.0" vec_map = "0.8.2" webpki-roots = "0.26.0" winapi = { version = "0.3.9", features = ["iphlpapi"] } -z-serial = "0.2.1" +z-serial = "0.2.3" zenoh-ext = { version = "0.11.0-dev", path = "zenoh-ext" } zenoh-shm = { version = "0.11.0-dev", path = "commons/zenoh-shm" } zenoh-result = { version = "0.11.0-dev", path = "commons/zenoh-result", default-features = false } diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 51d734f91b..fafac4c393 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -183,9 +183,18 @@ impl LinkUnicastTrait for LinkUnicastSerial { #[inline(always)] fn get_interface_names(&self) -> Vec { - // @TODO: Not supported for now - log::debug!("The get_interface_names for LinkUnicastSerial is not supported"); - vec![] + // For POSIX systems, the interface name refers to the file name without the path + // e.g. for serial port "/dev/ttyUSB0" interface name will be "ttyUSB0" + match z_serial::get_available_port_names() { + Ok(interfaces) => { + log::trace!("get_interface_names for serial: {:?}", interfaces); + interfaces + } + Err(e) => { + log::debug!("get_interface_names for serial failed: {:?}", e); + vec![] + } + } } #[inline(always)] From f2e99b6fe07bf7899a458e0a16bfe85e3cdebca6 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 5 Mar 2024 17:37:30 +0100 Subject: [PATCH 6/6] Update downsampling interceptor (#788) * Implement caching for downsampling interceptor * Rename rate to freq * Improve test stability --- DEFAULT_CONFIG.json5 | 4 +- commons/zenoh-config/src/lib.rs | 2 +- .../net/routing/interceptor/downsampling.rs | 66 ++++++++++++------- zenoh/tests/interceptors.rs | 23 ++++--- 4 files changed, 62 insertions(+), 33 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 1ced78b6d6..66352fe141 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -125,9 +125,9 @@ // interfaces: [ "wlan0" ], // /// Data flow messages will be processed on. ("egress" or "ingress") // flow: "egress", - // /// A list of downsampling rules: key_expression and the rate (maximum frequency in Hertz) + // /// A list of downsampling rules: key_expression and the maximum frequency in Hertz // rules: [ - // { key_expr: "demo/example/zenoh-rs-pub", rate: 0.1 }, + // { key_expr: "demo/example/zenoh-rs-pub", freq: 0.1 }, // ], // }, // ], diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 21a0309c16..9346e9825e 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -83,7 +83,7 @@ pub struct DownsamplingRuleConf { /// Downsampling will be applied for all key extensions if the parameter is None pub key_expr: OwnedKeyExpr, /// The maximum frequency in Hertz; - pub rate: f64, + pub freq: f64, } #[derive(Debug, Deserialize, Serialize, Clone)] diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index 5435c30bed..467ccd6e1e 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -19,12 +19,13 @@ //! [Click here for Zenoh's documentation](../zenoh/index.html) use crate::net::routing::interceptor::*; +use std::collections::HashMap; use std::sync::{Arc, Mutex}; use zenoh_config::{DownsamplingFlow, DownsamplingItemConf, DownsamplingRuleConf}; use zenoh_core::zlock; use zenoh_keyexpr::keyexpr_tree::impls::KeyedSetProvider; -use zenoh_keyexpr::keyexpr_tree::IKeyExprTreeMut; use zenoh_keyexpr::keyexpr_tree::{support::UnknownWildness, KeBoxTree}; +use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeMut}; use zenoh_protocol::network::NetworkBody; use zenoh_result::ZResult; @@ -82,12 +83,16 @@ impl InterceptorFactoryTrait for DownsamplingInterceptorFactory { match self.flow { DownsamplingFlow::Ingress => ( - Some(Box::new(DownsamplingInterceptor::new(self.rules.clone()))), + Some(Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new( + self.rules.clone(), + )))), None, ), DownsamplingFlow::Egress => ( None, - Some(Box::new(DownsamplingInterceptor::new(self.rules.clone()))), + Some(Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new( + self.rules.clone(), + )))), ), } } @@ -110,31 +115,45 @@ struct Timestate { } pub(crate) struct DownsamplingInterceptor { - ke_state: Arc>>, + ke_id: Arc>>, + ke_state: Arc>>, } impl InterceptorTrait for DownsamplingInterceptor { - fn compute_keyexpr_cache(&self, _key_expr: &KeyExpr<'_>) -> Option> { - None + fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option> { + let ke_id = zlock!(self.ke_id); + if let Some(id) = ke_id.weight_at(&key_expr.clone()) { + Some(Box::new(Some(*id))) + } else { + Some(Box::new(None::)) + } } fn intercept( &self, ctx: RoutingContext, - _cache: Option<&Box>, + cache: Option<&Box>, ) -> Option> { if matches!(ctx.msg.body, NetworkBody::Push(_)) { - if let Some(key_expr) = ctx.full_key_expr() { - let mut ke_state = zlock!(self.ke_state); - if let Some(state) = ke_state.weight_at_mut(&key_expr.clone()) { - let timestamp = std::time::Instant::now(); - - if timestamp - state.latest_message_timestamp >= state.threshold { - state.latest_message_timestamp = timestamp; - return Some(ctx); - } else { - return None; + if let Some(cache) = cache { + if let Some(id) = cache.downcast_ref::>() { + if let Some(id) = id { + let mut ke_state = zlock!(self.ke_state); + if let Some(state) = ke_state.get_mut(id) { + let timestamp = std::time::Instant::now(); + + if timestamp - state.latest_message_timestamp >= state.threshold { + state.latest_message_timestamp = timestamp; + return Some(ctx); + } else { + return None; + } + } else { + log::debug!("unxpected cache ID {}", id); + } } + } else { + log::debug!("unxpected cache type {:?}", ctx.full_expr()); } } } @@ -147,17 +166,19 @@ const NANOS_PER_SEC: f64 = 1_000_000_000.0; impl DownsamplingInterceptor { pub fn new(rules: Vec) -> Self { - let mut ke_state = KeBoxTree::default(); - for rule in rules { + let mut ke_id = KeBoxTree::default(); + let mut ke_state = HashMap::default(); + for (id, rule) in rules.into_iter().enumerate() { let mut threshold = std::time::Duration::MAX; let mut latest_message_timestamp = std::time::Instant::now(); - if rule.rate != 0.0 { + if rule.freq != 0.0 { threshold = - std::time::Duration::from_nanos((1. / rule.rate * NANOS_PER_SEC) as u64); + std::time::Duration::from_nanos((1. / rule.freq * NANOS_PER_SEC) as u64); latest_message_timestamp -= threshold; } + ke_id.insert(&rule.key_expr, id); ke_state.insert( - &rule.key_expr, + id, Timestate { threshold, latest_message_timestamp, @@ -165,6 +186,7 @@ impl DownsamplingInterceptor { ); } Self { + ke_id: Arc::new(Mutex::new(ke_id)), ke_state: Arc::new(Mutex::new(ke_state)), } } diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index cde287c1dc..2a5c30e7b8 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -33,6 +33,10 @@ impl IntervalCounter { self.total_time.as_millis() as u32 / self.count } + fn get_count(&self) -> u32 { + self.count + } + fn check_middle(&self, ms: u32) { let middle = self.get_middle(); println!("Interval {}, count: {}, middle: {}", ms, self.count, middle); @@ -51,8 +55,8 @@ fn downsampling_by_keyexpr_impl(egress: bool) { {{ flow: "{}", rules: [ - {{ key_expr: "test/downsamples_by_keyexp/r100", rate: 10, }}, - {{ key_expr: "test/downsamples_by_keyexp/r50", rate: 20, }} + {{ key_expr: "test/downsamples_by_keyexp/r100", freq: 10, }}, + {{ key_expr: "test/downsamples_by_keyexp/r50", freq: 20, }} ], }}, ] "#, @@ -120,7 +124,10 @@ fn downsampling_by_keyexpr_impl(egress: bool) { } for _ in 0..100 { - if *zlock!(total_count) >= messages_count { + if *zlock!(total_count) >= messages_count + && zlock!(counter_r50_clone).get_count() > 0 + && zlock!(counter_r100_clone).get_count() > 0 + { break; } std::thread::sleep(std::time::Duration::from_millis(100)); @@ -150,14 +157,14 @@ fn downsampling_by_interface_impl(egress: bool) { interfaces: ["lo", "lo0"], flow: "{0}", rules: [ - {{ key_expr: "test/downsamples_by_interface/r100", rate: 10, }}, + {{ key_expr: "test/downsamples_by_interface/r100", freq: 10, }}, ], }}, {{ interfaces: ["some_unknown_interface"], flow: "{0}", rules: [ - {{ key_expr: "test/downsamples_by_interface/all", rate: 10, }}, + {{ key_expr: "test/downsamples_by_interface/all", freq: 10, }}, ], }}, ] "#, @@ -220,7 +227,7 @@ fn downsampling_by_interface_impl(egress: bool) { } for _ in 0..100 { - if *zlock!(total_count) >= messages_count { + if *zlock!(total_count) >= messages_count && zlock!(counter_r100_clone).get_count() > 0 { break; } std::thread::sleep(std::time::Duration::from_millis(100)); @@ -253,8 +260,8 @@ fn downsampling_config_error_wrong_strategy() { { flow: "down", rules: [ - { keyexpr: "test/downsamples_by_keyexp/r100", rate: 10, }, - { keyexpr: "test/downsamples_by_keyexp/r50", rate: 20, } + { keyexpr: "test/downsamples_by_keyexp/r100", freq: 10, }, + { keyexpr: "test/downsamples_by_keyexp/r50", freq: 20, } ], }, ]