Skip to content

Commit

Permalink
Transports can be closed through adminspace
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Oct 11, 2023
1 parent 5edd0eb commit 3293531
Showing 1 changed file with 130 additions and 51 deletions.
181 changes: 130 additions & 51 deletions zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ use crate::plugins::sealed as plugins;
use crate::prelude::sync::{Sample, SyncResolve};
use crate::queryable::Query;
use crate::queryable::QueryInner;
use crate::sample::DataInfo;
use crate::value::Value;
use async_std::task;
use log::{error, trace};
use serde_json::json;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::Mutex;
use zenoh_buffers::SplitBuffer;
use zenoh_buffers::{SplitBuffer, ZBuf};
use zenoh_config::ValidatedMap;
use zenoh_protocol::core::SampleKind;
use zenoh_protocol::{
core::{key_expr::OwnedKeyExpr, ExprId, KnownEncoding, WireExpr, ZenohId, EMPTY_EXPR_ID},
network::{
Expand All @@ -48,13 +51,15 @@ pub struct AdminContext {
metadata: serde_json::Value,
}

type Handler = Arc<dyn Fn(&AdminContext, Query) + Send + Sync>;
type QueryHandler = Arc<dyn Fn(&AdminContext, Query) + Send + Sync>;
type DataHandler = Arc<dyn Fn(&AdminContext, Sample) + Send + Sync>;

pub struct AdminSpace {
zid: ZenohId,
primitives: Mutex<Option<Arc<Face>>>,
mappings: Mutex<HashMap<ExprId, String>>,
handlers: HashMap<OwnedKeyExpr, Handler>,
query_handlers: HashMap<OwnedKeyExpr, QueryHandler>,
data_handlers: HashMap<OwnedKeyExpr, DataHandler>,
context: Arc<AdminContext>,
}

Expand All @@ -70,42 +75,53 @@ impl AdminSpace {
let metadata = runtime.metadata.clone();
let root_key: OwnedKeyExpr = format!("@/router/{zid_str}").try_into().unwrap();

let mut handlers: HashMap<_, Handler> = HashMap::new();
handlers.insert(root_key.clone(), Arc::new(router_data));
handlers.insert(
let mut query_handlers: HashMap<_, QueryHandler> = HashMap::new();
query_handlers.insert(root_key.clone(), Arc::new(router_data));
query_handlers.insert(
format!("@/router/{zid_str}/metrics").try_into().unwrap(),
Arc::new(router_metrics),
);
handlers.insert(
query_handlers.insert(
format!("@/router/{zid_str}/linkstate/routers")
.try_into()
.unwrap(),
Arc::new(routers_linkstate_data),
);
handlers.insert(
query_handlers.insert(
format!("@/router/{zid_str}/linkstate/peers")
.try_into()
.unwrap(),
Arc::new(peers_linkstate_data),
);
handlers.insert(
query_handlers.insert(
format!("@/router/{zid_str}/subscriber/**")
.try_into()
.unwrap(),
Arc::new(subscribers_data),
);
handlers.insert(
query_handlers.insert(
format!("@/router/{zid_str}/queryable/**")
.try_into()
.unwrap(),
Arc::new(queryables_data),
);
handlers.insert(
query_handlers.insert(
format!("@/router/{zid_str}/status/plugins/**")
.try_into()
.unwrap(),
Arc::new(plugins_status),
);
let mut data_handlers: HashMap<_, DataHandler> = HashMap::new();
data_handlers.insert(
format!("@/router/{zid_str}/config/**").try_into().unwrap(),
Arc::new(config),
);
data_handlers.insert(
format!("@/router/{zid_str}/transport/unicast/*")
.try_into()
.unwrap(),
Arc::new(transport),
);

let mut active_plugins = plugins_mgr
.running_plugins_info()
Expand All @@ -124,7 +140,8 @@ impl AdminSpace {
zid: runtime.zid,
primitives: Mutex::new(None),
mappings: Mutex::new(HashMap::new()),
handlers,
query_handlers,
data_handlers,
context,
});

Expand Down Expand Up @@ -244,7 +261,7 @@ impl AdminSpace {
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::DeclareSubscriber(DeclareSubscriber {
id: 0, // TODO
wire_expr: [&root_key, "/config/**"].concat().into(),
wire_expr: [&root_key, "/**"].concat().into(),
ext_info: SubscriberInfo::default(),
}),
});
Expand Down Expand Up @@ -293,42 +310,39 @@ impl Primitives for AdminSpace {
}
}

if let Some(key) = msg
.wire_expr
.as_str()
.strip_prefix(&format!("@/router/{}/config/", &self.context.zid_str))
{
match msg.payload {
PushBody::Put(put) => match std::str::from_utf8(&put.payload.contiguous()) {
Ok(json) => {
log::trace!(
"Insert conf value /@/router/{}/config/{} : {}",
&self.context.zid_str,
key,
json
);
if let Err(e) = (&self.context.runtime.config).insert_json5(key, json) {
error!(
"Error inserting conf value /@/router/{}/config/{} : {} - {}",
&self.context.zid_str, key, json, e
);
}
}
Err(e) => error!(
"Received non utf8 conf value on /@/router/{}/config/{} : {}",
&self.context.zid_str, key, e
),
},
PushBody::Del(_) => {
log::trace!(
"Deleting conf value /@/router/{}/config/{}",
&self.context.zid_str,
key
);
if let Err(e) = self.context.runtime.config.remove(key) {
log::error!("Error deleting conf value {} : {}", msg.wire_expr, e)
}
}
let key_expr = match self.key_expr_to_string(&msg.wire_expr) {
Ok(key_expr) => key_expr.into_owned(),
Err(e) => {
log::error!("Unknown KeyExpr: {}", e);
return;
}
};

let sample = match msg.payload {
PushBody::Put(m) => {
let info = DataInfo {
kind: SampleKind::Put,
encoding: Some(m.encoding),
timestamp: m.timestamp,
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
Sample::with_info(key_expr.clone(), m.payload, Some(info))
}
PushBody::Del(m) => {
let info = DataInfo {
kind: SampleKind::Delete,
encoding: None,
timestamp: m.timestamp,
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
Sample::with_info(key_expr.clone(), ZBuf::empty(), Some(info))
}
};
for (key, data_handler) in &self.data_handlers {
if key_expr.intersects(key) {
data_handler(&self.context, sample.clone());
}
}
}
Expand Down Expand Up @@ -381,9 +395,9 @@ impl Primitives for AdminSpace {
}),
};

for (key, handler) in &self.handlers {
for (key, query_handler) in &self.query_handlers {
if key_expr.intersects(key) {
handler(&self.context, query.clone());
query_handler(&self.context, query.clone());
}
}
}
Expand Down Expand Up @@ -678,6 +692,71 @@ fn plugins_status(context: &AdminContext, query: Query) {
}
}

fn config(context: &AdminContext, sample: Sample) {
if let Some(key) = sample
.key_expr
.as_str()
.strip_prefix(&format!("@/router/{}/config/", &context.zid_str))
{
match sample.kind {
SampleKind::Put => match std::str::from_utf8(&sample.payload.contiguous()) {
Ok(json) => {
log::trace!(
"Insert conf value @/router/{}/config/{} : {}",
&context.zid_str,
key,
json
);
if let Err(e) = (&context.runtime.config).insert_json5(key, json) {
error!(
"Error inserting conf value @/router/{}/config/{} : {} - {}",
&context.zid_str, key, json, e
);
}
}
Err(e) => error!(
"Received non utf8 conf value on @/router/{}/config/{} : {}",
&context.zid_str, key, e
),
},
SampleKind::Delete => {
log::trace!(
"Deleting conf value @/router/{}/config/{}",
&context.zid_str,
key
);
if let Err(e) = context.runtime.config.remove(key) {
log::error!("Error deleting conf value {} : {}", sample.key_expr, e)
}
}
}
}
}

fn transport(context: &AdminContext, sample: Sample) {
if sample.kind == SampleKind::Delete {
let split = sample.key_expr.as_str().split('/').collect::<Vec<&str>>();
if split.len() == 6 {
if let Ok(id) = ZenohId::from_str(split[5]) {
task::block_on(async {
if let Some(transport) =
context.runtime.manager().get_transport_unicast(&id).await
{
log::trace!(
"Closing transport @/router/{}/transport/unicast/{}",
&context.zid_str,
id
);
if let Err(e) = transport.close().await {
log::error!("Error closing transport {} : {}", id, e)
}
}
});
}
}
}
}

fn with_extended_string<R, F: FnMut(&mut String) -> R>(
prefix: &mut String,
suffixes: &[&str],
Expand Down

0 comments on commit 3293531

Please sign in to comment.