From 9ecc9031ac34f6ae0f8e5b996999277b02b3038e Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Mon, 22 Apr 2024 17:01:57 +0200 Subject: [PATCH] Peers and clients adminspace (#960) * Peers,clients adminspace reports under @/peer,@/client keys * Only report linkstate graphs when they exist * Remove useless zid_str --------- Co-authored-by: Luca Cominardi --- zenoh/src/net/runtime/adminspace.rs | 155 +++++++++++++++++----------- 1 file changed, 93 insertions(+), 62 deletions(-) diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index f6e15ef113..d62379b862 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -28,7 +28,7 @@ use std::sync::Arc; use std::sync::Mutex; use tracing::{error, trace}; use zenoh_buffers::buffer::SplitBuffer; -use zenoh_config::{ConfigValidator, ValidatedMap, WhatAmI}; +use zenoh_config::{unwrap_or_default, ConfigValidator, ValidatedMap, WhatAmI}; #[cfg(all(feature = "unstable", feature = "plugins"))] use zenoh_plugin_trait::{PluginControl, PluginStatus}; #[cfg(all(feature = "unstable", feature = "plugins"))] @@ -47,7 +47,6 @@ use zenoh_transport::unicast::TransportUnicast; pub struct AdminContext { runtime: Runtime, - zid_str: String, version: String, metadata: serde_json::Value, } @@ -141,35 +140,45 @@ impl AdminSpace { pub async fn start(runtime: &Runtime, version: String) { let zid_str = runtime.state.zid.to_string(); + let whatami_str = runtime.state.whatami.to_str(); + let mut config = runtime.config().lock(); let metadata = runtime.state.metadata.clone(); - let root_key: OwnedKeyExpr = format!("@/router/{zid_str}").try_into().unwrap(); + let root_key: OwnedKeyExpr = format!("@/{whatami_str}/{zid_str}").try_into().unwrap(); let mut handlers: HashMap<_, Handler> = HashMap::new(); - handlers.insert(root_key.clone(), Arc::new(router_data)); + handlers.insert(root_key.clone(), Arc::new(local_data)); handlers.insert( - format!("@/router/{zid_str}/metrics").try_into().unwrap(), - Arc::new(router_metrics), - ); - handlers.insert( - format!("@/router/{zid_str}/linkstate/routers") - .try_into() - .unwrap(), - Arc::new(routers_linkstate_data), - ); - handlers.insert( - format!("@/router/{zid_str}/linkstate/peers") + format!("@/{whatami_str}/{zid_str}/metrics") .try_into() .unwrap(), - Arc::new(peers_linkstate_data), + Arc::new(metrics), ); + if runtime.state.whatami == WhatAmI::Router { + handlers.insert( + format!("@/{whatami_str}/{zid_str}/linkstate/routers") + .try_into() + .unwrap(), + Arc::new(routers_linkstate_data), + ); + } + if runtime.state.whatami != WhatAmI::Client + && unwrap_or_default!(config.routing().peer().mode()) == *"linkstate" + { + handlers.insert( + format!("@/{whatami_str}/{zid_str}/linkstate/peers") + .try_into() + .unwrap(), + Arc::new(peers_linkstate_data), + ); + } handlers.insert( - format!("@/router/{zid_str}/subscriber/**") + format!("@/{whatami_str}/{zid_str}/subscriber/**") .try_into() .unwrap(), Arc::new(subscribers_data), ); handlers.insert( - format!("@/router/{zid_str}/queryable/**") + format!("@/{whatami_str}/{zid_str}/queryable/**") .try_into() .unwrap(), Arc::new(queryables_data), @@ -177,13 +186,15 @@ impl AdminSpace { #[cfg(all(feature = "unstable", feature = "plugins"))] handlers.insert( - format!("@/router/{zid_str}/plugins/**").try_into().unwrap(), + format!("@/{whatami_str}/{zid_str}/plugins/**") + .try_into() + .unwrap(), Arc::new(plugins_data), ); #[cfg(all(feature = "unstable", feature = "plugins"))] handlers.insert( - format!("@/router/{zid_str}/status/plugins/**") + format!("@/{whatami_str}/{zid_str}/status/plugins/**") .try_into() .unwrap(), Arc::new(plugins_status), @@ -198,25 +209,18 @@ impl AdminSpace { let context = Arc::new(AdminContext { runtime: runtime.clone(), - zid_str, version, metadata, }); let admin = Arc::new(AdminSpace { - zid: runtime.zid(), + zid: runtime.state.zid, primitives: Mutex::new(None), mappings: Mutex::new(HashMap::new()), handlers, context, }); - admin - .context - .runtime - .state - .config - .lock() - .set_plugin_validator(Arc::downgrade(&admin)); + config.set_plugin_validator(Arc::downgrade(&admin)); #[cfg(all(feature = "unstable", feature = "plugins"))] { @@ -362,37 +366,42 @@ impl Primitives for AdminSpace { } } - if let Some(key) = msg - .wire_expr - .as_str() - .strip_prefix(&format!("@/router/{}/config/", &self.context.zid_str)) - { + if let Some(key) = msg.wire_expr.as_str().strip_prefix(&format!( + "@/{}/{}/config/", + self.context.runtime.state.whatami, self.context.runtime.state.zid + )) { match msg.payload { PushBody::Put(put) => match std::str::from_utf8(&put.payload.contiguous()) { Ok(json) => { tracing::trace!( - "Insert conf value /@/router/{}/config/{} : {}", - &self.context.zid_str, + "Insert conf value /@/{}/{}/config/{} : {}", + self.context.runtime.state.whatami, + self.context.runtime.state.zid, key, json ); if let Err(e) = (&self.context.runtime.state.config).insert_json5(key, json) { error!( - "Error inserting conf value /@/router/{}/config/{} : {} - {}", - &self.context.zid_str, key, json, e + "Error inserting conf value /@/{}/{}/config/{} : {} - {}", + self.context.runtime.state.whatami, + self.context.runtime.state.zid, + key, + json, + e ); } } Err(e) => error!( - "Received non utf8 conf value on /@/router/{}/config/{} : {}", - &self.context.zid_str, key, e + "Received non utf8 conf value on /@/{}/{}/config/{} : {}", + self.context.runtime.state.whatami, self.context.runtime.state.zid, key, e ), }, PushBody::Del(_) => { tracing::trace!( - "Deleting conf value /@/router/{}/config/{}", - &self.context.zid_str, + "Deleting conf value /@/{}/{}/config/{}", + self.context.runtime.state.whatami, + self.context.runtime.state.zid, key ); if let Err(e) = self.context.runtime.state.config.remove(key) { @@ -510,8 +519,13 @@ impl crate::net::primitives::EPrimitives for AdminSpace { } } -fn router_data(context: &AdminContext, query: Query) { - let reply_key: OwnedKeyExpr = format!("@/router/{}", context.zid_str).try_into().unwrap(); +fn local_data(context: &AdminContext, query: Query) { + let reply_key: OwnedKeyExpr = format!( + "@/{}/{}", + context.runtime.state.whatami, context.runtime.state.zid + ) + .try_into() + .unwrap(); let transport_mgr = context.runtime.manager().clone(); @@ -568,7 +582,7 @@ fn router_data(context: &AdminContext, query: Query) { #[allow(unused_mut)] let mut json = json!({ - "zid": context.zid_str, + "zid": context.runtime.state.zid, "version": context.version, "metadata": context.metadata, "locators": locators, @@ -601,10 +615,13 @@ fn router_data(context: &AdminContext, query: Query) { } } -fn router_metrics(context: &AdminContext, query: Query) { - let reply_key: OwnedKeyExpr = format!("@/router/{}/metrics", context.zid_str) - .try_into() - .unwrap(); +fn metrics(context: &AdminContext, query: Query) { + let reply_key: OwnedKeyExpr = format!( + "@/{}/{}/metrics", + context.runtime.state.whatami, context.runtime.state.zid + ) + .try_into() + .unwrap(); #[allow(unused_mut)] let mut metrics = format!( r#"# HELP zenoh_build Informations about zenoh. @@ -636,9 +653,12 @@ zenoh_build{{version="{}"}} 1 } fn routers_linkstate_data(context: &AdminContext, query: Query) { - let reply_key: OwnedKeyExpr = format!("@/router/{}/linkstate/routers", context.zid_str) - .try_into() - .unwrap(); + let reply_key: OwnedKeyExpr = format!( + "@/{}/{}/linkstate/routers", + context.runtime.state.whatami, context.runtime.state.zid + ) + .try_into() + .unwrap(); let tables = zread!(context.runtime.state.router.tables.tables); @@ -661,9 +681,12 @@ fn routers_linkstate_data(context: &AdminContext, query: Query) { } fn peers_linkstate_data(context: &AdminContext, query: Query) { - let reply_key: OwnedKeyExpr = format!("@/router/{}/linkstate/peers", context.zid_str) - .try_into() - .unwrap(); + let reply_key: OwnedKeyExpr = format!( + "@/{}/{}/linkstate/peers", + context.runtime.state.whatami, context.runtime.state.zid + ) + .try_into() + .unwrap(); let tables = zread!(context.runtime.state.router.tables.tables); @@ -689,8 +712,9 @@ fn subscribers_data(context: &AdminContext, query: Query) { let tables = zread!(context.runtime.state.router.tables.tables); for sub in tables.hat_code.get_subscriptions(&tables) { let key = KeyExpr::try_from(format!( - "@/router/{}/subscriber/{}", - context.zid_str, + "@/{}/{}/subscriber/{}", + context.runtime.state.whatami, + context.runtime.state.zid, sub.expr() )) .unwrap(); @@ -706,8 +730,9 @@ fn queryables_data(context: &AdminContext, query: Query) { let tables = zread!(context.runtime.state.router.tables.tables); for qabl in tables.hat_code.get_queryables(&tables) { let key = KeyExpr::try_from(format!( - "@/router/{}/queryable/{}", - context.zid_str, + "@/{}/{}/queryable/{}", + context.runtime.state.whatami, + context.runtime.state.zid, qabl.expr() )) .unwrap(); @@ -722,7 +747,10 @@ fn queryables_data(context: &AdminContext, query: Query) { #[cfg(all(feature = "unstable", feature = "plugins"))] fn plugins_data(context: &AdminContext, query: Query) { let guard = context.runtime.plugins_manager(); - let root_key = format!("@/router/{}/plugins", &context.zid_str); + let root_key = format!( + "@/{}/{}/plugins", + context.runtime.state.whatami, &context.runtime.state.zid + ); let root_key = unsafe { keyexpr::from_str_unchecked(&root_key) }; tracing::debug!("requested plugins status {:?}", query.key_expr()); if let [names, ..] = query.key_expr().strip_prefix(root_key)[..] { @@ -740,9 +768,12 @@ fn plugins_data(context: &AdminContext, query: Query) { #[cfg(all(feature = "unstable", feature = "plugins"))] fn plugins_status(context: &AdminContext, query: Query) { - let selector: crate::Selector<'_> = query.selector(); + let selector = query.selector(); let guard = context.runtime.plugins_manager(); - let mut root_key = format!("@/router/{}/status/plugins/", &context.zid_str); + let mut root_key = format!( + "@/{}/{}/status/plugins/", + context.runtime.state.whatami, &context.runtime.state.zid + ); for plugin in guard.started_plugins_iter() { with_extended_string(&mut root_key, &[plugin.name()], |plugin_key| {