Skip to content

Commit

Permalink
refacto: move records into their own crate
Browse files Browse the repository at this point in the history
This commit creates the package `zenoh-flow-records`.

The central structure is the `DataFlowRecord`: a data flow that is ready to be
instantiated. All nodes are mapped to a runtime and the required connectors were
added.

The preferred way to create a `DataFlowRecord` is to call the method
`complete_mapping_and_connect` on a `FlattenedDataFlowDescriptor` instance.

This commit was also the opportunity to:
- remove the `runtime` field from all the descriptors / records,
- the remaining `String` were transformed into `Arc<str>`,

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Oct 18, 2023
1 parent b312b48 commit 20466bd
Show file tree
Hide file tree
Showing 33 changed files with 989 additions and 288 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ members = [
"zenoh-flow-derive",
"zenoh-flow-descriptors",
"zenoh-flow-plugin",
"zenoh-flow-records",
"zfctl",
]

Expand Down Expand Up @@ -51,11 +52,14 @@ serde_derive = "1.0"
serde_json = "1.0"
serde_yaml = "0.9"
uhlc = "0.6"
uuid = "1.1"
uuid = { version = "1.1", features = ["serde", "v4"] }
zenoh = { version = "0.7.2-rc" }
zenoh-collections = { version = "0.7.2-rc" }
zenoh-core = { version = "0.7.2-rc" }
zenoh-ext = { version = "0.7.2-rc" }
zenoh-flow-commons = { path = "./zenoh-flow-commons" }
zenoh-flow-descriptors = { path = "./zenoh-flow-descriptors" }
zenoh-flow-records = { path = "./zenoh-flow-records" }
zenoh-plugin-trait = { version = "0.7.2-rc", default-features = false }
zenoh-sync = { version = "0.7.2-rc" }
zenoh-util = { version = "0.7.2-rc" }
Expand Down
2 changes: 2 additions & 0 deletions zenoh-flow-commons/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ version = { workspace = true }

[dependencies]
anyhow = { workspace = true }
bytesize = "1.2.0"
humantime = "2.1"
ramhorns = "0.14"
serde = { workspace = true }
serde_json = { workspace = true }
15 changes: 15 additions & 0 deletions zenoh-flow-commons/src/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,25 @@
//!
//! The external crates [bytesize] and [humantime] are leveraged for these purposes.
use crate::NodeId;
use std::str::FromStr;

use serde::Deserializer;

pub fn deserialize_id<'de, D>(deserializer: D) -> std::result::Result<NodeId, D::Error>
where
D: Deserializer<'de>,
{
let id: String = serde::de::Deserialize::deserialize(deserializer)?;
if id.contains('/') {
return Err(serde::de::Error::custom(format!(
"A NodeId cannot contain any '/': {id}"
)));
}

Ok(id.into())
}

pub fn deserialize_size<'de, D>(deserializer: D) -> std::result::Result<Option<usize>, D::Error>
where
D: Deserializer<'de>,
Expand Down
19 changes: 14 additions & 5 deletions zenoh-flow-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,26 @@
// ZettaScale Zenoh Team, <[email protected]>
//

mod vars;
pub use vars::Vars;
mod configuration;
pub use configuration::Configuration;

mod deserialize;
pub use deserialize::deserialize_id;

mod identifiers;
pub use identifiers::{NodeId, PortId, RuntimeId};

mod configuration;
pub use configuration::Configuration;

mod merge;
pub use merge::IMergeOverwrite;

mod runtime;
pub use runtime::RuntimeContext;

mod shared_memory;
pub use shared_memory::{SharedMemoryConfiguration, SharedMemoryParameters};

mod vars;
pub use vars::Vars;

/// Zenoh-Flow's result type.
pub type Result<T> = std::result::Result<T, anyhow::Error>;
22 changes: 22 additions & 0 deletions zenoh-flow-commons/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//
// Copyright (c) 2021 - 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use crate::{RuntimeId, SharedMemoryParameters};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct RuntimeContext {
pub id: RuntimeId,
pub shared_memory: SharedMemoryParameters,
}
62 changes: 62 additions & 0 deletions zenoh-flow-commons/src/shared_memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//
// Copyright (c) 2021 - 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use std::fmt::Display;

use crate::deserialize::{deserialize_size, deserialize_time};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct SharedMemoryConfiguration {
pub(crate) number_elements: Option<usize>,
#[serde(deserialize_with = "deserialize_size")]
pub(crate) element_size: Option<usize>,
#[serde(deserialize_with = "deserialize_time")]
pub(crate) backoff: Option<u64>,
}

// TODO@J-Loudet
impl Display for SharedMemoryConfiguration {
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}

#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct SharedMemoryParameters {
pub number_elements: usize,
// Size, in bytes, of a single element.
pub element_size: usize,
// Duration, in nanoseconds, to wait before retrying the last operation.
pub backoff: u64,
}

// TODO@J-Loudet
impl Display for SharedMemoryParameters {
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}

impl SharedMemoryParameters {
pub fn from_configuration(configuration: &SharedMemoryConfiguration, default: &Self) -> Self {
Self {
number_elements: configuration
.number_elements
.unwrap_or(default.number_elements),
element_size: configuration.element_size.unwrap_or(default.element_size),
backoff: configuration.backoff.unwrap_or(default.backoff),
}
}
}
6 changes: 3 additions & 3 deletions zenoh-flow-descriptors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ version = { workspace = true }

[dependencies]
anyhow = { workspace = true }
bytesize = "1.2.0"
humantime = "2.1"
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
url = "2.2"
zenoh-flow-commons = { path = "../zenoh-flow-commons" }
uuid = { workspace = true }
zenoh-flow-commons = { workspace = true }
zenoh-flow-records = { workspace = true }
3 changes: 1 addition & 2 deletions zenoh-flow-descriptors/src/composite/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
//

use super::ISubstituable;
use crate::deserialize::deserialize_id;
use crate::{InputDescriptor, OutputDescriptor};

use serde::{Deserialize, Serialize};
use zenoh_flow_commons::{NodeId, PortId};
use zenoh_flow_commons::{deserialize_id, NodeId, PortId};

/// TODO@J-Loudet example?
/// TODO@J-Loudet documentation?
Expand Down
2 changes: 0 additions & 2 deletions zenoh-flow-descriptors/src/composite/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ impl IFlattenableComposite for CompositeOperatorDescriptor {
mut self,
composite_id: NodeId,
overwriting_configuration: Configuration,
runtime: Option<zenoh_flow_commons::RuntimeId>,
vars: Vars,
ancestors: &mut HashSet<Arc<str>>,
) -> Result<(Vec<Self::Flattened>, Vec<LinkDescriptor>, Patch)> {
Expand All @@ -172,7 +171,6 @@ impl IFlattenableComposite for CompositeOperatorDescriptor {
let (mut operators, mut links, patch) = operator_desc
.flatten_maybe_composite::<CompositeOperatorDescriptor>(
node_overwriting_configuration,
runtime.clone(),
vars.clone(),
// If we don't clone the ancestors between successive calls, consecutive composite operators
// referring to the same descriptor would be falsely flagged as "infinite recursions".
Expand Down
9 changes: 0 additions & 9 deletions zenoh-flow-descriptors/src/composite/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,10 @@ fn test_flatten_composite_descriptor_non_nested() {
configuration: Configuration::default(),
};

let runtime = Some("zf-plugin-1".into());
let (flattened_operators, flattened_links, patch) = composite_descriptor
.flatten_composite(
"composite".into(),
Configuration::default(),
runtime.clone(),
Vars::default(),
&mut HashSet::default(),
)
Expand All @@ -82,7 +80,6 @@ fn test_flatten_composite_descriptor_non_nested() {
outputs: vec!["operator-1-out".into()],
uri: Some("file://operator-1.so".into()),
configuration: Configuration::default(),
runtime: runtime.clone(),
},
FlattenedOperatorDescriptor {
id: "composite/my-operator-2".into(),
Expand All @@ -91,7 +88,6 @@ fn test_flatten_composite_descriptor_non_nested() {
outputs: vec!["operator-2-out".into()],
uri: Some("file://operator-2.so".into()),
configuration: Configuration::default(),
runtime,
},
];

Expand Down Expand Up @@ -180,7 +176,6 @@ fn test_flatten_composite_descriptor_nested() {
.flatten_composite(
"composite".into(),
Configuration::default(),
None,
Vars::from([("SCHEME", "file://"), ("BASE_DIR", BASE_DIR)]),
&mut HashSet::default(),
)
Expand All @@ -197,7 +192,6 @@ fn test_flatten_composite_descriptor_nested() {
outputs: vec!["composite-outer-out".into()],
uri: Some("file://composite-outer.so".into()),
configuration: Configuration::default(),
runtime: None,
},
FlattenedOperatorDescriptor {
id: "composite/composite-nested/operator-1".into(),
Expand All @@ -206,7 +200,6 @@ fn test_flatten_composite_descriptor_nested() {
outputs: vec!["operator-1-out".into()],
uri: Some("file://operator-1.so".into()),
configuration: Configuration::default(),
runtime: None,
},
FlattenedOperatorDescriptor {
id: "composite/composite-nested/operator-2".into(),
Expand All @@ -215,7 +208,6 @@ fn test_flatten_composite_descriptor_nested() {
outputs: vec!["operator-2-out".into()],
uri: Some("file://operator-2.so".into()),
configuration: Configuration::default(),
runtime: None,
},
FlattenedOperatorDescriptor {
id: "composite/composite-outer-i".into(),
Expand All @@ -224,7 +216,6 @@ fn test_flatten_composite_descriptor_nested() {
outputs: vec!["composite-outer-out".into()],
uri: Some("file://composite-outer.so".into()),
configuration: Configuration::default(),
runtime: None,
},
];

Expand Down
44 changes: 26 additions & 18 deletions zenoh-flow-descriptors/src/dataflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use crate::{
SinkDescriptor, SourceDescriptor,
};

use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use serde::{Deserialize, Serialize};
use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, Result, RuntimeId, Vars};
Expand Down Expand Up @@ -146,7 +149,7 @@ use zenoh_flow_commons::{Configuration, IMergeOverwrite, NodeId, Result, Runtime
/// ```
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct DataFlowDescriptor {
pub flow: String,
pub flow: Arc<str>,
#[serde(default)]
pub configuration: Configuration,
pub operators: Vec<NodeDescriptor>,
Expand All @@ -168,7 +171,7 @@ impl DataFlowDescriptor {
mapping,
} = self;

let mapping = mapping.unwrap_or_default();
let mut mapping = mapping.unwrap_or_default();

let mut flattened_sources = Vec::with_capacity(sources.len());
for source_desc in sources {
Expand All @@ -177,13 +180,10 @@ impl DataFlowDescriptor {
.configuration
.clone()
.merge_overwrite(flow_configuration.clone());
let runtime = mapping.get(&source_desc.id).cloned();

flattened_sources.push(source_desc.flatten::<SourceDescriptor>(
overwriting_configuration,
runtime,
vars.clone(),
)?);
flattened_sources.push(
source_desc.flatten::<SourceDescriptor>(overwriting_configuration, vars.clone())?,
);
}

let mut flattened_sinks = Vec::with_capacity(sinks.len());
Expand All @@ -194,13 +194,9 @@ impl DataFlowDescriptor {
.clone()
.merge_overwrite(flow_configuration.clone());

let runtime = mapping.get(&sink_desc.id).cloned();

flattened_sinks.push(sink_desc.flatten::<SinkDescriptor>(
overwriting_configuration,
runtime,
vars.clone(),
)?);
flattened_sinks.push(
sink_desc.flatten::<SinkDescriptor>(overwriting_configuration, vars.clone())?,
);
}

let mut flattened_operators = Vec::with_capacity(operators.len());
Expand All @@ -210,16 +206,27 @@ impl DataFlowDescriptor {
.configuration
.clone()
.merge_overwrite(flow_configuration.clone());
let runtime = mapping.get(&operator_desc.id).cloned();

let operator_id = operator_desc.id.clone();
let (mut flat_operators, mut flat_links, patch) = operator_desc
.flatten_maybe_composite::<CompositeOperatorDescriptor>(
overwriting_configuration,
runtime,
vars.clone(),
&mut HashSet::default(),
)?;

// If the composite operator (i.e. before flattening) appears in the mapping, we need to:
// 1. remove it from the list (it is not, per se, a real operator),
// 2. propagate that mapping to all the "flattened" operators.
//
// NOTE: it does not matter if the operator is not composite, we perform a (useless) removal / re-insert on
// the same `NodeId`. It could potentially be avoided but this code is not on the critical path.
if let Some(runtime) = mapping.remove(&operator_id) {
flat_operators.iter().for_each(|operator| {
mapping.insert(operator.id.clone(), runtime.clone());
});
}

flattened_operators.append(&mut flat_operators);
patch.apply(&mut links);
links.append(&mut flat_links);
Expand All @@ -231,6 +238,7 @@ impl DataFlowDescriptor {
operators: flattened_operators,
sinks: flattened_sinks,
links,
mapping,
})
}
}
Expand Down
Loading

0 comments on commit 20466bd

Please sign in to comment.