diff --git a/.github/workflows/publish-docker.yml b/.github/workflows/publish-docker.yml deleted file mode 100644 index 30b465b04c..0000000000 --- a/.github/workflows/publish-docker.yml +++ /dev/null @@ -1,88 +0,0 @@ -# -# Copyright (c) 2023 ZettaScale Technology -# -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License 2.0 which is available at -# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -# which is available at https://www.apache.org/licenses/LICENSE-2.0. -# -# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -# -# Contributors: -# ZettaScale Zenoh Team, -# -name: Publish (Docker) - -on: - workflow_dispatch: - inputs: - live-run: - type: boolean - required: true - version: - type: string - required: false - workflow_call: - inputs: - live-run: - type: boolean - required: true - version: - type: string - required: true - -jobs: - main: - name: Docker build and push - runs-on: ubuntu-latest - steps: - - name: Checkout this repository - uses: actions/checkout@v4 - with: - ref: ${{ inputs.version }} - - - name: Download packages from previous job - uses: actions/download-artifact@v3 - with: - path: build - - - name: Unzip artifacts - run: | - ls build - - mkdir -p docker/linux/amd - unzip build/zenoh-${{ inputs.version }}-x86_64-unknown-linux-musl-artifacts.zip -d docker/linux/amd64/ - rm docker/linux/amd64/libzenoh_plugin_example.so - - mkdir -p docker/linux/arm64 - unzip build/zenoh-${{ inputs.version }}-aarch64-unknown-linux-musl-artifacts.zip -d docker/linux/arm64/ - rm docker/linux/arm64/libzenoh_plugin_example.so - - tree docker - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Docker meta - set tags and labels - id: meta - uses: docker/metadata-action@v5 - with: - images: eclipse/zenoh - labels: | - org.opencontainers.image.licenses=EPL-2.0 OR Apache-2.0 - - - name: Login to DockerHub - uses: docker/login-action@v3 - with: - username: ${{ secrets.DOCKER_COM_USERNAME }} - password: ${{ secrets.DOCKER_COM_PASSWORD }} - - - name: Build and push - uses: docker/build-push-action@v5 - with: - context: . - push: ${{ inputs.live-run }} - platforms: linux/amd64, linux/arm64 - file: .github/workflows/Dockerfile - tags: ${{ steps.meta.outputs.tags }} - labels: ${{ steps.meta.outputs.labels }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index e31f324eb9..fe050776ec 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -22,6 +22,7 @@ on: type: boolean description: If false (or undefined) the workflow runs in dry-run mode (i.e. with no side-effects) required: false + default: false version: type: string description: Release number. If undefined, the workflow auto-generates a version using git-describe @@ -33,7 +34,7 @@ jobs: uses: eclipse-zenoh/ci/.github/workflows/tag-crates.yml@main with: repo: ${{ github.repository }} - live-run: ${{ inputs.live-run }} + live-run: ${{ inputs.live-run || false }} version: ${{ inputs.version }} inter-deps-pattern: zenoh.* secrets: inherit @@ -68,7 +69,7 @@ jobs: uses: eclipse-zenoh/ci/.github/workflows/release-crates-cargo.yml@main with: repos: ${{ github.repository }} - live-run: ${{ inputs.live-run }} + live-run: ${{ inputs.live-run || false }} branch: ${{ needs.tag.outputs.branch }} inter-deps-pattern: zenoh.* secrets: inherit @@ -79,7 +80,7 @@ jobs: uses: eclipse-zenoh/ci/.github/workflows/release-crates-debian.yml@main with: no-build: true - live-run: ${{ inputs.live-run }} + live-run: ${{ inputs.live-run || false }} version: ${{ needs.tag.outputs.version }} repo: ${{ github.repository }} branch: ${{ needs.tag.outputs.branch }} @@ -92,7 +93,7 @@ jobs: with: no-build: true repo: ${{ github.repository }} - live-run: ${{ inputs.live-run }} + live-run: ${{ inputs.live-run || false }} version: ${{ needs.tag.outputs.version }} branch: ${{ needs.tag.outputs.branch }} artifact-patterns: | @@ -112,7 +113,7 @@ jobs: uses: eclipse-zenoh/ci/.github/workflows/release-crates-eclipse.yml@main with: no-build: true - live-run: ${{ inputs.live-run }} + live-run: ${{ inputs.live-run || false }} version: ${{ needs.tag.outputs.version }} repo: ${{ github.repository }} branch: ${{ needs.tag.outputs.branch }} @@ -129,7 +130,7 @@ jobs: uses: eclipse-zenoh/ci/.github/workflows/release-crates-github.yml@main with: no-build: true - live-run: ${{ inputs.live-run }} + live-run: ${{ inputs.live-run || false }} version: ${{ needs.tag.outputs.version }} repo: ${{ github.repository }} branch: ${{ needs.tag.outputs.branch }} @@ -145,7 +146,7 @@ jobs: uses: eclipse-zenoh/ci/.github/workflows/release-crates-dockerhub.yml@main with: no-build: true - live-run: ${{ inputs.live-run }} + live-run: ${{ inputs.live-run || false }} version: ${{ needs.tag.outputs.version }} repo: ${{ github.repository }} tags: "eclipse/zenoh:${{ needs.tag.outputs.version }}" @@ -165,7 +166,7 @@ jobs: uses: eclipse-zenoh/ci/.github/workflows/release-crates-ghcr.yml@main with: no-build: true - live-run: ${{ inputs.live-run }} + live-run: ${{ inputs.live-run || false }} version: ${{ needs.tag.outputs.version }} repo: ${{ github.repository }} tags: "${{ github.repository }}:${{ needs.tag.outputs.version }}" diff --git a/Cargo.lock b/Cargo.lock index ba84d51ee8..4ad3f6f7e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1091,15 +1091,6 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" -[[package]] -name = "encoding_rs" -version = "0.8.33" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" -dependencies = [ - "cfg-if 1.0.0", -] - [[package]] name = "env_filter" version = "0.1.0" @@ -1463,25 +1454,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "h2" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http 0.2.9", - "indexmap 1.9.3", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "half" version = "1.8.2" @@ -1567,17 +1539,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "http" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" -dependencies = [ - "bytes", - "fnv", - "itoa", -] - [[package]] name = "http" version = "1.0.0" @@ -1589,17 +1550,6 @@ dependencies = [ "itoa", ] -[[package]] -name = "http-body" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" -dependencies = [ - "bytes", - "http 0.2.9", - "pin-project-lite 0.2.13", -] - [[package]] name = "http-client" version = "6.5.3" @@ -1646,30 +1596,6 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" -[[package]] -name = "hyper" -version = "0.14.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" -dependencies = [ - "bytes", - "futures-channel", - "futures-core", - "futures-util", - "h2", - "http 0.2.9", - "http-body", - "httparse", - "httpdate", - "itoa", - "pin-project-lite 0.2.13", - "socket2 0.4.9", - "tokio", - "tower-service", - "tracing", - "want", -] - [[package]] name = "iana-time-zone" version = "0.1.57" @@ -2007,9 +1933,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "log", @@ -2074,6 +2000,7 @@ dependencies = [ "bitflags 2.4.0", "cfg-if 1.0.0", "libc", + "memoffset 0.9.0", ] [[package]] @@ -2752,40 +2679,6 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" -[[package]] -name = "reqwest" -version = "0.11.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" -dependencies = [ - "base64 0.21.4", - "bytes", - "encoding_rs", - "futures-core", - "futures-util", - "h2", - "http 0.2.9", - "http-body", - "hyper", - "ipnet", - "js-sys", - "log", - "mime", - "once_cell", - "percent-encoding", - "pin-project-lite 0.2.13", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", - "tower-service", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", - "winreg", -] - [[package]] name = "ring" version = "0.16.20" @@ -3816,14 +3709,20 @@ dependencies = [ "hashbrown 0.14.0", "pin-project-lite 0.2.13", "tokio", - "tracing", ] [[package]] -name = "tower-service" -version = "0.3.2" +name = "tokio-vsock" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "2e336ac4b36df625d5429a735dd5847732fe5f62010e3ce0c50f3705d44730f8" +dependencies = [ + "bytes", + "futures", + "libc", + "tokio", + "vsock", +] [[package]] name = "tracing" @@ -3867,7 +3766,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http 1.0.0", + "http", "httparse", "log", "rand 0.8.5", @@ -3901,9 +3800,9 @@ checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" [[package]] name = "uhlc" -version = "0.6.3" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1eadef1fa26cbbae1276c46781e8f4d888bdda434779c18ae6c2a0e69991885" +checksum = "99b6df3f3e948b40e20c38a6d1fd6d8f91b3573922fc164e068ad3331560487e" dependencies = [ "humantime", "lazy_static", @@ -4100,6 +3999,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "vsock" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfb6e7a74830912f1f4a7655227c9ded1ea4e9136676311fedf54bedb412f35" +dependencies = [ + "libc", + "nix 0.27.1", +] + [[package]] name = "waker-fn" version = "1.1.0" @@ -4562,6 +4471,7 @@ version = "0.11.0-dev" dependencies = [ "flume", "json5", + "log", "num_cpus", "secrecy", "serde", @@ -4631,6 +4541,7 @@ dependencies = [ "zenoh-core", "zenoh-macros", "zenoh-result", + "zenoh-runtime", "zenoh-sync", "zenoh-util", ] @@ -4666,6 +4577,7 @@ dependencies = [ "zenoh-link-udp", "zenoh-link-unixpipe", "zenoh-link-unixsock_stream", + "zenoh-link-vsock", "zenoh-link-ws", "zenoh-protocol", "zenoh-result", @@ -4679,13 +4591,11 @@ dependencies = [ "flume", "futures", "log", - "lz4_flex", "rustls 0.22.2", "rustls-webpki 0.102.2", "serde", "tokio", "tokio-util", - "typenum", "zenoh-buffers", "zenoh-codec", "zenoh-core", @@ -4848,6 +4758,25 @@ dependencies = [ "zenoh-sync", ] +[[package]] +name = "zenoh-link-vsock" +version = "0.11.0-dev" +dependencies = [ + "async-trait", + "libc", + "log", + "tokio", + "tokio-util", + "tokio-vsock", + "zenoh-core", + "zenoh-link-commons", + "zenoh-protocol", + "zenoh-result", + "zenoh-runtime", + "zenoh-sync", + "zenoh-util", +] + [[package]] name = "zenoh-link-ws" version = "0.11.0-dev" diff --git a/Cargo.toml b/Cargo.toml index 95e6981415..da99cb1fdc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ members = [ "io/zenoh-links/zenoh-link-unixsock_stream/", "io/zenoh-links/zenoh-link-ws/", "io/zenoh-links/zenoh-link-unixpipe/", + "io/zenoh-links/zenoh-link-vsock/", "io/zenoh-transport", "plugins/zenoh-backend-example", "plugins/zenoh-plugin-example", @@ -152,9 +153,10 @@ tokio = { version = "1.35.1", default-features = false } # Default features are tokio-util = "0.7.10" tokio-tungstenite = "0.21" tokio-rustls = "0.25.0" +# tokio-vsock = see: io/zenoh-links/zenoh-link-vsock/Cargo.toml (workspaces does not support platform dependent dependencies) console-subscriber = "0.2" typenum = "1.16.0" -uhlc = { version = "0.6.0", default-features = false } # Default features are disabled due to usage in no_std crates +uhlc = { version = "0.7.0", default-features = false } # Default features are disabled due to usage in no_std crates unzip-n = "0.1.2" url = "2.3.1" urlencoding = "2.1.2" @@ -191,6 +193,7 @@ zenoh-link-udp = { version = "0.11.0-dev", path = "io/zenoh-links/zenoh-link-udp zenoh-link-ws = { version = "0.11.0-dev", path = "io/zenoh-links/zenoh-link-ws" } zenoh-link-unixpipe = { version = "0.11.0-dev", path = "io/zenoh-links/zenoh-link-unixpipe" } zenoh-link-serial = { version = "0.11.0-dev", path = "io/zenoh-links/zenoh-link-serial" } +zenoh-link-vsock = { version = "0.11.0-dev", path = "io/zenoh-links/zenoh-link-vsock" } zenoh-link = { version = "0.11.0-dev", path = "io/zenoh-link" } zenoh-link-commons = { version = "0.11.0-dev", path = "io/zenoh-link-commons" } zenoh = { version = "0.11.0-dev", path = "zenoh", default-features = false } diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 66352fe141..fda76f43f8 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -22,9 +22,31 @@ /// For TCP/UDP on Linux, it is possible additionally specify the interface to be connected to: /// E.g. tcp/192.168.0.1:7447#iface=eth0, for connect only if the IP address is reachable via the interface eth0 connect: { + /// timeout waiting for all endpoints connected (0: no retry, -1: infinite timeout) + /// Accepts a single value or different values for router, peer and client. + timeout_ms: { router: -1, peer: -1, client: 0 }, + endpoints: [ // "/
" ], + + /// Global connect configuration, + /// Accepts a single value or different values for router, peer and client. + /// The configuration can also be specified for the separate endpoint + /// it will override the global one + /// E.g. tcp/192.168.0.1:7447#retry_period_init_ms=20000;retry_period_max_ms=10000" + + /// exit from application, if timeout exceed + exit_on_failure: { router: false, peer: false, client: true }, + /// connect establishing retry configuration + retry: { + /// intial wait timeout until next connect try + period_init_ms: 1000, + /// maximum wait timeout until next connect try + period_max_ms: 4000, + /// increase factor for the next timeout until nexti connect try + period_increase_factor: 2, + }, }, /// Which endpoints to listen on. E.g. tcp/localhost:7447. @@ -33,9 +55,31 @@ /// For TCP/UDP on Linux, it is possible additionally specify the interface to be listened to: /// E.g. tcp/0.0.0.0:7447#iface=eth0, for listen connection only on eth0 listen: { + /// timeout waiting for all listen endpoints (0: no retry, -1: infinite timeout) + /// Accepts a single value or different values for router, peer and client. + timeout_ms: 0, + endpoints: [ // "/
" ], + + /// Global listen configuration, + /// Accepts a single value or different values for router, peer and client. + /// The configuration can also be specified for the separate endpoint + /// it will override the global one + /// E.g. tcp/192.168.0.1:7447#exit_on_failure=false;retry_period_max_ms=1000" + + /// exit from application, if timeout exceed + exit_on_failure: true, + /// listen retry configuration + retry: { + /// intial wait timeout until next try + period_init_ms: 1000, + /// maximum wait timeout until next try + period_max_ms: 4000, + /// increase factor for the next timeout until next try + period_increase_factor: 2, + }, }, /// Configure the scouting mechanisms and their behaviours scouting: { @@ -177,7 +221,7 @@ link: { /// An optional whitelist of protocols to be used for accepting and opening sessions. /// If not configured, all the supported protocols are automatically whitelisted. - /// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream"] + /// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"] /// For example, to only enable "tls" and "quic": // protocols: ["tls", "quic"], /// Configure the zenoh TX parameters of a link @@ -191,7 +235,9 @@ /// Number of keep-alive messages in a link lease duration. If no data is sent, keep alive /// messages will be sent at the configured time interval. /// NOTE: In order to consider eventual packet loss and transmission latency and jitter, - /// set the actual keep_alive timeout to one fourth of the lease time. + /// set the actual keep_alive interval to one fourth of the lease time: i.e. send + /// 4 keep_alive messages in a lease period. Changing the lease time will have the + /// keep_alive messages sent more or less often. /// This is in-line with the ITU-T G.8013/Y.1731 specification on continous connectivity /// check which considers a link as failed when no messages are received in 3.5 times the /// target interval. @@ -217,6 +263,13 @@ data_low: 4, background: 4, }, + /// Congestion occurs when the queue is empty (no available batch). + /// Using CongestionControl::Block the caller is blocked until a batch is available and re-insterted into the queue. + /// Using CongestionControl::Drop the message might be dropped, depending on conditions configured here. + congestion_control: { + /// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available. + wait_before_drop: 1000 + }, /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress. /// Higher values lead to a more aggressive batching but it will introduce additional latency. backoff: 100, diff --git a/commons/zenoh-config/Cargo.toml b/commons/zenoh-config/Cargo.toml index f0189ff3e7..feade8cc10 100644 --- a/commons/zenoh-config/Cargo.toml +++ b/commons/zenoh-config/Cargo.toml @@ -24,6 +24,7 @@ categories = { workspace = true } description = "Internal crate for zenoh." [dependencies] +log = { workspace = true } flume = { workspace = true } json5 = { workspace = true } num_cpus = { workspace = true } diff --git a/commons/zenoh-config/src/connection_retry.rs b/commons/zenoh-config/src/connection_retry.rs new file mode 100644 index 0000000000..a845fbfe6a --- /dev/null +++ b/commons/zenoh-config/src/connection_retry.rs @@ -0,0 +1,200 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::{ + defaults::{ + self, DEFAULT_CONNECT_EXIT_ON_FAIL, DEFAULT_CONNECT_TIMEOUT_MS, + DEFAULT_LISTEN_EXIT_ON_FAIL, DEFAULT_LISTEN_TIMEOUT_MS, + }, + Config, +}; +use serde::{Deserialize, Serialize}; +use zenoh_core::zparse_default; +use zenoh_protocol::core::WhatAmI; + +use crate::mode_dependent::*; + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct ConnectionRetryModeDependentConf { + // intial wait timeout until next try + pub period_init_ms: Option>, + // maximum wait timeout until next try + pub period_max_ms: Option>, + // increase factor for the next timeout until next try + pub period_increase_factor: Option>, +} + +#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)] +pub struct ConnectionRetryConf { + pub exit_on_failure: bool, + pub period_init_ms: i64, + pub period_max_ms: i64, + pub period_increase_factor: f64, +} + +impl ConnectionRetryConf { + pub fn new( + whatami: WhatAmI, + exit_on_failure: bool, + retry: ConnectionRetryModeDependentConf, + default_retry: ConnectionRetryModeDependentConf, + ) -> ConnectionRetryConf { + ConnectionRetryConf { + exit_on_failure, + period_init_ms: *retry + .period_init_ms + .get(whatami) + .unwrap_or(default_retry.period_init_ms.get(whatami).unwrap()), + period_max_ms: *retry + .period_max_ms + .get(whatami) + .unwrap_or(default_retry.period_max_ms.get(whatami).unwrap()), + period_increase_factor: *retry + .period_increase_factor + .get(whatami) + .unwrap_or(default_retry.period_increase_factor.get(whatami).unwrap()), + } + } + + pub fn timeout(&self) -> std::time::Duration { + ms_to_duration(self.period_init_ms) + } + + pub fn period(&self) -> ConnectionRetryPeriod { + ConnectionRetryPeriod::new(self) + } +} + +pub struct ConnectionRetryPeriod { + conf: ConnectionRetryConf, + delay: i64, +} + +impl ConnectionRetryPeriod { + pub fn new(conf: &ConnectionRetryConf) -> ConnectionRetryPeriod { + ConnectionRetryPeriod { + conf: conf.clone(), + delay: conf.period_init_ms, + } + } + + pub fn duration(&self) -> std::time::Duration { + if self.conf.period_init_ms < 0 { + return std::time::Duration::MAX; + } + + if self.conf.period_init_ms == 0 { + return std::time::Duration::from_millis(0); + } + + std::time::Duration::from_millis(self.delay as u64) + } + + pub fn next_duration(&mut self) -> std::time::Duration { + let res = self.duration(); + + self.delay = (self.delay as f64 * self.conf.period_increase_factor) as i64; + if self.conf.period_max_ms > 0 && self.delay > self.conf.period_max_ms { + self.delay = self.conf.period_max_ms; + } + + res + } +} + +fn ms_to_duration(ms: i64) -> std::time::Duration { + if ms >= 0 { + std::time::Duration::from_millis(ms as u64) + } else { + std::time::Duration::MAX + } +} + +pub fn get_global_listener_timeout(config: &Config) -> std::time::Duration { + let whatami = config.mode().unwrap_or(defaults::mode); + ms_to_duration( + *config + .listen() + .timeout_ms() + .get(whatami) + .unwrap_or(DEFAULT_LISTEN_TIMEOUT_MS.get(whatami).unwrap()), + ) +} + +pub fn get_global_connect_timeout(config: &Config) -> std::time::Duration { + let whatami = config.mode().unwrap_or(defaults::mode); + ms_to_duration( + *config + .connect() + .timeout_ms() + .get(whatami) + .unwrap_or(DEFAULT_CONNECT_TIMEOUT_MS.get(whatami).unwrap()), + ) +} + +pub fn get_retry_config( + config: &Config, + endpoint: Option<&EndPoint>, + listen: bool, +) -> ConnectionRetryConf { + let whatami = config.mode().unwrap_or(defaults::mode); + + let default_retry = ConnectionRetryModeDependentConf::default(); + let retry: ConnectionRetryModeDependentConf; + let exit_on_failure: bool; + if listen { + retry = config + .listen() + .retry() + .clone() + .unwrap_or_else(|| default_retry.clone()); + + exit_on_failure = *config + .listen() + .exit_on_failure() + .get(whatami) + .unwrap_or(DEFAULT_LISTEN_EXIT_ON_FAIL.get(whatami).unwrap()); + } else { + retry = config + .connect() + .retry() + .clone() + .unwrap_or_else(|| default_retry.clone()); + + exit_on_failure = *config + .connect() + .exit_on_failure() + .get(whatami) + .unwrap_or(DEFAULT_CONNECT_EXIT_ON_FAIL.get(whatami).unwrap()); + } + + let mut res = ConnectionRetryConf::new(whatami, exit_on_failure, retry, default_retry); + + if let Some(endpoint) = endpoint { + let config = endpoint.config(); + if let Some(val) = config.get("exit_on_failure") { + res.exit_on_failure = zparse_default!(val, res.exit_on_failure); + } + if let Some(val) = config.get("retry_period_init_ms") { + res.period_init_ms = zparse_default!(val, res.period_init_ms); + } + if let Some(val) = config.get("retry_period_max_ms") { + res.period_max_ms = zparse_default!(val, res.period_max_ms); + } + if let Some(val) = config.get("retry_period_increase_factor") { + res.period_increase_factor = zparse_default!(val, res.period_increase_factor); + } + } + res +} diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 8d1a5dbc0f..f1973d2c0d 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -170,6 +170,7 @@ impl Default for QueueConf { fn default() -> Self { Self { size: QueueSizeConf::default(), + congestion_control: CongestionControlConf::default(), backoff: 100, } } @@ -195,6 +196,14 @@ impl Default for QueueSizeConf { } } +impl Default for CongestionControlConf { + fn default() -> Self { + Self { + wait_before_drop: 1000, + } + } +} + impl Default for LinkRxConf { fn default() -> Self { Self { @@ -211,3 +220,30 @@ impl Default for SharedMemoryConf { Self { enabled: false } } } + +pub const DEFAULT_CONNECT_TIMEOUT_MS: ModeDependentValue = + ModeDependentValue::Dependent(ModeValues { + client: Some(0), + peer: Some(-1), + router: Some(-1), + }); + +pub const DEFAULT_CONNECT_EXIT_ON_FAIL: ModeDependentValue = + ModeDependentValue::Dependent(ModeValues { + client: Some(true), + peer: Some(false), + router: Some(false), + }); + +pub const DEFAULT_LISTEN_TIMEOUT_MS: ModeDependentValue = ModeDependentValue::Unique(0); +pub const DEFAULT_LISTEN_EXIT_ON_FAIL: ModeDependentValue = ModeDependentValue::Unique(true); + +impl Default for ConnectionRetryModeDependentConf { + fn default() -> Self { + Self { + period_init_ms: Some(ModeDependentValue::Unique(1000)), + period_max_ms: Some(ModeDependentValue::Unique(4000)), + period_increase_factor: Some(ModeDependentValue::Unique(2.)), + } + } +} diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 9346e9825e..2b5485fa6b 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -15,12 +15,10 @@ //! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants. pub mod defaults; mod include; + use include::recursive_include; use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize}; -use serde::{ - de::{self, MapAccess, Visitor}, - Deserialize, Serialize, -}; +use serde::{Deserialize, Serialize}; use serde_json::Value; #[allow(unused_imports)] use std::convert::TryFrom; // This is a false positive from the rust analyser @@ -29,7 +27,6 @@ use std::{ collections::HashSet, fmt, io::Read, - marker::PhantomData, net::SocketAddr, path::Path, sync::{Arc, Mutex, MutexGuard, Weak}, @@ -47,6 +44,12 @@ use zenoh_protocol::{ use zenoh_result::{bail, zerror, ZResult}; use zenoh_util::LibLoader; +pub mod mode_dependent; +pub use mode_dependent::*; + +pub mod connection_retry; +pub use connection_retry::*; + // Wrappers for secrecy of values #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] pub struct SecretString(String); @@ -178,12 +181,22 @@ validated_struct::validator! { /// Which zenoh nodes to connect to. pub connect: #[derive(Default)] ConnectConfig { + /// global timeout for full connect cycle + pub timeout_ms: Option>, pub endpoints: Vec, + /// if connection timeout exceed, exit from application + pub exit_on_failure: Option>, + pub retry: Option, }, /// Which endpoints to listen on. `zenohd` will add `tcp/[::]:7447` to these locators if left empty. pub listen: #[derive(Default)] ListenConfig { + /// global timeout for full listen cycle + pub timeout_ms: Option>, pub endpoints: Vec, + /// if connection timeout exceed, exit from application + pub exit_on_failure: Option>, + pub retry: Option, }, pub scouting: #[derive(Default)] ScoutingConf { @@ -339,6 +352,13 @@ validated_struct::validator! { data_low: usize, background: usize, } where (queue_size_validator), + /// Congestion occurs when the queue is empty (no available batch). + /// Using CongestionControl::Block the caller is blocked until a batch is available and re-insterted into the queue. + /// Using CongestionControl::Drop the message might be dropped, depending on conditions configured here. + pub congestion_control: CongestionControlConf { + /// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available. + pub wait_before_drop: u64, + }, /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress. /// Higher values lead to a more aggressive batching but it will introduce additional latency. backoff: u64, @@ -1245,190 +1265,6 @@ impl validated_struct::ValidatedMap for PluginsConfig { } } -pub trait ModeDependent { - fn router(&self) -> Option<&T>; - fn peer(&self) -> Option<&T>; - fn client(&self) -> Option<&T>; - #[inline] - fn get(&self, whatami: WhatAmI) -> Option<&T> { - match whatami { - WhatAmI::Router => self.router(), - WhatAmI::Peer => self.peer(), - WhatAmI::Client => self.client(), - } - } -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ModeValues { - #[serde(skip_serializing_if = "Option::is_none")] - router: Option, - #[serde(skip_serializing_if = "Option::is_none")] - peer: Option, - #[serde(skip_serializing_if = "Option::is_none")] - client: Option, -} - -impl ModeDependent for ModeValues { - #[inline] - fn router(&self) -> Option<&T> { - self.router.as_ref() - } - - #[inline] - fn peer(&self) -> Option<&T> { - self.peer.as_ref() - } - - #[inline] - fn client(&self) -> Option<&T> { - self.client.as_ref() - } -} - -#[derive(Clone, Debug)] -pub enum ModeDependentValue { - Unique(T), - Dependent(ModeValues), -} - -impl ModeDependent for ModeDependentValue { - #[inline] - fn router(&self) -> Option<&T> { - match self { - Self::Unique(v) => Some(v), - Self::Dependent(o) => o.router(), - } - } - - #[inline] - fn peer(&self) -> Option<&T> { - match self { - Self::Unique(v) => Some(v), - Self::Dependent(o) => o.peer(), - } - } - - #[inline] - fn client(&self) -> Option<&T> { - match self { - Self::Unique(v) => Some(v), - Self::Dependent(o) => o.client(), - } - } -} - -impl serde::Serialize for ModeDependentValue -where - T: Serialize, -{ - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - match self { - ModeDependentValue::Unique(value) => value.serialize(serializer), - ModeDependentValue::Dependent(options) => options.serialize(serializer), - } - } -} -impl<'a> serde::Deserialize<'a> for ModeDependentValue { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'a>, - { - struct UniqueOrDependent(PhantomData U>); - - impl<'de> Visitor<'de> for UniqueOrDependent> { - type Value = ModeDependentValue; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("bool or mode dependent bool") - } - - fn visit_bool(self, value: bool) -> Result - where - E: de::Error, - { - Ok(ModeDependentValue::Unique(value)) - } - - fn visit_map(self, map: M) -> Result - where - M: MapAccess<'de>, - { - ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) - .map(ModeDependentValue::Dependent) - } - } - deserializer.deserialize_any(UniqueOrDependent(PhantomData)) - } -} - -impl<'a> serde::Deserialize<'a> for ModeDependentValue { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'a>, - { - struct UniqueOrDependent(PhantomData U>); - - impl<'de> Visitor<'de> for UniqueOrDependent> { - type Value = ModeDependentValue; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("WhatAmIMatcher or mode dependent WhatAmIMatcher") - } - - fn visit_str(self, value: &str) -> Result - where - E: de::Error, - { - WhatAmIMatcherVisitor {} - .visit_str(value) - .map(ModeDependentValue::Unique) - } - - fn visit_map(self, map: M) -> Result - where - M: MapAccess<'de>, - { - ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) - .map(ModeDependentValue::Dependent) - } - } - deserializer.deserialize_any(UniqueOrDependent(PhantomData)) - } -} - -impl ModeDependent for Option> { - #[inline] - fn router(&self) -> Option<&T> { - match self { - Some(ModeDependentValue::Unique(v)) => Some(v), - Some(ModeDependentValue::Dependent(o)) => o.router(), - None => None, - } - } - - #[inline] - fn peer(&self) -> Option<&T> { - match self { - Some(ModeDependentValue::Unique(v)) => Some(v), - Some(ModeDependentValue::Dependent(o)) => o.peer(), - None => None, - } - } - - #[inline] - fn client(&self) -> Option<&T> { - match self { - Some(ModeDependentValue::Unique(v)) => Some(v), - Some(ModeDependentValue::Dependent(o)) => o.client(), - None => None, - } - } -} - #[macro_export] macro_rules! unwrap_or_default { ($val:ident$(.$field:ident($($param:ident)?))*) => { diff --git a/commons/zenoh-config/src/mode_dependent.rs b/commons/zenoh-config/src/mode_dependent.rs new file mode 100644 index 0000000000..91e366f452 --- /dev/null +++ b/commons/zenoh-config/src/mode_dependent.rs @@ -0,0 +1,281 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use serde::{ + de::{self, MapAccess, Visitor}, + Deserialize, Serialize, +}; +use std::fmt; +use std::marker::PhantomData; +pub use zenoh_protocol::core::{ + whatami, EndPoint, Locator, Priority, WhatAmI, WhatAmIMatcher, WhatAmIMatcherVisitor, ZenohId, +}; + +pub trait ModeDependent { + fn router(&self) -> Option<&T>; + fn peer(&self) -> Option<&T>; + fn client(&self) -> Option<&T>; + #[inline] + fn get(&self, whatami: WhatAmI) -> Option<&T> { + match whatami { + WhatAmI::Router => self.router(), + WhatAmI::Peer => self.peer(), + WhatAmI::Client => self.client(), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ModeValues { + #[serde(skip_serializing_if = "Option::is_none")] + pub router: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub peer: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub client: Option, +} + +impl ModeDependent for ModeValues { + #[inline] + fn router(&self) -> Option<&T> { + self.router.as_ref() + } + + #[inline] + fn peer(&self) -> Option<&T> { + self.peer.as_ref() + } + + #[inline] + fn client(&self) -> Option<&T> { + self.client.as_ref() + } +} + +#[derive(Clone, Debug)] +pub enum ModeDependentValue { + Unique(T), + Dependent(ModeValues), +} + +impl ModeDependent for ModeDependentValue { + #[inline] + fn router(&self) -> Option<&T> { + match self { + Self::Unique(v) => Some(v), + Self::Dependent(o) => o.router(), + } + } + + #[inline] + fn peer(&self) -> Option<&T> { + match self { + Self::Unique(v) => Some(v), + Self::Dependent(o) => o.peer(), + } + } + + #[inline] + fn client(&self) -> Option<&T> { + match self { + Self::Unique(v) => Some(v), + Self::Dependent(o) => o.client(), + } + } +} + +impl serde::Serialize for ModeDependentValue +where + T: Serialize, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + ModeDependentValue::Unique(value) => value.serialize(serializer), + ModeDependentValue::Dependent(options) => options.serialize(serializer), + } + } +} + +impl<'a> serde::Deserialize<'a> for ModeDependentValue { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + struct UniqueOrDependent(PhantomData U>); + + impl<'de> Visitor<'de> for UniqueOrDependent> { + type Value = ModeDependentValue; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("bool or mode dependent bool") + } + + fn visit_bool(self, value: bool) -> Result + where + E: de::Error, + { + Ok(ModeDependentValue::Unique(value)) + } + + fn visit_map(self, map: M) -> Result + where + M: MapAccess<'de>, + { + ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) + .map(ModeDependentValue::Dependent) + } + } + deserializer.deserialize_any(UniqueOrDependent(PhantomData)) + } +} + +impl<'a> serde::Deserialize<'a> for ModeDependentValue { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + struct UniqueOrDependent(PhantomData U>); + + impl<'de> Visitor<'de> for UniqueOrDependent> { + type Value = ModeDependentValue; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("i64 or mode dependent i64") + } + + fn visit_i64(self, value: i64) -> Result + where + E: de::Error, + { + Ok(ModeDependentValue::Unique(value)) + } + + fn visit_map(self, map: M) -> Result + where + M: MapAccess<'de>, + { + ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) + .map(ModeDependentValue::Dependent) + } + } + deserializer.deserialize_any(UniqueOrDependent(PhantomData)) + } +} + +impl<'a> serde::Deserialize<'a> for ModeDependentValue { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + struct UniqueOrDependent(PhantomData U>); + + impl<'de> Visitor<'de> for UniqueOrDependent> { + type Value = ModeDependentValue; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("f64 or mode dependent f64") + } + + fn visit_f64(self, value: f64) -> Result + where + E: de::Error, + { + Ok(ModeDependentValue::Unique(value)) + } + + fn visit_i64(self, value: i64) -> Result + where + E: de::Error, + { + Ok(ModeDependentValue::Unique(value as f64)) + } + + fn visit_map(self, map: M) -> Result + where + M: MapAccess<'de>, + { + ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) + .map(ModeDependentValue::Dependent) + } + } + deserializer.deserialize_any(UniqueOrDependent(PhantomData)) + } +} + +impl<'a> serde::Deserialize<'a> for ModeDependentValue { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + struct UniqueOrDependent(PhantomData U>); + + impl<'de> Visitor<'de> for UniqueOrDependent> { + type Value = ModeDependentValue; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("WhatAmIMatcher or mode dependent WhatAmIMatcher") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + WhatAmIMatcherVisitor {} + .visit_str(value) + .map(ModeDependentValue::Unique) + } + + fn visit_map(self, map: M) -> Result + where + M: MapAccess<'de>, + { + ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) + .map(ModeDependentValue::Dependent) + } + } + deserializer.deserialize_any(UniqueOrDependent(PhantomData)) + } +} + +impl ModeDependent for Option> { + #[inline] + fn router(&self) -> Option<&T> { + match self { + Some(ModeDependentValue::Unique(v)) => Some(v), + Some(ModeDependentValue::Dependent(o)) => o.router(), + None => None, + } + } + + #[inline] + fn peer(&self) -> Option<&T> { + match self { + Some(ModeDependentValue::Unique(v)) => Some(v), + Some(ModeDependentValue::Dependent(o)) => o.peer(), + None => None, + } + } + + #[inline] + fn client(&self) -> Option<&T> { + match self { + Some(ModeDependentValue::Unique(v)) => Some(v), + Some(ModeDependentValue::Dependent(o)) => o.client(), + None => None, + } + } +} diff --git a/commons/zenoh-core/src/macros.rs b/commons/zenoh-core/src/macros.rs index 20b84f213f..5e8cefcf5a 100644 --- a/commons/zenoh-core/src/macros.rs +++ b/commons/zenoh-core/src/macros.rs @@ -192,6 +192,25 @@ macro_rules! zparse { }; } +// This macro allows to parse a string to the target type +// No faili, but log the error and use default +#[macro_export] +macro_rules! zparse_default { + ($str:expr, $default:expr) => { + match $str.parse() { + Ok(value) => value, + Err(_) => { + let e = zenoh_result::zerror!( + "Failed to read configuration: {} is not a valid value", + $str + ); + log::warn!("{}", e); + $default + } + } + }; +} + // This macro allows to do conditional compilation #[macro_export] macro_rules! zcondfeat { diff --git a/commons/zenoh-runtime/src/lib.rs b/commons/zenoh-runtime/src/lib.rs index c411e40a46..5c4a88b736 100644 --- a/commons/zenoh-runtime/src/lib.rs +++ b/commons/zenoh-runtime/src/lib.rs @@ -11,6 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // +use core::panic; use lazy_static::lazy_static; use std::{ collections::HashMap, @@ -23,7 +24,7 @@ use std::{ }, time::Duration, }; -use tokio::runtime::{Handle, Runtime}; +use tokio::runtime::{Handle, Runtime, RuntimeFlavor}; use zenoh_collections::Properties; use zenoh_result::ZResult as Result; @@ -45,7 +46,6 @@ impl ZRuntime { } fn init(&self) -> Result { - // dbg!(*ZRUNTIME_CONFIG); let config = &ZRUNTIME_CONFIG; let thread_name = format!("{self:?}"); @@ -111,6 +111,11 @@ impl ZRuntime { where F: Future, { + if let Ok(handle) = Handle::try_current() { + if handle.runtime_flavor() == RuntimeFlavor::CurrentThread { + panic!("Zenoh runtime doesn't support Tokio's current thread scheduler. Please use multi thread scheduler instead, e.g. a multi thread scheduler with one worker thread: `#[tokio::main(flavor = \"multi_thread\", worker_threads = 1)]`"); + } + } tokio::task::block_in_place(move || self.block_on(f)) } } @@ -234,3 +239,10 @@ impl Default for ZRuntimeConfig { } } } + +#[should_panic(expected = "Zenoh runtime doesn't support")] +#[tokio::test] +async fn block_in_place_fail_test() { + use crate::ZRuntime; + ZRuntime::TX.block_in_place(async { println!("Done") }); +} diff --git a/commons/zenoh-task/src/lib.rs b/commons/zenoh-task/src/lib.rs index 4b494e6444..54d93ffe3b 100644 --- a/commons/zenoh-task/src/lib.rs +++ b/commons/zenoh-task/src/lib.rs @@ -18,13 +18,13 @@ //! //! [Click here for Zenoh's documentation](../zenoh/index.html) +use futures::future::FutureExt; use std::future::Future; use std::time::Duration; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; use zenoh_runtime::ZRuntime; -use futures::future::FutureExt; #[derive(Clone)] pub struct TaskController { @@ -112,13 +112,13 @@ impl TaskController { self.token.cancel(); let task = async move { tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(10)) => { - log::error!("Failed to terminate {} tasks", self.tracker.len()); + _ = tokio::time::sleep(Duration::from_secs(10)) => { + log::error!("Failed to terminate {} tasks", self.tracker.len()); } _ = self.tracker.wait() => {} } }; - let _ = futures::executor::block_on(task); + futures::executor::block_on(task); } /// Async version of [`TaskController::terminate_all()`]. @@ -127,7 +127,7 @@ impl TaskController { self.token.cancel(); let task = async move { tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(10)) => { + _ = tokio::time::sleep(Duration::from_secs(10)) => { log::error!("Failed to terminate {} tasks", self.tracker.len()); } _ = self.tracker.wait() => {} diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index 8ec0ed03a1..dd570b364d 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -423,30 +423,26 @@ pub fn get_ipv6_ipaddrs(interface: Option<&str>) -> Vec { .collect() } -#[cfg(target_os = "linux")] -pub fn set_bind_to_device_tcp_socket(socket: &TcpSocket, iface: Option<&str>) -> ZResult<()> { - if let Some(iface) = iface { - socket.bind_device(Some(iface.as_bytes()))?; - } +#[cfg(any(target_os = "linux", target_os = "android"))] +pub fn set_bind_to_device_tcp_socket(socket: &TcpSocket, iface: &str) -> ZResult<()> { + socket.bind_device(Some(iface.as_bytes()))?; Ok(()) } -#[cfg(target_os = "linux")] -pub fn set_bind_to_device_udp_socket(socket: &UdpSocket, iface: Option<&str>) -> ZResult<()> { - if let Some(iface) = iface { - socket.bind_device(Some(iface.as_bytes()))?; - } +#[cfg(any(target_os = "linux", target_os = "android"))] +pub fn set_bind_to_device_udp_socket(socket: &UdpSocket, iface: &str) -> ZResult<()> { + socket.bind_device(Some(iface.as_bytes()))?; Ok(()) } #[cfg(any(target_os = "macos", target_os = "windows"))] -pub fn set_bind_to_device_tcp_socket(socket: &TcpSocket, iface: Option<&str>) -> ZResult<()> { - log::warn!("Binding the socket {socket:?} to the interface {iface:?} is not supported on macOS and Windows"); +pub fn set_bind_to_device_tcp_socket(socket: &TcpSocket, iface: &str) -> ZResult<()> { + log::warn!("Binding the socket {socket:?} to the interface {iface} is not supported on macOS and Windows"); Ok(()) } #[cfg(any(target_os = "macos", target_os = "windows"))] -pub fn set_bind_to_device_udp_socket(socket: &UdpSocket, iface: Option<&str>) -> ZResult<()> { - log::warn!("Binding the socket {socket:?} to the interface {iface:?} is not supported on macOS and Windows"); +pub fn set_bind_to_device_udp_socket(socket: &UdpSocket, iface: &str) -> ZResult<()> { + log::warn!("Binding the socket {socket:?} to the interface {iface} is not supported on macOS and Windows"); Ok(()) } diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 4833be3963..b0cf6a0ece 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -49,7 +49,7 @@ futures = { workspace = true } git-version = { workspace = true } json5 = { workspace = true } log = { workspace = true } -zenoh = { workspace = true } +zenoh = { workspace = true, default-features = true } zenoh-ext = { workspace = true } [dev-dependencies] diff --git a/examples/README.md b/examples/README.md index 92bf388aa5..0d38e32185 100644 --- a/examples/README.md +++ b/examples/README.md @@ -21,7 +21,7 @@ Typical usage: ```bash - z_scout + z_scout ``` ### z_info @@ -30,7 +30,7 @@ Typical usage: ```bash - z_info + z_info ``` @@ -42,11 +42,11 @@ Typical usage: ```bash - z_put + z_put ``` or ```bash - z_put -k demo/example/test -v 'Hello World' + z_put -k demo/example/test -v 'Hello World' ``` ### z_pub @@ -56,11 +56,11 @@ Typical usage: ```bash - z_pub + z_pub ``` or ```bash - z_pub -k demo/example/test -v 'Hello World' + z_pub -k demo/example/test -v 'Hello World' ``` ### z_sub @@ -70,11 +70,11 @@ Typical usage: ```bash - z_sub + z_sub ``` or ```bash - z_sub -k 'demo/**' + z_sub -k 'demo/**' ``` ### z_pull @@ -85,11 +85,11 @@ Typical usage: ```bash - z_pull + z_pull ``` or ```bash - z_pull -k 'demo/**' + z_pull -k 'demo/**' ``` ### z_get @@ -100,11 +100,11 @@ Typical usage: ```bash - z_get + z_get ``` or ```bash - z_get -s 'demo/**' + z_get -s 'demo/**' ``` ### z_queryable @@ -115,11 +115,11 @@ Typical usage: ```bash - z_queryable + z_queryable ``` or ```bash - z_queryable -k demo/example/queryable -v 'This is the result' + z_queryable -k demo/example/queryable -v 'This is the result' ``` ### z_storage @@ -132,11 +132,11 @@ Typical usage: ```bash - z_storage + z_storage ``` or ```bash - z_storage -k 'demo/**' + z_storage -k 'demo/**' ``` ### z_pub_shm & z_sub @@ -146,12 +146,12 @@ Typical Subscriber usage: ```bash - z_sub + z_sub ``` Typical Publisher usage: ```bash - z_pub_shm + z_pub_shm ``` ### z_pub_thr & z_sub_thr @@ -162,12 +162,12 @@ Typical Subscriber usage: ```bash - z_sub_thr + z_sub_thr ``` Typical Publisher usage: ```bash - z_pub_thr 1024 + z_pub_thr 1024 ``` ### z_ping & z_pong @@ -179,14 +179,16 @@ The pong application waits for samples on the first key expression and replies by writing back the received data on the second key expression. + :warning: z_pong needs to start first to avoid missing the kickoff from z_ping. + Typical Pong usage: ```bash - z_pong + z_pong ``` Typical Ping usage: ```bash - z_ping 1024 + z_ping 1024 ``` ### z_pub_shm_thr & z_sub_thr @@ -199,12 +201,12 @@ Typical Subscriber usage: ```bash - z_sub_thr + z_sub_thr ``` Typical Publisher usage: ```bash - z_pub_shm_thr + z_pub_shm_thr ``` ### z_liveliness @@ -216,11 +218,11 @@ Typical usage: ```bash - z_liveliness + z_liveliness ``` or ```bash - z_liveliness -k 'group1/member1' + z_liveliness -k 'group1/member1' ``` ### z_get_liveliness @@ -230,11 +232,11 @@ Typical usage: ```bash - z_get_liveliness + z_get_liveliness ``` or ```bash - z_get_liveliness -k 'group1/**' + z_get_liveliness -k 'group1/**' ``` ### z_sub_liveliness @@ -248,9 +250,9 @@ Typical usage: ```bash - z_sub_liveliness + z_sub_liveliness ``` or ```bash - z_sub_liveliness -k 'group1/**' + z_sub_liveliness -k 'group1/**' ``` diff --git a/io/zenoh-link/Cargo.toml b/io/zenoh-link/Cargo.toml index 25d30903da..7a9772391f 100644 --- a/io/zenoh-link/Cargo.toml +++ b/io/zenoh-link/Cargo.toml @@ -33,6 +33,7 @@ transport_unixsock-stream = ["zenoh-link-unixsock_stream"] transport_ws = ["zenoh-link-ws"] transport_serial = ["zenoh-link-serial"] transport_unixpipe = ["zenoh-link-unixpipe", "zenoh-link-unixpipe/transport_unixpipe"] +transport_vsock = ["zenoh-link-vsock"] [dependencies] async-trait = { workspace = true } @@ -47,5 +48,6 @@ zenoh-link-udp = { workspace = true, optional = true } zenoh-link-unixsock_stream = { workspace = true, optional = true } zenoh-link-ws = { workspace = true, optional = true } zenoh-link-unixpipe = { workspace = true, optional = true } +zenoh-link-vsock = { workspace = true, optional = true } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } diff --git a/io/zenoh-link/src/lib.rs b/io/zenoh-link/src/lib.rs index 0e3e5879a8..21f26ecf1b 100644 --- a/io/zenoh-link/src/lib.rs +++ b/io/zenoh-link/src/lib.rs @@ -72,6 +72,11 @@ use zenoh_link_unixpipe::{ LinkManagerUnicastPipe, UnixPipeConfigurator, UnixPipeLocatorInspector, UNIXPIPE_LOCATOR_PREFIX, }; +#[cfg(all(feature = "transport_vsock", target_os = "linux"))] +pub use zenoh_link_vsock as vsock; +#[cfg(all(feature = "transport_vsock", target_os = "linux"))] +use zenoh_link_vsock::{LinkManagerUnicastVsock, VsockLocatorInspector, VSOCK_LOCATOR_PREFIX}; + pub use zenoh_link_commons::*; pub use zenoh_protocol::core::{EndPoint, Locator}; @@ -92,6 +97,8 @@ pub const PROTOCOLS: &[&str] = &[ serial::SERIAL_LOCATOR_PREFIX, #[cfg(feature = "transport_unixpipe")] unixpipe::UNIXPIPE_LOCATOR_PREFIX, + #[cfg(all(feature = "transport_vsock", target_os = "linux"))] + vsock::VSOCK_LOCATOR_PREFIX, ]; #[derive(Default, Clone)] @@ -112,6 +119,8 @@ pub struct LocatorInspector { serial_inspector: SerialLocatorInspector, #[cfg(feature = "transport_unixpipe")] unixpipe_inspector: UnixPipeLocatorInspector, + #[cfg(all(feature = "transport_vsock", target_os = "linux"))] + vsock_inspector: VsockLocatorInspector, } impl LocatorInspector { pub async fn is_multicast(&self, locator: &Locator) -> ZResult { @@ -137,6 +146,8 @@ impl LocatorInspector { SERIAL_LOCATOR_PREFIX => self.serial_inspector.is_multicast(locator).await, #[cfg(feature = "transport_unixpipe")] UNIXPIPE_LOCATOR_PREFIX => self.unixpipe_inspector.is_multicast(locator).await, + #[cfg(all(feature = "transport_vsock", target_os = "linux"))] + VSOCK_LOCATOR_PREFIX => self.vsock_inspector.is_multicast(locator).await, _ => bail!("Unsupported protocol: {}.", protocol), } } @@ -226,6 +237,8 @@ impl LinkManagerBuilderUnicast { UNIXPIPE_LOCATOR_PREFIX => { Ok(std::sync::Arc::new(LinkManagerUnicastPipe::new(_manager))) } + #[cfg(all(feature = "transport_vsock", target_os = "linux"))] + VSOCK_LOCATOR_PREFIX => Ok(std::sync::Arc::new(LinkManagerUnicastVsock::new(_manager))), _ => bail!("Unicast not supported for {} protocol", protocol), } } diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 5909b2ffe7..7137ac0212 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -218,7 +218,9 @@ impl LinkManagerUnicastTcp { SocketAddr::V6(_) => TcpSocket::new_v6(), }?; - zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?; + if let Some(iface) = iface { + zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?; + } // Build a TcpStream from TcpSocket // https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html @@ -248,7 +250,9 @@ impl LinkManagerUnicastTcp { SocketAddr::V6(_) => TcpSocket::new_v6(), }?; - zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?; + if let Some(iface) = iface { + zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?; + } // Build a TcpListener from TcpSocket // https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 0862928e1a..1cd4a0b1ec 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -277,7 +277,9 @@ impl LinkManagerUnicastUdp { e })?; - zenoh_util::net::set_bind_to_device_udp_socket(&socket, iface)?; + if let Some(iface) = iface { + zenoh_util::net::set_bind_to_device_udp_socket(&socket, iface)?; + } // Connect the socket to the remote address socket.connect(dst_addr).await.map_err(|e| { @@ -314,7 +316,9 @@ impl LinkManagerUnicastUdp { e })?; - zenoh_util::net::set_bind_to_device_udp_socket(&socket, iface)?; + if let Some(iface) = iface { + zenoh_util::net::set_bind_to_device_udp_socket(&socket, iface)?; + } let local_addr = socket.local_addr().map_err(|e| { let e = zerror!("Can not create a new UDP listener on {}: {}", addr, e); diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index b6d4195927..eb8ee05d87 100644 --- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -290,12 +290,9 @@ struct UnicastPipeListener { } impl UnicastPipeListener { async fn listen(endpoint: EndPoint, manager: Arc) -> ZResult { - let (path_uplink, path_downlink, access_mode) = parse_pipe_endpoint(&endpoint); - let local = Locator::new( - endpoint.protocol(), - path_uplink.as_str(), - endpoint.metadata(), - )?; + let (path, access_mode) = endpoint_to_pipe_path(&endpoint); + let (path_uplink, path_downlink) = split_pipe_path(&path); + let local = Locator::new(endpoint.protocol(), path, endpoint.metadata())?; // create request channel let mut request_channel = PipeR::new(&path_uplink, access_mode).await?; @@ -385,7 +382,8 @@ async fn dedicate_pipe( struct UnicastPipeClient; impl UnicastPipeClient { async fn connect_to(endpoint: EndPoint) -> ZResult { - let (path_uplink, path_downlink, access_mode) = parse_pipe_endpoint(&endpoint); + let (path, access_mode) = endpoint_to_pipe_path(&endpoint); + let (path_uplink, path_downlink) = split_pipe_path(&path); // open the request channel // this channel would be used to invite listener to the dedicated channel @@ -598,16 +596,19 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastPipe { } } -fn parse_pipe_endpoint(endpoint: &EndPoint) -> (String, String, u32) { - let address = endpoint.address(); - let path = address.as_str(); - let path_uplink = path.to_string() + "_uplink"; - let path_downlink = path.to_string() + "_downlink"; +fn endpoint_to_pipe_path(endpoint: &EndPoint) -> (String, u32) { + let path = endpoint.address().to_string(); let access_mode = endpoint .config() .get(config::FILE_ACCESS_MASK) .map_or(*FILE_ACCESS_MASK, |val| { val.parse().unwrap_or(*FILE_ACCESS_MASK) }); - (path_uplink, path_downlink, access_mode) + (path, access_mode) +} + +fn split_pipe_path(path: &str) -> (String, String) { + let path_uplink = format!("{path}_uplink"); + let path_downlink = format!("{path}_downlink"); + (path_uplink, path_downlink) } diff --git a/io/zenoh-links/zenoh-link-vsock/Cargo.toml b/io/zenoh-links/zenoh-link-vsock/Cargo.toml new file mode 100644 index 0000000000..c9b451b5b9 --- /dev/null +++ b/io/zenoh-links/zenoh-link-vsock/Cargo.toml @@ -0,0 +1,44 @@ +# +# Copyright (c) 2024 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +[package] +rust-version = { workspace = true } +name = "zenoh-link-vsock" +version = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } +license = { workspace = true } +categories = { workspace = true } +description = "Internal crate for zenoh." +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = { workspace = true } +tokio = { workspace = true, features = ["net", "io-util", "rt", "time"] } +tokio-util = { workspace = true, features = ["rt"] } +log = { workspace = true } +libc = { workspace = true } +zenoh-core = { workspace = true } +zenoh-link-commons = { workspace = true } +zenoh-protocol = { workspace = true } +zenoh-result = { workspace = true } +zenoh-sync = { workspace = true } +zenoh-util = { workspace = true } +zenoh-runtime = { workspace = true } + +# Workspaces does not support platform dependent dependencies, and +# tokio-vsock not compiled on other platforms, so we put it there +[target.'cfg(target_os = "linux")'.dependencies] +tokio-vsock = "0.5.0" diff --git a/io/zenoh-links/zenoh-link-vsock/src/lib.rs b/io/zenoh-links/zenoh-link-vsock/src/lib.rs new file mode 100644 index 0000000000..7834050796 --- /dev/null +++ b/io/zenoh-links/zenoh-link-vsock/src/lib.rs @@ -0,0 +1,54 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! ⚠️ WARNING ⚠️ +//! +//! This crate is intended for Zenoh's internal use. +//! +//! [Click here for Zenoh's documentation](../zenoh/index.html) +//! +//! Implements [vsock](https://man7.org/linux/man-pages/man7/vsock.7.html) link support. +use async_trait::async_trait; +use zenoh_core::zconfigurable; +use zenoh_link_commons::LocatorInspector; +use zenoh_protocol::core::Locator; +use zenoh_result::ZResult; + +#[cfg(target_os = "linux")] +mod unicast; +#[cfg(target_os = "linux")] +pub use unicast::*; + +pub const VSOCK_LOCATOR_PREFIX: &str = "vsock"; + +#[derive(Default, Clone, Copy)] +pub struct VsockLocatorInspector; +#[async_trait] +impl LocatorInspector for VsockLocatorInspector { + fn protocol(&self) -> &str { + VSOCK_LOCATOR_PREFIX + } + + async fn is_multicast(&self, _locator: &Locator) -> ZResult { + Ok(false) + } +} + +zconfigurable! { + // Default MTU in bytes. + static ref VSOCK_DEFAULT_MTU: u16 = u16::MAX; + // Amount of time in microseconds to throttle the accept loop upon an error. + // Default set to 100 ms. + static ref VSOCK_ACCEPT_THROTTLE_TIME: u64 = 100_000; +} diff --git a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs new file mode 100644 index 0000000000..ced7b9dc15 --- /dev/null +++ b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs @@ -0,0 +1,374 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use async_trait::async_trait; +use libc::VMADDR_PORT_ANY; +use std::cell::UnsafeCell; +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::sync::RwLock as AsyncRwLock; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use zenoh_core::{zasyncread, zasyncwrite}; +use zenoh_link_commons::{ + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, +}; +use zenoh_protocol::core::endpoint::Address; +use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_result::{bail, zerror, ZResult}; + +use super::{VSOCK_ACCEPT_THROTTLE_TIME, VSOCK_DEFAULT_MTU, VSOCK_LOCATOR_PREFIX}; +use tokio_vsock::{ + VsockAddr, VsockListener, VsockStream, VMADDR_CID_ANY, VMADDR_CID_HOST, VMADDR_CID_HYPERVISOR, + VMADDR_CID_LOCAL, +}; + +pub const VSOCK_VMADDR_CID_ANY: &str = "VMADDR_CID_ANY"; +pub const VSOCK_VMADDR_CID_HYPERVISOR: &str = "VMADDR_CID_HYPERVISOR"; +pub const VSOCK_VMADDR_CID_LOCAL: &str = "VMADDR_CID_LOCAL"; +pub const VSOCK_VMADDR_CID_HOST: &str = "VMADDR_CID_HOST"; + +pub const VSOCK_VMADDR_PORT_ANY: &str = "VMADDR_PORT_ANY"; + +pub fn get_vsock_addr(address: Address<'_>) -> ZResult { + let parts: Vec<&str> = address.as_str().split(':').collect(); + + if parts.len() != 2 { + bail!("Incorrect vsock address: {:?}", address); + } + + let cid = match parts[0].to_uppercase().as_str() { + VSOCK_VMADDR_CID_HYPERVISOR => VMADDR_CID_HYPERVISOR, + VSOCK_VMADDR_CID_HOST => VMADDR_CID_HOST, + VSOCK_VMADDR_CID_LOCAL => VMADDR_CID_LOCAL, + VSOCK_VMADDR_CID_ANY => VMADDR_CID_ANY, + "-1" => VMADDR_CID_ANY, + _ => { + if let Ok(cid) = parts[0].parse::() { + cid + } else { + bail!("Incorrect vsock cid: {:?}", parts[0]); + } + } + }; + + let port = match parts[1].to_uppercase().as_str() { + VSOCK_VMADDR_PORT_ANY => VMADDR_PORT_ANY, + "-1" => VMADDR_PORT_ANY, + _ => { + if let Ok(cid) = parts[1].parse::() { + cid + } else { + bail!("Incorrect vsock port: {:?}", parts[1]); + } + } + }; + + Ok(VsockAddr::new(cid, port)) +} + +pub struct LinkUnicastVsock { + // The underlying socket as returned from the async-std library + socket: UnsafeCell, + // The source socket address of this link (address used on the local host) + src_addr: VsockAddr, + src_locator: Locator, + // The destination socket address of this link (address used on the remote host) + dst_addr: VsockAddr, + dst_locator: Locator, +} + +unsafe impl Sync for LinkUnicastVsock {} + +impl LinkUnicastVsock { + fn new(socket: VsockStream, src_addr: VsockAddr, dst_addr: VsockAddr) -> LinkUnicastVsock { + // Build the vsock object + LinkUnicastVsock { + socket: UnsafeCell::new(socket), + src_addr, + src_locator: Locator::new(VSOCK_LOCATOR_PREFIX, src_addr.to_string(), "").unwrap(), + dst_addr, + dst_locator: Locator::new(VSOCK_LOCATOR_PREFIX, dst_addr.to_string(), "").unwrap(), + } + } + #[allow(clippy::mut_from_ref)] + fn get_mut_socket(&self) -> &mut VsockStream { + unsafe { &mut *self.socket.get() } + } +} + +#[async_trait] +impl LinkUnicastTrait for LinkUnicastVsock { + async fn close(&self) -> ZResult<()> { + log::trace!("Closing vsock link: {}", self); + self.get_mut_socket().shutdown().await.map_err(|e| { + let e = zerror!("vsock link shutdown {}: {:?}", self, e); + log::trace!("{}", e); + e.into() + }) + } + + async fn write(&self, buffer: &[u8]) -> ZResult { + self.get_mut_socket().write(buffer).await.map_err(|e| { + let e = zerror!("Write error on vsock link {}: {}", self, e); + log::trace!("{}", e); + e.into() + }) + } + + async fn write_all(&self, buffer: &[u8]) -> ZResult<()> { + self.get_mut_socket().write_all(buffer).await.map_err(|e| { + let e = zerror!("Write error on vsock link {}: {}", self, e); + log::trace!("{}", e); + e.into() + }) + } + + async fn read(&self, buffer: &mut [u8]) -> ZResult { + self.get_mut_socket().read(buffer).await.map_err(|e| { + let e = zerror!("Read error on vsock link {}: {}", self, e); + log::trace!("{}", e); + e.into() + }) + } + + async fn read_exact(&self, buffer: &mut [u8]) -> ZResult<()> { + let _ = self + .get_mut_socket() + .read_exact(buffer) + .await + .map_err(|e| { + let e = zerror!("Read error on vsock link {}: {}", self, e); + log::trace!("{}", e); + e + })?; + Ok(()) + } + + #[inline(always)] + fn get_src(&self) -> &Locator { + &self.src_locator + } + + #[inline(always)] + fn get_dst(&self) -> &Locator { + &self.dst_locator + } + + #[inline(always)] + fn get_mtu(&self) -> u16 { + *VSOCK_DEFAULT_MTU + } + + #[inline(always)] + fn get_interface_names(&self) -> Vec { + vec!["vsock".to_string()] + } + + #[inline(always)] + fn is_reliable(&self) -> bool { + true + } + + #[inline(always)] + fn is_streamed(&self) -> bool { + true + } +} + +impl fmt::Display for LinkUnicastVsock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{} => {}", self.src_addr, self.dst_addr)?; + Ok(()) + } +} + +impl fmt::Debug for LinkUnicastVsock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Vsock") + .field("src", &self.src_addr) + .field("dst", &self.dst_addr) + .finish() + } +} + +struct ListenerUnicastVsock { + endpoint: EndPoint, + token: CancellationToken, + handle: JoinHandle>, +} + +impl ListenerUnicastVsock { + fn new(endpoint: EndPoint, token: CancellationToken, handle: JoinHandle>) -> Self { + Self { + endpoint, + token, + handle, + } + } + + async fn stop(&self) { + self.token.cancel(); + } +} + +pub struct LinkManagerUnicastVsock { + manager: NewLinkChannelSender, + listeners: Arc>>, +} + +impl LinkManagerUnicastVsock { + pub fn new(manager: NewLinkChannelSender) -> Self { + Self { + manager, + listeners: Arc::new(AsyncRwLock::new(HashMap::new())), + } + } +} + +#[async_trait] +impl LinkManagerUnicastTrait for LinkManagerUnicastVsock { + async fn new_link(&self, endpoint: EndPoint) -> ZResult { + let addr = get_vsock_addr(endpoint.address())?; + if let Ok(stream) = VsockStream::connect(addr).await { + let local_addr = stream.local_addr()?; + let peer_addr = stream.peer_addr()?; + let link = Arc::new(LinkUnicastVsock::new(stream, local_addr, peer_addr)); + return Ok(LinkUnicast(link)); + } + + bail!("Can not create a new vsock link bound to {}", endpoint) + } + + async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult { + let addr = get_vsock_addr(endpoint.address())?; + if let Ok(listener) = VsockListener::bind(addr) { + let local_addr = listener.local_addr()?; + // Update the endpoint locator address + endpoint = EndPoint::new( + endpoint.protocol(), + &format!("{local_addr}"), + endpoint.metadata(), + endpoint.config(), + )?; + let token = CancellationToken::new(); + let c_token = token.clone(); + + let c_manager = self.manager.clone(); + + let locator = endpoint.to_locator(); + + let mut listeners = zasyncwrite!(self.listeners); + let c_listeners = self.listeners.clone(); + let c_addr = addr; + let task = async move { + // Wait for the accept loop to terminate + let res = accept_task(listener, c_token, c_manager).await; + zasyncwrite!(c_listeners).remove(&c_addr); + res + }; + let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); + + let listener = ListenerUnicastVsock::new(endpoint, token, handle); + // Update the list of active listeners on the manager + listeners.insert(addr, listener); + return Ok(locator); + } + + bail!("Can not create a new vsock listener bound to {}", endpoint) + } + + async fn del_listener(&self, endpoint: &EndPoint) -> ZResult<()> { + let addr = get_vsock_addr(endpoint.address())?; + + let listener = zasyncwrite!(self.listeners).remove(&addr).ok_or_else(|| { + zerror!( + "Can not delete the listener because it has not been found: {}", + addr + ) + })?; + + // Send the stop signal + listener.stop().await; + listener.handle.await? + } + + async fn get_listeners(&self) -> Vec { + zasyncread!(self.listeners) + .values() + .map(|x| x.endpoint.clone()) + .collect() + } + + async fn get_locators(&self) -> Vec { + zasyncread!(self.listeners) + .values() + .map(|x| x.endpoint.to_locator()) + .collect() + } +} + +async fn accept_task( + mut socket: VsockListener, + token: CancellationToken, + manager: NewLinkChannelSender, +) -> ZResult<()> { + async fn accept(socket: &mut VsockListener) -> ZResult<(VsockStream, VsockAddr)> { + let res = socket.accept().await.map_err(|e| zerror!(e))?; + Ok(res) + } + + let src_addr = socket.local_addr().map_err(|e| { + let e = zerror!("Can not accept vsock connections: {}", e); + log::warn!("{}", e); + e + })?; + + log::trace!("Ready to accept vsock connections on: {:?}", src_addr); + loop { + tokio::select! { + _ = token.cancelled() => break, + res = accept(&mut socket) => { + match res { + Ok((stream, dst_addr)) => { + log::debug!("Accepted vsock connection on {:?}: {:?}", src_addr, dst_addr); + // Create the new link object + let link = Arc::new(LinkUnicastVsock::new(stream, src_addr, dst_addr)); + + // Communicate the new link to the initial transport manager + if let Err(e) = manager.send_async(LinkUnicast(link)).await { + log::error!("{}-{}: {}", file!(), line!(), e) + } + }, + Err(e) => { + log::warn!("{}. Hint: increase the system open file limit.", e); + // Throttle the accept loop upon an error + // NOTE: This might be due to various factors. However, the most common case is that + // the process has reached the maximum number of open files in the system. On + // Linux systems this limit can be changed by using the "ulimit" command line + // tool. In case of systemd-based systems, this can be changed by using the + // "sysctl" command line tool. + tokio::time::sleep(Duration::from_micros(*VSOCK_ACCEPT_THROTTLE_TIME)).await; + } + + } + } + }; + } + + Ok(()) +} diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index e7a2e6e5f0..5304a9fa17 100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -43,6 +43,7 @@ transport_ws = ["zenoh-link/transport_ws"] transport_serial = ["zenoh-link/transport_serial"] transport_compression = [] transport_unixpipe = ["zenoh-link/transport_unixpipe"] +transport_vsock= ["zenoh-link/transport_vsock"] stats = ["zenoh-protocol/stats"] test = [] unstable = [] diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 6ef385276f..9df7632f7a 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -19,10 +19,12 @@ use super::{ }; use flume::{bounded, Receiver, Sender}; use ringbuffer_spsc::{RingBuffer, RingBufferReader, RingBufferWriter}; -use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; use std::sync::{Arc, Mutex, MutexGuard}; -use std::thread; use std::time::Duration; +use std::{ + sync::atomic::{AtomicBool, AtomicU16, Ordering}, + time::Instant, +}; use zenoh_buffers::{ reader::{HasReader, Reader}, writer::HasWriter, @@ -63,6 +65,10 @@ impl StageInRefill { fn wait(&self) -> bool { self.n_ref_r.recv().is_ok() } + + fn wait_deadline(&self, instant: Instant) -> bool { + self.n_ref_r.recv_deadline(instant).is_ok() + } } // Inner structure to link the initial stage with the final stage of the pipeline @@ -121,13 +127,15 @@ struct StageIn { } impl StageIn { - fn push_network_message(&mut self, msg: &mut NetworkMessage, priority: Priority) -> bool { + fn push_network_message( + &mut self, + msg: &mut NetworkMessage, + priority: Priority, + deadline_before_drop: Option, + ) -> bool { // Lock the current serialization batch. let mut c_guard = self.mutex.current(); - // Check congestion control - let is_droppable = msg.is_droppable(); - macro_rules! zgetbatch_rets { ($fragment:expr, $restore_sn:expr) => { loop { @@ -140,19 +148,25 @@ impl StageIn { } None => { drop(c_guard); - if !$fragment && is_droppable { - // Restore the sequence number - $restore_sn; - // We are in the congestion scenario - // The yield is to avoid the writing task to spin - // indefinitely and monopolize the CPU usage. - thread::yield_now(); - return false; - } else { - if !self.s_ref.wait() { - // Restore the sequence number - $restore_sn; - return false; + match deadline_before_drop { + Some(deadline) if !$fragment => { + // We are in the congestion scenario and message is droppable + // Wait for an available batch until deadline + if !self.s_ref.wait_deadline(deadline) { + // Still no available batch. + // Restore the sequence number and drop the message + $restore_sn; + return false + } + } + _ => { + // Block waiting for an available batch + if !self.s_ref.wait() { + // Some error prevented the queue to wait and give back an available batch + // Restore the sequence number and drop the message + $restore_sn; + return false; + } } } c_guard = self.mutex.current(); @@ -487,24 +501,10 @@ impl StageOut { pub(crate) struct TransmissionPipelineConf { pub(crate) batch: BatchConfig, pub(crate) queue_size: [usize; Priority::NUM], + pub(crate) wait_before_drop: Duration, pub(crate) backoff: Duration, } -impl Default for TransmissionPipelineConf { - fn default() -> Self { - Self { - batch: BatchConfig { - mtu: BatchSize::MAX, - is_streamed: false, - #[cfg(feature = "transport_compression")] - is_compression: false, - }, - queue_size: [1; Priority::NUM], - backoff: Duration::from_micros(1), - } - } -} - // A 2-stage transmission pipeline pub(crate) struct TransmissionPipeline; impl TransmissionPipeline { @@ -579,6 +579,7 @@ impl TransmissionPipeline { let producer = TransmissionPipelineProducer { stage_in: stage_in.into_boxed_slice().into(), active: active.clone(), + wait_before_drop: config.wait_before_drop, }; let consumer = TransmissionPipelineConsumer { stage_out: stage_out.into_boxed_slice(), @@ -595,6 +596,7 @@ pub(crate) struct TransmissionPipelineProducer { // Each priority queue has its own Mutex stage_in: Arc<[Mutex]>, active: Arc, + wait_before_drop: Duration, } impl TransmissionPipelineProducer { @@ -607,9 +609,15 @@ impl TransmissionPipelineProducer { } else { (0, Priority::default()) }; + // If message is droppable, compute a deadline after which the sample could be dropped + let deadline_before_drop = if msg.is_droppable() { + Some(Instant::now() + self.wait_before_drop) + } else { + None + }; // Lock the channel. We are the only one that will be writing on it. let mut queue = zlock!(self.stage_in[idx]); - queue.push_network_message(&mut msg, priority) + queue.push_network_message(&mut msg, priority, deadline_before_drop) } #[inline] @@ -732,7 +740,7 @@ mod tests { const SLEEP: Duration = Duration::from_millis(100); const TIMEOUT: Duration = Duration::from_secs(60); - const CONFIG: TransmissionPipelineConf = TransmissionPipelineConf { + const CONFIG_STREAMED: TransmissionPipelineConf = TransmissionPipelineConf { batch: BatchConfig { mtu: BatchSize::MAX, is_streamed: true, @@ -740,6 +748,19 @@ mod tests { is_compression: true, }, queue_size: [1; Priority::NUM], + wait_before_drop: Duration::from_millis(1), + backoff: Duration::from_micros(1), + }; + + const CONFIG_NOT_STREAMED: TransmissionPipelineConf = TransmissionPipelineConf { + batch: BatchConfig { + mtu: BatchSize::MAX, + is_streamed: false, + #[cfg(feature = "transport_compression")] + is_compression: false, + }, + queue_size: [1; Priority::NUM], + wait_before_drop: Duration::from_millis(1), backoff: Duration::from_micros(1), }; @@ -847,10 +868,8 @@ mod tests { // Compute the number of messages to send let num_msg = max_msgs.min(bytes / ps); - let (producer, consumer) = TransmissionPipeline::make( - TransmissionPipelineConf::default(), - priorities.as_slice(), - ); + let (producer, consumer) = + TransmissionPipeline::make(CONFIG_NOT_STREAMED, priorities.as_slice()); let t_c = task::spawn(async move { consume(consumer, num_msg).await; @@ -874,7 +893,7 @@ mod tests { // Make sure to put only one message per batch: set the payload size // to half of the batch in such a way the serialized zenoh message // will be larger then half of the batch size (header + payload). - let payload_size = (CONFIG.batch.mtu / 2) as usize; + let payload_size = (CONFIG_STREAMED.batch.mtu / 2) as usize; // Send reliable messages let key = "test".into(); @@ -900,7 +919,7 @@ mod tests { // The last push should block since there shouldn't any more batches // available for serialization. - let num_msg = 1 + CONFIG.queue_size[0]; + let num_msg = 1 + CONFIG_STREAMED.queue_size[0]; for i in 0..num_msg { println!( "Pipeline Blocking [>>>]: ({id}) Scheduling message #{i} with payload size of {payload_size} bytes" @@ -919,7 +938,7 @@ mod tests { let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?; let priorities = vec![tct]; let (producer, mut consumer) = - TransmissionPipeline::make(TransmissionPipelineConf::default(), priorities.as_slice()); + TransmissionPipeline::make(CONFIG_NOT_STREAMED, priorities.as_slice()); let counter = Arc::new(AtomicUsize::new(0)); @@ -937,10 +956,12 @@ mod tests { // Wait to have sent enough messages and to have blocked println!( "Pipeline Blocking [---]: waiting to have {} messages being scheduled", - CONFIG.queue_size[Priority::MAX as usize] + CONFIG_STREAMED.queue_size[Priority::MAX as usize] ); let check = async { - while counter.load(Ordering::Acquire) < CONFIG.queue_size[Priority::MAX as usize] { + while counter.load(Ordering::Acquire) + < CONFIG_STREAMED.queue_size[Priority::MAX as usize] + { tokio::time::sleep(SLEEP).await; } }; @@ -972,7 +993,8 @@ mod tests { // Queue let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX)).unwrap(); let priorities = vec![tct]; - let (producer, mut consumer) = TransmissionPipeline::make(CONFIG, priorities.as_slice()); + let (producer, mut consumer) = + TransmissionPipeline::make(CONFIG_STREAMED, priorities.as_slice()); let count = Arc::new(AtomicUsize::new(0)); let size = Arc::new(AtomicUsize::new(0)); diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 63934c379d..32d60df30a 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -94,6 +94,7 @@ pub struct TransportManagerConfig { pub whatami: WhatAmI, pub resolution: Resolution, pub batch_size: u16, + pub wait_before_drop: Duration, pub queue_size: [usize; Priority::NUM], pub queue_backoff: Duration, pub defrag_buff_size: usize, @@ -122,6 +123,7 @@ pub struct TransportManagerBuilder { whatami: WhatAmI, resolution: Resolution, batch_size: u16, + wait_before_drop: Duration, queue_size: QueueSizeConf, queue_backoff: Duration, defrag_buff_size: usize, @@ -154,6 +156,11 @@ impl TransportManagerBuilder { self } + pub fn wait_before_drop(mut self, wait_before_drop: Duration) -> Self { + self.wait_before_drop = wait_before_drop; + self + } + pub fn queue_size(mut self, queue_size: QueueSizeConf) -> Self { self.queue_size = queue_size; self @@ -212,7 +219,11 @@ impl TransportManagerBuilder { self = self.batch_size(*link.tx().batch_size()); self = self.defrag_buff_size(*link.rx().max_message_size()); self = self.link_rx_buffer_size(*link.rx().buffer_size()); + self = self.wait_before_drop(Duration::from_micros( + *link.tx().queue().congestion_control().wait_before_drop(), + )); self = self.queue_size(link.tx().queue().size().clone()); + self = self.queue_backoff(Duration::from_nanos(*link.tx().queue().backoff())); self = self.tx_threads(*link.tx().threads()); self = self.protocols(link.protocols().clone()); @@ -259,6 +270,7 @@ impl TransportManagerBuilder { whatami: self.whatami, resolution: self.resolution, batch_size: self.batch_size, + wait_before_drop: self.wait_before_drop, queue_size, queue_backoff: self.queue_backoff, defrag_buff_size: self.defrag_buff_size, @@ -292,12 +304,14 @@ impl Default for TransportManagerBuilder { let link_rx = LinkRxConf::default(); let queue = QueueConf::default(); let backoff = *queue.backoff(); + let wait_before_drop = *queue.congestion_control().wait_before_drop(); Self { version: VERSION, zid: ZenohId::rand(), whatami: zenoh_config::defaults::mode, resolution: Resolution::default(), batch_size: BatchSize::MAX, + wait_before_drop: Duration::from_micros(wait_before_drop), queue_size: queue.size, queue_backoff: Duration::from_nanos(backoff), defrag_buff_size: *link_rx.max_message_size(), @@ -353,14 +367,14 @@ impl TransportManager { let this = this.clone(); async move { loop { - tokio::select! { - res = new_unicast_link_receiver.recv_async() => { - if let Ok(link) = res { - this.handle_new_link_unicast(link).await; + tokio::select! { + res = new_unicast_link_receiver.recv_async() => { + if let Ok(link) = res { + this.handle_new_link_unicast(link).await; + } } - } - _ = cancellation_token.cancelled() => { break; } - } + _ = cancellation_token.cancelled() => { break; } + } } } }); diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index 3565f747a0..bfbdd3af61 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -320,6 +320,7 @@ impl TransportLinkMulticastUniversal { let tpc = TransmissionPipelineConf { batch: self.link.config.batch, queue_size: self.transport.manager.config.queue_size, + wait_before_drop: self.transport.manager.config.wait_before_drop, backoff: self.transport.manager.config.queue_backoff, }; // The pipeline diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 5c07b69738..93a6c717dd 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -59,6 +59,7 @@ impl TransportLinkUnicastUniversal { is_compression: link.config.batch.is_compression, }, queue_size: transport.manager.config.queue_size, + wait_before_drop: transport.manager.config.wait_before_drop, backoff: transport.manager.config.queue_backoff, }; diff --git a/io/zenoh-transport/tests/endpoints.rs b/io/zenoh-transport/tests/endpoints.rs index 2f4335ca31..13a605a588 100644 --- a/io/zenoh-transport/tests/endpoints.rs +++ b/io/zenoh-transport/tests/endpoints.rs @@ -408,3 +408,17 @@ AXVFFIgCSluyrolaD6CWD9MqOex4YOfJR2bNxI7lFvuK4AwjyUJzT1U1HXib17mM let endpoints = vec![endpoint]; run(&endpoints).await; } + +#[cfg(all(feature = "transport_vsock", target_os = "linux"))] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn endpoint_vsock() { + let _ = env_logger::try_init(); + // Define the locators + let endpoints: Vec = vec![ + "vsock/-1:1234".parse().unwrap(), + "vsock/VMADDR_CID_ANY:VMADDR_PORT_ANY".parse().unwrap(), + "vsock/VMADDR_CID_LOCAL:2345".parse().unwrap(), + "vsock/VMADDR_CID_LOCAL:VMADDR_PORT_ANY".parse().unwrap(), + ]; + run(&endpoints).await; +} diff --git a/io/zenoh-transport/tests/transport_whitelist.rs b/io/zenoh-transport/tests/transport_whitelist.rs index da7ec67703..ccc74e679e 100644 --- a/io/zenoh-transport/tests/transport_whitelist.rs +++ b/io/zenoh-transport/tests/transport_whitelist.rs @@ -142,3 +142,17 @@ async fn transport_whitelist_unixpipe() { // Run run(&endpoints).await; } + +#[cfg(all(feature = "transport_vsock", target_os = "linux"))] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn transport_whitelist_vsock() { + let _ = env_logger::try_init(); + + // Define the locators + let endpoints: Vec = vec![ + "vsock/VMADDR_CID_LOCAL:17000".parse().unwrap(), + "vsock/1:17001".parse().unwrap(), + ]; + // Run + run(&endpoints).await; +} diff --git a/io/zenoh-transport/tests/unicast_intermittent.rs b/io/zenoh-transport/tests/unicast_intermittent.rs index 04711e66ec..6d9f889d8c 100644 --- a/io/zenoh-transport/tests/unicast_intermittent.rs +++ b/io/zenoh-transport/tests/unicast_intermittent.rs @@ -464,3 +464,11 @@ async fn transport_unixpipe_intermittent_for_lowlatency_transport() { .unwrap(); lowlatency_transport_intermittent(&endpoint).await; } + +#[cfg(all(feature = "transport_vsock", target_os = "linux"))] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn transport_vsock_intermittent() { + let _ = env_logger::try_init(); + let endpoint: EndPoint = "vsock/VMADDR_CID_LOCAL:17000".parse().unwrap(); + universal_transport_intermittent(&endpoint).await; +} diff --git a/io/zenoh-transport/tests/unicast_multilink.rs b/io/zenoh-transport/tests/unicast_multilink.rs index 2fe73853b9..5e4499be2a 100644 --- a/io/zenoh-transport/tests/unicast_multilink.rs +++ b/io/zenoh-transport/tests/unicast_multilink.rs @@ -722,4 +722,13 @@ R+IdLiXcyIkg0m9N8I17p0ljCSkbrgGMD3bbePRTfg== multilink_transport(&endpoint).await; } + + #[cfg(all(feature = "transport_vsock", target_os = "linux"))] + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn multilink_vsock_only() { + let _ = env_logger::try_init(); + + let endpoint: EndPoint = "vsock/VMADDR_CID_LOCAL:17000".parse().unwrap(); + multilink_transport(&endpoint).await; + } } diff --git a/io/zenoh-transport/tests/unicast_openclose.rs b/io/zenoh-transport/tests/unicast_openclose.rs index dfa690c889..56e4a1b140 100644 --- a/io/zenoh-transport/tests/unicast_openclose.rs +++ b/io/zenoh-transport/tests/unicast_openclose.rs @@ -825,3 +825,11 @@ async fn openclose_udp_only_listen_with_interface_restriction() { // should not connect to local interface and external address openclose_transport(&listen_endpoint, &connect_endpoint, false).await; } + +#[cfg(all(feature = "transport_vsock", target_os = "linux"))] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn openclose_vsock() { + let _ = env_logger::try_init(); + let endpoint: EndPoint = "vsock/VMADDR_CID_LOCAL:17000".parse().unwrap(); + openclose_lowlatency_transport(&endpoint).await; +} diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 8cd4b9fa19..144e5dbf72 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -48,6 +48,7 @@ transport_tls = ["zenoh-transport/transport_tls"] transport_udp = ["zenoh-transport/transport_udp"] transport_unixsock-stream = ["zenoh-transport/transport_unixsock-stream"] transport_ws = ["zenoh-transport/transport_ws"] +transport_vsock= ["zenoh-transport/transport_vsock"] unstable = [] default = [ "auth_pubkey", diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 3ee115e293..d8820f7ad1 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -116,6 +116,7 @@ pub const FEATURES: &str = concat_enabled_features!( "transport_udp", "transport_unixsock-stream", "transport_ws", + "transport_vsock", "unstable", "default" ] diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 9645af0f74..3105c6195f 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -532,6 +532,8 @@ pub fn route_query( .compute_local_replies(&rtables, &prefix, expr.suffix, face); let zid = rtables.zid; + let timeout = rtables.queries_default_timeout; + drop(queries_lock); drop(rtables); @@ -589,19 +591,18 @@ pub fn route_query( expr.full_expr().to_string(), )); } else { - // let timer = tables.timer.clone(); - // let timeout = tables.queries_default_timeout; #[cfg(feature = "complete_n")] { for ((outface, key_expr, context), qid, t) in route.values() { - // timer.add(TimedEvent::once( - // Instant::now() + timeout, - // QueryCleanup { - // tables: tables_ref.clone(), - // face: Arc::downgrade(&outface), - // *qid, - // }, - // )); + let mut cleanup = QueryCleanup { + tables: tables_ref.clone(), + face: Arc::downgrade(outface), + qid: *qid, + }; + zenoh_runtime::ZRuntime::Net.spawn(async move { + tokio::time::sleep(timeout).await; + cleanup.run().await + }); #[cfg(feature = "stats")] if !admin { inc_req_stats!(outface, tx, user, body) @@ -630,14 +631,15 @@ pub fn route_query( #[cfg(not(feature = "complete_n"))] { for ((outface, key_expr, context), qid) in route.values() { - // timer.add(TimedEvent::once( - // Instant::now() + timeout, - // QueryCleanup { - // tables: tables_ref.clone(), - // face: Arc::downgrade(&outface), - // *qid, - // }, - // )); + let mut cleanup = QueryCleanup { + tables: tables_ref.clone(), + face: Arc::downgrade(outface), + qid: *qid, + }; + zenoh_runtime::ZRuntime::Net.spawn(async move { + tokio::time::sleep(timeout).await; + cleanup.run().await + }); #[cfg(feature = "stats")] if !admin { inc_req_stats!(outface, tx, user, body) diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index e239a316a1..73338cc79d 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -23,13 +23,13 @@ use std::any::Any; use std::collections::HashMap; use std::sync::{Arc, Weak}; use std::sync::{Mutex, RwLock}; +use std::time::Duration; use uhlc::HLC; use zenoh_config::unwrap_or_default; use zenoh_config::Config; use zenoh_protocol::core::{ExprId, WhatAmI, ZenohId}; use zenoh_protocol::network::Mapping; use zenoh_result::ZResult; -// use zenoh_collections::Timer; use zenoh_sync::get_mut_unchecked; pub(crate) struct RoutingExpr<'a> { @@ -64,8 +64,7 @@ pub struct Tables { #[allow(dead_code)] pub(crate) hlc: Option>, pub(crate) drop_future_timestamp: bool, - // pub(crate) timer: Timer, - // pub(crate) queries_default_timeout: Duration, + pub(crate) queries_default_timeout: Duration, pub(crate) root_res: Arc, pub(crate) faces: HashMap>, pub(crate) mcast_groups: Vec>, @@ -87,8 +86,8 @@ impl Tables { unwrap_or_default!(config.timestamping().drop_future_timestamp()); let router_peers_failover_brokering = unwrap_or_default!(config.routing().router().peers_failover_brokering()); - // let queries_default_timeout = - // Duration::from_millis(unwrap_or_default!(config.queries_default_timeout())); + let queries_default_timeout = + Duration::from_millis(unwrap_or_default!(config.queries_default_timeout())); let hat_code = hat::new_hat(whatami, config); Ok(Tables { zid, @@ -96,8 +95,7 @@ impl Tables { face_counter: 0, hlc, drop_future_timestamp, - // timer: Timer::new(true), - // queries_default_timeout, + queries_default_timeout, root_res: Resource::root(), faces: HashMap::new(), mcast_groups: vec![], diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index 1629a80321..878ed27f40 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -28,10 +28,10 @@ use crate::GIT_VERSION; pub use adminspace::AdminSpace; use futures::stream::StreamExt; use futures::Future; -use tokio_util::sync::CancellationToken; use std::any::Any; use std::sync::{Arc, Weak}; use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; use uhlc::{HLCBuilder, HLC}; use zenoh_link::{EndPoint, Link}; use zenoh_plugin_trait::{PluginStartArgs, StructVersion}; @@ -143,7 +143,7 @@ impl Runtime { let runtime2 = runtime.clone(); async move { let mut stream = receiver.into_stream(); - loop { + loop { tokio::select! { res = stream.next() => { match res { diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index accb09f1c8..3f1026268a 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -20,7 +20,9 @@ use tokio::net::UdpSocket; use zenoh_buffers::reader::DidntRead; use zenoh_buffers::{reader::HasReader, writer::HasWriter}; use zenoh_codec::{RCodec, WCodec, Zenoh080}; -use zenoh_config::{unwrap_or_default, ModeDependent}; +use zenoh_config::{ + get_global_connect_timeout, get_global_listener_timeout, unwrap_or_default, ModeDependent, +}; use zenoh_link::{Locator, LocatorInspector}; use zenoh_protocol::{ core::{whatami::WhatAmIMatcher, EndPoint, WhatAmI, ZenohId}, @@ -32,10 +34,6 @@ const RCV_BUF_SIZE: usize = u16::MAX as usize; const SCOUT_INITIAL_PERIOD: Duration = Duration::from_millis(1_000); const SCOUT_MAX_PERIOD: Duration = Duration::from_millis(8_000); const SCOUT_PERIOD_INCREASE_FACTOR: u32 = 2; -const CONNECTION_TIMEOUT: Duration = Duration::from_millis(10_000); -const CONNECTION_RETRY_INITIAL_PERIOD: Duration = Duration::from_millis(1_000); -const CONNECTION_RETRY_MAX_PERIOD: Duration = Duration::from_millis(4_000); -const CONNECTION_RETRY_PERIOD_INCREASE_FACTOR: u32 = 2; const ROUTER_DEFAULT_LISTENER: &str = "tcp/[::]:7447"; const PEER_DEFAULT_LISTENER: &str = "tcp/[::]:0"; @@ -87,27 +85,7 @@ impl Runtime { bail!("No peer specified and multicast scouting desactivated!") } } - _ => { - for locator in &peers { - match tokio::time::timeout( - CONNECTION_TIMEOUT, - self.manager().open_transport_unicast(locator.clone()), - ) - .await - { - Ok(Ok(_)) => return Ok(()), - Ok(Err(e)) => log::warn!("Unable to connect to {}! {}", locator, e), - Err(e) => log::warn!("Unable to connect to {}! {}", locator, e), - } - } - let e = zerror!( - "{:?} Unable to connect to any of {:?}! ", - self.manager().get_locators(), - peers - ); - log::error!("{}", &e); - Err(e.into()) - } + _ => self.connect_peers(&peers, true).await, } } @@ -146,9 +124,7 @@ impl Runtime { self.bind_listeners(&listeners).await?; - for peer in peers { - self.spawn_peer_connector(peer).await?; - } + self.connect_peers(&peers, false).await?; if scouting { self.start_scout(listen, autoconnect, addr, ifaces).await?; @@ -191,9 +167,7 @@ impl Runtime { self.bind_listeners(&listeners).await?; - for peer in peers { - self.spawn_peer_connector(peer).await?; - } + self.connect_peers(&peers, false).await?; if scouting { self.start_scout(listen, autoconnect, addr, ifaces).await?; @@ -233,9 +207,9 @@ impl Runtime { }); } (false, false) => { - self.spawn_abortable( - async move { this.connect_all(&sockets, autoconnect, &addr).await }, - ); + self.spawn_abortable(async move { + this.connect_all(&sockets, autoconnect, &addr).await + }); } _ => {} } @@ -244,6 +218,116 @@ impl Runtime { Ok(()) } + async fn connect_peers(&self, peers: &[EndPoint], single_link: bool) -> ZResult<()> { + let timeout = self.get_global_connect_timeout(); + if timeout.is_zero() { + self.connect_peers_impl(peers, single_link).await + } else { + let res = tokio::time::timeout(timeout, async { + self.connect_peers_impl(peers, single_link).await.ok() + }) + .await; + match res { + Ok(_) => Ok(()), + Err(_) => { + let e = zerror!( + "{:?} Unable to connect to any of {:?}! ", + self.manager().get_locators(), + peers + ); + log::error!("{}", &e); + Err(e.into()) + } + } + } + } + + async fn connect_peers_impl(&self, peers: &[EndPoint], single_link: bool) -> ZResult<()> { + if single_link { + self.connect_peers_single_link(peers).await + } else { + self.connect_peers_multiply_links(peers).await + } + } + + async fn connect_peers_single_link(&self, peers: &[EndPoint]) -> ZResult<()> { + for peer in peers { + let endpoint = peer.clone(); + let retry_config = self.get_connect_retry_config(&endpoint); + log::debug!( + "Try to connect: {:?}: global timeout: {:?}, retry: {:?}", + endpoint, + self.get_global_connect_timeout(), + retry_config + ); + if retry_config.timeout().is_zero() || self.get_global_connect_timeout().is_zero() { + // try to connect and exit immediately without retry + if self + .peer_connector(endpoint, retry_config.timeout()) + .await + .is_ok() + { + return Ok(()); + } + } else { + // try to connect with retry waiting + self.peer_connector_retry(endpoint).await; + return Ok(()); + } + } + let e = zerror!( + "{:?} Unable to connect to any of {:?}! ", + self.manager().get_locators(), + peers + ); + log::error!("{}", &e); + Err(e.into()) + } + + async fn connect_peers_multiply_links(&self, peers: &[EndPoint]) -> ZResult<()> { + for peer in peers { + let endpoint = peer.clone(); + let retry_config = self.get_connect_retry_config(&endpoint); + log::debug!( + "Try to connect: {:?}: global timeout: {:?}, retry: {:?}", + endpoint, + self.get_global_connect_timeout(), + retry_config + ); + if retry_config.timeout().is_zero() || self.get_global_connect_timeout().is_zero() { + // try to connect and exit immediately without retry + if let Err(e) = self.peer_connector(endpoint, retry_config.timeout()).await { + if retry_config.exit_on_failure { + return Err(e); + } + } + } else if retry_config.exit_on_failure { + // try to connect with retry waiting + self.peer_connector_retry(endpoint).await; + } else { + // try to connect in background + self.spawn_peer_connector(endpoint).await? + } + } + Ok(()) + } + + async fn peer_connector(&self, peer: EndPoint, timeout: std::time::Duration) -> ZResult<()> { + match tokio::time::timeout(timeout, self.manager().open_transport_unicast(peer.clone())) + .await + { + Ok(Ok(_)) => Ok(()), + Ok(Err(e)) => { + log::warn!("Unable to connect to {}! {}", peer, e); + Err(e) + } + Err(e) => { + log::warn!("Unable to connect to {}! {}", peer, e); + Err(e.into()) + } + } + } + pub(crate) async fn update_peers(&self) -> ZResult<()> { let peers = { self.state.config.lock().connect().endpoints().clone() }; let tranports = self.manager().get_transports_unicast().await; @@ -293,24 +377,118 @@ impl Runtime { Ok(()) } + fn get_listen_retry_config(&self, endpoint: &EndPoint) -> zenoh_config::ConnectionRetryConf { + let guard = &self.state.config.lock(); + zenoh_config::get_retry_config(guard, Some(endpoint), true) + } + + fn get_connect_retry_config(&self, endpoint: &EndPoint) -> zenoh_config::ConnectionRetryConf { + let guard = &self.state.config.lock(); + zenoh_config::get_retry_config(guard, Some(endpoint), false) + } + + fn get_global_connect_retry_config(&self) -> zenoh_config::ConnectionRetryConf { + let guard = &self.state.config.lock(); + zenoh_config::get_retry_config(guard, None, false) + } + + fn get_global_listener_timeout(&self) -> std::time::Duration { + let guard = &self.state.config.lock(); + get_global_listener_timeout(guard) + } + + fn get_global_connect_timeout(&self) -> std::time::Duration { + let guard = &self.state.config.lock(); + get_global_connect_timeout(guard) + } + async fn bind_listeners(&self, listeners: &[EndPoint]) -> ZResult<()> { + let timeout = self.get_global_listener_timeout(); + if timeout.is_zero() { + self.bind_listeners_impl(listeners).await + } else { + let res = tokio::time::timeout(timeout, async { + self.bind_listeners_impl(listeners).await.ok() + }) + .await; + match res { + Ok(_) => Ok(()), + Err(e) => { + log::error!("Unable to open listeners: {}", e); + Err(Box::new(e)) + } + } + } + } + + async fn bind_listeners_impl(&self, listeners: &[EndPoint]) -> ZResult<()> { for listener in listeners { let endpoint = listener.clone(); - match self.manager().add_listener(endpoint).await { - Ok(listener) => log::debug!("Listener added: {}", listener), - Err(err) => { - log::error!("Unable to open listener {}: {}", listener, err); - return Err(err); - } + let retry_config = self.get_listen_retry_config(&endpoint); + log::debug!("Try to add listener: {:?}: {:?}", endpoint, retry_config); + if retry_config.timeout().is_zero() || self.get_global_listener_timeout().is_zero() { + // try to add listener and exit immediately without retry + if let Err(e) = self.add_listener(endpoint).await { + if retry_config.exit_on_failure { + return Err(e); + } + }; + } else if retry_config.exit_on_failure { + // try to add listener with retry waiting + self.add_listener_retry(endpoint, retry_config).await + } else { + // try to add listener in background + self.spawn_add_listener(endpoint, retry_config).await + } + } + self.print_locators(); + Ok(()) + } + + async fn spawn_add_listener( + &self, + listener: EndPoint, + retry_config: zenoh_config::ConnectionRetryConf, + ) { + let this = self.clone(); + self.spawn(async move { + this.add_listener_retry(listener, retry_config).await; + this.print_locators(); + }); + } + + async fn add_listener_retry( + &self, + listener: EndPoint, + retry_config: zenoh_config::ConnectionRetryConf, + ) { + let mut period = retry_config.period(); + loop { + if self.add_listener(listener.clone()).await.is_ok() { + break; } + tokio::time::sleep(period.next_duration()).await; } + } + async fn add_listener(&self, listener: EndPoint) -> ZResult<()> { + let endpoint = listener.clone(); + match self.manager().add_listener(endpoint).await { + Ok(listener) => log::debug!("Listener added: {}", listener), + Err(err) => { + log::warn!("Unable to open listener {}: {}", listener, err); + return Err(err); + } + } + Ok(()) + } + + fn print_locators(&self) { let mut locators = self.state.locators.write().unwrap(); *locators = self.manager().get_locators(); for locator in &*locators { log::info!("Zenoh can be reached at: {}", locator); } - Ok(()) } pub fn get_interfaces(names: &str) -> Vec { @@ -470,21 +648,22 @@ impl Runtime { .await? { let this = self.clone(); - self.spawn(async move { this.peer_connector(peer).await }); + self.spawn(async move { this.peer_connector_retry(peer).await }); Ok(()) } else { bail!("Forbidden multicast endpoint in connect list!") } } - async fn peer_connector(&self, peer: EndPoint) { - let mut delay = CONNECTION_RETRY_INITIAL_PERIOD; + async fn peer_connector_retry(&self, peer: EndPoint) { + let retry_config = self.get_connect_retry_config(&peer); + let mut period = retry_config.period(); let cancellation_token = self.get_cancellation_token(); loop { log::trace!("Trying to connect to configured peer {}", peer); let endpoint = peer.clone(); tokio::select! { - res = tokio::time::timeout(CONNECTION_TIMEOUT, self.manager().open_transport_unicast(endpoint)) => { + res = tokio::time::timeout(retry_config.timeout(), self.manager().open_transport_unicast(endpoint)) => { match res { Ok(Ok(transport)) => { log::debug!("Successfully connected to configured peer {}", peer); @@ -503,7 +682,7 @@ impl Runtime { "Unable to connect to configured peer {}! {}. Retry in {:?}.", peer, e, - delay + period.duration() ); } Err(e) => { @@ -511,18 +690,14 @@ impl Runtime { "Unable to connect to configured peer {}! {}. Retry in {:?}.", peer, e, - delay + period.duration() ); } } } _ = cancellation_token.cancelled() => { break; } } - tokio::time::sleep(delay).await; - delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; - if delay > CONNECTION_RETRY_MAX_PERIOD { - delay = CONNECTION_RETRY_MAX_PERIOD; - } + tokio::time::sleep(period.next_duration()).await; } } @@ -637,10 +812,11 @@ impl Runtime { }; let endpoint = locator.to_owned().into(); + let retry_config = self.get_connect_retry_config(&endpoint); let manager = self.manager(); if is_multicast { match tokio::time::timeout( - CONNECTION_TIMEOUT, + retry_config.timeout(), manager.open_transport_multicast(endpoint), ) .await @@ -657,7 +833,7 @@ impl Runtime { } } else { match tokio::time::timeout( - CONNECTION_TIMEOUT, + retry_config.timeout(), manager.open_transport_unicast(endpoint), ) .await @@ -845,16 +1021,13 @@ impl Runtime { let runtime = session.runtime.clone(); let cancellation_token = runtime.get_cancellation_token(); session.runtime.spawn(async move { - let mut delay = CONNECTION_RETRY_INITIAL_PERIOD; + let retry_config = runtime.get_global_connect_retry_config(); + let mut period = retry_config.period(); while runtime.start_client().await.is_err() { - tokio::select! { - _ = tokio::time::sleep(delay) => {} + tokio::select! { + _ = tokio::time::sleep(period.next_duration()) => {} _ = cancellation_token.cancelled() => { break; } } - delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; - if delay > CONNECTION_RETRY_MAX_PERIOD { - delay = CONNECTION_RETRY_MAX_PERIOD; - } } }); } @@ -875,7 +1048,7 @@ impl Runtime { let runtime = session.runtime.clone(); session .runtime - .spawn(async move { runtime.peer_connector(endpoint).await }); + .spawn(async move { runtime.peer_connector_retry(endpoint).await }); } } } diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index ff867cecee..e94b1a9973 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -68,7 +68,10 @@ pub struct SourceInfo { #[test] #[cfg(feature = "unstable")] fn source_info_stack_size() { - assert_eq!(std::mem::size_of::(), 16 * 2); + assert_eq!(std::mem::size_of::(), 16); + assert_eq!(std::mem::size_of::>(), 17); + assert_eq!(std::mem::size_of::>(), 16); + assert_eq!(std::mem::size_of::(), 17 + 16 + 7); } #[zenoh_macros::unstable] diff --git a/zenoh/tests/connection_retry.rs b/zenoh/tests/connection_retry.rs new file mode 100644 index 0000000000..db84d7bd5d --- /dev/null +++ b/zenoh/tests/connection_retry.rs @@ -0,0 +1,171 @@ +use config::ConnectionRetryConf; + +use zenoh::prelude::sync::*; + +#[test] +fn retry_config_overriding() { + let mut config = Config::default(); + config + .insert_json5( + "listen/endpoints", + r#" + [ + "tcp/1.2.3.4:0", + "tcp/1.2.3.4:0#retry_period_init_ms=30000", + "tcp/1.2.3.4:0#retry_period_init_ms=30000;retry_period_max_ms=60000;retry_period_increase_factor=15;exit_on_failure=true", + ] + "#, + ) + .unwrap(); + + config + .insert_json5( + "listen/retry", + r#" + { + try_timeout_ms: 2000, + period_init_ms: 3000, + period_max_ms: 6000, + period_increase_factor: 1.5, + } + "#, + ) + .unwrap(); + + config + .insert_json5("listen/exit_on_failure", "false") + .unwrap(); + + let expected = vec![ + // global value + ConnectionRetryConf { + period_init_ms: 3000, + period_max_ms: 6000, + period_increase_factor: 1.5, + exit_on_failure: false, + }, + // override one key + ConnectionRetryConf { + period_init_ms: 30000, + period_max_ms: 6000, + period_increase_factor: 1.5, + exit_on_failure: false, + }, + // override all keys + ConnectionRetryConf { + period_init_ms: 30000, + period_max_ms: 60000, + period_increase_factor: 15., + exit_on_failure: true, + }, + ]; + + for (i, endpoint) in config.listen().endpoints().iter().enumerate() { + let retry_config = zenoh_config::get_retry_config(&config, Some(endpoint), true); + assert_eq!(retry_config, expected[i]); + } +} + +#[test] +fn retry_config_parsing() { + let mut config = Config::default(); + config + .insert_json5( + "listen/retry", + r#" + { + period_init_ms: 1000, + period_max_ms: 6000, + period_increase_factor: 2, + } + "#, + ) + .unwrap(); + + let endpoint: EndPoint = "tcp/[::]:0".parse().unwrap(); + let retry_config = zenoh_config::get_retry_config(&config, Some(&endpoint), true); + + let mut period = retry_config.period(); + let expected = vec![1000, 2000, 4000, 6000, 6000, 6000, 6000]; + + for v in expected { + assert_eq!(period.duration(), std::time::Duration::from_millis(v)); + assert_eq!(period.next_duration(), std::time::Duration::from_millis(v)); + } +} + +#[test] +fn retry_config_const_period() { + let mut config = Config::default(); + config + .insert_json5( + "listen/retry", + r#" + { + period_init_ms: 1000, + period_increase_factor: 1, + } + "#, + ) + .unwrap(); + + let endpoint: EndPoint = "tcp/[::]:0".parse().unwrap(); + let retry_config = zenoh_config::get_retry_config(&config, Some(&endpoint), true); + + let mut period = retry_config.period(); + let expected = vec![1000, 1000, 1000, 1000]; + + for v in expected { + assert_eq!(period.duration(), std::time::Duration::from_millis(v)); + assert_eq!(period.next_duration(), std::time::Duration::from_millis(v)); + } +} + +#[test] +fn retry_config_infinit_period() { + let mut config = Config::default(); + config + .insert_json5( + "listen/retry", + r#" + { + period_init_ms: -1, + period_increase_factor: 1, + } + "#, + ) + .unwrap(); + + let endpoint: EndPoint = "tcp/[::]:0".parse().unwrap(); + let retry_config = zenoh_config::get_retry_config(&config, Some(&endpoint), true); + + let mut period = retry_config.period(); + + assert_eq!(period.duration(), std::time::Duration::MAX); + assert_eq!(period.next_duration(), std::time::Duration::MAX); +} + +#[test] +#[should_panic(expected = "Can not create a new TCP listener")] +fn listen_no_retry() { + let mut config = Config::default(); + config + .insert_json5("listen/endpoints", r#"["tcp/8.8.8.8:8"]"#) + .unwrap(); + + config.insert_json5("listen/timeout_ms", "0").unwrap(); + zenoh::open(config).res().unwrap(); +} + +#[test] +#[should_panic(expected = "value: Elapsed(())")] +fn listen_with_retry() { + let mut config = Config::default(); + config + .insert_json5("listen/endpoints", r#"["tcp/8.8.8.8:8"]"#) + .unwrap(); + + config.insert_json5("listen/timeout_ms", "1000").unwrap(); + + zenoh::open(config).res().unwrap(); +} diff --git a/zenoh/tests/routing.rs b/zenoh/tests/routing.rs index 6c5afe0673..38ed8ff087 100644 --- a/zenoh/tests/routing.rs +++ b/zenoh/tests/routing.rs @@ -27,7 +27,7 @@ use zenoh_core::ztimeout; use zenoh_protocol::core::{WhatAmI, WhatAmIMatcher}; use zenoh_result::bail; -const TIMEOUT: Duration = Duration::from_secs(10); +const TIMEOUT: Duration = Duration::from_secs(60); const MSG_COUNT: usize = 50; const MSG_SIZE: [usize; 2] = [1_024, 131_072]; // Maximal recipes to run at once diff --git a/zenohd/src/main.rs b/zenohd/src/main.rs index 7204a83612..d7cb9a52a9 100644 --- a/zenohd/src/main.rs +++ b/zenohd/src/main.rs @@ -386,6 +386,7 @@ fn test_default_features() { " zenoh/transport_udp", " zenoh/transport_unixsock-stream", " zenoh/transport_ws", + // " zenoh/transport_vsock", " zenoh/unstable", " zenoh/default", ) @@ -412,6 +413,7 @@ fn test_no_default_features() { // " zenoh/transport_udp", // " zenoh/transport_unixsock-stream", // " zenoh/transport_ws", + // " zenoh/transport_vsock", " zenoh/unstable", // " zenoh/default", )