Skip to content

Commit

Permalink
compiles
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Nov 8, 2023
1 parent 4e63949 commit 2ccb9dd
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 144 deletions.
23 changes: 14 additions & 9 deletions plugins/zenoh-plugin-storage-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,20 +155,23 @@ impl StorageRuntimeInner {
.map(|s| async move { s.send(StorageMessage::Stop) }),
));
}
self.plugins_manager.running_plugin_mut(name)?.stop();
self.plugins_manager.started_plugin_mut(name).ok_or(format!(
"Cannot find volume {} to stop it",
name
))?.stop();
Ok(())
}
fn spawn_volume(&mut self, config: VolumeConfig) -> ZResult<()> {
let volume_id = config.name();
let backend_name = config.backend();
if let Some(paths) = config.paths() {
self.plugins_manager
.load_plugin_by_paths(volume_id, paths)?;
let declared = if let Some(paths) = config.paths() {
self.plugins_manager.add_dynamic_plugin_by_paths(volume_id, paths)?
} else {
self.plugins_manager
.load_plugin_by_backend_name(volume_id, backend_name)?;
}
self.plugins_manager.plugin_mut(volume_id)?.run(&config)?;
.add_dynamic_plugin_by_name(volume_id, backend_name)?
};
let loaded = declared.load()?;
loaded.start(&config)?;
Ok(())
}
fn kill_storage(&mut self, config: StorageConfig) {
Expand All @@ -188,7 +191,9 @@ impl StorageRuntimeInner {
fn spawn_storage(&mut self, storage: StorageConfig) -> ZResult<()> {
let admin_key = self.status_key() + "/storages/" + &storage.name;
let volume_id = storage.volume_id.clone();
let backend = self.plugins_manager.running_plugin(&volume_id)?;
let backend = self.plugins_manager.started_plugin(&volume_id).ok_or(
format!("Cannot find volume {} to spawn storage {}", volume_id, storage.name),
)?;
let storage_name = storage.name.clone();
let in_interceptor = backend.instance().incoming_data_interceptor();
let out_interceptor = backend.instance().outgoing_data_interceptor();
Expand Down Expand Up @@ -251,7 +256,7 @@ impl RunningPluginTrait for StorageRuntime {
});
let guard = self.0.lock().unwrap();
with_extended_string(&mut key, &["/volumes/"], |key| {
for plugin in guard.plugins_manager.running_plugins() {
for plugin in guard.plugins_manager.started_plugins() {
with_extended_string(key, &[plugin.name()], |key| {
with_extended_string(key, &["/__path__"], |key| {
if keyexpr::new(key.as_str())
Expand Down
106 changes: 49 additions & 57 deletions plugins/zenoh-plugin-trait/src/loading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use zenoh_util::LibLoader;
pub enum PluginState {
Declared,
Loaded,
Running,
Started,
}

#[derive(Clone, Debug, PartialEq, Eq, Default)]
Expand Down Expand Up @@ -92,12 +92,12 @@ pub trait DeclaredPlugin<StartArgs: CompatibilityVersion, Instance: Compatibilit
pub trait LoadedPlugin<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion>:
PluginInfo
{
fn run(&mut self, args: &StartArgs) -> ZResult<&mut dyn RunningPlugin<StartArgs, Instance>>;
fn running(&self) -> Option<&dyn RunningPlugin<StartArgs, Instance>>;
fn running_mut(&mut self) -> Option<&mut dyn RunningPlugin<StartArgs, Instance>>;
fn start(&mut self, args: &StartArgs) -> ZResult<&mut dyn StartedPlugin<StartArgs, Instance>>;
fn started(&self) -> Option<&dyn StartedPlugin<StartArgs, Instance>>;
fn started_mut(&mut self) -> Option<&mut dyn StartedPlugin<StartArgs, Instance>>;
}

pub trait RunningPlugin<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion> {
pub trait StartedPlugin<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion> : PluginInfo {
fn stop(&mut self);
fn instance(&self) -> &Instance;
fn instance_mut(&mut self) -> &mut Instance;
Expand Down Expand Up @@ -140,7 +140,7 @@ where
state: self
.instance
.as_ref()
.map_or(PluginState::Loaded, |_| PluginState::Running),
.map_or(PluginState::Loaded, |_| PluginState::Started),
condition: PluginCondition::new(), // TODO: request runnnig plugin status
}
}
Expand All @@ -167,20 +167,20 @@ impl<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion, P>
where
P: Plugin<StartArgs = StartArgs, Instance = Instance>,
{
fn run(&mut self, args: &StartArgs) -> ZResult<&mut dyn RunningPlugin<StartArgs, Instance>> {
fn start(&mut self, args: &StartArgs) -> ZResult<&mut dyn StartedPlugin<StartArgs, Instance>> {
if self.instance.is_none() {
self.instance = Some(P::start(self.name(), args)?);
}
Ok(self)
}
fn running(&self) -> Option<&dyn RunningPlugin<StartArgs, Instance>> {
fn started(&self) -> Option<&dyn StartedPlugin<StartArgs, Instance>> {
if self.instance.is_some() {
Some(self)
} else {
None
}
}
fn running_mut(&mut self) -> Option<&mut dyn RunningPlugin<StartArgs, Instance>> {
fn started_mut(&mut self) -> Option<&mut dyn StartedPlugin<StartArgs, Instance>> {
if self.instance.is_some() {
Some(self)
} else {
Expand All @@ -190,7 +190,7 @@ where
}

impl<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion, P>
RunningPlugin<StartArgs, Instance> for StaticPlugin<StartArgs, Instance, P>
StartedPlugin<StartArgs, Instance> for StaticPlugin<StartArgs, Instance, P>
where
P: Plugin<StartArgs = StartArgs, Instance = Instance>,
{
Expand Down Expand Up @@ -323,14 +323,14 @@ impl<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion> PluginInfo
PluginStatus {
state: if self.starter.is_some() {
if self.instance.is_some() {
PluginState::Running
PluginState::Started
} else {
PluginState::Loaded
}
} else {
PluginState::Declared
},
condition: self.condition.clone(), // TODO: request condition from running plugin
condition: self.condition.clone(), // TODO: request condition from started plugin
}
}
}
Expand Down Expand Up @@ -365,29 +365,29 @@ impl<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion>
impl<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion>
LoadedPlugin<StartArgs, Instance> for DynamicPlugin<StartArgs, Instance>
{
fn run(&mut self, args: &StartArgs) -> ZResult<&mut dyn RunningPlugin<StartArgs, Instance>> {
fn start(&mut self, args: &StartArgs) -> ZResult<&mut dyn StartedPlugin<StartArgs, Instance>> {
let starter = self
.starter
.as_ref()
.ok_or_else(|| format!("Plugin `{}` not loaded", self.name))
.add_error(&mut self.condition)?;
let already_running = self.instance.is_some();
if !already_running {
let already_started = self.instance.is_some();
if !already_started {
let instance = starter
.start(self.name(), args)
.add_error(&mut self.condition)?;
self.instance = Some(instance);
}
Ok(self)
}
fn running(&self) -> Option<&dyn RunningPlugin<StartArgs, Instance>> {
fn started(&self) -> Option<&dyn StartedPlugin<StartArgs, Instance>> {
if self.instance.is_some() {
Some(self)
} else {
None
}
}
fn running_mut(&mut self) -> Option<&mut dyn RunningPlugin<StartArgs, Instance>> {
fn started_mut(&mut self) -> Option<&mut dyn StartedPlugin<StartArgs, Instance>> {
if self.instance.is_some() {
Some(self)
} else {
Expand All @@ -397,7 +397,7 @@ impl<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion>
}

impl<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion>
RunningPlugin<StartArgs, Instance> for DynamicPlugin<StartArgs, Instance>
StartedPlugin<StartArgs, Instance> for DynamicPlugin<StartArgs, Instance>
{
fn stop(&mut self) {
self.instance = None;
Expand All @@ -411,13 +411,13 @@ impl<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion>
}

struct PluginRecord<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion>(
Box<dyn DeclaredPlugin<StartArgs, Instance>>,
Box<dyn DeclaredPlugin<StartArgs, Instance> + Send>,
);

impl<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion>
PluginRecord<StartArgs, Instance>
{
fn new<P: DeclaredPlugin<StartArgs, Instance> + 'static>(plugin: P) -> Self {
fn new<P: DeclaredPlugin<StartArgs, Instance> + Send + 'static>(plugin: P) -> Self {
Self(Box::new(plugin))
}
}
Expand Down Expand Up @@ -452,13 +452,13 @@ impl<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion>

/// A plugins manager that handles starting and stopping plugins.
/// Plugins can be loaded from shared libraries using [`Self::load_plugin_by_name`] or [`Self::load_plugin_by_paths`], or added directly from the binary if available using [`Self::add_static`].
pub struct PluginsManager<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion> {
pub struct PluginsManager<StartArgs: CompatibilityVersion, Instance: CompatibilityVersion + Send> {
default_lib_prefix: String,
loader: Option<LibLoader>,
plugins: Vec<PluginRecord<StartArgs, Instance>>,
}

impl<StartArgs: 'static + CompatibilityVersion, Instance: 'static + CompatibilityVersion>
impl<StartArgs: 'static + CompatibilityVersion, Instance: 'static + CompatibilityVersion + Send>
PluginsManager<StartArgs, Instance>
{
/// Constructs a new plugin manager with dynamic library loading enabled.
Expand Down Expand Up @@ -526,11 +526,6 @@ impl<StartArgs: 'static + CompatibilityVersion, Instance: 'static + Compatibilit
self.plugins.iter().position(|p| p.name() == name)
}

fn get_plugin_index_err(&self, name: &str) -> ZResult<usize> {
self.get_plugin_index(name)
.ok_or_else(|| format!("Plugin `{}` not found", name).into())
}

/// Lists all plugins
pub fn plugins(&self) -> impl Iterator<Item = &dyn DeclaredPlugin<StartArgs, Instance>> + '_ {
self.plugins
Expand Down Expand Up @@ -562,69 +557,66 @@ impl<StartArgs: 'static + CompatibilityVersion, Instance: 'static + Compatibilit
self.plugins_mut().filter_map(|p| p.loaded_mut())
}

/// Lists the running plugins
pub fn running_plugins(
/// Lists the started plugins
pub fn started_plugins(
&self,
) -> impl Iterator<Item = &dyn RunningPlugin<StartArgs, Instance>> + '_ {
self.loaded_plugins().filter_map(|p| p.running())
) -> impl Iterator<Item = &dyn StartedPlugin<StartArgs, Instance>> + '_ {
self.loaded_plugins().filter_map(|p| p.started())
}

/// Lists the running plugins mutable
pub fn running_plugins_mut(
/// Lists the started plugins mutable
pub fn started_plugins_mut(
&mut self,
) -> impl Iterator<Item = &mut dyn RunningPlugin<StartArgs, Instance>> + '_ {
self.loaded_plugins_mut().filter_map(|p| p.running_mut())
) -> impl Iterator<Item = &mut dyn StartedPlugin<StartArgs, Instance>> + '_ {
self.loaded_plugins_mut().filter_map(|p| p.started_mut())
}

/// Returns single plugin record
pub fn plugin(&self, name: &str) -> ZResult<&dyn DeclaredPlugin<StartArgs, Instance>> {
Ok(&self.plugins[self.get_plugin_index_err(name)?])
pub fn plugin(&self, name: &str) -> Option<&dyn DeclaredPlugin<StartArgs, Instance>> {
let index = self.get_plugin_index(name)?;
Some(&self.plugins[index])
}

/// Returns mutable plugin record
pub fn plugin_mut(
&mut self,
name: &str,
) -> ZResult<&mut dyn DeclaredPlugin<StartArgs, Instance>> {
let index = self.get_plugin_index_err(name)?;
Ok(&mut self.plugins[index])
) -> Option<&mut dyn DeclaredPlugin<StartArgs, Instance>> {
let index = self.get_plugin_index(name)?;
Some(&mut self.plugins[index])
}

/// Returns loaded plugin record
pub fn loaded_plugin(&self, name: &str) -> ZResult<&dyn LoadedPlugin<StartArgs, Instance>> {
Ok(self
pub fn loaded_plugin(&self, name: &str) -> Option<&dyn LoadedPlugin<StartArgs, Instance>> {
self
.plugin(name)?
.loaded()
.ok_or_else(|| format!("Plugin `{}` not loaded", name))?)
}

/// Returns mutable loaded plugin record
pub fn loaded_plugin_mut(
&mut self,
name: &str,
) -> ZResult<&mut dyn LoadedPlugin<StartArgs, Instance>> {
Ok(self
) -> Option<&mut dyn LoadedPlugin<StartArgs, Instance>> {
self
.plugin_mut(name)?
.loaded_mut()
.ok_or_else(|| format!("Plugin `{}` not loaded", name))?)
}

/// Returns running plugin record
pub fn running_plugin(&self, name: &str) -> ZResult<&dyn RunningPlugin<StartArgs, Instance>> {
Ok(self
/// Returns started plugin record
pub fn started_plugin(&self, name: &str) -> Option<&dyn StartedPlugin<StartArgs, Instance>> {
self
.loaded_plugin(name)?
.running()
.ok_or_else(|| format!("Plugin `{}` is not running", name))?)
.started()
}

/// Returns mutable running plugin record
pub fn running_plugin_mut(
/// Returns mutable started plugin record
pub fn started_plugin_mut(
&mut self,
name: &str,
) -> ZResult<&mut dyn RunningPlugin<StartArgs, Instance>> {
Ok(self
) -> Option<&mut dyn StartedPlugin<StartArgs, Instance>> {
self
.loaded_plugin_mut(name)?
.running_mut()
.ok_or_else(|| format!("Plugin `{}` is not running", name))?)
.started_mut()
}
}
Loading

0 comments on commit 2ccb9dd

Please sign in to comment.