diff --git a/plugins/example-storage-plugin/src/lib.rs b/plugins/example-storage-plugin/src/lib.rs index a6d5d9760..d0b9addaf 100644 --- a/plugins/example-storage-plugin/src/lib.rs +++ b/plugins/example-storage-plugin/src/lib.rs @@ -73,7 +73,7 @@ impl Volume for ExampleBackend { read_cost: 0, } } - async fn create_storage(&mut self, _props: StorageConfig) -> ZResult> { + async fn create_storage(&self, _props: StorageConfig) -> ZResult> { Ok(Box::::default()) } fn incoming_data_interceptor(&self) -> Option Sample + Send + Sync>> { diff --git a/plugins/zenoh-backend-traits/src/lib.rs b/plugins/zenoh-backend-traits/src/lib.rs index 8ab5844de..325e5a698 100644 --- a/plugins/zenoh-backend-traits/src/lib.rs +++ b/plugins/zenoh-backend-traits/src/lib.rs @@ -204,7 +204,7 @@ pub trait Volume: Send + Sync { fn get_capability(&self) -> Capability; /// Creates a storage configured with some properties. - async fn create_storage(&mut self, props: StorageConfig) -> ZResult>; + async fn create_storage(&self, props: StorageConfig) -> ZResult>; /// Returns an interceptor that will be called before pushing any data /// into a storage created by this backend. `None` can be returned for no interception point. diff --git a/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs b/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs index 981ec521a..5fb37409f 100644 --- a/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs +++ b/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs @@ -30,7 +30,7 @@ pub struct StoreIntercept { pub(crate) async fn create_and_start_storage( admin_key: String, config: StorageConfig, - backend: &mut Box, + backend: &Box, in_interceptor: Option Sample + Send + Sync>>, out_interceptor: Option Sample + Send + Sync>>, zenoh: Arc, diff --git a/plugins/zenoh-plugin-storage-manager/src/lib.rs b/plugins/zenoh-plugin-storage-manager/src/lib.rs index 8b6ffbe3e..161c0c70a 100644 --- a/plugins/zenoh-plugin-storage-manager/src/lib.rs +++ b/plugins/zenoh-plugin-storage-manager/src/lib.rs @@ -21,12 +21,8 @@ use async_std::task; use flume::Sender; -use libloading::Library; -use memory_backend::create_memory_backend; use std::collections::HashMap; use std::convert::TryFrom; -use std::path::PathBuf; -use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::sync::Mutex; use storages_mgt::StorageMessage; @@ -34,10 +30,7 @@ use zenoh::plugins::{Plugin, RunningPluginTrait, ZenohPlugin}; use zenoh::prelude::sync::*; use zenoh::runtime::Runtime; use zenoh::Session; -use zenoh_backend_traits::CreateVolume; use zenoh_backend_traits::VolumePlugin; -use zenoh_backend_traits::CREATE_VOLUME_FN_NAME; -use zenoh_backend_traits::{config::*, Volume}; use zenoh_core::zlock; use zenoh_result::{bail, ZResult}; use zenoh_util::LibLoader; @@ -81,7 +74,6 @@ struct StorageRuntimeInner { name: String, runtime: Runtime, session: Arc, - volumes: HashMap, storages: HashMap>>, plugins_manager: PluginsManager, } @@ -114,7 +106,6 @@ impl StorageRuntimeInner { name, runtime, session, - volumes: Default::default(), storages: Default::default(), plugins_manager: PluginsManager::dynamic(lib_loader, BACKEND_LIB_PREFIX), }; @@ -136,7 +127,7 @@ impl StorageRuntimeInner { fn update>(&mut self, diffs: I) -> ZResult<()> { for diff in diffs { match diff { - ConfigDiff::DeleteVolume(volume) => self.kill_volume(volume), + ConfigDiff::DeleteVolume(volume) => self.kill_volume(&volume.name), ConfigDiff::AddVolume(volume) => { self.spawn_volume(volume)?; } @@ -146,29 +137,31 @@ impl StorageRuntimeInner { } Ok(()) } - fn kill_volume(&mut self, volume: VolumeConfig) { - if let Some(storages) = self.storages.remove(&volume.name) { + fn kill_volume>(&mut self, name: T) { + let name = name.as_ref(); + if let Some(storages) = self.storages.remove(name) { async_std::task::block_on(futures::future::join_all( storages .into_values() .map(|s| async move { s.send(StorageMessage::Stop) }), )); } - std::mem::drop(self.volumes.remove(&volume.name)); + self.plugins_manager.stop(name); } fn spawn_volume(&mut self, config: VolumeConfig) -> ZResult<()> { let volume_id = config.name(); let backend_name = config.backend(); if backend_name == MEMORY_BACKEND_NAME { - match create_memory_backend(config) { - Ok(backend) => { - self.volumes.insert( - volume_id.to_string(), - VolumeHandle::new(backend, None, "".into()), - ); - } - Err(e) => bail!("{}", e), - } + // match create_memory_backend(config) { + // Ok(backend) => { + // TODO: implement static memory backend as static plugin + // self.volumes.insert( + // volume_id.to_string(), + // VolumeHandle::new(backend, None, "".into()), + // ); + // } + // Err(e) => bail!("{}", e), + // } } else { if let Some(paths) = config.paths() { self.plugins_manager @@ -177,46 +170,10 @@ impl StorageRuntimeInner { self.plugins_manager .load_plugin_by_backend_name(volume_id, backend_name)?; } - if let Some() }; + self.plugins_manager.start(volume_id, &config)?; Ok(()) } - unsafe fn loaded_backend_from_lib( - &mut self, - volume_id: &str, - config: VolumeConfig, - lib: Library, - lib_path: PathBuf, - ) -> ZResult<()> { - if let Ok(create_backend) = lib.get::(CREATE_VOLUME_FN_NAME) { - match create_backend(config) { - Ok(backend) => { - self.volumes.insert( - volume_id.to_string(), - VolumeHandle::new( - backend, - Some(lib), - lib_path.to_string_lossy().into_owned(), - ), - ); - Ok(()) - } - Err(e) => bail!( - "Failed to load Backend {} from {}: {}", - volume_id, - lib_path.display(), - e - ), - } - } else { - bail!( - "Failed to instantiate volume {} from {}: function {}(VolumeConfig) not found in lib", - volume_id, - lib_path.display(), - String::from_utf8_lossy(CREATE_VOLUME_FN_NAME) - ); - } - } fn kill_storage(&mut self, config: StorageConfig) { let volume = &config.volume_id; if let Some(storages) = self.storages.get_mut(volume) { @@ -234,14 +191,14 @@ impl StorageRuntimeInner { fn spawn_storage(&mut self, storage: StorageConfig) -> ZResult<()> { let admin_key = self.status_key() + "/storages/" + &storage.name; let volume_id = storage.volume_id.clone(); - if let Some(backend) = self.volumes.get_mut(&volume_id) { + if let Some(backend) = self.plugins_manager.plugin(&volume_id) { let storage_name = storage.name.clone(); - let in_interceptor = backend.backend.incoming_data_interceptor(); - let out_interceptor = backend.backend.outgoing_data_interceptor(); + let in_interceptor = backend.incoming_data_interceptor(); + let out_interceptor = backend.outgoing_data_interceptor(); let stopper = async_std::task::block_on(create_and_start_storage( admin_key, storage, - &mut backend.backend, + &backend, in_interceptor, out_interceptor, self.session.clone(), @@ -259,28 +216,6 @@ impl StorageRuntimeInner { } } } -struct VolumeHandle { - backend: Box, - _lib: Option, - lib_path: String, - stopper: Arc, -} -impl VolumeHandle { - fn new(backend: Box, lib: Option, lib_path: String) -> Self { - VolumeHandle { - backend, - _lib: lib, - lib_path, - stopper: Arc::new(std::sync::atomic::AtomicBool::new(true)), - } - } -} -impl Drop for VolumeHandle { - fn drop(&mut self) { - self.stopper - .store(false, std::sync::atomic::Ordering::Relaxed); - } -} impl From for StorageRuntime { fn from(inner: StorageRuntimeInner) -> Self { StorageRuntime(Arc::new(Mutex::new(inner))) @@ -325,7 +260,7 @@ impl RunningPluginTrait for StorageRuntime { }); let guard = self.0.lock().unwrap(); with_extended_string(&mut key, &["/volumes/"], |key| { - for (volume_id, volume) in &guard.volumes { + for (volume_id, (lib_path, volume)) in guard.plugins_manager.running_plugins() { with_extended_string(key, &[volume_id], |key| { with_extended_string(key, &["/__path__"], |key| { if keyexpr::new(key.as_str()) @@ -334,7 +269,7 @@ impl RunningPluginTrait for StorageRuntime { { responses.push(zenoh::plugins::Response::new( key.clone(), - volume.lib_path.clone().into(), + lib_path.into(), )) } }); @@ -344,7 +279,7 @@ impl RunningPluginTrait for StorageRuntime { { responses.push(zenoh::plugins::Response::new( key.clone(), - volume.backend.get_admin_status(), + volume.get_admin_status(), )) } }); diff --git a/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs b/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs index 96a5ec22d..5ac076ce0 100644 --- a/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs @@ -43,7 +43,7 @@ impl Volume for MemoryBackend { } } - async fn create_storage(&mut self, properties: StorageConfig) -> ZResult> { + async fn create_storage(&self, properties: StorageConfig) -> ZResult> { log::debug!("Create Memory Storage with configuration: {:?}", properties); Ok(Box::new(MemoryStorage::new(properties).await?)) } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 329cca0e4..de264502c 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -200,7 +200,7 @@ impl AdminSpace { Some(paths) => plugins_mgr .load_plugin_by_paths(plugin.name.clone(), paths), None => plugins_mgr - .load_plugin_by_backend_name(plugin.name.clone()), + .load_plugin_by_backend_name(&plugin.name, &plugin.name), }; match load { Err(e) => { diff --git a/zenohd/src/main.rs b/zenohd/src/main.rs index dc31a0d5b..aadab8555 100644 --- a/zenohd/src/main.rs +++ b/zenohd/src/main.rs @@ -76,7 +76,7 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na let config = config_from_args(&args); log::info!("Initial conf: {}", &config); - let mut plugins = PluginsManager::dynamic(config.libloader()); + let mut plugins = PluginsManager::dynamic(config.libloader(), "zenoh_plugin_"); // Static plugins are to be added here, with `.add_static::()` let mut required_plugins = HashSet::new(); for plugin_load in config.plugins().load_requests() { @@ -90,7 +90,7 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na req = if required { "required" } else { "" } ); if let Err(e) = match paths { - None => plugins.load_plugin_by_backend_name(name.clone()), + None => plugins.load_plugin_by_backend_name(&name, &name), Some(paths) => plugins.load_plugin_by_paths(name.clone(), &paths), } { if required {