diff --git a/Cargo.lock b/Cargo.lock index 4327e6571..a03e85fd7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4607,6 +4607,27 @@ dependencies = [ "zenoh-util", ] +[[package]] +name = "zenoh-backend-example" +version = "0.11.0-dev" +dependencies = [ + "async-std", + "async-trait", + "clap", + "const_format", + "env_logger", + "futures", + "git-version", + "log", + "serde_json", + "zenoh", + "zenoh-core", + "zenoh-plugin-trait", + "zenoh-result", + "zenoh-util", + "zenoh_backend_traits", +] + [[package]] name = "zenoh-buffers" version = "0.11.0-dev" @@ -4942,8 +4963,10 @@ version = "0.11.0-dev" dependencies = [ "async-std", "clap", + "const_format", "env_logger", "futures", + "git-version", "log", "serde_json", "zenoh", @@ -4961,6 +4984,7 @@ dependencies = [ "async-std", "base64 0.21.4", "clap", + "const_format", "env_logger", "flume", "futures", @@ -4988,6 +5012,7 @@ dependencies = [ "async-std", "async-trait", "clap", + "const_format", "crc", "derive-new", "env_logger", @@ -4995,7 +5020,6 @@ dependencies = [ "futures", "git-version", "jsonschema", - "lazy_static", "libloading", "log", "rustc_version 0.4.0", @@ -5017,9 +5041,12 @@ dependencies = [ name = "zenoh-plugin-trait" version = "0.11.0-dev" dependencies = [ + "const_format", "libloading", "log", + "serde", "serde_json", + "zenoh-keyexpr", "zenoh-macros", "zenoh-result", "zenoh-util", @@ -5139,10 +5166,12 @@ version = "0.11.0-dev" dependencies = [ "async-std", "async-trait", + "const_format", "derive_more", "schemars", "serde_json", "zenoh", + "zenoh-plugin-trait", "zenoh-result", "zenoh-util", ] @@ -5162,6 +5191,7 @@ dependencies = [ "rand 0.8.5", "rustc_version 0.4.0", "zenoh", + "zenoh_backend_traits", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index def96aeba..918c43207 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ members = [ "io/zenoh-links/zenoh-link-ws/", "io/zenoh-links/zenoh-link-unixpipe/", "io/zenoh-transport", + "plugins/zenoh-backend-example", "plugins/zenoh-plugin-example", "plugins/zenoh-backend-traits", "plugins/zenoh-plugin-rest", diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index c3a633b0e..29a87a43e 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -26,13 +26,13 @@ use serde_json::Value; use std::convert::TryFrom; // This is a false positive from the rust analyser use std::{ any::Any, - collections::{HashMap, HashSet}, + collections::HashSet, fmt, io::Read, marker::PhantomData, net::SocketAddr, path::Path, - sync::{Arc, Mutex, MutexGuard}, + sync::{Arc, Mutex, MutexGuard, Weak}, }; use validated_struct::ValidatedMapAssociatedTypes; pub use validated_struct::{GetError, ValidatedMap}; @@ -70,15 +70,21 @@ impl Zeroize for SecretString { pub type SecretValue = Secret; -pub type ValidationFunction = std::sync::Arc< - dyn Fn( - &str, - &serde_json::Map, - &serde_json::Map, - ) -> ZResult>> - + Send - + Sync, ->; +pub trait ConfigValidator: Send + Sync { + fn check_config( + &self, + _plugin_name: &str, + _path: &str, + _current: &serde_json::Map, + _new: &serde_json::Map, + ) -> ZResult>> { + Ok(None) + } +} + +// Necessary to allow to set default emplty weak referece value to plugin.validator field +// because empty weak value is not allowed for Arc +impl ConfigValidator for () {} /// Creates an empty zenoh net Session configuration. pub fn empty() -> Config { @@ -520,8 +526,8 @@ fn config_deser() { } impl Config { - pub fn add_plugin_validator(&mut self, name: impl Into, validator: ValidationFunction) { - self.plugins.validators.insert(name.into(), validator); + pub fn set_plugin_validator(&mut self, validator: Weak) { + self.plugins.validator = validator; } pub fn plugin(&self, name: &str) -> Option<&Value> { @@ -882,7 +888,7 @@ fn user_conf_validator(u: &UsrPwdConf) -> bool { #[derive(Clone)] pub struct PluginsConfig { values: Value, - validators: HashMap, + validator: std::sync::Weak, } fn sift_privates(value: &mut serde_json::Value) { match value { @@ -948,11 +954,9 @@ impl PluginsConfig { Some(first_in_plugin) => first_in_plugin, None => { self.values.as_object_mut().unwrap().remove(plugin); - self.validators.remove(plugin); return Ok(()); } }; - let validator = self.validators.get(plugin); let (old_conf, mut new_conf) = match self.values.get_mut(plugin) { Some(plugin) => { let clone = plugin.clone(); @@ -993,8 +997,9 @@ impl PluginsConfig { } other => bail!("{} cannot be indexed", other), } - let new_conf = if let Some(validator) = validator { - match validator( + let new_conf = if let Some(validator) = self.validator.upgrade() { + match validator.check_config( + plugin, &key[("plugins/".len() + plugin.len())..], old_conf.as_object().unwrap(), new_conf.as_object().unwrap(), @@ -1023,7 +1028,7 @@ impl Default for PluginsConfig { fn default() -> Self { Self { values: Value::Object(Default::default()), - validators: Default::default(), + validator: std::sync::Weak::<()>::new(), } } } @@ -1033,8 +1038,8 @@ impl<'a> serde::Deserialize<'a> for PluginsConfig { D: serde::Deserializer<'a>, { Ok(PluginsConfig { - validators: Default::default(), values: serde::Deserialize::deserialize(deserializer)?, + validator: std::sync::Weak::<()>::new(), }) } } @@ -1122,7 +1127,6 @@ impl validated_struct::ValidatedMap for PluginsConfig { validated_struct::InsertionError: From, { let (plugin, key) = validated_struct::split_once(key, '/'); - let validator = self.validators.get(plugin); let new_value: Value = serde::Deserialize::deserialize(deserializer)?; let value = self .values @@ -1131,8 +1135,9 @@ impl validated_struct::ValidatedMap for PluginsConfig { .entry(plugin) .or_insert(Value::Null); let mut new_value = value.clone().merge(key, new_value)?; - if let Some(validator) = validator { - match validator( + if let Some(validator) = self.validator.upgrade() { + match validator.check_config( + plugin, key, value.as_object().unwrap(), new_value.as_object().unwrap(), diff --git a/plugins/zenoh-backend-example/Cargo.toml b/plugins/zenoh-backend-example/Cargo.toml new file mode 100644 index 000000000..eac0e0d80 --- /dev/null +++ b/plugins/zenoh-backend-example/Cargo.toml @@ -0,0 +1,45 @@ +# +# Copyright (c) 2023 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +[package] +rust-version = { workspace = true } +name = "zenoh-backend-example" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } + +[features] +default = ["no_mangle"] +no_mangle = [] + +[lib] +name = "zenoh_backend_example" +# This crate type will make `cargo` output a dynamic library instead of a rust static library +crate-type = ["cdylib"] + +[dependencies] +async-std = { workspace = true, features = ["default"] } +clap = { workspace = true } +const_format = { workspace = true } +env_logger = { workspace = true } +futures = { workspace = true } +git-version = { workspace = true } +log = { workspace = true } +serde_json = { workspace = true } +zenoh = { workspace = true } +zenoh-core = { workspace = true } +zenoh-plugin-trait = { workspace = true } +zenoh-result = { workspace = true } +zenoh-util = { workspace = true } +async-trait = { workspace = true } +zenoh_backend_traits = { workspace = true } diff --git a/plugins/zenoh-backend-example/src/lib.rs b/plugins/zenoh-backend-example/src/lib.rs new file mode 100644 index 000000000..602d29f37 --- /dev/null +++ b/plugins/zenoh-backend-example/src/lib.rs @@ -0,0 +1,134 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use async_std::sync::RwLock; +use async_trait::async_trait; +use std::{ + collections::{hash_map::Entry, HashMap}, + sync::Arc, +}; +use zenoh::{prelude::OwnedKeyExpr, sample::Sample, time::Timestamp, value::Value}; +use zenoh_backend_traits::{ + config::{StorageConfig, VolumeConfig}, + Capability, History, Persistence, Storage, StorageInsertionResult, StoredData, Volume, + VolumeInstance, +}; +use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin}; +use zenoh_result::ZResult; + +#[cfg(feature = "no_mangle")] +zenoh_plugin_trait::declare_plugin!(ExampleBackend); + +impl Plugin for ExampleBackend { + type StartArgs = VolumeConfig; + type Instance = VolumeInstance; + fn start(_name: &str, _args: &Self::StartArgs) -> ZResult { + let volume = ExampleBackend {}; + Ok(Box::new(volume)) + } + + const DEFAULT_NAME: &'static str = "example_backend"; + const PLUGIN_VERSION: &'static str = plugin_version!(); + const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!(); +} + +pub struct ExampleBackend {} + +pub struct ExampleStorage { + map: RwLock, StoredData>>, +} + +impl Default for ExampleStorage { + fn default() -> Self { + Self { + map: RwLock::new(HashMap::new()), + } + } +} + +#[async_trait] +impl Volume for ExampleBackend { + fn get_admin_status(&self) -> serde_json::Value { + serde_json::Value::Null + } + fn get_capability(&self) -> Capability { + Capability { + persistence: Persistence::Volatile, + history: History::Latest, + read_cost: 0, + } + } + async fn create_storage(&self, _props: StorageConfig) -> ZResult> { + Ok(Box::::default()) + } + fn incoming_data_interceptor(&self) -> Option Sample + Send + Sync>> { + None + } + fn outgoing_data_interceptor(&self) -> Option Sample + Send + Sync>> { + None + } +} + +#[async_trait] +impl Storage for ExampleStorage { + fn get_admin_status(&self) -> serde_json::Value { + serde_json::Value::Null + } + async fn put( + &mut self, + key: Option, + value: Value, + timestamp: Timestamp, + ) -> ZResult { + let mut map = self.map.write().await; + match map.entry(key) { + Entry::Occupied(mut e) => { + e.insert(StoredData { value, timestamp }); + return Ok(StorageInsertionResult::Replaced); + } + Entry::Vacant(e) => { + e.insert(StoredData { value, timestamp }); + return Ok(StorageInsertionResult::Inserted); + } + } + } + + async fn delete( + &mut self, + key: Option, + _timestamp: Timestamp, + ) -> ZResult { + self.map.write().await.remove_entry(&key); + Ok(StorageInsertionResult::Deleted) + } + + async fn get( + &mut self, + key: Option, + _parameters: &str, + ) -> ZResult> { + match self.map.read().await.get(&key) { + Some(v) => Ok(vec![v.clone()]), + None => Err(format!("Key {:?} is not present", key).into()), + } + } + + async fn get_all_entries(&self) -> ZResult, Timestamp)>> { + let map = self.map.read().await; + let mut result = Vec::with_capacity(map.len()); + for (k, v) in map.iter() { + result.push((k.clone(), v.timestamp)); + } + Ok(result) + } +} diff --git a/plugins/zenoh-backend-traits/Cargo.toml b/plugins/zenoh-backend-traits/Cargo.toml index b9a6cef18..f2b8a4a1e 100644 --- a/plugins/zenoh-backend-traits/Cargo.toml +++ b/plugins/zenoh-backend-traits/Cargo.toml @@ -34,4 +34,9 @@ serde_json = { workspace = true } zenoh = { workspace = true } zenoh-result = { workspace = true } zenoh-util = { workspace = true } -schemars = { workspace = true } \ No newline at end of file +schemars = { workspace = true } +zenoh-plugin-trait = { workspace = true } +const_format = { workspace = true } + +[features] +default = [] diff --git a/plugins/zenoh-backend-traits/src/config.rs b/plugins/zenoh-backend-traits/src/config.rs index ba9ac3b2b..dbcfa420b 100644 --- a/plugins/zenoh-backend-traits/src/config.rs +++ b/plugins/zenoh-backend-traits/src/config.rs @@ -11,12 +11,14 @@ // Contributors: // ZettaScale Zenoh Team, // +use const_format::concatcp; use derive_more::{AsMut, AsRef}; use schemars::JsonSchema; use serde_json::{Map, Value}; use std::convert::TryFrom; use std::time::Duration; use zenoh::{key_expr::keyexpr, prelude::OwnedKeyExpr, Result as ZResult}; +use zenoh_plugin_trait::{PluginStartArgs, StructVersion}; use zenoh_result::{bail, zerror, Error}; #[derive(JsonSchema, Debug, Clone, AsMut, AsRef)] @@ -67,6 +69,17 @@ pub struct ReplicaConfig { pub delta: Duration, } +impl StructVersion for VolumeConfig { + fn struct_version() -> u64 { + 1 + } + fn struct_features() -> &'static str { + concatcp!(zenoh::FEATURES, crate::FEATURES) + } +} + +impl PluginStartArgs for VolumeConfig {} + impl Default for ReplicaConfig { fn default() -> Self { Self { @@ -222,10 +235,6 @@ impl ConfigDiff { diffs } } -pub enum BackendSearchMethod<'a> { - ByPaths(&'a [String]), - ByName(&'a str), -} impl VolumeConfig { pub fn to_json_value(&self) -> Value { let mut result = self.rest.clone(); @@ -240,11 +249,14 @@ impl VolumeConfig { } Value::Object(result) } - pub fn backend_search_method(&self) -> BackendSearchMethod { - match &self.paths { - None => BackendSearchMethod::ByName(self.backend.as_deref().unwrap_or(&self.name)), - Some(paths) => BackendSearchMethod::ByPaths(paths), - } + pub fn name(&self) -> &str { + &self.name + } + pub fn backend(&self) -> &str { + self.backend.as_deref().unwrap_or(&self.name) + } + pub fn paths(&self) -> Option<&[String]> { + self.paths.as_deref() } fn try_from(plugin_name: &str, configs: &V) -> ZResult> { let configs = configs.as_object().ok_or_else(|| { @@ -312,6 +324,9 @@ impl VolumeConfig { } } impl StorageConfig { + pub fn name(&self) -> &str { + &self.name + } pub fn to_json_value(&self) -> Value { let mut result = serde_json::Map::new(); result.insert("key_expr".into(), Value::String(self.key_expr.to_string())); diff --git a/plugins/zenoh-backend-traits/src/lib.rs b/plugins/zenoh-backend-traits/src/lib.rs index ac1e1c973..8b9fa359e 100644 --- a/plugins/zenoh-backend-traits/src/lib.rs +++ b/plugins/zenoh-backend-traits/src/lib.rs @@ -14,6 +14,8 @@ //! ⚠️ WARNING ⚠️ //! +//! TODO: The example is outdated, rewrite it +//! //! This crate should be considered unstable, as in we might change the APIs anytime. //! //! This crate provides the traits to be implemented by a zenoh backend library: @@ -62,7 +64,7 @@ //! } //! } //! -//! async fn create_storage(&mut self, properties: StorageConfig) -> ZResult> { +//! async fn create_storage(&self, properties: StorageConfig) -> ZResult> { //! // The properties are the ones passed via a PUT in the admin space for Storage creation. //! Ok(Box::new(MyStorage::new(properties).await?)) //! } @@ -132,15 +134,23 @@ //! ``` use async_trait::async_trait; +use const_format::concatcp; use std::sync::Arc; use zenoh::prelude::{KeyExpr, OwnedKeyExpr, Sample, Selector}; use zenoh::queryable::ReplyBuilder; use zenoh::time::Timestamp; use zenoh::value::Value; pub use zenoh::Result as ZResult; +use zenoh_plugin_trait::{PluginControl, PluginInstance, PluginStatusRec, StructVersion}; +use zenoh_util::concat_enabled_features; pub mod config; -use config::{StorageConfig, VolumeConfig}; +use config::StorageConfig; + +// No features are actually used in this crate, but this dummy list allows to demonstrate how to combine feature lists +// from multiple crates. See impl `PluginStructVersion` for `VolumeConfig` below. +const FEATURES: &str = + concat_enabled_features!(prefix = "zenoh-backend-traits", features = ["default"]); /// Capability of a storage indicates the guarantees of the storage /// It is used by the storage manager to take decisions on the trade-offs to ensure correct performance @@ -173,10 +183,6 @@ pub enum History { All, } -/// Signature of the `create_volume` operation to be implemented in the library as an entrypoint. -pub const CREATE_VOLUME_FN_NAME: &[u8] = b"create_volume"; -pub type CreateVolume = fn(VolumeConfig) -> ZResult>; - /// pub enum StorageInsertionResult { Outdated, @@ -203,7 +209,7 @@ pub trait Volume: Send + Sync { fn get_capability(&self) -> Capability; /// Creates a storage configured with some properties. - async fn create_storage(&mut self, props: StorageConfig) -> ZResult>; + async fn create_storage(&self, props: StorageConfig) -> ZResult>; /// Returns an interceptor that will be called before pushing any data /// into a storage created by this backend. `None` can be returned for no interception point. @@ -214,6 +220,25 @@ pub trait Volume: Send + Sync { fn outgoing_data_interceptor(&self) -> Option Sample + Send + Sync>>; } +pub type VolumeInstance = Box; + +impl StructVersion for VolumeInstance { + fn struct_version() -> u64 { + 1 + } + fn struct_features() -> &'static str { + concatcp!(zenoh::FEATURES, crate::FEATURES) + } +} + +impl PluginControl for VolumeInstance { + fn plugins_status(&self, _names: &zenoh::prelude::keyexpr) -> Vec { + Vec::new() + } +} + +impl PluginInstance for VolumeInstance {} + /// Trait to be implemented by a Storage. #[async_trait] pub trait Storage: Send + Sync { diff --git a/plugins/zenoh-plugin-example/Cargo.toml b/plugins/zenoh-plugin-example/Cargo.toml index fdf9786a4..6d4982623 100644 --- a/plugins/zenoh-plugin-example/Cargo.toml +++ b/plugins/zenoh-plugin-example/Cargo.toml @@ -19,6 +19,10 @@ authors = { workspace = true } edition = { workspace = true } publish = false +[features] +default = ["no_mangle"] +no_mangle = [] + [lib] # When auto-detecting the "example" plugin, `zenohd` will look for a dynamic library named "zenoh_plugin_example" # `zenohd` will expect the file to be named according to OS conventions: @@ -31,9 +35,11 @@ crate-type = ["cdylib"] [dependencies] async-std = { workspace = true, features = ["default"] } +const_format = { workspace = true } clap = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } +git-version = { workspace = true } log = { workspace = true } serde_json = { workspace = true } zenoh = { workspace = true, features = ["unstable"] } diff --git a/plugins/zenoh-plugin-example/src/lib.rs b/plugins/zenoh-plugin-example/src/lib.rs index 5e8217037..c2f083827 100644 --- a/plugins/zenoh-plugin-example/src/lib.rs +++ b/plugins/zenoh-plugin-example/src/lib.rs @@ -21,16 +21,18 @@ use std::sync::{ atomic::{AtomicBool, Ordering::Relaxed}, Arc, Mutex, }; -use zenoh::plugins::{Plugin, RunningPluginTrait, ValidationFunction, ZenohPlugin}; +use zenoh::plugins::{RunningPluginTrait, ZenohPlugin}; use zenoh::prelude::r#async::*; use zenoh::runtime::Runtime; use zenoh_core::zlock; +use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl}; use zenoh_result::{bail, ZResult}; // The struct implementing the ZenohPlugin and ZenohPlugin traits pub struct ExamplePlugin {} // declaration of the plugin's VTable for zenohd to find the plugin's functions to be called +#[cfg(feature = "no_mangle")] zenoh_plugin_trait::declare_plugin!(ExamplePlugin); // A default selector for this example of storage plugin (in case the config doesn't set it) @@ -40,14 +42,16 @@ const DEFAULT_SELECTOR: &str = "demo/example/**"; impl ZenohPlugin for ExamplePlugin {} impl Plugin for ExamplePlugin { type StartArgs = Runtime; - type RunningPlugin = zenoh::plugins::RunningPlugin; + type Instance = zenoh::plugins::RunningPlugin; // A mandatory const to define, in case of the plugin is built as a standalone executable - const STATIC_NAME: &'static str = "example"; + const DEFAULT_NAME: &'static str = "example"; + const PLUGIN_VERSION: &'static str = plugin_version!(); + const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!(); // The first operation called by zenohd on the plugin - fn start(name: &str, runtime: &Self::StartArgs) -> ZResult { - let config = runtime.config.lock(); + fn start(name: &str, runtime: &Self::StartArgs) -> ZResult { + let config = runtime.config().lock(); let self_cfg = config.plugin(name).unwrap().as_object().unwrap(); // get the plugin's config details from self_cfg Map (here the "storage-selector" property) let selector: KeyExpr = match self_cfg.get("storage-selector") { @@ -85,57 +89,46 @@ struct RunningPluginInner { // The RunningPlugin struct implementing the RunningPluginTrait trait #[derive(Clone)] struct RunningPlugin(Arc>); + +impl PluginControl for RunningPlugin {} + impl RunningPluginTrait for RunningPlugin { - // Operation returning a ValidationFunction(path, old, new)-> ZResult>> - // this function will be called each time the plugin's config is changed via the zenohd admin space - fn config_checker(&self) -> ValidationFunction { - let guard = zlock!(&self.0); - let name = guard.name.clone(); - std::mem::drop(guard); - let plugin = self.clone(); - Arc::new(move |path, old, new| { - const STORAGE_SELECTOR: &str = "storage-selector"; - if path == STORAGE_SELECTOR || path.is_empty() { - match (old.get(STORAGE_SELECTOR), new.get(STORAGE_SELECTOR)) { - (Some(serde_json::Value::String(os)), Some(serde_json::Value::String(ns))) - if os == ns => {} - (_, Some(serde_json::Value::String(selector))) => { - let mut guard = zlock!(&plugin.0); - guard.flag.store(false, Relaxed); - guard.flag = Arc::new(AtomicBool::new(true)); - match KeyExpr::try_from(selector.clone()) { - Err(e) => log::error!("{}", e), - Ok(selector) => { - async_std::task::spawn(run( - guard.runtime.clone(), - selector, - guard.flag.clone(), - )); - } + fn config_checker( + &self, + path: &str, + old: &serde_json::Map, + new: &serde_json::Map, + ) -> ZResult>> { + let mut guard = zlock!(&self.0); + const STORAGE_SELECTOR: &str = "storage-selector"; + if path == STORAGE_SELECTOR || path.is_empty() { + match (old.get(STORAGE_SELECTOR), new.get(STORAGE_SELECTOR)) { + (Some(serde_json::Value::String(os)), Some(serde_json::Value::String(ns))) + if os == ns => {} + (_, Some(serde_json::Value::String(selector))) => { + guard.flag.store(false, Relaxed); + guard.flag = Arc::new(AtomicBool::new(true)); + match KeyExpr::try_from(selector.clone()) { + Err(e) => log::error!("{}", e), + Ok(selector) => { + async_std::task::spawn(run( + guard.runtime.clone(), + selector, + guard.flag.clone(), + )); } - return Ok(None); - } - (_, None) => { - let guard = zlock!(&plugin.0); - guard.flag.store(false, Relaxed); - } - _ => { - bail!("storage-selector for {} must be a string", &name) } + return Ok(None); + } + (_, None) => { + guard.flag.store(false, Relaxed); + } + _ => { + bail!("storage-selector for {} must be a string", &guard.name) } } - bail!("unknown option {} for {}", path, &name) - }) - } - - // Function called on any query on admin space that matches this plugin's sub-part of the admin space. - // Thus the plugin can reply its contribution to the global admin space of this zenohd. - fn adminspace_getter<'a>( - &'a self, - _selector: &'a Selector<'a>, - _plugin_status_key: &str, - ) -> ZResult> { - Ok(Vec::new()) + } + bail!("unknown option {} for {}", path, guard.name) } } diff --git a/plugins/zenoh-plugin-rest/Cargo.toml b/plugins/zenoh-plugin-rest/Cargo.toml index 616b5a5b6..837c157c2 100644 --- a/plugins/zenoh-plugin-rest/Cargo.toml +++ b/plugins/zenoh-plugin-rest/Cargo.toml @@ -25,7 +25,7 @@ description = "The zenoh REST plugin" [features] default = ["no_mangle"] -no_mangle = ["zenoh-plugin-trait/no_mangle"] +no_mangle = [] [lib] name = "zenoh_plugin_rest" @@ -36,6 +36,7 @@ anyhow = { workspace = true, features = ["default"] } async-std = { workspace = true, features = ["default"] } base64 = { workspace = true } clap = { workspace = true } +const_format = { workspace = true } env_logger = { workspace = true } flume = { workspace = true } futures = { workspace = true } diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 6cea5f8e0..6f4e80f4e 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -27,13 +27,14 @@ use std::sync::Arc; use tide::http::Mime; use tide::sse::Sender; use tide::{Request, Response, Server, StatusCode}; -use zenoh::plugins::{Plugin, RunningPluginTrait, ZenohPlugin}; +use zenoh::plugins::{RunningPluginTrait, ZenohPlugin}; use zenoh::prelude::r#async::*; use zenoh::properties::Properties; use zenoh::query::{QueryConsolidation, Reply}; use zenoh::runtime::Runtime; use zenoh::selector::TIME_RANGE_KEY; use zenoh::Session; +use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl}; use zenoh_result::{bail, zerror, ZResult}; mod config; @@ -188,7 +189,9 @@ fn response(status: StatusCode, content_type: impl TryInto, body: &str) -> builder.build() } +#[cfg(feature = "no_mangle")] zenoh_plugin_trait::declare_plugin!(RestPlugin); + pub struct RestPlugin {} #[derive(Clone, Copy, Debug)] struct StrError { @@ -215,8 +218,10 @@ impl ZenohPlugin for RestPlugin {} impl Plugin for RestPlugin { type StartArgs = Runtime; - type RunningPlugin = zenoh::plugins::RunningPlugin; - const STATIC_NAME: &'static str = "rest"; + type Instance = zenoh::plugins::RunningPlugin; + const DEFAULT_NAME: &'static str = "rest"; + const PLUGIN_VERSION: &'static str = plugin_version!(); + const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!(); fn start(name: &str, runtime: &Self::StartArgs) -> ZResult { // Try to initiate login. @@ -225,7 +230,7 @@ impl Plugin for RestPlugin { let _ = env_logger::try_init(); log::debug!("REST plugin {}", LONG_VERSION.as_str()); - let runtime_conf = runtime.config.lock(); + let runtime_conf = runtime.config().lock(); let plugin_conf = runtime_conf .plugin(name) .ok_or_else(|| zerror!("Plugin `{}`: missing config", name))?; @@ -242,13 +247,10 @@ impl Plugin for RestPlugin { } struct RunningPlugin(Config); -impl RunningPluginTrait for RunningPlugin { - fn config_checker(&self) -> zenoh::plugins::ValidationFunction { - Arc::new(|_, _, _| { - bail!("zenoh-plugin-rest doesn't accept any runtime configuration changes") - }) - } +impl PluginControl for RunningPlugin {} + +impl RunningPluginTrait for RunningPlugin { fn adminspace_getter<'a>( &'a self, selector: &'a Selector<'a>, @@ -476,7 +478,7 @@ pub async fn run(runtime: Runtime, conf: Config) -> ZResult<()> { // But cannot be done twice in case of static link. let _ = env_logger::try_init(); - let zid = runtime.zid.to_string(); + let zid = runtime.zid().to_string(); let session = zenoh::init(runtime).res().await.unwrap(); let mut app = Server::with_state((Arc::new(session), zid)); diff --git a/plugins/zenoh-plugin-storage-manager/Cargo.toml b/plugins/zenoh-plugin-storage-manager/Cargo.toml index ce56e18d0..43b021cbe 100644 --- a/plugins/zenoh-plugin-storage-manager/Cargo.toml +++ b/plugins/zenoh-plugin-storage-manager/Cargo.toml @@ -25,7 +25,7 @@ description = "The zenoh storages plugin." [features] default = ["no_mangle"] -no_mangle = ["zenoh-plugin-trait/no_mangle"] +no_mangle = [] [lib] name = "zenoh_plugin_storage_manager" @@ -36,12 +36,12 @@ async-std = { workspace = true, features = ["default"] } async-trait = { workspace = true } clap = { workspace = true } crc = { workspace = true } +const_format = { workspace = true } derive-new = { workspace = true } env_logger = { workspace = true } flume = { workspace = true } futures = { workspace = true } git-version = { workspace = true } -lazy_static = { workspace = true } libloading = { workspace = true } log = { workspace = true } serde = { workspace = true, features = ["default"] } diff --git a/plugins/zenoh-plugin-storage-manager/config.json5 b/plugins/zenoh-plugin-storage-manager/config.json5 index 5f2f1c319..3b3cd51ff 100644 --- a/plugins/zenoh-plugin-storage-manager/config.json5 +++ b/plugins/zenoh-plugin-storage-manager/config.json5 @@ -1,13 +1,18 @@ { "volumes": { - "influxdb_local": { - "__path__": "../zenoh-backend-influxdb/target/debug/libzenoh_backend_influxdb.dylib", - "__config__": "config_influxdb_local.json5" - }, - "influxdb_remote": { - "__path__": "../zenoh-backend-influxdb/target/debug/libzenoh_backend_influxdb.dylib", - "__config__": "config_influxdb_remote.json5" + "example": { + "__path__": ["target/debug/libzenoh_backend_example.so","target/debug/libzenoh_backend_example.dylib"], + "__config__": "../../plugins/example-storage-plugin/config.json5" } }, - "storages": {} + "storages": { + "memory": { + "volume": "memory", + "key_expr": "demo/memory/**" + }, + "example": { + "volume": "example", + "key_expr": "demo/example/**" + } + } } \ No newline at end of file diff --git a/plugins/zenoh-plugin-storage-manager/config_influxdb_local.json5 b/plugins/zenoh-plugin-storage-manager/config_influxdb_local.json5 deleted file mode 100644 index b627f9eb8..000000000 --- a/plugins/zenoh-plugin-storage-manager/config_influxdb_local.json5 +++ /dev/null @@ -1,7 +0,0 @@ -{ - url: "https://localhost:8086", - private: { - username: "user1", - password: "pw1", - }, -} \ No newline at end of file diff --git a/plugins/zenoh-plugin-storage-manager/config_influxdb_remote.json5 b/plugins/zenoh-plugin-storage-manager/config_influxdb_remote.json5 deleted file mode 100644 index b034b4c77..000000000 --- a/plugins/zenoh-plugin-storage-manager/config_influxdb_remote.json5 +++ /dev/null @@ -1,7 +0,0 @@ -{ - url: "https://myinfluxdb.example", - private: { - username: "user1", - password: "pw1", - }, -} \ No newline at end of file diff --git a/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs b/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs index 981ec521a..aa7260e86 100644 --- a/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs +++ b/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use zenoh::prelude::r#async::*; use zenoh::Session; use zenoh_backend_traits::config::StorageConfig; -use zenoh_backend_traits::Capability; +use zenoh_backend_traits::{Capability, VolumeInstance}; use zenoh_result::ZResult; pub struct StoreIntercept { @@ -30,12 +30,12 @@ pub struct StoreIntercept { pub(crate) async fn create_and_start_storage( admin_key: String, config: StorageConfig, - backend: &mut Box, + backend: &VolumeInstance, in_interceptor: Option Sample + Send + Sync>>, out_interceptor: Option Sample + Send + Sync>>, zenoh: Arc, ) -> ZResult> { - log::trace!("Create storage {}", &admin_key); + log::trace!("Create storage '{}'", &admin_key); let capability = backend.get_capability(); let storage = backend.create_storage(config.clone()).await?; let store_intercept = StoreIntercept { diff --git a/plugins/zenoh-plugin-storage-manager/src/lib.rs b/plugins/zenoh-plugin-storage-manager/src/lib.rs index e9d9f6594..0db30bbd6 100644 --- a/plugins/zenoh-plugin-storage-manager/src/lib.rs +++ b/plugins/zenoh-plugin-storage-manager/src/lib.rs @@ -21,24 +21,29 @@ use async_std::task; use flume::Sender; -use libloading::Library; -use memory_backend::create_memory_backend; +use memory_backend::MemoryBackend; use std::collections::HashMap; use std::convert::TryFrom; -use std::path::PathBuf; -use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::sync::Mutex; use storages_mgt::StorageMessage; -use zenoh::plugins::{Plugin, RunningPluginTrait, ValidationFunction, ZenohPlugin}; +use zenoh::plugins::{RunningPluginTrait, ZenohPlugin}; use zenoh::prelude::sync::*; use zenoh::runtime::Runtime; use zenoh::Session; -use zenoh_backend_traits::CreateVolume; -use zenoh_backend_traits::CREATE_VOLUME_FN_NAME; -use zenoh_backend_traits::{config::*, Volume}; +use zenoh_backend_traits::config::ConfigDiff; +use zenoh_backend_traits::config::PluginConfig; +use zenoh_backend_traits::config::StorageConfig; +use zenoh_backend_traits::config::VolumeConfig; +use zenoh_backend_traits::VolumeInstance; use zenoh_core::zlock; -use zenoh_result::{bail, ZResult}; +use zenoh_plugin_trait::plugin_long_version; +use zenoh_plugin_trait::plugin_version; +use zenoh_plugin_trait::Plugin; +use zenoh_plugin_trait::PluginControl; +use zenoh_plugin_trait::PluginReport; +use zenoh_plugin_trait::PluginStatusRec; +use zenoh_result::ZResult; use zenoh_util::LibLoader; mod backends_mgt; @@ -47,45 +52,47 @@ mod memory_backend; mod replica; mod storages_mgt; -const GIT_VERSION: &str = git_version::git_version!(prefix = "v", cargo_prefix = "v"); -lazy_static::lazy_static! { - static ref LONG_VERSION: String = format!("{} built with {}", GIT_VERSION, env!("RUSTC_VERSION")); -} - +#[cfg(feature = "no_mangle")] zenoh_plugin_trait::declare_plugin!(StoragesPlugin); + pub struct StoragesPlugin {} impl ZenohPlugin for StoragesPlugin {} impl Plugin for StoragesPlugin { - const STATIC_NAME: &'static str = "storage_manager"; + const DEFAULT_NAME: &'static str = "storage_manager"; + const PLUGIN_VERSION: &'static str = plugin_version!(); + const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!(); type StartArgs = Runtime; - type RunningPlugin = zenoh::plugins::RunningPlugin; + type Instance = zenoh::plugins::RunningPlugin; - fn start(name: &str, runtime: &Self::StartArgs) -> ZResult { + fn start(name: &str, runtime: &Self::StartArgs) -> ZResult { std::mem::drop(env_logger::try_init()); - log::debug!("StorageManager plugin {}", LONG_VERSION.as_str()); + log::debug!("StorageManager plugin {}", Self::PLUGIN_VERSION); let config = - { PluginConfig::try_from((name, runtime.config.lock().plugin(name).unwrap())) }?; + { PluginConfig::try_from((name, runtime.config().lock().plugin(name).unwrap())) }?; Ok(Box::new(StorageRuntime::from(StorageRuntimeInner::new( runtime.clone(), config, )?))) } } + +type PluginsManager = zenoh_plugin_trait::PluginsManager; + struct StorageRuntime(Arc>); struct StorageRuntimeInner { name: String, runtime: Runtime, session: Arc, - lib_loader: LibLoader, - volumes: HashMap, storages: HashMap>>, + plugins_manager: PluginsManager, } impl StorageRuntimeInner { fn status_key(&self) -> String { format!( "@/router/{}/status/plugins/{}", - &self.runtime.zid, &self.name + &self.runtime.zid(), + &self.name ) } fn new(runtime: Runtime, config: PluginConfig) -> ZResult { @@ -104,34 +111,56 @@ impl StorageRuntimeInner { .map(|search_dirs| LibLoader::new(&search_dirs, false)) .unwrap_or_default(); - let session = Arc::new(zenoh::init(runtime.clone()).res_sync().unwrap()); + let plugins_manager = PluginsManager::dynamic(lib_loader.clone(), BACKEND_LIB_PREFIX) + .declare_static_plugin::(); + + let session = Arc::new(zenoh::init(runtime.clone()).res_sync()?); + + // After this moment result should be only Ok. Failure of loading of one voulme or storage should not affect others. + let mut new_self = StorageRuntimeInner { name, runtime, session, - lib_loader, - volumes: Default::default(), storages: Default::default(), + plugins_manager, }; - new_self.spawn_volume(VolumeConfig { - name: MEMORY_BACKEND_NAME.into(), - backend: None, - paths: None, - required: false, - rest: Default::default(), - })?; - new_self.update( - volumes - .into_iter() - .map(ConfigDiff::AddVolume) - .chain(storages.into_iter().map(ConfigDiff::AddStorage)), - )?; + new_self + .spawn_volume(&VolumeConfig { + name: MEMORY_BACKEND_NAME.into(), + backend: None, + paths: None, + required: false, + rest: Default::default(), + }) + .map_or_else( + |e| { + log::error!( + "Cannot spawn static volume '{}': {}", + MEMORY_BACKEND_NAME, + e + ) + }, + |_| (), + ); + for volume in &volumes { + new_self.spawn_volume(volume).map_or_else( + |e| log::error!("Cannot spawn volume '{}': {}", volume.name(), e), + |_| (), + ); + } + for storage in &storages { + new_self.spawn_storage(storage).map_or_else( + |e| log::error!("Cannot spawn storage '{}': {}", storage.name(), e), + |_| (), + ); + } Ok(new_self) } fn update>(&mut self, diffs: I) -> ZResult<()> { - for diff in diffs { + for ref diff in diffs { match diff { - ConfigDiff::DeleteVolume(volume) => self.kill_volume(volume), + ConfigDiff::DeleteVolume(volume) => self.kill_volume(&volume.name)?, ConfigDiff::AddVolume(volume) => { self.spawn_volume(volume)?; } @@ -141,108 +170,50 @@ impl StorageRuntimeInner { } Ok(()) } - fn kill_volume(&mut self, volume: VolumeConfig) { - if let Some(storages) = self.storages.remove(&volume.name) { + fn kill_volume>(&mut self, name: T) -> ZResult<()> { + let name = name.as_ref(); + log::info!("Killing volume '{}'", name); + if let Some(storages) = self.storages.remove(name) { async_std::task::block_on(futures::future::join_all( storages .into_values() .map(|s| async move { s.send(StorageMessage::Stop) }), )); } - std::mem::drop(self.volumes.remove(&volume.name)); + self.plugins_manager + .started_plugin_mut(name) + .ok_or(format!("Cannot find volume '{}' to stop it", name))? + .stop(); + Ok(()) } - fn spawn_volume(&mut self, config: VolumeConfig) -> ZResult<()> { - let volume_id = config.name.clone(); - if volume_id == MEMORY_BACKEND_NAME { - match create_memory_backend(config) { - Ok(backend) => { - self.volumes.insert( - volume_id, - VolumeHandle::new(backend, None, "".into()), - ); - } - Err(e) => bail!("{}", e), - } + fn spawn_volume(&mut self, config: &VolumeConfig) -> ZResult<()> { + let volume_id = config.name(); + let backend_name = config.backend(); + log::info!( + "Spawning volume '{}' with backend '{}'", + volume_id, + backend_name + ); + let declared = if let Some(declared) = self.plugins_manager.plugin_mut(volume_id) { + declared + } else if let Some(paths) = config.paths() { + self.plugins_manager + .declare_dynamic_plugin_by_paths(volume_id, paths)? } else { - match config.backend_search_method() { - BackendSearchMethod::ByPaths(paths) => { - for path in paths { - unsafe { - if let Ok((lib, path)) = LibLoader::load_file(path) { - self.loaded_backend_from_lib( - &volume_id, - config.clone(), - lib, - path, - )?; - break; - } - } - } - bail!( - "Failed to find a suitable library for volume {} from paths: {:?}", - volume_id, - paths - ); - } - BackendSearchMethod::ByName(backend_name) => unsafe { - let backend_filename = format!("{}{}", BACKEND_LIB_PREFIX, &backend_name); - if let Ok((lib, path)) = self.lib_loader.search_and_load(&backend_filename) { - self.loaded_backend_from_lib(&volume_id, config.clone(), lib, path)?; - } else { - bail!( - "Failed to find a suitable library for volume {} (was looking for {}<.so/.dll/.dylib>)", - volume_id, - &backend_filename - ); - } - }, - }; + self.plugins_manager + .declare_dynamic_plugin_by_name(volume_id, backend_name)? }; + let loaded = declared.load()?; + loaded.start(config)?; Ok(()) } - unsafe fn loaded_backend_from_lib( - &mut self, - volume_id: &str, - config: VolumeConfig, - lib: Library, - lib_path: PathBuf, - ) -> ZResult<()> { - if let Ok(create_backend) = lib.get::(CREATE_VOLUME_FN_NAME) { - match create_backend(config) { - Ok(backend) => { - self.volumes.insert( - volume_id.to_string(), - VolumeHandle::new( - backend, - Some(lib), - lib_path.to_string_lossy().into_owned(), - ), - ); - Ok(()) - } - Err(e) => bail!( - "Failed to load Backend {} from {}: {}", - volume_id, - lib_path.display(), - e - ), - } - } else { - bail!( - "Failed to instantiate volume {} from {}: function {}(VolumeConfig) not found in lib", - volume_id, - lib_path.display(), - String::from_utf8_lossy(CREATE_VOLUME_FN_NAME) - ); - } - } - fn kill_storage(&mut self, config: StorageConfig) { + fn kill_storage(&mut self, config: &StorageConfig) { let volume = &config.volume_id; + log::info!("Killing storage '{}' from volume '{}'", config.name, volume); if let Some(storages) = self.storages.get_mut(volume) { if let Some(storage) = storages.get_mut(&config.name) { log::debug!( - "Closing storage {} from volume {}", + "Closing storage '{}' from volume '{}'", config.name, config.volume_id ); @@ -251,54 +222,38 @@ impl StorageRuntimeInner { } } } - fn spawn_storage(&mut self, storage: StorageConfig) -> ZResult<()> { + fn spawn_storage(&mut self, storage: &StorageConfig) -> ZResult<()> { let admin_key = self.status_key() + "/storages/" + &storage.name; let volume_id = storage.volume_id.clone(); - if let Some(backend) = self.volumes.get_mut(&volume_id) { - let storage_name = storage.name.clone(); - let in_interceptor = backend.backend.incoming_data_interceptor(); - let out_interceptor = backend.backend.outgoing_data_interceptor(); - let stopper = async_std::task::block_on(create_and_start_storage( - admin_key, - storage, - &mut backend.backend, - in_interceptor, - out_interceptor, - self.session.clone(), + let backend = self + .plugins_manager + .started_plugin(&volume_id) + .ok_or(format!( + "Cannot find volume '{}' to spawn storage '{}'", + volume_id, storage.name ))?; - self.storages - .entry(volume_id) - .or_default() - .insert(storage_name, stopper); - Ok(()) - } else { - bail!( - "`{}` volume doesn't support the required storage configuration", - volume_id - ) - } - } -} -struct VolumeHandle { - backend: Box, - _lib: Option, - lib_path: String, - stopper: Arc, -} -impl VolumeHandle { - fn new(backend: Box, lib: Option, lib_path: String) -> Self { - VolumeHandle { - backend, - _lib: lib, - lib_path, - stopper: Arc::new(std::sync::atomic::AtomicBool::new(true)), - } - } -} -impl Drop for VolumeHandle { - fn drop(&mut self) { - self.stopper - .store(false, std::sync::atomic::Ordering::Relaxed); + let storage_name = storage.name.clone(); + log::info!( + "Spawning storage '{}' from volume '{}' with backend '{}'", + storage_name, + volume_id, + backend.name() + ); + let in_interceptor = backend.instance().incoming_data_interceptor(); + let out_interceptor = backend.instance().outgoing_data_interceptor(); + let stopper = async_std::task::block_on(create_and_start_storage( + admin_key, + storage.clone(), + backend.instance(), + in_interceptor, + out_interceptor, + self.session.clone(), + ))?; + self.storages + .entry(volume_id) + .or_default() + .insert(storage_name, stopper); + Ok(()) } } impl From for StorageRuntime { @@ -307,20 +262,39 @@ impl From for StorageRuntime { } } +impl PluginControl for StorageRuntime { + fn report(&self) -> PluginReport { + PluginReport::default() + } + fn plugins_status(&self, names: &keyexpr) -> Vec { + let guard = self.0.lock().unwrap(); + guard + .plugins_manager + .plugins_status(names) + .into_iter() + .map(PluginStatusRec::into_owned) + .collect() + } +} + impl RunningPluginTrait for StorageRuntime { - fn config_checker(&self) -> ValidationFunction { + fn config_checker( + &self, + _: &str, + old: &serde_json::Map, + new: &serde_json::Map, + ) -> ZResult>> { let name = { zlock!(self.0).name.clone() }; - let runtime = self.0.clone(); - Arc::new(move |_path, old, new| { - let old = PluginConfig::try_from((&name, old))?; - let new = PluginConfig::try_from((&name, new))?; - log::info!("old: {:?}", &old); - log::info!("new: {:?}", &new); - let diffs = ConfigDiff::diffs(old, new); - log::info!("diff: {:?}", &diffs); - { zlock!(runtime).update(diffs) }?; - Ok(None) - }) + let old = PluginConfig::try_from((&name, old))?; + let new = PluginConfig::try_from((&name, new))?; + log::debug!("config change requested for plugin '{}'", name); + log::debug!("old config: {:?}", &old); + log::debug!("new config: {:?}", &new); + let diffs = ConfigDiff::diffs(old, new); + log::debug!("applying diff: {:?}", &diffs); + { zlock!(self.0).update(diffs) }?; + log::debug!("applying diff done"); + Ok(None) } fn adminspace_getter<'a>( @@ -330,6 +304,7 @@ impl RunningPluginTrait for StorageRuntime { ) -> ZResult> { let mut responses = Vec::new(); let mut key = String::from(plugin_status_key); + // TODO: to be removed when "__version__" is implemented in admoin space with_extended_string(&mut key, &["/version"], |key| { if keyexpr::new(key.as_str()) .unwrap() @@ -337,14 +312,14 @@ impl RunningPluginTrait for StorageRuntime { { responses.push(zenoh::plugins::Response::new( key.clone(), - GIT_VERSION.into(), + StoragesPlugin::PLUGIN_VERSION.into(), )) } }); let guard = self.0.lock().unwrap(); with_extended_string(&mut key, &["/volumes/"], |key| { - for (volume_id, volume) in &guard.volumes { - with_extended_string(key, &[volume_id], |key| { + for plugin in guard.plugins_manager.started_plugins_iter() { + with_extended_string(key, &[plugin.name()], |key| { with_extended_string(key, &["/__path__"], |key| { if keyexpr::new(key.as_str()) .unwrap() @@ -352,7 +327,7 @@ impl RunningPluginTrait for StorageRuntime { { responses.push(zenoh::plugins::Response::new( key.clone(), - volume.lib_path.clone().into(), + plugin.path().into(), )) } }); @@ -362,7 +337,7 @@ impl RunningPluginTrait for StorageRuntime { { responses.push(zenoh::plugins::Response::new( key.clone(), - volume.backend.get_admin_status(), + plugin.instance().get_admin_status(), )) } }); diff --git a/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs b/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs index 3def07880..ebb4922c9 100644 --- a/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs @@ -19,16 +19,30 @@ use zenoh::prelude::r#async::*; use zenoh::time::Timestamp; use zenoh_backend_traits::config::{StorageConfig, VolumeConfig}; use zenoh_backend_traits::*; +use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin}; use zenoh_result::ZResult; -pub fn create_memory_backend(config: VolumeConfig) -> ZResult> { - Ok(Box::new(MemoryBackend { config })) -} +use crate::MEMORY_BACKEND_NAME; pub struct MemoryBackend { config: VolumeConfig, } +impl Plugin for MemoryBackend { + type StartArgs = VolumeConfig; + type Instance = VolumeInstance; + + const DEFAULT_NAME: &'static str = MEMORY_BACKEND_NAME; + const PLUGIN_VERSION: &'static str = plugin_version!(); + const PLUGIN_LONG_VERSION: &'static str = plugin_long_version!(); + + fn start(_: &str, args: &VolumeConfig) -> ZResult { + Ok(Box::new(MemoryBackend { + config: args.clone(), + })) + } +} + #[async_trait] impl Volume for MemoryBackend { fn get_admin_status(&self) -> serde_json::Value { @@ -43,7 +57,7 @@ impl Volume for MemoryBackend { } } - async fn create_storage(&mut self, properties: StorageConfig) -> ZResult> { + async fn create_storage(&self, properties: StorageConfig) -> ZResult> { log::debug!("Create Memory Storage with configuration: {:?}", properties); Ok(Box::new(MemoryStorage::new(properties).await?)) } diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index 16f5fd4a3..84f592d89 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -146,7 +146,7 @@ impl StorageService { let storage_sub = match self.session.declare_subscriber(&self.key_expr).res().await { Ok(storage_sub) => storage_sub, Err(e) => { - log::error!("Error starting storage {}: {}", self.name, e); + log::error!("Error starting storage '{}': {}", self.name, e); return; } }; @@ -161,7 +161,7 @@ impl StorageService { { Ok(storage_queryable) => storage_queryable, Err(e) => { - log::error!("Error starting storage {}: {}", self.name, e); + log::error!("Error starting storage '{}': {}", self.name, e); return; } }; @@ -205,7 +205,7 @@ impl StorageService { message = rx.recv_async() => { match message { Ok(StorageMessage::Stop) => { - log::trace!("Dropping storage {}", self.name); + log::trace!("Dropping storage '{}'", self.name); return }, Ok(StorageMessage::GetStatus(tx)) => { @@ -243,7 +243,7 @@ impl StorageService { message = rx.recv_async() => { match message { Ok(StorageMessage::Stop) => { - log::trace!("Dropping storage {}", self.name); + log::trace!("Dropping storage '{}'", self.name); return }, Ok(StorageMessage::GetStatus(tx)) => { @@ -458,7 +458,7 @@ impl StorageService { } Err(e) => { log::warn!( - "Storage {} raised an error fetching a query on key {} : {}", + "Storage '{}' raised an error fetching a query on key {} : {}", self.name, key_expr, e @@ -527,14 +527,14 @@ impl StorageService { }; if let Err(e) = q.reply(Ok(sample)).res().await { log::warn!( - "Storage {} raised an error replying a query: {}", + "Storage '{}' raised an error replying a query: {}", self.name, e ) } } } - Err(e) => log::warn!("Storage {} raised an error on query: {}", self.name, e), + Err(e) => log::warn!("Storage'{}' raised an error on query: {}", self.name, e), }; } drop(storage); @@ -561,7 +561,7 @@ impl StorageService { }; if let Err(e) = q.reply(Ok(sample)).res().await { log::warn!( - "Storage {} raised an error replying a query: {}", + "Storage '{}' raised an error replying a query: {}", self.name, e ) @@ -570,11 +570,11 @@ impl StorageService { } Err(e) => { let err_message = - format!("Storage {} raised an error on query: {}", self.name, e); + format!("Storage '{}' raised an error on query: {}", self.name, e); log::warn!("{}", err_message); if let Err(e) = q.reply(Err(err_message.into())).res().await { log::warn!( - "Storage {} raised an error replying a query: {}", + "Storage '{}' raised an error replying a query: {}", self.name, e ) @@ -602,7 +602,7 @@ impl StorageService { } } Err(e) => log::warn!( - "Storage {} raised an error while retrieving keys: {}", + "Storage '{}' raised an error while retrieving keys: {}", self.name, e ), @@ -659,7 +659,7 @@ impl StorageService { { Ok(replies) => replies, Err(e) => { - log::error!("Error aligning storage {}: {}", self.name, e); + log::error!("Error aligning storage '{}': {}", self.name, e); return; } }; @@ -669,7 +669,7 @@ impl StorageService { self.process_sample(sample).await; } Err(e) => log::warn!( - "Storage {} received an error to align query: {}", + "Storage '{}' received an error to align query: {}", self.name, e ), diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt.rs index 7bf171420..6de5e2f2c 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt.rs @@ -35,7 +35,7 @@ pub(crate) async fn start_storage( let storage_name = parts[7]; let name = format!("{uuid}/{storage_name}"); - log::trace!("Start storage {} on {}", name, config.key_expr); + log::trace!("Start storage '{}' on keyexpr '{}'", name, config.key_expr); let (tx, rx) = flume::bounded(1); diff --git a/plugins/zenoh-plugin-trait/Cargo.toml b/plugins/zenoh-plugin-trait/Cargo.toml index 1df89f538..d93043723 100644 --- a/plugins/zenoh-plugin-trait/Cargo.toml +++ b/plugins/zenoh-plugin-trait/Cargo.toml @@ -26,14 +26,13 @@ description = { workspace = true } [lib] name = "zenoh_plugin_trait" -[features] -default = ["no_mangle"] -no_mangle = [] - [dependencies] libloading = { workspace = true } log = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } zenoh-macros = { workspace = true } zenoh-result = { workspace = true } zenoh-util = { workspace = true } +zenoh-keyexpr = { workspace = true } +const_format = { workspace = true } \ No newline at end of file diff --git a/plugins/zenoh-plugin-trait/src/compatibility.rs b/plugins/zenoh-plugin-trait/src/compatibility.rs new file mode 100644 index 000000000..7b52bc5fb --- /dev/null +++ b/plugins/zenoh-plugin-trait/src/compatibility.rs @@ -0,0 +1,258 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::fmt::Display; + +use crate::{Plugin, PluginInstance, PluginStartArgs, PluginVTable}; + +pub trait StructVersion { + /// The version of the structure which implements this trait. After any change in the structure or its dependencies + /// which may affect the ABI, this version should be incremented. + fn struct_version() -> u64; + /// The features enabled during compilation of the structure implementing this trait. + /// Different features between the plugin and the host may cause ABI incompatibility even if the structure version is the same. + /// Use `concat_enabled_features!` to generate this string + fn struct_features() -> &'static str; +} + +#[repr(C)] +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct PluginStructVersion { + pub version: u64, + pub name: &'static str, + pub features: &'static str, +} + +impl PluginStructVersion { + pub fn new() -> Self { + Self { + version: T::struct_version(), + name: std::any::type_name::(), + features: T::struct_features(), + } + } +} + +impl Display for PluginStructVersion { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + " version: {}\n type: {}\n features: {}\n", + self.version, self.name, self.features + ) + } +} + +#[repr(C)] +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct Compatibility { + rust_version: Option, + vtable_version: Option, + start_args_version: Option, + instance_version: Option, + plugin_version: Option<&'static str>, + plugin_long_version: Option<&'static str>, +} + +impl Compatibility { + pub fn with_plugin_version< + StartArgsType: PluginStartArgs, + InstanceType: PluginInstance, + PluginType: Plugin, + >() -> Self { + let rust_version = Some(RustVersion::new()); + let vtable_version = Some(PluginStructVersion::new::< + PluginVTable, + >()); + let start_args_version = Some(PluginStructVersion::new::()); + let instance_version = Some(PluginStructVersion::new::()); + let plugin_version = Some(PluginType::PLUGIN_VERSION); + let plugin_long_version = Some(PluginType::PLUGIN_LONG_VERSION); + Self { + rust_version, + vtable_version, + start_args_version, + instance_version, + plugin_version, + plugin_long_version, + } + } + pub fn with_empty_plugin_version< + StartArgsType: PluginStartArgs, + InstanceType: PluginInstance, + >() -> Self { + let rust_version = Some(RustVersion::new()); + let vtable_version = Some(PluginStructVersion::new::< + PluginVTable, + >()); + let start_args_version = Some(PluginStructVersion::new::()); + let instance_version = Some(PluginStructVersion::new::()); + Self { + rust_version, + vtable_version, + start_args_version, + instance_version, + plugin_version: None, + plugin_long_version: None, + } + } + pub fn plugin_version(&self) -> Option<&'static str> { + self.plugin_version + } + pub fn plugin_long_version(&self) -> Option<&'static str> { + self.plugin_long_version + } + /// Compares fields if both are Some, otherwise skips the comparison. + /// Returns true if all the comparisons returned true, otherwise false. + /// If comparison passed or skipped, the corresponding field in both structs is set to None. + /// If comparison failed, the corresponding field in both structs is kept as is. + /// This allows not only to check compatibility, but also point to exact reasons of incompatibility. + pub fn compare(&mut self, other: &mut Self) -> bool { + let mut result = true; + Self::compare_field_fn( + &mut result, + &mut self.rust_version, + &mut other.rust_version, + RustVersion::are_compatible, + ); + Self::compare_field( + &mut result, + &mut self.vtable_version, + &mut other.vtable_version, + ); + Self::compare_field( + &mut result, + &mut self.start_args_version, + &mut other.start_args_version, + ); + Self::compare_field( + &mut result, + &mut self.instance_version, + &mut other.instance_version, + ); + // TODO: here we can later implement check for plugin version range compatibility + Self::compare_field( + &mut result, + &mut self.plugin_version, + &mut other.plugin_version, + ); + Self::compare_field( + &mut result, + &mut self.plugin_long_version, + &mut other.plugin_long_version, + ); + result + } + + // Utility function for compare single field + fn compare_field_fn bool>( + result: &mut bool, + a: &mut Option, + b: &mut Option, + compare: F, + ) { + let compatible = if let (Some(a), Some(b)) = (&a, &b) { + compare(a, b) + } else { + true + }; + if compatible { + *a = None; + *b = None; + } else { + *result = false; + } + } + fn compare_field(result: &mut bool, a: &mut Option, b: &mut Option) { + Self::compare_field_fn(result, a, b, |a, b| a == b); + } +} + +impl Display for Compatibility { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(rust_version) = &self.rust_version { + writeln!(f, "Rust version:\n{}", rust_version)?; + } + if let Some(vtable_version) = &self.vtable_version { + writeln!(f, "VTable version:\n{}", vtable_version)?; + } + if let Some(start_args_version) = &self.start_args_version { + writeln!(f, "StartArgs version:\n{}", start_args_version)?; + } + if let Some(instance_version) = &self.instance_version { + writeln!(f, "Instance version:\n{}", instance_version)?; + } + if let Some(plugin_version) = &self.plugin_version { + writeln!(f, "Plugin version: {}", plugin_version)?; + } + Ok(()) + } +} + +#[repr(C)] +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct RustVersion { + major: u32, + minor: u32, + patch: u32, + stable: bool, + commit: &'static str, +} + +impl Display for RustVersion { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Rust {}.{}.{}{} commit {}", + self.major, + self.minor, + self.patch, + if self.stable { "" } else { "-nightly" }, + self.commit + ) + } +} + +const RELEASE_AND_COMMIT: (&str, &str) = zenoh_macros::rustc_version_release!(); +impl RustVersion { + pub fn new() -> Self { + let (release, commit) = RELEASE_AND_COMMIT; + let (release, stable) = if let Some(p) = release.chars().position(|c| c == '-') { + (&release[..p], false) + } else { + (release, true) + }; + let mut split = release.split('.').map(|s| s.trim()); + RustVersion { + major: split.next().unwrap().parse().unwrap(), + minor: split.next().unwrap().parse().unwrap(), + patch: split.next().unwrap().parse().unwrap(), + stable, + commit, + } + } + pub fn are_compatible(a: &Self, b: &Self) -> bool { + if a.stable && b.stable { + a.major == b.major && a.minor == b.minor && a.patch == b.patch + } else { + a == b + } + } +} + +impl Default for RustVersion { + fn default() -> Self { + Self::new() + } +} diff --git a/plugins/zenoh-plugin-trait/src/lib.rs b/plugins/zenoh-plugin-trait/src/lib.rs index 486aa8fbe..6d9ac35fe 100644 --- a/plugins/zenoh-plugin-trait/src/lib.rs +++ b/plugins/zenoh-plugin-trait/src/lib.rs @@ -18,62 +18,34 @@ //! //! If building a plugin for [`zenohd`](https://crates.io/crates/zenoh), you should use the types exported in [`zenoh::plugins`](https://docs.rs/zenoh/latest/zenoh/plugins) to fill [`Plugin`]'s associated types. //! To check your plugin typing for `zenohd`, have your plugin implement [`zenoh::plugins::ZenohPlugin`](https://docs.rs/zenoh/latest/zenoh/plugins/struct.ZenohPlugin) -pub mod loading; -pub mod vtable; - -use zenoh_result::ZResult; - -#[repr(C)] -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct Compatibility { - major: u64, - minor: u64, - patch: u64, - stable: bool, - commit: &'static str, -} -const RELEASE_AND_COMMIT: (&str, &str) = zenoh_macros::rustc_version_release!(); -impl Compatibility { - pub fn new() -> ZResult { - let (release, commit) = RELEASE_AND_COMMIT; - let (release, stable) = if let Some(p) = release.chars().position(|c| c == '-') { - (&release[..p], false) - } else { - (release, true) - }; - let mut split = release.split('.').map(|s| s.trim()); - Ok(Compatibility { - major: split.next().unwrap().parse().unwrap(), - minor: split.next().unwrap().parse().unwrap(), - patch: split.next().unwrap().parse().unwrap(), - stable, - commit, - }) - } - pub fn are_compatible(a: &Self, b: &Self) -> bool { - if a.stable && b.stable { - a.major == b.major && a.minor == b.minor && a.patch == b.patch - } else { - a == b - } - } -} +//! +//! Plugin is a struct which implements the [`Plugin`] trait. This trait has two associated types: +//! - `StartArgs`: the type of the arguments passed to the plugin's [`start`](Plugin::start) function. +//! - `Instance`: the type of the plugin's instance. +//! +//! The actual work of the plugin is performed by the instance, which is created by the [`start`](Plugin::start) function. +//! +//! Plugins are loaded, started and stopped by [`PluginsManager`](crate::manager::PluginsManager). Stopping pluign is just dropping it's instance. +//! +//! Plugins can be static and dynamic. +//! +//! Static plugin is just a type which implements [`Plugin`] trait. It can be added to [`PluginsManager`](crate::manager::PluginsManager) by [`PluginsManager::add_static_plugin`](crate::manager::PluginsManager::add_static_plugin) method. +//! +//! Dynamic pluign is a shared library which exports set of C-repr (unmangled) functions which allows to check plugin compatibility and create plugin instance. These functiuons are defined automatically by [`declare_plugin`](crate::declare_plugin) macro. +//! +mod compatibility; +mod manager; +mod plugin; +mod vtable; -pub mod prelude { - pub use crate::{loading::*, vtable::*, Plugin}; -} +pub use compatibility::{Compatibility, PluginStructVersion, StructVersion}; +pub use manager::{DeclaredPlugin, LoadedPlugin, PluginsManager, StartedPlugin}; +pub use plugin::{ + Plugin, PluginConditionSetter, PluginControl, PluginInstance, PluginReport, PluginStartArgs, + PluginState, PluginStatus, PluginStatusRec, +}; +pub use vtable::{PluginLoaderVersion, PluginVTable, PLUGIN_LOADER_VERSION}; +use zenoh_util::concat_enabled_features; -pub trait Plugin: Sized + 'static { - type StartArgs; - type RunningPlugin; - /// Your plugins' default name when statically linked. - const STATIC_NAME: &'static str; - /// You probabky don't need to override this function. - /// - /// Returns some build information on your plugin, allowing the host to detect potential ABI changes that would break it. - fn compatibility() -> ZResult { - Compatibility::new() - } - /// Starts your plugin. Use `Ok` to return your plugin's control structure - fn start(name: &str, args: &Self::StartArgs) -> ZResult; -} +pub const FEATURES: &str = + concat_enabled_features!(prefix = "zenoh-plugin-trait", features = ["default"]); diff --git a/plugins/zenoh-plugin-trait/src/loading.rs b/plugins/zenoh-plugin-trait/src/loading.rs deleted file mode 100644 index 0f7475a65..000000000 --- a/plugins/zenoh-plugin-trait/src/loading.rs +++ /dev/null @@ -1,303 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use crate::vtable::*; -use crate::*; -use libloading::Library; -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::path::PathBuf; -use zenoh_result::{bail, zerror, ZResult}; -use zenoh_util::LibLoader; - -/// A plugins manager that handles starting and stopping plugins. -/// Plugins can be loaded from shared libraries using [`Self::load_plugin_by_name`] or [`Self::load_plugin_by_paths`], or added directly from the binary if available using [`Self::add_static`]. -pub struct PluginsManager { - loader: Option, - plugin_starters: Vec + Send + Sync>>, - running_plugins: HashMap, -} - -impl PluginsManager { - /// Constructs a new plugin manager with dynamic library loading enabled. - pub fn dynamic(loader: LibLoader) -> Self { - PluginsManager { - loader: Some(loader), - plugin_starters: Vec::new(), - running_plugins: HashMap::new(), - } - } - /// Constructs a new plugin manager with dynamic library loading enabled. - pub fn static_plugins_only() -> Self { - PluginsManager { - loader: None, - plugin_starters: Vec::new(), - running_plugins: HashMap::new(), - } - } - - /// Adds a statically linked plugin to the manager. - pub fn add_static< - P: Plugin + Send + Sync, - >( - mut self, - ) -> Self { - let plugin_starter: StaticPlugin

= StaticPlugin::new(); - self.plugin_starters.push(Box::new(plugin_starter)); - self - } - - /// Starts `plugin`. - /// - /// `Ok(true)` => plugin was successfully started - /// `Ok(false)` => plugin was running already, nothing happened - /// `Err(e)` => starting the plugin failed due to `e` - pub fn start( - &mut self, - plugin: &str, - args: &StartArgs, - ) -> ZResult> { - match self.running_plugins.entry(plugin.into()) { - Entry::Occupied(_) => Ok(None), - Entry::Vacant(e) => { - match self.plugin_starters.iter().find(|p| p.name() == plugin) { - Some(s) => { - let path = s.path(); - let (_, plugin) = e.insert((path.into(), s.start(args).map_err(|e| zerror!(e => "Failed to load plugin {} (from {})", plugin, path))?)); - Ok(Some((path, &*plugin))) - } - None => bail!("Plugin starter for `{}` not found", plugin), - } - } - } - } - - /// Lazily starts all plugins. - /// - /// `Ok(Ok(name))` => plugin `name` was successfully started - /// `Ok(Err(name))` => plugin `name` wasn't started because it was already running - /// `Err(e)` => Error `e` occured when trying to start plugin `name` - pub fn start_all<'l>( - &'l mut self, - args: &'l StartArgs, - ) -> impl Iterator>)> + 'l { - let PluginsManager { - plugin_starters, - running_plugins, - .. - } = self; - let compat = crate::Compatibility::new().unwrap(); - plugin_starters.iter().map(move |p| { - let name = p.name(); - let path = p.path(); - ( - name, - path, - match running_plugins.entry(name.into()) { - std::collections::hash_map::Entry::Occupied(_) => Ok(None), - std::collections::hash_map::Entry::Vacant(e) => { - let compatible = match p.compatibility() { - Some(Ok(c)) => { - if Compatibility::are_compatible(&compat, &c) { - Ok(()) - } else { - Err(zerror!("Plugin compatibility mismatch: host: {:?} - plugin: {:?}. This could lead to segfaults, so wer'e not starting it.", &compat, &c)) - } - } - Some(Err(e)) => Err(zerror!(e => "Plugin {} (from {}) compatibility couldn't be recovered. This likely means it's very broken.", name, path)), - None => Ok(()), - }; - if let Err(e) = compatible { - Err(e.into()) - } else { - match p.start(args) { - Ok(p) => Ok(Some(unsafe { - std::mem::transmute(&e.insert((path.into(), p)).1) - })), - Err(e) => Err(e), - } - } - } - }, - ) - }) - } - - /// Stops `plugin`, returning `true` if it was indeed running. - pub fn stop(&mut self, plugin: &str) -> bool { - let result = self.running_plugins.remove(plugin).is_some(); - self.plugin_starters - .retain(|p| p.name() != plugin || !p.deletable()); - result - } - - /// Lists the loaded plugins by name. - pub fn loaded_plugins(&self) -> impl Iterator { - self.plugin_starters.iter().map(|p| p.name()) - } - /// Retuns a map containing each running plugin's load-path, associated to its name. - pub fn running_plugins_info(&self) -> HashMap<&str, &str> { - let mut result = HashMap::with_capacity(self.running_plugins.len()); - for p in self.plugin_starters.iter() { - let name = p.name(); - if self.running_plugins.contains_key(name) && !result.contains_key(name) { - result.insert(name, p.path()); - } - } - result - } - /// Returns an iterator over each running plugin, where the keys are their name, and the values are a tuple of their path and handle. - pub fn running_plugins(&self) -> impl Iterator { - self.running_plugins - .iter() - .map(|(s, (path, p))| (s.as_str(), (path.as_str(), p))) - } - /// Returns the handle of the requested running plugin if available. - pub fn plugin(&self, name: &str) -> Option<&RunningPlugin> { - self.running_plugins.get(name).map(|p| &p.1) - } - - fn load_plugin( - name: &str, - lib: Library, - path: PathBuf, - ) -> ZResult> { - DynamicPlugin::new(name.into(), lib, path).map_err(|e| - zerror!("Wrong PluginVTable version, your {} doesn't appear to be compatible with this version of Zenoh (vtable versions: plugin v{}, zenoh v{})", - name, - e.map_or_else(|| "UNKNWON".to_string(), |e| e.to_string()), - PLUGIN_VTABLE_VERSION).into() - ) - } - - pub fn load_plugin_by_name(&mut self, name: String) -> ZResult { - let (lib, p) = match &mut self.loader { - Some(l) => unsafe { l.search_and_load(&format!("zenoh_plugin_{}", &name))? }, - None => bail!("Can't load dynamic plugin ` {}`, as dynamic loading is not enabled for this plugin manager.", name), - }; - let plugin = match Self::load_plugin(&name, lib, p.clone()) { - Ok(p) => p, - Err(e) => bail!("After loading `{:?}`: {}", &p, e), - }; - let path = plugin.path().into(); - self.plugin_starters.push(Box::new(plugin)); - Ok(path) - } - pub fn load_plugin_by_paths + std::fmt::Debug>( - &mut self, - name: String, - paths: &[P], - ) -> ZResult { - for path in paths { - let path = path.as_ref(); - match unsafe { LibLoader::load_file(path) } { - Ok((lib, p)) => { - let plugin = Self::load_plugin(&name, lib, p)?; - let path = plugin.path().into(); - self.plugin_starters.push(Box::new(plugin)); - return Ok(path); - } - Err(e) => log::warn!("Plugin '{}' load fail at {}: {}", &name, path, e), - } - } - bail!("Plugin '{}' not found in {:?}", name, &paths) - } -} - -trait PluginStarter { - fn name(&self) -> &str; - fn path(&self) -> &str; - fn start(&self, args: &StartArgs) -> ZResult; - fn compatibility(&self) -> Option>; - fn deletable(&self) -> bool; -} - -struct StaticPlugin

{ - inner: std::marker::PhantomData

, -} - -impl

StaticPlugin

{ - fn new() -> Self { - StaticPlugin { - inner: std::marker::PhantomData, - } - } -} - -impl PluginStarter for StaticPlugin

-where - P: Plugin, -{ - fn name(&self) -> &str { - P::STATIC_NAME - } - fn path(&self) -> &str { - "" - } - fn compatibility(&self) -> Option> { - None - } - fn start(&self, args: &StartArgs) -> ZResult { - P::start(P::STATIC_NAME, args) - } - fn deletable(&self) -> bool { - false - } -} - -impl PluginStarter - for DynamicPlugin -{ - fn name(&self) -> &str { - &self.name - } - fn path(&self) -> &str { - self.path.to_str().unwrap() - } - fn start(&self, args: &StartArgs) -> ZResult { - self.vtable.start(self.name(), args) - } - fn compatibility(&self) -> Option> { - Some(self.vtable.compatibility()) - } - fn deletable(&self) -> bool { - true - } -} - -pub struct DynamicPlugin { - _lib: Library, - vtable: PluginVTable, - pub name: String, - pub path: PathBuf, -} - -impl DynamicPlugin { - fn new(name: String, lib: Library, path: PathBuf) -> Result> { - let load_plugin = unsafe { - lib.get:: LoadPluginResult>( - b"load_plugin", - ) - .map_err(|_| None)? - }; - match load_plugin(PLUGIN_VTABLE_VERSION) { - Ok(vtable) => Ok(DynamicPlugin { - _lib: lib, - vtable, - name, - path, - }), - Err(plugin_version) => Err(Some(plugin_version)), - } - } -} diff --git a/plugins/zenoh-plugin-trait/src/manager.rs b/plugins/zenoh-plugin-trait/src/manager.rs new file mode 100644 index 000000000..d975fa0e2 --- /dev/null +++ b/plugins/zenoh-plugin-trait/src/manager.rs @@ -0,0 +1,302 @@ +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +mod dynamic_plugin; +mod static_plugin; + +use crate::*; +use zenoh_keyexpr::keyexpr; +use zenoh_result::ZResult; +use zenoh_util::LibLoader; + +use self::{ + dynamic_plugin::{DynamicPlugin, DynamicPluginSource}, + static_plugin::StaticPlugin, +}; + +pub trait DeclaredPlugin: PluginStatus { + fn as_status(&self) -> &dyn PluginStatus; + fn load(&mut self) -> ZResult<&mut dyn LoadedPlugin>; + fn loaded(&self) -> Option<&dyn LoadedPlugin>; + fn loaded_mut(&mut self) -> Option<&mut dyn LoadedPlugin>; +} +pub trait LoadedPlugin: PluginStatus { + fn as_status(&self) -> &dyn PluginStatus; + fn start(&mut self, args: &StartArgs) -> ZResult<&mut dyn StartedPlugin>; + fn started(&self) -> Option<&dyn StartedPlugin>; + fn started_mut(&mut self) -> Option<&mut dyn StartedPlugin>; +} + +pub trait StartedPlugin: PluginStatus { + fn as_status(&self) -> &dyn PluginStatus; + fn stop(&mut self); + fn instance(&self) -> &Instance; + fn instance_mut(&mut self) -> &mut Instance; +} + +struct PluginRecord( + Box + Send>, +); + +impl PluginRecord { + fn new + Send + 'static>(plugin: P) -> Self { + Self(Box::new(plugin)) + } +} + +impl PluginStatus + for PluginRecord +{ + fn name(&self) -> &str { + self.0.name() + } + fn version(&self) -> Option<&str> { + self.0.version() + } + fn long_version(&self) -> Option<&str> { + self.0.long_version() + } + fn path(&self) -> &str { + self.0.path() + } + fn state(&self) -> PluginState { + self.0.state() + } + fn report(&self) -> PluginReport { + self.0.report() + } +} + +impl DeclaredPlugin + for PluginRecord +{ + fn as_status(&self) -> &dyn PluginStatus { + self + } + fn load(&mut self) -> ZResult<&mut dyn LoadedPlugin> { + self.0.load() + } + fn loaded(&self) -> Option<&dyn LoadedPlugin> { + self.0.loaded() + } + fn loaded_mut(&mut self) -> Option<&mut dyn LoadedPlugin> { + self.0.loaded_mut() + } +} + +/// A plugins manager that handles starting and stopping plugins. +/// Plugins can be loaded from shared libraries using [`Self::load_plugin_by_name`] or [`Self::load_plugin_by_paths`], or added directly from the binary if available using [`Self::add_static`]. +pub struct PluginsManager { + default_lib_prefix: String, + loader: Option, + plugins: Vec>, +} + +impl + PluginsManager +{ + /// Constructs a new plugin manager with dynamic library loading enabled. + pub fn dynamic>(loader: LibLoader, default_lib_prefix: S) -> Self { + PluginsManager { + default_lib_prefix: default_lib_prefix.into(), + loader: Some(loader), + plugins: Vec::new(), + } + } + /// Constructs a new plugin manager with dynamic library loading disabled. + pub fn static_plugins_only() -> Self { + PluginsManager { + default_lib_prefix: String::new(), + loader: None, + plugins: Vec::new(), + } + } + + /// Adds a statically linked plugin to the manager. + pub fn declare_static_plugin< + P: Plugin + Send + Sync, + >( + mut self, + ) -> Self { + let plugin_loader: StaticPlugin = StaticPlugin::new(); + self.plugins.push(PluginRecord::new(plugin_loader)); + log::debug!( + "Declared static plugin {}", + self.plugins.last().unwrap().name() + ); + self + } + + /// Add dynamic plugin to the manager by name, automatically prepending the default library prefix + pub fn declare_dynamic_plugin_by_name>( + &mut self, + name: S, + plugin_name: &str, + ) -> ZResult<&mut dyn DeclaredPlugin> { + let name = name.into(); + let plugin_name = format!("{}{}", self.default_lib_prefix, plugin_name); + let libloader = self + .loader + .as_ref() + .ok_or("Dynamic plugin loading is disabled")? + .clone(); + log::debug!("Declared dynamic plugin {} by name {}", &name, &plugin_name); + let loader = + DynamicPlugin::new(name, DynamicPluginSource::ByName((libloader, plugin_name))); + self.plugins.push(PluginRecord::new(loader)); + Ok(self.plugins.last_mut().unwrap()) + } + + /// Add first available dynamic plugin from the list of paths to the plugin files + pub fn declare_dynamic_plugin_by_paths, P: AsRef + std::fmt::Debug>( + &mut self, + name: S, + paths: &[P], + ) -> ZResult<&mut dyn DeclaredPlugin> { + let name = name.into(); + let paths = paths.iter().map(|p| p.as_ref().into()).collect(); + log::debug!("Declared dynamic plugin {} by paths {:?}", &name, &paths); + let loader = DynamicPlugin::new(name, DynamicPluginSource::ByPaths(paths)); + self.plugins.push(PluginRecord::new(loader)); + Ok(self.plugins.last_mut().unwrap()) + } + + fn get_plugin_index(&self, name: &str) -> Option { + self.plugins.iter().position(|p| p.name() == name) + } + + /// Lists all plugins + pub fn declared_plugins_iter( + &self, + ) -> impl Iterator> + '_ { + self.plugins + .iter() + .map(|p| p as &dyn DeclaredPlugin) + } + + /// Lists all plugins mutable + pub fn declared_plugins_iter_mut( + &mut self, + ) -> impl Iterator> + '_ { + self.plugins + .iter_mut() + .map(|p| p as &mut dyn DeclaredPlugin) + } + + /// Lists the loaded plugins + pub fn loaded_plugins_iter( + &self, + ) -> impl Iterator> + '_ { + self.declared_plugins_iter().filter_map(|p| p.loaded()) + } + + /// Lists the loaded plugins mutable + pub fn loaded_plugins_iter_mut( + &mut self, + ) -> impl Iterator> + '_ { + // self.plugins_mut().filter_map(|p| p.loaded_mut()) + self.declared_plugins_iter_mut() + .filter_map(|p| p.loaded_mut()) + } + + /// Lists the started plugins + pub fn started_plugins_iter( + &self, + ) -> impl Iterator> + '_ { + self.loaded_plugins_iter().filter_map(|p| p.started()) + } + + /// Lists the started plugins mutable + pub fn started_plugins_iter_mut( + &mut self, + ) -> impl Iterator> + '_ { + self.loaded_plugins_iter_mut() + .filter_map(|p| p.started_mut()) + } + + /// Returns single plugin record by name + pub fn plugin(&self, name: &str) -> Option<&dyn DeclaredPlugin> { + let index = self.get_plugin_index(name)?; + Some(&self.plugins[index]) + } + + /// Returns mutable plugin record by name + pub fn plugin_mut( + &mut self, + name: &str, + ) -> Option<&mut dyn DeclaredPlugin> { + let index = self.get_plugin_index(name)?; + Some(&mut self.plugins[index]) + } + + /// Returns loaded plugin record by name + pub fn loaded_plugin(&self, name: &str) -> Option<&dyn LoadedPlugin> { + self.plugin(name)?.loaded() + } + + /// Returns mutable loaded plugin record by name + pub fn loaded_plugin_mut( + &mut self, + name: &str, + ) -> Option<&mut dyn LoadedPlugin> { + self.plugin_mut(name)?.loaded_mut() + } + + /// Returns started plugin record by name + pub fn started_plugin(&self, name: &str) -> Option<&dyn StartedPlugin> { + self.loaded_plugin(name)?.started() + } + + /// Returns mutable started plugin record by name + pub fn started_plugin_mut( + &mut self, + name: &str, + ) -> Option<&mut dyn StartedPlugin> { + self.loaded_plugin_mut(name)?.started_mut() + } +} + +impl PluginControl + for PluginsManager +{ + fn plugins_status(&self, names: &keyexpr) -> Vec { + log::debug!( + "Plugin manager with prefix `{}` : requested plugins_status {:?}", + self.default_lib_prefix, + names + ); + let mut plugins = Vec::new(); + for plugin in self.declared_plugins_iter() { + let name = unsafe { keyexpr::from_str_unchecked(plugin.name()) }; + if names.includes(name) { + let status = PluginStatusRec::new(plugin.as_status()); + plugins.push(status); + } + // for running plugins append their subplugins prepended with the running plugin name + if let Some(plugin) = plugin.loaded() { + if let Some(plugin) = plugin.started() { + if let [names, ..] = names.strip_prefix(name)[..] { + plugins.append( + &mut plugin + .instance() + .plugins_status(names) + .into_iter() + .map(|s| s.prepend_name(name)) + .collect(), + ); + } + } + } + } + plugins + } +} diff --git a/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs b/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs new file mode 100644 index 000000000..1153f8a6e --- /dev/null +++ b/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs @@ -0,0 +1,256 @@ +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use crate::*; +use std::path::{Path, PathBuf}; + +use libloading::Library; +use zenoh_result::{bail, ZResult}; +use zenoh_util::LibLoader; + +/// This enum contains information where to load the plugin from. +pub enum DynamicPluginSource { + /// Load plugin with the name in String + `.so | .dll | .dylib` + /// in LibLoader's search paths. + ByName((LibLoader, String)), + /// Load first avalilable plugin from the list of path to plugin files (absolute or relative to the current working directory) + ByPaths(Vec), +} + +impl DynamicPluginSource { + fn load(&self) -> ZResult<(Library, PathBuf)> { + match self { + DynamicPluginSource::ByName((libloader, name)) => unsafe { + libloader.search_and_load(name) + }, + DynamicPluginSource::ByPaths(paths) => { + for path in paths { + match unsafe { LibLoader::load_file(path) } { + Ok((l, p)) => return Ok((l, p)), + Err(e) => log::debug!("Attempt to load {} failed: {}", path, e), + } + } + bail!("Plugin not found in {:?}", &paths) + } + } + } +} + +struct DynamicPluginStarter { + _lib: Library, + path: PathBuf, + vtable: PluginVTable, +} + +impl + DynamicPluginStarter +{ + fn get_vtable(lib: &Library, path: &Path) -> ZResult> { + log::debug!("Loading plugin {}", path.to_str().unwrap(),); + let get_plugin_loader_version = + unsafe { lib.get:: PluginLoaderVersion>(b"get_plugin_loader_version")? }; + let plugin_loader_version = get_plugin_loader_version(); + log::debug!("Plugin loader version: {}", &plugin_loader_version); + if plugin_loader_version != PLUGIN_LOADER_VERSION { + bail!( + "Plugin loader version mismatch: host = {}, plugin = {}", + PLUGIN_LOADER_VERSION, + plugin_loader_version + ); + } + let get_compatibility = unsafe { lib.get:: Compatibility>(b"get_compatibility")? }; + let mut plugin_compatibility_record = get_compatibility(); + let mut host_compatibility_record = + Compatibility::with_empty_plugin_version::(); + log::debug!( + "Plugin compativilty record: {:?}", + &plugin_compatibility_record + ); + if !plugin_compatibility_record.compare(&mut host_compatibility_record) { + bail!( + "Plugin compatibility mismatch:\nHost:\n{}Plugin:\n{}", + host_compatibility_record, + plugin_compatibility_record + ); + } + let load_plugin = + unsafe { lib.get:: PluginVTable>(b"load_plugin")? }; + + Ok(load_plugin()) + } + fn new(lib: Library, path: PathBuf) -> ZResult { + let vtable = Self::get_vtable(&lib, &path) + .map_err(|e| format!("Error loading {}: {}", path.to_str().unwrap(), e))?; + Ok(Self { + _lib: lib, + path, + vtable, + }) + } + fn start(&self, name: &str, args: &StartArgs) -> ZResult { + (self.vtable.start)(name, args) + } + fn path(&self) -> &str { + self.path.to_str().unwrap() + } +} + +pub struct DynamicPlugin { + name: String, + report: PluginReport, + source: DynamicPluginSource, + starter: Option>, + instance: Option, +} + +impl DynamicPlugin { + pub fn new(name: String, source: DynamicPluginSource) -> Self { + Self { + name, + report: PluginReport::new(), + source, + starter: None, + instance: None, + } + } +} + +impl PluginStatus + for DynamicPlugin +{ + fn name(&self) -> &str { + self.name.as_str() + } + fn version(&self) -> Option<&str> { + self.starter.as_ref().map(|v| v.vtable.plugin_version) + } + fn long_version(&self) -> Option<&str> { + self.starter.as_ref().map(|v| v.vtable.plugin_long_version) + } + fn path(&self) -> &str { + if let Some(starter) = &self.starter { + starter.path() + } else { + "" + } + } + fn state(&self) -> PluginState { + if self.starter.is_some() { + if self.instance.is_some() { + PluginState::Started + } else { + PluginState::Loaded + } + } else { + PluginState::Declared + } + } + fn report(&self) -> PluginReport { + if let Some(instance) = &self.instance { + instance.report() + } else { + self.report.clone() + } + } +} + +impl DeclaredPlugin + for DynamicPlugin +{ + fn as_status(&self) -> &dyn PluginStatus { + self + } + fn load(&mut self) -> ZResult<&mut dyn LoadedPlugin> { + if self.starter.is_none() { + let (lib, path) = self.source.load().add_error(&mut self.report)?; + let starter = DynamicPluginStarter::new(lib, path).add_error(&mut self.report)?; + log::debug!("Plugin {} loaded from {}", self.name, starter.path()); + self.starter = Some(starter); + } else { + log::warn!("Plugin `{}` already loaded", self.name); + } + Ok(self) + } + fn loaded(&self) -> Option<&dyn LoadedPlugin> { + if self.starter.is_some() { + Some(self) + } else { + None + } + } + fn loaded_mut(&mut self) -> Option<&mut dyn LoadedPlugin> { + if self.starter.is_some() { + Some(self) + } else { + None + } + } +} + +impl LoadedPlugin + for DynamicPlugin +{ + fn as_status(&self) -> &dyn PluginStatus { + self + } + fn start(&mut self, args: &StartArgs) -> ZResult<&mut dyn StartedPlugin> { + let starter = self + .starter + .as_ref() + .ok_or_else(|| format!("Plugin `{}` not loaded", self.name)) + .add_error(&mut self.report)?; + let already_started = self.instance.is_some(); + if !already_started { + let instance = starter + .start(self.name(), args) + .add_error(&mut self.report)?; + log::debug!("Plugin `{}` started", self.name); + self.instance = Some(instance); + } else { + log::warn!("Plugin `{}` already started", self.name); + } + Ok(self) + } + fn started(&self) -> Option<&dyn StartedPlugin> { + if self.instance.is_some() { + Some(self) + } else { + None + } + } + fn started_mut(&mut self) -> Option<&mut dyn StartedPlugin> { + if self.instance.is_some() { + Some(self) + } else { + None + } + } +} + +impl StartedPlugin + for DynamicPlugin +{ + fn as_status(&self) -> &dyn PluginStatus { + self + } + fn stop(&mut self) { + log::debug!("Plugin `{}` stopped", self.name); + self.report.clear(); + self.instance = None; + } + fn instance(&self) -> &Instance { + self.instance.as_ref().unwrap() + } + fn instance_mut(&mut self) -> &mut Instance { + self.instance.as_mut().unwrap() + } +} diff --git a/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs b/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs new file mode 100644 index 000000000..a94069553 --- /dev/null +++ b/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs @@ -0,0 +1,138 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use crate::*; +use std::marker::PhantomData; +use zenoh_result::ZResult; + +pub struct StaticPlugin +where + P: Plugin, +{ + instance: Option, + phantom: PhantomData

, +} + +impl StaticPlugin +where + P: Plugin, +{ + pub fn new() -> Self { + Self { + instance: None, + phantom: PhantomData, + } + } +} + +impl PluginStatus for StaticPlugin +where + P: Plugin, +{ + fn name(&self) -> &str { + P::DEFAULT_NAME + } + fn version(&self) -> Option<&str> { + Some(P::PLUGIN_VERSION) + } + fn long_version(&self) -> Option<&str> { + Some(P::PLUGIN_LONG_VERSION) + } + fn path(&self) -> &str { + "" + } + fn state(&self) -> PluginState { + self.instance + .as_ref() + .map_or(PluginState::Loaded, |_| PluginState::Started) + } + fn report(&self) -> PluginReport { + if let Some(instance) = &self.instance { + instance.report() + } else { + PluginReport::default() + } + } +} + +impl DeclaredPlugin + for StaticPlugin +where + P: Plugin, +{ + fn as_status(&self) -> &dyn PluginStatus { + self + } + fn load(&mut self) -> ZResult<&mut dyn LoadedPlugin> { + Ok(self) + } + fn loaded(&self) -> Option<&dyn LoadedPlugin> { + Some(self) + } + fn loaded_mut(&mut self) -> Option<&mut dyn LoadedPlugin> { + Some(self) + } +} + +impl LoadedPlugin + for StaticPlugin +where + P: Plugin, +{ + fn as_status(&self) -> &dyn PluginStatus { + self + } + fn start(&mut self, args: &StartArgs) -> ZResult<&mut dyn StartedPlugin> { + if self.instance.is_none() { + log::debug!("Plugin `{}` started", self.name()); + self.instance = Some(P::start(self.name(), args)?); + } else { + log::warn!("Plugin `{}` already started", self.name()); + } + Ok(self) + } + fn started(&self) -> Option<&dyn StartedPlugin> { + if self.instance.is_some() { + Some(self) + } else { + None + } + } + fn started_mut(&mut self) -> Option<&mut dyn StartedPlugin> { + if self.instance.is_some() { + Some(self) + } else { + None + } + } +} + +impl StartedPlugin + for StaticPlugin +where + P: Plugin, +{ + fn as_status(&self) -> &dyn PluginStatus { + self + } + fn stop(&mut self) { + log::debug!("Plugin `{}` stopped", self.name()); + self.instance = None; + } + fn instance(&self) -> &Instance { + self.instance.as_ref().unwrap() + } + fn instance_mut(&mut self) -> &mut Instance { + self.instance.as_mut().unwrap() + } +} diff --git a/plugins/zenoh-plugin-trait/src/plugin.rs b/plugins/zenoh-plugin-trait/src/plugin.rs new file mode 100644 index 000000000..f1c2d0938 --- /dev/null +++ b/plugins/zenoh-plugin-trait/src/plugin.rs @@ -0,0 +1,240 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use crate::StructVersion; +use serde::{Deserialize, Serialize}; +use std::{borrow::Cow, ops::BitOrAssign}; +use zenoh_keyexpr::keyexpr; +use zenoh_result::ZResult; + +/// The plugin can be in one of these states: +/// - Declared: the plugin is declared in the configuration file, but not loaded yet or failed to load +/// - Loaded: the plugin is loaded, but not started yet or failed to start +/// - Started: the plugin is started and running +#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Ord)] +pub enum PluginState { + Declared, + Loaded, + Started, +} + +/// The severity level of a plugin report messages +/// - Normal: the message(s) are just notifications +/// - Warning: at least one warning is reported +/// - Error: at least one error is reported +#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize, PartialOrd, Ord)] +pub enum PluginReportLevel { + #[default] + Info, + Warning, + Error, +} + +/// Allow using the `|=` operator to update the severity level of a report +impl BitOrAssign for PluginReportLevel { + fn bitor_assign(&mut self, rhs: Self) { + if *self < rhs { + *self = rhs; + } + } +} + +/// A plugin report contains a severity level and a list of messages +/// describing the plugin's situation (for the Declared state - dynamic library loading errors, for the Loaded state - plugin start errors, etc) +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Default, Deserialize)] +pub struct PluginReport { + level: PluginReportLevel, + #[serde(skip_serializing_if = "Vec::is_empty")] + messages: Vec>, +} + +/// Trait allowing getting all information about the plugin +pub trait PluginStatus { + /// Returns the name of the plugin + fn name(&self) -> &str; + /// Returns the version of the loaded plugin (usually the version of the plugin's crate) + fn version(&self) -> Option<&str>; + /// Returns the long version of the loaded plugin (usually the version of the plugin's crate + git commit hash) + fn long_version(&self) -> Option<&str>; + /// Returns the path of the loaded plugin + fn path(&self) -> &str; + /// Returns the plugin's state (Declared, Loaded, Started) + fn state(&self) -> PluginState; + /// Returns the plugin's current report: a list of messages and the severity level + /// When the status is changed, the report is cleared + fn report(&self) -> PluginReport; +} + +/// The structure which contains all information about the plugin status in a single cloneable structure +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct PluginStatusRec<'a> { + pub name: Cow<'a, str>, + #[serde(skip_serializing_if = "Option::is_none")] + pub version: Option>, + pub long_version: Option>, + pub path: Cow<'a, str>, + pub state: PluginState, + pub report: PluginReport, +} + +impl PluginStatus for PluginStatusRec<'_> { + fn name(&self) -> &str { + &self.name + } + fn version(&self) -> Option<&str> { + self.version.as_deref() + } + fn long_version(&self) -> Option<&str> { + self.long_version.as_deref() + } + fn path(&self) -> &str { + &self.path + } + fn state(&self) -> PluginState { + self.state + } + fn report(&self) -> PluginReport { + self.report.clone() + } +} + +impl<'a> PluginStatusRec<'a> { + /// Construct the status structure from the getter interface + pub fn new(plugin: &'a T) -> Self { + Self { + name: Cow::Borrowed(plugin.name()), + version: plugin.version().map(Cow::Borrowed), + long_version: plugin.long_version().map(Cow::Borrowed), + path: Cow::Borrowed(plugin.path()), + state: plugin.state(), + report: plugin.report(), + } + } + /// Convert the status structure to the owned version + pub fn into_owned(self) -> PluginStatusRec<'static> { + PluginStatusRec { + name: Cow::Owned(self.name.into_owned()), + version: self.version.map(|v| Cow::Owned(v.into_owned())), + long_version: self.long_version.map(|v| Cow::Owned(v.into_owned())), + path: Cow::Owned(self.path.into_owned()), + state: self.state, + report: self.report, + } + } + pub(crate) fn prepend_name(self, prefix: &str) -> Self { + Self { + name: Cow::Owned(format!("{}/{}", prefix, self.name)), + ..self + } + } +} + +/// This trait allows getting information about the plugin status and the status of its subplugins, if any. +pub trait PluginControl { + /// Returns the current state of the running plugin. By default, the state is `PluginReportLevel::Normal` and the list of messages is empty. + /// This can be overridden by the plugin implementation if the plugin is able to report its status: no connection to the database, etc. + fn report(&self) -> PluginReport { + PluginReport::default() + } + /// Collects information of sub-plugins matching the `_names` key expression. The information is richer than the one returned by `report()`: it contains external information about the running plugin, such as its name, path on disk, load status, etc. + /// Returns an empty list by default. + fn plugins_status(&self, _names: &keyexpr) -> Vec { + Vec::new() + } +} + +pub trait PluginStartArgs: StructVersion {} + +pub trait PluginInstance: StructVersion + PluginControl + Send {} + +/// Base plugin trait. The loaded plugin +pub trait Plugin: Sized + 'static { + type StartArgs: PluginStartArgs; + type Instance: PluginInstance; + /// Plugins' default name when statically linked. + const DEFAULT_NAME: &'static str; + /// Plugin's version. Used only for information purposes. It's recommended to use [plugin_version!] macro to generate this string. + const PLUGIN_VERSION: &'static str; + /// Plugin's long version (with git commit hash). Used only for information purposes. It's recommended to use [plugin_long_version!] macro to generate this string. + const PLUGIN_LONG_VERSION: &'static str; + /// Starts your plugin. Use `Ok` to return your plugin's control structure + fn start(name: &str, args: &Self::StartArgs) -> ZResult; +} + +#[macro_export] +macro_rules! plugin_version { + () => { + env!("CARGO_PKG_VERSION") + }; +} + +#[macro_export] +macro_rules! plugin_long_version { + () => { + git_version::git_version!(prefix = "v", cargo_prefix = "v") + }; +} + +impl PluginReport { + pub fn new() -> Self { + Self::default() + } + pub fn clear(&mut self) { + *self = Self::default(); + } + pub fn get_level(&self) -> PluginReportLevel { + self.level + } + pub fn add_error>>(&mut self, error: S) { + self.level |= PluginReportLevel::Error; + self.messages.push(error.into()); + } + pub fn add_warning>>(&mut self, warning: S) { + self.level |= PluginReportLevel::Warning; + self.messages.push(warning.into()); + } + pub fn add_info>>(&mut self, message: S) { + self.level |= PluginReportLevel::Info; + self.messages.push(message.into()); + } + pub fn messages(&self) -> &[Cow<'static, str>] { + &self.messages + } +} + +pub trait PluginConditionSetter { + fn add_error(self, report: &mut PluginReport) -> Self; + fn add_warning(self, report: &mut PluginReport) -> Self; + fn add_info(self, report: &mut PluginReport) -> Self; +} + +impl PluginConditionSetter for core::result::Result { + fn add_error(self, report: &mut PluginReport) -> Self { + if let Err(e) = &self { + report.add_error(e.to_string()); + } + self + } + fn add_warning(self, report: &mut PluginReport) -> Self { + if let Err(e) = &self { + report.add_warning(e.to_string()); + } + self + } + fn add_info(self, report: &mut PluginReport) -> Self { + if let Err(e) = &self { + report.add_info(e.to_string()); + } + self + } +} diff --git a/plugins/zenoh-plugin-trait/src/vtable.rs b/plugins/zenoh-plugin-trait/src/vtable.rs index 6bb3eb7dd..e1108f87f 100644 --- a/plugins/zenoh-plugin-trait/src/vtable.rs +++ b/plugins/zenoh-plugin-trait/src/vtable.rs @@ -11,106 +11,76 @@ // Contributors: // ZettaScale Zenoh Team, // -use crate::*; -pub use no_mangle::*; use zenoh_result::ZResult; -pub type PluginVTableVersion = u16; -type LoadPluginResultInner = Result, PluginVTableVersion>; -pub type LoadPluginResult = Result, PluginVTableVersion>; +use crate::{Plugin, StructVersion, FEATURES}; -/// This number should change any time the internal structure of [`PluginVTable`] changes -pub const PLUGIN_VTABLE_VERSION: PluginVTableVersion = 1; +pub type PluginLoaderVersion = u64; +pub const PLUGIN_LOADER_VERSION: PluginLoaderVersion = 1; -type StartFn = fn(&str, &StartArgs) -> ZResult; +type StartFn = fn(&str, &StartArgs) -> ZResult; #[repr(C)] -struct PluginVTableInner { - start: StartFn, - compatibility: fn() -> ZResult, +pub struct PluginVTable { + pub plugin_version: &'static str, + pub plugin_long_version: &'static str, + pub start: StartFn, } - -/// Automagical padding such that [PluginVTable::init]'s result is the size of a cache line -#[repr(C)] -struct PluginVTablePadding { - __padding: [u8; PluginVTablePadding::padding_length()], -} -impl PluginVTablePadding { - const fn padding_length() -> usize { - 64 - std::mem::size_of::() +impl StructVersion for PluginVTable { + fn struct_version() -> u64 { + 1 } - fn new() -> Self { - PluginVTablePadding { - __padding: [0; Self::padding_length()], - } + fn struct_features() -> &'static str { + FEATURES } } -/// For use with dynamically loaded plugins. Its size will not change accross versions, but its internal structure might. -/// -/// To ensure compatibility, its size and alignment must allow `size_of::>() == 64` (one cache line). -#[repr(C)] -pub struct PluginVTable { - inner: PluginVTableInner, - padding: PluginVTablePadding, -} - -impl PluginVTable { - pub fn new>( - ) -> Self { - PluginVTable { - inner: PluginVTableInner { - start: ConcretePlugin::start, - compatibility: ConcretePlugin::compatibility, - }, - padding: PluginVTablePadding::new(), +impl PluginVTable { + pub fn new>() -> Self { + Self { + plugin_version: ConcretePlugin::PLUGIN_VERSION, + plugin_long_version: ConcretePlugin::PLUGIN_LONG_VERSION, + start: ConcretePlugin::start, } } +} - /// Ensures [PluginVTable]'s size stays the same between versions - fn __size_check() { - unsafe { - std::mem::transmute::<_, [u8; 64]>(std::mem::MaybeUninit::< - Result, - >::uninit()) - }; - } +/// This macro adds non-mangled functions which provides plugin version and loads it into the host. +/// If plugin library should work also as static, consider calling this macro under feature condition +/// +/// The funcitons declared by this macro are: +/// +/// - `get_plugin_loader_version` - returns `PLUGIN_LOADER_VERSION` const of the crate. The [`PluginsManager`](crate::manager::PluginsManager) +/// will check if this version is compatible with the host. +/// - `get_compatibility` - returns [`Compatibility`](crate::Compatibility) struct which contains all version information (Rust compiler version, features used, version of plugin's structures). +/// The layout of this structure is guaranteed to be stable until the [`PLUGIN_LOADER_VERSION`](crate::PLUGIN_LOADER_VERSION) is changed, +/// so it's safe to use it in the host after call to `get_plugin_loader_version` returns compatible version. +/// Then the [`PluginsManager`](crate::manager::PluginsManager) compares the returned [`Compatibility`](crate::Compatibility) with it's own and decides if it can continue loading the plugin. +/// - `load_plugin` - returns [`PluginVTable`](crate::PluginVTable) which is able to create plugin's instance. +/// +#[macro_export] +macro_rules! declare_plugin { + ($ty: path) => { + #[no_mangle] + fn get_plugin_loader_version() -> $crate::PluginLoaderVersion { + $crate::PLUGIN_LOADER_VERSION + } - pub fn start(&self, name: &str, start_args: &StartArgs) -> ZResult { - (self.inner.start)(name, start_args) - } - pub fn compatibility(&self) -> ZResult { - (self.inner.compatibility)() - } -} + #[no_mangle] + fn get_compatibility() -> $crate::Compatibility { + $crate::Compatibility::with_plugin_version::< + <$ty as $crate::Plugin>::StartArgs, + <$ty as $crate::Plugin>::Instance, + $ty, + >() + } -pub use no_mangle::*; -#[cfg(feature = "no_mangle")] -pub mod no_mangle { - /// This macro will add a non-mangled `load_plugin` function to the library if feature `no_mangle` is enabled (which it is by default). - #[macro_export] - macro_rules! declare_plugin { - ($ty: path) => { - #[no_mangle] - fn load_plugin( - version: $crate::prelude::PluginVTableVersion, - ) -> $crate::prelude::LoadPluginResult< - <$ty as $crate::prelude::Plugin>::StartArgs, - <$ty as $crate::prelude::Plugin>::RunningPlugin, - > { - if version == $crate::prelude::PLUGIN_VTABLE_VERSION { - Ok($crate::prelude::PluginVTable::new::<$ty>()) - } else { - Err($crate::prelude::PLUGIN_VTABLE_VERSION) - } - } - }; - } -} -#[cfg(not(feature = "no_mangle"))] -pub mod no_mangle { - #[macro_export] - macro_rules! declare_plugin { - ($ty: path) => {}; - } + #[no_mangle] + fn load_plugin() -> $crate::PluginVTable< + <$ty as $crate::Plugin>::StartArgs, + <$ty as $crate::Plugin>::Instance, + > { + $crate::PluginVTable::new::<$ty>() + } + }; } diff --git a/zenoh/src/info.rs b/zenoh/src/info.rs index 28579b3d6..e299d4aee 100644 --- a/zenoh/src/info.rs +++ b/zenoh/src/info.rs @@ -42,7 +42,7 @@ impl<'a> Resolvable for ZidBuilder<'a> { impl<'a> SyncResolve for ZidBuilder<'a> { fn res_sync(self) -> Self::To { - self.session.runtime.zid + self.session.runtime.zid() } } diff --git a/zenoh/src/liveliness.rs b/zenoh/src/liveliness.rs index a29d4b5d4..0883041bb 100644 --- a/zenoh/src/liveliness.rs +++ b/zenoh/src/liveliness.rs @@ -185,11 +185,14 @@ impl<'a> Liveliness<'a> { >>::Error: Into, { let key_expr = key_expr.try_into().map_err(Into::into); - let conf = self.session.runtime.config.lock(); + let timeout = { + let conf = self.session.runtime.config().lock(); + Duration::from_millis(unwrap_or_default!(conf.queries_default_timeout())) + }; LivelinessGetBuilder { session: &self.session, key_expr, - timeout: Duration::from_millis(unwrap_or_default!(conf.queries_default_timeout())), + timeout, handler: DefaultHandler, } } diff --git a/zenoh/src/net/routing/face.rs b/zenoh/src/net/routing/face.rs index 0d2ee926d..90946527e 100644 --- a/zenoh/src/net/routing/face.rs +++ b/zenoh/src/net/routing/face.rs @@ -31,6 +31,7 @@ pub struct FaceState { pub(super) id: usize, pub(super) zid: ZenohId, pub(super) whatami: WhatAmI, + #[allow(dead_code)] pub(super) local: bool, #[cfg(feature = "stats")] pub(super) stats: Option>, @@ -80,6 +81,7 @@ impl FaceState { }) } + #[allow(dead_code)] #[inline] pub fn is_local(&self) -> bool { self.local diff --git a/zenoh/src/net/routing/network.rs b/zenoh/src/net/routing/network.rs index 0fb9f3612..627a47252 100644 --- a/zenoh/src/net/routing/network.rs +++ b/zenoh/src/net/routing/network.rs @@ -134,7 +134,7 @@ impl Network { log::debug!("{} Add node (self) {}", name, zid); let idx = graph.add_node(Node { zid, - whatami: Some(runtime.whatami), + whatami: Some(runtime.whatami()), locators: None, sn: 1, links: vec![], diff --git a/zenoh/src/net/routing/resource.rs b/zenoh/src/net/routing/resource.rs index e26a9217f..00189b85c 100644 --- a/zenoh/src/net/routing/resource.rs +++ b/zenoh/src/net/routing/resource.rs @@ -363,6 +363,7 @@ impl Resource { } } + #[cfg(test)] pub fn print_tree(from: &Arc) -> String { let mut result = from.expr(); result.push('\n'); diff --git a/zenoh/src/net/routing/router.rs b/zenoh/src/net/routing/router.rs index 1ad5d9360..933bd7339 100644 --- a/zenoh/src/net/routing/router.rs +++ b/zenoh/src/net/routing/router.rs @@ -142,6 +142,7 @@ impl Tables { &self.root_res } + #[cfg(test)] pub fn print(&self) -> String { Resource::print_tree(&self.root_res) } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 96ea85f6b..b10977405 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -13,7 +13,7 @@ use super::routing::face::Face; use super::Runtime; use crate::key_expr::KeyExpr; -use crate::plugins::sealed as plugins; +use crate::plugins::sealed::{self as plugins}; use crate::prelude::sync::{Sample, SyncResolve}; use crate::queryable::Query; use crate::queryable::QueryInner; @@ -27,7 +27,9 @@ use std::convert::TryInto; use std::sync::Arc; use std::sync::Mutex; use zenoh_buffers::buffer::SplitBuffer; -use zenoh_config::ValidatedMap; +use zenoh_config::{ConfigValidator, ValidatedMap}; +use zenoh_plugin_trait::{PluginControl, PluginStatus}; +use zenoh_protocol::core::key_expr::keyexpr; use zenoh_protocol::{ core::{key_expr::OwnedKeyExpr, ExprId, KnownEncoding, WireExpr, ZenohId, EMPTY_EXPR_ID}, network::{ @@ -64,10 +66,67 @@ enum PluginDiff { Start(crate::config::PluginLoad), } +impl ConfigValidator for AdminSpace { + fn check_config( + &self, + name: &str, + path: &str, + current: &serde_json::Map, + new: &serde_json::Map, + ) -> ZResult>> { + let plugins_mgr = zlock!(self.context.plugins_mgr); + let plugin = plugins_mgr.started_plugin(name).ok_or(format!( + "Plugin `{}` is not running, but its configuration is being changed", + name + ))?; + plugin.instance().config_checker(path, current, new) + } +} + impl AdminSpace { + fn start_plugin( + plugin_mgr: &mut plugins::PluginsManager, + config: &crate::config::PluginLoad, + start_args: &Runtime, + ) -> ZResult<()> { + let name = &config.name; + let declared = if let Some(declared) = plugin_mgr.plugin_mut(name) { + log::warn!("Plugin `{}` was already declared", declared.name()); + declared + } else if let Some(paths) = &config.paths { + plugin_mgr.declare_dynamic_plugin_by_paths(name, paths)? + } else { + plugin_mgr.declare_dynamic_plugin_by_name(name, name)? + }; + + let loaded = if let Some(loaded) = declared.loaded_mut() { + log::warn!( + "Plugin `{}` was already loaded from {}", + loaded.name(), + loaded.path() + ); + loaded + } else { + declared.load()? + }; + + if let Some(started) = loaded.started_mut() { + log::warn!("Plugin `{}` was already started", started.name()); + } else { + let started = loaded.start(start_args)?; + log::info!( + "Successfully started plugin `{}` from {}", + started.name(), + started.path() + ); + }; + + Ok(()) + } + pub async fn start(runtime: &Runtime, plugins_mgr: plugins::PluginsManager, version: String) { - let zid_str = runtime.zid.to_string(); - let metadata = runtime.metadata.clone(); + let zid_str = runtime.state.zid.to_string(); + let metadata = runtime.state.metadata.clone(); let root_key: OwnedKeyExpr = format!("@/router/{zid_str}").try_into().unwrap(); let mut handlers: HashMap<_, Handler> = HashMap::new(); @@ -100,6 +159,10 @@ impl AdminSpace { .unwrap(), Arc::new(queryables_data), ); + handlers.insert( + format!("@/router/{zid_str}/plugins/**").try_into().unwrap(), + Arc::new(plugins_data), + ); handlers.insert( format!("@/router/{zid_str}/status/plugins/**") .try_into() @@ -108,9 +171,8 @@ impl AdminSpace { ); let mut active_plugins = plugins_mgr - .running_plugins_info() - .into_iter() - .map(|(a, b)| (a.to_string(), b.to_string())) + .started_plugins_iter() + .map(|rec| (rec.name().to_string(), rec.path().to_string())) .collect::>(); let context = Arc::new(AdminContext { @@ -121,14 +183,22 @@ impl AdminSpace { metadata, }); let admin = Arc::new(AdminSpace { - zid: runtime.zid, + zid: runtime.zid(), primitives: Mutex::new(None), mappings: Mutex::new(HashMap::new()), handlers, context, }); - let cfg_rx = admin.context.runtime.config.subscribe(); + admin + .context + .runtime + .state + .config + .lock() + .set_plugin_validator(Arc::downgrade(&admin)); + + let cfg_rx = admin.context.runtime.state.config.subscribe(); task::spawn({ let admin = admin.clone(); async move { @@ -139,7 +209,7 @@ impl AdminSpace { } let requested_plugins = { - let cfg_guard = admin.context.runtime.config.lock(); + let cfg_guard = admin.context.runtime.state.config.lock(); cfg_guard.plugins().load_requests().collect::>() }; let mut diffs = Vec::new(); @@ -165,63 +235,37 @@ impl AdminSpace { let mut plugins_mgr = zlock!(admin.context.plugins_mgr); for diff in diffs { match diff { - PluginDiff::Delete(plugin) => { - active_plugins.remove(plugin.as_str()); - plugins_mgr.stop(&plugin); + PluginDiff::Delete(name) => { + active_plugins.remove(name.as_str()); + if let Some(running) = plugins_mgr.started_plugin_mut(&name) { + running.stop() + } } PluginDiff::Start(plugin) => { - let load = match &plugin.paths { - Some(paths) => { - plugins_mgr.load_plugin_by_paths(plugin.name.clone(), paths) - } - None => plugins_mgr.load_plugin_by_name(plugin.name.clone()), - }; - match load { - Err(e) => { - if plugin.required { - panic!("Failed to load plugin `{}`: {}", plugin.name, e) - } else { - log::error!( - "Failed to load plugin `{}`: {}", - plugin.name, - e - ) - } - } - Ok(path) => { - let name = &plugin.name; - log::info!("Loaded plugin `{}` from {}", name, &path); - match plugins_mgr.start(name, &admin.context.runtime) { - Ok(Some((path, plugin))) => { - active_plugins.insert(name.into(), path.into()); - let mut cfg_guard = - admin.context.runtime.config.lock(); - cfg_guard.add_plugin_validator( - name, - plugin.config_checker(), - ); - log::info!( - "Successfully started plugin `{}` from {}", - name, - path - ); - } - Ok(None) => { - log::warn!("Plugin `{}` was already running", name) - } - Err(e) => log::error!("{}", e), - } + if let Err(e) = Self::start_plugin( + &mut plugins_mgr, + &plugin, + &admin.context.runtime, + ) { + if plugin.required { + panic!("Failed to load plugin `{}`: {}", plugin.name, e) + } else { + log::error!( + "Failed to load plugin `{}`: {}", + plugin.name, + e + ) } } } } } - log::info!("Running plugins: {:?}", &active_plugins) } + log::info!("Running plugins: {:?}", &active_plugins) } }); - let primitives = runtime.router.new_primitives(admin.clone()); + let primitives = runtime.state.router.new_primitives(admin.clone()); zlock!(admin.primitives).replace(primitives.clone()); primitives.send_declare(Declare { @@ -283,7 +327,7 @@ impl Primitives for AdminSpace { fn send_push(&self, msg: Push) { trace!("recv Push {:?}", msg); { - let conf = self.context.runtime.config.lock(); + let conf = self.context.runtime.state.config.lock(); if !conf.adminspace.permissions().write { log::error!( "Received PUT on '{}' but adminspace.permissions.write=false in configuration", @@ -307,7 +351,8 @@ impl Primitives for AdminSpace { key, json ); - if let Err(e) = (&self.context.runtime.config).insert_json5(key, json) { + if let Err(e) = (&self.context.runtime.state.config).insert_json5(key, json) + { error!( "Error inserting conf value /@/router/{}/config/{} : {} - {}", &self.context.zid_str, key, json, e @@ -325,7 +370,7 @@ impl Primitives for AdminSpace { &self.context.zid_str, key ); - if let Err(e) = self.context.runtime.config.remove(key) { + if let Err(e) = self.context.runtime.state.config.remove(key) { log::error!("Error deleting conf value {} : {}", msg.wire_expr, e) } } @@ -338,7 +383,7 @@ impl Primitives for AdminSpace { if let RequestBody::Query(query) = msg.payload { let primitives = zlock!(self.primitives).as_ref().unwrap().clone(); { - let conf = self.context.runtime.config.lock(); + let conf = self.context.runtime.state.config.lock(); if !conf.adminspace.permissions().read { log::error!( "Received GET on '{}' but adminspace.permissions.read=false in configuration", @@ -411,10 +456,10 @@ fn router_data(context: &AdminContext, query: Query) { // plugins info let plugins: serde_json::Value = { - zlock!(context.plugins_mgr) - .running_plugins_info() - .into_iter() - .map(|(k, v)| (k, json!({ "path": v }))) + let plugins_mgr = zlock!(context.plugins_mgr); + plugins_mgr + .started_plugins_iter() + .map(|rec| (rec.name(), json!({ "path": rec.path() }))) .collect() }; @@ -530,7 +575,7 @@ fn routers_linkstate_data(context: &AdminContext, query: Query) { .try_into() .unwrap(); - let tables = zread!(context.runtime.router.tables.tables); + let tables = zread!(context.runtime.state.router.tables.tables); if let Err(e) = query .reply(Ok(Sample::new( @@ -557,7 +602,7 @@ fn peers_linkstate_data(context: &AdminContext, query: Query) { .try_into() .unwrap(); - let tables = zread!(context.runtime.router.tables.tables); + let tables = zread!(context.runtime.state.router.tables.tables); if let Err(e) = query .reply(Ok(Sample::new( @@ -580,7 +625,7 @@ fn peers_linkstate_data(context: &AdminContext, query: Query) { } fn subscribers_data(context: &AdminContext, query: Query) { - let tables = zread!(context.runtime.router.tables.tables); + let tables = zread!(context.runtime.state.router.tables.tables); for sub in tables.router_subs.iter() { let key = KeyExpr::try_from(format!( "@/router/{}/subscriber/{}", @@ -597,7 +642,7 @@ fn subscribers_data(context: &AdminContext, query: Query) { } fn queryables_data(context: &AdminContext, query: Query) { - let tables = zread!(context.runtime.router.tables.tables); + let tables = zread!(context.runtime.state.router.tables.tables); for qabl in tables.router_qabls.iter() { let key = KeyExpr::try_from(format!( "@/router/{}/queryable/{}", @@ -613,20 +658,39 @@ fn queryables_data(context: &AdminContext, query: Query) { } } +fn plugins_data(context: &AdminContext, query: Query) { + let guard = zlock!(context.plugins_mgr); + let root_key = format!("@/router/{}/plugins", &context.zid_str); + let root_key = unsafe { keyexpr::from_str_unchecked(&root_key) }; + log::debug!("requested plugins status {:?}", query.key_expr()); + if let [names, ..] = query.key_expr().strip_prefix(root_key)[..] { + let statuses = guard.plugins_status(names); + for status in statuses { + log::debug!("plugin status: {:?}", status); + let key = root_key.join(status.name()).unwrap(); + let status = serde_json::to_value(status).unwrap(); + if let Err(e) = query.reply(Ok(Sample::new(key, Value::from(status)))).res() { + log::error!("Error sending AdminSpace reply: {:?}", e); + } + } + } +} + fn plugins_status(context: &AdminContext, query: Query) { let selector = query.selector(); let guard = zlock!(context.plugins_mgr); let mut root_key = format!("@/router/{}/status/plugins/", &context.zid_str); - for (name, (path, plugin)) in guard.running_plugins() { - with_extended_string(&mut root_key, &[name], |plugin_key| { + for plugin in guard.started_plugins_iter() { + with_extended_string(&mut root_key, &[plugin.name()], |plugin_key| { + // TODO: response to "__version__", this need not to be implemented by each plugin with_extended_string(plugin_key, &["/__path__"], |plugin_path_key| { if let Ok(key_expr) = KeyExpr::try_from(plugin_path_key.clone()) { if query.key_expr().intersects(&key_expr) { if let Err(e) = query .reply(Ok(Sample::new( key_expr, - Value::from(path).encoding(KnownEncoding::AppJson.into()), + Value::from(plugin.path()).encoding(KnownEncoding::AppJson.into()), ))) .res() { @@ -646,7 +710,7 @@ fn plugins_status(context: &AdminContext, query: Query) { return; } match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - plugin.adminspace_getter(&selector, plugin_key) + plugin.instance().adminspace_getter(&selector, plugin_key) })) { Ok(Ok(responses)) => { for response in responses { @@ -665,15 +729,15 @@ fn plugins_status(context: &AdminContext, query: Query) { } } Ok(Err(e)) => { - log::error!("Plugin {} bailed from responding to {}: {}", name, query.key_expr(), e) + log::error!("Plugin {} bailed from responding to {}: {}", plugin.name(), query.key_expr(), e) } Err(e) => match e .downcast_ref::() .map(|s| s.as_str()) .or_else(|| e.downcast_ref::<&str>().copied()) { - Some(e) => log::error!("Plugin {} panicked while responding to {}: {}", name, query.key_expr(), e), - None => log::error!("Plugin {} panicked while responding to {}. The panic message couldn't be recovered.", name, query.key_expr()), + Some(e) => log::error!("Plugin {} panicked while responding to {}: {}", plugin.name(), query.key_expr(), e), + None => log::error!("Plugin {} panicked while responding to {}. The panic message couldn't be recovered.", plugin.name(), query.key_expr()), }, } }); diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index 5599d1ed1..8eb57699b 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -37,6 +37,7 @@ use stop_token::future::FutureExt; use stop_token::{StopSource, TimedOutError}; use uhlc::{HLCBuilder, HLC}; use zenoh_link::{EndPoint, Link}; +use zenoh_plugin_trait::{PluginStartArgs, StructVersion}; use zenoh_protocol::core::{whatami::WhatAmIMatcher, Locator, WhatAmI, ZenohId}; use zenoh_protocol::network::{NetworkBody, NetworkMessage}; use zenoh_result::{bail, ZResult}; @@ -47,17 +48,17 @@ use zenoh_transport::{ TransportPeerEventHandler, }; -pub struct RuntimeState { - pub zid: ZenohId, - pub whatami: WhatAmI, - pub metadata: serde_json::Value, - pub router: Arc, - pub config: Notifier, - pub manager: TransportManager, - pub transport_handlers: std::sync::RwLock>>, - pub(crate) locators: std::sync::RwLock>, - pub hlc: Option>, - pub(crate) stop_source: std::sync::RwLock>, +struct RuntimeState { + zid: ZenohId, + whatami: WhatAmI, + metadata: serde_json::Value, + router: Arc, + config: Notifier, + manager: TransportManager, + transport_handlers: std::sync::RwLock>>, + locators: std::sync::RwLock>, + hlc: Option>, + stop_source: std::sync::RwLock>, } #[derive(Clone)] @@ -65,14 +66,17 @@ pub struct Runtime { state: Arc, } -impl std::ops::Deref for Runtime { - type Target = RuntimeState; - - fn deref(&self) -> &RuntimeState { - self.state.deref() +impl StructVersion for Runtime { + fn struct_version() -> u64 { + 1 + } + fn struct_features() -> &'static str { + crate::FEATURES } } +impl PluginStartArgs for Runtime {} + impl Runtime { pub async fn new(config: Config) -> ZResult { let mut runtime = Runtime::init(config).await?; @@ -149,7 +153,7 @@ impl Runtime { }), }; *handler.runtime.write().unwrap() = Some(runtime.clone()); - get_mut_unchecked(&mut runtime.router.clone()).init_link_state( + get_mut_unchecked(&mut runtime.state.router.clone()).init_link_state( runtime.clone(), router_link_state, peer_link_state, @@ -179,7 +183,7 @@ impl Runtime { #[inline(always)] pub fn manager(&self) -> &TransportManager { - &self.manager + &self.state.manager } pub fn new_handler(&self, handler: Arc) { @@ -188,17 +192,17 @@ impl Runtime { pub async fn close(&self) -> ZResult<()> { log::trace!("Runtime::close())"); - drop(self.stop_source.write().unwrap().take()); + drop(self.state.stop_source.write().unwrap().take()); self.manager().close().await; Ok(()) } pub fn new_timestamp(&self) -> Option { - self.hlc.as_ref().map(|hlc| hlc.new_timestamp()) + self.state.hlc.as_ref().map(|hlc| hlc.new_timestamp()) } pub fn get_locators(&self) -> Vec { - self.locators.read().unwrap().clone() + self.state.locators.read().unwrap().clone() } pub(crate) fn spawn(&self, future: F) -> Option>> @@ -206,12 +210,33 @@ impl Runtime { F: Future + Send + 'static, T: Send + 'static, { - self.stop_source + self.state + .stop_source .read() .unwrap() .as_ref() .map(|source| async_std::task::spawn(future.timeout_at(source.token()))) } + + pub(crate) fn router(&self) -> Arc { + self.state.router.clone() + } + + pub fn config(&self) -> &Notifier { + &self.state.config + } + + pub fn hlc(&self) -> Option<&HLC> { + self.state.hlc.as_ref().map(Arc::as_ref) + } + + pub fn zid(&self) -> ZenohId { + self.state.zid + } + + pub fn whatami(&self) -> WhatAmI { + self.state.whatami + } } struct RuntimeTransportEventHandler { @@ -227,7 +252,7 @@ impl TransportEventHandler for RuntimeTransportEventHandler { match zread!(self.runtime).as_ref() { Some(runtime) => { let slave_handlers: Vec> = - zread!(runtime.transport_handlers) + zread!(runtime.state.transport_handlers) .iter() .filter_map(|handler| { handler.new_unicast(peer.clone(), transport.clone()).ok() @@ -236,7 +261,11 @@ impl TransportEventHandler for RuntimeTransportEventHandler { Ok(Arc::new(RuntimeSession { runtime: runtime.clone(), endpoint: std::sync::RwLock::new(None), - main_handler: runtime.router.new_transport_unicast(transport).unwrap(), + main_handler: runtime + .state + .router + .new_transport_unicast(transport) + .unwrap(), slave_handlers, })) } @@ -251,11 +280,14 @@ impl TransportEventHandler for RuntimeTransportEventHandler { match zread!(self.runtime).as_ref() { Some(runtime) => { let slave_handlers: Vec> = - zread!(runtime.transport_handlers) + zread!(runtime.state.transport_handlers) .iter() .filter_map(|handler| handler.new_multicast(transport.clone()).ok()) .collect(); - runtime.router.new_transport_multicast(transport.clone())?; + runtime + .state + .router + .new_transport_multicast(transport.clone())?; Ok(Arc::new(RuntimeMuticastGroup { runtime: runtime.clone(), transport, @@ -344,6 +376,7 @@ impl TransportMulticastEventHandler for RuntimeMuticastGroup { Ok(Arc::new(RuntimeMuticastSession { main_handler: self .runtime + .state .router .new_peer_multicast(self.transport.clone(), peer)?, slave_handlers, diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index ccd2e68f6..a1a2c8db4 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -47,7 +47,7 @@ pub enum Loop { impl Runtime { pub(crate) async fn start(&mut self) -> ZResult<()> { - match self.whatami { + match self.whatami() { WhatAmI::Client => self.start_client().await, WhatAmI::Peer => self.start_peer().await, WhatAmI::Router => self.start_router().await, @@ -56,7 +56,7 @@ impl Runtime { async fn start_client(&self) -> ZResult<()> { let (peers, scouting, addr, ifaces, timeout) = { - let guard = self.config.lock(); + let guard = self.state.config.lock(); ( guard.connect().endpoints().clone(), unwrap_or_default!(guard.scouting().multicast().enabled()), @@ -110,12 +110,13 @@ impl Runtime { async fn start_peer(&self) -> ZResult<()> { let (listeners, peers, scouting, listen, autoconnect, addr, ifaces, delay) = { - let guard = &self.config.lock(); + let guard = &self.state.config.lock(); let listeners = if guard.listen().endpoints().is_empty() { let endpoint: EndPoint = PEER_DEFAULT_LISTENER.parse().unwrap(); let protocol = endpoint.protocol(); let mut listeners = vec![]; if self + .state .manager .config .protocols @@ -155,12 +156,13 @@ impl Runtime { async fn start_router(&self) -> ZResult<()> { let (listeners, peers, scouting, listen, autoconnect, addr, ifaces) = { - let guard = self.config.lock(); + let guard = self.state.config.lock(); let listeners = if guard.listen().endpoints().is_empty() { let endpoint: EndPoint = ROUTER_DEFAULT_LISTENER.parse().unwrap(); let protocol = endpoint.protocol(); let mut listeners = vec![]; if self + .state .manager .config .protocols @@ -241,10 +243,10 @@ impl Runtime { } pub(crate) async fn update_peers(&self) -> ZResult<()> { - let peers = { self.config.lock().connect().endpoints().clone() }; + let peers = { self.state.config.lock().connect().endpoints().clone() }; let tranports = self.manager().get_transports_unicast().await; - if self.whatami == WhatAmI::Client { + if self.state.whatami == WhatAmI::Client { for transport in tranports { let should_close = if let Ok(Some(orch_transport)) = transport.get_callback() { if let Some(orch_transport) = orch_transport @@ -301,7 +303,7 @@ impl Runtime { } } - let mut locators = self.locators.write().unwrap(); + let mut locators = self.state.locators.write().unwrap(); *locators = self.manager().get_locators(); for locator in &*locators { log::info!("Zenoh can be reached at: {}", locator); @@ -771,7 +773,7 @@ impl Runtime { if let Ok(msg) = res { log::trace!("Received {:?} from {}", msg.body, peer); if let ScoutingBody::Scout(Scout { what, .. }) = &msg.body { - if what.matches(self.whatami) { + if what.matches(self.whatami()) { let mut wbuf = vec![]; let mut writer = wbuf.writer(); let codec = Zenoh080::new(); @@ -779,7 +781,7 @@ impl Runtime { let zid = self.manager().zid(); let hello: ScoutingMessage = Hello { version: zenoh_protocol::VERSION, - whatami: self.whatami, + whatami: self.whatami(), zid, locators: self.get_locators(), } @@ -811,7 +813,7 @@ impl Runtime { } pub(super) fn closing_session(session: &RuntimeSession) { - match session.runtime.whatami { + match session.runtime.whatami() { WhatAmI::Client => { let runtime = session.runtime.clone(); session.runtime.spawn(async move { @@ -827,7 +829,16 @@ impl Runtime { } _ => { if let Some(endpoint) = &*zread!(session.endpoint) { - let peers = { session.runtime.config.lock().connect().endpoints().clone() }; + let peers = { + session + .runtime + .state + .config + .lock() + .connect() + .endpoints() + .clone() + }; if peers.contains(endpoint) { let endpoint = endpoint.clone(); let runtime = session.runtime.clone(); diff --git a/zenoh/src/plugins/sealed.rs b/zenoh/src/plugins/sealed.rs index 6caabca99..28c66d83b 100644 --- a/zenoh/src/plugins/sealed.rs +++ b/zenoh/src/plugins/sealed.rs @@ -14,22 +14,44 @@ //! `zenohd`'s plugin system. For more details, consult the [detailed documentation](https://github.com/eclipse-zenoh/roadmap/blob/main/rfcs/ALL/Plugins/Zenoh%20Plugins.md). -use crate::prelude::Selector; -pub use crate::runtime::Runtime; -pub use crate::Result as ZResult; +use crate::{prelude::Selector, runtime::Runtime}; use zenoh_core::zconfigurable; +use zenoh_plugin_trait::{ + Plugin, PluginControl, PluginInstance, PluginReport, PluginStatusRec, StructVersion, +}; +use zenoh_protocol::core::key_expr::keyexpr; +use zenoh_result::ZResult; + zconfigurable! { pub static ref PLUGIN_PREFIX: String = "zenoh_plugin_".to_string(); } +/// A zenoh plugin, when started, must return this type. +pub type RunningPlugin = Box; /// Zenoh plugins should implement this trait to ensure type-safety, even if the starting arguments and expected plugin types change in a future release. -pub trait ZenohPlugin: Plugin {} +pub trait ZenohPlugin: Plugin {} -/// A zenoh plugin receives a reference to a value of this type when started. -pub type StartArgs = Runtime; -/// A zenoh plugin, when started, must return this type. -pub type RunningPlugin = Box; +impl StructVersion for RunningPlugin { + fn struct_version() -> u64 { + 1 + } + fn struct_features() -> &'static str { + crate::FEATURES + } +} + +impl PluginControl for RunningPlugin { + fn report(&self) -> PluginReport { + self.as_ref().report() + } + + fn plugins_status(&self, names: &keyexpr) -> Vec { + self.as_ref().plugins_status(names) + } +} + +impl PluginInstance for RunningPlugin {} #[non_exhaustive] #[derive(serde::Serialize, Debug, Clone)] @@ -45,8 +67,8 @@ impl Response { } } -pub trait RunningPluginTrait: Send + Sync + std::any::Any { - /// Returns a function that will be called when configuration relevant to the plugin is about to change. +pub trait RunningPluginTrait: Send + Sync + PluginControl { + /// Function that will be called when configuration relevant to the plugin is about to change. /// /// This function is called with 3 arguments: /// * `path`, the relative path from the plugin's configuration root to the changed value. @@ -58,25 +80,44 @@ pub trait RunningPluginTrait: Send + Sync + std::any::Any { /// Useful when the changes affect settings that aren't hot-configurable for your plugin. /// * `Ok(None)` indicates that the plugin has accepted the configuration change. /// * `Ok(Some(value))` indicates that the plugin would rather the new configuration be `value`. - fn config_checker(&self) -> ValidationFunction; - /// Used to request your plugin's status for the administration space. + fn config_checker( + &self, + _path: &str, + _current: &serde_json::Map, + _new: &serde_json::Map, + ) -> ZResult>> { + bail!("Runtime configuration change not supported"); + } + /// Used to request plugin's status for the administration space. + /// Function called on any query on admin space that matches this plugin's sub-part of the admin space. + /// Thus the plugin can reply its contribution to the global admin space of this zenohd. + /// Parameters: + /// * `selector`: the full selector of the query (usually only key_expr part is used). This selector is + /// exactly the same as it was requested by user, for example "@/router/ROUTER_ID/plugins/PLUGIN_NAME/some/plugin/info" or "@/router/*/plugins/*/foo/bar". + /// But the plugin's [adminspace_getter] is called only if the selector matches the [plugin_status_key] + /// * `plugin_status_key`: the actual path to plugin's status in the admin space. For example "@/router/ROUTER_ID/plugins/PLUGIN_NAME" + /// Returns value: + /// * `Ok(Vec)`: the list of responses to the query. For example if plugins can return information on subleys "foo", "bar", "foo/buzz" and "bar/buzz" + /// and it's requested with the query "@/router/ROUTER_ID/plugins/PLUGIN_NAME/*", it should return only information on "foo" and "bar" subkeys, but not on "foo/buzz" and "bar/buzz" + /// as they doesn't match the query. + /// * `Err(ZError)`: Problem occured when processing the query. + /// + /// If plugin implements subplugins (as the storage plugin), then it should also reply with information about its subplugins with the same rules. + /// + /// TODO: + /// * add example + /// * rework the admin space: rework "with_extented_string" function, provide it as utility for plugins + /// * reorder paramaters: plugin_status_key should be first as it describes the root of pluginb's admin space + /// * Instead of ZResult return just Vec. Check, do we really need ZResult? If yes, make it separate for each status record. + /// fn adminspace_getter<'a>( &'a self, - selector: &'a Selector<'a>, - plugin_status_key: &str, - ) -> ZResult>; + _selector: &'a Selector<'a>, + _plugin_status_key: &str, + ) -> ZResult> { + Ok(Vec::new()) + } } /// The zenoh plugins manager. It handles the full lifetime of plugins, from loading to destruction. -pub type PluginsManager = zenoh_plugin_trait::loading::PluginsManager; - -pub use zenoh_plugin_trait::Plugin; -pub type ValidationFunction = std::sync::Arc< - dyn Fn( - &str, - &serde_json::Map, - &serde_json::Map, - ) -> ZResult>> - + Send - + Sync, ->; +pub type PluginsManager = zenoh_plugin_trait::PluginsManager; diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 6609d1361..7897b293f 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -339,7 +339,7 @@ impl Session { aggregated_publishers: Vec, ) -> impl Resolve { ResolveClosure::new(move || { - let router = runtime.router.clone(); + let router = runtime.router(); let state = Arc::new(RwLock::new(SessionState::new( aggregated_subscribers, aggregated_publishers, @@ -432,7 +432,7 @@ impl Session { } pub fn hlc(&self) -> Option<&HLC> { - self.runtime.hlc.as_ref().map(Arc::as_ref) + self.runtime.hlc() } /// Close the zenoh [`Session`](Session). @@ -497,7 +497,7 @@ impl Session { /// # }) /// ``` pub fn config(&self) -> &Notifier { - &self.runtime.config + self.runtime.config() } /// Get informations about the zenoh [`Session`](Session). @@ -790,7 +790,10 @@ impl Session { >>::Error: Into, { let selector = selector.try_into().map_err(Into::into); - let conf = self.runtime.config.lock(); + let timeout = { + let conf = self.runtime.config().lock(); + Duration::from_millis(unwrap_or_default!(conf.queries_default_timeout())) + }; GetBuilder { session: self, selector, @@ -798,7 +801,7 @@ impl Session { target: QueryTarget::default(), consolidation: QueryConsolidation::default(), destination: Locality::default(), - timeout: Duration::from_millis(unwrap_or_default!(conf.queries_default_timeout())), + timeout, value: None, #[cfg(feature = "unstable")] attachment: None, @@ -1496,7 +1499,8 @@ impl Session { ) -> ZResult { use crate::net::routing::router::RoutingExpr; use zenoh_protocol::core::WhatAmI; - let tables = zread!(self.runtime.router.tables.tables); + let router = self.runtime.router(); + let tables = zread!(router.tables.tables); let res = crate::net::routing::resource::Resource::get_resource( &tables.root_res, key_expr.as_str(), @@ -1789,7 +1793,7 @@ impl Session { }; task::spawn({ let state = self.state.clone(); - let zid = self.runtime.zid; + let zid = self.runtime.zid(); async move { task::sleep(timeout).await; let mut state = zwrite!(state); @@ -1942,7 +1946,7 @@ impl Session { let parameters = parameters.to_owned(); - let zid = self.runtime.zid; // @TODO build/use prebuilt specific zid + let zid = self.runtime.zid(); // @TODO build/use prebuilt specific zid let query = Query { inner: Arc::new(QueryInner { diff --git a/zenohd/Cargo.toml b/zenohd/Cargo.toml index e3177a565..12e381128 100644 --- a/zenohd/Cargo.toml +++ b/zenohd/Cargo.toml @@ -39,6 +39,7 @@ json5 = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } zenoh = { workspace = true, features = ["unstable"] } +zenoh_backend_traits = { workspace = true } [dev-dependencies] rand = { workspace = true, features = ["default"] } diff --git a/zenohd/src/main.rs b/zenohd/src/main.rs index c7e3f7b3d..b0d29ea89 100644 --- a/zenohd/src/main.rs +++ b/zenohd/src/main.rs @@ -20,6 +20,7 @@ use zenoh::config::{Config, ModeDependentValue, PermissionsConf, PluginLoad, Val use zenoh::plugins::PluginsManager; use zenoh::prelude::{EndPoint, WhatAmI}; use zenoh::runtime::{AdminSpace, Runtime}; +use zenoh::Result; const GIT_VERSION: &str = git_version!(prefix = "v", cargo_prefix = "v"); @@ -79,6 +80,32 @@ struct Args { adminspace_permissions: Option, } +fn load_plugin( + plugin_mgr: &mut PluginsManager, + name: &str, + paths: &Option>, +) -> Result<()> { + let declared = if let Some(declared) = plugin_mgr.plugin_mut(name) { + log::warn!("Plugin `{}` was already declared", declared.name()); + declared + } else if let Some(paths) = paths { + plugin_mgr.declare_dynamic_plugin_by_paths(name, paths)? + } else { + plugin_mgr.declare_dynamic_plugin_by_name(name, name)? + }; + + if let Some(loaded) = declared.loaded_mut() { + log::warn!( + "Plugin `{}` was already loaded from {}", + loaded.name(), + loaded.path() + ); + } else { + let _ = declared.load()?; + }; + Ok(()) +} + fn main() { task::block_on(async { let mut log_builder = @@ -94,7 +121,7 @@ fn main() { let config = config_from_args(&args); log::info!("Initial conf: {}", &config); - let mut plugins = PluginsManager::dynamic(config.libloader()); + let mut plugin_mgr = PluginsManager::dynamic(config.libloader(), "zenoh_plugin_"); // Static plugins are to be added here, with `.add_static::()` let mut required_plugins = HashSet::new(); for plugin_load in config.plugins().load_requests() { @@ -107,10 +134,7 @@ fn main() { "Loading {req} plugin \"{name}\"", req = if required { "required" } else { "" } ); - if let Err(e) = match paths { - None => plugins.load_plugin_by_name(name.clone()), - Some(paths) => plugins.load_plugin_by_paths(name.clone(), &paths), - } { + if let Err(e) = load_plugin(&mut plugin_mgr, &name, &paths) { if required { panic!("Plugin load failure: {}", e) } else { @@ -130,39 +154,53 @@ fn main() { } }; - for (name, path, start_result) in plugins.start_all(&runtime) { - let required = required_plugins.contains(name); + for plugin in plugin_mgr.loaded_plugins_iter_mut() { + let required = required_plugins.contains(plugin.name()); log::info!( "Starting {req} plugin \"{name}\"", - req = if required { "required" } else { "" } + req = if required { "required" } else { "" }, + name = plugin.name() ); - match start_result { - Ok(Some(_)) => log::info!("Successfully started plugin {} from {:?}", name, path), - Ok(None) => log::warn!("Plugin {} from {:?} wasn't loaded, as an other plugin by the same name is already running", name, path), + match plugin.start(&runtime) { + Ok(_) => { + log::info!( + "Successfully started plugin {} from {:?}", + plugin.name(), + plugin.path() + ); + } Err(e) => { let report = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| e.to_string())) { Ok(s) => s, - Err(_) => panic!("Formatting the error from plugin {} ({:?}) failed, this is likely due to ABI unstability.\r\nMake sure your plugin was built with the same version of cargo as zenohd", name, path), + Err(_) => panic!("Formatting the error from plugin {} ({:?}) failed, this is likely due to ABI unstability.\r\nMake sure your plugin was built with the same version of cargo as zenohd", plugin.name(), plugin.path()), }; if required { - panic!("Plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()}); - }else { - log::error!("Required plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()}); + panic!( + "Plugin \"{}\" failed to start: {}", + plugin.name(), + if report.is_empty() { + "no details provided" + } else { + report.as_str() + } + ); + } else { + log::error!( + "Required plugin \"{}\" failed to start: {}", + plugin.name(), + if report.is_empty() { + "no details provided" + } else { + report.as_str() + } + ); } } } } log::info!("Finished loading plugins"); - { - let mut config_guard = runtime.config.lock(); - for (name, (_, plugin)) in plugins.running_plugins() { - let hook = plugin.config_checker(); - config_guard.add_plugin_validator(name, hook) - } - } - - AdminSpace::start(&runtime, plugins, LONG_VERSION.clone()).await; + AdminSpace::start(&runtime, plugin_mgr, LONG_VERSION.clone()).await; future::pending::<()>().await; });