Skip to content

Commit

Permalink
log prints, recurse info
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Nov 14, 2023
1 parent 1317544 commit d850216
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 14 deletions.
15 changes: 12 additions & 3 deletions plugins/zenoh-plugin-storage-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl StorageRuntimeInner {
let session = Arc::new(zenoh::init(runtime.clone()).res_sync().unwrap());

let plugins_manager = PluginsManager::dynamic(lib_loader.clone(), BACKEND_LIB_PREFIX)
.add_static_plugin::<MemoryBackend>();
.declare_static_plugin::<MemoryBackend>();

let mut new_self = StorageRuntimeInner {
name,
Expand Down Expand Up @@ -151,6 +151,7 @@ impl StorageRuntimeInner {
}
fn kill_volume<T: AsRef<str>>(&mut self, name: T) -> ZResult<()> {
let name = name.as_ref();
log::info!("Killing volume {}", name);
if let Some(storages) = self.storages.remove(name) {
async_std::task::block_on(futures::future::join_all(
storages
Expand All @@ -167,20 +168,22 @@ impl StorageRuntimeInner {
fn spawn_volume(&mut self, config: VolumeConfig) -> ZResult<()> {
let volume_id = config.name();
let backend_name = config.backend();
log::info!("Spawning volume {} with backend {}", volume_id, backend_name);
let declared = if let Some(declared) = self.plugins_manager.plugin_mut(volume_id) {
declared
} else if let Some(paths) = config.paths() {
self.plugins_manager.add_dynamic_plugin_by_paths(volume_id, paths)?
self.plugins_manager.declare_dynamic_plugin_by_paths(volume_id, paths)?
} else {
self.plugins_manager
.add_dynamic_plugin_by_name(volume_id, backend_name)?
.declare_dynamic_plugin_by_name(volume_id, backend_name)?
};
let loaded = declared.load()?;
loaded.start(&config)?;
Ok(())
}
fn kill_storage(&mut self, config: StorageConfig) {
let volume = &config.volume_id;
log::info!("Killing storage {} from volume {}", config.name, volume);
if let Some(storages) = self.storages.get_mut(volume) {
if let Some(storage) = storages.get_mut(&config.name) {
log::debug!(
Expand All @@ -200,6 +203,12 @@ impl StorageRuntimeInner {
format!("Cannot find volume {} to spawn storage {}", volume_id, storage.name),
)?;
let storage_name = storage.name.clone();
log::info!(
"Spawning storage {} from volume {} with backend {}",
storage_name,
volume_id,
backend.name()
);
let in_interceptor = backend.instance().incoming_data_interceptor();
let out_interceptor = backend.instance().outgoing_data_interceptor();
let stopper = async_std::task::block_on(create_and_start_storage(
Expand Down
15 changes: 10 additions & 5 deletions plugins/zenoh-plugin-trait/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,44 +107,48 @@ impl<StartArgs: PluginStartArgs + 'static, Instance: PluginInstance + 'static>
}

/// Adds a statically linked plugin to the manager.
pub fn add_static_plugin<
pub fn declare_static_plugin<
P: Plugin<StartArgs = StartArgs, Instance = Instance> + Send + Sync,
>(
mut self,
) -> Self {
let plugin_loader: StaticPlugin<StartArgs, Instance, P> = StaticPlugin::new();
self.plugins.push(PluginRecord::new(plugin_loader));
log::debug!("Declared static plugin {}", self.plugins.last().unwrap().name());
self
}

/// Add dynamic plugin to the manager by name, automatically prepending the default library prefix
pub fn add_dynamic_plugin_by_name<S: Into<String>>(
pub fn declare_dynamic_plugin_by_name<S: Into<String>>(
&mut self,
name: S,
plugin_name: &str,
) -> ZResult<&mut dyn DeclaredPlugin<StartArgs, Instance>> {
let name = name.into();
let plugin_name = format!("{}{}", self.default_lib_prefix, plugin_name);
let libloader = self
.loader
.as_ref()
.ok_or("Dynamic plugin loading is disabled")?
.clone();
log::debug!("Declared dynamic plugin {} by name {}", &name, &plugin_name);
let loader = DynamicPlugin::new(
name.into(),
name,
DynamicPluginSource::ByName((libloader, plugin_name)),
);
self.plugins.push(PluginRecord::new(loader));
Ok(self.plugins.last_mut().unwrap())
}

/// Add first available dynamic plugin from the list of paths to the plugin files
pub fn add_dynamic_plugin_by_paths<S: Into<String>, P: AsRef<str> + std::fmt::Debug>(
pub fn declare_dynamic_plugin_by_paths<S: Into<String>, P: AsRef<str> + std::fmt::Debug>(
&mut self,
name: S,
paths: &[P],
) -> ZResult<&mut dyn DeclaredPlugin<StartArgs, Instance>> {
let name = name.into();
let paths = paths.iter().map(|p| p.as_ref().into()).collect();
log::debug!("Declared dynamic plugin {} by paths {:?}", &name, &paths);
let loader = DynamicPlugin::new(name, DynamicPluginSource::ByPaths(paths));
self.plugins.push(PluginRecord::new(loader));
Ok(self.plugins.last_mut().unwrap())
Expand Down Expand Up @@ -247,6 +251,7 @@ impl<StartArgs: PluginStartArgs + 'static, Instance: PluginInstance + 'static> P
for PluginsManager<StartArgs, Instance>
{
fn plugins_status(&self, names: &keyexpr) -> Vec<(String, PluginStatus)> {
log::debug!("Plugin manager with prefix `{}` : requested plugins_status {:?}", self.default_lib_prefix , names);
let mut plugins = Vec::new();
for plugin in self.declared_plugins() {
let name = unsafe { keyexpr::from_str_unchecked(plugin.name()) };
Expand All @@ -257,7 +262,7 @@ impl<StartArgs: PluginStartArgs + 'static, Instance: PluginInstance + 'static> P
if let Some(plugin) = plugin.loaded() {
if let Some(plugin) = plugin.started() {
if let [names, ..] = names.strip_prefix(name)[..] {
plugins.append(&mut plugin.instance().plugins_status(names));
plugins.append(&mut plugin.instance().plugins_status(names).iter().map(|(n, s)| (format!("{}/{}", name, n), s.clone())).collect());
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ impl<StartArgs: PluginStartArgs, Instance: PluginInstance> DeclaredPlugin<StartA
if self.starter.is_none() {
let (lib, path) = self.source.load().add_error(&mut self.condition)?;
let starter = DynamicPluginStarter::new(lib, path).add_error(&mut self.condition)?;
log::debug!("Plugin {} loaded from {}", self.name, starter.path());
self.starter = Some(starter);
} else {
log::warn!("Plugin `{}` already loaded", self.name);
}
Ok(self)
}
Expand Down Expand Up @@ -189,7 +192,10 @@ impl<StartArgs: PluginStartArgs, Instance: PluginInstance> LoadedPlugin<StartArg
let instance = starter
.start(self.name(), args)
.add_error(&mut self.condition)?;
log::debug!("Plugin `{}` started", self.name);
self.instance = Some(instance);
} else {
log::warn!("Plugin `{}` already started", self.name);
}
Ok(self)
}
Expand All @@ -213,6 +219,7 @@ impl<StartArgs: PluginStartArgs, Instance: PluginInstance> StartedPlugin<StartAr
for DynamicPlugin<StartArgs, Instance>
{
fn stop(&mut self) {
log::debug!("Plugin `{}` stopped", self.name);
self.instance = None;
}
fn instance(&self) -> &Instance {
Expand Down
8 changes: 7 additions & 1 deletion plugins/zenoh-plugin-trait/src/manager/static_plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ where
{
fn start(&mut self, args: &StartArgs) -> ZResult<&mut dyn StartedPlugin<StartArgs, Instance>> {
if self.instance.is_none() {
log::debug!("Plugin `{}` started", self.name());
self.instance = Some(P::start(self.name(), args)?);
} else {
log::warn!("Plugin `{}` already started", self.name());
}
Ok(self)
}
Expand All @@ -104,7 +107,10 @@ impl<StartArgs, Instance, P> StartedPlugin<StartArgs, Instance>
where
P: Plugin<StartArgs = StartArgs, Instance = Instance>,
{
fn stop(&mut self) {}
fn stop(&mut self) {
log::debug!("Plugin `{}` stopped", self.name());
self.instance = None;
}
fn instance(&self) -> &Instance {
self.instance.as_ref().unwrap()
}
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ impl AdminSpace {
log::warn!("Plugin `{}` was already declared", declared.name());
declared
} else if let Some(paths) = &config.paths {
plugin_mgr.add_dynamic_plugin_by_paths(name, paths)?
plugin_mgr.declare_dynamic_plugin_by_paths(name, paths)?
} else {
plugin_mgr.add_dynamic_plugin_by_name(name, name)?
plugin_mgr.declare_dynamic_plugin_by_name(name, name)?
};

let loaded = if let Some(loaded) = declared.loaded_mut() {
Expand Down
13 changes: 12 additions & 1 deletion zenoh/src/plugins/sealed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,15 @@ impl PluginStructVersion for RunningPlugin {
}
}

impl PluginControl for RunningPlugin {}
impl PluginControl for RunningPlugin {
fn condition(&self) -> PluginCondition {
self.as_ref().condition()
}

fn plugins_status(&self, names: &keyexpr) -> Vec<(String, PluginStatus)> {
self.as_ref().plugins_status(names)
}
}

impl PluginInstance for RunningPlugin {}

Expand Down Expand Up @@ -88,7 +96,10 @@ pub trait RunningPluginTrait: Send + Sync + PluginControl {
/// The zenoh plugins manager. It handles the full lifetime of plugins, from loading to destruction.
pub type PluginsManager = zenoh_plugin_trait::PluginsManager<StartArgs, RunningPlugin>;

use zenoh_plugin_trait::PluginCondition;
use zenoh_plugin_trait::PluginStatus;
pub use zenoh_plugin_trait::PluginStructVersion;
pub use zenoh_plugin_trait::Plugin;
use zenoh_plugin_trait::PluginControl;
use zenoh_plugin_trait::PluginInstance;
use zenoh_protocol::core::key_expr::keyexpr;
4 changes: 2 additions & 2 deletions zenohd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ fn load_plugin(
log::warn!("Plugin `{}` was already declared", declared.name());
declared
} else if let Some(paths) = paths {
plugin_mgr.add_dynamic_plugin_by_paths(name, paths)?
plugin_mgr.declare_dynamic_plugin_by_paths(name, paths)?
} else {
plugin_mgr.add_dynamic_plugin_by_name(name, name)?
plugin_mgr.declare_dynamic_plugin_by_name(name, name)?
};

if let Some(loaded) = declared.loaded_mut() {
Expand Down

0 comments on commit d850216

Please sign in to comment.