Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zenoh plugin API rework #567

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 28 additions & 23 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ use serde_json::Value;
use std::convert::TryFrom; // This is a false positive from the rust analyser
use std::{
any::Any,
collections::{HashMap, HashSet},
collections::HashSet,
fmt,
io::Read,
marker::PhantomData,
net::SocketAddr,
path::Path,
sync::{Arc, Mutex, MutexGuard},
sync::{Arc, Mutex, MutexGuard, Weak},
};
use validated_struct::ValidatedMapAssociatedTypes;
pub use validated_struct::{GetError, ValidatedMap};
Expand All @@ -46,15 +46,21 @@ use zenoh_protocol::{
use zenoh_result::{bail, zerror, ZResult};
use zenoh_util::LibLoader;

pub type ValidationFunction = std::sync::Arc<
dyn Fn(
&str,
&serde_json::Map<String, serde_json::Value>,
&serde_json::Map<String, serde_json::Value>,
) -> ZResult<Option<serde_json::Map<String, serde_json::Value>>>
+ Send
+ Sync,
>;
pub trait ConfigValidator: Send + Sync {
fn check_config(
&self,
_plugin_name: &str,
_path: &str,
_current: &serde_json::Map<String, serde_json::Value>,
_new: &serde_json::Map<String, serde_json::Value>,
) -> ZResult<Option<serde_json::Map<String, serde_json::Value>>> {
Ok(None)
}
}

// Necessary to allow to set default emplty weak referece value to plugin.validator field
// because empty weak value is not allowed for Arc<dyn Trait>
impl ConfigValidator for () {}

/// Creates an empty zenoh net Session configuration.
pub fn empty() -> Config {
Expand Down Expand Up @@ -482,8 +488,8 @@ fn config_deser() {
}

impl Config {
pub fn add_plugin_validator(&mut self, name: impl Into<String>, validator: ValidationFunction) {
self.plugins.validators.insert(name.into(), validator);
pub fn set_plugin_validator<T: ConfigValidator + 'static>(&mut self, validator: Weak<T>) {
self.plugins.validator = validator;
}

pub fn plugin(&self, name: &str) -> Option<&Value> {
Expand Down Expand Up @@ -836,7 +842,7 @@ fn user_conf_validator(u: &UsrPwdConf) -> bool {
#[derive(Clone)]
pub struct PluginsConfig {
values: Value,
validators: HashMap<String, ValidationFunction>,
validator: std::sync::Weak<dyn ConfigValidator>,
}
fn sift_privates(value: &mut serde_json::Value) {
match value {
Expand Down Expand Up @@ -902,11 +908,9 @@ impl PluginsConfig {
Some(first_in_plugin) => first_in_plugin,
None => {
self.values.as_object_mut().unwrap().remove(plugin);
self.validators.remove(plugin);
return Ok(());
}
};
let validator = self.validators.get(plugin);
let (old_conf, mut new_conf) = match self.values.get_mut(plugin) {
Some(plugin) => {
let clone = plugin.clone();
Expand Down Expand Up @@ -947,8 +951,9 @@ impl PluginsConfig {
}
other => bail!("{} cannot be indexed", other),
}
let new_conf = if let Some(validator) = validator {
match validator(
let new_conf = if let Some(validator) = self.validator.upgrade() {
match validator.check_config(
plugin,
&key[("plugins/".len() + plugin.len())..],
old_conf.as_object().unwrap(),
new_conf.as_object().unwrap(),
Expand Down Expand Up @@ -977,7 +982,7 @@ impl Default for PluginsConfig {
fn default() -> Self {
Self {
values: Value::Object(Default::default()),
validators: Default::default(),
validator: std::sync::Weak::<()>::new(),
}
}
}
Expand All @@ -987,8 +992,8 @@ impl<'a> serde::Deserialize<'a> for PluginsConfig {
D: serde::Deserializer<'a>,
{
Ok(PluginsConfig {
validators: Default::default(),
values: serde::Deserialize::deserialize(deserializer)?,
validator: std::sync::Weak::<()>::new(),
})
}
}
Expand Down Expand Up @@ -1073,7 +1078,6 @@ impl validated_struct::ValidatedMap for PluginsConfig {
validated_struct::InsertionError: From<D::Error>,
{
let (plugin, key) = validated_struct::split_once(key, '/');
let validator = self.validators.get(plugin);
let new_value: Value = serde::Deserialize::deserialize(deserializer)?;
let value = self
.values
Expand All @@ -1082,8 +1086,9 @@ impl validated_struct::ValidatedMap for PluginsConfig {
.entry(plugin)
.or_insert(Value::Null);
let mut new_value = value.clone().merge(key, new_value)?;
if let Some(validator) = validator {
match validator(
if let Some(validator) = self.validator.upgrade() {
match validator.check_config(
plugin,
key,
value.as_object().unwrap(),
new_value.as_object().unwrap(),
Expand Down
72 changes: 34 additions & 38 deletions plugins/example-plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc, Mutex,
};
use zenoh::plugins::{Plugin, RunningPluginTrait, ValidationFunction, ZenohPlugin};
use zenoh::plugins::{Plugin, RunningPluginTrait, ZenohPlugin};
use zenoh::prelude::r#async::*;
use zenoh::runtime::Runtime;
use zenoh_core::zlock;
Expand All @@ -47,7 +47,7 @@ impl Plugin for ExamplePlugin {

// The first operation called by zenohd on the plugin
fn start(name: &str, runtime: &Self::StartArgs) -> ZResult<Self::RunningPlugin> {
let config = runtime.config.lock();
let config = runtime.config().lock();
let self_cfg = config.plugin(name).unwrap().as_object().unwrap();
// get the plugin's config details from self_cfg Map (here the "storage-selector" property)
let selector: KeyExpr = match self_cfg.get("storage-selector") {
Expand Down Expand Up @@ -86,46 +86,42 @@ struct RunningPluginInner {
#[derive(Clone)]
struct RunningPlugin(Arc<Mutex<RunningPluginInner>>);
impl RunningPluginTrait for RunningPlugin {
// Operation returning a ValidationFunction(path, old, new)-> ZResult<Option<serde_json::Map<String, serde_json::Value>>>
// this function will be called each time the plugin's config is changed via the zenohd admin space
fn config_checker(&self) -> ValidationFunction {
let guard = zlock!(&self.0);
let name = guard.name.clone();
std::mem::drop(guard);
let plugin = self.clone();
Arc::new(move |path, old, new| {
const STORAGE_SELECTOR: &str = "storage-selector";
if path == STORAGE_SELECTOR || path.is_empty() {
match (old.get(STORAGE_SELECTOR), new.get(STORAGE_SELECTOR)) {
(Some(serde_json::Value::String(os)), Some(serde_json::Value::String(ns)))
if os == ns => {}
(_, Some(serde_json::Value::String(selector))) => {
let mut guard = zlock!(&plugin.0);
guard.flag.store(false, Relaxed);
guard.flag = Arc::new(AtomicBool::new(true));
match KeyExpr::try_from(selector.clone()) {
Err(e) => log::error!("{}", e),
Ok(selector) => {
async_std::task::spawn(run(
guard.runtime.clone(),
selector,
guard.flag.clone(),
));
}
fn config_checker(
&self,
path: &str,
old: &serde_json::Map<String, serde_json::Value>,
new: &serde_json::Map<String, serde_json::Value>,
) -> ZResult<Option<serde_json::Map<String, serde_json::Value>>> {
let mut guard = zlock!(&self.0);
const STORAGE_SELECTOR: &str = "storage-selector";
if path == STORAGE_SELECTOR || path.is_empty() {
match (old.get(STORAGE_SELECTOR), new.get(STORAGE_SELECTOR)) {
(Some(serde_json::Value::String(os)), Some(serde_json::Value::String(ns)))
if os == ns => {}
(_, Some(serde_json::Value::String(selector))) => {
guard.flag.store(false, Relaxed);
guard.flag = Arc::new(AtomicBool::new(true));
match KeyExpr::try_from(selector.clone()) {
Err(e) => log::error!("{}", e),
Ok(selector) => {
async_std::task::spawn(run(
guard.runtime.clone(),
selector,
guard.flag.clone(),
));
}
return Ok(None);
}
(_, None) => {
let guard = zlock!(&plugin.0);
guard.flag.store(false, Relaxed);
}
_ => {
bail!("storage-selector for {} must be a string", &name)
}
return Ok(None);
}
(_, None) => {
guard.flag.store(false, Relaxed);
}
_ => {
bail!("storage-selector for {} must be a string", &guard.name)
}
}
bail!("unknown option {} for {}", path, &name)
})
}
bail!("unknown option {} for {}", path, guard.name)
}

// Function called on any query on admin space that matches this plugin's sub-part of the admin space.
Expand Down
15 changes: 9 additions & 6 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl Plugin for RestPlugin {
let _ = env_logger::try_init();
log::debug!("REST plugin {}", LONG_VERSION.as_str());

let runtime_conf = runtime.config.lock();
let runtime_conf = runtime.config().lock();
let plugin_conf = runtime_conf
.plugin(name)
.ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?;
Expand All @@ -243,10 +243,13 @@ impl Plugin for RestPlugin {

struct RunningPlugin(Config);
impl RunningPluginTrait for RunningPlugin {
fn config_checker(&self) -> zenoh::plugins::ValidationFunction {
Arc::new(|_, _, _| {
bail!("zenoh-plugin-rest doesn't accept any runtime configuration changes")
})
fn config_checker(
&self,
_: &str,
_: &serde_json::Map<String, serde_json::Value>,
_: &serde_json::Map<String, serde_json::Value>,
) -> ZResult<Option<serde_json::Map<String, serde_json::Value>>> {
bail!("zenoh-plugin-rest doesn't accept any runtime configuration changes")
}

fn adminspace_getter<'a>(
Expand Down Expand Up @@ -476,7 +479,7 @@ pub async fn run(runtime: Runtime, conf: Config) -> ZResult<()> {
// But cannot be done twice in case of static link.
let _ = env_logger::try_init();

let zid = runtime.zid.to_string();
let zid = runtime.zid().to_string();
let session = zenoh::init(runtime).res().await.unwrap();

let mut app = Server::with_state((Arc::new(session), zid));
Expand Down
33 changes: 18 additions & 15 deletions plugins/zenoh-plugin-storage-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::Mutex;
use storages_mgt::StorageMessage;
use zenoh::plugins::{Plugin, RunningPluginTrait, ValidationFunction, ZenohPlugin};
use zenoh::plugins::{Plugin, RunningPluginTrait, ZenohPlugin};
use zenoh::prelude::sync::*;
use zenoh::runtime::Runtime;
use zenoh::Session;
Expand Down Expand Up @@ -65,7 +65,7 @@ impl Plugin for StoragesPlugin {
std::mem::drop(env_logger::try_init());
log::debug!("StorageManager plugin {}", LONG_VERSION.as_str());
let config =
{ PluginConfig::try_from((name, runtime.config.lock().plugin(name).unwrap())) }?;
{ PluginConfig::try_from((name, runtime.config().lock().plugin(name).unwrap())) }?;
Ok(Box::new(StorageRuntime::from(StorageRuntimeInner::new(
runtime.clone(),
config,
Expand All @@ -85,7 +85,8 @@ impl StorageRuntimeInner {
fn status_key(&self) -> String {
format!(
"@/router/{}/status/plugins/{}",
&self.runtime.zid, &self.name
&self.runtime.zid(),
&self.name
)
}
fn new(runtime: Runtime, config: PluginConfig) -> ZResult<Self> {
Expand Down Expand Up @@ -308,19 +309,21 @@ impl From<StorageRuntimeInner> for StorageRuntime {
}

impl RunningPluginTrait for StorageRuntime {
fn config_checker(&self) -> ValidationFunction {
fn config_checker(
&self,
_: &str,
old: &serde_json::Map<String, serde_json::Value>,
new: &serde_json::Map<String, serde_json::Value>,
) -> ZResult<Option<serde_json::Map<String, serde_json::Value>>> {
let name = { zlock!(self.0).name.clone() };
let runtime = self.0.clone();
Arc::new(move |_path, old, new| {
let old = PluginConfig::try_from((&name, old))?;
let new = PluginConfig::try_from((&name, new))?;
log::info!("old: {:?}", &old);
log::info!("new: {:?}", &new);
let diffs = ConfigDiff::diffs(old, new);
log::info!("diff: {:?}", &diffs);
{ zlock!(runtime).update(diffs) }?;
Ok(None)
})
let old = PluginConfig::try_from((&name, old))?;
let new = PluginConfig::try_from((&name, new))?;
log::info!("old: {:?}", &old);
log::info!("new: {:?}", &new);
let diffs = ConfigDiff::diffs(old, new);
log::info!("diff: {:?}", &diffs);
{ zlock!(self.0).update(diffs) }?;
Ok(None)
}

fn adminspace_getter<'a>(
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<'a> Resolvable for ZidBuilder<'a> {

impl<'a> SyncResolve for ZidBuilder<'a> {
fn res_sync(self) -> Self::To {
self.session.runtime.zid
self.session.runtime.zid()
}
}

Expand Down
7 changes: 5 additions & 2 deletions zenoh/src/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,14 @@ impl<'a> Liveliness<'a> {
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
let key_expr = key_expr.try_into().map_err(Into::into);
let conf = self.session.runtime.config.lock();
let timeout = {
let conf = self.session.runtime.config().lock();
Duration::from_millis(unwrap_or_default!(conf.queries_default_timeout()))
};
LivelinessGetBuilder {
session: &self.session,
key_expr,
timeout: Duration::from_millis(unwrap_or_default!(conf.queries_default_timeout())),
timeout,
handler: DefaultHandler,
}
}
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl Network {
log::debug!("{} Add node (self) {}", name, zid);
let idx = graph.add_node(Node {
zid,
whatami: Some(runtime.whatami),
whatami: Some(runtime.whatami()),
locators: None,
sn: 1,
links: vec![],
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ impl Resource {
}
}

#[allow(dead_code)]
pub fn print_tree(from: &Arc<Resource>) -> String {
let mut result = from.expr();
result.push('\n');
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl Tables {
&self.root_res
}

#[allow(dead_code)]
pub fn print(&self) -> String {
Resource::print_tree(&self.root_res)
}
Expand Down
Loading
Loading