From 2ccb9dd785c64841012c219d6e3a9ca4a18ce3b2 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Wed, 8 Nov 2023 18:03:19 +0100 Subject: [PATCH] compiles --- .../zenoh-plugin-storage-manager/src/lib.rs | 23 ++-- plugins/zenoh-plugin-trait/src/loading.rs | 106 ++++++++-------- plugins/zenoh-plugin-trait/src/vtable.rs | 34 ++--- zenoh/src/net/runtime/adminspace.rs | 117 ++++++++++-------- zenohd/src/main.rs | 36 +++++- 5 files changed, 172 insertions(+), 144 deletions(-) diff --git a/plugins/zenoh-plugin-storage-manager/src/lib.rs b/plugins/zenoh-plugin-storage-manager/src/lib.rs index 27e61be28..9a0c88121 100644 --- a/plugins/zenoh-plugin-storage-manager/src/lib.rs +++ b/plugins/zenoh-plugin-storage-manager/src/lib.rs @@ -155,20 +155,23 @@ impl StorageRuntimeInner { .map(|s| async move { s.send(StorageMessage::Stop) }), )); } - self.plugins_manager.running_plugin_mut(name)?.stop(); + self.plugins_manager.started_plugin_mut(name).ok_or(format!( + "Cannot find volume {} to stop it", + name + ))?.stop(); Ok(()) } fn spawn_volume(&mut self, config: VolumeConfig) -> ZResult<()> { let volume_id = config.name(); let backend_name = config.backend(); - if let Some(paths) = config.paths() { - self.plugins_manager - .load_plugin_by_paths(volume_id, paths)?; + let declared = if let Some(paths) = config.paths() { + self.plugins_manager.add_dynamic_plugin_by_paths(volume_id, paths)? } else { self.plugins_manager - .load_plugin_by_backend_name(volume_id, backend_name)?; - } - self.plugins_manager.plugin_mut(volume_id)?.run(&config)?; + .add_dynamic_plugin_by_name(volume_id, backend_name)? + }; + let loaded = declared.load()?; + loaded.start(&config)?; Ok(()) } fn kill_storage(&mut self, config: StorageConfig) { @@ -188,7 +191,9 @@ impl StorageRuntimeInner { fn spawn_storage(&mut self, storage: StorageConfig) -> ZResult<()> { let admin_key = self.status_key() + "/storages/" + &storage.name; let volume_id = storage.volume_id.clone(); - let backend = self.plugins_manager.running_plugin(&volume_id)?; + let backend = self.plugins_manager.started_plugin(&volume_id).ok_or( + format!("Cannot find volume {} to spawn storage {}", volume_id, storage.name), + )?; let storage_name = storage.name.clone(); let in_interceptor = backend.instance().incoming_data_interceptor(); let out_interceptor = backend.instance().outgoing_data_interceptor(); @@ -251,7 +256,7 @@ impl RunningPluginTrait for StorageRuntime { }); let guard = self.0.lock().unwrap(); with_extended_string(&mut key, &["/volumes/"], |key| { - for plugin in guard.plugins_manager.running_plugins() { + for plugin in guard.plugins_manager.started_plugins() { with_extended_string(key, &[plugin.name()], |key| { with_extended_string(key, &["/__path__"], |key| { if keyexpr::new(key.as_str()) diff --git a/plugins/zenoh-plugin-trait/src/loading.rs b/plugins/zenoh-plugin-trait/src/loading.rs index cfcd60a69..d83a9d782 100644 --- a/plugins/zenoh-plugin-trait/src/loading.rs +++ b/plugins/zenoh-plugin-trait/src/loading.rs @@ -26,7 +26,7 @@ use zenoh_util::LibLoader; pub enum PluginState { Declared, Loaded, - Running, + Started, } #[derive(Clone, Debug, PartialEq, Eq, Default)] @@ -92,12 +92,12 @@ pub trait DeclaredPlugin: PluginInfo { - fn run(&mut self, args: &StartArgs) -> ZResult<&mut dyn RunningPlugin>; - fn running(&self) -> Option<&dyn RunningPlugin>; - fn running_mut(&mut self) -> Option<&mut dyn RunningPlugin>; + fn start(&mut self, args: &StartArgs) -> ZResult<&mut dyn StartedPlugin>; + fn started(&self) -> Option<&dyn StartedPlugin>; + fn started_mut(&mut self) -> Option<&mut dyn StartedPlugin>; } -pub trait RunningPlugin { +pub trait StartedPlugin : PluginInfo { fn stop(&mut self); fn instance(&self) -> &Instance; fn instance_mut(&mut self) -> &mut Instance; @@ -140,7 +140,7 @@ where state: self .instance .as_ref() - .map_or(PluginState::Loaded, |_| PluginState::Running), + .map_or(PluginState::Loaded, |_| PluginState::Started), condition: PluginCondition::new(), // TODO: request runnnig plugin status } } @@ -167,20 +167,20 @@ impl where P: Plugin, { - fn run(&mut self, args: &StartArgs) -> ZResult<&mut dyn RunningPlugin> { + fn start(&mut self, args: &StartArgs) -> ZResult<&mut dyn StartedPlugin> { if self.instance.is_none() { self.instance = Some(P::start(self.name(), args)?); } Ok(self) } - fn running(&self) -> Option<&dyn RunningPlugin> { + fn started(&self) -> Option<&dyn StartedPlugin> { if self.instance.is_some() { Some(self) } else { None } } - fn running_mut(&mut self) -> Option<&mut dyn RunningPlugin> { + fn started_mut(&mut self) -> Option<&mut dyn StartedPlugin> { if self.instance.is_some() { Some(self) } else { @@ -190,7 +190,7 @@ where } impl - RunningPlugin for StaticPlugin + StartedPlugin for StaticPlugin where P: Plugin, { @@ -323,14 +323,14 @@ impl PluginInfo PluginStatus { state: if self.starter.is_some() { if self.instance.is_some() { - PluginState::Running + PluginState::Started } else { PluginState::Loaded } } else { PluginState::Declared }, - condition: self.condition.clone(), // TODO: request condition from running plugin + condition: self.condition.clone(), // TODO: request condition from started plugin } } } @@ -365,14 +365,14 @@ impl impl LoadedPlugin for DynamicPlugin { - fn run(&mut self, args: &StartArgs) -> ZResult<&mut dyn RunningPlugin> { + fn start(&mut self, args: &StartArgs) -> ZResult<&mut dyn StartedPlugin> { let starter = self .starter .as_ref() .ok_or_else(|| format!("Plugin `{}` not loaded", self.name)) .add_error(&mut self.condition)?; - let already_running = self.instance.is_some(); - if !already_running { + let already_started = self.instance.is_some(); + if !already_started { let instance = starter .start(self.name(), args) .add_error(&mut self.condition)?; @@ -380,14 +380,14 @@ impl } Ok(self) } - fn running(&self) -> Option<&dyn RunningPlugin> { + fn started(&self) -> Option<&dyn StartedPlugin> { if self.instance.is_some() { Some(self) } else { None } } - fn running_mut(&mut self) -> Option<&mut dyn RunningPlugin> { + fn started_mut(&mut self) -> Option<&mut dyn StartedPlugin> { if self.instance.is_some() { Some(self) } else { @@ -397,7 +397,7 @@ impl } impl - RunningPlugin for DynamicPlugin + StartedPlugin for DynamicPlugin { fn stop(&mut self) { self.instance = None; @@ -411,13 +411,13 @@ impl } struct PluginRecord( - Box>, + Box + Send>, ); impl PluginRecord { - fn new + 'static>(plugin: P) -> Self { + fn new + Send + 'static>(plugin: P) -> Self { Self(Box::new(plugin)) } } @@ -452,13 +452,13 @@ impl /// A plugins manager that handles starting and stopping plugins. /// Plugins can be loaded from shared libraries using [`Self::load_plugin_by_name`] or [`Self::load_plugin_by_paths`], or added directly from the binary if available using [`Self::add_static`]. -pub struct PluginsManager { +pub struct PluginsManager { default_lib_prefix: String, loader: Option, plugins: Vec>, } -impl +impl PluginsManager { /// Constructs a new plugin manager with dynamic library loading enabled. @@ -526,11 +526,6 @@ impl ZResult { - self.get_plugin_index(name) - .ok_or_else(|| format!("Plugin `{}` not found", name).into()) - } - /// Lists all plugins pub fn plugins(&self) -> impl Iterator> + '_ { self.plugins @@ -562,69 +557,66 @@ impl impl Iterator> + '_ { - self.loaded_plugins().filter_map(|p| p.running()) + ) -> impl Iterator> + '_ { + self.loaded_plugins().filter_map(|p| p.started()) } - /// Lists the running plugins mutable - pub fn running_plugins_mut( + /// Lists the started plugins mutable + pub fn started_plugins_mut( &mut self, - ) -> impl Iterator> + '_ { - self.loaded_plugins_mut().filter_map(|p| p.running_mut()) + ) -> impl Iterator> + '_ { + self.loaded_plugins_mut().filter_map(|p| p.started_mut()) } /// Returns single plugin record - pub fn plugin(&self, name: &str) -> ZResult<&dyn DeclaredPlugin> { - Ok(&self.plugins[self.get_plugin_index_err(name)?]) + pub fn plugin(&self, name: &str) -> Option<&dyn DeclaredPlugin> { + let index = self.get_plugin_index(name)?; + Some(&self.plugins[index]) } /// Returns mutable plugin record pub fn plugin_mut( &mut self, name: &str, - ) -> ZResult<&mut dyn DeclaredPlugin> { - let index = self.get_plugin_index_err(name)?; - Ok(&mut self.plugins[index]) + ) -> Option<&mut dyn DeclaredPlugin> { + let index = self.get_plugin_index(name)?; + Some(&mut self.plugins[index]) } /// Returns loaded plugin record - pub fn loaded_plugin(&self, name: &str) -> ZResult<&dyn LoadedPlugin> { - Ok(self + pub fn loaded_plugin(&self, name: &str) -> Option<&dyn LoadedPlugin> { + self .plugin(name)? .loaded() - .ok_or_else(|| format!("Plugin `{}` not loaded", name))?) } /// Returns mutable loaded plugin record pub fn loaded_plugin_mut( &mut self, name: &str, - ) -> ZResult<&mut dyn LoadedPlugin> { - Ok(self + ) -> Option<&mut dyn LoadedPlugin> { + self .plugin_mut(name)? .loaded_mut() - .ok_or_else(|| format!("Plugin `{}` not loaded", name))?) } - /// Returns running plugin record - pub fn running_plugin(&self, name: &str) -> ZResult<&dyn RunningPlugin> { - Ok(self + /// Returns started plugin record + pub fn started_plugin(&self, name: &str) -> Option<&dyn StartedPlugin> { + self .loaded_plugin(name)? - .running() - .ok_or_else(|| format!("Plugin `{}` is not running", name))?) + .started() } - /// Returns mutable running plugin record - pub fn running_plugin_mut( + /// Returns mutable started plugin record + pub fn started_plugin_mut( &mut self, name: &str, - ) -> ZResult<&mut dyn RunningPlugin> { - Ok(self + ) -> Option<&mut dyn StartedPlugin> { + self .loaded_plugin_mut(name)? - .running_mut() - .ok_or_else(|| format!("Plugin `{}` is not running", name))?) + .started_mut() } } diff --git a/plugins/zenoh-plugin-trait/src/vtable.rs b/plugins/zenoh-plugin-trait/src/vtable.rs index d9fa05270..6cf16feeb 100644 --- a/plugins/zenoh-plugin-trait/src/vtable.rs +++ b/plugins/zenoh-plugin-trait/src/vtable.rs @@ -1,4 +1,3 @@ -use std::fmt::Display; // // Copyright (c) 2023 ZettaScale Technology @@ -16,19 +15,20 @@ use std::fmt::Display; use crate::*; pub use no_mangle::*; use zenoh_result::ZResult; +use std::fmt::Display; pub type PluginLoaderVersion = u64; pub const PLUGIN_LOADER_VERSION: PluginLoaderVersion = 1; pub const FEATURES: &str = concat_enabled_features!(prefix = "zenoh-plugin-trait", features = ["default"]); -type StartFn = fn(&str, &StartArgs) -> ZResult; +type StartFn = fn(&str, &StartArgs) -> ZResult; #[repr(C)] -pub struct PluginVTable { - pub start: StartFn, +pub struct PluginVTable { + pub start: StartFn, } -impl CompatibilityVersion for PluginVTable { +impl CompatibilityVersion for PluginVTable { fn version() -> u64 { 1 } @@ -71,27 +71,27 @@ pub struct Compatibility { rust_version: RustVersion, vtable_version: StructVersion, start_args_version: StructVersion, - running_plugin_version: StructVersion, + instance_version: StructVersion, } impl Compatibility { - pub fn new() -> Self { + pub fn new() -> Self { let rust_version = RustVersion::new(); - let vtable_version = StructVersion::new::>(); + let vtable_version = StructVersion::new::>(); let start_args_version = StructVersion::new::(); - let running_plugin_version = StructVersion::new::(); + let instance_version = StructVersion::new::(); Self { rust_version, vtable_version, start_args_version, - running_plugin_version, + instance_version, } } pub fn are_compatible(&self, other: &Self) -> bool { RustVersion::are_compatible(&self.rust_version, &other.rust_version) && self.vtable_version == other.vtable_version && self.start_args_version == other.start_args_version - && self.running_plugin_version == other.running_plugin_version + && self.instance_version == other.instance_version } } @@ -99,11 +99,11 @@ impl Display for Compatibility { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "{}\nVTable:{}StartArgs:{}RunningPlugin:{}", + "{}\nVTable:{}StartArgs:{}Instance:{}", self.rust_version, self.vtable_version, self.start_args_version, - self.running_plugin_version + self.instance_version ) } } @@ -165,8 +165,8 @@ impl Default for RustVersion { } } -impl PluginVTable { - pub fn new>() -> Self { +impl PluginVTable { + pub fn new>() -> Self { Self { start: ConcretePlugin::start, } @@ -190,14 +190,14 @@ pub mod no_mangle { // TODO: add vtable version (including type parameters) to the compatibility information $crate::prelude::Compatibility::new::< <$ty as $crate::prelude::Plugin>::StartArgs, - <$ty as $crate::prelude::Plugin>::RunningPlugin, + <$ty as $crate::prelude::Plugin>::Instance, >() } #[no_mangle] fn load_plugin() -> $crate::prelude::PluginVTable< <$ty as $crate::prelude::Plugin>::StartArgs, - <$ty as $crate::prelude::Plugin>::RunningPlugin, + <$ty as $crate::prelude::Plugin>::Instance, > { $crate::prelude::PluginVTable::new::<$ty>() } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 5e0df31a7..fb937e3bb 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -73,12 +73,55 @@ impl ConfigValidator for AdminSpace { new: &serde_json::Map, ) -> ZResult>> { let plugins_mgr = zlock!(self.context.plugins_mgr); - let plugin = plugins_mgr.running_plugin(name)?; + let plugin = plugins_mgr.started_plugin(name).ok_or(format!( + "Plugin `{}` is not running, but its configuration is being changed", + name + ))?; plugin.instance().config_checker(path, current, new) } } impl AdminSpace { + fn start_plugin( + plugin_mgr: &mut plugins::PluginsManager, + config: &crate::config::PluginLoad, + start_args: &Runtime, + ) -> ZResult<()> { + let name = &config.name; + let declared = if let Some(declared) = plugin_mgr.plugin_mut(name) { + log::warn!("Plugin `{}` was already declared", declared.name()); + declared + } else if let Some(paths) = &config.paths { + plugin_mgr.add_dynamic_plugin_by_paths(name, paths)? + } else { + plugin_mgr.add_dynamic_plugin_by_name(name, name)? + }; + + let loaded = if let Some(loaded) = declared.loaded_mut() { + log::warn!( + "Plugin `{}` was already loaded from {}", + loaded.name(), + loaded.path() + ); + loaded + } else { + declared.load()? + }; + + if let Some(started) = loaded.started_mut() { + log::warn!("Plugin `{}` was already started", started.name()); + } else { + let started = loaded.start(start_args)?; + log::info!( + "Successfully started plugin `{}` from {}", + started.name(), + started.path() + ); + }; + + Ok(()) + } + pub async fn start(runtime: &Runtime, plugins_mgr: plugins::PluginsManager, version: String) { let zid_str = runtime.state.zid.to_string(); let metadata = runtime.state.metadata.clone(); @@ -122,7 +165,7 @@ impl AdminSpace { ); let mut active_plugins = plugins_mgr - .running_plugins() + .started_plugins() .map(|rec| (rec.name().to_string(), rec.path().to_string())) .collect::>(); @@ -188,67 +231,31 @@ impl AdminSpace { match diff { PluginDiff::Delete(name) => { active_plugins.remove(name.as_str()); - if let Ok(running) = plugins_mgr.running_plugin_mut(&name) { + if let Some(running) = plugins_mgr.started_plugin_mut(&name) { running.stop() } } PluginDiff::Start(plugin) => { - let name = &plugin.name; - let rec = if let Some(paths) = &plugin.paths { - plugins_mgr.load_plugin_by_paths(name, paths) - } else { - plugins_mgr.load_plugin_by_backend_name(name, &plugin.name) - }; - match rec { - Ok((loaded, rec)) => { - if loaded { - log::info!( - "Loaded plugin `{}` from {}", - rec.name(), - rec.path() - ); - } else { - log::warn!( - "Plugin `{}` was already loaded from {}", - rec.name(), - rec.path() - ); - } - match rec.run(&admin.context.runtime) { - Ok((true, rec)) => { - active_plugins - .insert(name.into(), rec.path().to_string()); - log::info!( - "Successfully started plugin `{}` from {}", - rec.name(), - rec.path() - ); - } - Ok((false, _)) => { - log::warn!("Plugin `{}` was already running", name) - } - Err(e) => { - log::error!( - "Failed to start plugin `{}`: {}", - name, - e - ) - } - } - } - Err(e) => { - if plugin.required { - panic!("Failed to load plugin `{}`: {}", name, e) - } else { - log::error!("Failed to load plugin `{}`: {}", name, e) - } + if let Err(e) = Self::start_plugin( + &mut plugins_mgr, + &plugin, + &admin.context.runtime, + ) { + if plugin.required { + panic!("Failed to load plugin `{}`: {}", plugin.name, e) + } else { + log::error!( + "Failed to load plugin `{}`: {}", + plugin.name, + e + ) } } } } } - log::info!("Running plugins: {:?}", &active_plugins) } + log::info!("Running plugins: {:?}", &active_plugins) } }); @@ -443,7 +450,7 @@ fn router_data(context: &AdminContext, query: Query) { let plugins: serde_json::Value = { let plugins_mgr = zlock!(context.plugins_mgr); plugins_mgr - .running_plugins() + .started_plugins() .map(|rec| (rec.name(), json!({ "path": rec.path() }))) .collect() }; @@ -648,7 +655,7 @@ fn plugins_status(context: &AdminContext, query: Query) { let guard = zlock!(context.plugins_mgr); let mut root_key = format!("@/router/{}/status/plugins/", &context.zid_str); - for plugin in guard.running_plugins() { + for plugin in guard.started_plugins() { with_extended_string(&mut root_key, &[plugin.name()], |plugin_key| { with_extended_string(plugin_key, &["/__path__"], |plugin_path_key| { if let Ok(key_expr) = KeyExpr::try_from(plugin_path_key.clone()) { diff --git a/zenohd/src/main.rs b/zenohd/src/main.rs index fe9b6a473..3bd126ef6 100644 --- a/zenohd/src/main.rs +++ b/zenohd/src/main.rs @@ -15,6 +15,7 @@ use async_std::task; use clap::{ArgMatches, Command}; use futures::future; use git_version::git_version; +use zenoh::Result; use std::collections::HashSet; use zenoh::config::{Config, ModeDependentValue, PermissionsConf, PluginLoad, ValidatedMap}; use zenoh::plugins::PluginsManager; @@ -29,6 +30,32 @@ lazy_static::lazy_static!( const DEFAULT_LISTENER: &str = "tcp/[::]:7447"; +fn load_plugin( + plugin_mgr: &mut PluginsManager, + name: &str, + paths: &Option>, +) -> Result<()> { + let declared = if let Some(declared) = plugin_mgr.plugin_mut(name) { + log::warn!("Plugin `{}` was already declared", declared.name()); + declared + } else if let Some(paths) = paths { + plugin_mgr.add_dynamic_plugin_by_paths(name, paths)? + } else { + plugin_mgr.add_dynamic_plugin_by_name(name, name)? + }; + + if let Some(loaded) = declared.loaded_mut() { + log::warn!( + "Plugin `{}` was already loaded from {}", + loaded.name(), + loaded.path() + ); + } else { + let _ = declared.load()?; + }; + Ok(()) +} + fn main() { task::block_on(async { let mut log_builder = @@ -89,10 +116,7 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na "Loading {req} plugin \"{name}\"", req = if required { "required" } else { "" } ); - if let Err(e) = match paths { - None => plugin_mgr.load_plugin_by_backend_name(&name, &name), - Some(paths) => plugin_mgr.load_plugin_by_paths(name.clone(), &paths), - } { + if let Err(e) = load_plugin(&mut plugin_mgr, &name, &paths) { if required { panic!("Plugin load failure: {}", e) } else { @@ -112,14 +136,14 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na } }; - for plugin in plugin_mgr.plugins_mut() { + for plugin in plugin_mgr.loaded_plugins_mut() { let required = required_plugins.contains(plugin.name()); log::info!( "Starting {req} plugin \"{name}\"", req = if required { "required" } else { "" }, name = plugin.name() ); - match plugin.run(&runtime) { + match plugin.start(&runtime) { Ok(_) => { log::info!( "Successfully started plugin {} from {:?}",