Skip to content

Commit

Permalink
Peers and clients adminspace (#960)
Browse files Browse the repository at this point in the history
* Peers,clients adminspace reports under @/peer,@/client keys

* Only report linkstate graphs when they exist

* Remove useless zid_str

---------

Co-authored-by: Luca Cominardi <[email protected]>
  • Loading branch information
OlivierHecart and Mallets authored Apr 22, 2024
1 parent 5ccf2da commit 9ecc903
Showing 1 changed file with 93 additions and 62 deletions.
155 changes: 93 additions & 62 deletions zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::sync::Arc;
use std::sync::Mutex;
use tracing::{error, trace};
use zenoh_buffers::buffer::SplitBuffer;
use zenoh_config::{ConfigValidator, ValidatedMap, WhatAmI};
use zenoh_config::{unwrap_or_default, ConfigValidator, ValidatedMap, WhatAmI};
#[cfg(all(feature = "unstable", feature = "plugins"))]
use zenoh_plugin_trait::{PluginControl, PluginStatus};
#[cfg(all(feature = "unstable", feature = "plugins"))]
Expand All @@ -47,7 +47,6 @@ use zenoh_transport::unicast::TransportUnicast;

pub struct AdminContext {
runtime: Runtime,
zid_str: String,
version: String,
metadata: serde_json::Value,
}
Expand Down Expand Up @@ -141,49 +140,61 @@ impl AdminSpace {

pub async fn start(runtime: &Runtime, version: String) {
let zid_str = runtime.state.zid.to_string();
let whatami_str = runtime.state.whatami.to_str();
let mut config = runtime.config().lock();
let metadata = runtime.state.metadata.clone();
let root_key: OwnedKeyExpr = format!("@/router/{zid_str}").try_into().unwrap();
let root_key: OwnedKeyExpr = format!("@/{whatami_str}/{zid_str}").try_into().unwrap();

let mut handlers: HashMap<_, Handler> = HashMap::new();
handlers.insert(root_key.clone(), Arc::new(router_data));
handlers.insert(root_key.clone(), Arc::new(local_data));
handlers.insert(
format!("@/router/{zid_str}/metrics").try_into().unwrap(),
Arc::new(router_metrics),
);
handlers.insert(
format!("@/router/{zid_str}/linkstate/routers")
.try_into()
.unwrap(),
Arc::new(routers_linkstate_data),
);
handlers.insert(
format!("@/router/{zid_str}/linkstate/peers")
format!("@/{whatami_str}/{zid_str}/metrics")
.try_into()
.unwrap(),
Arc::new(peers_linkstate_data),
Arc::new(metrics),
);
if runtime.state.whatami == WhatAmI::Router {
handlers.insert(
format!("@/{whatami_str}/{zid_str}/linkstate/routers")
.try_into()
.unwrap(),
Arc::new(routers_linkstate_data),
);
}
if runtime.state.whatami != WhatAmI::Client
&& unwrap_or_default!(config.routing().peer().mode()) == *"linkstate"
{
handlers.insert(
format!("@/{whatami_str}/{zid_str}/linkstate/peers")
.try_into()
.unwrap(),
Arc::new(peers_linkstate_data),
);
}
handlers.insert(
format!("@/router/{zid_str}/subscriber/**")
format!("@/{whatami_str}/{zid_str}/subscriber/**")
.try_into()
.unwrap(),
Arc::new(subscribers_data),
);
handlers.insert(
format!("@/router/{zid_str}/queryable/**")
format!("@/{whatami_str}/{zid_str}/queryable/**")
.try_into()
.unwrap(),
Arc::new(queryables_data),
);

#[cfg(all(feature = "unstable", feature = "plugins"))]
handlers.insert(
format!("@/router/{zid_str}/plugins/**").try_into().unwrap(),
format!("@/{whatami_str}/{zid_str}/plugins/**")
.try_into()
.unwrap(),
Arc::new(plugins_data),
);

#[cfg(all(feature = "unstable", feature = "plugins"))]
handlers.insert(
format!("@/router/{zid_str}/status/plugins/**")
format!("@/{whatami_str}/{zid_str}/status/plugins/**")
.try_into()
.unwrap(),
Arc::new(plugins_status),
Expand All @@ -198,25 +209,18 @@ impl AdminSpace {

let context = Arc::new(AdminContext {
runtime: runtime.clone(),
zid_str,
version,
metadata,
});
let admin = Arc::new(AdminSpace {
zid: runtime.zid(),
zid: runtime.state.zid,
primitives: Mutex::new(None),
mappings: Mutex::new(HashMap::new()),
handlers,
context,
});

admin
.context
.runtime
.state
.config
.lock()
.set_plugin_validator(Arc::downgrade(&admin));
config.set_plugin_validator(Arc::downgrade(&admin));

#[cfg(all(feature = "unstable", feature = "plugins"))]
{
Expand Down Expand Up @@ -362,37 +366,42 @@ impl Primitives for AdminSpace {
}
}

if let Some(key) = msg
.wire_expr
.as_str()
.strip_prefix(&format!("@/router/{}/config/", &self.context.zid_str))
{
if let Some(key) = msg.wire_expr.as_str().strip_prefix(&format!(
"@/{}/{}/config/",
self.context.runtime.state.whatami, self.context.runtime.state.zid
)) {
match msg.payload {
PushBody::Put(put) => match std::str::from_utf8(&put.payload.contiguous()) {
Ok(json) => {
tracing::trace!(
"Insert conf value /@/router/{}/config/{} : {}",
&self.context.zid_str,
"Insert conf value /@/{}/{}/config/{} : {}",
self.context.runtime.state.whatami,
self.context.runtime.state.zid,
key,
json
);
if let Err(e) = (&self.context.runtime.state.config).insert_json5(key, json)
{
error!(
"Error inserting conf value /@/router/{}/config/{} : {} - {}",
&self.context.zid_str, key, json, e
"Error inserting conf value /@/{}/{}/config/{} : {} - {}",
self.context.runtime.state.whatami,
self.context.runtime.state.zid,
key,
json,
e
);
}
}
Err(e) => error!(
"Received non utf8 conf value on /@/router/{}/config/{} : {}",
&self.context.zid_str, key, e
"Received non utf8 conf value on /@/{}/{}/config/{} : {}",
self.context.runtime.state.whatami, self.context.runtime.state.zid, key, e
),
},
PushBody::Del(_) => {
tracing::trace!(
"Deleting conf value /@/router/{}/config/{}",
&self.context.zid_str,
"Deleting conf value /@/{}/{}/config/{}",
self.context.runtime.state.whatami,
self.context.runtime.state.zid,
key
);
if let Err(e) = self.context.runtime.state.config.remove(key) {
Expand Down Expand Up @@ -510,8 +519,13 @@ impl crate::net::primitives::EPrimitives for AdminSpace {
}
}

fn router_data(context: &AdminContext, query: Query) {
let reply_key: OwnedKeyExpr = format!("@/router/{}", context.zid_str).try_into().unwrap();
fn local_data(context: &AdminContext, query: Query) {
let reply_key: OwnedKeyExpr = format!(
"@/{}/{}",
context.runtime.state.whatami, context.runtime.state.zid
)
.try_into()
.unwrap();

let transport_mgr = context.runtime.manager().clone();

Expand Down Expand Up @@ -568,7 +582,7 @@ fn router_data(context: &AdminContext, query: Query) {

#[allow(unused_mut)]
let mut json = json!({
"zid": context.zid_str,
"zid": context.runtime.state.zid,
"version": context.version,
"metadata": context.metadata,
"locators": locators,
Expand Down Expand Up @@ -601,10 +615,13 @@ fn router_data(context: &AdminContext, query: Query) {
}
}

fn router_metrics(context: &AdminContext, query: Query) {
let reply_key: OwnedKeyExpr = format!("@/router/{}/metrics", context.zid_str)
.try_into()
.unwrap();
fn metrics(context: &AdminContext, query: Query) {
let reply_key: OwnedKeyExpr = format!(
"@/{}/{}/metrics",
context.runtime.state.whatami, context.runtime.state.zid
)
.try_into()
.unwrap();
#[allow(unused_mut)]
let mut metrics = format!(
r#"# HELP zenoh_build Informations about zenoh.
Expand Down Expand Up @@ -636,9 +653,12 @@ zenoh_build{{version="{}"}} 1
}

fn routers_linkstate_data(context: &AdminContext, query: Query) {
let reply_key: OwnedKeyExpr = format!("@/router/{}/linkstate/routers", context.zid_str)
.try_into()
.unwrap();
let reply_key: OwnedKeyExpr = format!(
"@/{}/{}/linkstate/routers",
context.runtime.state.whatami, context.runtime.state.zid
)
.try_into()
.unwrap();

let tables = zread!(context.runtime.state.router.tables.tables);

Expand All @@ -661,9 +681,12 @@ fn routers_linkstate_data(context: &AdminContext, query: Query) {
}

fn peers_linkstate_data(context: &AdminContext, query: Query) {
let reply_key: OwnedKeyExpr = format!("@/router/{}/linkstate/peers", context.zid_str)
.try_into()
.unwrap();
let reply_key: OwnedKeyExpr = format!(
"@/{}/{}/linkstate/peers",
context.runtime.state.whatami, context.runtime.state.zid
)
.try_into()
.unwrap();

let tables = zread!(context.runtime.state.router.tables.tables);

Expand All @@ -689,8 +712,9 @@ fn subscribers_data(context: &AdminContext, query: Query) {
let tables = zread!(context.runtime.state.router.tables.tables);
for sub in tables.hat_code.get_subscriptions(&tables) {
let key = KeyExpr::try_from(format!(
"@/router/{}/subscriber/{}",
context.zid_str,
"@/{}/{}/subscriber/{}",
context.runtime.state.whatami,
context.runtime.state.zid,
sub.expr()
))
.unwrap();
Expand All @@ -706,8 +730,9 @@ fn queryables_data(context: &AdminContext, query: Query) {
let tables = zread!(context.runtime.state.router.tables.tables);
for qabl in tables.hat_code.get_queryables(&tables) {
let key = KeyExpr::try_from(format!(
"@/router/{}/queryable/{}",
context.zid_str,
"@/{}/{}/queryable/{}",
context.runtime.state.whatami,
context.runtime.state.zid,
qabl.expr()
))
.unwrap();
Expand All @@ -722,7 +747,10 @@ fn queryables_data(context: &AdminContext, query: Query) {
#[cfg(all(feature = "unstable", feature = "plugins"))]
fn plugins_data(context: &AdminContext, query: Query) {
let guard = context.runtime.plugins_manager();
let root_key = format!("@/router/{}/plugins", &context.zid_str);
let root_key = format!(
"@/{}/{}/plugins",
context.runtime.state.whatami, &context.runtime.state.zid
);
let root_key = unsafe { keyexpr::from_str_unchecked(&root_key) };
tracing::debug!("requested plugins status {:?}", query.key_expr());
if let [names, ..] = query.key_expr().strip_prefix(root_key)[..] {
Expand All @@ -740,9 +768,12 @@ fn plugins_data(context: &AdminContext, query: Query) {

#[cfg(all(feature = "unstable", feature = "plugins"))]
fn plugins_status(context: &AdminContext, query: Query) {
let selector: crate::Selector<'_> = query.selector();
let selector = query.selector();
let guard = context.runtime.plugins_manager();
let mut root_key = format!("@/router/{}/status/plugins/", &context.zid_str);
let mut root_key = format!(
"@/{}/{}/status/plugins/",
context.runtime.state.whatami, &context.runtime.state.zid
);

for plugin in guard.started_plugins_iter() {
with_extended_string(&mut root_key, &[plugin.name()], |plugin_key| {
Expand Down

0 comments on commit 9ecc903

Please sign in to comment.