Skip to content

Commit

Permalink
plugin manager refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Oct 24, 2023
1 parent 3f23cbc commit 41f5216
Showing 1 changed file with 146 additions and 105 deletions.
251 changes: 146 additions & 105 deletions plugins/zenoh-plugin-trait/src/loading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,69 @@
//
use crate::*;
use libloading::Library;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::path::PathBuf;
use vtable::{Compatibility, PluginLoaderVersion, PluginVTable, PLUGIN_LOADER_VERSION};
use zenoh_result::{bail, zerror, ZResult};
use zenoh_result::{bail, ZResult};
use zenoh_util::LibLoader;

struct PluginInstance<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion> {
starter: Box<dyn PluginStarter<StartArgs, RunningPlugin> + Send + Sync>,
running_plugin: Option<RunningPlugin>,
}

impl<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion> PluginInfo
for PluginInstance<StartArgs, RunningPlugin>
{
fn name(&self) -> &str {
self.starter.name()
}
fn path(&self) -> &str {
self.starter.path()
}
fn deletable(&self) -> bool {
self.starter.deletable()
}
}

impl<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion>
PluginInstance<StartArgs, RunningPlugin>
{
fn new<T: PluginStarter<StartArgs, RunningPlugin> + Send + Sync + 'static>(starter: T) -> Self {
Self {
starter: Box::new(starter),
running_plugin: None,
}
}
fn get_running_plugin(&self) -> Option<&RunningPlugin> {
self.running_plugin.as_ref()
}
fn stop(&mut self) -> bool {
if self.running_plugin.is_some() {
self.running_plugin = None;
true
} else {
false
}
}
fn start(&mut self, args: &StartArgs) -> ZResult<(bool, &RunningPlugin)> {
if self.running_plugin.is_some() {
return Ok((false, self.running_plugin.as_ref().unwrap()));
}
let plugin = self.starter.start(args)?;
self.running_plugin = Some(plugin);
Ok((true, self.running_plugin.as_ref().unwrap()))
}
fn as_plugin_info(&self) -> &dyn PluginInfo {
self
}
}

/// A plugins manager that handles starting and stopping plugins.
/// Plugins can be loaded from shared libraries using [`Self::load_plugin_by_name`] or [`Self::load_plugin_by_paths`], or added directly from the binary if available using [`Self::add_static`].
pub struct PluginsManager<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion> {
default_lib_prefix: String,
loader: Option<LibLoader>,
plugin_starters: Vec<Box<dyn PluginStarter<StartArgs, RunningPlugin> + Send + Sync>>,
running_plugins: HashMap<String, (String, RunningPlugin)>,
plugins: Vec<PluginInstance<StartArgs, RunningPlugin>>,
}

impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + CompatibilityVersion>
Expand All @@ -37,17 +86,15 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
PluginsManager {
default_lib_prefix: default_lib_prefix.into(),
loader: Some(loader),
plugin_starters: Vec::new(),
running_plugins: HashMap::new(),
plugins: Vec::new(),
}
}
/// Constructs a new plugin manager with dynamic library loading disabled.
pub fn static_plugins_only() -> Self {
PluginsManager {
default_lib_prefix: String::new(),
loader: None,
plugin_starters: Vec::new(),
running_plugins: HashMap::new(),
plugins: Vec::new(),
}
}

Expand All @@ -58,100 +105,72 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
mut self,
) -> Self {
let plugin_starter: StaticPlugin<P> = StaticPlugin::new();
self.plugin_starters.push(Box::new(plugin_starter));
self.plugins.push(PluginInstance::new(plugin_starter));
self
}

/// Starts `plugin`.
///
/// `Ok(true)` => plugin was successfully started
/// `Ok(false)` => plugin was running already, nothing happened
/// `Err(e)` => starting the plugin failed due to `e`
pub fn start(
fn get_plugin_instance_mut(
&mut self,
plugin: &str,
args: &StartArgs,
) -> ZResult<Option<(&str, &RunningPlugin)>> {
match self.running_plugins.entry(plugin.into()) {
Entry::Occupied(_) => Ok(None),
Entry::Vacant(e) => {
match self.plugin_starters.iter().find(|p| p.name() == plugin) {
Some(s) => {
let path = s.path();
let (_, plugin) = e.insert((path.into(), s.start(args).map_err(|e| zerror!(e => "Failed to load plugin {} (from {})", plugin, path))?));
Ok(Some((path, &*plugin)))
}
None => bail!("Plugin starter for `{}` not found", plugin),
}
}
}
name: &str,
) -> Option<&mut PluginInstance<StartArgs, RunningPlugin>> {
self.plugins.iter_mut().find(|p| p.name() == name)
}

/// Lazily starts all plugins.
///
/// `Ok(Ok(name))` => plugin `name` was successfully started
/// `Ok(Err(name))` => plugin `name` wasn't started because it was already running
/// `Err(e)` => Error `e` occured when trying to start plugin `name`
pub fn start_all<'l>(
&'l mut self,
args: &'l StartArgs,
) -> impl Iterator<Item = (&str, &str, ZResult<Option<&RunningPlugin>>)> + 'l {
let PluginsManager {
plugin_starters,
running_plugins,
..
} = self;
plugin_starters.iter().map(move |p| {
let name = p.name();
let path = p.path();
(
name,
path,
match running_plugins.entry(name.into()) {
std::collections::hash_map::Entry::Occupied(_) => Ok(None),
std::collections::hash_map::Entry::Vacant(e) => match p.start(args) {
Ok(p) => Ok(Some(unsafe {
std::mem::transmute(&e.insert((path.into(), p)).1)
})),
Err(e) => Err(e),
},
},
)
})
fn get_plugin_instance(
&mut self,
name: &str,
) -> Option<&PluginInstance<StartArgs, RunningPlugin>> {
self.plugins.iter().find(|p| p.name() == name)
}

fn get_plugin_instance_mut_err(
&mut self,
name: &str,
) -> ZResult<&mut PluginInstance<StartArgs, RunningPlugin>> {
self.get_plugin_instance_mut(name)
.ok_or_else(|| format!("Plugin `{}` not found", name).into())
}

fn get_plugin_instance_err(

Check warning on line 134 in plugins/zenoh-plugin-trait/src/loading.rs

View workflow job for this annotation

GitHub Actions / Run tests on ubuntu-latest

method `get_plugin_instance_err` is never used

Check warning on line 134 in plugins/zenoh-plugin-trait/src/loading.rs

View workflow job for this annotation

GitHub Actions / Run tests on macOS-latest

method `get_plugin_instance_err` is never used

Check warning on line 134 in plugins/zenoh-plugin-trait/src/loading.rs

View workflow job for this annotation

GitHub Actions / Run tests on windows-latest

method `get_plugin_instance_err` is never used
&mut self,
name: &str,
) -> ZResult<&PluginInstance<StartArgs, RunningPlugin>> {
self.get_plugin_instance(name)
.ok_or_else(|| format!("Plugin `{}` not found", name).into())
}

/// Starts plugin named `name`
/// Returns
/// Ok((true, &RunningPluguin)) => plugin was successfully started
/// Ok((false, &RunningPlugin)) => plugin was already running
/// Err(e) => starting the plugin failed due to `e`
pub fn start(&mut self, name: &str, args: &StartArgs) -> ZResult<(bool, &RunningPlugin)> {
self.get_plugin_instance_mut_err(name)?.start(args)
}

/// Stops `plugin`, returning `true` if it was indeed running.
pub fn stop(&mut self, plugin: &str) -> bool {
let result = self.running_plugins.remove(plugin).is_some();
self.plugin_starters
.retain(|p| p.name() != plugin || !p.deletable());
result
pub fn stop(&mut self, name: &str) -> ZResult<bool> {
Ok(self.get_plugin_instance_mut_err(name)?.stop())
}

/// Lists the loaded plugins by name.
pub fn loaded_plugins(&self) -> impl Iterator<Item = &str> {
self.plugin_starters.iter().map(|p| p.name())
}
/// Retuns a map containing each running plugin's load-path, associated to its name.
pub fn running_plugins_info(&self) -> HashMap<&str, &str> {
let mut result = HashMap::with_capacity(self.running_plugins.len());
for p in self.plugin_starters.iter() {
let name = p.name();
if self.running_plugins.contains_key(name) && !result.contains_key(name) {
result.insert(name, p.path());
}
}
result
pub fn plugins(&self) -> impl Iterator<Item = (&dyn PluginInfo, Option<&RunningPlugin>)> {
self.plugins
.iter()
.map(|p| (p.as_plugin_info(), p.get_running_plugin()))
}
/// Returns an iterator over each running plugin, where the keys are their name, and the values are a tuple of their path and handle.
pub fn running_plugins(&self) -> impl Iterator<Item = (&str, (&str, &RunningPlugin))> {
self.running_plugins
pub fn running_plugins(&self) -> impl Iterator<Item = (&dyn PluginInfo, &RunningPlugin)> {
self.plugins
.iter()
.map(|(s, (path, p))| (s.as_str(), (path.as_str(), p)))
.filter_map(|p| p.get_running_plugin().map(|rp| (p.as_plugin_info(), rp)))
}
/// Returns the handle of the requested running plugin if available.
pub fn plugin(&self, name: &str) -> Option<&RunningPlugin> {
self.running_plugins.get(name).map(|p| &p.1)
self.plugins
.iter()
.find(|p| p.name() == name)
.and_then(|p| p.get_running_plugin())
}

fn load_plugin(
Expand All @@ -168,11 +187,11 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
&mut self,
name: T,
backend_name: T1,
) -> ZResult<String> {
) -> ZResult<&dyn PluginInfo> {
let name = name.as_ref();

// TODO: check if plugin is already loaded

if self.get_plugin_instance(name).is_some() {
bail!("Plugin `{}` already loaded", name);
}
let backend_name = backend_name.as_ref();
let (lib, p) = match &mut self.loader {
Some(l) => unsafe { l.search_and_load(&format!("{}{}", &self.default_lib_prefix, &backend_name))? },
Expand All @@ -182,25 +201,26 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
Ok(p) => p,
Err(e) => bail!("After loading `{:?}`: {}", &p, e),
};
let path = plugin.path().into();
self.plugin_starters.push(Box::new(plugin));
Ok(path)
self.plugins.push(PluginInstance::new(plugin));
Ok(self.plugins.last().unwrap().as_plugin_info())
}
/// Tries to load a plugin from the list of path to plugin (absolute or relative to the current working directory)
pub fn load_plugin_by_paths<T: AsRef<str>, P: AsRef<str> + std::fmt::Debug>(
&mut self,
name: T,
paths: &[P],
) -> ZResult<String> {
) -> ZResult<&dyn PluginInfo> {
let name = name.as_ref();
if self.get_plugin_instance(name).is_some() {
bail!("Plugin `{}` already loaded", name);
}
for path in paths {
let path = path.as_ref();
match unsafe { LibLoader::load_file(path) } {
Ok((lib, p)) => {
let plugin = Self::load_plugin(name, lib, p)?;
let path = plugin.path().into();
self.plugin_starters.push(Box::new(plugin));
return Ok(path);
self.plugins.push(PluginInstance::new(plugin));
return Ok(self.plugins.last().unwrap().as_plugin_info());
}
Err(e) => log::warn!("Plugin '{}' load fail at {}: {}", &name, path, e),
}
Expand All @@ -209,13 +229,17 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
}
}

trait PluginStarter<StartArgs, RunningPlugin> {
pub trait PluginInfo {
fn name(&self) -> &str;
fn path(&self) -> &str;
fn start(&self, args: &StartArgs) -> ZResult<RunningPlugin>;
fn deletable(&self) -> bool;
}

trait PluginStarter<StartArgs, RunningPlugin>: PluginInfo {
fn start(&self, args: &StartArgs) -> ZResult<RunningPlugin>;
fn as_plugin_info(&self) -> &dyn PluginInfo;
}

struct StaticPlugin<P> {
inner: std::marker::PhantomData<P>,
}
Expand All @@ -228,7 +252,7 @@ impl<P> StaticPlugin<P> {
}
}

impl<StartArgs, RunningPlugin, P> PluginStarter<StartArgs, RunningPlugin> for StaticPlugin<P>
impl<StartArgs, RunningPlugin, P> PluginInfo for StaticPlugin<P>
where
P: Plugin<StartArgs = StartArgs, RunningPlugin = RunningPlugin>,
{
Expand All @@ -238,28 +262,45 @@ where
fn path(&self) -> &str {
"<statically_linked>"
}
fn deletable(&self) -> bool {
false
}
}

impl<StartArgs, RunningPlugin, P> PluginStarter<StartArgs, RunningPlugin> for StaticPlugin<P>
where
P: Plugin<StartArgs = StartArgs, RunningPlugin = RunningPlugin>,
{
fn start(&self, args: &StartArgs) -> ZResult<RunningPlugin> {
P::start(P::STATIC_NAME, args)
}
fn deletable(&self) -> bool {
false
fn as_plugin_info(&self) -> &dyn PluginInfo {
self
}
}

impl<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion>
PluginStarter<StartArgs, RunningPlugin> for DynamicPlugin<StartArgs, RunningPlugin>
impl<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion> PluginInfo
for DynamicPlugin<StartArgs, RunningPlugin>
{
fn name(&self) -> &str {
&self.name
}
fn path(&self) -> &str {
self.path.to_str().unwrap()
}
fn deletable(&self) -> bool {
true
}
}

impl<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion>
PluginStarter<StartArgs, RunningPlugin> for DynamicPlugin<StartArgs, RunningPlugin>
{
fn start(&self, args: &StartArgs) -> ZResult<RunningPlugin> {
(self.vtable.start)(self.name(), args)
}
fn deletable(&self) -> bool {
true
fn as_plugin_info(&self) -> &dyn PluginInfo {
self
}
}

Expand Down

0 comments on commit 41f5216

Please sign in to comment.