
Operator (v0.4.0)

J-Loudet edited this page Dec 30, 2022 · 2 revisions

In Zenoh-Flow, an Operator processes data, received from Sources or other Operators, and forwards the results of this processing to Sinks or other Operators. An Operator is typically where one would write the core of their business logic. For instance, in an object detection pipeline, a Source would fetch the images while the actual detection would be done in an Operator.
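The Source, Operator and Sink roles described above can be pictured with a toy pipeline. The following is only a conceptual sketch built on standard-library channels and threads: it does not use the Zenoh-Flow API, and the names `process` and `run_pipeline` are hypothetical, chosen for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical processing step standing in for an Operator's business logic.
fn process(data: String) -> String {
    format!("{} From Zenoh-Flow!", data)
}

// Runs a toy Source -> Operator -> Sink pipeline and returns what the Sink received.
fn run_pipeline(items: Vec<String>) -> Vec<String> {
    let (source_tx, operator_rx) = mpsc::channel::<String>();
    let (operator_tx, sink_rx) = mpsc::channel::<String>();

    // Source: emits every item, then drops its sender, which closes the channel.
    let source = thread::spawn(move || {
        for item in items {
            source_tx.send(item).expect("the operator hung up");
        }
    });

    // Operator: receives data, processes it and forwards the result downstream
    // until the upstream channel is closed.
    let operator = thread::spawn(move || {
        for data in operator_rx {
            operator_tx.send(process(data)).expect("the sink hung up");
        }
    });

    // Sink: collects results until the Operator drops its sender.
    let received: Vec<String> = sink_rx.iter().collect();
    source.join().expect("the source thread panicked");
    operator.join().expect("the operator thread panicked");
    received
}

fn main() {
    let items = vec!["Hello".to_string(), "Greetings".to_string()];
    for line in run_pipeline(items) {
        println!("{}", line);
    }
}
```

In Zenoh-Flow itself the channels are created by the runtime and handed to each node, so an Operator only has to provide the equivalent of `process`.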

There are, for now, two ways to create an Operator in Zenoh-Flow, depending on the programming language you favor:

  • a shared library (Rust)
  • a script (Python)

Shared library

Assuming you want to create an Operator called my-operator, enter the following in a terminal:

cargo new --lib my-operator

Modify the Cargo.toml to add these dependencies and tell rustc that you want a library that can be dynamically loaded:

[dependencies]
async-trait = "0.1.50"  # Zenoh-Flow’s nodes traits are asynchronous
zenoh-flow = { git = "https://github.com/ZettaScaleLabs/zenoh-flow.git", branch = "dev/v0.4.0" }

[lib]
crate-type=["cdylib"]

⚠️ Once we release 0.4.0 on crates.io, the Zenoh-Flow dependency will simply be zenoh-flow = "0.4".

Now modify lib.rs to (i) implement the Zenoh-Flow traits and (ii) include your logic.

Below you can find commented boilerplate code to do (i).

use async_trait::async_trait;
use zenoh_flow::prelude::*;

// MyOperator is where you implement your business logic.
//
// `Input` and `Output` are structures provided by Zenoh-Flow through which you, respectively, receive data from
// upstream nodes and send data to downstream nodes.
//
// The `Input` and the `Output` are passed to the structure through its constructor, see below.
//
// That structure is also where a state can be saved. For concurrency reasons, the state must
// implement `Send` and `Sync` (`Arc` and `Mutex` structures can be helpful, in particular their
// `async_std` variant).
//
// The `export_operator` macro is required to properly expose the symbol and information about the
// version of the Rust compiler and Zenoh-Flow, to Zenoh-Flow.
//
// It allows Zenoh-Flow to detect, at runtime, a version mismatch between the Zenoh-Flow daemon and
// the shared library (be it on the version of the Rust compiler or of Zenoh-Flow itself).
#[export_operator]
struct MyOperator {
    input: Input<String>,
    output: Output<String>,
}

#[async_trait]
impl Node for MyOperator {
    async fn iteration(&self) -> Result<()> {
        // Add your business logic here.
        let data_to_process = self.input.recv().await?;
        let processed_data = format!("{} From Zenoh-Flow!", data_to_process);
        self.output.send(processed_data, None).await
    }
}

#[async_trait]
impl Operator for MyOperator {
    async fn new(
        // The `context` provides information about the Zenoh-Flow daemon on which the generated
        // node MyOperator will be executed.
        context: Context,
        // The `configuration`(1) is a re-export of `serde_json::Value`(2). It behaves as a
        // dictionary and allows accessing configuration variables defined in the descriptor.
        configuration: Option<Configuration>,
        // The `Inputs` are encapsulated `flume::Receivers` created by Zenoh-Flow. It is a HashMap
        // whose keys match what was defined in the descriptor file.
        mut inputs: Inputs,
        // The `Outputs` are encapsulated `flume::Senders` that were created by Zenoh-Flow. It is
        // a HashMap whose keys match what was defined in the descriptor file.
        mut outputs: Outputs,
    ) -> Result<Self> {
        let input = inputs.take("input").expect("No input named 'input' found");
        let output = outputs.take("output").expect("No output named 'output' found");
        Ok(MyOperator { input, output })
    }
}

(1): Configuration (2): serde_json::Value
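The boilerplate comments note that the structure can hold a state, and that this state must implement `Send` and `Sync`. A minimal sketch of that idea follows, under stated assumptions: it uses the standard-library `Mutex` rather than the `async_std` variant mentioned above so that it runs without extra dependencies, it leaves out the Zenoh-Flow `Input` and `Output` fields, and `CountingState`, `record` and `assert_send_sync` are hypothetical names.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for an Operator's structure: only the state is modelled.
struct CountingState {
    // `Arc<Mutex<_>>` keeps the counter `Send + Sync` and lets it be updated
    // through a shared `&self` reference.
    processed: Arc<Mutex<u64>>,
}

impl CountingState {
    fn new() -> Self {
        Self {
            processed: Arc::new(Mutex::new(0)),
        }
    }

    // Mirrors the `&self` receiver of `iteration`: no `&mut self` is available,
    // so the mutation goes through the lock.
    fn record(&self, data: &str) -> String {
        let mut processed = self.processed.lock().expect("state lock poisoned");
        *processed += 1;
        format!("{} From Zenoh-Flow! (message {})", data, *processed)
    }
}

// Compile-time check that a type satisfies the `Send + Sync` requirement.
fn assert_send_sync<T: Send + Sync>() {}

fn main() {
    assert_send_sync::<CountingState>();
    let state = CountingState::new();
    println!("{}", state.record("Hello"));
    println!("{}", state.record("Hello"));
}
```

In an asynchronous `iteration`, a standard-library lock guard should be released before any `.await`; an asynchronous `Mutex` avoids that constraint, which is presumably why the comments point to the `async_std` variant.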

Python script

TODO: Add a reference to the auto-generated Python docs.
