Skip to content

Commit

Permalink
docs(nodes): improve documentation (#208)
Browse files Browse the repository at this point in the history
- re-enable the doctests in inputs / outputs
- make the `cargo doc` compile without any warning

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet authored Mar 7, 2024
1 parent e50f9ab commit 3c18616
Show file tree
Hide file tree
Showing 10 changed files with 345 additions and 278 deletions.
7 changes: 4 additions & 3 deletions zenoh-flow-nodes/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use zenoh_flow_commons::{InstanceId, RuntimeId};
/// The `Context` structure provides information about the data flow and the Zenoh-Flow runtime.
///
/// In particular, it allows accessing:
/// - [name](Context::name()) of the data flow,
/// - [instance id](Context::instance_id()) of this instance of the data flow,
/// - [runtime id](Context::runtime_id()) of the Zenoh-Flow runtime managing the **node**.
/// - the [name](Context::name()) of the data flow,
/// - the [instance id](Context::instance_id()) of this instance of the data flow,
/// - the [runtime id](Context::runtime_id()) of the Zenoh-Flow runtime managing the **node**.
#[derive(Clone, Debug)]
pub struct Context {
pub(crate) flow_name: Arc<str>,
Expand All @@ -30,6 +30,7 @@ pub struct Context {
}

impl Context {
/// Creates a new node `Context`.
pub fn new(flow_name: Arc<str>, instance_id: InstanceId, runtime_id: RuntimeId) -> Self {
Self {
flow_name,
Expand Down
38 changes: 29 additions & 9 deletions zenoh-flow-nodes/src/declaration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,58 @@ use std::{pin::Pin, sync::Arc};
use futures::Future;
use zenoh_flow_commons::{Configuration, Result};

/// Constant used to check if a node is compatible with the currently running Zenoh Flow daemon.
/// As nodes are dynamically loaded, this is to prevent (possibly cryptic) runtime error due to
/// incompatible API.
/// (⚙️️ *internal)* Constant used to check if a node is compatible with the Zenoh-Flow runtime managing it.
///
/// As nodes are dynamically loaded, this is to prevent (possibly cryptic) runtime error due to incompatible API.
///
/// This constant is used by the procedural macros [export_operator](crate::prelude::export_operator),
/// [export_source](crate::prelude::export_source) and [export_sink](crate::prelude::export_sink). A Zenoh-Flow runtime
/// will compare its value of this constant to the value that all node it will dynamically load expose.
pub const CORE_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Constant used to check if a node was compiled with the same version of the Rust compiler than
/// the currently running Zenoh Flow daemon.

/// (⚙️ *internal)* Constant used to check if a node was compiled with the same version of the Rust compiler than the
/// Zenoh-Flow runtime managing it.
///
/// As Rust is not ABI stable, this is to prevent (possibly cryptic) runtime errors.
///
/// This constant is used by the procedural macros [export_operator](crate::prelude::export_operator),
/// [export_source](crate::prelude::export_source) and [export_sink](crate::prelude::export_sink). A Zenoh-Flow runtime
/// will compare its value of this constant to the value that all node it will dynamically load expose.
pub const RUSTC_VERSION: &str = env!("RUSTC_VERSION");

/// Declaration expected in the library that will be loaded.
/// (⚙️ *internal)* Declaration expected in the library that will be loaded.
///
/// This structure is automatically created by the procedural macros
/// [export_operator](crate::prelude::export_operator), [export_source](crate::prelude::export_source) and
/// [export_sink](crate::prelude::export_sink).
pub struct NodeDeclaration<C> {
pub rustc_version: &'static str,
pub core_version: &'static str,
pub constructor: C,
}

/// `SourceFn` is the only signature we accept to construct a [`Source`](`crate::prelude::Source`).
/// (⚙️ *internal)* `SourceFn` is the only signature we accept to construct a [Source](crate::prelude::Source).
///
/// This function is automatically created by the procedural macro [export_source](crate::prelude::export_source).
pub type SourceFn = fn(
Context,
Configuration,
Outputs,
) -> Pin<Box<dyn Future<Output = Result<Arc<dyn Node>>> + Send>>;

/// `OperatorFn` is the only signature we accept to construct an [`Operator`](`crate::prelude::Operator`).
/// (⚙️ *internal)* `OperatorFn` is the only signature we accept to construct an [Operator](crate::prelude::Operator).
///
/// This function is automatically created by the procedural macro [export_operator](crate::prelude::export_operator).
pub type OperatorFn = fn(
Context,
Configuration,
Inputs,
Outputs,
) -> Pin<Box<dyn Future<Output = Result<Arc<dyn Node>>> + Send>>;

/// `SinkFn` is the only signature we accept to construct a [`Sink`](`crate::prelude::Sink`).
/// (⚙️ *internal)* `SinkFn` is the only signature we accept to construct a [Sink](crate::prelude::Sink).
///
/// This function is automatically created by the procedural macro [export_sink](crate::prelude::export_sink).
pub type SinkFn = fn(
Context,
Configuration,
Expand Down
164 changes: 90 additions & 74 deletions zenoh-flow-nodes/src/io/inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,26 @@ use zenoh_flow_commons::{PortId, Result};
/// of the node. These names are _case sensitive_ and should be an exact match to what was written
/// in the descriptor.
///
/// Zenoh-Flow provides two flavors of input: [InputRaw] and [`Input<T>`]. An [`Input<T>`]
/// Zenoh-Flow provides two flavours of input: [InputRaw] and [`Input<T>`]. An [`Input<T>`]
/// conveniently exposes instances of `T` while an [InputRaw] exposes messages, allowing to
/// disregard the contained data.
///
/// The main way to interact with `Inputs` is through the `take` method.
///
/// # Example
///
/// ```ignore
/// let input_builder = inputs.take("test raw").expect("No input name 'test raw' found");
/// let input_raw = input_builder.raw();
/// ```no_run
/// # use zenoh_flow_nodes::prelude::*;
/// # let mut inputs = Inputs::default();
/// let input_raw = inputs.take("test raw")
/// .expect("No input name 'test raw' found")
/// .raw();
///
/// let input_builder = inputs.take("test typed").expect("No input name 'test typed' found");
/// let input: Input<u64> = input_build.typed(
/// |bytes| serde_json::from_slice(bytes)
/// .map_err(|e| anyhow::anyhow!(e))
/// )?;
/// let input: Input<u64> = inputs.take("test typed")
/// .expect("No input name 'test typed' found")
/// .typed(
/// |bytes| serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
/// );
/// ```
#[derive(Default)]
pub struct Inputs {
Expand All @@ -68,40 +71,39 @@ impl Inputs {
self.hmap.entry(port_id).or_insert(rx);
}

/// Returns an [InputBuilder] for the provided `port_id`, if an input was declared with this
/// exact name in the descriptor of the node, otherwise returns `None`.
/// Returns an Input builder for the provided `port_id`, if an input was declared with this exact name in the
/// descriptor of the node, otherwise returns `None`.
///
/// # Usage
///
/// This builder can either produce a, typed, [`Input<T>`] or an [InputRaw]. The main difference
/// between both is the type of data they expose: an [`Input<T>`] automatically tries to downcast
/// or deserialize the data contained in the message to expose `&T`, while an [InputRaw] simply
/// exposes a [LinkMessage].
///
/// As long as data need to be manipulated, a typed [`Input<T>`] should be favored.
/// This builder can either produce a, typed, [`Input<T>`](Input) or an [InputRaw]. The main difference between both
/// is the type of data they expose: an [`Input<T>`](Input) automatically tries to downcast or deserialize the data
/// contained in the message to expose `&T`, while an [InputRaw] simply exposes a [LinkMessage].
///
/// ## Typed
///
/// To obtain an [`Input<T>`] one must call `typed` and provide a deserializer function. In
/// the example below we rely on the `serde_json` crate to do the deserialization.
/// To obtain an [`Input<T>`](Input) one must call `typed` and provide a deserialiser function. In the example below
/// we rely on the `serde_json` crate to do the deserialisation.
///
/// ```ignore
/// let input_typed: Input<u64> = inputs
/// .take("test")
/// .expect("No input named 'test' found")
/// ```no_run
/// # use zenoh_flow_nodes::prelude::*;
/// # let mut inputs = Inputs::default();
/// let input: Input<u64> = inputs.take("test typed")
/// .expect("No input name 'test typed' found")
/// .typed(
/// |bytes: &[u8]| serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!(e))
/// |bytes| serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
/// );
/// ```
///
/// ## Raw
///
/// To obtain an [InputRaw] one must call `raw`.
///
/// ```ignore
/// let input_raw: InputRaw = inputs
/// .take("test")
/// .expect("No input named 'test' found")
/// ```no_run
/// # use zenoh_flow_nodes::prelude::*;
/// # let mut inputs = Inputs::default();
/// let input_raw = inputs.take("test raw")
/// .expect("No input name 'test raw' found")
/// .raw();
/// ```
pub fn take(&mut self, port_id: impl AsRef<str>) -> Option<InputBuilder> {
Expand All @@ -114,8 +116,7 @@ impl Inputs {
}
}

/// An `InputBuilder` is the intermediate structure to obtain either an [`Input<T>`] or an
/// [InputRaw].
/// An `InputBuilder` is the intermediate structure to obtain either an [`Input<T>`](Input) or an [InputRaw].
///
/// The main difference between both is the type of data they expose: an [`Input<T>`] automatically
/// tries to downcast or deserialize the data contained in the message to expose `&T`, while an
Expand Down Expand Up @@ -143,14 +144,15 @@ impl InputBuilder {
/// # `InputRaw` vs `Input<T>`
///
/// If the node needs access to the data to perform computations, an [`Input<T>`] should be
/// favored as it performs the conversion automatically.
/// favoured as it performs the conversion automatically.
///
/// # Example
///
/// ```ignore
/// let input_raw: InputRaw = inputs
/// .take("test")
/// .expect("No input named 'test' found")
/// ```no_run
/// # use zenoh_flow_nodes::prelude::*;
/// # let mut inputs = Inputs::default();
/// let input_raw = inputs.take("test raw")
/// .expect("No input name 'test raw' found")
/// .raw();
/// ```
pub fn raw(self) -> InputRaw {
Expand All @@ -163,22 +165,23 @@ impl InputBuilder {
/// Consume the `InputBuilder` to produce an [`Input<T>`].
///
/// An [`Input<T>`] tries to automatically convert the data contained in the [LinkMessage] in
/// order to expose `&T`. Depending on if the data is received serialized or not, to perform
/// this conversion either the `deserializer` is called or a downcast is attempted.
/// order to expose `&T`. Depending on if the data is received serialised or not, to perform
/// this conversion either the `deserialiser` is called or a downcast is attempted.
///
/// # `Input<T>` vs `InputRaw`
///
/// If the node does need to access the data contained in the [LinkMessage], an [InputRaw]
/// should be favored as it does not try to perform the extra conversion steps.
/// should be favoured as it does not try to perform the extra conversion steps.
///
/// # Example
///
/// ```ignore
/// let input_typed: Input<u64> = inputs
/// .take("test")
/// .expect("No input named 'test' found")
/// ```no_run
/// # use zenoh_flow_nodes::prelude::*;
/// # let mut inputs = Inputs::default();
/// let input: Input<u64> = inputs.take("test typed")
/// .expect("No input name 'test typed' found")
/// .typed(
/// |bytes: &[u8]| serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!(e))
/// |bytes| serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
/// );
/// ```
pub fn typed<T>(
Expand All @@ -192,10 +195,12 @@ impl InputBuilder {
}
}

/// An [`InputRaw`](`InputRaw`) exposes the [`LinkMessage`](`LinkMessage`) it receives.
/// An `InputRaw` receives "raw" [LinkMessage].
///
/// It's primary purpose is to ensure "optimal" performance. This can be useful to implement
/// behaviour where actual access to the underlying data is irrelevant.
/// As opposed to a typed [`Input<T>`](Input), an `InputRaw` will not perform any operation on the data it receives.
/// This behaviour is useful when access to the underlying data is either irrelevant (e.g. for rate-limiting purposes)
/// or when Zenoh-Flow should not attempt to interpret the contained [Payload](crate::prelude::Payload) (e.g. for
/// bindings).
#[derive(Clone, Debug)]
pub struct InputRaw {
pub(crate) port_id: PortId,
Expand All @@ -216,11 +221,10 @@ impl InputRaw {
///
/// # Asynchronous alternative: `recv`
///
/// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `recv`.
/// Although synchronous, but given it is "fail-fast", this method will not block the thread on
/// which it is executed.
/// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `recv`. Although
/// synchronous, this method will not block the thread on which it is executed.
///
/// # Error
/// # Errors
///
/// An error is returned if the associated channel is disconnected.
pub fn try_recv(&self) -> Result<Option<LinkMessage>> {
Expand All @@ -236,15 +240,14 @@ impl InputRaw {
}
}

/// Returns the first [LinkMessage] that was received, *asynchronously*, on any of the channels
/// associated with this Input.
/// Returns the first [LinkMessage] that was received, *asynchronously*, on any of the channels associated with this
/// Input.
///
/// If several [LinkMessage] are received at the same time, one is *randomly* selected.
///
/// # Error
/// # Errors
///
/// An error is returned if *all* channels are disconnected. For each disconnected channel, an
/// error is separately logged.
/// An error is returned if a channel was disconnected.
pub async fn recv(&self) -> Result<LinkMessage> {
self.receiver.recv_async().await.map_err(|_| {
tracing::error!("Link disconnected: {}", self.port_id);
Expand All @@ -253,13 +256,17 @@ impl InputRaw {
}
}

/// A typed `Input` that tries to automatically downcast or deserialize the data received in order
/// to expose `&T`.
/// A typed `Input` receiving [`Data<T>`](Data).
///
/// An `Input` will automatically try to downcast or deserialise the [Payload](crate::prelude::Payload) it receives,
/// exposing a [`Data<T>`](Data).
///
/// The type of conversion performed depends on whether the upstream node resides on the same Zenoh-Flow runtime
/// (downcast) or on another runtime (deserialisation).
///
/// # Performance
///
/// If the data is received serialized from the upstream node, an allocation is performed to host
/// the deserialized `T`.
/// If the data is received serialised from the upstream node, an allocation is performed to host the deserialised `T`.
pub struct Input<T> {
pub(crate) input_raw: InputRaw,
pub(crate) deserializer: Arc<DeserializerFn<T>>,
Expand All @@ -275,24 +282,27 @@ impl<T: Send + Sync + 'static> Deref for Input<T> {
}

impl<T: Send + Sync + 'static> Input<T> {
/// Returns the first [`Message<T>`] that was received, *asynchronously*, on any of the channels
/// Returns the first [`Data<T>`](Data) that was received, *asynchronously*, on any of the channels
/// associated with this Input.
///
/// If several [`Message<T>`] are received at the same time, one is *randomly* selected.
/// If several [`Data<T>`](Data) are received at the same time, one is *randomly* selected.
///
/// This method interprets the data to the type associated with this [`Input<T>`].
/// This method interprets the data to the type associated with this [`Input<T>`](Input).
///
/// # Performance
///
/// As this method interprets the data received additional operations are performed:
/// - data received serialized is deserialized (an allocation is performed to store an instance
/// of `T`),
/// - data received "typed" are checked against the type associated to this [`Input<T>`].
/// As this method interprets the data received, additional operations are performed:
/// - data received serialised is deserialised (an allocation is performed to store an instance of `T`),
/// - data received "typed" are checked against the type associated to this [`Input<T>`](Input).
///
/// # Synchronous alternative: `try_recv`
///
/// # Error
/// This method is an asynchronous alternative to it's synchronous fail-fast counterpart: `try_recv`.
///
/// # Errors
///
/// Several errors can occur:
/// - all the channels are disconnected,
/// - a channel was disconnected,
/// - Zenoh-Flow failed at interpreting the received data as an instance of `T`.
pub async fn recv(&self) -> Result<(Data<T>, Timestamp)> {
let LinkMessage { payload, timestamp } = self.input_raw.recv().await?;
Expand All @@ -302,18 +312,24 @@ impl<T: Send + Sync + 'static> Input<T> {
))
}

/// Returns the first [`Message<T>`] that was received on any of the channels associated with this
/// Input, or `None` if all the channels are empty.
/// Returns the first [`Data<T>`](Data) that was received on any of the channels associated with this Input,
/// or [None] if all the channels are empty.
///
/// # Performance
///
/// As this method interprets the data received, additional operations are performed:
/// - data received serialised is deserialised (an allocation is performed to store an instance of `T`),
/// - data received "typed" are checked against the type associated to this [`Input<T>`](Input).
///
/// # Asynchronous alternative: `recv`
///
/// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `recv`.
/// Although synchronous, this method will not block the thread on which it is executed.
/// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `recv`. Although
/// synchronous, this method will not block the thread on which it is executed.
///
/// # Error
/// # Errors
///
/// Several errors can occur:
/// - no message was received (i.e. Empty error),
/// - a channel was disconnected,
/// - Zenoh-Flow failed at interpreting the received data as an instance of `T`.
///
/// Note that if some channels are disconnected, for each of such channel an error is logged.
Expand Down
4 changes: 2 additions & 2 deletions zenoh-flow-nodes/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//

mod inputs;
pub use inputs::{Input, InputRaw, Inputs};
pub use inputs::{Input, InputBuilder, InputRaw, Inputs};

mod outputs;
pub use outputs::{Output, OutputRaw, Outputs};
pub use outputs::{Output, OutputBuilder, OutputRaw, Outputs};
Loading

0 comments on commit 3c18616

Please sign in to comment.