Skip to content

Commit

Permalink
unifying plugn loading unfinished
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Oct 22, 2023
1 parent 89e67dd commit 84442f5
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 78 deletions.
23 changes: 10 additions & 13 deletions plugins/zenoh-backend-traits/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ use schemars::JsonSchema;
use serde_json::{Map, Value};
use std::convert::TryFrom;
use std::time::Duration;
use zenoh::{
key_expr::keyexpr, prelude::OwnedKeyExpr, Result as ZResult,
};
use zenoh_plugin_trait::{CompatibilityVersion, concat_enabled_features};
use zenoh::{key_expr::keyexpr, prelude::OwnedKeyExpr, Result as ZResult};
use zenoh_plugin_trait::{concat_enabled_features, CompatibilityVersion};
use zenoh_result::{bail, zerror, Error};

#[derive(JsonSchema, Debug, Clone, AsMut, AsRef)]
Expand Down Expand Up @@ -251,10 +249,6 @@ impl ConfigDiff {
diffs
}
}
pub enum BackendSearchMethod<'a> {
ByPaths(&'a [String]),
ByName(&'a str),
}
impl VolumeConfig {
pub fn to_json_value(&self) -> Value {
let mut result = self.rest.clone();
Expand All @@ -269,11 +263,14 @@ impl VolumeConfig {
}
Value::Object(result)
}
pub fn backend_search_method(&self) -> BackendSearchMethod {
match &self.paths {
None => BackendSearchMethod::ByName(self.backend.as_deref().unwrap_or(&self.name)),
Some(paths) => BackendSearchMethod::ByPaths(paths),
}
pub fn name(&self) -> &str {
&self.name
}
pub fn backend(&self) -> &str {
self.backend.as_deref().unwrap_or(&self.name)
}
pub fn paths(&self) -> Option<&[String]> {
self.paths.as_deref()
}
fn try_from<V: AsObject>(plugin_name: &str, configs: &V) -> ZResult<Vec<Self>> {
let configs = configs.as_object().ok_or_else(|| {
Expand Down
56 changes: 18 additions & 38 deletions plugins/zenoh-plugin-storage-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use zenoh::prelude::sync::*;
use zenoh::runtime::Runtime;
use zenoh::Session;
use zenoh_backend_traits::CreateVolume;
use zenoh_backend_traits::VolumePlugin;
use zenoh_backend_traits::CREATE_VOLUME_FN_NAME;
use zenoh_backend_traits::{config::*, Volume};
use zenoh_core::zlock;
Expand Down Expand Up @@ -72,14 +73,17 @@ impl Plugin for StoragesPlugin {
)?)))
}
}

type PluginsManager = zenoh_plugin_trait::loading::PluginsManager<VolumeConfig, VolumePlugin>;

struct StorageRuntime(Arc<Mutex<StorageRuntimeInner>>);
struct StorageRuntimeInner {
name: String,
runtime: Runtime,
session: Arc<Session>,
lib_loader: LibLoader,
volumes: HashMap<String, VolumeHandle>,
storages: HashMap<String, HashMap<String, Sender<StorageMessage>>>,
plugins_manager: PluginsManager,
}
impl StorageRuntimeInner {
fn status_key(&self) -> String {
Expand Down Expand Up @@ -110,9 +114,9 @@ impl StorageRuntimeInner {
name,
runtime,
session,
lib_loader,
volumes: Default::default(),
storages: Default::default(),
plugins_manager: PluginsManager::dynamic(lib_loader, BACKEND_LIB_PREFIX),
};
new_self.spawn_volume(VolumeConfig {
name: MEMORY_BACKEND_NAME.into(),
Expand Down Expand Up @@ -153,51 +157,27 @@ impl StorageRuntimeInner {
std::mem::drop(self.volumes.remove(&volume.name));
}
fn spawn_volume(&mut self, config: VolumeConfig) -> ZResult<()> {
let volume_id = config.name.clone();
if volume_id == MEMORY_BACKEND_NAME {
let volume_id = config.name();
let backend_name = config.backend();
if backend_name == MEMORY_BACKEND_NAME {
match create_memory_backend(config) {
Ok(backend) => {
self.volumes.insert(
volume_id,
volume_id.to_string(),
VolumeHandle::new(backend, None, "<static-memory>".into()),
);
}
Err(e) => bail!("{}", e),
}
} else {
match config.backend_search_method() {
BackendSearchMethod::ByPaths(paths) => {
for path in paths {
unsafe {
if let Ok((lib, path)) = LibLoader::load_file(path) {
return self.loaded_backend_from_lib(
&volume_id,
config.clone(),
lib,
path,
);
}
}
}
bail!(
"Failed to find a suitable library for volume {} from paths: {:?}",
volume_id,
paths
);
}
BackendSearchMethod::ByName(backend_name) => unsafe {
let backend_filename = format!("{}{}", BACKEND_LIB_PREFIX, &backend_name);
if let Ok((lib, path)) = self.lib_loader.search_and_load(&backend_filename) {
self.loaded_backend_from_lib(&volume_id, config.clone(), lib, path)?;
} else {
bail!(
"Failed to find a suitable library for volume {} (was looking for <lib>{}<.so/.dll/.dylib>)",
volume_id,
&backend_filename
);
}
},
};
if let Some(paths) = config.paths() {
self.plugins_manager
.load_plugin_by_paths(volume_id, paths)?;
} else {
self.plugins_manager
.load_plugin_by_backend_name(volume_id, backend_name)?;
}
if let Some()
};

Check failure on line 181 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run checks on ubuntu-latest

expected one of `=` or `|`, found `}`

Check failure on line 181 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

expected one of `=` or `|`, found `}`

Check failure on line 181 in plugins/zenoh-plugin-storage-manager/src/lib.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

expected one of `=` or `|`, found `}`
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use zenoh_backend_traits::config::{StorageConfig, VolumeConfig};
use zenoh_backend_traits::*;
use zenoh_result::ZResult;

pub fn create_memory_backend(config: VolumeConfig) -> ZResult<Box<dyn Volume>> {
pub fn create_memory_backend(config: VolumeConfig) -> ZResult<VolumePlugin> {
Ok(Box::new(MemoryBackend { config }))
}

Expand Down
62 changes: 43 additions & 19 deletions plugins/zenoh-plugin-trait/src/loading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,35 @@ use libloading::Library;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::path::PathBuf;
use vtable::{PluginVTable, PluginLoaderVersion, PLUGIN_LOADER_VERSION, Compatibility};
use vtable::{Compatibility, PluginLoaderVersion, PluginVTable, PLUGIN_LOADER_VERSION};
use zenoh_result::{bail, zerror, ZResult};
use zenoh_util::LibLoader;

/// A plugins manager that handles starting and stopping plugins.
/// Plugins can be loaded from shared libraries using [`Self::load_plugin_by_name`] or [`Self::load_plugin_by_paths`], or added directly from the binary if available using [`Self::add_static`].
pub struct PluginsManager<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion> {
default_lib_prefix: String,
loader: Option<LibLoader>,
plugin_starters: Vec<Box<dyn PluginStarter<StartArgs, RunningPlugin> + Send + Sync>>,
running_plugins: HashMap<String, (String, RunningPlugin)>,
}

impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + CompatibilityVersion> PluginsManager<StartArgs, RunningPlugin> {
impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + CompatibilityVersion>
PluginsManager<StartArgs, RunningPlugin>
{
/// Constructs a new plugin manager with dynamic library loading enabled.
pub fn dynamic(loader: LibLoader) -> Self {
pub fn dynamic<S: Into<String>>(loader: LibLoader, default_lib_prefix: S) -> Self {
PluginsManager {
default_lib_prefix: default_lib_prefix.into(),
loader: Some(loader),
plugin_starters: Vec::new(),
running_plugins: HashMap::new(),
}
}
/// Constructs a new plugin manager with dynamic library loading enabled.
/// Constructs a new plugin manager with dynamic library loading disabled.
pub fn static_plugins_only() -> Self {
PluginsManager {
default_lib_prefix: String::new(),
loader: None,
plugin_starters: Vec::new(),
running_plugins: HashMap::new(),
Expand Down Expand Up @@ -157,29 +162,39 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
DynamicPlugin::new(name.into(), lib, path)
}

pub fn load_plugin_by_name(&mut self, name: String) -> ZResult<String> {
/// Tries to load a plugin with the name `defaukt_lib_prefix` + `backend_name` + `.so | .dll | .dylib`
/// in lib_loader's search paths.
pub fn load_plugin_by_backend_name<T: AsRef<str>, T1: AsRef<str>>(
&mut self,
name: T,
backend_name: T1,
) -> ZResult<String> {
let name = name.as_ref();
let backend_name = backend_name.as_ref();
let (lib, p) = match &mut self.loader {
Some(l) => unsafe { l.search_and_load(&format!("zenoh_plugin_{}", &name))? },
None => bail!("Can't load dynamic plugin ` {}`, as dynamic loading is not enabled for this plugin manager.", name),
Some(l) => unsafe { l.search_and_load(&format!("{}{}", &self.default_lib_prefix, &backend_name))? },
None => bail!("Can't load dynamic plugin `{}`, as dynamic loading is not enabled for this plugin manager.", &name),
};
let plugin = match Self::load_plugin(&name, lib, p.clone()) {
let plugin = match Self::load_plugin(name, lib, p.clone()) {
Ok(p) => p,
Err(e) => bail!("After loading `{:?}`: {}", &p, e),
};
let path = plugin.path().into();
self.plugin_starters.push(Box::new(plugin));
Ok(path)
}
pub fn load_plugin_by_paths<P: AsRef<str> + std::fmt::Debug>(
/// Tries to load a plugin from the list of path to plugin (absolute or relative to the current working directory)
pub fn load_plugin_by_paths<T: AsRef<str>, P: AsRef<str> + std::fmt::Debug>(
&mut self,
name: String,
name: T,
paths: &[P],
) -> ZResult<String> {
let name = name.as_ref();
for path in paths {
let path = path.as_ref();
match unsafe { LibLoader::load_file(path) } {
Ok((lib, p)) => {
let plugin = Self::load_plugin(&name, lib, p)?;
let plugin = Self::load_plugin(name, lib, p)?;
let path = plugin.path().into();
self.plugin_starters.push(Box::new(plugin));
return Ok(path);
Expand Down Expand Up @@ -228,8 +243,8 @@ where
}
}

impl<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion> PluginStarter<StartArgs, RunningPlugin>
for DynamicPlugin<StartArgs, RunningPlugin>
impl<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion>
PluginStarter<StartArgs, RunningPlugin> for DynamicPlugin<StartArgs, RunningPlugin>
{
fn name(&self) -> &str {
&self.name
Expand All @@ -252,20 +267,29 @@ pub struct DynamicPlugin<StartArgs: CompatibilityVersion, RunningPlugin: Compati
pub path: PathBuf,
}

impl<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion> DynamicPlugin<StartArgs, RunningPlugin> {
impl<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion>
DynamicPlugin<StartArgs, RunningPlugin>
{
fn new(name: String, lib: Library, path: PathBuf) -> ZResult<Self> {
let get_plugin_loader_version = unsafe {
lib.get::<fn() -> PluginLoaderVersion>(b"get_plugin_loader_version")?
};
let get_plugin_loader_version =
unsafe { lib.get::<fn() -> PluginLoaderVersion>(b"get_plugin_loader_version")? };
let plugin_loader_version = get_plugin_loader_version();
if plugin_loader_version != PLUGIN_LOADER_VERSION {
bail!("Plugin loader version mismatch: host = {}, plugin = {}", PLUGIN_LOADER_VERSION, plugin_loader_version);
bail!(
"Plugin loader version mismatch: host = {}, plugin = {}",
PLUGIN_LOADER_VERSION,
plugin_loader_version
);
}
let get_compatibility = unsafe { lib.get::<fn() -> Compatibility>(b"get_compatibility")? };
let plugin_compatibility_record = get_compatibility();
let host_compatibility_record = Compatibility::new::<StartArgs, RunningPlugin>();
if !plugin_compatibility_record.are_compatible(&host_compatibility_record) {
bail!("Plugin compatibility mismatch:\nhost = {:?}\nplugin = {:?}\n", host_compatibility_record, plugin_compatibility_record);
bail!(
"Plugin compatibility mismatch:\nhost = {:?}\nplugin = {:?}\n",
host_compatibility_record,
plugin_compatibility_record
);
}

// TODO: check loader version and compatibility
Expand Down
13 changes: 7 additions & 6 deletions zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,13 @@ impl AdminSpace {
plugins_mgr.stop(&plugin);
}
PluginDiff::Start(plugin) => {
let load = match &plugin.paths {
Some(paths) => {
plugins_mgr.load_plugin_by_paths(plugin.name.clone(), paths)
}
None => plugins_mgr.load_plugin_by_name(plugin.name.clone()),
};
let load =
match &plugin.paths {
Some(paths) => plugins_mgr
.load_plugin_by_paths(plugin.name.clone(), paths),
None => plugins_mgr
.load_plugin_by_backend_name(plugin.name.clone()),

Check failure on line 203 in zenoh/src/net/runtime/adminspace.rs

View workflow job for this annotation

GitHub Actions / Run tests on ubuntu-latest

this method takes 2 arguments but 1 argument was supplied

Check failure on line 203 in zenoh/src/net/runtime/adminspace.rs

View workflow job for this annotation

GitHub Actions / Run tests on macOS-latest

this method takes 2 arguments but 1 argument was supplied

Check failure on line 203 in zenoh/src/net/runtime/adminspace.rs

View workflow job for this annotation

GitHub Actions / Run tests on windows-latest

this method takes 2 arguments but 1 argument was supplied
};
match load {
Err(e) => {
if plugin.required {
Expand Down
2 changes: 1 addition & 1 deletion zenohd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na
req = if required { "required" } else { "" }
);
if let Err(e) = match paths {
None => plugins.load_plugin_by_name(name.clone()),
None => plugins.load_plugin_by_backend_name(name.clone()),
Some(paths) => plugins.load_plugin_by_paths(name.clone(), &paths),
} {
if required {
Expand Down

0 comments on commit 84442f5

Please sign in to comment.