From 32935317965751e968451e2adce7fcff4b14f3fe Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 11 Oct 2023 10:22:22 +0200 Subject: [PATCH] Transports can be closed through adminspace --- zenoh/src/net/runtime/adminspace.rs | 181 ++++++++++++++++++++-------- 1 file changed, 130 insertions(+), 51 deletions(-) diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 0eb099a098..ef72354c1f 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -17,6 +17,7 @@ use crate::plugins::sealed as plugins; use crate::prelude::sync::{Sample, SyncResolve}; use crate::queryable::Query; use crate::queryable::QueryInner; +use crate::sample::DataInfo; use crate::value::Value; use async_std::task; use log::{error, trace}; @@ -24,10 +25,12 @@ use serde_json::json; use std::collections::HashMap; use std::convert::TryFrom; use std::convert::TryInto; +use std::str::FromStr; use std::sync::Arc; use std::sync::Mutex; -use zenoh_buffers::SplitBuffer; +use zenoh_buffers::{SplitBuffer, ZBuf}; use zenoh_config::ValidatedMap; +use zenoh_protocol::core::SampleKind; use zenoh_protocol::{ core::{key_expr::OwnedKeyExpr, ExprId, KnownEncoding, WireExpr, ZenohId, EMPTY_EXPR_ID}, network::{ @@ -48,13 +51,15 @@ pub struct AdminContext { metadata: serde_json::Value, } -type Handler = Arc; +type QueryHandler = Arc; +type DataHandler = Arc; pub struct AdminSpace { zid: ZenohId, primitives: Mutex>>, mappings: Mutex>, - handlers: HashMap, + query_handlers: HashMap, + data_handlers: HashMap, context: Arc, } @@ -70,42 +75,53 @@ impl AdminSpace { let metadata = runtime.metadata.clone(); let root_key: OwnedKeyExpr = format!("@/router/{zid_str}").try_into().unwrap(); - let mut handlers: HashMap<_, Handler> = HashMap::new(); - handlers.insert(root_key.clone(), Arc::new(router_data)); - handlers.insert( + let mut query_handlers: HashMap<_, QueryHandler> = HashMap::new(); + query_handlers.insert(root_key.clone(), Arc::new(router_data)); + query_handlers.insert( format!("@/router/{zid_str}/metrics").try_into().unwrap(), Arc::new(router_metrics), ); - handlers.insert( + query_handlers.insert( format!("@/router/{zid_str}/linkstate/routers") .try_into() .unwrap(), Arc::new(routers_linkstate_data), ); - handlers.insert( + query_handlers.insert( format!("@/router/{zid_str}/linkstate/peers") .try_into() .unwrap(), Arc::new(peers_linkstate_data), ); - handlers.insert( + query_handlers.insert( format!("@/router/{zid_str}/subscriber/**") .try_into() .unwrap(), Arc::new(subscribers_data), ); - handlers.insert( + query_handlers.insert( format!("@/router/{zid_str}/queryable/**") .try_into() .unwrap(), Arc::new(queryables_data), ); - handlers.insert( + query_handlers.insert( format!("@/router/{zid_str}/status/plugins/**") .try_into() .unwrap(), Arc::new(plugins_status), ); + let mut data_handlers: HashMap<_, DataHandler> = HashMap::new(); + data_handlers.insert( + format!("@/router/{zid_str}/config/**").try_into().unwrap(), + Arc::new(config), + ); + data_handlers.insert( + format!("@/router/{zid_str}/transport/unicast/*") + .try_into() + .unwrap(), + Arc::new(transport), + ); let mut active_plugins = plugins_mgr .running_plugins_info() @@ -124,7 +140,8 @@ impl AdminSpace { zid: runtime.zid, primitives: Mutex::new(None), mappings: Mutex::new(HashMap::new()), - handlers, + query_handlers, + data_handlers, context, }); @@ -244,7 +261,7 @@ impl AdminSpace { ext_nodeid: ext::NodeIdType::default(), body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // TODO - wire_expr: [&root_key, "/config/**"].concat().into(), + wire_expr: [&root_key, "/**"].concat().into(), ext_info: SubscriberInfo::default(), }), }); @@ -293,42 +310,39 @@ impl Primitives for AdminSpace { } } - if let Some(key) = msg - .wire_expr - .as_str() - .strip_prefix(&format!("@/router/{}/config/", &self.context.zid_str)) - { - match msg.payload { - PushBody::Put(put) => match std::str::from_utf8(&put.payload.contiguous()) { - Ok(json) => { - log::trace!( - "Insert conf value /@/router/{}/config/{} : {}", - &self.context.zid_str, - key, - json - ); - if let Err(e) = (&self.context.runtime.config).insert_json5(key, json) { - error!( - "Error inserting conf value /@/router/{}/config/{} : {} - {}", - &self.context.zid_str, key, json, e - ); - } - } - Err(e) => error!( - "Received non utf8 conf value on /@/router/{}/config/{} : {}", - &self.context.zid_str, key, e - ), - }, - PushBody::Del(_) => { - log::trace!( - "Deleting conf value /@/router/{}/config/{}", - &self.context.zid_str, - key - ); - if let Err(e) = self.context.runtime.config.remove(key) { - log::error!("Error deleting conf value {} : {}", msg.wire_expr, e) - } - } + let key_expr = match self.key_expr_to_string(&msg.wire_expr) { + Ok(key_expr) => key_expr.into_owned(), + Err(e) => { + log::error!("Unknown KeyExpr: {}", e); + return; + } + }; + + let sample = match msg.payload { + PushBody::Put(m) => { + let info = DataInfo { + kind: SampleKind::Put, + encoding: Some(m.encoding), + timestamp: m.timestamp, + source_id: m.ext_sinfo.as_ref().map(|i| i.zid), + source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), + }; + Sample::with_info(key_expr.clone(), m.payload, Some(info)) + } + PushBody::Del(m) => { + let info = DataInfo { + kind: SampleKind::Delete, + encoding: None, + timestamp: m.timestamp, + source_id: m.ext_sinfo.as_ref().map(|i| i.zid), + source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), + }; + Sample::with_info(key_expr.clone(), ZBuf::empty(), Some(info)) + } + }; + for (key, data_handler) in &self.data_handlers { + if key_expr.intersects(key) { + data_handler(&self.context, sample.clone()); } } } @@ -381,9 +395,9 @@ impl Primitives for AdminSpace { }), }; - for (key, handler) in &self.handlers { + for (key, query_handler) in &self.query_handlers { if key_expr.intersects(key) { - handler(&self.context, query.clone()); + query_handler(&self.context, query.clone()); } } } @@ -678,6 +692,71 @@ fn plugins_status(context: &AdminContext, query: Query) { } } +fn config(context: &AdminContext, sample: Sample) { + if let Some(key) = sample + .key_expr + .as_str() + .strip_prefix(&format!("@/router/{}/config/", &context.zid_str)) + { + match sample.kind { + SampleKind::Put => match std::str::from_utf8(&sample.payload.contiguous()) { + Ok(json) => { + log::trace!( + "Insert conf value @/router/{}/config/{} : {}", + &context.zid_str, + key, + json + ); + if let Err(e) = (&context.runtime.config).insert_json5(key, json) { + error!( + "Error inserting conf value @/router/{}/config/{} : {} - {}", + &context.zid_str, key, json, e + ); + } + } + Err(e) => error!( + "Received non utf8 conf value on @/router/{}/config/{} : {}", + &context.zid_str, key, e + ), + }, + SampleKind::Delete => { + log::trace!( + "Deleting conf value @/router/{}/config/{}", + &context.zid_str, + key + ); + if let Err(e) = context.runtime.config.remove(key) { + log::error!("Error deleting conf value {} : {}", sample.key_expr, e) + } + } + } + } +} + +fn transport(context: &AdminContext, sample: Sample) { + if sample.kind == SampleKind::Delete { + let split = sample.key_expr.as_str().split('/').collect::>(); + if split.len() == 6 { + if let Ok(id) = ZenohId::from_str(split[5]) { + task::block_on(async { + if let Some(transport) = + context.runtime.manager().get_transport_unicast(&id).await + { + log::trace!( + "Closing transport @/router/{}/transport/unicast/{}", + &context.zid_str, + id + ); + if let Err(e) = transport.close().await { + log::error!("Error closing transport {} : {}", id, e) + } + } + }); + } + } + } +} + fn with_extended_string R>( prefix: &mut String, suffixes: &[&str],