Skip to content

Commit

Permalink
docs(runtime): improve overall documentation (#219)
Browse files Browse the repository at this point in the history
Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet authored Mar 14, 2024
1 parent c74c3de commit 42d67d0
Show file tree
Hide file tree
Showing 8 changed files with 480 additions and 47 deletions.
75 changes: 73 additions & 2 deletions zenoh-flow-runtime/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,64 @@
//

use crate::runners::Runner;
use serde::{Deserialize, Serialize};

use std::{collections::HashMap, fmt::Display, ops::Deref};

use serde::{Deserialize, Serialize};
use uhlc::{Timestamp, HLC};
use zenoh_flow_commons::{NodeId, Result, RuntimeId};
use zenoh_flow_records::DataFlowRecord;

/// A `DataFlowInstance` keeps track of the parts of a data flow managed by the Zenoh-Flow runtime.
///
/// A `DataFlowInstance` structure is thus *local* to a Zenoh-Flow runtime. For a data flow that spawns on multiple
/// runtimes, there will be one such structure at each runtime.
///
/// All instances will share the same [record](DataFlowRecord) but their internal state will differ.
pub struct DataFlowInstance {
pub(crate) state: InstanceState,
pub(crate) record: DataFlowRecord,
pub(crate) runners: HashMap<NodeId, Runner>,
}

/// The different states of a [DataFlowInstance].
///
/// Note that *a state is tied to a Zenoh-Flow [runtime]*: if a data flow is distributed across multiple Zenoh-Flow
/// runtimes, their respective state for the same instance could be different (but should eventually converge).
///
/// [runtime]: crate::Runtime
#[derive(Clone, Deserialize, Serialize, Debug)]
pub enum InstanceState {
/// A [runtime] listing a [DataFlowInstance] in the `Creating` state is in the process of loading all the nodes it
/// manages.
///
/// [runtime]: crate::Runtime
Creating(Timestamp),
/// A [runtime] listing a [DataFlowInstance] in the `Loaded` state successfully instantiated all the nodes it manages
/// and is ready to start them.
///
/// A `Loaded` data flow can be started or deleted.
///
/// [runtime]: crate::Runtime
Loaded(Timestamp),
/// A [runtime] listing a [DataFlowInstance] in the `Running` state has (re)started all the nodes it manages.
///
/// A `Running` data flow can be aborted or deleted.
///
/// [runtime]: crate::Runtime
Running(Timestamp),
/// A [runtime] listing a [DataFlowInstance] in the `Aborted` state has abruptly stopped all the nodes it manages.
///
/// An `Aborted` data flow can be restarted or deleted.
///
/// [runtime]: crate::Runtime
Aborted(Timestamp),
/// A [runtime] listing a [DataFlowInstance] in the `Failed` state failed to load at least one of the nodes of this
/// instance it manages.
///
/// A data flow in the `Failed` state can only be deleted.
///
/// [runtime]: crate::Runtime
Failed((Timestamp, String)),
}

Expand All @@ -48,10 +88,22 @@ impl Display for InstanceState {
}
}

/// The `InstanceStatus` provides information about the data flow instance.
///
/// It details:
/// - from which runtime this information comes from (through its [identifier](RuntimeId)),
/// - the [state](InstanceState) of the data flow instance,
/// - the list of nodes (through their [identifier](NodeId)) the runtime manages --- and thus for which the state
/// applies.
///
/// This information is what is displayed by the `zfctl` tool when requesting the status of a data flow instance.
#[derive(Deserialize, Serialize, Debug)]
pub struct InstanceStatus {
/// The identifier of the [runtime](crate::Runtime) this information comes from.
pub runtime_id: RuntimeId,
/// The state of the data flow instance --- on this runtime.
pub state: InstanceState,
/// The nodes managed by this runtime, for which the state applies.
pub nodes: Vec<NodeId>,
}

Expand All @@ -64,14 +116,25 @@ impl Deref for DataFlowInstance {
}

impl DataFlowInstance {
pub fn new(record: DataFlowRecord, hlc: &HLC) -> Self {
/// Creates a new `DataFlowInstance`, setting its state to [Creating](InstanceState::Creating).
pub(crate) fn new(record: DataFlowRecord, hlc: &HLC) -> Self {
Self {
state: InstanceState::Creating(hlc.new_timestamp()),
record,
runners: HashMap::default(),
}
}

/// (re-)Starts the `DataFlowInstance`.
///
/// The [hlc](HLC) is required to keep track of when this call was made.
///
/// # Errors
///
/// This method can fail when attempting to re-start: when re-starting a data flow, the method
/// [on_resume] is called for each node and is faillible.
///
/// [on_resume]: zenoh_flow_nodes::prelude::Node::on_resume()
pub async fn start(&mut self, hlc: &HLC) -> Result<()> {
for (node_id, runner) in self.runners.iter_mut() {
runner.start().await?;
Expand All @@ -82,6 +145,9 @@ impl DataFlowInstance {
Ok(())
}

/// Aborts the `DataFlowInstance`.
///
/// The [hlc](HLC) is required to keep track of when this call was made.
pub async fn abort(&mut self, hlc: &HLC) {
for (node_id, runner) in self.runners.iter_mut() {
runner.abort().await;
Expand All @@ -91,10 +157,15 @@ impl DataFlowInstance {
self.state = InstanceState::Aborted(hlc.new_timestamp());
}

/// Returns the [state](InstanceState) of this `DataFlowInstance`.
pub fn state(&self) -> &InstanceState {
&self.state
}

/// Returns the [status](InstanceStatus) of this `DataFlowInstance`.
///
/// This structure was intended as a way to retrieve and display information about the instance. This is what the
/// `zfctl` tool leverages for its `instance status` command.
pub fn status(&self, runtime_id: &RuntimeId) -> InstanceStatus {
InstanceStatus {
runtime_id: runtime_id.clone(),
Expand Down
11 changes: 11 additions & 0 deletions zenoh-flow-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@
// ZettaScale Zenoh Team, <[email protected]>
//

//! This crate exposes the structures driving the execution of a data flow: the [Runtime] and the [DataFlowInstance].
//!
//! If the feature `zenoh` is enabled (it is by default), this crate additionally re-exports the structures from
//! [Zenoh](zenoh) that allow opening a [Session](zenoh::Session) *asynchronously*.
//!
//! Users interested in exposing a Zenoh-Flow runtime should find everything in the [Runtime] and [RuntimeBuilder].
//!
//! Users interested in fetching the state of a data flow instance should look into the [DataFlowInstance],
//! [InstanceState] and [InstanceStatus] structures. These structures are leveraged by the `zfctl` command line tool.
mod instance;
pub use instance::{DataFlowInstance, InstanceState, InstanceStatus};

Expand All @@ -26,6 +36,7 @@ mod runners;
mod runtime;
pub use runtime::{DataFlowErr, Runtime, RuntimeBuilder};

/// A re-export of the Zenoh structures needed to open a [Session](zenoh::Session) asynchronously.
#[cfg(feature = "zenoh")]
pub mod zenoh {
pub use zenoh::config::{client, empty, peer};
Expand Down
117 changes: 115 additions & 2 deletions zenoh-flow-runtime/src/loader/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,28 @@ use serde::{Deserialize, Deserializer};
use zenoh_flow_commons::Result;
use zenoh_flow_nodes::{OperatorFn, SinkFn, SourceFn};

// Convenient shortcut.
/// A convenient wrapper for a set of [Extension].
///
/// The main purpose of this structure is to facilitate parsing.
///
/// # Example configuration
///
/// ```
/// # use zenoh_flow_runtime::Extensions;
/// # let yaml = r#"
/// - file_extension: py
/// libraries:
/// source: /home/zenoh-flow/extension/libpy_source.so
/// operator: /home/zenoh-flow/extension/libpy_operator.so
/// sink: /home/zenoh-flow/extension/libpy_sink.so
///
/// - file_extension: js
/// libraries:
/// source: /home/zenoh-flow/extension/libwasm_source.so
/// operator: /home/zenoh-flow/extension/libwasm_operator.so
/// sink: /home/zenoh-flow/extension/libwasm_sink.so
/// # "#;
/// # serde_yaml::from_str::<Extensions>(yaml).unwrap();
#[derive(Default, Debug, Clone, Deserialize, PartialEq, Eq)]
#[repr(transparent)]
pub struct Extensions(
Expand Down Expand Up @@ -79,7 +100,7 @@ impl Extensions {
/// This method will return an error if any of the library:
/// - does not expose the correct symbol (see these macros: [1], [2], [3]),
/// - was not compiled with the same Rust version,
/// - was not using the same Zenoh-Flow version as this Zenoh-Flow [runtime](crate::Runtime).
/// - was not using the same version of Zenoh-Flow as this [runtime](crate::Runtime).
///
/// [1]: zenoh_flow_nodes::prelude::export_source
/// [2]: zenoh_flow_nodes::prelude::export_operator
Expand All @@ -104,6 +125,38 @@ impl Extensions {
}
}

/// An `Extension` associates a file extension (e.g. `.py`) to a set of shared libraries.
///
/// This details how a Zenoh-Flow runtime should load nodes that have the [url](url::Url) of their implementation with
/// this extension.
///
/// Zenoh-Flow only supports node implementation in the form of [shared libraries]. To support additional implementation
/// --- for instance [Python scripts] --- a Zenoh-Flow runtime needs to be informed on (i) which shared libraries it
/// should load and (ii) how it should make these shared libraries "load" the node implementation.
///
/// To support an extension on a Zenoh-Flow runtime, one can either detail them in the configuration file of the runtime
/// or through the dedicated [method](crate::RuntimeBuilder::add_extension()).
///
/// # Example configuration
///
/// (Yaml)
///
/// ```
/// # use zenoh_flow_runtime::Extension;
/// # let yaml = r#"
/// file_extension: py
/// libraries:
/// source: /home/zenoh-flow/libpy_source.so
/// operator: /home/zenoh-flow/libpy_operator.so
/// sink: /home/zenoh-flow/libpy_sink.so
/// # "#;
/// # serde_yaml::from_str::<Extension>(yaml).unwrap();
/// ```
///
/// [shared libraries]: std::env::consts::DLL_EXTENSION
/// [Python scripts]: https://github.com/eclipse-zenoh/zenoh-flow-python
// NOTE: We separate the libraries in its own dedicated structure to have that same textual representation (YAML/JSON).
// There is no real need to do so.
#[derive(Debug, Clone, Deserialize, Hash, PartialEq, Eq)]
pub struct Extension {
pub(crate) file_extension: Arc<str>,
Expand All @@ -112,22 +165,82 @@ pub struct Extension {

impl Extension {
/// Returns the file extension associated with this extension.
///
/// # Example
///
/// ```
/// # use zenoh_flow_runtime::Extension;
/// # let yaml = r#"
/// # file_extension: py
/// # libraries:
/// # source: /home/zenoh-flow/libpy_source.so
/// # operator: /home/zenoh-flow/libpy_operator.so
/// # sink: /home/zenoh-flow/libpy_sink.so
/// # "#;
/// # let extension = serde_yaml::from_str::<Extension>(yaml).unwrap();
/// assert_eq!(extension.file_extension(), "py");
/// ```
pub fn file_extension(&self) -> &str {
&self.file_extension
}

/// Returns the [path](PathBuf) of the shared library responsible for loading Source nodes for this file extension.
///
/// # Example
///
/// ```
/// # use zenoh_flow_runtime::Extension;
/// # let yaml = r#"
/// # file_extension: py
/// # libraries:
/// # source: /home/zenoh-flow/libpy_source.so
/// # operator: /home/zenoh-flow/libpy_operator.so
/// # sink: /home/zenoh-flow/libpy_sink.so
/// # "#;
/// # let extension = serde_yaml::from_str::<Extension>(yaml).unwrap();
/// assert_eq!(extension.source().to_str(), Some("/home/zenoh-flow/libpy_source.so"));
/// ```
pub fn source(&self) -> &PathBuf {
&self.libraries.source
}

/// Returns the [path](PathBuf) of the shared library responsible for loading Operator nodes for this file
/// extension.
///
/// # Example
///
/// ```
/// # use zenoh_flow_runtime::Extension;
/// # let yaml = r#"
/// # file_extension: py
/// # libraries:
/// # source: /home/zenoh-flow/libpy_source.so
/// # operator: /home/zenoh-flow/libpy_operator.so
/// # sink: /home/zenoh-flow/libpy_sink.so
/// # "#;
/// # let extension = serde_yaml::from_str::<Extension>(yaml).unwrap();
/// assert_eq!(extension.operator().to_str(), Some("/home/zenoh-flow/libpy_operator.so"));
/// ```
pub fn operator(&self) -> &PathBuf {
&self.libraries.operator
}

/// Returns the [path](PathBuf) of the shared library responsible for loading Sink nodes for this file extension.
///
/// # Example
///
/// ```
/// # use zenoh_flow_runtime::Extension;
/// # let yaml = r#"
/// # file_extension: py
/// # libraries:
/// # source: /home/zenoh-flow/libpy_source.so
/// # operator: /home/zenoh-flow/libpy_operator.so
/// # sink: /home/zenoh-flow/libpy_sink.so
/// # "#;
/// # let extension = serde_yaml::from_str::<Extension>(yaml).unwrap();
/// assert_eq!(extension.sink().to_str(), Some("/home/zenoh-flow/libpy_sink.so"));
/// ```
pub fn sink(&self) -> &PathBuf {
&self.libraries.sink
}
Expand Down
Loading

0 comments on commit 42d67d0

Please sign in to comment.