Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge main in protocol changes #997

Merged
merged 8 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@
// ],
// },
// ],

// /// configure access control (ACL) rules
// access_control: {
// ///[true/false] acl will be activated only if this is set to true
Expand All @@ -199,6 +200,7 @@
// },
// ]
//},

/// Configure internal transport parameters
transport: {
unicast: {
Expand Down
12 changes: 2 additions & 10 deletions commons/zenoh-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,9 @@ ron = { workspace = true }
serde = { workspace = true }
futures = { workspace = true }
lazy_static = { workspace = true }
tokio = { workspace = true, features = ["fs", "io-util", "macros", "net", "rt-multi-thread", "sync", "time"] }
tracing = { workspace = true }
zenoh-result = { workspace = true, features = ["std"] }
zenoh-protocol = { workspace = true }
zenoh-collections = { workspace = true, features = ["std"] }
zenoh-macros = { workspace = true }
tokio = { workspace = true, features = [
"fs",
"io-util",
"macros",
"net",
"rt-multi-thread",
"sync",
"time",
] }
tracing = { workspace = true }
33 changes: 5 additions & 28 deletions commons/zenoh-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ pub struct ZRuntimePool(HashMap<ZRuntime, OnceLock<Runtime>>);

impl ZRuntimePool {
fn new() -> Self {
// It has been recognized that using atexit within Windows DLL is problematic
#[cfg(not(target_os = "windows"))]
// Register a callback to clean the static variables.
unsafe {
libc::atexit(cleanup);
Expand Down Expand Up @@ -184,42 +186,17 @@ impl ZRuntimePool {
// If there are any blocking tasks spawned by ZRuntimes, the function will block until they return.
impl Drop for ZRuntimePool {
fn drop(&mut self) {
std::panic::set_hook(Box::new(|_| {
// To suppress the panic error caught in the following `catch_unwind`.
}));

let handles: Vec<_> = self
.0
.drain()
.filter_map(|(_name, mut rt)| {
rt.take().map(|r| {
// NOTE: The error of the atexit handler in DLL (static lib is fine)
// failing to spawn a new thread in `cleanup` has been identified.
std::panic::catch_unwind(|| {
std::thread::spawn(move || r.shutdown_timeout(Duration::from_secs(1)))
})
})
rt.take()
.map(|r| std::thread::spawn(move || r.shutdown_timeout(Duration::from_secs(1))))
})
.collect();

for hd in handles {
match hd {
Ok(handle) => {
if let Err(err) = handle.join() {
tracing::error!(
"The handle failed to join during `ZRuntimePool` drop due to {err:?}"
);
}
}
Err(err) => {
// WARN: Windows with DLL is expected to panic for the time being.
// Otherwise, report the error.
#[cfg(not(target_os = "windows"))]
tracing::error!("`ZRuntimePool` failed to drop due to {err:?}");
#[cfg(target_os = "windows")]
tracing::trace!("`ZRuntimePool` failed to drop due to {err:?}");
}
}
let _ = hd.join();
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion plugins/zenoh-plugin-storage-manager/tests/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ async fn test_updates_in_order() {
)
.unwrap();

let runtime = zenoh::runtime::Runtime::new(config).await.unwrap();
let runtime = zenoh::runtime::RuntimeBuilder::new(config)
.build()
.await
.unwrap();
let storage =
zenoh_plugin_storage_manager::StoragesPlugin::start("storage-manager", &runtime).unwrap();

Expand Down
5 changes: 4 additions & 1 deletion plugins/zenoh-plugin-storage-manager/tests/wildcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ async fn test_wild_card_in_order() {
)
.unwrap();

let runtime = zenoh::runtime::Runtime::new(config).await.unwrap();
let runtime = zenoh::runtime::RuntimeBuilder::new(config)
.build()
.await
.unwrap();
let storage =
zenoh_plugin_storage_manager::StoragesPlugin::start("storage-manager", &runtime).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl<StartArgs: PluginStartArgs, Instance: PluginInstance> PluginStatus
if let Some(starter) = &self.starter {
starter.path()
} else {
"<not loaded>"
"__not_loaded__"
}
}
fn state(&self) -> PluginState {
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-trait/src/manager/static_plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ where
Some(P::PLUGIN_LONG_VERSION)
}
fn path(&self) -> &str {
"<static>"
"__static_lib__"
}
fn state(&self) -> PluginState {
self.instance
Expand Down
1 change: 1 addition & 0 deletions zenoh-ext/examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ edition = { workspace = true }
license = { workspace = true }
categories = { workspace = true }
description = "Internal crate for zenoh"
publish = false

[badges]
maintenance = { status = "actively-developed" }
Expand Down
19 changes: 12 additions & 7 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ use super::{
value::Value,
Id,
};
use crate::net::{primitives::Primitives, routing::dispatcher::face::Face, runtime::Runtime};
use crate::net::{
primitives::Primitives,
routing::dispatcher::face::Face,
runtime::{Runtime, RuntimeBuilder},
};
use std::future::IntoFuture;
use std::{
collections::HashMap,
Expand Down Expand Up @@ -842,12 +846,13 @@ impl Session {
tracing::debug!("Config: {:?}", &config);
let aggregated_subscribers = config.aggregation().subscribers().clone();
let aggregated_publishers = config.aggregation().publishers().clone();
let mut runtime = Runtime::init(
config,
#[cfg(all(feature = "unstable", feature = "shared-memory"))]
shm_clients,
)
.await?;
#[allow(unused_mut)] // Required for shared-memory
let mut runtime = RuntimeBuilder::new(config);
#[cfg(all(feature = "unstable", feature = "shared-memory"))]
{
runtime = runtime.shm_clients(shm_clients);
}
let mut runtime = runtime.build().await?;

let mut session = Self::init(
runtime.clone(),
Expand Down
20 changes: 10 additions & 10 deletions zenoh/src/net/routing/interceptor/access_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,22 +162,22 @@ impl InterceptorTrait for IngressAclEnforcer {
None
}
})
.or_else(|| ctx.full_expr())?;
.or_else(|| ctx.full_expr());

match &ctx.msg.body {
NetworkBody::Push(Push {
payload: PushBody::Put(_),
..
}) => {
if self.action(Action::Put, "Put (ingress)", key_expr) == Permission::Deny {
if self.action(Action::Put, "Put (ingress)", key_expr?) == Permission::Deny {
return None;
}
}
NetworkBody::Request(Request {
payload: RequestBody::Query(_),
..
}) => {
if self.action(Action::Get, "Get (ingress)", key_expr) == Permission::Deny {
if self.action(Action::Get, "Get (ingress)", key_expr?) == Permission::Deny {
return None;
}
}
Expand All @@ -188,7 +188,7 @@ impl InterceptorTrait for IngressAclEnforcer {
if self.action(
Action::DeclareSubscriber,
"Declare Subscriber (ingress)",
key_expr,
key_expr?,
) == Permission::Deny
{
return None;
Expand All @@ -201,7 +201,7 @@ impl InterceptorTrait for IngressAclEnforcer {
if self.action(
Action::DeclareQueryable,
"Declare Queryable (ingress)",
key_expr,
key_expr?,
) == Permission::Deny
{
return None;
Expand Down Expand Up @@ -230,22 +230,22 @@ impl InterceptorTrait for EgressAclEnforcer {
None
}
})
.or_else(|| ctx.full_expr())?;
.or_else(|| ctx.full_expr());

match &ctx.msg.body {
NetworkBody::Push(Push {
payload: PushBody::Put(_),
..
}) => {
if self.action(Action::Put, "Put (egress)", key_expr) == Permission::Deny {
if self.action(Action::Put, "Put (egress)", key_expr?) == Permission::Deny {
return None;
}
}
NetworkBody::Request(Request {
payload: RequestBody::Query(_),
..
}) => {
if self.action(Action::Get, "Get (egress)", key_expr) == Permission::Deny {
if self.action(Action::Get, "Get (egress)", key_expr?) == Permission::Deny {
return None;
}
}
Expand All @@ -256,7 +256,7 @@ impl InterceptorTrait for EgressAclEnforcer {
if self.action(
Action::DeclareSubscriber,
"Declare Subscriber (egress)",
key_expr,
key_expr?,
) == Permission::Deny
{
return None;
Expand All @@ -269,7 +269,7 @@ impl InterceptorTrait for EgressAclEnforcer {
if self.action(
Action::DeclareQueryable,
"Declare Queryable (egress)",
key_expr,
key_expr?,
) == Permission::Deny
{
return None;
Expand Down
9 changes: 8 additions & 1 deletion zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,14 @@ fn plugins_status(context: &AdminContext, query: Query) {
with_extended_string(plugin_key, &["/__path__"], |plugin_path_key| {
if let Ok(key_expr) = KeyExpr::try_from(plugin_path_key.clone()) {
if query.key_expr().intersects(&key_expr) {
if let Err(e) = query.reply(key_expr, plugin.path()).wait() {
if let Err(e) = query
.reply(
key_expr,
serde_json::to_string(plugin.path())
.unwrap_or_else(|_| String::from("{}")),
)
.wait()
{
tracing::error!("Error sending AdminSpace reply: {:?}", e);
}
}
Expand Down
36 changes: 5 additions & 31 deletions zenoh/src/net/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ impl RuntimeBuilder {
*handler.runtime.write().unwrap() = Runtime::downgrade(&runtime);
get_mut_unchecked(&mut runtime.state.router.clone()).init_link_state(runtime.clone());

// Admin space
if start_admin_space {
AdminSpace::start(&runtime, LONG_VERSION.clone()).await;
}

// Start plugins
#[cfg(all(feature = "unstable", feature = "plugins"))]
start_plugins(&runtime);
Expand Down Expand Up @@ -215,11 +220,6 @@ impl RuntimeBuilder {
}
});

// Admin space
if start_admin_space {
AdminSpace::start(&runtime, LONG_VERSION.clone()).await;
}

Ok(runtime)
}
}
Expand All @@ -241,32 +241,6 @@ impl StructVersion for Runtime {
impl PluginStartArgs for Runtime {}

impl Runtime {
pub async fn new(config: Config) -> ZResult<Runtime> {
// Create plugin_manager and load plugins
let mut runtime = Runtime::init(
config,
#[cfg(all(feature = "unstable", feature = "shared-memory"))]
None,
)
.await?;
match runtime.start().await {
Ok(()) => Ok(runtime),
Err(err) => Err(err),
}
}

pub(crate) async fn init(
config: Config,
#[cfg(all(feature = "unstable", feature = "shared-memory"))] shm_clients: Option<
Arc<SharedMemoryClientStorage>,
>,
) -> ZResult<Runtime> {
let builder = RuntimeBuilder::new(config);
#[cfg(all(feature = "unstable", feature = "shared-memory"))]
let builder = builder.shm_clients(shm_clients);
builder.build().await
}

#[inline(always)]
pub(crate) fn manager(&self) -> &TransportManager {
&self.state.manager
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub enum Loop {
}

impl Runtime {
pub(crate) async fn start(&mut self) -> ZResult<()> {
pub async fn start(&mut self) -> ZResult<()> {
match self.whatami() {
WhatAmI::Client => self.start_client().await,
WhatAmI::Peer => self.start_peer().await,
Expand Down
9 changes: 7 additions & 2 deletions zenoh/tests/interceptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ use std::sync::{Arc, Mutex};
use zenoh::internal::zlock;
use zenoh::prelude::*;

#[cfg(target_os = "windows")]
static MINIMAL_SLEEP_INTERVAL_MS: u64 = 17;
#[cfg(not(target_os = "windows"))]
static MINIMAL_SLEEP_INTERVAL_MS: u64 = 2;

struct IntervalCounter {
first_tick: bool,
last_time: std::time::Instant,
Expand Down Expand Up @@ -143,7 +148,7 @@ fn downsampling_by_keyexpr_impl(egress: bool) {
.unwrap();

// WARN(yuyuan): 2 ms is the limit of tokio
let interval = std::time::Duration::from_millis(2);
let interval = std::time::Duration::from_millis(MINIMAL_SLEEP_INTERVAL_MS);
let messages_count = 1000;
for i in 0..messages_count {
publisher_r100.put(format!("message {}", i)).wait().unwrap();
Expand Down Expand Up @@ -245,7 +250,7 @@ fn downsampling_by_interface_impl(egress: bool) {
.unwrap();

// WARN(yuyuan): 2 ms is the limit of tokio
let interval = std::time::Duration::from_millis(2);
let interval = std::time::Duration::from_millis(MINIMAL_SLEEP_INTERVAL_MS);
let messages_count = 1000;
for i in 0..messages_count {
publisher_r100.put(format!("message {}", i)).wait().unwrap();
Expand Down
Loading
Loading