From ad58af6c93babe0e4310b37e04dae7bc9523e246 Mon Sep 17 00:00:00 2001 From: Yuyuan Yuan Date: Fri, 26 Apr 2024 19:54:57 +0800 Subject: [PATCH 1/7] fix(zenoh_runtime): disable atexit on windows (#981) * Revert "fix(zenoh-runtime): zenoh-c DLL crash in `libc::atexit` handler (#972)" This reverts commit 274166d778945be0bb9250944f1374e3c0dfc892. * ci: disable atexit cleanup on Windows --- commons/zenoh-runtime/Cargo.toml | 1 - commons/zenoh-runtime/src/lib.rs | 33 +++++--------------------------- 2 files changed, 5 insertions(+), 29 deletions(-) diff --git a/commons/zenoh-runtime/Cargo.toml b/commons/zenoh-runtime/Cargo.toml index cfb63b7e60..e3f0c7a3c0 100644 --- a/commons/zenoh-runtime/Cargo.toml +++ b/commons/zenoh-runtime/Cargo.toml @@ -22,4 +22,3 @@ zenoh-result = { workspace = true, features = ["std"] } zenoh-collections = { workspace = true, features = ["std"] } zenoh-macros = { workspace = true } tokio = { workspace = true, features = ["fs", "io-util", "macros", "net", "rt-multi-thread", "sync", "time"] } -tracing = { workspace = true } diff --git a/commons/zenoh-runtime/src/lib.rs b/commons/zenoh-runtime/src/lib.rs index cb58cac570..dcd46744e6 100644 --- a/commons/zenoh-runtime/src/lib.rs +++ b/commons/zenoh-runtime/src/lib.rs @@ -157,6 +157,8 @@ pub struct ZRuntimePool(HashMap>); impl ZRuntimePool { fn new() -> Self { + // It has been recognized that using atexit within Windows DLL is problematic + #[cfg(not(target_os = "windows"))] // Register a callback to clean the static variables. unsafe { libc::atexit(cleanup); @@ -184,42 +186,17 @@ impl ZRuntimePool { // If there are any blocking tasks spawned by ZRuntimes, the function will block until they return. impl Drop for ZRuntimePool { fn drop(&mut self) { - std::panic::set_hook(Box::new(|_| { - // To suppress the panic error caught in the following `catch_unwind`. - })); - let handles: Vec<_> = self .0 .drain() .filter_map(|(_name, mut rt)| { - rt.take().map(|r| { - // NOTE: The error of the atexit handler in DLL (static lib is fine) - // failing to spawn a new thread in `cleanup` has been identified. - std::panic::catch_unwind(|| { - std::thread::spawn(move || r.shutdown_timeout(Duration::from_secs(1))) - }) - }) + rt.take() + .map(|r| std::thread::spawn(move || r.shutdown_timeout(Duration::from_secs(1)))) }) .collect(); for hd in handles { - match hd { - Ok(handle) => { - if let Err(err) = handle.join() { - tracing::error!( - "The handle failed to join during `ZRuntimePool` drop due to {err:?}" - ); - } - } - Err(err) => { - // WARN: Windows with DLL is expected to panic for the time being. - // Otherwise, report the error. - #[cfg(not(target_os = "windows"))] - tracing::error!("`ZRuntimePool` failed to drop due to {err:?}"); - #[cfg(target_os = "windows")] - tracing::trace!("`ZRuntimePool` failed to drop due to {err:?}"); - } - } + let _ = hd.join(); } } } From 3263306e61baf3f90ea6469fd3f062238ab4ecf4 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Fri, 26 Apr 2024 14:46:06 +0200 Subject: [PATCH 2/7] fix: Deny publishing of zenoh-ext-examples (#984) --- zenoh-ext/examples/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/zenoh-ext/examples/Cargo.toml b/zenoh-ext/examples/Cargo.toml index 3493016835..9cca8848ff 100644 --- a/zenoh-ext/examples/Cargo.toml +++ b/zenoh-ext/examples/Cargo.toml @@ -22,6 +22,7 @@ edition = { workspace = true } license = { workspace = true } categories = { workspace = true } description = "Internal crate for zenoh" +publish = false [badges] maintenance = { status = "actively-developed" } From e8916bf3ce3e3418b953eccd58f2aa1482839257 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 26 Apr 2024 15:19:51 +0200 Subject: [PATCH 3/7] Fix runtime start calling (#985) --- .../tests/operations.rs | 5 +++- .../tests/wildcard.rs | 5 +++- zenoh/src/net/runtime/mod.rs | 23 ++++--------------- zenoh/src/net/runtime/orchestrator.rs | 2 +- zenoh/src/session.rs | 3 ++- zenoh/tests/session.rs | 8 ++++--- 6 files changed, 21 insertions(+), 25 deletions(-) diff --git a/plugins/zenoh-plugin-storage-manager/tests/operations.rs b/plugins/zenoh-plugin-storage-manager/tests/operations.rs index a4293f31f1..cb1c42c201 100644 --- a/plugins/zenoh-plugin-storage-manager/tests/operations.rs +++ b/plugins/zenoh-plugin-storage-manager/tests/operations.rs @@ -78,7 +78,10 @@ async fn test_updates_in_order() { ) .unwrap(); - let runtime = zenoh::runtime::Runtime::new(config).await.unwrap(); + let runtime = zenoh::runtime::RuntimeBuilder::new(config) + .build() + .await + .unwrap(); let storage = zenoh_plugin_storage_manager::StoragesPlugin::start("storage-manager", &runtime).unwrap(); diff --git a/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs b/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs index 60970b2247..75fb6d3a87 100644 --- a/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs +++ b/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs @@ -79,7 +79,10 @@ async fn test_wild_card_in_order() { ) .unwrap(); - let runtime = zenoh::runtime::Runtime::new(config).await.unwrap(); + let runtime = zenoh::runtime::RuntimeBuilder::new(config) + .build() + .await + .unwrap(); let storage = zenoh_plugin_storage_manager::StoragesPlugin::start("storage-manager", &runtime).unwrap(); diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index 765eab91bb..364891460a 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -153,6 +153,11 @@ impl RuntimeBuilder { *handler.runtime.write().unwrap() = Runtime::downgrade(&runtime); get_mut_unchecked(&mut runtime.state.router.clone()).init_link_state(runtime.clone()); + // Admin space + if start_admin_space { + AdminSpace::start(&runtime, LONG_VERSION.clone()).await; + } + // Start plugins #[cfg(all(feature = "unstable", feature = "plugins"))] crate::plugins::loader::start_plugins(&runtime); @@ -184,11 +189,6 @@ impl RuntimeBuilder { } }); - // Admin space - if start_admin_space { - AdminSpace::start(&runtime, LONG_VERSION.clone()).await; - } - Ok(runtime) } } @@ -210,19 +210,6 @@ impl StructVersion for Runtime { impl PluginStartArgs for Runtime {} impl Runtime { - pub async fn new(config: Config) -> ZResult { - // Create plugin_manager and load plugins - let mut runtime = Runtime::init(config).await?; - match runtime.start().await { - Ok(()) => Ok(runtime), - Err(err) => Err(err), - } - } - - pub(crate) async fn init(config: Config) -> ZResult { - RuntimeBuilder::new(config).build().await - } - #[inline(always)] pub(crate) fn manager(&self) -> &TransportManager { &self.state.manager diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 687fa90649..c2c7ecedd2 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -43,7 +43,7 @@ pub enum Loop { } impl Runtime { - pub(crate) async fn start(&mut self) -> ZResult<()> { + pub async fn start(&mut self) -> ZResult<()> { match self.whatami() { WhatAmI::Client => self.start_client().await, WhatAmI::Peer => self.start_peer().await, diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 37ae02fe8b..eaca07d964 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -28,6 +28,7 @@ use crate::prelude::{KeyExpr, Parameters}; use crate::publication::*; use crate::query::*; use crate::queryable::*; +use crate::runtime::RuntimeBuilder; #[cfg(feature = "unstable")] use crate::sample::Attachment; use crate::sample::DataInfo; @@ -824,7 +825,7 @@ impl Session { tracing::debug!("Config: {:?}", &config); let aggregated_subscribers = config.aggregation().subscribers().clone(); let aggregated_publishers = config.aggregation().publishers().clone(); - let mut runtime = Runtime::init(config).await?; + let mut runtime = RuntimeBuilder::new(config).build().await?; let mut session = Self::init( runtime.clone(), diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index 1c18a16c50..f5061f7fd7 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -15,7 +15,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use zenoh::prelude::r#async::*; -use zenoh::runtime::Runtime; +use zenoh::runtime::{Runtime, RuntimeBuilder}; use zenoh_core::ztimeout; const TIMEOUT: Duration = Duration::from_secs(60); @@ -209,7 +209,8 @@ async fn open_session_unicast_runtime(endpoints: &[&str]) -> (Runtime, Runtime) .collect::>(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); println!("[ ][01a] Creating r1 session runtime: {:?}", endpoints); - let r1 = Runtime::new(config).await.unwrap(); + let mut r1 = RuntimeBuilder::new(config).build().await.unwrap(); + r1.start().await.unwrap(); let mut config = config::peer(); config.connect.endpoints = endpoints @@ -218,7 +219,8 @@ async fn open_session_unicast_runtime(endpoints: &[&str]) -> (Runtime, Runtime) .collect::>(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); println!("[ ][02a] Creating r2 session runtime: {:?}", endpoints); - let r2 = Runtime::new(config).await.unwrap(); + let mut r2 = RuntimeBuilder::new(config).build().await.unwrap(); + r2.start().await.unwrap(); (r1, r2) } From feaf07f830d59fbc57496fdc6dd98c8c4a839be2 Mon Sep 17 00:00:00 2001 From: Julien Enoch Date: Mon, 29 Apr 2024 11:01:35 +0200 Subject: [PATCH 4/7] Fix invalid JSON in admin space for static plugins (#988) --- plugins/zenoh-plugin-trait/src/manager/static_plugin.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs b/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs index 6d1bcae278..7483d77343 100644 --- a/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs +++ b/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs @@ -51,7 +51,7 @@ where Some(P::PLUGIN_LONG_VERSION) } fn path(&self) -> &str { - "" + r#""""# } fn state(&self) -> PluginState { self.instance From ea604b6a7298eca0dc0731ddc5d2311665a10e6b Mon Sep 17 00:00:00 2001 From: Julien Enoch Date: Mon, 29 Apr 2024 14:20:43 +0200 Subject: [PATCH 5/7] Fix admin space: plugins __path__ was invalid JSON (#990) --- plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs | 2 +- plugins/zenoh-plugin-trait/src/manager/static_plugin.rs | 2 +- zenoh/src/net/runtime/adminspace.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs b/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs index 90008aad36..a8a78306ea 100644 --- a/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs +++ b/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs @@ -142,7 +142,7 @@ impl PluginStatus if let Some(starter) = &self.starter { starter.path() } else { - "" + "__not_loaded__" } } fn state(&self) -> PluginState { diff --git a/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs b/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs index 7483d77343..c275fb9818 100644 --- a/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs +++ b/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs @@ -51,7 +51,7 @@ where Some(P::PLUGIN_LONG_VERSION) } fn path(&self) -> &str { - r#""""# + "__static_lib__" } fn state(&self) -> PluginState { self.instance diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 0040c96666..a7ac55baf9 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -800,7 +800,7 @@ fn plugins_status(context: &AdminContext, query: Query) { if let Err(e) = query .reply(Ok(Sample::new( key_expr, - Value::from(plugin.path()).encoding(KnownEncoding::AppJson.into()), + serde_json::Value::String(plugin.path().into()), ))) .res() { From 4c277d3d245bef80a63b6ba89cbdae6977f1f08e Mon Sep 17 00:00:00 2001 From: Yuyuan Yuan Date: Tue, 30 Apr 2024 15:28:30 +0800 Subject: [PATCH 6/7] fix(test): sporadic failures of downsampling test on Windows (#995) * fix: set the minimal sleep interval to 17ms on windows * fixup! fix: set the minimal sleep interval to 17ms on windows * fixup! fix: set the minimal sleep interval to 17ms on windows --- zenoh/tests/interceptors.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index d74a12c17f..b8f9164cfd 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -14,6 +14,11 @@ use std::sync::{Arc, Mutex}; use zenoh_core::zlock; +#[cfg(target_os = "windows")] +static MINIMAL_SLEEP_INTERVAL_MS: u64 = 17; +#[cfg(not(target_os = "windows"))] +static MINIMAL_SLEEP_INTERVAL_MS: u64 = 2; + struct IntervalCounter { first_tick: bool, last_time: std::time::Instant, @@ -144,7 +149,7 @@ fn downsampling_by_keyexpr_impl(egress: bool) { .unwrap(); // WARN(yuyuan): 2 ms is the limit of tokio - let interval = std::time::Duration::from_millis(2); + let interval = std::time::Duration::from_millis(MINIMAL_SLEEP_INTERVAL_MS); let messages_count = 1000; for i in 0..messages_count { publisher_r100.put(format!("message {}", i)).res().unwrap(); @@ -248,7 +253,7 @@ fn downsampling_by_interface_impl(egress: bool) { .unwrap(); // WARN(yuyuan): 2 ms is the limit of tokio - let interval = std::time::Duration::from_millis(2); + let interval = std::time::Duration::from_millis(MINIMAL_SLEEP_INTERVAL_MS); let messages_count = 1000; for i in 0..messages_count { publisher_r100.put(format!("message {}", i)).res().unwrap(); From 4806af01611d24ce157f3d411b30a064f45f259a Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Tue, 30 Apr 2024 09:38:32 +0200 Subject: [PATCH 7/7] Acl fix (#993) * ACL does not intercept messages with no key_expr * Update DEFAULT_CONFIG.json5 --- DEFAULT_CONFIG.json5 | 2 ++ .../net/routing/interceptor/access_control.rs | 20 +++++++++---------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index bd3bbbaf6b..ec9a827777 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -175,6 +175,7 @@ // ], // }, // ], + // /// configure access control (ACL) rules // access_control: { // ///[true/false] acl will be activated only if this is set to true @@ -199,6 +200,7 @@ // }, // ] //}, + /// Configure internal transport parameters transport: { unicast: { diff --git a/zenoh/src/net/routing/interceptor/access_control.rs b/zenoh/src/net/routing/interceptor/access_control.rs index 1b0876160a..8ce4c840b4 100644 --- a/zenoh/src/net/routing/interceptor/access_control.rs +++ b/zenoh/src/net/routing/interceptor/access_control.rs @@ -162,14 +162,14 @@ impl InterceptorTrait for IngressAclEnforcer { None } }) - .or_else(|| ctx.full_expr())?; + .or_else(|| ctx.full_expr()); match &ctx.msg.body { NetworkBody::Push(Push { payload: PushBody::Put(_), .. }) => { - if self.action(Action::Put, "Put (ingress)", key_expr) == Permission::Deny { + if self.action(Action::Put, "Put (ingress)", key_expr?) == Permission::Deny { return None; } } @@ -177,7 +177,7 @@ impl InterceptorTrait for IngressAclEnforcer { payload: RequestBody::Query(_), .. }) => { - if self.action(Action::Get, "Get (ingress)", key_expr) == Permission::Deny { + if self.action(Action::Get, "Get (ingress)", key_expr?) == Permission::Deny { return None; } } @@ -188,7 +188,7 @@ impl InterceptorTrait for IngressAclEnforcer { if self.action( Action::DeclareSubscriber, "Declare Subscriber (ingress)", - key_expr, + key_expr?, ) == Permission::Deny { return None; @@ -201,7 +201,7 @@ impl InterceptorTrait for IngressAclEnforcer { if self.action( Action::DeclareQueryable, "Declare Queryable (ingress)", - key_expr, + key_expr?, ) == Permission::Deny { return None; @@ -230,14 +230,14 @@ impl InterceptorTrait for EgressAclEnforcer { None } }) - .or_else(|| ctx.full_expr())?; + .or_else(|| ctx.full_expr()); match &ctx.msg.body { NetworkBody::Push(Push { payload: PushBody::Put(_), .. }) => { - if self.action(Action::Put, "Put (egress)", key_expr) == Permission::Deny { + if self.action(Action::Put, "Put (egress)", key_expr?) == Permission::Deny { return None; } } @@ -245,7 +245,7 @@ impl InterceptorTrait for EgressAclEnforcer { payload: RequestBody::Query(_), .. }) => { - if self.action(Action::Get, "Get (egress)", key_expr) == Permission::Deny { + if self.action(Action::Get, "Get (egress)", key_expr?) == Permission::Deny { return None; } } @@ -256,7 +256,7 @@ impl InterceptorTrait for EgressAclEnforcer { if self.action( Action::DeclareSubscriber, "Declare Subscriber (egress)", - key_expr, + key_expr?, ) == Permission::Deny { return None; @@ -269,7 +269,7 @@ impl InterceptorTrait for EgressAclEnforcer { if self.action( Action::DeclareQueryable, "Declare Queryable (egress)", - key_expr, + key_expr?, ) == Permission::Deny { return None;