Skip to content

Commit

Permalink
plugin manager api update
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Oct 25, 2023
1 parent 41f5216 commit 84498a4
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 104 deletions.
116 changes: 59 additions & 57 deletions plugins/zenoh-plugin-trait/src/loading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ impl<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion>
false
}
}
fn start(&mut self, args: &StartArgs) -> ZResult<(bool, &RunningPlugin)> {
fn start(&mut self, args: &StartArgs) -> ZResult<bool> {
if self.running_plugin.is_some() {
return Ok((false, self.running_plugin.as_ref().unwrap()));
return Ok(false);
}
let plugin = self.starter.start(args)?;
self.running_plugin = Some(plugin);
Ok((true, self.running_plugin.as_ref().unwrap()))
Ok(true)
}
fn as_plugin_info(&self) -> &dyn PluginInfo {
self
Expand All @@ -78,6 +78,13 @@ pub struct PluginsManager<StartArgs: CompatibilityVersion, RunningPlugin: Compat
plugins: Vec<PluginInstance<StartArgs, RunningPlugin>>,
}

/// Unique identifier of plugin in plugin manager. Actually is just an index in the plugins list.
/// It's guaranteed that if intex is obtained from plugin manager, it's valid for this plugin manager.
/// (at least at this moment, when there is no pluing unload support).
/// Using it instead of plugin name allows to avoid checking for ``Option`` every time.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PluginIndex(usize);

impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + CompatibilityVersion>
PluginsManager<StartArgs, RunningPlugin>
{
Expand Down Expand Up @@ -109,68 +116,63 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
self
}

fn get_plugin_instance_mut(
&mut self,
name: &str,
) -> Option<&mut PluginInstance<StartArgs, RunningPlugin>> {
self.plugins.iter_mut().find(|p| p.name() == name)
}

fn get_plugin_instance(
&mut self,
name: &str,
) -> Option<&PluginInstance<StartArgs, RunningPlugin>> {
self.plugins.iter().find(|p| p.name() == name)
/// Returns `true` if the plugin with the given index exists in the manager.
pub fn check_plugin_index(&self, index: PluginIndex) -> bool {
index.0 < self.plugins.len()
}

fn get_plugin_instance_mut_err(
&mut self,
name: &str,
) -> ZResult<&mut PluginInstance<StartArgs, RunningPlugin>> {
self.get_plugin_instance_mut(name)
.ok_or_else(|| format!("Plugin `{}` not found", name).into())
/// Returns plugin index by name
pub fn get_plugin_index(&self, name: &str) -> Option<PluginIndex> {
self.plugins
.iter()
.position(|p| p.name() == name)
.map(PluginIndex)
}

fn get_plugin_instance_err(
&mut self,
name: &str,
) -> ZResult<&PluginInstance<StartArgs, RunningPlugin>> {
self.get_plugin_instance(name)
/// Returns plugin index by name or error if plugin not found
pub fn get_plugin_index_err(&self, name: &str) -> ZResult<PluginIndex> {
self.get_plugin_index(name)
.ok_or_else(|| format!("Plugin `{}` not found", name).into())
}

/// Starts plugin named `name`
/// Starts plugin.
/// Returns
/// Ok((true, &RunningPluguin)) => plugin was successfully started
/// Ok((false, &RunningPlugin)) => plugin was already running
/// Ok(true) => plugin was successfully started
/// Ok(false) => plugin was already running
/// Err(e) => starting the plugin failed due to `e`
pub fn start(&mut self, name: &str, args: &StartArgs) -> ZResult<(bool, &RunningPlugin)> {
self.get_plugin_instance_mut_err(name)?.start(args)
pub fn start(&mut self, index: PluginIndex, args: &StartArgs) -> ZResult<bool> {
let instance = &mut self.plugins[index.0];
let already_started = instance.start(args)?;
Ok(already_started)
}

/// Stops `plugin`, returning `true` if it was indeed running.
pub fn stop(&mut self, name: &str) -> ZResult<bool> {
Ok(self.get_plugin_instance_mut_err(name)?.stop())
}

/// Lists the loaded plugins by name.
pub fn plugins(&self) -> impl Iterator<Item = (&dyn PluginInfo, Option<&RunningPlugin>)> {
self.plugins
.iter()
.map(|p| (p.as_plugin_info(), p.get_running_plugin()))
pub fn stop(&mut self, index: PluginIndex) -> ZResult<bool> {
let instance = &mut self.plugins[index.0];
let was_running = instance.stop();
Ok(was_running)
}
/// Lists the loaded plugins
pub fn plugins(&self) -> impl Iterator<Item = PluginIndex> + '_ {
self.plugins.iter().enumerate().map(|(i, _)| PluginIndex(i))
}
/// Lists the loaded plugins
pub fn running_plugins(&self) -> impl Iterator<Item = PluginIndex> + '_ {
self.plugins.iter().enumerate().filter_map(|(i, p)| {
if p.get_running_plugin().is_some() {
Some(PluginIndex(i))
} else {
None
}
})
}
/// Returns an iterator over each running plugin, where the keys are their name, and the values are a tuple of their path and handle.
pub fn running_plugins(&self) -> impl Iterator<Item = (&dyn PluginInfo, &RunningPlugin)> {
self.plugins
.iter()
.filter_map(|p| p.get_running_plugin().map(|rp| (p.as_plugin_info(), rp)))
/// Returns running plugin interface of plugin or none if plugin is stopped
pub fn running_plugin(&self, index: PluginIndex) -> Option<&RunningPlugin> {
self.plugins[index.0].get_running_plugin()
}
/// Returns the handle of the requested running plugin if available.
pub fn plugin(&self, name: &str) -> Option<&RunningPlugin> {
self.plugins
.iter()
.find(|p| p.name() == name)
.and_then(|p| p.get_running_plugin())
/// Returns plugin information
pub fn plugin(&self, index: PluginIndex) -> &dyn PluginInfo {
self.plugins[index.0].as_plugin_info()
}

fn load_plugin(
Expand All @@ -187,9 +189,9 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
&mut self,
name: T,
backend_name: T1,
) -> ZResult<&dyn PluginInfo> {
) -> ZResult<PluginIndex> {
let name = name.as_ref();
if self.get_plugin_instance(name).is_some() {
if self.get_plugin_index(name).is_some() {
bail!("Plugin `{}` already loaded", name);
}
let backend_name = backend_name.as_ref();
Expand All @@ -202,16 +204,16 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
Err(e) => bail!("After loading `{:?}`: {}", &p, e),
};
self.plugins.push(PluginInstance::new(plugin));
Ok(self.plugins.last().unwrap().as_plugin_info())
Ok(PluginIndex(self.plugins.len() - 1))
}
/// Tries to load a plugin from the list of path to plugin (absolute or relative to the current working directory)
pub fn load_plugin_by_paths<T: AsRef<str>, P: AsRef<str> + std::fmt::Debug>(
&mut self,
name: T,
paths: &[P],
) -> ZResult<&dyn PluginInfo> {
) -> ZResult<PluginIndex> {
let name = name.as_ref();
if self.get_plugin_instance(name).is_some() {
if self.get_plugin_index(name).is_some() {
bail!("Plugin `{}` already loaded", name);
}
for path in paths {
Expand All @@ -220,7 +222,7 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
Ok((lib, p)) => {
let plugin = Self::load_plugin(name, lib, p)?;
self.plugins.push(PluginInstance::new(plugin));
return Ok(self.plugins.last().unwrap().as_plugin_info());
return Ok(PluginIndex(self.plugins.len() - 1));
}
Err(e) => log::warn!("Plugin '{}' load fail at {}: {}", &name, path, e),
}
Expand Down
102 changes: 55 additions & 47 deletions zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,12 @@ impl ConfigValidator for AdminSpace {
new: &serde_json::Map<String, serde_json::Value>,
) -> ZResult<Option<serde_json::Map<String, serde_json::Value>>> {
let plugins_mgr = zlock!(self.context.plugins_mgr);
if let Some(plugin) = plugins_mgr.plugin(name) {
plugin.config_checker(path, current, new)
if let Some(plugin) = plugins_mgr.get_plugin_index(name) {
if let Some(plugin) = plugins_mgr.running_plugin(plugin) {
plugin.config_checker(path, current, new)
} else {
Err(format!("Plugin {name} not running").into())
}
} else {
Err(format!("Plugin {name} not found").into())
}
Expand Down Expand Up @@ -125,9 +129,11 @@ impl AdminSpace {
);

let mut active_plugins = plugins_mgr
.running_plugins_info()
.into_iter()
.map(|(a, b)| (a.to_string(), b.to_string()))
.running_plugins()
.map(|index| {
let info = plugins_mgr.plugin(index);
(info.name().to_string(), info.path().to_string())
})
.collect::<HashMap<_, _>>();

let context = Arc::new(AdminContext {
Expand Down Expand Up @@ -190,46 +196,43 @@ impl AdminSpace {
let mut plugins_mgr = zlock!(admin.context.plugins_mgr);
for diff in diffs {
match diff {
PluginDiff::Delete(plugin) => {
active_plugins.remove(plugin.as_str());
plugins_mgr.stop(&plugin);
PluginDiff::Delete(name) => {
active_plugins.remove(name.as_str());
if let Some(index) = plugins_mgr.get_plugin_index(&name) {
let _ = plugins_mgr.stop(index);
}
}
PluginDiff::Start(plugin) => {
let load =
match &plugin.paths {
Some(paths) => plugins_mgr
.load_plugin_by_paths(plugin.name.clone(), paths),
None => plugins_mgr
.load_plugin_by_backend_name(&plugin.name, &plugin.name),
};
match load {
Err(e) => {
if plugin.required {
panic!("Failed to load plugin `{}`: {}", plugin.name, e)
} else {
log::error!(
"Failed to load plugin `{}`: {}",
plugin.name,
e
)
}
let name = &plugin.name;
if let Ok(index) = match &plugin.paths {
Some(paths) => plugins_mgr.load_plugin_by_paths(name, paths),
None => {
plugins_mgr.load_plugin_by_backend_name(name, &plugin.name)
}
Ok(path) => {
let name = &plugin.name;
log::info!("Loaded plugin `{}` from {}", name, &path);
match plugins_mgr.start(name, &admin.context.runtime) {
Ok(Some((path, _))) => {
active_plugins.insert(name.into(), path.into());
log::info!(
"Successfully started plugin `{}` from {}",
name,
path
);
}
Ok(None) => {
log::warn!("Plugin `{}` was already running", name)
}
Err(e) => log::error!("{}", e),
}
.map_err(|e| {
if plugin.required {
panic!("Failed to load plugin `{}`: {}", name, e)
} else {
log::error!("Failed to load plugin `{}`: {}", name, e)
}
}) {
let path = plugins_mgr.plugin(index).path().to_string();
log::info!("Loaded plugin `{}` from {}", name, path);
match plugins_mgr.start(index, &admin.context.runtime) {
Ok(true) => {
active_plugins.insert(name.into(), path.clone());
log::info!(
"Successfully started plugin `{}` from {}",
name,
path
);
}
Ok(false) => {
log::warn!("Plugin `{}` was already running", name)
}
Err(e) => {
log::error!("Failed to start plugin `{}`: {}", name, e)
}
}
}
Expand Down Expand Up @@ -430,10 +433,11 @@ fn router_data(context: &AdminContext, query: Query) {

// plugins info
let plugins: serde_json::Value = {
zlock!(context.plugins_mgr)
.running_plugins_info()
.into_iter()
.map(|(k, v)| (k, json!({ "path": v })))
let plugins_mgr = zlock!(context.plugins_mgr);
plugins_mgr
.running_plugins()
.map(|index| plugins_mgr.plugin(index))
.map(|info| (info.name(), json!({ "path": info.path() })))
.collect()
};

Expand Down Expand Up @@ -637,7 +641,11 @@ fn plugins_status(context: &AdminContext, query: Query) {
let guard = zlock!(context.plugins_mgr);
let mut root_key = format!("@/router/{}/status/plugins/", &context.zid_str);

for (name, (path, plugin)) in guard.running_plugins() {
for index in guard.running_plugins() {
let info = guard.plugin(index);
let name = info.name();
let path = info.path();
let plugin = guard.running_plugin(index).unwrap();
with_extended_string(&mut root_key, &[name], |plugin_key| {
with_extended_string(plugin_key, &["/__path__"], |plugin_path_key| {
if let Ok(key_expr) = KeyExpr::try_from(plugin_path_key.clone()) {
Expand Down

0 comments on commit 84498a4

Please sign in to comment.