diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index bd3bbbaf6b..ec9a827777 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -175,6 +175,7 @@ // ], // }, // ], + // /// configure access control (ACL) rules // access_control: { // ///[true/false] acl will be activated only if this is set to true @@ -199,6 +200,7 @@ // }, // ] //}, + /// Configure internal transport parameters transport: { unicast: { diff --git a/commons/zenoh-runtime/Cargo.toml b/commons/zenoh-runtime/Cargo.toml index e3a08a9de8..3625e5036f 100644 --- a/commons/zenoh-runtime/Cargo.toml +++ b/commons/zenoh-runtime/Cargo.toml @@ -18,17 +18,9 @@ ron = { workspace = true } serde = { workspace = true } futures = { workspace = true } lazy_static = { workspace = true } +tokio = { workspace = true, features = ["fs", "io-util", "macros", "net", "rt-multi-thread", "sync", "time"] } +tracing = { workspace = true } zenoh-result = { workspace = true, features = ["std"] } zenoh-protocol = { workspace = true } zenoh-collections = { workspace = true, features = ["std"] } zenoh-macros = { workspace = true } -tokio = { workspace = true, features = [ - "fs", - "io-util", - "macros", - "net", - "rt-multi-thread", - "sync", - "time", -] } -tracing = { workspace = true } diff --git a/commons/zenoh-runtime/src/lib.rs b/commons/zenoh-runtime/src/lib.rs index cb58cac570..dcd46744e6 100644 --- a/commons/zenoh-runtime/src/lib.rs +++ b/commons/zenoh-runtime/src/lib.rs @@ -157,6 +157,8 @@ pub struct ZRuntimePool(HashMap>); impl ZRuntimePool { fn new() -> Self { + // It has been recognized that using atexit within Windows DLL is problematic + #[cfg(not(target_os = "windows"))] // Register a callback to clean the static variables. unsafe { libc::atexit(cleanup); @@ -184,42 +186,17 @@ impl ZRuntimePool { // If there are any blocking tasks spawned by ZRuntimes, the function will block until they return. impl Drop for ZRuntimePool { fn drop(&mut self) { - std::panic::set_hook(Box::new(|_| { - // To suppress the panic error caught in the following `catch_unwind`. - })); - let handles: Vec<_> = self .0 .drain() .filter_map(|(_name, mut rt)| { - rt.take().map(|r| { - // NOTE: The error of the atexit handler in DLL (static lib is fine) - // failing to spawn a new thread in `cleanup` has been identified. - std::panic::catch_unwind(|| { - std::thread::spawn(move || r.shutdown_timeout(Duration::from_secs(1))) - }) - }) + rt.take() + .map(|r| std::thread::spawn(move || r.shutdown_timeout(Duration::from_secs(1)))) }) .collect(); for hd in handles { - match hd { - Ok(handle) => { - if let Err(err) = handle.join() { - tracing::error!( - "The handle failed to join during `ZRuntimePool` drop due to {err:?}" - ); - } - } - Err(err) => { - // WARN: Windows with DLL is expected to panic for the time being. - // Otherwise, report the error. - #[cfg(not(target_os = "windows"))] - tracing::error!("`ZRuntimePool` failed to drop due to {err:?}"); - #[cfg(target_os = "windows")] - tracing::trace!("`ZRuntimePool` failed to drop due to {err:?}"); - } - } + let _ = hd.join(); } } } diff --git a/plugins/zenoh-plugin-storage-manager/tests/operations.rs b/plugins/zenoh-plugin-storage-manager/tests/operations.rs index b5384e13be..61ea53deba 100644 --- a/plugins/zenoh-plugin-storage-manager/tests/operations.rs +++ b/plugins/zenoh-plugin-storage-manager/tests/operations.rs @@ -70,7 +70,10 @@ async fn test_updates_in_order() { ) .unwrap(); - let runtime = zenoh::runtime::Runtime::new(config).await.unwrap(); + let runtime = zenoh::runtime::RuntimeBuilder::new(config) + .build() + .await + .unwrap(); let storage = zenoh_plugin_storage_manager::StoragesPlugin::start("storage-manager", &runtime).unwrap(); diff --git a/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs b/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs index bd38e834d7..f2482da8e5 100644 --- a/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs +++ b/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs @@ -71,7 +71,10 @@ async fn test_wild_card_in_order() { ) .unwrap(); - let runtime = zenoh::runtime::Runtime::new(config).await.unwrap(); + let runtime = zenoh::runtime::RuntimeBuilder::new(config) + .build() + .await + .unwrap(); let storage = zenoh_plugin_storage_manager::StoragesPlugin::start("storage-manager", &runtime).unwrap(); diff --git a/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs b/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs index 90008aad36..a8a78306ea 100644 --- a/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs +++ b/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs @@ -142,7 +142,7 @@ impl PluginStatus if let Some(starter) = &self.starter { starter.path() } else { - "" + "__not_loaded__" } } fn state(&self) -> PluginState { diff --git a/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs b/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs index 6d1bcae278..c275fb9818 100644 --- a/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs +++ b/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs @@ -51,7 +51,7 @@ where Some(P::PLUGIN_LONG_VERSION) } fn path(&self) -> &str { - "" + "__static_lib__" } fn state(&self) -> PluginState { self.instance diff --git a/zenoh-ext/examples/Cargo.toml b/zenoh-ext/examples/Cargo.toml index 3493016835..9cca8848ff 100644 --- a/zenoh-ext/examples/Cargo.toml +++ b/zenoh-ext/examples/Cargo.toml @@ -22,6 +22,7 @@ edition = { workspace = true } license = { workspace = true } categories = { workspace = true } description = "Internal crate for zenoh" +publish = false [badges] maintenance = { status = "actively-developed" } diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index dea322419c..703fca2e9d 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -31,7 +31,11 @@ use super::{ value::Value, Id, }; -use crate::net::{primitives::Primitives, routing::dispatcher::face::Face, runtime::Runtime}; +use crate::net::{ + primitives::Primitives, + routing::dispatcher::face::Face, + runtime::{Runtime, RuntimeBuilder}, +}; use std::future::IntoFuture; use std::{ collections::HashMap, @@ -842,12 +846,13 @@ impl Session { tracing::debug!("Config: {:?}", &config); let aggregated_subscribers = config.aggregation().subscribers().clone(); let aggregated_publishers = config.aggregation().publishers().clone(); - let mut runtime = Runtime::init( - config, - #[cfg(all(feature = "unstable", feature = "shared-memory"))] - shm_clients, - ) - .await?; + #[allow(unused_mut)] // Required for shared-memory + let mut runtime = RuntimeBuilder::new(config); + #[cfg(all(feature = "unstable", feature = "shared-memory"))] + { + runtime = runtime.shm_clients(shm_clients); + } + let mut runtime = runtime.build().await?; let mut session = Self::init( runtime.clone(), diff --git a/zenoh/src/net/routing/interceptor/access_control.rs b/zenoh/src/net/routing/interceptor/access_control.rs index b23db9765e..102e30a0df 100644 --- a/zenoh/src/net/routing/interceptor/access_control.rs +++ b/zenoh/src/net/routing/interceptor/access_control.rs @@ -162,14 +162,14 @@ impl InterceptorTrait for IngressAclEnforcer { None } }) - .or_else(|| ctx.full_expr())?; + .or_else(|| ctx.full_expr()); match &ctx.msg.body { NetworkBody::Push(Push { payload: PushBody::Put(_), .. }) => { - if self.action(Action::Put, "Put (ingress)", key_expr) == Permission::Deny { + if self.action(Action::Put, "Put (ingress)", key_expr?) == Permission::Deny { return None; } } @@ -177,7 +177,7 @@ impl InterceptorTrait for IngressAclEnforcer { payload: RequestBody::Query(_), .. }) => { - if self.action(Action::Get, "Get (ingress)", key_expr) == Permission::Deny { + if self.action(Action::Get, "Get (ingress)", key_expr?) == Permission::Deny { return None; } } @@ -188,7 +188,7 @@ impl InterceptorTrait for IngressAclEnforcer { if self.action( Action::DeclareSubscriber, "Declare Subscriber (ingress)", - key_expr, + key_expr?, ) == Permission::Deny { return None; @@ -201,7 +201,7 @@ impl InterceptorTrait for IngressAclEnforcer { if self.action( Action::DeclareQueryable, "Declare Queryable (ingress)", - key_expr, + key_expr?, ) == Permission::Deny { return None; @@ -230,14 +230,14 @@ impl InterceptorTrait for EgressAclEnforcer { None } }) - .or_else(|| ctx.full_expr())?; + .or_else(|| ctx.full_expr()); match &ctx.msg.body { NetworkBody::Push(Push { payload: PushBody::Put(_), .. }) => { - if self.action(Action::Put, "Put (egress)", key_expr) == Permission::Deny { + if self.action(Action::Put, "Put (egress)", key_expr?) == Permission::Deny { return None; } } @@ -245,7 +245,7 @@ impl InterceptorTrait for EgressAclEnforcer { payload: RequestBody::Query(_), .. }) => { - if self.action(Action::Get, "Get (egress)", key_expr) == Permission::Deny { + if self.action(Action::Get, "Get (egress)", key_expr?) == Permission::Deny { return None; } } @@ -256,7 +256,7 @@ impl InterceptorTrait for EgressAclEnforcer { if self.action( Action::DeclareSubscriber, "Declare Subscriber (egress)", - key_expr, + key_expr?, ) == Permission::Deny { return None; @@ -269,7 +269,7 @@ impl InterceptorTrait for EgressAclEnforcer { if self.action( Action::DeclareQueryable, "Declare Queryable (egress)", - key_expr, + key_expr?, ) == Permission::Deny { return None; diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 9ea54b8d88..3f2e0b488f 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -793,7 +793,14 @@ fn plugins_status(context: &AdminContext, query: Query) { with_extended_string(plugin_key, &["/__path__"], |plugin_path_key| { if let Ok(key_expr) = KeyExpr::try_from(plugin_path_key.clone()) { if query.key_expr().intersects(&key_expr) { - if let Err(e) = query.reply(key_expr, plugin.path()).wait() { + if let Err(e) = query + .reply( + key_expr, + serde_json::to_string(plugin.path()) + .unwrap_or_else(|_| String::from("{}")), + ) + .wait() + { tracing::error!("Error sending AdminSpace reply: {:?}", e); } } diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index 4991844650..f1cf4d95d2 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -184,6 +184,11 @@ impl RuntimeBuilder { *handler.runtime.write().unwrap() = Runtime::downgrade(&runtime); get_mut_unchecked(&mut runtime.state.router.clone()).init_link_state(runtime.clone()); + // Admin space + if start_admin_space { + AdminSpace::start(&runtime, LONG_VERSION.clone()).await; + } + // Start plugins #[cfg(all(feature = "unstable", feature = "plugins"))] start_plugins(&runtime); @@ -215,11 +220,6 @@ impl RuntimeBuilder { } }); - // Admin space - if start_admin_space { - AdminSpace::start(&runtime, LONG_VERSION.clone()).await; - } - Ok(runtime) } } @@ -241,32 +241,6 @@ impl StructVersion for Runtime { impl PluginStartArgs for Runtime {} impl Runtime { - pub async fn new(config: Config) -> ZResult { - // Create plugin_manager and load plugins - let mut runtime = Runtime::init( - config, - #[cfg(all(feature = "unstable", feature = "shared-memory"))] - None, - ) - .await?; - match runtime.start().await { - Ok(()) => Ok(runtime), - Err(err) => Err(err), - } - } - - pub(crate) async fn init( - config: Config, - #[cfg(all(feature = "unstable", feature = "shared-memory"))] shm_clients: Option< - Arc, - >, - ) -> ZResult { - let builder = RuntimeBuilder::new(config); - #[cfg(all(feature = "unstable", feature = "shared-memory"))] - let builder = builder.shm_clients(shm_clients); - builder.build().await - } - #[inline(always)] pub(crate) fn manager(&self) -> &TransportManager { &self.state.manager diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 687fa90649..c2c7ecedd2 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -43,7 +43,7 @@ pub enum Loop { } impl Runtime { - pub(crate) async fn start(&mut self) -> ZResult<()> { + pub async fn start(&mut self) -> ZResult<()> { match self.whatami() { WhatAmI::Client => self.start_client().await, WhatAmI::Peer => self.start_peer().await, diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index f6e876d92e..37f193630d 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -15,6 +15,11 @@ use std::sync::{Arc, Mutex}; use zenoh::internal::zlock; use zenoh::prelude::*; +#[cfg(target_os = "windows")] +static MINIMAL_SLEEP_INTERVAL_MS: u64 = 17; +#[cfg(not(target_os = "windows"))] +static MINIMAL_SLEEP_INTERVAL_MS: u64 = 2; + struct IntervalCounter { first_tick: bool, last_time: std::time::Instant, @@ -143,7 +148,7 @@ fn downsampling_by_keyexpr_impl(egress: bool) { .unwrap(); // WARN(yuyuan): 2 ms is the limit of tokio - let interval = std::time::Duration::from_millis(2); + let interval = std::time::Duration::from_millis(MINIMAL_SLEEP_INTERVAL_MS); let messages_count = 1000; for i in 0..messages_count { publisher_r100.put(format!("message {}", i)).wait().unwrap(); @@ -245,7 +250,7 @@ fn downsampling_by_interface_impl(egress: bool) { .unwrap(); // WARN(yuyuan): 2 ms is the limit of tokio - let interval = std::time::Duration::from_millis(2); + let interval = std::time::Duration::from_millis(MINIMAL_SLEEP_INTERVAL_MS); let messages_count = 1000; for i in 0..messages_count { publisher_r100.put(format!("message {}", i)).wait().unwrap(); diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index b52dbb90b8..43dfc79470 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -17,7 +17,7 @@ use std::time::Duration; use zenoh::internal::ztimeout; use zenoh::prelude::*; #[cfg(feature = "unstable")] -use zenoh::runtime::Runtime; +use zenoh::runtime::{Runtime, RuntimeBuilder}; const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); @@ -264,7 +264,8 @@ async fn open_session_unicast_runtime(endpoints: &[&str]) -> (Runtime, Runtime) .collect::>(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); println!("[ ][01a] Creating r1 session runtime: {:?}", endpoints); - let r1 = Runtime::new(config).await.unwrap(); + let mut r1 = RuntimeBuilder::new(config).build().await.unwrap(); + r1.start().await.unwrap(); let mut config = config::peer(); config.connect.endpoints = endpoints @@ -273,7 +274,8 @@ async fn open_session_unicast_runtime(endpoints: &[&str]) -> (Runtime, Runtime) .collect::>(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); println!("[ ][02a] Creating r2 session runtime: {:?}", endpoints); - let r2 = Runtime::new(config).await.unwrap(); + let mut r2 = RuntimeBuilder::new(config).build().await.unwrap(); + r2.start().await.unwrap(); (r1, r2) }