diff --git a/zenoh-flow-nodes/src/context.rs b/zenoh-flow-nodes/src/context.rs index 8bed9591..1c37b2fa 100644 --- a/zenoh-flow-nodes/src/context.rs +++ b/zenoh-flow-nodes/src/context.rs @@ -19,9 +19,9 @@ use zenoh_flow_commons::{InstanceId, RuntimeId}; /// The `Context` structure provides information about the data flow and the Zenoh-Flow runtime. /// /// In particular, it allows accessing: -/// - [name](Context::name()) of the data flow, -/// - [instance id](Context::instance_id()) of this instance of the data flow, -/// - [runtime id](Context::runtime_id()) of the Zenoh-Flow runtime managing the **node**. +/// - the [name](Context::name()) of the data flow, +/// - the [instance id](Context::instance_id()) of this instance of the data flow, +/// - the [runtime id](Context::runtime_id()) of the Zenoh-Flow runtime managing the **node**. #[derive(Clone, Debug)] pub struct Context { pub(crate) flow_name: Arc<str>, @@ -30,6 +30,7 @@ pub struct Context { } impl Context { + /// Creates a new node `Context`. pub fn new(flow_name: Arc<str>, instance_id: InstanceId, runtime_id: RuntimeId) -> Self { Self { flow_name, diff --git a/zenoh-flow-nodes/src/declaration.rs b/zenoh-flow-nodes/src/declaration.rs index 3f2308ce..674b0317 100644 --- a/zenoh-flow-nodes/src/declaration.rs +++ b/zenoh-flow-nodes/src/declaration.rs @@ -19,30 +19,48 @@ use std::{pin::Pin, sync::Arc}; use futures::Future; use zenoh_flow_commons::{Configuration, Result}; -/// Constant used to check if a node is compatible with the currently running Zenoh Flow daemon. -/// As nodes are dynamically loaded, this is to prevent (possibly cryptic) runtime error due to -/// incompatible API. +/// (⚙️ *internal*) Constant used to check if a node is compatible with the Zenoh-Flow runtime managing it. +/// +/// As nodes are dynamically loaded, this is to prevent (possibly cryptic) runtime errors due to an incompatible API. +/// +/// This constant is used by the procedural macros [export_operator](crate::prelude::export_operator), +/// [export_source](crate::prelude::export_source) and [export_sink](crate::prelude::export_sink). A Zenoh-Flow runtime +/// will compare its value of this constant to the value exposed by every node it dynamically loads. pub const CORE_VERSION: &str = env!("CARGO_PKG_VERSION"); -/// Constant used to check if a node was compiled with the same version of the Rust compiler than -/// the currently running Zenoh Flow daemon. + +/// (⚙️ *internal*) Constant used to check if a node was compiled with the same version of the Rust compiler as the +/// Zenoh-Flow runtime managing it. +/// /// As Rust is not ABI stable, this is to prevent (possibly cryptic) runtime errors. +/// +/// This constant is used by the procedural macros [export_operator](crate::prelude::export_operator), +/// [export_source](crate::prelude::export_source) and [export_sink](crate::prelude::export_sink). A Zenoh-Flow runtime +/// will compare its value of this constant to the value exposed by every node it dynamically loads. pub const RUSTC_VERSION: &str = env!("RUSTC_VERSION"); -/// Declaration expected in the library that will be loaded. +/// (⚙️ *internal*) Declaration expected in the library that will be loaded. +/// +/// This structure is automatically created by the procedural macros +/// [export_operator](crate::prelude::export_operator), [export_source](crate::prelude::export_source) and +/// [export_sink](crate::prelude::export_sink).
pub struct NodeDeclaration<C> { pub rustc_version: &'static str, pub core_version: &'static str, pub constructor: C, } -/// `SourceFn` is the only signature we accept to construct a [`Source`](`crate::prelude::Source`). +/// (⚙️ *internal*) `SourceFn` is the only signature we accept to construct a [Source](crate::prelude::Source). +/// +/// This function is automatically created by the procedural macro [export_source](crate::prelude::export_source). pub type SourceFn = fn( Context, Configuration, Outputs, ) -> Pin<Box<dyn Future<Output = Result<Box<dyn Source>>> + Send>>; -/// `OperatorFn` is the only signature we accept to construct an [`Operator`](`crate::prelude::Operator`). +/// (⚙️ *internal*) `OperatorFn` is the only signature we accept to construct an [Operator](crate::prelude::Operator). +/// +/// This function is automatically created by the procedural macro [export_operator](crate::prelude::export_operator). pub type OperatorFn = fn( Context, Configuration, @@ -50,7 +68,9 @@ pub type OperatorFn = fn( Outputs, ) -> Pin<Box<dyn Future<Output = Result<Box<dyn Operator>>> + Send>>; -/// `SinkFn` is the only signature we accept to construct a [`Sink`](`crate::prelude::Sink`). +/// (⚙️ *internal*) `SinkFn` is the only signature we accept to construct a [Sink](crate::prelude::Sink). +/// +/// This function is automatically created by the procedural macro [export_sink](crate::prelude::export_sink). pub type SinkFn = fn( Context, Configuration, diff --git a/zenoh-flow-nodes/src/io/inputs.rs b/zenoh-flow-nodes/src/io/inputs.rs index 103f5be5..4fab2cb4 100644 --- a/zenoh-flow-nodes/src/io/inputs.rs +++ b/zenoh-flow-nodes/src/io/inputs.rs @@ -30,7 +30,7 @@ use zenoh_flow_commons::{PortId, Result}; /// of the node. These names are _case sensitive_ and should be an exact match to what was written /// in the descriptor. /// -/// Zenoh-Flow provides two flavors of input: [InputRaw] and [`Input`]. An [`Input`] +/// Zenoh-Flow provides two flavours of input: [InputRaw] and [`Input`]. An [`Input`] /// conveniently exposes instances of `T` while an [InputRaw] exposes messages, allowing to /// disregard the contained data. /// @@ -38,15 +38,18 @@ use zenoh_flow_commons::{PortId, Result}; /// /// # Example /// -/// ```ignore -/// let input_builder = inputs.take("test raw").expect("No input name 'test raw' found"); -/// let input_raw = input_builder.raw(); +/// ```no_run +/// # use zenoh_flow_nodes::prelude::*; +/// # let mut inputs = Inputs::default(); +/// let input_raw = inputs.take("test raw") +/// .expect("No input name 'test raw' found") +/// .raw(); /// -/// let input_builder = inputs.take("test typed").expect("No input name 'test typed' found"); -/// let input: Input = input_build.typed( -/// |bytes| serde_json::from_slice(bytes) -/// .map_err(|e| anyhow::anyhow!(e)) -/// )?; +/// let input: Input<u64> = inputs.take("test typed") +/// .expect("No input name 'test typed' found") +/// .typed( +/// |bytes| serde_json::from_slice(bytes).map_err(|e| anyhow!(e)) +/// ); /// ``` #[derive(Default)] pub struct Inputs { @@ -68,29 +71,27 @@ impl Inputs { self.hmap.entry(port_id).or_insert(rx); } - /// Returns an [InputBuilder] for the provided `port_id`, if an input was declared with this - /// exact name in the descriptor of the node, otherwise returns `None`. + /// Returns an Input builder for the provided `port_id`, if an input was declared with this exact name in the + /// descriptor of the node, otherwise returns `None`. /// /// # Usage /// - /// This builder can either produce a, typed, [`Input`] or an [InputRaw].
The main difference - /// between both is the type of data they expose: an [`Input`] automatically tries to downcast - /// or deserialize the data contained in the message to expose `&T`, while an [InputRaw] simply - /// exposes a [LinkMessage]. - /// - /// As long as data need to be manipulated, a typed [`Input`] should be favored. + /// This builder can either produce a, typed, [`Input`](Input) or an [InputRaw]. The main difference between both + /// is the type of data they expose: an [`Input`](Input) automatically tries to downcast or deserialize the data + /// contained in the message to expose `&T`, while an [InputRaw] simply exposes a [LinkMessage]. /// /// ## Typed /// - /// To obtain an [`Input`] one must call `typed` and provide a deserializer function. In - /// the example below we rely on the `serde_json` crate to do the deserialization. + /// To obtain an [`Input`](Input) one must call `typed` and provide a deserialiser function. In the example below + /// we rely on the `serde_json` crate to do the deserialisation. /// - /// ```ignore - /// let input_typed: Input = inputs - /// .take("test") - /// .expect("No input named 'test' found") + /// ```no_run + /// # use zenoh_flow_nodes::prelude::*; + /// # let mut inputs = Inputs::default(); + /// let input: Input = inputs.take("test typed") + /// .expect("No input name 'test typed' found") /// .typed( - /// |bytes: &[u8]| serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!(e)) + /// |bytes| serde_json::from_slice(bytes).map_err(|e| anyhow!(e)) /// ); /// ``` /// @@ -98,10 +99,11 @@ impl Inputs { /// /// To obtain an [InputRaw] one must call `raw`. /// - /// ```ignore - /// let input_raw: InputRaw = inputs - /// .take("test") - /// .expect("No input named 'test' found") + /// ```no_run + /// # use zenoh_flow_nodes::prelude::*; + /// # let mut inputs = Inputs::default(); + /// let input_raw = inputs.take("test raw") + /// .expect("No input name 'test raw' found") /// .raw(); /// ``` pub fn take(&mut self, port_id: impl AsRef) -> Option { @@ -114,8 +116,7 @@ impl Inputs { } } -/// An `InputBuilder` is the intermediate structure to obtain either an [`Input`] or an -/// [InputRaw]. +/// An `InputBuilder` is the intermediate structure to obtain either an [`Input`](Input) or an [InputRaw]. /// /// The main difference between both is the type of data they expose: an [`Input`] automatically /// tries to downcast or deserialize the data contained in the message to expose `&T`, while an @@ -143,14 +144,15 @@ impl InputBuilder { /// # `InputRaw` vs `Input` /// /// If the node needs access to the data to perform computations, an [`Input`] should be - /// favored as it performs the conversion automatically. + /// favoured as it performs the conversion automatically. /// /// # Example /// - /// ```ignore - /// let input_raw: InputRaw = inputs - /// .take("test") - /// .expect("No input named 'test' found") + /// ```no_run + /// # use zenoh_flow_nodes::prelude::*; + /// # let mut inputs = Inputs::default(); + /// let input_raw = inputs.take("test raw") + /// .expect("No input name 'test raw' found") /// .raw(); /// ``` pub fn raw(self) -> InputRaw { @@ -163,22 +165,23 @@ impl InputBuilder { /// Consume the `InputBuilder` to produce an [`Input`]. /// /// An [`Input`] tries to automatically convert the data contained in the [LinkMessage] in - /// order to expose `&T`. Depending on if the data is received serialized or not, to perform - /// this conversion either the `deserializer` is called or a downcast is attempted. 
+ /// order to expose `&T`. Depending on if the data is received serialised or not, to perform + /// this conversion either the `deserialiser` is called or a downcast is attempted. /// /// # `Input` vs `InputRaw` /// /// If the node does need to access the data contained in the [LinkMessage], an [InputRaw] - /// should be favored as it does not try to perform the extra conversion steps. + /// should be favoured as it does not try to perform the extra conversion steps. /// /// # Example /// - /// ```ignore - /// let input_typed: Input = inputs - /// .take("test") - /// .expect("No input named 'test' found") + /// ```no_run + /// # use zenoh_flow_nodes::prelude::*; + /// # let mut inputs = Inputs::default(); + /// let input: Input = inputs.take("test typed") + /// .expect("No input name 'test typed' found") /// .typed( - /// |bytes: &[u8]| serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!(e)) + /// |bytes| serde_json::from_slice(bytes).map_err(|e| anyhow!(e)) /// ); /// ``` pub fn typed( @@ -192,10 +195,12 @@ impl InputBuilder { } } -/// An [`InputRaw`](`InputRaw`) exposes the [`LinkMessage`](`LinkMessage`) it receives. +/// An `InputRaw` receives "raw" [LinkMessage]. /// -/// It's primary purpose is to ensure "optimal" performance. This can be useful to implement -/// behaviour where actual access to the underlying data is irrelevant. +/// As opposed to a typed [`Input`](Input), an `InputRaw` will not perform any operation on the data it receives. +/// This behaviour is useful when access to the underlying data is either irrelevant (e.g. for rate-limiting purposes) +/// or when Zenoh-Flow should not attempt to interpret the contained [Payload](crate::prelude::Payload) (e.g. for +/// bindings). #[derive(Clone, Debug)] pub struct InputRaw { pub(crate) port_id: PortId, @@ -216,11 +221,10 @@ impl InputRaw { /// /// # Asynchronous alternative: `recv` /// - /// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `recv`. - /// Although synchronous, but given it is "fail-fast", this method will not block the thread on - /// which it is executed. + /// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `recv`. Although + /// synchronous, this method will not block the thread on which it is executed. /// - /// # Error + /// # Errors /// /// An error is returned if the associated channel is disconnected. pub fn try_recv(&self) -> Result> { @@ -236,15 +240,14 @@ impl InputRaw { } } - /// Returns the first [LinkMessage] that was received, *asynchronously*, on any of the channels - /// associated with this Input. + /// Returns the first [LinkMessage] that was received, *asynchronously*, on any of the channels associated with this + /// Input. /// /// If several [LinkMessage] are received at the same time, one is *randomly* selected. /// - /// # Error + /// # Errors /// - /// An error is returned if *all* channels are disconnected. For each disconnected channel, an - /// error is separately logged. + /// An error is returned if a channel was disconnected. pub async fn recv(&self) -> Result { self.receiver.recv_async().await.map_err(|_| { tracing::error!("Link disconnected: {}", self.port_id); @@ -253,13 +256,17 @@ impl InputRaw { } } -/// A typed `Input` that tries to automatically downcast or deserialize the data received in order -/// to expose `&T`. +/// A typed `Input` receiving [`Data`](Data). 
+/// +/// An `Input` will automatically try to downcast or deserialise the [Payload](crate::prelude::Payload) it receives, +/// exposing a [`Data`](Data). +/// +/// The type of conversion performed depends on whether the upstream node resides on the same Zenoh-Flow runtime +/// (downcast) or on another runtime (deserialisation). /// /// # Performance /// -/// If the data is received serialized from the upstream node, an allocation is performed to host -/// the deserialized `T`. +/// If the data is received serialised from the upstream node, an allocation is performed to host the deserialised `T`. pub struct Input { pub(crate) input_raw: InputRaw, pub(crate) deserializer: Arc>, @@ -275,24 +282,27 @@ impl Deref for Input { } impl Input { - /// Returns the first [`Message`] that was received, *asynchronously*, on any of the channels + /// Returns the first [`Data`](Data) that was received, *asynchronously*, on any of the channels /// associated with this Input. /// - /// If several [`Message`] are received at the same time, one is *randomly* selected. + /// If several [`Data`](Data) are received at the same time, one is *randomly* selected. /// - /// This method interprets the data to the type associated with this [`Input`]. + /// This method interprets the data to the type associated with this [`Input`](Input). /// /// # Performance /// - /// As this method interprets the data received additional operations are performed: - /// - data received serialized is deserialized (an allocation is performed to store an instance - /// of `T`), - /// - data received "typed" are checked against the type associated to this [`Input`]. + /// As this method interprets the data received, additional operations are performed: + /// - data received serialised is deserialised (an allocation is performed to store an instance of `T`), + /// - data received "typed" are checked against the type associated to this [`Input`](Input). + /// + /// # Synchronous alternative: `try_recv` /// - /// # Error + /// This method is an asynchronous alternative to it's synchronous fail-fast counterpart: `try_recv`. + /// + /// # Errors /// /// Several errors can occur: - /// - all the channels are disconnected, + /// - a channel was disconnected, /// - Zenoh-Flow failed at interpreting the received data as an instance of `T`. pub async fn recv(&self) -> Result<(Data, Timestamp)> { let LinkMessage { payload, timestamp } = self.input_raw.recv().await?; @@ -302,18 +312,24 @@ impl Input { )) } - /// Returns the first [`Message`] that was received on any of the channels associated with this - /// Input, or `None` if all the channels are empty. + /// Returns the first [`Data`](Data) that was received on any of the channels associated with this Input, + /// or [None] if all the channels are empty. + /// + /// # Performance + /// + /// As this method interprets the data received, additional operations are performed: + /// - data received serialised is deserialised (an allocation is performed to store an instance of `T`), + /// - data received "typed" are checked against the type associated to this [`Input`](Input). /// /// # Asynchronous alternative: `recv` /// - /// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `recv`. - /// Although synchronous, this method will not block the thread on which it is executed. + /// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `recv`. Although + /// synchronous, this method will not block the thread on which it is executed. 
/// - /// # Error + /// # Errors /// /// Several errors can occur: - /// - no message was received (i.e. Empty error), + /// - a channel was disconnected, /// - Zenoh-Flow failed at interpreting the received data as an instance of `T`. /// /// Note that if some channels are disconnected, for each of such channel an error is logged. diff --git a/zenoh-flow-nodes/src/io/mod.rs b/zenoh-flow-nodes/src/io/mod.rs index faa1f3ed..5bc798a5 100644 --- a/zenoh-flow-nodes/src/io/mod.rs +++ b/zenoh-flow-nodes/src/io/mod.rs @@ -13,7 +13,7 @@ // mod inputs; -pub use inputs::{Input, InputRaw, Inputs}; +pub use inputs::{Input, InputBuilder, InputRaw, Inputs}; mod outputs; -pub use outputs::{Output, OutputRaw, Outputs}; +pub use outputs::{Output, OutputBuilder, OutputRaw, Outputs}; diff --git a/zenoh-flow-nodes/src/io/outputs.rs b/zenoh-flow-nodes/src/io/outputs.rs index 753dff1c..93a57880 100644 --- a/zenoh-flow-nodes/src/io/outputs.rs +++ b/zenoh-flow-nodes/src/io/outputs.rs @@ -22,23 +22,22 @@ use std::sync::Arc; use uhlc::{Timestamp, HLC}; use zenoh_flow_commons::{PortId, Result}; -/// The [Outputs] structure contains all the outputs created for a [Source](crate::prelude::Source) -/// or an [Operator](crate::prelude::Operator). +/// The [Outputs] structure contains all the outputs created for a [Source](crate::prelude::Source) or an +/// [Operator](crate::prelude::Operator). /// -/// Each output is indexed by its **port identifier**: the name that was indicated in the descriptor -/// of the node. These names are _case sensitive_ and should be an exact match to what was written -/// in the descriptor. +/// Each output is indexed by its **port identifier**: the name that was indicated in the descriptor of the node. These +/// names are _case sensitive_ and should be an exact match to what was written in the descriptor. /// -/// Zenoh-Flow provides two flavors of output: [OutputRaw] and [`Output`]. An [`Output`] -/// conveniently accepts instances of `T` while an [OutputRaw] operates at the message level, -/// potentially disregarding the data it contains. +/// Zenoh-Flow provides two flavours of output: [OutputRaw] and [`Output`](Output). An [`Output`](Output) conveniently +/// accepts instances of `T` while an [OutputRaw] operates at the message level, potentially disregarding the data it +/// contains. +#[derive(Default)] pub struct Outputs { pub(crate) hmap: HashMap>>, pub(crate) hlc: Arc, } -// Dereferencing on the internal [`HashMap`](`Hashmap`) allows users to call all the methods -// implemented on it: `keys()` for one. +// Dereferencing on the internal [HashMap] allows users to call all the methods implemented on it: `keys()` for one. impl Deref for Outputs { type Target = HashMap>>; @@ -61,35 +60,40 @@ impl Outputs { self.hmap.entry(port_id).or_insert_with(Vec::new).push(tx) } - /// Returns an [OutputBuilder] for the provided `port_id`, if an output was declared with this - /// exact name in the descriptor of the node, otherwise returns `None`. + /// Returns an Output builder for the provided `port_id`, if an output was declared with this exact name in the + /// descriptor of the node, otherwise returns `None`. /// /// # Usage /// - /// This builder can either produce a, typed, [`Output`] or an [OutputRaw]. The main difference - /// between both is the type of data they accept: an [`Output`] accepts anything that is - /// `Into` while an [OutputRaw] accepts a [LinkMessage] or anything that is - /// `Into<`[Payload]`>`. + /// This builder can either produce a, typed, [Output] or an [OutputRaw]. 
The main difference between both is the + /// type of data they accept: an [Output] accepts anything that is `Into` while an [OutputRaw] accepts a + /// [LinkMessage] or an array / slice of bytes (i.e. a [Payload]). /// - /// As long as data are produced or manipulated, a typed [`Output`] should be favored. + /// As long as data are produced or manipulated, a typed [Output] should be favoured. /// /// ## Typed /// - /// To obtain an [`Output`] one must call `typed` and provide a serializer function. In - /// the example below we rely on the `serde_json` crate to do the serialization. + /// To obtain an [Output] one must call `typed` and provide a serialiser function. In the example below we rely + /// on the `serde_json` crate to do the serialisation. /// - /// ```ignore - /// let output_typed: Output = outputs + /// ```no_run + /// # use zenoh_flow_nodes::prelude::*; + /// # let mut outputs = Outputs::default(); + /// let output: Output = outputs /// .take("test") /// .expect("No key named 'test' found") - /// .typed(|data: &u64| serde_json::to_vec(data).map_err(|e| anyhow::anyhow!(e))); + /// .typed(|buffer: &mut Vec, data: &u64| { + /// serde_json::to_writer(buffer, data).map_err(|e| anyhow!(e)) + /// }); /// ``` /// /// ## Raw /// /// To obtain an [OutputRaw] one must call `raw`. /// - /// ```ignore + /// ```no_run + /// # use zenoh_flow_nodes::prelude::*; + /// # let mut outputs = Outputs::default(); /// let output_raw = outputs /// .take("test") /// .expect("No key named 'test' found") @@ -106,17 +110,15 @@ impl Outputs { } } -/// An [OutputBuilder] is the intermediate structure to obtain either an [`Output`] or an -/// [OutputRaw]. +/// An Output builder is the intermediate structure to obtain either a typed [`Output`](Output) or an [OutputRaw]. /// -/// The main difference between both is the type of data they accept: an [`Output`] accepts -/// anything that is `Into` while an [OutputRaw] accepts a [LinkMessage] or anything that is -/// `Into<`[Payload]`>`. +/// The main difference between both is the type of data they accept: an [Output] accepts anything that is `Into` +/// while an [OutputRaw] accepts a [LinkMessage] or anything that is `Into`. /// /// # Planned evolution /// -/// Zenoh-Flow will allow tweaking the behaviour of the underlying channels. For now, the `senders` -/// channels are _unbounded_ and do not implement a dropping policy, which could lead to issues. +/// Zenoh-Flow will allow tweaking the behaviour of the underlying channels. For now, the `senders` channels are +/// _unbounded_ and do not implement a dropping policy, which could lead to issues. pub struct OutputBuilder { pub(crate) port_id: PortId, pub(crate) senders: Vec>, @@ -126,22 +128,23 @@ pub struct OutputBuilder { impl OutputBuilder { /// Consume this `OutputBuilder` to produce an [OutputRaw]. /// - /// An [OutputRaw] sends [LinkMessage]s (through `forward`) or anything that is - /// `Into<`[Payload]`>` (through `send` and `try_send`) to downstream nodes. + /// An [OutputRaw] sends [LinkMessage]s (through `forward`) or anything that is `Into` (through `send` and + /// `try_send`) to downstream nodes. /// - /// The [OutputRaw] was designed for use cases such as load-balancing or rate-limiting. In this - /// scenarios, the node does not need to access the underlying data and the message can simply - /// be forwarded downstream. + /// The [OutputRaw] was designed for use cases such as load-balancing or rate-limiting. 
In these scenarios, the node + /// does not need to access the underlying data and the message can simply be forwarded downstream. /// /// # `OutputRaw` vs `Output` /// - /// If the node produces instances of `T` as a result of computations, an [`Output`] should be - /// favored as it sends anything that is `Into`. Thus, contrary to an [OutputRaw], there is - /// no need to encapsulate `T` inside a [Payload]. + /// If the node produces instances of `T` as a result of computations, an [Output] should be favoured as it sends + /// anything that is `Into<Payload>`. Thus, contrary to an [OutputRaw], there is no need to encapsulate `T` inside a + /// [Payload]. /// /// # Example /// - /// ```ignore + /// ```no_run + /// # use zenoh_flow_nodes::prelude::*; + /// # let mut outputs = Outputs::default(); /// let output_raw = outputs /// .take("test") /// .expect("No key named 'test' found") @@ -155,30 +158,32 @@ impl OutputBuilder { } } - /// Consume this `OutputBuilder` to produce an [`Output`]. + /// Consume this `OutputBuilder` to produce an [`Output`](Output). /// - /// An [`Output`] sends anything that is `Into` (through `send` and `try_send`) to - /// downstream nodes. + /// An [`Output`](Output) sends anything that is `Into<Payload>` (through `send` and `try_send`) to downstream nodes. /// - /// An [`Output`] requires knowing how to serialize `T`. Data is only serialized when it is (a) - /// transmitted to a node located on another process or (b) transmitted to a node written in a - /// programming language other than Rust. + /// An [`Output`](Output) requires knowing how to serialise `T`. Data is only serialised when it is (a) transmitted + /// to a node located on another process or (b) transmitted to a node written in a programming language other than + /// Rust. /// - /// The serialization will automatically be performed by Zenoh-Flow and only when needed. + /// The serialisation will automatically be performed by Zenoh-Flow and only when needed. /// /// # `Output` vs `OutputRaw` /// - /// If the node does not process any data and only has access to a [LinkMessage], an [OutputRaw] - /// would be better suited as it does not require to downcast it into an object that - /// implements `Into`. + /// If the node does not process any data and only has access to a [LinkMessage], an [OutputRaw] would be better + /// suited as it does not require converting it into an object that implements `Into<Payload>`. /// /// # Example /// - /// ```ignore - /// let output_typed: Output = outputs + /// ```no_run + /// # use zenoh_flow_nodes::prelude::*; + /// # let mut outputs = Outputs::default(); + /// let output: Output<u64> = outputs /// .take("test") /// .expect("No key named 'test' found") - /// .typed(|data: &u64| serde_json::to_vec(data).map_err(|e| anyhow::anyhow!(e))); + /// .typed(|buffer: &mut Vec<u8>, data: &u64| { + /// serde_json::to_writer(buffer, data).map_err(|e| anyhow!(e)) + /// }); /// ``` pub fn typed( self, @@ -201,7 +206,7 @@ impl OutputBuilder { } } -/// An [OutputRaw] sends [LinkMessage] or `Into<`[Payload]`>` to downstream Nodes. +/// An [OutputRaw] sends [LinkMessage] or [`Into<Payload>`](crate::prelude::Payload) to downstream nodes. /// /// Its primary purpose is to ensure optimal performance: any message received on an input can /// transparently be sent downstream, without requiring (a potentially expensive) access to the data @@ -296,8 +301,8 @@ impl OutputRaw { /// /// # Errors /// - /// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send - /// it on the remaining channels.
For each failing channel, an error is logged and counted for. + /// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send it on the remaining + /// channels. For each failing channel, an error is logged and accounted for. pub async fn forward(&self, message: LinkMessage) -> Result<()> { // FIXME Feels like a cheap hack counting the number of errors. To improve. let mut err = 0; @@ -305,13 +310,13 @@ impl OutputRaw { .senders .iter() .map(|sender| sender.send_async(message.clone())); - // [`join_all`](`futures::future::join_all`) executes all futures in parallel. + // `join_all` executes all futures concurrently. let res = futures::future::join_all(fut_senders).await; res.iter().for_each(|res| { if let Err(e) = res { tracing::error!( - "[Output: {}] Error occured while sending to downstream node(s): {:?}", + "[Output: {}] Error occurred while sending to downstream node(s): {:?}", self.port_id(), e ); @@ -349,10 +354,10 @@ impl OutputRaw { } } -/// An [`Output`] sends instances of `T` to downstream Nodes. +/// An `Output` (only) sends instances of `T` to downstream nodes. /// -/// It's primary purpose is to ensure type guarantees: only types that implement `Into` can be -/// sent to downstream Nodes. +/// Its primary purpose is to enforce type guarantees: only types that implement `Into<Payload>` can be sent to downstream +/// nodes. #[derive(Clone)] pub struct Output { _phantom: PhantomData, pub(crate) output_raw: OutputRaw, pub(crate) serializer: Arc, } -// Dereferencing to the [`OutputRaw`](`OutputRaw`) allows to directly call methods on it with a -// typed [`Output`](`Output`). +// Dereferencing to the [OutputRaw] allows to directly call methods on it with a typed [`Output`](Output). impl Deref for Output { type Target = OutputRaw; @@ -384,42 +388,36 @@ impl Output { }) } - /// Send, *asynchronously*, the provided `data` to all downstream Nodes. + /// Send, *asynchronously*, the provided `data` to downstream node(s). /// - /// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by - /// the Zenoh-Flow daemon running this Node) is taken.
+ /// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by the Zenoh-Flow runtime + /// running this node) is taken. /// - /// # Constraint `Into>` + /// # Asynchronous alternative: `send` /// - /// Both `T` and `Data` implement this constraint. Hence, in practice, any type that - /// implements `Into` can be sent (provided that `Into::::into(u)` is called first). + /// This method is a fail-fast synchronous alternative to its asynchronous counterpart `send`. /// /// # Errors /// - /// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send it - /// on the remaining channels. For each failing channel, an error is logged and counted for. The - /// total number of encountered errors is returned. + /// An error is returned if sending on a channel failed. pub fn try_send(&self, data: impl Into>, timestamp: Option) -> Result<()> { self.output_raw .try_forward(self.construct_message(data, timestamp)?) diff --git a/zenoh-flow-nodes/src/io/tests/input-tests.rs b/zenoh-flow-nodes/src/io/tests/input-tests.rs index 74a02f31..f0a2fdee 100644 --- a/zenoh-flow-nodes/src/io/tests/input-tests.rs +++ b/zenoh-flow-nodes/src/io/tests/input-tests.rs @@ -22,17 +22,17 @@ use crate::{ traits::SendSyncAny, }; -/// Test that the Input behaves as expected for the provided data and deserializer: -/// 1. when a Payload::Bytes is received the deserializer is called and produces the correct output, -/// 2. when a Payload::Typed is received the data can correctly be downcasted. +/// Test that the Input behaves as expected for the provided data and deserialiser: +/// 1. when a Payload::Bytes is received the deserialiser is called and produces the correct output, +/// 2. when a Payload::Typed is received the data can correctly be downcast. /// /// ## Scenario tested /// /// A typed input is created. /// /// We send on the associated channel: -/// 1. a Payload::Bytes (the `expected_serialized`), -/// 2. a Payload::Typed (the `expected_data` upcasted to `dyn SendSyncAny`). +/// 1. a Payload::Bytes (the `expected_serialised`), +/// 2. a Payload::Typed (the `expected_data` upcast to `dyn SendSyncAny`). /// /// ## Traits bound on T /// @@ -64,7 +64,7 @@ fn test_typed_input, - // The serializer should never be called, hence the panic. - Arc::new(|_buffer, _data| panic!("Unexpected call to serialize the data")), + // The serialiser should never be called, hence the panic. + Arc::new(|_buffer, _data| panic!("Unexpected call to serialise the data")), )), hlc.new_timestamp(), ); @@ -105,7 +105,7 @@ fn test_serde_json() { }; let expected_serialized = - serde_json::ser::to_vec(&expected_data).expect("serde_json failed to serialize"); + serde_json::ser::to_vec(&expected_data).expect("serde_json failed to serialise"); test_typed_input(expected_data, expected_serialized, |bytes| { serde_json::de::from_slice::(bytes).map_err(|e| anyhow::anyhow!(e)) @@ -137,7 +137,7 @@ fn test_protobuf_prost() { field3: 0.2f64, }; - // First test, send data serialized. + // First test, send data serialised. 
let expected_serialized = expected_data.encode_to_vec(); test_typed_input(expected_data, expected_serialized, |bytes| { diff --git a/zenoh-flow-nodes/src/io/tests/output-tests.rs b/zenoh-flow-nodes/src/io/tests/output-tests.rs index e6ad9c08..dfefdd9a 100644 --- a/zenoh-flow-nodes/src/io/tests/output-tests.rs +++ b/zenoh-flow-nodes/src/io/tests/output-tests.rs @@ -20,21 +20,21 @@ use zenoh_flow_commons::PortId; use super::Outputs; use crate::messages::{LinkMessage, Payload}; -/// Test that the Output behaves as expected for the provided data and serializer: -/// 1. the `serializer` is correctly type-erased yet still produces the correct output, -/// 2. the `expected_data` is not eagerly serialized and can correctly be downcasted. +/// Test that the Output behaves as expected for the provided data and serialiser: +/// 1. the `serialiser` is correctly type-erased yet still produces the correct output, +/// 2. the `expected_data` is not eagerly serialised and can correctly be downcast. /// /// ## Scenario tested /// /// A bogus output is generated — see the call to `outputs.take`. We go through the `Outputs` -/// structure such that the transformation on the serializer is performed (i.e. the type is erased). +/// structure such that the transformation on the serialiser is performed (i.e. the type is erased). /// /// The provided `expected_data` is sent on the output. /// /// A receiver channel ensures that: /// 1. it is a `Payload::Typed`, /// 2. we can still downcast it to `T`, -/// 3. the result of the serialization is correct. +/// 3. the result of the serialisation is correct. /// /// ## Traits on T /// @@ -72,7 +72,7 @@ fn test_typed_output panic!("Unexpected bytes payload"), Payload::Typed((dyn_data, serializer)) => { let mut dyn_serialized = Vec::new(); - (serializer)(&mut dyn_serialized, dyn_data.clone()).expect("Failed to serialize"); + (serializer)(&mut dyn_serialized, dyn_data.clone()).expect("Failed to serialise"); assert_eq!(expected_serialized, dyn_serialized); let data = (*dyn_data) @@ -103,7 +103,7 @@ fn test_serde_json() { }; let expected_serialized = - serde_json::ser::to_vec(&expected_data).expect("serde_json failed to serialize"); + serde_json::ser::to_vec(&expected_data).expect("serde_json failed to serialise"); let serializer = |buffer: &mut Vec, data: &TestData| { serde_json::ser::to_writer(buffer, data).map_err(|e| anyhow::anyhow!(e)) diff --git a/zenoh-flow-nodes/src/lib.rs b/zenoh-flow-nodes/src/lib.rs index 4ac4dc36..9b9a8fe3 100644 --- a/zenoh-flow-nodes/src/lib.rs +++ b/zenoh-flow-nodes/src/lib.rs @@ -12,18 +12,41 @@ // ZettaScale Zenoh Team, // +//! This crate exposes the traits and structures necessary to create Zenoh-Flow nodes. +//! +//! Items not exposed in the `prelude` are meant for internal usage within the Zenoh-Flow project. +//! +//! # [prelude] +//! +//! Application developers wishing to create a data flow should include the [prelude] in their code-base as it regroups +//! all the required structures and traits: +//! +//! ``` +//! use zenoh_flow_nodes::prelude::*; +//! ``` +//! +//! Next would be to implement, as different shared libraries, at least a [Source](crate::prelude::Source), a +//! [Sink](crate::prelude::Sink) and possibly some [Operators](crate::prelude::Operator). See their respective +//! documentation for examples. 
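To make the workflow above concrete, below is a minimal sketch of a node built on the prelude: a hypothetical `PrinterSink` with a single input port named `in`. The port name, the `u64` payload type and the use of `serde_json` and `async-trait` are illustrative assumptions rather than part of this change; the finished type would additionally be exposed to the runtime through the `export_sink` procedural macro mentioned earlier.

```rust
use async_trait::async_trait;
use zenoh_flow_nodes::prelude::*;

// Hypothetical Sink that prints every `u64` it receives.
struct PrinterSink {
    input: Input<u64>,
}

#[async_trait]
impl Sink for PrinterSink {
    async fn new(_context: Context, _configuration: Configuration, mut inputs: Inputs) -> Result<Self> {
        Ok(Self {
            input: inputs
                .take("in")
                .expect("No input called 'in' found")
                // Deserialiser used when the upstream node runs on another Zenoh-Flow runtime.
                .typed(|bytes| serde_json::from_slice(bytes).map_err(|e| anyhow!(e))),
        })
    }
}

#[async_trait]
impl Node for PrinterSink {
    async fn iteration(&self) -> Result<()> {
        let (data, _timestamp) = self.input.recv().await?;
        println!("Received: {}", *data);
        Ok(())
    }
}
```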
+ pub(crate) mod declaration; pub use declaration::{NodeDeclaration, OperatorFn, SinkFn, SourceFn, CORE_VERSION, RUSTC_VERSION}; pub(crate) mod context; + pub(crate) mod io; +pub use io::{InputBuilder, OutputBuilder}; + pub(crate) mod messages; pub(crate) mod traits; +/// This module expose all the structures required to implement a Zenoh-Flow node. +/// +/// It also re-exposes items from the [anyhow], [zenoh_flow_commons] and [zenoh_flow_derive] crates. pub mod prelude { pub use crate::context::Context; pub use crate::io::{Input, InputRaw, Inputs, Output, OutputRaw, Outputs}; - pub use crate::messages::{Data, LinkMessage}; + pub use crate::messages::{Data, LinkMessage, Payload}; pub use crate::traits::{Node, Operator, SendSyncAny, Sink, Source}; pub use anyhow::{anyhow, bail}; pub use zenoh_flow_commons::{Configuration, Result}; diff --git a/zenoh-flow-nodes/src/messages.rs b/zenoh-flow-nodes/src/messages.rs index 08ba9c8d..2f539513 100644 --- a/zenoh-flow-nodes/src/messages.rs +++ b/zenoh-flow-nodes/src/messages.rs @@ -22,33 +22,33 @@ use std::sync::Arc; use uhlc::Timestamp; use zenoh_flow_commons::Result; -/// `SerializerFn` is a type-erased version of the serializer function provided by node developer. +/// `SerializerFn` is a type-erased version of the serialiser function provided by node developer. /// -/// It is passed to downstream nodes (residing on the same process) in case they need to serialize +/// It is passed to downstream nodes (residing on the same process) in case they need to serialise /// the data they receive typed. -/// Passing around the function allows us to serialize only when needed and without requiring prior +/// Passing around the function allows us to serialise only when needed and without requiring prior /// knowledge. pub(crate) type SerializerFn = dyn Fn(&mut Vec, Arc) -> Result<()> + Send + Sync; -/// This function is what Zenoh-Flow will use to deserialize the data received on the `Input`. +/// This function is what Zenoh-Flow will use to deserialise the data received on the `Input`. /// -/// It will be called for instance when data is received serialized (i.e. from an upstream node that +/// It will be called for instance when data is received serialised (i.e. from an upstream node that /// is either not implemented in Rust or on a different process) before it is given to the user's /// code. pub(crate) type DeserializerFn = dyn Fn(&[u8]) -> Result + Send + Sync; /// A `Payload` is Zenoh-Flow's lowest message container. /// -/// It either contains serialized data, i.e. `Bytes` (if received from the network, or from nodes +/// It either contains serialised data, i.e. `Bytes` (if received from the network, or from nodes /// not written in Rust), or `Typed` data as a tuple `(`[Any](`std::any::Any`)`, SerializerFn)`. #[derive(Clone, Serialize, Deserialize)] pub enum Payload { - /// Serialized data, coming either from Zenoh of from non-Rust node. + /// Serialised data, coming either from Zenoh of from non-Rust node. Bytes(Arc>), #[serde(skip_serializing, skip_deserializing)] - /// Data coming from another Rust node located on the same process that can either be downcasted - /// (provided that its actual type is known) or serialized. + /// Data coming from another Rust node located on the same Zenoh-Flow runtime that can either be downcast or + /// serialised. Typed((Arc, Arc)), } @@ -78,10 +78,10 @@ impl Payload { /// /// # Performance /// - /// This method will serialize the [Payload] if it is `Typed`. 
Otherwise, the bytes + /// This method will serialise the [Payload] if it is `Typed`. Otherwise, the bytes /// representation is simply cloned. /// - /// The provided `buffer` is reused and cleared between calls, so once its capacity stabilizes + /// The provided `buffer` is reused and cleared between calls, so once its capacity stabilises /// no more allocation is performed. pub fn try_as_bytes_into(&self, buffer: &mut Vec) -> Result<()> { buffer.clear(); // remove previous data but keep the allocated capacity @@ -101,7 +101,7 @@ impl Payload { /// /// # Performance /// - /// This method will only serialize (and thus allocate) the [Payload] if it is typed. Otherwise + /// This method will only serialise (and thus allocate) the [Payload] if it is typed. Otherwise /// the [Arc] is cloned. // // NOTE: This method is used by, at least, our Python API. @@ -196,20 +196,20 @@ impl LinkMessage { &self.timestamp } - /// Serializes the [LinkMessage] using [bincode] into the given `buffer`. + /// Serialises the [LinkMessage] using [bincode] into the given `buffer`. /// - /// The `inner_buffer` is used to serialize (if need be) the [Payload] contained inside the + /// The `inner_buffer` is used to serialise (if need be) the [Payload] contained inside the /// [LinkMessage]. /// /// # Performance /// /// The provided `buffer` and `inner_buffer` are reused and cleared between calls, so once their - /// capacity stabilizes no (re)allocation is performed. + /// capacity stabilises no (re)allocation is performed. /// /// # Errors /// /// An error variant is returned in case of: - /// - fails to serialize + /// - fails to serialise pub fn serialize_bincode_into( &self, message_buffer: &mut Vec, @@ -220,7 +220,7 @@ impl LinkMessage { match &self.payload { Payload::Bytes(_) => bincode::serialize_into(message_buffer, &self) - .context("Failed to serialize `Payload::Bytes``"), + .context("Failed to serialise `Payload::Bytes``"), Payload::Typed((data, serializer)) => { (serializer)(payload_buffer, Arc::clone(data))?; let serialized_message = Self { @@ -229,20 +229,23 @@ impl LinkMessage { }; bincode::serialize_into(message_buffer, &serialized_message) - .context("Failed to serialize `Payload::Typed`") + .context("Failed to serialise `Payload::Typed`") } } } } -/// A `Data` is a convenience wrapper around `T`. +/// A `Data` is a wrapper around `T` given by a typed [`Input`](crate::prelude::Input). /// -/// Upon reception, it transparently deserializes to `T` when the message is received serialized. It -/// downcasts it to a `&T` when the data is passed "typed" through a channel. +/// A `Data` automatically dereferences to a `&T`. /// -/// ## Performance +/// # Performance /// -/// When deserializing, an allocation is performed. +/// If the upstream node does not reside on the same Zenoh-Flow runtime, dereferencing to `&T` incurs some additional +/// operations: the received [Payload] will then necessarily be serialised and must first be deserialised. +/// +/// To perform the deserialisation, the [deserialiser](crate::io::InputBuilder::typed()) function passed to the +/// [`Input`](crate::prelude::Input) will be called. #[derive(Debug)] pub struct Data { inner: DataInner, @@ -283,20 +286,18 @@ impl From for Data { // // ## SAFETY // -// Despite the presence of `expect` and `panic!`, we should never end up in these situations in -// normal circumstances. +// Despite the presence of `expect` and `panic!`, we should never end up in these situations in normal circumstances. 
// // Let us reason here as to why this is "safe". // -// The call to `expect` happens when the inner data is a [`Typed`](`Payload::Typed`) payload and the -// downcasts to `T` fails. This should not happen because of the way a [`Data`](`Data`) is created: -// upon creation we first perform a check that the provided typed payload can actually be downcasted -// to `T` — see the method `Data::try_from_payload`. +// The call to `expect` happens when the inner data is a [Typed](Payload::Typed) payload and the downcast to `T` +// fails. This should not happen because of the way a [Data] is created: upon creation we first perform a check that the +// provided typed payload can actually be downcast to `T` — see the method `Data::try_from_payload`. // -// The call to `panic!` happens when the inner data is a [`Bytes`](`Payload::Bytes`) payload and the -// `data` field is `None`. Again, this should not happen because of the way a [`Data`](`Data`) is -// created: upon creation, if the data is received as bytes, we first deserialize it and set the -// `data` field to `Some(T)` — see the method `Data::try_from_payload`. +// The call to `panic!` happens when the inner data is a [Bytes](Payload::Bytes) payload and the `data` field is +// `None`. Again, this should not happen because of the way a [`Data`](`Data`) is created: upon creation, if the data is +// received as bytes, we first deserialise it and set the `data` field to `Some(T)` — see the method +// `Data::try_from_payload`. impl Deref for Data { type Target = T; @@ -308,7 +309,7 @@ impl Deref for Data { } else if let Payload::Typed((typed, _)) = payload { (**typed).as_any().downcast_ref::().expect( r#"You probably managed to find a very nasty flaw in Zenoh-Flow’s code as we -believed this situation would never happen (unless explicitely triggered — "explicitely" being an +believed this situation would never happen (unless explicitly triggered — "explicitly" being an understatement here, we feel it’s more like you really, really, wanted to see that message — in which case, congratulations!). @@ -326,7 +327,7 @@ Feel free to contact us at < zenoh@zettascale.tech >. } else { panic!( r#"You probably managed to find a very nasty flaw in Zenoh-Flow's code as we -believed this situation would never happen (unless explicitely triggered — "explicitely" being an +believed this situation would never happen (unless explicitly triggered — "explicitly" being an understatement here, we feel it's more like you really, really, wanted to see that message — in which case, congratulations!). @@ -351,14 +352,14 @@ impl Data { /// Try to create a new [`Data`](`Data`) based on a [`Payload`](`Payload`). /// /// Depending on the variant of [`Payload`](`Payload`) different steps are performed: - /// - if `Payload::Bytes` then Zenoh-Flow tries to deserialize to an instance of `T` (performing + /// - if `Payload::Bytes` then Zenoh-Flow tries to deserialise to an instance of `T` (performing /// an allocation), /// - if `Payload::Typed` then Zenoh-Flow checks that the underlying type matches `T` (relying /// on [`Any`](`Any`)). /// /// ## Errors /// - /// An error will be returned if the Payload does not match `T`, i.e. if the deserialization or + /// An error will be returned if the Payload does not match `T`, i.e. if the deserialisation or /// the downcast failed. 
pub(crate) fn try_from_payload( payload: Payload, diff --git a/zenoh-flow-nodes/src/traits.rs b/zenoh-flow-nodes/src/traits.rs index 6b4abe10..09e27ea3 100644 --- a/zenoh-flow-nodes/src/traits.rs +++ b/zenoh-flow-nodes/src/traits.rs @@ -18,12 +18,12 @@ use async_trait::async_trait; use std::any::Any; use zenoh_flow_commons::{Configuration, Result}; -/// The `SendSyncAny` trait allows Zenoh-Flow to send data between nodes running in the same process -/// without serializing. +/// The `SendSyncAny` trait allows Zenoh-Flow to send data between nodes running in the same process without +/// serialising. /// -/// This trait is implemented for any type that has the `static` lifetime and implements `Send` and -/// `Sync`. These constraints are the same than for the typed `Input` and `Output` which means that -/// there is absolutely no need to manually implement it. +/// This trait is implemented for any type that has the `'static` lifetime and implements [Send] and [Sync]. These +/// constraints are the same as for the typed [Input](crate::prelude::Input) and typed +/// [Output](crate::prelude::Output) which means that there should be no need to manually implement it. pub trait SendSyncAny: Send + Sync { fn as_any(&self) -> &dyn Any; @@ -40,19 +40,48 @@ impl SendSyncAny for T { } } -/// A `Node` is defined by its `iteration` that is repeatedly called by Zenoh-Flow. +/// The `Node` trait, shared among all node types, dictates how a node *runs*. /// -/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a state and to mutate it, -/// the interior mutability pattern is necessary. +/// # `Iteration` /// -/// A structure implementing the Node trait typically needs to keep a reference to the `Input`(s) and `Output`(s) it -/// needs. +/// The central method, for which there is no default implementation, is [iteration](Node::iteration()). This method +/// is called, in a loop, by the Zenoh-Flow runtime managing the node. /// -/// For usage examples see: [`Operator`](`Operator`), [`Source`](`Source`) or [`Sink`](`Sink`) traits. +/// Methods in this trait take an immutable reference to `self` so as to not impact performance. To keep a state and to +/// mutate it, the [interior mutability](https://doc.rust-lang.org/reference/interior-mutability.html) pattern is +/// necessary. +/// +/// For usage examples see the [Operator](crate::prelude::Operator), [Source](crate::prelude::Source) or +/// [Sink](crate::prelude::Sink) traits. +/// +/// # Additional hooks: `on_resume`, `on_abort` +/// +/// It is possible to define specific code that the Zenoh-Flow runtime should run *before* the node is aborted and +/// *before* it is resumed. +/// +/// Note that the `on_resume` hook is only run after the node has been aborted. It is not run when it is created. +/// +/// A default blank implementation is provided. #[async_trait] pub trait Node: Send + Sync { + /// The code a Zenoh-Flow runtime will execute in a loop. + /// + /// A typical workflow would be to wait for all or a subset of the [Input(s)](crate::prelude::Input) to be ready, + /// perform some computation and finally forward the result(s) downstream on the + /// [Output(s)](crate::prelude::Output). async fn iteration(&self) -> Result<()>; + /// Custom code that Zenoh-Flow will run *before* re-starting a node that was previously aborted. + /// + /// The code to correctly manage the state of a node should go there. This hook is for instance leveraged within + /// the implementation of the Zenoh Source built-in node.
+ /// + /// The blanket implementation defaults to returning `Ok(())`. + /// + /// # Performance + /// + /// This method is only called when the node is restarted. Hence, its impact is limited and does not affect the + /// normal execution of a node. async fn on_resume(&self) -> Result<()> { Ok(()) } @@ -60,14 +89,12 @@ pub trait Node: Send + Sync { async fn on_abort(&self) {} } -/// The `Source` trait represents a Source of data in Zenoh Flow. Sources only possess `Outputs` and -/// their purpose is to fetch data from the external world. +/// A `Source` feeds data into a data flow. /// -/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a -/// state and to mutate it, the interior mutability pattern is necessary. +/// A `Source` only possesses `Output` (either [typed](crate::prelude::Output) or [raw](crate::prelude::OutputRaw)) as +/// it does not receive any data from upstream nodes but from "outside" the data flow. /// -/// A struct implementing the Source trait typically needs to keep a reference to the `Output` it -/// needs. +/// A structure implementing the `Source` trait typically needs to keep a reference to the `Output`. /// /// ## Example /// @@ -94,7 +121,7 @@ pub trait Node: Send + Sync { /// let output = outputs /// .take("out") /// .expect("No output called 'out' found") -/// .typed(|buffer, data| todo!("Provide your serializer here")); +/// .typed(|buffer, data| todo!("Provide your serialiser here")); /// /// Ok(Self { output }) /// } @@ -107,8 +134,8 @@ pub trait Node: Send + Sync { /// // /// // let state = self.state.lock().await; /// // -/// // The state is a way for the Source to read information from the external world, i.e., -/// // interacting with I/O devices. We mimick an asynchronous iteraction with a sleep. +/// // The state is a way for the Source to read information from the external world, i.e., interacting with +/// // I/O devices. /// /// self.output.send(10usize, None).await /// } @@ -116,28 +143,21 @@ pub trait Node: Send + Sync { /// ``` #[async_trait] pub trait Source: Node + Send + Sync { - /// For a `Context`, a `Configuration` and a set of `Outputs`, produce a new *Source*. + /// For a [Context], a [Configuration] and a set of [Outputs], produce a new [Source]. /// - /// Sources only possess `Outputs` and their purpose is to fetch data from the external world. - /// - /// Sources are **started last** when initiating a data flow. This is to prevent data loss: if a - /// Source is started before its downstream nodes then the data it would send before said - /// downstream nodes are up would be lost. + /// Sources only possess `Outputs` as their purpose is to fetch data from the external world. async fn new(context: Context, configuration: Configuration, outputs: Outputs) -> Result where Self: Sized; } -/// The `Operator` trait represents an Operator inside Zenoh-Flow. -/// -/// Operators are at the heart of a data flow, they carry out computations on the data they receive -/// before sending them out to the next downstream node. +/// An `Operator` is a node performing transformation over the data it receives, outputting the end result to downstream +/// node(s). /// -/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a -/// state and to mutate it, the interior mutability pattern is necessary. 
+/// An `Operator` possesses both `Input` (either [typed](crate::prelude::Input) or [raw](crate::prelude::InputRaw)) and +/// `Output` (either [typed](crate::prelude::Output) or [raw](crate::prelude::OutputRaw)). /// -/// A struct implementing the Operator trait typically needs to keep a reference to the `Input` and -/// `Output` it needs. +/// A structure implementing the `Operator` trait typically needs to keep a reference to its `Input`(s) and `Output`(s). /// /// ## Example /// @@ -165,11 +185,11 @@ pub trait Source: Node + Send + Sync { /// input: inputs /// .take("in") /// .expect("No input called 'in' found") -/// .typed(|bytes| todo!("Provide your deserializer here")), +/// .typed(|bytes| todo!("Provide your deserialiser here")), /// output: outputs /// .take("out") /// .expect("No output called 'out' found") -/// .typed(|buffer, data| todo!("Provide your serializer here")), +/// .typed(|buffer, data| todo!("Provide your serialiser here")), /// }) /// } /// } @@ -184,14 +204,10 @@ pub trait Source: Node + Send + Sync { /// ``` #[async_trait] pub trait Operator: Node + Send + Sync { - /// For a `Context`, a `Configuration`, a set of `Inputs` and `Outputs`, produce a new - /// **Operator**. + /// For a [Context], a [Configuration], a set of [Inputs] and [Outputs], produce a new [Operator]. /// /// Operators are at the heart of a data flow, they carry out computations on the data they /// receive before sending them out to the next downstream node. - /// - /// The Operators are started *before the Sources* such that they are active before the first - /// data are produced. async fn new( context: Context, configuration: Configuration, @@ -202,16 +218,12 @@ pub trait Operator: Node + Send + Sync { Self: Sized; } -/// The `Sink` trait represents a Sink of data in Zenoh Flow. -/// -/// Sinks only possess `Inputs`, their objective is to send the result of the computations to the -/// external world. +/// A `Sink` exposes the outcome of the data flow processing. /// -/// This trait takes an immutable reference to `self` so as to not impact performance. To keep a -/// state and to mutate it, the interior mutability pattern is necessary. +/// A `Sink` only possesses `Input` (either [typed](crate::prelude::Input) or [raw](crate::prelude::InputRaw)) as its +/// purpose is to communicate with entities outside of the data flow. /// -/// A struct implementing the Sink trait typically needs to keep a reference to the `Input` it -/// needs. +/// A structure implementing the `Sink` trait typically needs to keep a reference to its `Input`(s). /// /// ## Example /// @@ -254,13 +266,9 @@ pub trait Operator: Node + Send + Sync { /// ``` #[async_trait] pub trait Sink: Node + Send + Sync { - /// For a `Context`, a `Configuration` and a set of `Inputs`, produce a new **Sink**. - /// - /// Sinks only possess `Inputs`, their objective is to send the result of the computations to the - /// external world. + /// For a [Context], a [Configuration] and [Inputs], produce a new [Sink]. /// - /// Sinks are **started first** when initiating a data flow. As they are at the end of the chain of - /// computations. By starting them first we ensure that no data is lost. + /// Sinks only possess `Inputs`, their objective is to send the result of the computations to the external world. async fn new(context: Context, configuration: Configuration, inputs: Inputs) -> Result where Self: Sized;
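As a closing illustration of how the `Node` hooks and the interior-mutability pattern discussed above fit together, here is a minimal sketch of a hypothetical `CountingSource`. The port name `out`, the counter state, and the use of `serde_json` and `async-trait` are assumptions made for the example; the type would then be exposed with the `export_source` macro.

```rust
use std::sync::Mutex;

use async_trait::async_trait;
use zenoh_flow_nodes::prelude::*;

// Hypothetical Source that emits an ever-increasing counter.
struct CountingSource {
    output: Output<u64>,
    counter: Mutex<u64>,
}

#[async_trait]
impl Source for CountingSource {
    async fn new(_context: Context, _configuration: Configuration, mut outputs: Outputs) -> Result<Self> {
        Ok(Self {
            output: outputs
                .take("out")
                .expect("No output called 'out' found")
                // Serialiser used when the downstream node runs on another Zenoh-Flow runtime.
                .typed(|buffer: &mut Vec<u8>, data: &u64| {
                    serde_json::to_writer(buffer, data).map_err(|e| anyhow!(e))
                }),
            counter: Mutex::new(0),
        })
    }
}

#[async_trait]
impl Node for CountingSource {
    async fn iteration(&self) -> Result<()> {
        // Keep the lock scope short: the guard must not live across the `.await` below.
        let count = {
            let mut counter = self.counter.lock().unwrap();
            *counter += 1;
            *counter
        };
        self.output.send(count, None).await
    }

    async fn on_resume(&self) -> Result<()> {
        // Restart the sequence when the runtime resumes a previously aborted node.
        *self.counter.lock().unwrap() = 0;
        Ok(())
    }
}
```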