diff --git a/Cargo.toml b/Cargo.toml index 6e68ac44..cff3ce95 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,10 @@ members = [ "cargo-zenoh-flow", "zenoh-flow", + "zenoh-flow-commons", "zenoh-flow-daemon", "zenoh-flow-derive", + "zenoh-flow-descriptors", "zenoh-flow-plugin", "zfctl", ] @@ -33,6 +35,7 @@ repository = "https://github.com/eclipse-zenoh/zenoh-flow" version = "0.6.0-dev" [workspace.dependencies] +anyhow = "1" async-std = "1.12" async-trait = "0.1.50" base64 = "0.21" @@ -42,7 +45,7 @@ flume = "0.11" futures = "0.3.15" git-version = "0.3" log = "0.4" -serde = "1.0" +serde = { version = "1.0", features = ["derive", "rc"] } serde_cbor = "0.11" serde_derive = "1.0" serde_json = "1.0" diff --git a/rust-toolchain b/rust-toolchain index bfe79d0b..832e9afb 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -1.70 +1.70.0 diff --git a/zenoh-flow-commons/Cargo.toml b/zenoh-flow-commons/Cargo.toml new file mode 100644 index 00000000..f8b36e26 --- /dev/null +++ b/zenoh-flow-commons/Cargo.toml @@ -0,0 +1,30 @@ +# +# Copyright (c) 2021 - 2023 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +[package] +authors = { workspace = true } +categories = { workspace = true } +description = "Internal crate for Zenoh-Flow." +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +name = "zenoh-flow-commons" +repository = { workspace = true } +version = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +ramhorns = "0.14" +serde = { workspace = true } +serde_json = { workspace = true } diff --git a/zenoh-flow-commons/src/configuration.rs b/zenoh-flow-commons/src/configuration.rs new file mode 100644 index 00000000..c1b851a9 --- /dev/null +++ b/zenoh-flow-commons/src/configuration.rs @@ -0,0 +1,130 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{ops::Deref, sync::Arc}; + +use serde::{Deserialize, Serialize}; + +use crate::merge::IMergeOverwrite; + +/// A `Configuration` is a recursive key-value structure that allows modifying the behavior of a +/// node without altering its implementation. +/// +/// It is effectively a re-export of [serde_json::Value]. +/// +/// # Examples +/// +/// ## YAML +/// +/// ```yaml +/// configuration: +/// name: "John Doe", +/// age: 43, +/// phones: +/// - "+44 1234567" +/// - "+44 2345678" +/// ``` +/// +/// ## JSON +/// +/// ```json +/// "configuration": { +/// "name": "John Doe", +/// "age": 43, +/// "phones": [ +/// "+44 1234567", +/// "+44 2345678" +/// ] +/// } +/// ``` +// +// NOTE: we take the `serde_json` representation because: +// - JSON is the most supported representation when going online, +// - a `serde_json::Value` can be converted to a `serde_yaml::Value` whereas the opposite is not +// true (YAML introduces "tags" which are not supported by JSON). +#[derive(Default, Deserialize, Debug, Serialize, Clone, PartialEq, Eq)] +pub struct Configuration(Arc); + +impl Deref for Configuration { + type Target = serde_json::Value; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl IMergeOverwrite for Configuration { + fn merge_overwrite(self, other: Self) -> Self { + if self == Configuration::default() { + return other; + } + + if other == Configuration::default() { + return self; + } + + match (self.as_object(), other.as_object()) { + (Some(this), Some(other)) => { + let mut other = other.clone(); + let mut this = this.clone(); + + other.append(&mut this); + Configuration(Arc::new(other.into())) + } + (_, _) => unreachable!( + "We are checking, when deserializing, that a Configuration is a JSON object." + ), + } + } +} + +impl From for Configuration { + fn from(value: serde_json::Value) -> Self { + Self(Arc::new(value)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_merge_configurations() { + let global = Configuration(Arc::new( + json!({ "a": { "nested": true }, "b": ["an", "array"] }), + )); + let local = Configuration(Arc::new(json!({ "a": { "not-nested": false }, "c": 1 }))); + + assert_eq!( + global.clone().merge_overwrite(local), + Configuration(Arc::new( + json!({ "a": { "nested": true }, "b": ["an", "array"], "c": 1 }) + )) + ); + + assert_eq!( + global, + global.clone().merge_overwrite(Configuration::default()) + ); + assert_eq!( + global, + Configuration::default().merge_overwrite(global.clone()) + ); + assert_eq!( + Configuration::default(), + Configuration::default().merge_overwrite(Configuration::default()) + ) + } +} diff --git a/zenoh-flow-commons/src/deserialize.rs b/zenoh-flow-commons/src/deserialize.rs new file mode 100644 index 00000000..dd8267f3 --- /dev/null +++ b/zenoh-flow-commons/src/deserialize.rs @@ -0,0 +1,59 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! This module exposes the functions [deserialize_size] and [deserialize_time] that are used +//! throughout Zenoh-Flow to "parse" values used to express time or size. +//! +//! The external crates [bytesize] and [humantime] are leveraged for these purposes. + +use std::str::FromStr; + +use serde::Deserializer; + +pub fn deserialize_size<'de, D>(deserializer: D) -> std::result::Result, D::Error> +where + D: Deserializer<'de>, +{ + match serde::de::Deserialize::deserialize(deserializer) { + Ok(buf) => Ok(Some( + bytesize::ByteSize::from_str(buf) + .map_err(|_| { + serde::de::Error::custom(format!("Unable to parse value as bytes {buf}")) + })? + .as_u64() as usize, + )), + Err(_) => { + // log::warn!("failed to deserialize size: {:?}", e); + Ok(None) + } + } +} + +pub fn deserialize_time<'de, D>(deserializer: D) -> std::result::Result, D::Error> +where + D: Deserializer<'de>, +{ + match serde::de::Deserialize::deserialize(deserializer) { + Ok::<&str, _>(buf) => { + let ht = (buf) + .parse::() + .map_err(serde::de::Error::custom)?; + Ok(Some(ht.as_nanos() as u64)) + } + Err(_) => { + // log::warn!("failed to deserialize time: {:?}", e); + Ok(None) + } + } +} diff --git a/zenoh-flow-commons/src/identifiers.rs b/zenoh-flow-commons/src/identifiers.rs new file mode 100644 index 00000000..9f1dd1e1 --- /dev/null +++ b/zenoh-flow-commons/src/identifiers.rs @@ -0,0 +1,109 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::fmt::Display; +use std::ops::Deref; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; + +/// A `NodeId` identifies a Node in a data flow. +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone, Hash)] +pub struct NodeId(Arc); + +impl Deref for NodeId { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Display for NodeId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", &self.0) + } +} + +impl From for NodeId { + fn from(value: String) -> Self { + Self(value.into()) + } +} + +impl From<&str> for NodeId { + fn from(value: &str) -> Self { + Self(value.into()) + } +} + +/// A `PortId` identifies an `Input` or an `Output` of a Node. +#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize, Serialize)] +pub struct PortId(Arc); + +impl Deref for PortId { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Display for PortId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", &self.0) + } +} + +impl From for PortId { + fn from(value: String) -> Self { + Self(value.into()) + } +} + +impl From<&str> for PortId { + fn from(value: &str) -> Self { + Self(value.into()) + } +} + +/// A `PortId` identifies an `Input` or an `Output` of a Node. +#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize, Serialize)] +pub struct RuntimeId(Arc); + +impl Deref for RuntimeId { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Display for RuntimeId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", &self.0) + } +} + +impl From for RuntimeId { + fn from(value: String) -> Self { + Self(value.into()) + } +} + +impl From<&str> for RuntimeId { + fn from(value: &str) -> Self { + Self(value.into()) + } +} diff --git a/zenoh-flow-commons/src/lib.rs b/zenoh-flow-commons/src/lib.rs new file mode 100644 index 00000000..737556f8 --- /dev/null +++ b/zenoh-flow-commons/src/lib.rs @@ -0,0 +1,28 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +mod vars; +pub use vars::Vars; + +mod identifiers; +pub use identifiers::{NodeId, PortId, RuntimeId}; + +mod configuration; +pub use configuration::Configuration; + +mod merge; +pub use merge::IMergeOverwrite; + +/// Zenoh-Flow's result type. +pub type Result = std::result::Result; diff --git a/zenoh-flow-commons/src/merge.rs b/zenoh-flow-commons/src/merge.rs new file mode 100644 index 00000000..3ef66aaf --- /dev/null +++ b/zenoh-flow-commons/src/merge.rs @@ -0,0 +1,17 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +pub trait IMergeOverwrite { + fn merge_overwrite(self, other: Self) -> Self; +} diff --git a/zenoh-flow-commons/src/vars.rs b/zenoh-flow-commons/src/vars.rs new file mode 100644 index 00000000..fe2f906b --- /dev/null +++ b/zenoh-flow-commons/src/vars.rs @@ -0,0 +1,100 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::merge::IMergeOverwrite; +use crate::Result; +use anyhow::Context; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::rc::Rc; + +/// `Vars` is an internal structure that we use to expand the "mustache variables" in a descriptor +/// file. +/// +/// Mustache variables take the form: "{{ var }}..." where the number of spaces after the '{{' and +/// before the '}}' do not matter. +/// +/// We first parse the descriptor file to only extract the `vars` section and build a `HashMap` out of it. +/// +/// We then load the descriptor file as a template and "render" it, substituting every "mustache +/// variable" with its corresponding value in the HashMap. +#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] +pub struct Vars { + #[serde(default)] + vars: Rc, Rc>>, +} + +impl IMergeOverwrite for Vars { + fn merge_overwrite(self, other: Self) -> Self { + let mut merged = (*other.vars).clone(); + merged.extend((*self.vars).clone().into_iter()); + + Self { + vars: Rc::new(merged), + } + } +} + +impl From<[(&str, &str); N]> for Vars { + fn from(value: [(&str, &str); N]) -> Self { + Self { + vars: Rc::new( + value + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect::, Rc>>(), + ), + } + } +} + +impl Vars { + /// TODO@J-Loudet Documentation. + pub fn expand_mustache(&self, data: &str) -> Result { + Ok(ramhorns::Template::new(data) + .context(format!( + "Failed to create a ramhorns::Template from:\n{}", + data + ))? + .render(&self.vars)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_merge_overwrite() { + let map = Vars::from([("BASE_DIR", "/home/zenoh-flow/dev"), ("BUILD", "release")]); + + let placeholder = Vars::from([ + ("BASE_DIR", "/path/to/zenoh-flow"), + ("BUILD", "debug"), + ("RUST_LOG", "zenoh_flow_trace"), + ]); + + let expected = Vars::from([ + ("BASE_DIR", "/home/zenoh-flow/dev"), + ("BUILD", "release"), + ("RUST_LOG", "zenoh_flow_trace"), + ]); + + assert_eq!(expected, map.clone().merge_overwrite(placeholder)); + + assert_eq!(map, map.clone().merge_overwrite(Vars::default())); + + assert_eq!(map, Vars::default().merge_overwrite(map.clone())); + } +} diff --git a/zenoh-flow-descriptors/Cargo.toml b/zenoh-flow-descriptors/Cargo.toml new file mode 100644 index 00000000..1446861e --- /dev/null +++ b/zenoh-flow-descriptors/Cargo.toml @@ -0,0 +1,34 @@ +# +# Copyright (c) 2021 - 2023 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +[package] +authors = { workspace = true } +categories = { workspace = true } +description = "Internal crate for Zenoh-Flow." +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +name = "zenoh-flow-descriptors" +repository = { workspace = true } +version = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +bytesize = "1.2.0" +humantime = "2.1" +serde = { workspace = true } +serde_json = { workspace = true } +serde_yaml = { workspace = true } +url = "2.2" +zenoh-flow-commons = { path = "../zenoh-flow-commons" } diff --git a/zenoh-flow-descriptors/src/composite/io.rs b/zenoh-flow-descriptors/src/composite/io.rs new file mode 100644 index 00000000..de2a1f72 --- /dev/null +++ b/zenoh-flow-descriptors/src/composite/io.rs @@ -0,0 +1,101 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use super::ISubstituable; +use crate::deserialize::deserialize_id; +use crate::{InputDescriptor, OutputDescriptor}; + +use serde::{Deserialize, Serialize}; +use zenoh_flow_commons::{NodeId, PortId}; + +/// TODO@J-Loudet example? +/// TODO@J-Loudet documentation? +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct CompositeOutputDescriptor { + pub id: PortId, + #[serde(deserialize_with = "deserialize_id")] + pub node: NodeId, + pub output: PortId, +} + +impl CompositeOutputDescriptor { + pub fn new(id: impl AsRef, node: impl AsRef, output: impl AsRef) -> Self { + Self { + id: id.as_ref().into(), + node: node.as_ref().into(), + output: output.as_ref().into(), + } + } +} + +impl ISubstituable for CompositeOutputDescriptor { + fn substitute(&mut self, subs: &super::Substitutions) { + if let Some(new_id) = subs.get(&self.node) { + self.node = new_id.clone(); + } + } +} + +impl From for OutputDescriptor { + fn from(composite: CompositeOutputDescriptor) -> Self { + Self { + node: composite.node, + output: composite.output, + } + } +} + +/// Describes an Input of a Composite Operator. +/// +/// # Example +/// +/// ```yaml +/// id: UniquePortIdentifier +/// node: Node +/// input: Input +/// ``` +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct CompositeInputDescriptor { + pub id: PortId, + #[serde(deserialize_with = "deserialize_id")] + pub node: NodeId, + pub input: PortId, +} + +impl CompositeInputDescriptor { + pub fn new(id: impl AsRef, node: impl AsRef, input: impl AsRef) -> Self { + Self { + id: id.as_ref().into(), + node: node.as_ref().into(), + input: input.as_ref().into(), + } + } +} + +impl ISubstituable for CompositeInputDescriptor { + fn substitute(&mut self, subs: &super::Substitutions) { + if let Some(new_id) = subs.get(&self.node) { + self.node = new_id.clone(); + } + } +} + +impl From for InputDescriptor { + fn from(composite: CompositeInputDescriptor) -> Self { + Self { + node: composite.node, + input: composite.input, + } + } +} diff --git a/zenoh-flow-descriptors/src/composite/mod.rs b/zenoh-flow-descriptors/src/composite/mod.rs new file mode 100644 index 00000000..0d41aeef --- /dev/null +++ b/zenoh-flow-descriptors/src/composite/mod.rs @@ -0,0 +1,82 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +mod io; +mod operator; + +pub use io::{CompositeInputDescriptor, CompositeOutputDescriptor}; +pub use operator::CompositeOperatorDescriptor; + +use std::{ + collections::HashMap, + fmt::Display, + hash::Hash, + ops::{Deref, DerefMut}, +}; + +/// TODO@J-Loudet documentation? +pub trait ISubstituable { + fn substitute(&mut self, subs: &Substitutions); +} + +/// `Substitutions` is an insert only structure that keeps track of all the substitutions to perform. +/// +/// It is leveraged in Zenoh-Flow during the flattening of data flows. +#[derive(Debug, PartialEq, Eq)] +pub struct Substitutions(HashMap); + +impl Default for Substitutions { + fn default() -> Self { + Self(HashMap::new()) + } +} + +impl Deref for Substitutions { + type Target = HashMap; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for Substitutions { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl From> for Substitutions { + fn from(value: HashMap) -> Self { + Self(value) + } +} + +impl From<[(T, T); N]> for Substitutions { + fn from(value: [(T, T); N]) -> Self { + Self(HashMap::from(value)) + } +} + +impl Substitutions { + // pub fn new_with(key: T, value: T) -> Self { + // Self(HashMap::from([(key, value)])) + // } + + // TODO: instead of a slice, provide an iterator? + pub fn apply(&self, substituables: &mut [impl ISubstituable]) { + substituables + .iter_mut() + .for_each(|substituable| substituable.substitute(self)) + } +} diff --git a/zenoh-flow-descriptors/src/composite/operator.rs b/zenoh-flow-descriptors/src/composite/operator.rs new file mode 100644 index 00000000..7c66d8e6 --- /dev/null +++ b/zenoh-flow-descriptors/src/composite/operator.rs @@ -0,0 +1,243 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use crate::composite::Substitutions; +use crate::flattened::{IFlattenableComposite, Patch}; +use crate::{ + CompositeInputDescriptor, CompositeOutputDescriptor, FlattenedOperatorDescriptor, + InputDescriptor, LinkDescriptor, NodeDescriptor, OperatorDescriptor, OutputDescriptor, +}; + +use serde::{Deserialize, Serialize}; +use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, Result, Vars}; + +/// A `Composite Operator` Zenoh-Flow node. +/// +/// A Composite Operator is a meta-operator: it groups together one or more Operators in a single descriptor. Its main +/// purpose is to simplify the creation of data flow graphs by allowing this form of grouping. +/// +/// # `configuration` section caveats +/// +/// The `configuration` section of a Composite Operator supersedes the same section in the Operator(s) it references. +/// See the documentation for more details regarding this behavior. +/// +/// # Examples +/// +/// ``` +/// use zenoh_flow_descriptors::CompositeOperatorDescriptor; +/// +/// let yaml = " +/// name: CompositeOperator +/// +/// configuration: +/// name: foo +/// +/// operators: +/// - id: InnerOperator1 +/// descriptor: file:///home/zenoh-flow/nodes/operator1.so +/// +/// - id: InnerOperator2 +/// descriptor: file:///home/zenoh-flow/nodes/operator2.so +/// +/// links: +/// - from: +/// node: InnerOperator1 +/// output: out-2 +/// to: +/// node: InnerOperator2 +/// input: in-1 +/// +/// inputs: +/// - id: CompositeOperator-in +/// node: InnerOperator1 +/// input: in-1 +/// +/// outputs: +/// - id: CompositeOperator-out +/// node: InnerOperator2 +/// output: out-1 +/// "; +/// +/// let composite_operator_yaml = serde_yaml::from_str::(&yaml).unwrap(); +/// +/// let json = " +/// { +/// \"name\": \"CompositeOperator\", +/// +/// \"configuration\": { +/// \"name\": \"foo\" +/// }, +/// +/// \"operators\": [ +/// { +/// \"id\": \"InnerOperator1\", +/// \"descriptor\": \"file:///home/zenoh-flow/nodes/operator1.so\" +/// }, +/// { +/// \"id\": \"InnerOperator2\", +/// \"descriptor\": \"file:///home/zenoh-flow/nodes/operator2.so\" +/// } +/// ], +/// +/// \"links\": [ +/// { +/// \"from\": { +/// \"node\": \"InnerOperator1\", +/// \"output\": \"out-2\" +/// }, +/// \"to\": { +/// \"node\": \"InnerOperator2\", +/// \"input\": \"in-1\" +/// } +/// } +/// ], +/// +/// \"inputs\": [ +/// { +/// \"id\": \"CompositeOperator-in\", +/// \"node\": \"InnerOperator1\", +/// \"input\": \"in-1\" +/// } +/// ], +/// +/// \"outputs\": [ +/// { +/// \"id\": \"CompositeOperator-out\", +/// \"node\": \"InnerOperator2\", +/// \"output\": \"out-1\" +/// } +/// ] +/// }"; +/// +/// let composite_operator_json = serde_json::from_str::(&json).unwrap(); +/// assert_eq!(composite_operator_yaml, composite_operator_json); +/// assert_eq!(composite_operator_yaml.configuration.get("name").unwrap().as_str(), Some("foo")); +/// ``` +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct CompositeOperatorDescriptor { + pub name: NodeId, + pub inputs: Vec, + pub outputs: Vec, + pub operators: Vec, + pub links: Vec, + #[serde(default)] + pub configuration: Configuration, +} + +impl std::fmt::Display for CompositeOperatorDescriptor { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "Composite Operator: {}", self.name) + } +} + +impl IFlattenableComposite for CompositeOperatorDescriptor { + type Flattened = FlattenedOperatorDescriptor; + type Flattenable = OperatorDescriptor; + + fn flatten_composite( + mut self, + composite_id: NodeId, + overwriting_configuration: Configuration, + runtime: Option, + vars: Vars, + ancestors: &mut HashSet>, + ) -> Result<(Vec, Vec, Patch)> { + let mut flattened_operators = Vec::with_capacity(self.operators.len()); + let composite_configuration = self.configuration; + + for operator_desc in self.operators.into_iter() { + // The overwriting_configuration coming from upstream has the highest priority. + // The current node's configuration has higher priority than the composite's configuration. + let node_overwriting_configuration = overwriting_configuration.clone().merge_overwrite( + operator_desc + .configuration + .clone() + .merge_overwrite(composite_configuration.clone()), + ); + + let (mut operators, mut links, patch) = operator_desc + .flatten_maybe_composite::( + node_overwriting_configuration, + runtime.clone(), + vars.clone(), + // If we don't clone the ancestors between successive calls, consecutive composite operators + // referring to the same descriptor would be falsely flagged as "infinite recursions". + &mut ancestors.clone(), + )?; + + flattened_operators.append(&mut operators); + // We patch the links before appending the new ones to avoid some useless work. + patch.apply(&mut self.links); + self.links.append(&mut links); + } + + // We have processed all operators. Time to patch. + // 1. Prepend each operator id with the id of the composite. + let subs_nodes: Substitutions = flattened_operators + .iter_mut() + .map(|flattened_operator| { + ( + flattened_operator.id.clone(), // previous id + flattened_operator.composite_id(&composite_id), // `composite_id` returns a clone of the updated id + ) + }) + .collect::>() + .into(); + + // 2. Apply the `NodeId` substitutions on the links + the composite inputs/outputs. + subs_nodes.apply(&mut self.links); + subs_nodes.apply(&mut self.inputs); + subs_nodes.apply(&mut self.outputs); + + // We need to tell upstream how to update the links that involve this Composite: + let subs_inputs: Substitutions = self + .inputs + .into_iter() + .map(|input| { + let old_input = InputDescriptor { + node: composite_id.clone(), + input: input.id.clone(), + }; + (old_input, input.into()) + }) + .collect::>() + .into(); + + let subs_outputs: Substitutions = self + .outputs + .into_iter() + .map(|output| { + let old_output = OutputDescriptor { + node: composite_id.clone(), + output: output.id.clone(), + }; + (old_output, output.into()) + }) + .collect::>() + .into(); + + Ok(( + flattened_operators, + self.links, + Patch::new(subs_inputs, subs_outputs), + )) + } +} + +#[cfg(test)] +#[path = "./tests.rs"] +mod tests; diff --git a/zenoh-flow-descriptors/src/composite/tests.rs b/zenoh-flow-descriptors/src/composite/tests.rs new file mode 100644 index 00000000..48c913e7 --- /dev/null +++ b/zenoh-flow-descriptors/src/composite/tests.rs @@ -0,0 +1,265 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use zenoh_flow_commons::{Configuration, Vars}; + +use crate::flattened::IFlattenableComposite; +use std::collections::HashSet; + +use crate::{ + composite::Substitutions, flattened::Patch, CompositeInputDescriptor, + CompositeOperatorDescriptor, CompositeOutputDescriptor, FlattenedOperatorDescriptor, + InputDescriptor, LinkDescriptor, NodeDescriptor, OutputDescriptor, +}; + +const BASE_DIR: &str = "./tests/descriptors"; + +#[test] +fn test_flatten_composite_descriptor_non_nested() { + let composite_descriptor = CompositeOperatorDescriptor { + name: "composite-test".into(), + inputs: vec![ + CompositeInputDescriptor::new("input-1", "my-operator-1", "operator-1-in-1"), + CompositeInputDescriptor::new("input-2", "my-operator-1", "operator-1-in-2"), + ], + outputs: vec![CompositeOutputDescriptor::new( + "output-1", + "my-operator-2", + "operator-2-out", + )], + operators: vec![ + NodeDescriptor { + id: "my-operator-1".into(), + descriptor: format!("file://./{}/operator-1.yml", BASE_DIR).into(), + configuration: Configuration::default(), + }, + NodeDescriptor { + id: "my-operator-2".into(), + descriptor: format!("file://./{}/operator-2.yml", BASE_DIR).into(), + configuration: Configuration::default(), + }, + ], + links: vec![LinkDescriptor::new( + OutputDescriptor::new("my-operator-1", "operator-1-out"), + InputDescriptor::new("my-operator-2", "operator-2-in"), + )], + configuration: Configuration::default(), + }; + + let runtime = Some("zf-plugin-1".into()); + let (flattened_operators, flattened_links, patch) = composite_descriptor + .flatten_composite( + "composite".into(), + Configuration::default(), + runtime.clone(), + Vars::default(), + &mut HashSet::default(), + ) + .expect("Unexpected error while flattening composite operator"); + + // Here we check that the id of the operator has been correctly updated: + // - it should be prefixed with the name of the composite operator, + // - a "/" should follow, + // - the name of the operator as per how it’s written in the composite should be last. + // + // We should see "composite/my-operator-1" & "composite/my-operator-2". + let expected_operators = vec![ + FlattenedOperatorDescriptor { + id: "composite/my-operator-1".into(), + name: "operator-1".into(), + inputs: vec!["operator-1-in-1".into(), "operator-1-in-2".into()], + outputs: vec!["operator-1-out".into()], + uri: Some("file://operator-1.so".into()), + configuration: Configuration::default(), + runtime: runtime.clone(), + }, + FlattenedOperatorDescriptor { + id: "composite/my-operator-2".into(), + name: "operator-2".into(), + inputs: vec!["operator-2-in".into()], + outputs: vec!["operator-2-out".into()], + uri: Some("file://operator-2.so".into()), + configuration: Configuration::default(), + runtime, + }, + ]; + + // NOTE: This `assert_eq` also checks the order of the elements! + assert_eq!(flattened_operators, expected_operators); + + // Here we check that the links have been correctly updated: + // - the id of the composite operator has been replaced with the "new_id" (composite/operator), + // - the inputs & outputs have been replaced with what was declared in the yaml file of the + // actual operator, + // - the links inside the composite operator have been added, + // - only the links of the concerned composite operator were updated. + let expected_links = vec![LinkDescriptor::new( + OutputDescriptor::new("composite/my-operator-1", "operator-1-out"), + InputDescriptor::new("composite/my-operator-2", "operator-2-in"), + )]; + + // NOTE: This `assert_eq` also checks the order of the elements! + assert_eq!(flattened_links, expected_links); + + let expected_patch = Patch { + subs_inputs: Substitutions::::from([ + ( + InputDescriptor::new("composite", "input-1"), + InputDescriptor::new("composite/my-operator-1", "operator-1-in-1"), + ), + ( + InputDescriptor::new("composite", "input-2"), + InputDescriptor::new("composite/my-operator-1", "operator-1-in-2"), + ), + ]), + subs_outputs: Substitutions::::from([( + OutputDescriptor::new("composite", "output-1"), + OutputDescriptor::new("composite/my-operator-2", "operator-2-out"), + )]), + }; + + assert_eq!(expected_patch, patch); +} + +#[test] +fn test_flatten_composite_descriptor_nested() { + let nested_composite_descriptor = CompositeOperatorDescriptor { + name: "nested-composite-test".into(), + inputs: vec![CompositeInputDescriptor::new( + "composite-input", + "composite-outer-i", + "composite-outer-in", + )], + outputs: vec![CompositeOutputDescriptor::new( + "composite-output", + "composite-outer-o", + "composite-outer-out", + )], + operators: vec![ + NodeDescriptor { + id: "composite-outer-o".into(), + descriptor: format!("file://./{}/composite-outer.yml", BASE_DIR).into(), + configuration: Configuration::default(), + }, + NodeDescriptor { + id: "composite-nested".into(), + descriptor: format!("file://./{}/composite-nested.yml", BASE_DIR).into(), + configuration: Configuration::default(), + }, + NodeDescriptor { + id: "composite-outer-i".into(), + descriptor: format!("file://./{}/composite-outer.yml", BASE_DIR).into(), + configuration: Configuration::default(), + }, + ], + links: vec![ + LinkDescriptor::new( + OutputDescriptor::new("composite-outer-i", "composite-outer-out"), + InputDescriptor::new("composite-nested", "composite-nested-in"), + ), + LinkDescriptor::new( + OutputDescriptor::new("composite-nested", "composite-nested-out"), + InputDescriptor::new("composite-outer-o", "composite-outer-in"), + ), + ], + configuration: Configuration::default(), + }; + + let (flattened_operators, flattened_links, patch) = nested_composite_descriptor + .flatten_composite( + "composite".into(), + Configuration::default(), + None, + Vars::from([("SCHEME", "file://"), ("BASE_DIR", BASE_DIR)]), + &mut HashSet::default(), + ) + .expect("Unexpected error while calling `flatten`"); + + // Important checks: + // - operators in composite are prefixed with "composite/", + // - operators in composite and composite-nested are prefixed with "composite/composite-nested". + let expected_operators = vec![ + FlattenedOperatorDescriptor { + id: "composite/composite-outer-o".into(), + name: "composite-outer".into(), + inputs: vec!["composite-outer-in".into()], + outputs: vec!["composite-outer-out".into()], + uri: Some("file://composite-outer.so".into()), + configuration: Configuration::default(), + runtime: None, + }, + FlattenedOperatorDescriptor { + id: "composite/composite-nested/operator-1".into(), + name: "operator-1".into(), + inputs: vec!["operator-1-in-1".into(), "operator-1-in-2".into()], + outputs: vec!["operator-1-out".into()], + uri: Some("file://operator-1.so".into()), + configuration: Configuration::default(), + runtime: None, + }, + FlattenedOperatorDescriptor { + id: "composite/composite-nested/operator-2".into(), + name: "operator-2".into(), + inputs: vec!["operator-2-in".into()], + outputs: vec!["operator-2-out".into()], + uri: Some("file://operator-2.so".into()), + configuration: Configuration::default(), + runtime: None, + }, + FlattenedOperatorDescriptor { + id: "composite/composite-outer-i".into(), + name: "composite-outer".into(), + inputs: vec!["composite-outer-in".into()], + outputs: vec!["composite-outer-out".into()], + uri: Some("file://composite-outer.so".into()), + configuration: Configuration::default(), + runtime: None, + }, + ]; + + assert_eq!(expected_operators, flattened_operators); + + // Important checks: + // - operators in composite are prefixed with "composite/", + // - operators in composite and composite-nested are prefixed with "composite/composite-nested". + let expected_links = vec![ + LinkDescriptor::new( + OutputDescriptor::new("composite/composite-outer-i", "composite-outer-out"), + InputDescriptor::new("composite/composite-nested/operator-1", "operator-1-in-1"), + ), + LinkDescriptor::new( + OutputDescriptor::new("composite/composite-nested/operator-2", "operator-2-out"), + InputDescriptor::new("composite/composite-outer-o", "composite-outer-in"), + ), + LinkDescriptor::new( + OutputDescriptor::new("composite/composite-nested/operator-1", "operator-1-out"), + InputDescriptor::new("composite/composite-nested/operator-2", "operator-2-in"), + ), + ]; + + assert_eq!(expected_links, flattened_links); + + let expected_patch = Patch { + subs_inputs: Substitutions::::from([( + InputDescriptor::new("composite", "composite-input"), + InputDescriptor::new("composite/composite-outer-i", "composite-outer-in"), + )]), + subs_outputs: Substitutions::::from([( + OutputDescriptor::new("composite", "composite-output"), + OutputDescriptor::new("composite/composite-outer-o", "composite-outer-out"), + )]), + }; + + assert_eq!(expected_patch, patch); +} diff --git a/zenoh-flow-descriptors/src/dataflow.rs b/zenoh-flow-descriptors/src/dataflow.rs new file mode 100644 index 00000000..eff4dfd1 --- /dev/null +++ b/zenoh-flow-descriptors/src/dataflow.rs @@ -0,0 +1,240 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::{ + CompositeOperatorDescriptor, FlattenedDataFlowDescriptor, LinkDescriptor, NodeDescriptor, + SinkDescriptor, SourceDescriptor, +}; + +use std::collections::{HashMap, HashSet}; + +use serde::{Deserialize, Serialize}; +use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, Result, RuntimeId, Vars}; + +/// TODO@J-Loudet Documentation? +/// +/// # Example +/// +/// ``` +/// use zenoh_flow_descriptors::DataFlowDescriptor; +/// +/// let yaml = " +/// flow: DataFlow +/// +/// configuration: +/// foo: bar +/// +/// sources: +/// - id: Source-0 +/// descriptor: file:///home/zenoh-flow/nodes/source.yaml +/// configuration: +/// answer: 0 +/// +/// operators: +/// - id: Operator-1 +/// descriptor: file:///home/zenoh-flow/nodes/operator.yaml +/// configuration: +/// answer: 1 +/// +/// sinks: +/// - id: Sink-2 +/// descriptor: file:///home/zenoh-flow/nodes/sink.yaml +/// configuration: +/// answer: 2 +/// +/// links: +/// - from: +/// node: Source-0 +/// output : out-operator +/// to: +/// node : Operator-1 +/// input : in-source +/// +/// - from: +/// node : Operator-1 +/// output : out-sink +/// to: +/// node : Sink-2 +/// input : in-operator +/// +/// mapping: +/// Source-0: zenoh-flow-plugin-0 +/// "; +/// +/// let data_flow_yaml = serde_yaml::from_str::(yaml).unwrap(); +/// +/// let json = " +/// { +/// \"flow\": \"DataFlow\", +/// +/// \"configuration\": { +/// \"foo\": \"bar\" +/// }, +/// +/// \"sources\": [ +/// { +/// \"id\": \"Source-0\", +/// \"descriptor\": \"file:///home/zenoh-flow/nodes/source.yaml\", +/// \"configuration\": { +/// \"answer\": 0 +/// } +/// } +/// ], +/// +/// \"operators\": [ +/// { +/// \"id\": \"Operator-1\", +/// \"descriptor\": \"file:///home/zenoh-flow/nodes/operator.yaml\", +/// \"configuration\": { +/// \"answer\": 1 +/// } +/// } +/// ], +/// +/// \"sinks\": [ +/// { +/// \"id\": \"Sink-2\", +/// \"descriptor\": \"file:///home/zenoh-flow/nodes/sink.yaml\", +/// \"configuration\": { +/// \"answer\": 2 +/// } +/// } +/// ], +/// +/// \"links\": [ +/// { +/// \"from\": { +/// \"node\": \"Source-0\", +/// \"output\": \"out-operator\" +/// }, +/// \"to\": { +/// \"node\": \"Operator-1\", +/// \"input\": \"in-source\" +/// } +/// }, +/// { +/// \"from\": { +/// \"node\": \"Operator-1\", +/// \"output\": \"out-sink\" +/// }, +/// \"to\": { +/// \"node\": \"Sink-2\", +/// \"input\": \"in-operator\" +/// } +/// } +/// ], +/// +/// \"mapping\": { +/// \"Source-0\": \"zenoh-flow-plugin-0\" +/// } +/// } +/// "; +/// +/// let data_flow_json = serde_json::from_str::(json).unwrap(); +/// assert_eq!(data_flow_yaml, data_flow_json); +/// ``` +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct DataFlowDescriptor { + pub flow: String, + #[serde(default)] + pub configuration: Configuration, + pub operators: Vec, + pub sources: Vec, + pub sinks: Vec, + pub links: Vec, + pub mapping: Option>, +} + +impl DataFlowDescriptor { + pub fn flatten(self, vars: Vars) -> Result { + let DataFlowDescriptor { + flow, + configuration: flow_configuration, + operators, + sources, + sinks, + mut links, + mapping, + } = self; + + let mapping = mapping.unwrap_or_default(); + + let mut flattened_sources = Vec::with_capacity(sources.len()); + for source_desc in sources { + // The configuration of the Node has higher priority than the configuration of the Data Flow. + let overwriting_configuration = source_desc + .configuration + .clone() + .merge_overwrite(flow_configuration.clone()); + let runtime = mapping.get(&source_desc.id).cloned(); + + flattened_sources.push(source_desc.flatten::( + overwriting_configuration, + runtime, + vars.clone(), + )?); + } + + let mut flattened_sinks = Vec::with_capacity(sinks.len()); + for sink_desc in sinks { + // The configuration of the Node has higher priority than the configuration of the Data Flow. + let overwriting_configuration = sink_desc + .configuration + .clone() + .merge_overwrite(flow_configuration.clone()); + + let runtime = mapping.get(&sink_desc.id).cloned(); + + flattened_sinks.push(sink_desc.flatten::( + overwriting_configuration, + runtime, + vars.clone(), + )?); + } + + let mut flattened_operators = Vec::with_capacity(operators.len()); + for operator_desc in operators { + // The configuration of the Node has higher priority than the configuration of the Data Flow. + let overwriting_configuration = operator_desc + .configuration + .clone() + .merge_overwrite(flow_configuration.clone()); + let runtime = mapping.get(&operator_desc.id).cloned(); + + let (mut flat_operators, mut flat_links, patch) = operator_desc + .flatten_maybe_composite::( + overwriting_configuration, + runtime, + vars.clone(), + &mut HashSet::default(), + )?; + + flattened_operators.append(&mut flat_operators); + patch.apply(&mut links); + links.append(&mut flat_links); + } + + Ok(FlattenedDataFlowDescriptor { + flow, + sources: flattened_sources, + operators: flattened_operators, + sinks: flattened_sinks, + links, + }) + } +} + +#[cfg(test)] +#[path = "./tests.rs"] +mod tests; diff --git a/zenoh-flow-descriptors/src/deserialize.rs b/zenoh-flow-descriptors/src/deserialize.rs new file mode 100644 index 00000000..d502f464 --- /dev/null +++ b/zenoh-flow-descriptors/src/deserialize.rs @@ -0,0 +1,73 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! This module exposes the functions [deserialize_size] and [deserialize_time] that are used +//! throughout Zenoh-Flow to "parse" values used to express time or size. +//! +//! The external crates [bytesize] and [humantime] are leveraged for these purposes. + +use serde::Deserializer; +use std::str::FromStr; +use zenoh_flow_commons::NodeId; + +pub fn deserialize_size<'de, D>(deserializer: D) -> std::result::Result, D::Error> +where + D: Deserializer<'de>, +{ + match serde::de::Deserialize::deserialize(deserializer) { + Ok(buf) => Ok(Some( + bytesize::ByteSize::from_str(buf) + .map_err(|_| { + serde::de::Error::custom(format!("Unable to parse value as bytes {buf}")) + })? + .as_u64() as usize, + )), + Err(_) => { + // log::warn!("failed to deserialize size: {:?}", e); + Ok(None) + } + } +} + +pub fn deserialize_time<'de, D>(deserializer: D) -> std::result::Result, D::Error> +where + D: Deserializer<'de>, +{ + match serde::de::Deserialize::deserialize(deserializer) { + Ok::<&str, _>(buf) => { + let ht = (buf) + .parse::() + .map_err(serde::de::Error::custom)?; + Ok(Some(ht.as_nanos() as u64)) + } + Err(_) => { + // log::warn!("failed to deserialize time: {:?}", e); + Ok(None) + } + } +} + +pub fn deserialize_id<'de, D>(deserializer: D) -> std::result::Result +where + D: Deserializer<'de>, +{ + let id: String = serde::de::Deserialize::deserialize(deserializer)?; + if id.contains('/') { + return Err(serde::de::Error::custom(format!( + "A NodeId cannot contain any '/': {id}" + ))); + } + + Ok(id.into()) +} diff --git a/zenoh-flow-descriptors/src/flattened/dataflow.rs b/zenoh-flow-descriptors/src/flattened/dataflow.rs new file mode 100644 index 00000000..d7353133 --- /dev/null +++ b/zenoh-flow-descriptors/src/flattened/dataflow.rs @@ -0,0 +1,174 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::{ + FlattenedOperatorDescriptor, FlattenedSinkDescriptor, FlattenedSourceDescriptor, LinkDescriptor, +}; + +use serde::{Deserialize, Serialize}; + +/// TODO@J-Loudet Documentation? +/// +/// # Examples +/// +/// ``` +/// use zenoh_flow_descriptors::FlattenedDataFlowDescriptor; +/// +/// let yaml = " +/// flow: DataFlow +/// +/// sources: +/// - id: Source-0 +/// name: Source +/// configuration: +/// foo: bar +/// answer: 0 +/// uri: file:///home/zenoh-flow/node/libsource.so +/// outputs: +/// - out-operator +/// mapping: zenoh-flow-plugin-0 +/// +/// operators: +/// - id: Operator-1 +/// name: Operator +/// configuration: +/// foo: bar +/// answer: 1 +/// uri: file:///home/zenoh-flow/node/liboperator.so +/// inputs: +/// - in-source +/// outputs: +/// - out-sink +/// +/// sinks: +/// - id: Sink-2 +/// name: Sink +/// configuration: +/// foo: bar +/// answer: 2 +/// uri: file:///home/zenoh-flow/node/libsink.so +/// inputs: +/// - in-operator +/// +/// links: +/// - from: +/// node: Source-0 +/// output : out-operator +/// to: +/// node : Operator-1 +/// input : in-source +/// +/// - from: +/// node : Operator-1 +/// output : out-sink +/// to: +/// node : Sink-2 +/// input : in-operator +/// "; +/// +/// let data_flow_yaml = serde_yaml::from_str::(yaml).unwrap(); +/// +/// let json = " +/// { +/// \"flow\": \"DataFlow\", +/// +/// \"configuration\": { +/// \"foo\": \"bar\" +/// }, +/// +/// \"sources\": [ +/// { +/// \"id\": \"Source-0\", +/// \"name\": \"Source\", +/// \"configuration\": { +/// \"foo\": \"bar\", +/// \"answer\": 0 +/// }, +/// \"uri\": \"file:///home/zenoh-flow/node/libsource.so\", +/// \"outputs\": [ +/// \"out-operator\" +/// ], +/// \"mapping\": \"zenoh-flow-plugin-0\" +/// } +/// ], +/// +/// \"operators\": [ +/// { +/// \"id\": \"Operator-1\", +/// \"name\": \"Operator\", +/// \"configuration\": { +/// \"foo\": \"bar\", +/// \"answer\": 1 +/// }, +/// \"uri\": \"file:///home/zenoh-flow/node/liboperator.so\", +/// \"inputs\": [ +/// \"in-source\" +/// ], +/// \"outputs\": [ +/// \"out-sink\" +/// ] +/// } +/// ], +/// +/// \"sinks\": [ +/// { +/// \"id\": \"Sink-2\", +/// \"name\": \"Sink\", +/// \"configuration\": { +/// \"foo\": \"bar\", +/// \"answer\": 2 +/// }, +/// \"uri\": \"file:///home/zenoh-flow/node/libsink.so\", +/// \"inputs\": [ +/// \"in-operator\" +/// ] +/// } +/// ], +/// +/// \"links\": [ +/// { +/// \"from\": { +/// \"node\": \"Source-0\", +/// \"output\": \"out-operator\" +/// }, +/// \"to\": { +/// \"node\": \"Operator-1\", +/// \"input\": \"in-source\" +/// } +/// }, +/// { +/// \"from\": { +/// \"node\": \"Operator-1\", +/// \"output\": \"out-sink\" +/// }, +/// \"to\": { +/// \"node\": \"Sink-2\", +/// \"input\": \"in-operator\" +/// } +/// } +/// ] +/// } +/// "; +/// +/// let data_flow_json = serde_json::from_str::(json).unwrap(); +/// assert_eq!(data_flow_yaml, data_flow_json); +/// ``` +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct FlattenedDataFlowDescriptor { + pub flow: String, + pub sources: Vec, + pub operators: Vec, + pub sinks: Vec, + pub links: Vec, +} diff --git a/zenoh-flow-descriptors/src/flattened/mod.rs b/zenoh-flow-descriptors/src/flattened/mod.rs new file mode 100644 index 00000000..53ffc1ef --- /dev/null +++ b/zenoh-flow-descriptors/src/flattened/mod.rs @@ -0,0 +1,77 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +mod nodes; +use std::{ + collections::HashSet, + fmt::{Debug, Display}, + sync::Arc, +}; + +pub use nodes::{FlattenedOperatorDescriptor, FlattenedSinkDescriptor, FlattenedSourceDescriptor}; + +mod dataflow; +pub use dataflow::FlattenedDataFlowDescriptor; + +use crate::{composite::Substitutions, InputDescriptor, LinkDescriptor, OutputDescriptor}; +use serde::de::DeserializeOwned; +use zenoh_flow_commons::{Configuration, NodeId, Result, RuntimeId, Vars}; + +pub(crate) trait IFlattenableComposite: DeserializeOwned + Display + Debug + Clone { + type Flattened: Debug + Display; + type Flattenable: IFlattenable; + + fn flatten_composite( + self, + id: NodeId, + overwritting_configuration: Configuration, + runtime: Option, + vars: Vars, + ancestors: &mut HashSet>, + ) -> Result<(Vec, Vec, Patch)>; +} + +pub trait IFlattenable: DeserializeOwned + Display + Debug { + type Flattened: Debug + Display; + + fn flatten( + self, + id: NodeId, + overwritting_configuration: Configuration, + runtime: Option, + ) -> Self::Flattened; +} + +#[derive(Default, Debug, PartialEq, Eq)] +pub struct Patch { + pub subs_inputs: Substitutions, + pub subs_outputs: Substitutions, +} + +impl Patch { + pub fn new( + subs_inputs: Substitutions, + subs_outputs: Substitutions, + ) -> Self { + Self { + subs_inputs, + subs_outputs, + } + } + + pub fn apply(self, links: &mut [LinkDescriptor]) { + self.subs_inputs.apply(links); + self.subs_outputs.apply(links); + } +} diff --git a/zenoh-flow-descriptors/src/flattened/nodes/mod.rs b/zenoh-flow-descriptors/src/flattened/nodes/mod.rs new file mode 100644 index 00000000..54a6e414 --- /dev/null +++ b/zenoh-flow-descriptors/src/flattened/nodes/mod.rs @@ -0,0 +1,22 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +mod operator; +pub use operator::FlattenedOperatorDescriptor; + +mod source; +pub use source::FlattenedSourceDescriptor; + +mod sink; +pub use sink::FlattenedSinkDescriptor; diff --git a/zenoh-flow-descriptors/src/flattened/nodes/operator.rs b/zenoh-flow-descriptors/src/flattened/nodes/operator.rs new file mode 100644 index 00000000..4f375546 --- /dev/null +++ b/zenoh-flow-descriptors/src/flattened/nodes/operator.rs @@ -0,0 +1,124 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::fmt::Display; + +use crate::OperatorDescriptor; + +use serde::{Deserialize, Serialize}; +use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, PortId, RuntimeId}; + +/// TODO@J-Loudet Documentation? +/// +/// # Example +/// +/// ``` +/// use zenoh_flow_descriptors::FlattenedOperatorDescriptor; +/// +/// let yaml = " +/// id: Operator-1 +/// name: Operator +/// configuration: +/// foo: bar +/// answer: 1 +/// uri: file:///home/zenoh-flow/node/liboperator.so +/// inputs: +/// - in-source +/// outputs: +/// - out-sink +/// "; +/// let operator_yaml = serde_yaml::from_str::(yaml).unwrap(); +/// +/// let json = " +/// { +/// \"id\": \"Operator-1\", +/// \"name\": \"Operator\", +/// \"configuration\": { +/// \"foo\": \"bar\", +/// \"answer\": 1 +/// }, +/// \"uri\": \"file:///home/zenoh-flow/node/liboperator.so\", +/// \"inputs\": [ +/// \"in-source\" +/// ], +/// \"outputs\": [ +/// \"out-sink\" +/// ] +/// } +/// "; +/// +/// let operator_json = serde_json::from_str::(json).unwrap(); +/// +/// assert_eq!(operator_yaml, operator_json); +/// ``` + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +pub struct FlattenedOperatorDescriptor { + pub id: NodeId, + pub name: String, + pub inputs: Vec, + pub outputs: Vec, + pub uri: Option, + #[serde(default)] + pub configuration: Configuration, + pub runtime: Option, +} + +impl Display for FlattenedOperatorDescriptor { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +impl FlattenedOperatorDescriptor { + /// TODO@J-Loudet: documentation + /// + /// In case there are identical keys, *the provided configuration will override the configuration of the Operator*. + /// The rationale is that the configuration of the Operator **always** has the lowest priority. + pub fn new( + operator: OperatorDescriptor, + id: NodeId, + overwritting_configuration: Configuration, + runtime: Option, + ) -> Self { + let OperatorDescriptor { + name, + uri, + inputs, + outputs, + configuration, + } = operator; + + Self { + id, + name, + inputs, + outputs, + uri, + configuration: overwritting_configuration.merge_overwrite(configuration), + runtime, + } + } + + /// Update the identifier of the [FlattenedOperatorDescriptor] prepending the id of the + /// [CompositeOperatorDescriptor] it belongs to. + /// + /// # TODO + /// + /// - Prevent the usage of "/" in the id of nodes. + pub fn composite_id(&mut self, composite_id: &NodeId) -> NodeId { + self.id = format!("{composite_id}/{}", self.id).into(); + self.id.clone() + } +} diff --git a/zenoh-flow-descriptors/src/flattened/nodes/sink.rs b/zenoh-flow-descriptors/src/flattened/nodes/sink.rs new file mode 100644 index 00000000..7f94cf73 --- /dev/null +++ b/zenoh-flow-descriptors/src/flattened/nodes/sink.rs @@ -0,0 +1,100 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::fmt::Display; + +use serde::{Deserialize, Serialize}; +use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, PortId, RuntimeId}; + +use crate::SinkDescriptor; + +/// Textual representation of a Zenoh-Flow Sink node. +/// +/// # Example +/// +/// ``` +/// use zenoh_flow_descriptors::FlattenedSinkDescriptor; +/// +/// let sink_yaml = " +/// id: Sink-2 +/// name: Sink +/// configuration: +/// foo: bar +/// answer: 2 +/// uri: file:///home/zenoh-flow/node/libsink.so +/// inputs: +/// - in-operator +/// "; +/// let sink_yaml = serde_yaml::from_str::(&sink_yaml).unwrap(); +/// +/// let sink_json = " +/// { +/// \"id\": \"Sink-2\", +/// \"name\": \"Sink\", +/// \"configuration\": { +/// \"foo\": \"bar\", +/// \"answer\": 2 +/// }, +/// \"uri\": \"file:///home/zenoh-flow/node/libsink.so\", +/// \"inputs\": [ +/// \"in-operator\" +/// ] +/// } +/// "; +/// +/// let sink_json = serde_json::from_str::(&sink_json).unwrap(); +/// +/// assert_eq!(sink_yaml, sink_json); +/// ``` +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct FlattenedSinkDescriptor { + pub id: NodeId, + pub name: String, + pub uri: Option, + pub inputs: Vec, + #[serde(default)] + pub configuration: Configuration, + pub runtime: Option, +} + +impl Display for FlattenedSinkDescriptor { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +impl FlattenedSinkDescriptor { + pub fn new( + sink: SinkDescriptor, + id: NodeId, + overwritting_configuration: Configuration, + runtime: Option, + ) -> Self { + let SinkDescriptor { + name, + configuration, + uri, + inputs, + } = sink; + + Self { + id, + name, + uri, + inputs, + configuration: overwritting_configuration.merge_overwrite(configuration), + runtime, + } + } +} diff --git a/zenoh-flow-descriptors/src/flattened/nodes/source.rs b/zenoh-flow-descriptors/src/flattened/nodes/source.rs new file mode 100644 index 00000000..de0dbc27 --- /dev/null +++ b/zenoh-flow-descriptors/src/flattened/nodes/source.rs @@ -0,0 +1,102 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::fmt::Display; + +use serde::{Deserialize, Serialize}; +use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, PortId, RuntimeId}; + +use crate::SourceDescriptor; + +/// Textual representation of a Zenoh-Flow Source node. +/// +/// # Example +/// +/// ``` +/// use zenoh_flow_descriptors::FlattenedSourceDescriptor; +/// +/// let source_yaml = " +/// id: Source-0 +/// name: Source +/// configuration: +/// foo: bar +/// answer: 0 +/// uri: file:///home/zenoh-flow/node/libsource.so +/// outputs: +/// - out-operator +/// mapping: zenoh-flow-plugin-0 +/// "; +/// let source_yaml = serde_yaml::from_str::(&source_yaml).unwrap(); +/// +/// let source_json = " +/// { +/// \"id\": \"Source-0\", +/// \"name\": \"Source\", +/// \"configuration\": { +/// \"foo\": \"bar\", +/// \"answer\": 0 +/// }, +/// \"uri\": \"file:///home/zenoh-flow/node/libsource.so\", +/// \"outputs\": [ +/// \"out-operator\" +/// ], +/// \"mapping\": \"zenoh-flow-plugin-0\" +/// } +/// "; +/// +/// let source_json = serde_json::from_str::(&source_json).unwrap(); +/// +/// assert_eq!(source_yaml, source_json); +/// ``` +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct FlattenedSourceDescriptor { + pub id: NodeId, + pub name: String, + pub uri: Option, + pub outputs: Vec, + #[serde(default)] + pub configuration: Configuration, + pub runtime: Option, +} + +impl Display for FlattenedSourceDescriptor { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +impl FlattenedSourceDescriptor { + pub fn new( + source: SourceDescriptor, + id: NodeId, + overwritting_configuration: Configuration, + runtime: Option, + ) -> Self { + let SourceDescriptor { + name, + uri, + outputs, + configuration, + } = source; + + Self { + id, + name, + uri, + outputs, + configuration: overwritting_configuration.merge_overwrite(configuration), + runtime, + } + } +} diff --git a/zenoh-flow-descriptors/src/flattened/tests.rs b/zenoh-flow-descriptors/src/flattened/tests.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/zenoh-flow-descriptors/src/flattened/tests.rs @@ -0,0 +1 @@ + diff --git a/zenoh-flow-descriptors/src/io.rs b/zenoh-flow-descriptors/src/io.rs new file mode 100644 index 00000000..2127d8b1 --- /dev/null +++ b/zenoh-flow-descriptors/src/io.rs @@ -0,0 +1,66 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::fmt; + +use serde::{Deserialize, Serialize}; +use zenoh_flow_commons::{NodeId, PortId}; + +/// An `InputDescriptor` describes an Input port of a Zenoh-Flow node. +/// +/// FIXME@J-Loudet See [links] for their usage. +#[derive(Debug, Hash, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct InputDescriptor { + pub node: NodeId, + pub input: PortId, +} + +impl fmt::Display for InputDescriptor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_fmt(format_args!("{}.{}", self.node, self.input)) + } +} + +impl InputDescriptor { + pub fn new(node: impl AsRef, input: impl AsRef) -> Self { + Self { + node: node.as_ref().into(), + input: input.as_ref().into(), + } + } +} + +/// An `OutputDescriptor` describes an Output port of a Zenoh-Flow node. +/// +/// See [LinkDescriptor][crate::link::LinkDescriptor] for their usage. +#[derive(Debug, Clone, Hash, Serialize, Deserialize, PartialEq, Eq)] +pub struct OutputDescriptor { + pub node: NodeId, + pub output: PortId, +} + +impl fmt::Display for OutputDescriptor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_fmt(format_args!("{}.{}", self.node, self.output)) + } +} + +impl OutputDescriptor { + pub fn new(node: impl AsRef, output: impl AsRef) -> Self { + Self { + node: node.as_ref().into(), + output: output.as_ref().into(), + } + } +} diff --git a/zenoh-flow-descriptors/src/lib.rs b/zenoh-flow-descriptors/src/lib.rs new file mode 100644 index 00000000..0d88ca2c --- /dev/null +++ b/zenoh-flow-descriptors/src/lib.rs @@ -0,0 +1,40 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +mod composite; +pub use composite::{ + CompositeInputDescriptor, CompositeOperatorDescriptor, CompositeOutputDescriptor, +}; + +mod dataflow; +pub use dataflow::DataFlowDescriptor; + +mod flattened; +pub use flattened::{ + FlattenedDataFlowDescriptor, FlattenedOperatorDescriptor, FlattenedSinkDescriptor, + FlattenedSourceDescriptor, IFlattenable, +}; + +mod io; +pub use io::{InputDescriptor, OutputDescriptor}; + +mod link; +pub use link::LinkDescriptor; + +mod nodes; +pub use nodes::{NodeDescriptor, OperatorDescriptor, SinkDescriptor, SourceDescriptor}; + +mod uri; + +mod deserialize; diff --git a/zenoh-flow-descriptors/src/link.rs b/zenoh-flow-descriptors/src/link.rs new file mode 100644 index 00000000..6f08570f --- /dev/null +++ b/zenoh-flow-descriptors/src/link.rs @@ -0,0 +1,97 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::composite::{ISubstituable, Substitutions}; +use crate::deserialize::{deserialize_size, deserialize_time}; +use crate::{InputDescriptor, OutputDescriptor}; +use zenoh_flow_commons::NodeId; + +use serde::{Deserialize, Serialize}; + +/// A `LinkDescriptor` describes a link in Zenoh-Flow: a connection from an Output to an Input. +/// +/// A link is composed of: +/// - an [OutputDescriptor], +/// - an [InputDescriptor], +/// - (optional) Zenoh shared-memory parameters. +/// +/// # Example +/// +/// The textual representation, in YAML, of a link is as following: +/// ```yaml +/// from: +/// node : Counter +/// output : Counter +/// to: +/// node : SumOperator +/// input : Number +/// ``` +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct LinkDescriptor { + pub from: OutputDescriptor, + pub to: InputDescriptor, + #[serde(default)] + #[serde(deserialize_with = "deserialize_size")] + pub shared_memory_element_size: Option, + pub shared_memory_elements: Option, + #[serde(default)] + #[serde(deserialize_with = "deserialize_time")] + pub shared_memory_backoff: Option, +} + +impl std::fmt::Display for LinkDescriptor { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{} => {}", self.from, self.to) + } +} + +impl ISubstituable for LinkDescriptor { + fn substitute(&mut self, subs: &Substitutions) { + if let Some(new_id) = subs.get(&self.from.node) { + self.from.node = new_id.clone(); + } + + if let Some(new_id) = subs.get(&self.to.node) { + self.to.node = new_id.clone(); + } + } +} + +impl ISubstituable for LinkDescriptor { + fn substitute(&mut self, subs: &Substitutions) { + if let Some(new_output) = subs.get(&self.from) { + self.from = new_output.clone(); + } + } +} + +impl ISubstituable for LinkDescriptor { + fn substitute(&mut self, subs: &Substitutions) { + if let Some(new_input) = subs.get(&self.to) { + self.to = new_input.clone(); + } + } +} + +impl LinkDescriptor { + pub fn new(from: OutputDescriptor, to: InputDescriptor) -> Self { + Self { + from, + to, + shared_memory_element_size: None, + shared_memory_elements: None, + shared_memory_backoff: None, + } + } +} diff --git a/zenoh-flow-descriptors/src/nodes/mod.rs b/zenoh-flow-descriptors/src/nodes/mod.rs new file mode 100644 index 00000000..8ffd3f97 --- /dev/null +++ b/zenoh-flow-descriptors/src/nodes/mod.rs @@ -0,0 +1,198 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +pub(crate) mod operator; +pub use operator::OperatorDescriptor; +pub(crate) mod sink; +pub use sink::SinkDescriptor; +pub(crate) mod source; +pub use source::SourceDescriptor; + +use crate::{ + deserialize::deserialize_id, + flattened::{IFlattenable, IFlattenableComposite, Patch}, + uri, LinkDescriptor, +}; +use anyhow::bail; +use serde::{Deserialize, Serialize}; +use std::{collections::HashSet, sync::Arc}; +use zenoh_flow_commons::{Configuration, NodeId, Result, RuntimeId, Vars}; + +/// A generic Zenoh-Flow node. +/// +/// It could represent a [SourceDescriptor], an [OperatorDescriptor], a [CompositeOperatorDescriptor] or a +/// [SinkDescriptor]. +/// +/// How Zenoh-Flow will try to parse the associated `descriptor` depends on the section in which this `NodeDescriptor` +/// is declared. For example, Zenoh-Flow will parse the nodes declared in the `sources` section as [SourceDescriptor]. +/// +/// # `configuration` section caveat +/// +/// The `configuration` section of a `NodeDescriptor` supersedes the same section in the Node it references. See the +/// documentation for more details regarding this behavior. +/// +/// # Example +/// +/// ``` +/// use zenoh_flow_descriptors::NodeDescriptor; +/// +/// let yaml = " +/// id: NodeDescriptor +/// descriptor: file:///home/zenoh-flow/nodes/libnode.so +/// "; +/// +/// let node_descriptor_yaml = serde_yaml::from_str::(&yaml).unwrap(); +/// +/// let json = " +/// { +/// \"id\": \"NodeDescriptor\", +/// \"descriptor\": \"file:///home/zenoh-flow/nodes/libnode.so\" +/// } +/// "; +/// +/// let node_descriptor_json = serde_json::from_str::(&json).unwrap(); +/// assert_eq!(node_descriptor_json, node_descriptor_yaml); +/// ``` +/// +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct NodeDescriptor { + #[serde(deserialize_with = "deserialize_id")] + pub id: NodeId, + pub descriptor: Arc, + #[serde(default)] + pub configuration: Configuration, +} + +impl std::fmt::Display for NodeDescriptor { + fn fmt(&self, _f: &mut std::fmt::Formatter) -> std::fmt::Result { + todo!() + } +} + +impl NodeDescriptor { + /// TODO@J-Loudet Documentation? + pub(crate) fn flatten( + self, + overwriting_configuration: Configuration, + runtime: Option, + vars: Vars, + ) -> Result { + let (node_desc, _) = uri::try_load_descriptor::(&self.descriptor, vars)?; + Ok(node_desc.flatten(self.id, overwriting_configuration, runtime)) + } + + /// TODO@J-Loudet Documentation? + pub(crate) fn flatten_maybe_composite( + self, + overwriting_configuration: Configuration, + runtime: Option, + vars: Vars, + ancestors: &mut HashSet>, + ) -> Result<(Vec, Vec, Patch)> { + // 1st attempt: try to flatten as a regular node. + let res_node = self.clone().flatten::( + overwriting_configuration.clone(), + runtime.clone(), + vars.clone(), + ); + + if let Ok(node) = res_node { + return Ok((vec![node], Vec::default(), Patch::default())); + } + + // 2nd attempt: try to flatten as a composite node. + if !ancestors.insert(self.descriptor.clone()) { + bail!( + r###" +Possible infinite recursion detected, the following descriptor appears to include itself: + {} +"###, + self.descriptor + ) + } + + let (composite_desc, merged_vars) = uri::try_load_descriptor::(&self.descriptor, vars)?; + let res_composite = composite_desc.clone().flatten_composite( + self.id, + overwriting_configuration, + runtime, + merged_vars, + ancestors, + ); + + if res_composite.is_ok() { + return res_composite; + } + + // Deserializations from Operator and CompositeOperator failed: we return both errors. + bail!( + r###" +Failed to deserialize descriptor either as non-Composite or as Composite: +---------------------------- +Descriptor: +{} + +---------------------------- +- non-Composite deserialization attempt: +{:?} + +- Composite deserialization attempt: +{:?} +"###, + composite_desc, + res_node.unwrap_err(), + res_composite.unwrap_err() + ); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_success() { + let yaml = r#" +id: my-custom-correct-id +descriptor: file:///dev/null +configuration: + answer: 42 +"#; + + let expected_node = NodeDescriptor { + id: "my-custom-correct-id".into(), + descriptor: "file:///dev/null".into(), + configuration: Configuration::from(serde_json::json!({ "answer": 42 })), + }; + assert_eq!( + expected_node, + serde_yaml::from_str::(yaml).unwrap() + ) + } + + #[test] + fn test_name_with_slash_rejected() { + let yaml = r#" +id: my-custom/wrong-id +descriptor: file:///dev/null +configuration: + answer: 42 +"#; + assert!(format!( + "{:?}", + serde_yaml::from_str::(yaml).err().unwrap() + ) + .contains("A NodeId cannot contain any '/'")) + } +} diff --git a/zenoh-flow-descriptors/src/nodes/operator.rs b/zenoh-flow-descriptors/src/nodes/operator.rs new file mode 100644 index 00000000..251bc1ec --- /dev/null +++ b/zenoh-flow-descriptors/src/nodes/operator.rs @@ -0,0 +1,118 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::{flattened::IFlattenable, FlattenedOperatorDescriptor}; +use serde::{Deserialize, Serialize}; +use zenoh_flow_commons::{Configuration, NodeId, PortId, RuntimeId}; + +/// Textual representation of a Zenoh-Flow Operator node. +/// +/// # Example +/// +/// ``` +/// use serde_yaml; +/// use serde_json; +/// use zenoh_flow_descriptors::OperatorDescriptor; +/// +/// let yaml = " +/// name: Operator +/// configuration: +/// answer: 42 +/// uri: file:///home/zenoh-flow/node/liboperator.so +/// inputs: +/// - in-source +/// outputs: +/// - out-sink +/// "; +/// let operator_yaml = serde_yaml::from_str::(yaml).unwrap(); +/// +/// let json = " +/// { +/// \"name\": \"Operator\", +/// \"configuration\": { +/// \"answer\": 42 +/// }, +/// \"uri\": \"file:///home/zenoh-flow/node/liboperator.so\", +/// \"inputs\": [ +/// \"in-source\" +/// ], +/// \"outputs\": [ +/// \"out-sink\" +/// ] +/// }"; +/// +/// let operator_json = serde_json::from_str::(json).unwrap(); +/// +/// assert_eq!(operator_yaml, operator_json); +/// ``` +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct OperatorDescriptor { + pub name: String, + pub uri: Option, + pub inputs: Vec, + pub outputs: Vec, + #[serde(default)] + pub configuration: Configuration, +} + +impl std::fmt::Display for OperatorDescriptor { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "Operator:\n{}", self.name) + } +} + +impl IFlattenable for OperatorDescriptor { + type Flattened = FlattenedOperatorDescriptor; + + fn flatten( + self, + id: NodeId, + overwritting_configuration: Configuration, + runtime: Option, + ) -> Self::Flattened { + FlattenedOperatorDescriptor::new(self, id, overwritting_configuration, runtime) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::uri; + use zenoh_flow_commons::{NodeId, Vars}; + + #[test] + fn test_flatten_operator() { + let id: NodeId = "my-operator-1".into(); + let expected_operator = FlattenedOperatorDescriptor { + id: id.clone(), + name: "operator-1".into(), + inputs: vec!["operator-1-in-1".into(), "operator-1-in-2".into()], + outputs: vec!["operator-1-out".into()], + uri: Some("file://operator-1.so".into()), + configuration: Configuration::default(), + runtime: None, + }; + + let (operator_descriptor, _) = uri::try_load_descriptor::( + "file://./tests/descriptors/operator-1.yml", + Vars::default(), + ) + .expect("Unexpected error while parsing OperatorDescriptor"); + + assert_eq!( + expected_operator, + operator_descriptor.flatten(id, Configuration::default(), None) + ) + } +} diff --git a/zenoh-flow-descriptors/src/nodes/sink.rs b/zenoh-flow-descriptors/src/nodes/sink.rs new file mode 100644 index 00000000..ce0dc8b6 --- /dev/null +++ b/zenoh-flow-descriptors/src/nodes/sink.rs @@ -0,0 +1,79 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::{flattened::IFlattenable, FlattenedSinkDescriptor}; +use serde::{Deserialize, Serialize}; +use zenoh_flow_commons::{Configuration, NodeId, PortId, RuntimeId}; + +/// Textual representation of a Zenoh-Flow Sink node. +/// +/// # Example +/// +/// ``` +/// use zenoh_flow_descriptors::SinkDescriptor; +/// +/// let sink_yaml = " +/// name: Sink +/// configuration: +/// answer: 42 +/// uri: file:///home/zenoh-flow/node/libsink.so +/// inputs: +/// - in-operator +/// "; +/// let sink_yaml = serde_yaml::from_str::(sink_yaml).unwrap(); +/// +/// let sink_json = " +/// { +/// \"name\": \"Sink\", +/// \"configuration\": { +/// \"answer\": 42 +/// }, +/// \"uri\": \"file:///home/zenoh-flow/node/libsink.so\", +/// \"inputs\": [ +/// \"in-operator\" +/// ] +/// }"; +/// +/// let sink_json = serde_json::from_str::(sink_json).unwrap(); +/// +/// assert_eq!(sink_yaml, sink_json); +/// ``` +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct SinkDescriptor { + pub name: String, + #[serde(default)] + pub configuration: Configuration, + pub uri: Option, + pub inputs: Vec, +} + +// TODO@J-Loudet Improve +impl std::fmt::Display for SinkDescriptor { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "Sink:\n{}", self.name) + } +} + +impl IFlattenable for SinkDescriptor { + type Flattened = FlattenedSinkDescriptor; + + fn flatten( + self, + id: NodeId, + updated_configuration: Configuration, + runtime: Option, + ) -> Self::Flattened { + FlattenedSinkDescriptor::new(self, id, updated_configuration, runtime) + } +} diff --git a/zenoh-flow-descriptors/src/nodes/source.rs b/zenoh-flow-descriptors/src/nodes/source.rs new file mode 100644 index 00000000..5218d9a5 --- /dev/null +++ b/zenoh-flow-descriptors/src/nodes/source.rs @@ -0,0 +1,93 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::{flattened::IFlattenable, FlattenedSourceDescriptor}; +use serde::{Deserialize, Serialize}; +use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, PortId, RuntimeId}; + +/// Textual representation of a Zenoh-Flow Source node. +/// +/// # Example +/// +/// ``` +/// use zenoh_flow_descriptors::SourceDescriptor; +/// +/// let source_yaml = " +/// name: Source +/// configuration: +/// answer: 42 +/// uri: file:///home/zenoh-flow/node/libsource.so +/// outputs: +/// - out-operator +/// "; +/// let source_yaml = serde_yaml::from_str::(&source_yaml).unwrap(); +/// +/// let source_json = " +/// { +/// \"name\": \"Source\", +/// \"configuration\": { +/// \"answer\": 42 +/// }, +/// \"uri\": \"file:///home/zenoh-flow/node/libsource.so\", +/// \"outputs\": [ +/// \"out-operator\" +/// ] +/// }"; +/// +/// let source_json = serde_json::from_str::(&source_json).unwrap(); +/// +/// assert_eq!(source_yaml, source_json); +/// ``` +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct SourceDescriptor { + pub name: String, + pub uri: Option, + pub outputs: Vec, + #[serde(default)] + pub configuration: Configuration, +} + +/// TODO@J-Loudet Improve display. +impl std::fmt::Display for SourceDescriptor { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "Source:\n{}", self.name) + } +} + +impl IFlattenable for SourceDescriptor { + type Flattened = FlattenedSourceDescriptor; + + fn flatten( + self, + id: NodeId, + overwriting_configuration: Configuration, + runtime: Option, + ) -> Self::Flattened { + let SourceDescriptor { + name, + uri, + outputs, + configuration, + } = self; + + FlattenedSourceDescriptor { + id, + name, + uri, + outputs, + configuration: overwriting_configuration.merge_overwrite(configuration), + runtime, + } + } +} diff --git a/zenoh-flow-descriptors/src/tests.rs b/zenoh-flow-descriptors/src/tests.rs new file mode 100644 index 00000000..83b316e8 --- /dev/null +++ b/zenoh-flow-descriptors/src/tests.rs @@ -0,0 +1,342 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::{ + uri::try_load_descriptor, DataFlowDescriptor, FlattenedOperatorDescriptor, + FlattenedSinkDescriptor, FlattenedSourceDescriptor, InputDescriptor, LinkDescriptor, + OutputDescriptor, +}; +use serde_json::json; +use zenoh_flow_commons::Vars; + +const BASE_DIR: &str = "./tests/descriptors"; +const SCHEME: &str = "file://"; + +// This test is trying to be thorough when it comes to a data flow descriptor. In particular, we +// want to make sure of the following: +// +// - the fact that the configuration is propagated --- correctly, +// - the naming of the composite operators, +// - the naming of the ports, +// - the links. +// +// See the comments around the "expected" structures for more information. +#[test] +fn test_flatten_descriptor() { + // env variable CARGO_MANIFEST_DIR puts us in zenoh-flow/zenoh-flow-descriptors + let uri = format!( + "file://{}/data-flow.yml", + BASE_DIR, + ); + + let (descriptor, vars) = try_load_descriptor::( + &uri, + Vars::default(), + ) + .expect("Failed to load DataFlowDescriptor"); + + let flatten = descriptor + .flatten(vars) + .expect("Failed to flatten descriptor"); + + let expected_sources = vec![ + FlattenedSourceDescriptor { + id: "source-1".into(), + name: "source".into(), + outputs: vec!["source-out".into()], + uri: Some("file://source.so".into()), + configuration: json!({ "foo": "global-outer" }).into(), + runtime: Some("runtime-1".into()), + }, + FlattenedSourceDescriptor { + id: "source-2".into(), + name: "source".into(), + outputs: vec!["source-out".into()], + uri: Some("file://source.so".into()), + configuration: json!({ "foo": "global-outer" }).into(), + runtime: None, + }, + FlattenedSourceDescriptor { + id: "source-composite".into(), + name: "composite-source".into(), + outputs: vec![ + "source-composite-out-1".into(), + "source-composite-out-2".into(), + ], + uri: Some("file://source-composite.so".into()), + configuration: json!({ "foo": "global-outer", "bar": "re-reverse" }).into(), + runtime: Some("runtime-source".into()), + }, + ]; + + expected_sources.iter().for_each(|expected_source| { + assert!( + flatten.sources.contains(expected_source), + "Source missing or incorrect: \n\n (expected) {:?} \n\n {:?}", + expected_source, + flatten + .sources + .iter() + .find(|source| source.id == expected_source.id) + ) + }); + assert_eq!(expected_sources.len(), flatten.sources.len()); + + let expected_operators = vec![ + FlattenedOperatorDescriptor { + id: "operator-1".into(), + name: "operator".into(), + inputs: vec!["operator-in".into()], + outputs: vec!["operator-out".into()], + uri: Some("file://operator.so".into()), + configuration: json!({ "foo": "global-outer" }).into(), + runtime: None, + }, + FlattenedOperatorDescriptor { + id: "operator-2".into(), + name: "operator".into(), + inputs: vec!["operator-in".into()], + outputs: vec!["operator-out".into()], + uri: Some("file://operator.so".into()), + configuration: json!({ "foo": "global-outer" }).into(), + runtime: None, + }, + /* + * `sub-operator-1` is declared in the file "operator-composite.yml". + * + * Here we are checking that its name, after flattening, is the concatenation of the id of + * the (composite) operator in "data-flow.yml" and the id of the actual operator in + * "operator-composite.yml": operator-composite/sub-operator-1 + * + * The names of the ports are left intact. + */ + FlattenedOperatorDescriptor { + id: "operator-composite/sub-operator-1".into(), + name: "leaf-operator-1".into(), + inputs: vec!["sub-operator-1-in-1".into(), "sub-operator-1-in-2".into()], + outputs: vec!["sub-operator-1-out".into()], + uri: Some("file://sub-operator-1.so".into()), + configuration: + json!({ "foo": "global-outer", "quux": "global-inner", "bar": "composite-outer" }) + .into(), + runtime: Some("runtime-composite".into()), + }, + /* + * Same spirit but this time it’s a composite operator within a composite operator. The + * resulting name should be the concatenation of both: + * + * operator-composite/sub-operator-composite/sub-sub-operator-1 + */ + FlattenedOperatorDescriptor { + id: "operator-composite/sub-operator-composite/sub-sub-operator-1".into(), + name: "sub-leaf-operator-1".into(), + inputs: vec!["sub-sub-operator-1-in".into()], + outputs: vec!["sub-sub-operator-1-out".into()], + uri: Some("file://sub-sub-operator-1.so".into()), + configuration: + json!({ "foo": "global-outer", "quux": "global-inner", "bar": "composite-outer", "buzz": "composite-inner", "baz": "leaf" }).into(), + runtime: Some("runtime-composite".into()), + }, + /* + * Idem as above: operator-composite/sub-operator-composite/sub-sub-operator-2. + */ + FlattenedOperatorDescriptor { + id: "operator-composite/sub-operator-composite/sub-sub-operator-2".into(), + name: "sub-leaf-operator-2".into(), + inputs: vec!["sub-sub-operator-2-in".into()], + outputs: vec!["sub-sub-operator-2-out".into()], + uri: Some("file://sub-sub-operator-2.so".into()), + configuration: + json!({ "foo": "global-outer", "quux": "global-inner", "bar": "composite-outer", "buzz": "composite-inner" }).into(), + runtime: Some("runtime-composite".into()), + }, + /* + * Similarly, we check that the name is the composition: operator-composite/sub-operator-2. + */ + FlattenedOperatorDescriptor { + id: "operator-composite/sub-operator-2".into(), + name: "leaf-operator-2".into(), + inputs: vec!["sub-operator-2-in".into()], + outputs: vec!["sub-operator-2-out-1".into(), "sub-operator-2-out-2".into()], + uri: Some("file://sub-operator-2.so".into()), + configuration: + json!({ "foo": "global-outer", "quux": "global-inner", "bar": "composite-outer" }).into(), + runtime: Some("runtime-composite".into()), + }, + ]; + + expected_operators.iter().for_each(|expected_operator| { + assert!( + flatten.operators.contains(expected_operator), + "Operator missing or incorrect: \n\n (expected) {:?} \n\n (found) {:?} \n\n(operators): {:?}\n\n", + expected_operator, + flatten + .operators + .iter() + .find(|operator| operator.id == expected_operator.id), + flatten.operators + ) + }); + assert_eq!(expected_operators.len(), flatten.operators.len()); + + let expected_sinks = vec![ + FlattenedSinkDescriptor { + id: "sink-1".into(), + name: "sink".into(), + inputs: vec!["sink-in".into()], + uri: Some("file://sink.so".into()), + configuration: json!({ "foo": "global-outer" }).into(), + runtime: None, + }, + FlattenedSinkDescriptor { + id: "sink-2".into(), + name: "sink".into(), + inputs: vec!["sink-in".into()], + uri: Some("file://sink.so".into()), + configuration: json!({ "foo": "global-outer" }).into(), + runtime: Some("runtime-2".into()), + }, + FlattenedSinkDescriptor { + id: "sink-composite".into(), + name: "composite-sink".into(), + inputs: vec!["sink-composite-in-1".into(), "sink-composite-in-2".into()], + uri: Some("file://sink-composite.so".into()), + configuration: json!({ "foo": "global-outer", "bar": "reverse" }).into(), + runtime: Some("runtime-sink".into()), + }, + ]; + + expected_sinks.iter().for_each(|expected_sink| { + assert!( + flatten.sinks.contains(expected_sink), + "Sink missing or incorrect: \n\n (expected) {:?} \n\n {:?}", + expected_sink, + flatten.sinks + ) + }); + assert_eq!(expected_sinks.len(), flatten.sinks.len()); + + let expected_links = vec![ + // source 1 -> operator 1 -> sink 1 + LinkDescriptor::new( + OutputDescriptor::new("source-1", "source-out"), + InputDescriptor::new("operator-1", "operator-in"), + ), + LinkDescriptor::new( + OutputDescriptor::new("operator-1", "operator-out"), + InputDescriptor::new("sink-1", "sink-in"), + ), + // source 2 -> operator 2 -> sink 2 + LinkDescriptor::new( + OutputDescriptor::new("source-2", "source-out"), + InputDescriptor::new("operator-2", "operator-in"), + ), + LinkDescriptor::new( + OutputDescriptor::new("operator-2", "operator-out"), + InputDescriptor::new("sink-2", "sink-in"), + ), + // source-composite -> operator-composite-sub-1 + /* + * The name of the port at the "junction" between the container & the composite is the one + * declared in the **container**. + * + * Hence, the name of the input ports of "sub-operator-1" (operator-composite.yml) are + * replaced by what is declared in "data-flow.yml". + */ + LinkDescriptor::new( + OutputDescriptor::new("source-composite", "source-composite-out-1"), + InputDescriptor::new("operator-composite/sub-operator-1", "sub-operator-1-in-1"), + ), + LinkDescriptor::new( + OutputDescriptor::new("source-composite", "source-composite-out-2"), + InputDescriptor::new("operator-composite/sub-operator-1", "sub-operator-1-in-2"), + ), + // operator-composite-sub-2 -> sink-composite + LinkDescriptor::new( + OutputDescriptor::new("operator-composite/sub-operator-2", "sub-operator-2-out-1"), + InputDescriptor::new("sink-composite", "sink-composite-in-1"), + ), + LinkDescriptor::new( + OutputDescriptor::new("operator-composite/sub-operator-2", "sub-operator-2-out-2"), + InputDescriptor::new("sink-composite", "sink-composite-in-2"), + ), + // operator-composite-sub-operator-1 -> + // operator-composite-sub-operator-composite-sub-sub-operator-1 + LinkDescriptor::new( + OutputDescriptor::new("operator-composite/sub-operator-1", "sub-operator-1-out"), + InputDescriptor::new( + "operator-composite/sub-operator-composite/sub-sub-operator-1", + "sub-sub-operator-1-in", + ), + ), + // operator-composite-sub-operator-composite-sub-sub-operator-2 -> + // operator-composite-sub-operator-2 + LinkDescriptor::new( + OutputDescriptor::new( + "operator-composite/sub-operator-composite/sub-sub-operator-2", + "sub-sub-operator-2-out", + ), + InputDescriptor::new("operator-composite/sub-operator-2", "sub-operator-2-in"), + ), + // operator-composite-sub-operator-composite-sub-sub-operator-1 -> + // operator-composite-sub-operator-composite-sub-sub-operator-2 + LinkDescriptor::new( + OutputDescriptor::new( + "operator-composite/sub-operator-composite/sub-sub-operator-1", + "sub-sub-operator-1-out", + ), + InputDescriptor::new( + "operator-composite/sub-operator-composite/sub-sub-operator-2", + "sub-sub-operator-2-in", + ), + ), + ]; + + expected_links.iter().for_each(|expected_link| { + assert!( + flatten.links.contains(expected_link), + "Link missing or incorrect: \n\n (expected) {:?} \n\n {:?}", + expected_link, + flatten.links + ) + }); + assert_eq!(expected_links.len(), flatten.links.len()); +} + +#[test] +fn test_detect_recursion() { + let path = format!( + "{}{}/data-flow-recursion.yml", + SCHEME, + BASE_DIR, + ); + + let (descriptor, vars) = + try_load_descriptor::(&path, Vars::from([("BASE_DIR", BASE_DIR), ("SCHEME", SCHEME)])).expect("Failed to parse descriptor"); + let res_flatten = descriptor.flatten(vars); + assert!(res_flatten.is_err()); +} + +#[test] +fn test_duplicate_composite_at_same_level_not_detected_as_recursion() { + let path = format!( + "{}{}/data-flow-recursion-duplicate-composite.yml", + SCHEME, + BASE_DIR, + ); + + let (descriptor, vars) = + try_load_descriptor::(&path, Vars::from([("BASE_DIR", BASE_DIR), ("SCHEME", SCHEME)])).expect("Failed to parse descriptor"); + assert!(descriptor.flatten(vars).is_ok()); +} diff --git a/zenoh-flow-descriptors/src/uri.rs b/zenoh-flow-descriptors/src/uri.rs new file mode 100644 index 00000000..d0e7b9dc --- /dev/null +++ b/zenoh-flow-descriptors/src/uri.rs @@ -0,0 +1,116 @@ +// +// Copyright (c) 2021 - 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::io::Read; +use std::path::PathBuf; + +use anyhow::{bail, Context}; +use serde::Deserialize; +use url::Url; +use zenoh_flow_commons::{IMergeOverwrite, Result, Vars}; + +pub(crate) fn try_load_descriptor(uri: &str, vars: Vars) -> Result<(N, Vars)> +where + N: for<'a> Deserialize<'a>, +{ + let url = Url::parse(uri).context(format!("Failed to parse uri:\n{}", uri))?; + + match url.scheme() { + "file" => try_load_descriptor_from_file::(url.path(), vars).context(format!( + "Failed to load descriptor from file:\n{}", + url.path() + )), + _ => bail!( + "Failed to parse uri, unsupported scheme < {} > found:\n{}", + url.scheme(), + uri + ), + } +} + +pub(crate) fn deserializer(path: &PathBuf) -> Result Result> +where + N: for<'a> Deserialize<'a>, +{ + match path.extension().and_then(|ext| ext.to_str()) { + Some("json") => Ok(|buf| { + serde_json::from_str::(buf) + .context(format!("Failed to deserialize from JSON:\n{}", buf)) + }), + Some("yml") | Some("yaml") => Ok(|buf| { + serde_yaml::from_str::(buf) + .context(format!("Failed to deserialize from YAML:\n{}", buf)) + }), + Some(extension) => bail!( + r###" +Unsupported file extension < {} > in: + {:?} + +Currently supported file extensions are: +- .json +- .yml +- .yaml +"###, + extension, + path + ), + None => bail!("Missing file extension in path:\n{}", path.display()), + } +} + +pub(crate) fn try_load_descriptor_from_file(path: &str, vars: Vars) -> Result<(N, Vars)> +where + N: for<'a> Deserialize<'a>, +{ + let mut path_buf = PathBuf::new(); + + #[cfg(test)] + { + // When running the test on the CI we cannot know the path of the clone of Zenoh-Flow. By + // using relative paths (w.r.t. the manifest dir) in the tests and, only in tests, prepend + // the paths with this environment variable we obtain a correct absolute path. + path_buf.push(env!("CARGO_MANIFEST_DIR")); + path_buf.push( + path.strip_prefix('/') + .expect("Failed to remove leading '/'"), + ); + } + + #[cfg(not(test))] + path_buf.push(path); + + let path = std::fs::canonicalize(&path_buf).context(format!( + "Failed to canonicalize path (did you put an absolute path?):\n{}", + path_buf.display() + ))?; + + let mut buf = String::default(); + std::fs::File::open(path.clone()) + .context(format!("Failed to open file:\n{}", path_buf.display()))? + .read_to_string(&mut buf) + .context(format!( + "Failed to read the content of file:\n{}", + path_buf.display() + ))?; + + let merged_vars = vars + .merge_overwrite(deserializer::(&path)?(&buf).context("Failed to deserialize Vars")?); + let expanded_buf = merged_vars.expand_mustache(&buf)?; + + Ok(( + (deserializer::(&path))?(&expanded_buf) + .context(format!("Failed to deserialize {}", &path.display()))?, + merged_vars, + )) +} diff --git a/zenoh-flow-descriptors/tests/descriptors/composite-nested.yml b/zenoh-flow-descriptors/tests/descriptors/composite-nested.yml new file mode 100644 index 00000000..c27b7ab6 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/composite-nested.yml @@ -0,0 +1,30 @@ +name: composite-nested + + +inputs: + - id: composite-nested-in + node: operator-1 + input: operator-1-in-1 + + +outputs: + - id: composite-nested-out + node: operator-2 + output: operator-2-out + + +operators: + - id: operator-1 + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/operator-1.yml" + + - id: operator-2 + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/operator-2.yml" + + +links: + - from: + node: operator-1 + output: operator-1-out + to: + node: operator-2 + input: operator-2-in diff --git a/zenoh-flow-descriptors/tests/descriptors/composite-outer.yml b/zenoh-flow-descriptors/tests/descriptors/composite-outer.yml new file mode 100644 index 00000000..fa0ac943 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/composite-outer.yml @@ -0,0 +1,7 @@ +name: composite-outer + +uri: file://composite-outer.so + +inputs: [composite-outer-in] + +outputs: [composite-outer-out] diff --git a/zenoh-flow-descriptors/tests/descriptors/data-flow-recursion-duplicate-composite.yml b/zenoh-flow-descriptors/tests/descriptors/data-flow-recursion-duplicate-composite.yml new file mode 100644 index 00000000..7a7e6fbe --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/data-flow-recursion-duplicate-composite.yml @@ -0,0 +1,63 @@ +flow: test-recursion-ok + +vars: + SCHEME: file:// + BASE_DIR: ./src/tests/descriptors + +sources: + - id: source-composite + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/source-composite.yml" + +operators: + - id: operator-composite-1 + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/operator-composite.yml" + + - id: operator-composite-2 + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/operator-composite.yml" + +sinks: + - id: sink-composite + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/sink-composite.yml" + +links: + - from: + node: source-composite + output: source-composite-out-1 + to: + node: operator-composite-1 + input: operator-composite-in-1 + + - from: + node: source-composite + output: source-composite-out-2 + to: + node: operator-composite-1 + input: operator-composite-in-2 + + - from: + node: operator-composite-1 + output: operator-composite-out-1 + to: + node: operator-composite-2 + input: operator-composite-in-1 + + - from: + node: operator-composite-1 + output: operator-composite-out-2 + to: + node: operator-composite-2 + input: operator-composite-in-2 + + - from: + node: operator-composite-2 + output: operator-composite-out-1 + to: + node: sink-composite + input: sink-composite-in-1 + + - from: + node: operator-composite-2 + output: operator-composite-out-2 + to: + node: sink-composite + input: sink-composite-in-2 diff --git a/zenoh-flow-descriptors/tests/descriptors/data-flow-recursion.yml b/zenoh-flow-descriptors/tests/descriptors/data-flow-recursion.yml new file mode 100644 index 00000000..2c9c4111 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/data-flow-recursion.yml @@ -0,0 +1,39 @@ +flow: test-recursion + +vars: + BASE_DIR: file://./src/tests + +sources: + - id: source-1 + descriptor: "{{ BASE_DIR }}/source-1.yml" + +operators: + - id: operator-1 + descriptor: "{{ BASE_DIR }}/operator-1.yml" + + - id: operator-infinite + descriptor: "{{ BASE_DIR }}/operator-infinite.yml" + +sinks: + - id: sink-1 + descriptor: "{{ BASE_DIR }}/sink-1.yml" + +links: + - from: + node: source-1 + output: source-1-out + to: + node: operator-1 + input: operator-1-in + - from: + node: operator-1 + output: operator-out-1 + to: + node: operator-infinite + input: infinite-input + - from: + node: operator-infinite + output: infinite-output + to: + node: sink-1 + input: sink-1-in diff --git a/zenoh-flow-descriptors/tests/descriptors/data-flow.yml b/zenoh-flow-descriptors/tests/descriptors/data-flow.yml new file mode 100644 index 00000000..3ff0b713 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/data-flow.yml @@ -0,0 +1,106 @@ +flow: test + +vars: + SCHEME: file:// + BASE_DIR: ./tests/descriptors + +configuration: + foo: "global-outer" + +sources: + - id: source-1 + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/source.yml" + + - id: source-2 + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/source.yml" + + - id: source-composite + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/source-composite.yml" + + +operators: + - id: operator-1 + descriptor: "{{ SCHEME }}{{BASE_DIR }}/operator.yml" + + - id: operator-2 + descriptor: "{{ SCHEME }}{{ BASE_DIR}}/operator.yml" + + - id: operator-composite + descriptor: "{{ SCHEME }}{{BASE_DIR}}/operator-composite.yml" + configuration: + quux: "global-inner" + + +sinks: + - id: sink-1 + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/sink.yml" + + - id: sink-2 + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/sink.yml" + + - id: sink-composite + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/sink-composite.yml" + + +links: + - from: + node: source-1 + output: source-out + to: + node: operator-1 + input: operator-in + - from: + node: operator-1 + output: operator-out + to: + node: sink-1 + input: sink-in + + - from: + node: source-2 + output: source-out + to: + node: operator-2 + input: operator-in + - from: + node: operator-2 + output: operator-out + to: + node: sink-2 + input: sink-in + + - from: + node: source-composite + output: source-composite-out-1 + to: + node: operator-composite + input: operator-composite-in-1 + + - from: + node: source-composite + output: source-composite-out-2 + to: + node: operator-composite + input: operator-composite-in-2 + + - from: + node: operator-composite + output: operator-composite-out-1 + to: + node: sink-composite + input: sink-composite-in-1 + + - from: + node: operator-composite + output: operator-composite-out-2 + to: + node: sink-composite + input: sink-composite-in-2 + + +mapping: + source-1: runtime-1 + sink-2: runtime-2 + source-composite: runtime-source + operator-composite: runtime-composite + sink-composite: runtime-sink diff --git a/zenoh-flow-descriptors/tests/descriptors/operator-1.yml b/zenoh-flow-descriptors/tests/descriptors/operator-1.yml new file mode 100644 index 00000000..9fa920a2 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/operator-1.yml @@ -0,0 +1,7 @@ +name: "operator-1" +uri: "file://operator-1.so" +inputs: + - "operator-1-in-1" + - "operator-1-in-2" +outputs: + - "operator-1-out" diff --git a/zenoh-flow-descriptors/tests/descriptors/operator-2.yml b/zenoh-flow-descriptors/tests/descriptors/operator-2.yml new file mode 100644 index 00000000..d4ada6c5 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/operator-2.yml @@ -0,0 +1,7 @@ +name: operator-2 + +uri: file://operator-2.so + +inputs: [operator-2-in] + +outputs: [operator-2-out] diff --git a/zenoh-flow-descriptors/tests/descriptors/operator-composite.yml b/zenoh-flow-descriptors/tests/descriptors/operator-composite.yml new file mode 100644 index 00000000..e2df35cb --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/operator-composite.yml @@ -0,0 +1,61 @@ +name: composite + + +vars: + SCHEME: "" + BASE_DIR: "" + + +configuration: + foo: "composite-outer" + bar: "composite-outer" + + +inputs: + - id: operator-composite-in-1 + node: sub-operator-1 + input: sub-operator-1-in-1 + + - id: operator-composite-in-2 + node: sub-operator-1 + input: sub-operator-1-in-2 + + +outputs: + - id: operator-composite-out-1 + node: sub-operator-2 + output: sub-operator-2-out-1 + + - id: operator-composite-out-2 + node: sub-operator-2 + output: sub-operator-2-out-2 + + +operators: + - id: sub-operator-1 + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/sub-operator-1.yml" + + - id: sub-operator-composite + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/sub-operator-composite.yml" + configuration: + foo: "composite-inner" + buzz: "composite-inner" + + - id: sub-operator-2 + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/sub-operator-2.yml" + + +links: + - from: + node: sub-operator-1 + output: sub-operator-1-out + to: + node: sub-operator-composite + input: sub-operator-composite-in + + - from: + node: sub-operator-composite + output: sub-operator-composite-out + to: + node: sub-operator-2 + input: sub-operator-2-in diff --git a/zenoh-flow-descriptors/tests/descriptors/operator-infinite.yml b/zenoh-flow-descriptors/tests/descriptors/operator-infinite.yml new file mode 100644 index 00000000..bafd89d0 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/operator-infinite.yml @@ -0,0 +1,42 @@ +name: operator-infinite + +vars: + SCHEME: "" # set up by the data flow + BASE_DIR: "" # + +inputs: + - id: infinite-input + node: operator-1 + input: operator-1-in + + +outputs: + - id: infinite-output + node: operator-2 + output: operator-2-out + + +operators: + - id: operator-1 + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/operator-1.yml" + + - id: operator-infinite + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/operator-infinite.yml" + + - id: operator-2 + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/operator-2.yml" + + +links: + - from: + node: operator-1 + output: operator-1-out + to: + node: operator-infinite + input: infinite-input + - from: + node: operator-infinite + output: infinite-output + to: + node: operator-2 + input: operator-2-out diff --git a/zenoh-flow-descriptors/tests/descriptors/operator.yml b/zenoh-flow-descriptors/tests/descriptors/operator.yml new file mode 100644 index 00000000..9f16f981 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/operator.yml @@ -0,0 +1,7 @@ +name: operator + +uri: "{{ SCHEME }}operator.so" + +inputs: [operator-in] + +outputs: [operator-out] diff --git a/zenoh-flow-descriptors/tests/descriptors/sink-composite.yml b/zenoh-flow-descriptors/tests/descriptors/sink-composite.yml new file mode 100644 index 00000000..52f684e0 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/sink-composite.yml @@ -0,0 +1,10 @@ +name: composite-sink + +uri: "{{ SCHEME }}sink-composite.so" + +configuration: + bar: reverse + +inputs: + - sink-composite-in-1 + - sink-composite-in-2 diff --git a/zenoh-flow-descriptors/tests/descriptors/sink.yml b/zenoh-flow-descriptors/tests/descriptors/sink.yml new file mode 100644 index 00000000..8867d868 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/sink.yml @@ -0,0 +1,5 @@ +name: sink + +uri: "{{ SCHEME }}sink.so" + +inputs: [sink-in] diff --git a/zenoh-flow-descriptors/tests/descriptors/source-composite.yml b/zenoh-flow-descriptors/tests/descriptors/source-composite.yml new file mode 100644 index 00000000..982efed2 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/source-composite.yml @@ -0,0 +1,13 @@ +name: composite-source + +vars: + SCHEME: "" + +configuration: + bar: re-reverse + +uri: "{{ SCHEME }}source-composite.so" + +outputs: + - source-composite-out-1 + - source-composite-out-2 diff --git a/zenoh-flow-descriptors/tests/descriptors/source.yml b/zenoh-flow-descriptors/tests/descriptors/source.yml new file mode 100644 index 00000000..65afd7db --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/source.yml @@ -0,0 +1,5 @@ +name: source + +uri: "{{ SCHEME }}source.so" + +outputs: [source-out] diff --git a/zenoh-flow-descriptors/tests/descriptors/sub-operator-1.yml b/zenoh-flow-descriptors/tests/descriptors/sub-operator-1.yml new file mode 100644 index 00000000..4966895d --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/sub-operator-1.yml @@ -0,0 +1,9 @@ +name: leaf-operator-1 + +uri: "{{ SCHEME }}sub-operator-1.so" + +inputs: + - sub-operator-1-in-1 + - sub-operator-1-in-2 + +outputs: [sub-operator-1-out] diff --git a/zenoh-flow-descriptors/tests/descriptors/sub-operator-2.yml b/zenoh-flow-descriptors/tests/descriptors/sub-operator-2.yml new file mode 100644 index 00000000..e5372349 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/sub-operator-2.yml @@ -0,0 +1,10 @@ +name: leaf-operator-2 + +uri: "{{ SCHEME }}sub-operator-2.so" + +inputs: [sub-operator-2-in] + +outputs: + - sub-operator-2-out-1 + - sub-operator-2-out-2 + diff --git a/zenoh-flow-descriptors/tests/descriptors/sub-operator-composite.yml b/zenoh-flow-descriptors/tests/descriptors/sub-operator-composite.yml new file mode 100644 index 00000000..e119d0d3 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/sub-operator-composite.yml @@ -0,0 +1,33 @@ +name: sub-operator-composite + +vars: + SCHEME: "" # set up by the data flow + BASE_DIR: "" # + +inputs: + - id: sub-operator-composite-in + node: sub-sub-operator-1 + input: sub-sub-operator-1-in + + +outputs: + - id: sub-operator-composite-out + node: sub-sub-operator-2 + output: sub-sub-operator-2-out + + +operators: + - id: sub-sub-operator-1 + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/sub-sub-operator-1.yml" + + - id: sub-sub-operator-2 + descriptor: "{{ SCHEME }}{{ BASE_DIR }}/sub-sub-operator-2.yml" + + +links: + - from: + node: sub-sub-operator-1 + output: sub-sub-operator-1-out + to: + node: sub-sub-operator-2 + input: sub-sub-operator-2-in diff --git a/zenoh-flow-descriptors/tests/descriptors/sub-sub-operator-1.yml b/zenoh-flow-descriptors/tests/descriptors/sub-sub-operator-1.yml new file mode 100644 index 00000000..e9517368 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/sub-sub-operator-1.yml @@ -0,0 +1,13 @@ +name: sub-leaf-operator-1 + +uri: "{{ SCHEME }}sub-sub-operator-1.so" + +configuration: + foo: "leaf" + bar: "leaf" + baz: "leaf" + quux: "leaf" + +inputs: [sub-sub-operator-1-in] + +outputs: [sub-sub-operator-1-out] diff --git a/zenoh-flow-descriptors/tests/descriptors/sub-sub-operator-2.yml b/zenoh-flow-descriptors/tests/descriptors/sub-sub-operator-2.yml new file mode 100644 index 00000000..8e93e598 --- /dev/null +++ b/zenoh-flow-descriptors/tests/descriptors/sub-sub-operator-2.yml @@ -0,0 +1,7 @@ +name: sub-leaf-operator-2 + +uri: "{{ SCHEME }}sub-sub-operator-2.so" + +inputs: [sub-sub-operator-2-in] + +outputs: [sub-sub-operator-2-out]