From cbdad88d34e53c513860c049032e206d35a23c74 Mon Sep 17 00:00:00 2001 From: Enzo Le Van Date: Wed, 4 Dec 2024 17:10:50 +0100 Subject: [PATCH] merge standalone binaries inside CLI --- Cargo.lock | 39 +---- Cargo.toml | 22 ++- zenoh-flow-standalone-daemon/Cargo.toml | 39 ----- zenoh-flow-standalone-daemon/src/main.rs | 122 ---------------- zenoh-flow-standalone-runtime/Cargo.toml | 37 ----- zfctl/Cargo.toml | 4 + zfctl/src/daemon_command.rs | 136 ++++++++++++++++++ zfctl/src/main.rs | 43 +++++- .../src/main.rs => zfctl/src/run_local.rs | 64 ++------- zfctl/src/runtime_command.rs | 6 +- 10 files changed, 213 insertions(+), 299 deletions(-) delete mode 100644 zenoh-flow-standalone-daemon/Cargo.toml delete mode 100644 zenoh-flow-standalone-daemon/src/main.rs delete mode 100644 zenoh-flow-standalone-runtime/Cargo.toml create mode 100644 zfctl/src/daemon_command.rs rename zenoh-flow-standalone-runtime/src/main.rs => zfctl/src/run_local.rs (64%) diff --git a/Cargo.lock b/Cargo.lock index 7a7ad14f..564a5c39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3403,41 +3403,6 @@ dependencies = [ "zenoh-flow-runtime", ] -[[package]] -name = "zenoh-flow-standalone-daemon" -version = "0.6.0-dev" -dependencies = [ - "anyhow", - "async-std", - "clap", - "serde", - "signal-hook", - "signal-hook-async-std", - "tracing", - "tracing-subscriber", - "uhlc 0.6.3", - "zenoh", - "zenoh-flow-commons", - "zenoh-flow-daemon", -] - -[[package]] -name = "zenoh-flow-standalone-runtime" -version = "0.6.0-dev" -dependencies = [ - "anyhow", - "async-std", - "clap", - "serde", - "tracing", - "tracing-subscriber", - "uhlc 0.6.3", - "zenoh-flow-commons", - "zenoh-flow-descriptors", - "zenoh-flow-records", - "zenoh-flow-runtime", -] - [[package]] name = "zenoh-keyexpr" version = "0.11.0-rc.3" @@ -3862,6 +3827,8 @@ dependencies = [ "serde_derive", "serde_json", "serde_yaml", + "signal-hook", + "signal-hook-async-std", "tracing", "tracing-subscriber", "uuid", @@ -3869,5 +3836,7 @@ dependencies = [ "zenoh-flow-commons", "zenoh-flow-daemon", "zenoh-flow-descriptors", + "zenoh-flow-records", + "zenoh-flow-runtime", "zenoh-util", ] diff --git a/Cargo.toml b/Cargo.toml index 392099c1..991ccb09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,18 +14,16 @@ [workspace] resolver = "2" members = [ - "examples", - "zenoh-flow-commons", - "zenoh-flow-daemon", - "zenoh-flow-derive", - "zenoh-flow-descriptors", - "zenoh-flow-nodes", - "zenoh-flow-records", - "zenoh-flow-runtime", - "zenoh-flow-standalone-daemon", - "zenoh-flow-standalone-runtime", - "zenoh-plugin-zenoh-flow", - "zfctl", + "examples", + "zenoh-flow-commons", + "zenoh-flow-daemon", + "zenoh-flow-derive", + "zenoh-flow-descriptors", + "zenoh-flow-nodes", + "zenoh-flow-records", + "zenoh-flow-runtime", + "zenoh-plugin-zenoh-flow", + "zfctl", ] [workspace.package] diff --git a/zenoh-flow-standalone-daemon/Cargo.toml b/zenoh-flow-standalone-daemon/Cargo.toml deleted file mode 100644 index ea4f192e..00000000 --- a/zenoh-flow-standalone-daemon/Cargo.toml +++ /dev/null @@ -1,39 +0,0 @@ -# -# Copyright © 2021 ZettaScale Technology -# -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License 2.0 which is available at -# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -# which is available at https://www.apache.org/licenses/LICENSE-2.0. -# -# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -# -# Contributors: -# ZettaScale Zenoh Team, -# - -[package] -authors.workspace = true -categories.workspace = true -description = "A standalone Zenoh-Flow daemon." -edition.workspace = true -homepage.workspace = true -license.workspace = true -name = "zenoh-flow-standalone-daemon" -readme.workspace = true -repository.workspace = true -version.workspace = true - -[dependencies] -anyhow = { workspace = true } -async-std = { workspace = true } -clap = { workspace = true } -serde = { workspace = true } -signal-hook = "0.3" -signal-hook-async-std = "0.2" -tracing = { workspace = true } -tracing-subscriber = { workspace = true } -uhlc = { workspace = true } -zenoh = { workspace = true } -zenoh-flow-commons = { workspace = true } -zenoh-flow-daemon = { workspace = true } diff --git a/zenoh-flow-standalone-daemon/src/main.rs b/zenoh-flow-standalone-daemon/src/main.rs deleted file mode 100644 index 9c0dcd54..00000000 --- a/zenoh-flow-standalone-daemon/src/main.rs +++ /dev/null @@ -1,122 +0,0 @@ -// -// Copyright © 2021 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -use std::path::PathBuf; - -use async_std::stream::StreamExt; -use clap::Parser; -use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM}; -use signal_hook_async_std::Signals; -use zenoh::prelude::r#async::AsyncResolve; -use zenoh_flow_commons::{try_parse_from_file, Vars}; -use zenoh_flow_daemon::daemon::*; - -#[derive(Parser)] -struct Cli { - /// The human-readable name to give the Zenoh-Flow Runtime wrapped by this - /// Daemon. - /// - /// To start a Zenoh-Flow Daemon, at least a name is required. - name: Option, - /// The path of the configuration of the Zenoh-Flow Daemon. - /// - /// This configuration allows setting extensions supported by the Runtime - /// and its name. - #[arg(short, long, verbatim_doc_comment)] - configuration: Option, - /// The path to a Zenoh configuration to manage the connection to the Zenoh - /// network. - /// - /// If no configuration is provided, the Daemon will default to connecting as - /// a peer with multicast scouting enabled. - #[arg(short = 'z', long, verbatim_doc_comment)] - zenoh_configuration: Option, -} - -#[async_std::main] -async fn main() { - let _ = tracing_subscriber::fmt::try_init(); - - let cli = Cli::parse(); - - if cli.configuration.is_some() && cli.name.is_some() { - tracing::error!("Please either specify a or a , not both."); - return; - } else if cli.configuration.is_none() && cli.name.is_none() { - tracing::error!("Please provide at least a for this Zenoh-Flow Runtime."); - return; - } - - let zenoh_config = match cli.zenoh_configuration { - Some(path) => zenoh::prelude::Config::from_file(path.clone()).unwrap_or_else(|e| { - panic!( - "Failed to parse the Zenoh configuration from < {} >:\n{e:?}", - path.display() - ) - }), - None => zenoh::config::peer(), - }; - - let zenoh_session = zenoh::open(zenoh_config) - .res_async() - .await - .unwrap_or_else(|e| panic!("Failed to open Zenoh session:\n{e:?}")) - .into_arc(); - - let daemon = match cli.configuration { - Some(path) => { - let (zenoh_flow_configuration, _) = - try_parse_from_file::(&path, Vars::default()) - .unwrap_or_else(|e| { - panic!( - "Failed to parse a Zenoh-Flow Configuration from < {} >:\n{e:?}", - path.display() - ) - }); - - Daemon::spawn_from_config(zenoh_session, zenoh_flow_configuration) - .await - .expect("Failed to spawn the Zenoh-Flow Daemon") - } - None => Daemon::spawn( - Runtime::builder(cli.name.unwrap()) - .session(zenoh_session) - .build() - .await - .expect("Failed to build the Zenoh-Flow Runtime"), - ) - .await - .expect("Failed to spawn the Zenoh-Flow Daemon"), - }; - - async_std::task::spawn(async move { - let mut signals = Signals::new([SIGTERM, SIGINT, SIGQUIT]) - .expect("Failed to create SignalsInfo for: [SIGTERM, SIGINT, SIGQUIT]"); - - while let Some(signal) = signals.next().await { - match signal { - SIGTERM | SIGINT | SIGQUIT => { - tracing::info!("Received termination signal, shutting down."); - daemon.stop().await; - break; - } - - signal => { - tracing::warn!("Ignoring signal ({signal})"); - } - } - } - }) - .await; -} diff --git a/zenoh-flow-standalone-runtime/Cargo.toml b/zenoh-flow-standalone-runtime/Cargo.toml deleted file mode 100644 index b15b4266..00000000 --- a/zenoh-flow-standalone-runtime/Cargo.toml +++ /dev/null @@ -1,37 +0,0 @@ -# -# Copyright © 2021 ZettaScale Technology -# -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License 2.0 which is available at -# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -# which is available at https://www.apache.org/licenses/LICENSE-2.0. -# -# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -# -# Contributors: -# ZettaScale Zenoh Team, -# - -[package] -authors = { workspace = true } -categories = { workspace = true } -description = "A standalone Zenoh-Flow runtime intended for tests and prototyping." -edition = { workspace = true } -homepage = { workspace = true } -license = { workspace = true } -name = "zenoh-flow-standalone-runtime" -repository = { workspace = true } -version = { workspace = true } - -[dependencies] -anyhow = { workspace = true } -async-std = { workspace = true } -clap = { workspace = true } -serde = { workspace = true } -tracing = { workspace = true } -tracing-subscriber = { workspace = true } -uhlc = { workspace = true } -zenoh-flow-commons = { workspace = true } -zenoh-flow-descriptors = { workspace = true } -zenoh-flow-records = { workspace = true } -zenoh-flow-runtime = { workspace = true } diff --git a/zfctl/Cargo.toml b/zfctl/Cargo.toml index d5f992c1..466f88bf 100644 --- a/zfctl/Cargo.toml +++ b/zfctl/Cargo.toml @@ -33,6 +33,8 @@ clap = { workspace = true, features = ["derive", "wrap_help"] } comfy-table = "7.1.0" derive_more = "0.99.10" dirs = "5.0" +signal-hook = "0.3" +signal-hook-async-std = "0.2" git-version = { workspace = true } itertools = "0.12" log = { workspace = true } @@ -48,5 +50,7 @@ uuid = { workspace = true, features = ["serde", "v4"] } zenoh = { workspace = true } zenoh-flow-commons = { workspace = true } zenoh-flow-daemon = { workspace = true } +zenoh-flow-runtime = { workspace = true } zenoh-flow-descriptors = { workspace = true } +zenoh-flow-records = { workspace = true } zenoh-util = { workspace = true } diff --git a/zfctl/src/daemon_command.rs b/zfctl/src/daemon_command.rs new file mode 100644 index 00000000..5337c8a9 --- /dev/null +++ b/zfctl/src/daemon_command.rs @@ -0,0 +1,136 @@ +// +// Copyright © 2021 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::path::PathBuf; + +use async_std::stream::StreamExt; +use clap::{ArgGroup, Subcommand}; + +use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM}; +use signal_hook_async_std::Signals; +use zenoh::prelude::r#async::*; +use zenoh_flow_commons::{try_parse_from_file, Result, Vars}; +use zenoh_flow_daemon::daemon::{Daemon, ZenohFlowConfiguration}; + +use zenoh_flow_runtime::Runtime; + +#[derive(Subcommand)] +pub(crate) enum DaemonCommand { + /// Launch a Zenoh-Flow Daemon. + #[command(verbatim_doc_comment)] + #[command(group( + ArgGroup::new("exclusive") + .args(&["name", "configuration"]) + .required(true) + .multiple(false) + ))] + Start { + /// The human-readable name to give the Zenoh-Flow Runtime wrapped by this + /// Daemon. + /// + /// To start a Zenoh-Flow Daemon, at least a name is required. + name: Option, + /// The path of the configuration of the Zenoh-Flow Daemon. + /// + /// This configuration allows setting extensions supported by the Runtime + /// and its name. + #[arg(short, long, verbatim_doc_comment)] + configuration: Option, + /// The path to a Zenoh configuration to manage the connection to the Zenoh + /// network. + /// + /// If no configuration is provided, `zfctl` will default to connecting as + /// a peer with multicast scouting enabled. + #[arg(short = 'z', long, verbatim_doc_comment)] + zenoh_configuration: Option, + }, +} + +impl DaemonCommand { + pub async fn run(self, _session: Session) -> Result<()> { + match self { + DaemonCommand::Start { + name, + configuration, + zenoh_configuration, + } => { + let zenoh_config = match zenoh_configuration { + Some(path) => { + zenoh::prelude::Config::from_file(path.clone()).unwrap_or_else(|e| { + panic!( + "Failed to parse the Zenoh configuration from < {} >:\n{e:?}", + path.display() + ) + }) + } + None => zenoh::config::peer(), + }; + + let zenoh_session = zenoh::open(zenoh_config) + .res_async() + .await + .unwrap_or_else(|e| panic!("Failed to open Zenoh session:\n{e:?}")) + .into_arc(); + + let daemon = match configuration { + Some(path) => { + let (zenoh_flow_configuration, _) = + try_parse_from_file::(&path, Vars::default()) + .unwrap_or_else(|e| { + panic!( + "Failed to parse a Zenoh-Flow Configuration from < {} >:\n{e:?}", + path.display() + ) + }); + + Daemon::spawn_from_config(zenoh_session, zenoh_flow_configuration) + .await + .expect("Failed to spawn the Zenoh-Flow Daemon") + } + None => Daemon::spawn( + Runtime::builder(name.unwrap()) + .session(zenoh_session) + .build() + .await + .expect("Failed to build the Zenoh-Flow Runtime"), + ) + .await + .expect("Failed to spawn the Zenoh-Flow Daemon"), + }; + + async_std::task::spawn(async move { + let mut signals = Signals::new([SIGTERM, SIGINT, SIGQUIT]) + .expect("Failed to create SignalsInfo for: [SIGTERM, SIGINT, SIGQUIT]"); + + while let Some(signal) = signals.next().await { + match signal { + SIGTERM | SIGINT | SIGQUIT => { + tracing::info!("Received termination signal, shutting down."); + daemon.stop().await; + break; + } + + signal => { + tracing::warn!("Ignoring signal ({signal})"); + } + } + } + }) + .await; + } + } + + Ok(()) + } +} diff --git a/zfctl/src/main.rs b/zfctl/src/main.rs index c68b8261..bee29dc9 100644 --- a/zfctl/src/main.rs +++ b/zfctl/src/main.rs @@ -18,6 +18,11 @@ use instance_command::InstanceCommand; mod runtime_command; use runtime_command::RuntimeCommand; +mod daemon_command; +use daemon_command::DaemonCommand; + +mod run_local; + mod utils; use std::path::PathBuf; @@ -25,6 +30,7 @@ use anyhow::anyhow; use clap::{Parser, Subcommand}; use utils::{get_random_runtime, get_runtime_by_name}; use zenoh::prelude::r#async::*; +use zenoh_flow_commons::parse_vars; use zenoh_flow_commons::{Result, RuntimeId}; const ZENOH_FLOW_INTERNAL_ERROR: &str = r#" @@ -82,6 +88,34 @@ enum Command { /// To interact with a Zenoh-Flow runtime. #[command(subcommand)] Runtime(RuntimeCommand), + + /// To interact with a Zenoh-Flow daemon. + #[command(subcommand)] + Daemon(DaemonCommand), + + /// Run a dataflow locally. + #[command(verbatim_doc_comment)] + RunLocal { + /// The data flow to execute. + flow: PathBuf, + /// The path to a Zenoh configuration to manage the connection to the Zenoh + /// network. + /// + /// If no configuration is provided, `zfctl` will default to connecting as + /// a peer with multicast scouting enabled. + #[arg(short = 'z', long, verbatim_doc_comment)] + zenoh_configuration: Option, + /// The, optional, location of the configuration to load nodes implemented not in Rust. + #[arg(short, long, value_name = "path")] + extensions: Option, + /// Variables to add / overwrite in the `vars` section of your data + /// flow, with the form `KEY=VALUE`. Can be repeated multiple times. + /// + /// Example: + /// --vars HOME_DIR=/home/zenoh-flow --vars BUILD=debug + #[arg(long, value_parser = parse_vars::, verbatim_doc_comment)] + vars: Option>, + }, } #[async_std::main] @@ -123,6 +157,13 @@ async fn main() -> Result<()> { command.run(session, orchestrator_id).await } - Command::Runtime(command) => command.run(&session).await, + Command::Runtime(command) => command.run(session).await, + Command::Daemon(command) => command.run(session).await, + Command::RunLocal { + flow, + zenoh_configuration, + extensions, + vars, + } => run_local::run_locally(flow, zenoh_configuration, extensions, vars).await, } } diff --git a/zenoh-flow-standalone-runtime/src/main.rs b/zfctl/src/run_local.rs similarity index 64% rename from zenoh-flow-standalone-runtime/src/main.rs rename to zfctl/src/run_local.rs index a1283b4f..f31c5b6a 100644 --- a/zenoh-flow-standalone-runtime/src/main.rs +++ b/zfctl/src/run_local.rs @@ -1,56 +1,19 @@ -// -// Copyright © 2021 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - use std::path::PathBuf; use anyhow::Context; use async_std::io::ReadExt; -use clap::Parser; -use zenoh_flow_commons::{parse_vars, Vars}; +use zenoh_flow_commons::{Result, Vars}; use zenoh_flow_descriptors::{DataFlowDescriptor, FlattenedDataFlowDescriptor}; use zenoh_flow_records::DataFlowRecord; use zenoh_flow_runtime::{zenoh::AsyncResolve, Extensions, Runtime}; -#[derive(Parser)] -struct Cli { - /// The data flow to execute. +pub async fn run_locally( flow: PathBuf, - /// The path to a Zenoh configuration to manage the connection to the Zenoh - /// network. - /// - /// If no configuration is provided, `zfctl` will default to connecting as - /// a peer with multicast scouting enabled. - #[arg(short = 'z', long, verbatim_doc_comment)] zenoh_configuration: Option, - /// The, optional, location of the configuration to load nodes implemented not in Rust. - #[arg(short, long, value_name = "path")] extensions: Option, - /// Variables to add / overwrite in the `vars` section of your data - /// flow, with the form `KEY=VALUE`. Can be repeated multiple times. - /// - /// Example: - /// --vars HOME_DIR=/home/zenoh-flow --vars BUILD=debug - #[arg(long, value_parser = parse_vars::, verbatim_doc_comment)] vars: Option>, -} - -#[async_std::main] -async fn main() { - let _ = tracing_subscriber::fmt::try_init(); - let cli = Cli::parse(); - - let extensions = match cli.extensions { +) -> Result<()> { + let extensions = match extensions { Some(extensions_path) => { let (extensions, _) = zenoh_flow_commons::try_parse_from_file::( extensions_path.as_os_str(), @@ -67,23 +30,23 @@ async fn main() { None => Extensions::default(), }; - let vars = match cli.vars { + let vars = match vars { Some(v) => Vars::from(v), None => Vars::default(), }; let (data_flow, vars) = - zenoh_flow_commons::try_parse_from_file::(cli.flow.as_os_str(), vars) + zenoh_flow_commons::try_parse_from_file::(flow.as_os_str(), vars) .context(format!( "Failed to load data flow descriptor from < {} >", - &cli.flow.display() + &flow.display() )) .unwrap(); let flattened_flow = FlattenedDataFlowDescriptor::try_flatten(data_flow, vars) .context(format!( "Failed to flattened data flow extracted from < {} >", - &cli.flow.display() + &flow.display() )) .unwrap(); @@ -91,7 +54,7 @@ async fn main() { .add_extensions(extensions) .expect("Failed to add extensions"); - if let Some(path) = cli.zenoh_configuration { + if let Some(path) = zenoh_configuration { let zenoh_config = zenoh_flow_runtime::zenoh::Config::from_file(path.clone()) .unwrap_or_else(|e| { panic!( @@ -134,10 +97,9 @@ async fn main() { let mut input = [0_u8]; println!( r#" - The flow ({}) < {} > was successfully started. - - To abort its execution, simply enter 'q'. - "#, + The flow ({}) < {} > was successfully started. + To abort its execution, simply enter 'q'. + "#, record_name, instance_id ); @@ -152,4 +114,6 @@ async fn main() { .try_delete_instance(&instance_id) .await .unwrap_or_else(|e| panic!("Failed to delete data flow < {} >: {:?}", &instance_id, e)); + + Ok(()) } diff --git a/zfctl/src/runtime_command.rs b/zfctl/src/runtime_command.rs index e8d350da..d9d951ab 100644 --- a/zfctl/src/runtime_command.rs +++ b/zfctl/src/runtime_command.rs @@ -60,10 +60,10 @@ pub(crate) enum RuntimeCommand { } impl RuntimeCommand { - pub async fn run(self, session: &Session) -> Result<()> { + pub async fn run(self, session: Session) -> Result<()> { match self { RuntimeCommand::List => { - let runtimes = get_all_runtimes(session).await; + let runtimes = get_all_runtimes(&session).await; let mut table = Table::new(); table.set_width(80); @@ -81,7 +81,7 @@ impl RuntimeCommand { } => { let runtime_id = match (runtime_id, runtime_name) { (Some(id), _) => id, - (None, Some(name)) => get_runtime_by_name(session, &name).await, + (None, Some(name)) => get_runtime_by_name(&session, &name).await, (None, None) => { // This code is indeed unreachable because: // (1) The `group` macro has `required = true` which indicates that clap requires an entry for