Skip to content

Commit

Permalink
replaced volumes map to plugin_manager
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Oct 23, 2023
1 parent 84442f5 commit 684c279
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 95 deletions.
2 changes: 1 addition & 1 deletion plugins/example-storage-plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl Volume for ExampleBackend {
read_cost: 0,
}
}
async fn create_storage(&mut self, _props: StorageConfig) -> ZResult<Box<dyn Storage>> {
async fn create_storage(&self, _props: StorageConfig) -> ZResult<Box<dyn Storage>> {
Ok(Box::<ExampleStorage>::default())
}
fn incoming_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-backend-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ pub trait Volume: Send + Sync {
fn get_capability(&self) -> Capability;

/// Creates a storage configured with some properties.
async fn create_storage(&mut self, props: StorageConfig) -> ZResult<Box<dyn Storage>>;
async fn create_storage(&self, props: StorageConfig) -> ZResult<Box<dyn Storage>>;

/// Returns an interceptor that will be called before pushing any data
/// into a storage created by this backend. `None` can be returned for no interception point.
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct StoreIntercept {
pub(crate) async fn create_and_start_storage(
admin_key: String,
config: StorageConfig,
backend: &mut Box<dyn zenoh_backend_traits::Volume>,
backend: &Box<dyn zenoh_backend_traits::Volume>,
in_interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
out_interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
zenoh: Arc<Session>,
Expand Down
111 changes: 23 additions & 88 deletions plugins/zenoh-plugin-storage-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,16 @@

use async_std::task;
use flume::Sender;
use libloading::Library;
use memory_backend::create_memory_backend;
use std::collections::HashMap;
use std::convert::TryFrom;

Check warning on line 25 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run tests on ubuntu-latest

unused import: `std::convert::TryFrom`

Check warning on line 25 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run tests on macOS-latest

unused import: `std::convert::TryFrom`

Check warning on line 25 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run tests on windows-latest

unused import: `std::convert::TryFrom`
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::Mutex;
use storages_mgt::StorageMessage;
use zenoh::plugins::{Plugin, RunningPluginTrait, ZenohPlugin};
use zenoh::prelude::sync::*;
use zenoh::runtime::Runtime;
use zenoh::Session;
use zenoh_backend_traits::CreateVolume;
use zenoh_backend_traits::VolumePlugin;
use zenoh_backend_traits::CREATE_VOLUME_FN_NAME;
use zenoh_backend_traits::{config::*, Volume};
use zenoh_core::zlock;
use zenoh_result::{bail, ZResult};
use zenoh_util::LibLoader;
Expand Down Expand Up @@ -81,7 +74,6 @@ struct StorageRuntimeInner {
name: String,
runtime: Runtime,
session: Arc<Session>,
volumes: HashMap<String, VolumeHandle>,
storages: HashMap<String, HashMap<String, Sender<StorageMessage>>>,
plugins_manager: PluginsManager,
}
Expand Down Expand Up @@ -114,7 +106,6 @@ impl StorageRuntimeInner {
name,
runtime,
session,
volumes: Default::default(),
storages: Default::default(),
plugins_manager: PluginsManager::dynamic(lib_loader, BACKEND_LIB_PREFIX),
};
Expand All @@ -136,7 +127,7 @@ impl StorageRuntimeInner {
fn update<I: IntoIterator<Item = ConfigDiff>>(&mut self, diffs: I) -> ZResult<()> {

Check failure on line 127 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run tests on ubuntu-latest

cannot find type `ConfigDiff` in this scope

Check failure on line 127 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run tests on macOS-latest

cannot find type `ConfigDiff` in this scope

Check failure on line 127 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run tests on windows-latest

cannot find type `ConfigDiff` in this scope
for diff in diffs {
match diff {
ConfigDiff::DeleteVolume(volume) => self.kill_volume(volume),
ConfigDiff::DeleteVolume(volume) => self.kill_volume(&volume.name),

Check failure on line 130 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run tests on ubuntu-latest

failed to resolve: use of undeclared type `ConfigDiff`

Check failure on line 130 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run tests on macOS-latest

failed to resolve: use of undeclared type `ConfigDiff`

Check failure on line 130 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run tests on windows-latest

failed to resolve: use of undeclared type `ConfigDiff`
ConfigDiff::AddVolume(volume) => {

Check failure on line 131 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run tests on ubuntu-latest

failed to resolve: use of undeclared type `ConfigDiff`

Check failure on line 131 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run tests on macOS-latest

failed to resolve: use of undeclared type `ConfigDiff`

Check failure on line 131 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run tests on windows-latest

failed to resolve: use of undeclared type `ConfigDiff`
self.spawn_volume(volume)?;
}
Expand All @@ -146,29 +137,31 @@ impl StorageRuntimeInner {
}
Ok(())
}
fn kill_volume(&mut self, volume: VolumeConfig) {
if let Some(storages) = self.storages.remove(&volume.name) {
fn kill_volume<T: AsRef<str>>(&mut self, name: T) {
let name = name.as_ref();
if let Some(storages) = self.storages.remove(name) {
async_std::task::block_on(futures::future::join_all(
storages
.into_values()
.map(|s| async move { s.send(StorageMessage::Stop) }),
));
}
std::mem::drop(self.volumes.remove(&volume.name));
self.plugins_manager.stop(name);
}
fn spawn_volume(&mut self, config: VolumeConfig) -> ZResult<()> {
let volume_id = config.name();
let backend_name = config.backend();
if backend_name == MEMORY_BACKEND_NAME {
match create_memory_backend(config) {
Ok(backend) => {
self.volumes.insert(
volume_id.to_string(),
VolumeHandle::new(backend, None, "<static-memory>".into()),
);
}
Err(e) => bail!("{}", e),
}
// match create_memory_backend(config) {
// Ok(backend) => {
// TODO: implement static memory backend as static plugin
// self.volumes.insert(
// volume_id.to_string(),
// VolumeHandle::new(backend, None, "<static-memory>".into()),
// );
// }
// Err(e) => bail!("{}", e),
// }
} else {
if let Some(paths) = config.paths() {
self.plugins_manager
Expand All @@ -177,46 +170,10 @@ impl StorageRuntimeInner {
self.plugins_manager
.load_plugin_by_backend_name(volume_id, backend_name)?;
}
if let Some()
};
self.plugins_manager.start(volume_id, &config)?;
Ok(())
}
unsafe fn loaded_backend_from_lib(
&mut self,
volume_id: &str,
config: VolumeConfig,
lib: Library,
lib_path: PathBuf,
) -> ZResult<()> {
if let Ok(create_backend) = lib.get::<CreateVolume>(CREATE_VOLUME_FN_NAME) {
match create_backend(config) {
Ok(backend) => {
self.volumes.insert(
volume_id.to_string(),
VolumeHandle::new(
backend,
Some(lib),
lib_path.to_string_lossy().into_owned(),
),
);
Ok(())
}
Err(e) => bail!(
"Failed to load Backend {} from {}: {}",
volume_id,
lib_path.display(),
e
),
}
} else {
bail!(
"Failed to instantiate volume {} from {}: function {}(VolumeConfig) not found in lib",
volume_id,
lib_path.display(),
String::from_utf8_lossy(CREATE_VOLUME_FN_NAME)
);
}
}
fn kill_storage(&mut self, config: StorageConfig) {
let volume = &config.volume_id;
if let Some(storages) = self.storages.get_mut(volume) {
Expand All @@ -234,14 +191,14 @@ impl StorageRuntimeInner {
fn spawn_storage(&mut self, storage: StorageConfig) -> ZResult<()> {
let admin_key = self.status_key() + "/storages/" + &storage.name;
let volume_id = storage.volume_id.clone();
if let Some(backend) = self.volumes.get_mut(&volume_id) {
if let Some(backend) = self.plugins_manager.plugin(&volume_id) {
let storage_name = storage.name.clone();
let in_interceptor = backend.backend.incoming_data_interceptor();
let out_interceptor = backend.backend.outgoing_data_interceptor();
let in_interceptor = backend.incoming_data_interceptor();
let out_interceptor = backend.outgoing_data_interceptor();
let stopper = async_std::task::block_on(create_and_start_storage(
admin_key,
storage,
&mut backend.backend,
&backend,
in_interceptor,
out_interceptor,
self.session.clone(),
Expand All @@ -259,28 +216,6 @@ impl StorageRuntimeInner {
}
}
}
struct VolumeHandle {
backend: Box<dyn Volume>,
_lib: Option<Library>,
lib_path: String,
stopper: Arc<AtomicBool>,
}
impl VolumeHandle {
fn new(backend: Box<dyn Volume>, lib: Option<Library>, lib_path: String) -> Self {
VolumeHandle {
backend,
_lib: lib,
lib_path,
stopper: Arc::new(std::sync::atomic::AtomicBool::new(true)),
}
}
}
impl Drop for VolumeHandle {
fn drop(&mut self) {
self.stopper
.store(false, std::sync::atomic::Ordering::Relaxed);
}
}
impl From<StorageRuntimeInner> for StorageRuntime {
fn from(inner: StorageRuntimeInner) -> Self {
StorageRuntime(Arc::new(Mutex::new(inner)))
Expand Down Expand Up @@ -325,7 +260,7 @@ impl RunningPluginTrait for StorageRuntime {
});
let guard = self.0.lock().unwrap();
with_extended_string(&mut key, &["/volumes/"], |key| {
for (volume_id, volume) in &guard.volumes {
for (volume_id, (lib_path, volume)) in guard.plugins_manager.running_plugins() {
with_extended_string(key, &[volume_id], |key| {
with_extended_string(key, &["/__path__"], |key| {
if keyexpr::new(key.as_str())
Expand All @@ -334,7 +269,7 @@ impl RunningPluginTrait for StorageRuntime {
{
responses.push(zenoh::plugins::Response::new(
key.clone(),
volume.lib_path.clone().into(),
lib_path.into(),
))
}
});
Expand All @@ -344,7 +279,7 @@ impl RunningPluginTrait for StorageRuntime {
{
responses.push(zenoh::plugins::Response::new(
key.clone(),
volume.backend.get_admin_status(),
volume.get_admin_status(),
))
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Volume for MemoryBackend {
}
}

async fn create_storage(&mut self, properties: StorageConfig) -> ZResult<Box<dyn Storage>> {
async fn create_storage(&self, properties: StorageConfig) -> ZResult<Box<dyn Storage>> {
log::debug!("Create Memory Storage with configuration: {:?}", properties);
Ok(Box::new(MemoryStorage::new(properties).await?))
}
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl AdminSpace {
Some(paths) => plugins_mgr
.load_plugin_by_paths(plugin.name.clone(), paths),
None => plugins_mgr
.load_plugin_by_backend_name(plugin.name.clone()),
.load_plugin_by_backend_name(&plugin.name, &plugin.name),
};
match load {
Err(e) => {
Expand Down
4 changes: 2 additions & 2 deletions zenohd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na
let config = config_from_args(&args);
log::info!("Initial conf: {}", &config);

let mut plugins = PluginsManager::dynamic(config.libloader());
let mut plugins = PluginsManager::dynamic(config.libloader(), "zenoh_plugin_");
// Static plugins are to be added here, with `.add_static::<PluginType>()`
let mut required_plugins = HashSet::new();
for plugin_load in config.plugins().load_requests() {
Expand All @@ -90,7 +90,7 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na
req = if required { "required" } else { "" }
);
if let Err(e) = match paths {
None => plugins.load_plugin_by_backend_name(name.clone()),
None => plugins.load_plugin_by_backend_name(&name, &name),
Some(paths) => plugins.load_plugin_by_paths(name.clone(), &paths),
} {
if required {
Expand Down

0 comments on commit 684c279

Please sign in to comment.