diff --git a/.config/nextest.toml b/.config/nextest.toml index aa2c3ac37b..37b7dfcea0 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -1,6 +1,6 @@ # By default, retry a few times until pass the test within the specified timeout [profile.default] -retries = 4 +retries = 1 slow-timeout = { period = "60s", terminate-after = 2 } # Run the following tests exclusively with longer timeout diff --git a/Cargo.toml b/Cargo.toml index 03fe60fad4..dc24991488 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -128,6 +128,7 @@ rand = { version = "0.8.5", default-features = false } # Default features are di rand_chacha = "0.3.1" rcgen = "0.11" regex = "1.7.1" +ron = "0.8.1" ringbuffer-spsc = "0.1.9" rsa = "0.9" rustc_version = "0.4.0" diff --git a/ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs b/ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs index 45b36aa5e7..60bda80bda 100644 --- a/ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs +++ b/ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs @@ -19,8 +19,6 @@ use zenoh::prelude::r#async::*; async fn main() { zenoh_util::init_log_test(); - let _z = zenoh_runtime::ZRuntimePoolGuard; - let pub_key_expr = KeyExpr::try_from("test/valgrind/data").unwrap(); let sub_key_expr = KeyExpr::try_from("test/valgrind/**").unwrap(); diff --git a/ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs b/ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs index edd8000bf1..1c82339392 100644 --- a/ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs +++ b/ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs @@ -20,8 +20,6 @@ use zenoh::prelude::r#async::*; async fn main() { zenoh_util::init_log_test(); - let _z = zenoh_runtime::ZRuntimePoolGuard; - let queryable_key_expr = KeyExpr::try_from("test/valgrind/data").unwrap(); let get_selector = Selector::try_from("test/valgrind/**").unwrap(); diff --git a/commons/zenoh-macros/src/lib.rs b/commons/zenoh-macros/src/lib.rs index b77dffeba0..4a69f5c4e3 100644 --- a/commons/zenoh-macros/src/lib.rs +++ b/commons/zenoh-macros/src/lib.rs @@ -352,3 +352,38 @@ pub fn ke(tokens: TokenStream) -> TokenStream { Err(e) => panic!("{}", e), } } + +mod zenoh_runtime_derive; +use syn::DeriveInput; +use zenoh_runtime_derive::{derive_generic_runtime_param, derive_register_param}; + +/// Make the underlying struct `Param` be generic over any `T` satifying a generated `trait DefaultParam { fn param() -> Param; }` +/// ```rust,ignore +/// #[derive(GenericRuntimeParam)] +/// struct Param { +/// ... +/// } +/// ``` +#[proc_macro_derive(GenericRuntimeParam)] +pub fn generic_runtime_param(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let input: DeriveInput = syn::parse_macro_input!(input); + derive_generic_runtime_param(input) + .unwrap_or_else(syn::Error::into_compile_error) + .into() +} + +/// Register the input `Enum` with the struct `Param` specified in the param attribute +/// ```rust,ignore +/// #[derive(RegisterParam)] +/// #[param(Param)] +/// enum Enum { +/// ... +/// } +/// ``` +#[proc_macro_derive(RegisterParam, attributes(alias, param))] +pub fn register_param(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let input: DeriveInput = syn::parse_macro_input!(input); + derive_register_param(input) + .unwrap_or_else(syn::Error::into_compile_error) + .into() +} diff --git a/commons/zenoh-macros/src/zenoh_runtime_derive.rs b/commons/zenoh-macros/src/zenoh_runtime_derive.rs new file mode 100644 index 0000000000..c08844481c --- /dev/null +++ b/commons/zenoh-macros/src/zenoh_runtime_derive.rs @@ -0,0 +1,344 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! ⚠️ WARNING ⚠️ +//! +//! This crate is intended for Zenoh's internal use. +//! +//! [Click here for Zenoh's documentation](../zenoh/index.html) +use proc_macro2::TokenStream; +use quote::{format_ident, quote, ToTokens}; +use syn::{ + parse::{Parse, ParseStream}, + punctuated::Punctuated, + spanned::Spanned, + token::Comma, + Data, DataEnum, DataStruct, DeriveInput, Expr, ExprLit, Fields, Ident, Lit, LitStr, Meta, + MetaNameValue, Token, Variant, +}; + +struct SerdeAttribute { + rename: LitStr, +} + +impl Parse for SerdeAttribute { + fn parse(tokens: ParseStream) -> syn::Result { + let parsed = Punctuated::::parse_terminated(tokens)?; + for kv in parsed { + if kv.path.is_ident("rename") { + if let Expr::Lit(ExprLit { + lit: Lit::Str(str), .. + }) = kv.value + { + return Ok(SerdeAttribute { rename: str }); + } + } + } + Err(syn::Error::new( + tokens.span(), + "Invalid rename detected, expect #[serde(rename = \"name\")]", + )) + } +} + +fn parse_variants( + variants: &Punctuated, +) -> (Vec<&Ident>, Vec, Vec) { + let mut name_vec = Vec::new(); + let mut alias_vec = Vec::new(); + let mut param_vec = Vec::new(); + + for var in variants { + let name = &var.ident; + let alias = var + .attrs + .iter() + .find(|attr| attr.path().is_ident("serde")) + .map(|attr| { + attr.parse_args::() + .map(|x| { + x.rename + .value() + .parse::() + .expect("Failed to convert LitStr to Ident TokenStream") + }) + .unwrap_or_else(syn::Error::into_compile_error) + }) + .ok_or(syn::Error::new( + var.span(), + "#[serde(alias = \"name\")] is missing", + )) + .unwrap_or_else(syn::Error::into_compile_error); + let param = var + .attrs + .iter() + .find(|attr| attr.path().is_ident("param")) + .map(|attr| match &attr.meta { + Meta::List(list) => list.tokens.to_string().replace('=', ":").parse(), + _ => panic!("Invalid"), + }) + .unwrap_or("".parse()) + .unwrap(); + + name_vec.push(name); + alias_vec.push(alias); + param_vec.push(param); + } + + (name_vec, alias_vec, param_vec) +} + +fn generate_declare_param( + meta_param: &Ident, + variant_names: &[&Ident], + aliases: &Vec, + params: &[TokenStream], +) -> TokenStream { + let default_param_of_variant: Vec<_> = variant_names + .iter() + .map(|name| format_ident!("DefaultParamOf{}", name)) + .collect(); + let params_with_default = params.iter().map(|x| { + if x.to_string() != "" { + quote!(#x, ..Default::default()) + } else { + quote!(..Default::default()) + } + }); + quote! { + trait DefaultParam { + fn param() -> #meta_param; + } + + #( + // Declare struct DefaultParamOf`#variant_name` + #[derive(Debug, Clone, Copy)] + struct #default_param_of_variant; + + // impl `DefaultParam` for `DefaultParamOf#variant_name` + impl DefaultParam for #default_param_of_variant { + fn param() -> #meta_param { + #meta_param { + #params_with_default + } + } + } + )* + + // An internal helper struct for parsing the RuntimeParam + #[derive(Deserialize, Debug, Clone, Copy)] + #[serde(deny_unknown_fields)] + struct AbstractRuntimeParam { + #( + #[serde(default)] + #aliases: RuntimeParamHelper<#default_param_of_variant>, + )* + } + + // AbstractRuntimeParam => GlobalRuntimeParam, extract fields from AbstractRuntimeParam + impl From for GlobalRuntimeParam { + fn from(value: AbstractRuntimeParam) -> Self { + Self { + #( + #aliases: value.#aliases.into(), + )* + } + } + } + + /// A global runtime parameter for zenoh runtimes + // pub is needed within lazy_static + pub struct GlobalRuntimeParam { + #( + #aliases: #meta_param, + )* + } + + } +} + +pub(crate) fn derive_register_param(input: DeriveInput) -> Result { + // enum representing the runtime + let enum_name = &input.ident; + + // Parse the parameter to register, called meta_param + let attr = input + .attrs + .iter() + .find(|attr| attr.path().is_ident("param")) + .ok_or(syn::Error::new( + input.span(), + "Expected attribute #[param(StructName)] is missing.", + ))?; + let meta_param = match &attr.meta { + Meta::List(list) => syn::parse2::(list.tokens.to_token_stream()), + _ => Err(syn::Error::new( + attr.span(), + format!("Failed to parse #[param({})]", attr.to_token_stream()), + )), + }?; + + // Parse the variants and associated parameters of the enum + let variants = match input.data { + Data::Enum(DataEnum { variants, .. }) => variants, + _ => unimplemented!("Only enum is supported."), + }; + let (variant_names, aliases, params) = parse_variants(&variants); + let declare_param_quote = + generate_declare_param(&meta_param, &variant_names, &aliases, ¶ms); + + let tokens = quote! { + + use ron::{extensions::Extensions, options::Options}; + use #enum_name::*; + + lazy_static! { + // We need to hold the reference of ZENOH_RUNTIME_ENV_STRING to prevent the issue of + // "returning a value referencing data owned by the current function" + pub static ref ZENOH_RUNTIME_ENV_STRING: String = env::var(ZENOH_RUNTIME_ENV).unwrap_or("()".to_string()); + pub static ref ZRUNTIME_PARAM: GlobalRuntimeParam = Options::default() + .with_default_extension(Extensions::IMPLICIT_SOME) + .from_str::(&ZENOH_RUNTIME_ENV_STRING) + .unwrap() + .into(); + } + + #declare_param_quote + + impl #enum_name { + #[doc = concat!("Create an iterator from ", stringify!(#enum_name))] + pub fn iter() -> impl Iterator { + [#(#variant_names,)*].into_iter() + } + + #[doc = "Initialize the tokio runtime according to the given config"] + fn init(&self) -> Result { + match self { + #( + #variant_names => { + ZRUNTIME_PARAM.#aliases.build(#variant_names) + }, + )* + } + } + + } + + #[doc = concat!( + "Borrow the underlying `", + stringify!(#meta_param), + "` from ", + stringify!(#enum_name) + )] + impl Borrow<#meta_param> for #enum_name { + fn borrow(&self) -> &#meta_param { + match self { + #( + #variant_names => { + &ZRUNTIME_PARAM.#aliases + }, + )* + } + } + } + impl std::fmt::Display for #enum_name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + #( + #variant_names => { + write!(f, stringify!(#aliases)) + }, + )* + + } + } + } + }; + Ok(tokens) +} + +pub(crate) fn derive_generic_runtime_param(input: DeriveInput) -> Result { + let meta_param = &input.ident; + let fields = match &input.data { + Data::Struct(DataStruct { + fields: Fields::Named(fields), + .. + }) => &fields.named, + _ => { + return Err(syn::Error::new( + input.span(), + "Expected a struct with named fields", + )) + } + }; + let field_names: Vec<_> = fields.iter().map(|field| &field.ident).collect(); + let field_types: Vec<_> = fields.iter().map(|field| &field.ty).collect(); + let helper_name = format_ident!("{}Helper", meta_param); + + let tokens = quote! { + + use std::marker::PhantomData; + + // Declare a helper struct to be generic over any T implementing DefaultParam + #[derive(Deserialize, Debug, Clone, Copy)] + #[serde(deny_unknown_fields, default)] + struct #helper_name + where + T: DefaultParam, + { + #( + #field_names: #field_types, + )* + #[serde(skip)] + _phantom: PhantomData, + } + + impl From<#meta_param> for #helper_name + where + T: DefaultParam, + { + fn from(value: #meta_param) -> Self { + let #meta_param { #(#field_names,)* } = value; + Self { + #(#field_names,)* + _phantom: PhantomData::default(), + } + } + } + + impl From<#helper_name> for #meta_param + where + T: DefaultParam, + { + fn from(value: #helper_name) -> Self { + let #helper_name { #(#field_names,)* .. } = value; + Self { + #(#field_names,)* + } + } + } + + impl Default for #helper_name + where + T: DefaultParam, + { + fn default() -> Self { + T::param().into() + } + } + + }; + + Ok(tokens) +} diff --git a/commons/zenoh-runtime/Cargo.toml b/commons/zenoh-runtime/Cargo.toml index e5bd64b8c5..e3f0c7a3c0 100644 --- a/commons/zenoh-runtime/Cargo.toml +++ b/commons/zenoh-runtime/Cargo.toml @@ -13,8 +13,12 @@ description = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +libc = { workspace = true } +ron = { workspace = true } +serde = { workspace = true } futures = { workspace = true } lazy_static = { workspace = true } zenoh-result = { workspace = true, features = ["std"] } zenoh-collections = { workspace = true, features = ["std"] } +zenoh-macros = { workspace = true } tokio = { workspace = true, features = ["fs", "io-util", "macros", "net", "rt-multi-thread", "sync", "time"] } diff --git a/commons/zenoh-runtime/src/lib.rs b/commons/zenoh-runtime/src/lib.rs index 492e0a6665..1a9d765420 100644 --- a/commons/zenoh-runtime/src/lib.rs +++ b/commons/zenoh-runtime/src/lib.rs @@ -13,7 +13,9 @@ // use core::panic; use lazy_static::lazy_static; +use serde::Deserialize; use std::{ + borrow::Borrow, collections::HashMap, env, future::Future, @@ -25,88 +27,97 @@ use std::{ time::Duration, }; use tokio::runtime::{Handle, Runtime, RuntimeFlavor}; -use zenoh_collections::Properties; +use zenoh_macros::{GenericRuntimeParam, RegisterParam}; use zenoh_result::ZResult as Result; -const ZENOH_RUNTIME_THREADS_ENV: &str = "ZENOH_RUNTIME_THREADS"; +pub const ZENOH_RUNTIME_ENV: &str = "ZENOH_RUNTIME"; + +/// Available parameters to configure the ZRuntime. +#[derive(Deserialize, Debug, GenericRuntimeParam)] +#[serde(deny_unknown_fields, default)] +pub struct RuntimeParam { + /// Number of async worker threads. At least one. + pub worker_threads: usize, + /// Number of maximal worker threads for blocking tasks. At least one. + pub max_blocking_threads: usize, + /// Hand over one ZRuntime to another one. + pub handover: Option, +} + +impl Default for RuntimeParam { + fn default() -> Self { + Self { + worker_threads: 1, + max_blocking_threads: 50, + handover: None, + } + } +} -#[derive(Hash, Eq, PartialEq, Clone, Copy, Debug)] +impl RuntimeParam { + pub fn build(&self, zrt: ZRuntime) -> Result { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(self.worker_threads) + .max_blocking_threads(self.max_blocking_threads) + .enable_io() + .enable_time() + .thread_name_fn(move || { + let id = ZRUNTIME_INDEX + .get(&zrt) + .unwrap() + .fetch_add(1, Ordering::SeqCst); + format!("{}-{}", zrt, id) + }) + .build()?; + Ok(rt) + } +} + +/// [`ZRuntime`], the access point of manipulate runtimes within zenoh. +/// The runtime parameter can be configured by setting the environmental variable [`ZENOH_RUNTIME_ENV`]. +/// The parsing syntax use [RON](https://github.com/ron-rs/ron). An example configuration looks +/// like +/// +/// ```console +/// ZENOH_RUNTIME='( +/// rx: (handover: app), +/// acc: (handover: app), +/// app: (worker_threads: 2), +/// tx: (max_blocking_threads: 1) +/// )' +/// ``` +/// Note: The runtime parameter takes effect at the beginning of the zenoh process and no longer be +/// changed after the initialization. +#[derive(Hash, Eq, PartialEq, Clone, Copy, Debug, RegisterParam, Deserialize)] +#[param(RuntimeParam)] pub enum ZRuntime { + /// Renamed to app. Default param: worker_threads = 1. + #[serde(rename = "app")] + #[param(worker_threads = 1)] Application, + + /// Renamed to acc. Default param: worker_threads = 1. + #[serde(rename = "acc")] + #[param(worker_threads = 1)] Acceptor, + + /// Renamed to tx. Default param: worker_threads = 1. + #[serde(rename = "tx")] + #[param(worker_threads = 1)] TX, + + /// Renamed to rx. Default param: worker_threads = 2. + #[serde(rename = "rx")] + #[param(worker_threads = 2)] RX, + + /// Renamed to net. Default param: worker_threads = 1. + #[serde(rename = "net")] + #[param(worker_threads = 1)] Net, } impl ZRuntime { - fn iter() -> impl Iterator { - use ZRuntime::*; - [Application, Acceptor, TX, RX, Net].into_iter() - } - - fn init(&self) -> Result { - let config = &ZRUNTIME_CONFIG; - - let thread_name = format!("{self:?}"); - - use ZRuntime::*; - let rt = match self { - Application => tokio::runtime::Builder::new_multi_thread() - .worker_threads(config.application_threads) - .enable_io() - .enable_time() - .thread_name_fn(move || { - static ATOMIC_THREAD_ID: AtomicUsize = AtomicUsize::new(0); - let id = ATOMIC_THREAD_ID.fetch_add(1, Ordering::SeqCst); - format!("{thread_name}-{}", id) - }) - .build()?, - Acceptor => tokio::runtime::Builder::new_multi_thread() - .worker_threads(config.acceptor_threads) - .enable_io() - .enable_time() - .thread_name_fn(move || { - static ATOMIC_THREAD_ID: AtomicUsize = AtomicUsize::new(0); - let id = ATOMIC_THREAD_ID.fetch_add(1, Ordering::SeqCst); - format!("{thread_name}-{}", id) - }) - .build()?, - TX => tokio::runtime::Builder::new_multi_thread() - .worker_threads(config.tx_threads) - .enable_io() - .enable_time() - .thread_name_fn(move || { - static ATOMIC_THREAD_ID: AtomicUsize = AtomicUsize::new(0); - let id = ATOMIC_THREAD_ID.fetch_add(1, Ordering::SeqCst); - format!("{thread_name}-{}", id) - }) - .build()?, - RX => tokio::runtime::Builder::new_multi_thread() - .worker_threads(config.rx_threads) - .enable_io() - .enable_time() - .thread_name_fn(move || { - static ATOMIC_THREAD_ID: AtomicUsize = AtomicUsize::new(0); - let id = ATOMIC_THREAD_ID.fetch_add(1, Ordering::SeqCst); - format!("{thread_name}-{}", id) - }) - .build()?, - Net => tokio::runtime::Builder::new_multi_thread() - .worker_threads(config.net_threads) - .enable_io() - .enable_time() - .thread_name_fn(move || { - static ATOMIC_THREAD_ID: AtomicUsize = AtomicUsize::new(0); - let id = ATOMIC_THREAD_ID.fetch_add(1, Ordering::SeqCst); - format!("{thread_name}-{}", id) - }) - .build()?, - }; - - Ok(rt) - } - pub fn block_in_place(&self, f: F) -> R where F: Future, @@ -128,20 +139,42 @@ impl Deref for ZRuntime { } lazy_static! { - pub static ref ZRUNTIME_CONFIG: ZRuntimeConfig = ZRuntimeConfig::from_env(); pub static ref ZRUNTIME_POOL: ZRuntimePool = ZRuntimePool::new(); + pub static ref ZRUNTIME_INDEX: HashMap = ZRuntime::iter() + .map(|zrt| (zrt, AtomicUsize::new(0))) + .collect(); +} + +// To drop the data mannually since Rust does not drop static variables. +pub extern "C" fn cleanup() { + unsafe { + std::mem::drop((ZRUNTIME_POOL.deref() as *const ZRuntimePool).read()); + std::mem::drop((ZRUNTIME_INDEX.deref() as *const HashMap).read()); + } } pub struct ZRuntimePool(HashMap>); impl ZRuntimePool { fn new() -> Self { + // Register a callback to clean the static variables. + unsafe { + libc::atexit(cleanup); + } Self(ZRuntime::iter().map(|zrt| (zrt, OnceLock::new())).collect()) } pub fn get(&self, zrt: &ZRuntime) -> &Handle { + // Although the ZRuntime is called to use `zrt`, it may be handed over to another one + // specified via the environmental variable. + let param: &RuntimeParam = zrt.borrow(); + let zrt = match param.handover { + Some(handover) => handover, + None => *zrt, + }; + self.0 - .get(zrt) + .get(&zrt) .expect("The hashmap should contains {zrt} after initialization") .get_or_init(|| zrt.init().expect("Failed to init {zrt}")) .handle() @@ -166,79 +199,6 @@ impl Drop for ZRuntimePool { } } -/// In order to prevent valgrind reporting memory leaks, -/// we use this guard to force drop ZRUNTIME_POOL since Rust does not drop static variables. -#[doc(hidden)] -pub struct ZRuntimePoolGuard; - -impl Drop for ZRuntimePoolGuard { - fn drop(&mut self) { - unsafe { - let ptr = &(*ZRUNTIME_POOL) as *const ZRuntimePool; - std::mem::drop(ptr.read()); - } - } -} - -#[derive(Debug, Copy, Clone)] -pub struct ZRuntimeConfig { - pub application_threads: usize, - pub acceptor_threads: usize, - pub tx_threads: usize, - pub rx_threads: usize, - pub net_threads: usize, -} - -impl ZRuntimeConfig { - fn from_env() -> ZRuntimeConfig { - let mut c = Self::default(); - - if let Ok(s) = env::var(ZENOH_RUNTIME_THREADS_ENV) { - let ps = Properties::from(s); - if let Some(n) = ps.get("tx") { - if let Ok(n) = n.parse::() { - c.tx_threads = n; - } - } - if let Some(n) = ps.get("rx") { - if let Ok(n) = n.parse::() { - c.rx_threads = n; - } - } - if let Some(n) = ps.get("net") { - if let Ok(n) = n.parse::() { - c.net_threads = n; - } - } - if let Some(n) = ps.get("acceptor") { - if let Ok(n) = n.parse::() { - c.acceptor_threads = n; - } - } - if let Some(n) = ps.get("application") { - if let Ok(n) = n.parse::() { - c.application_threads = n; - } - } - } - - c - } -} - -// WARN: at least two otherwise fail on the routing test -impl Default for ZRuntimeConfig { - fn default() -> Self { - Self { - application_threads: 2, - acceptor_threads: 2, - tx_threads: 2, - rx_threads: 2, - net_threads: 2, - } - } -} - #[should_panic(expected = "Zenoh runtime doesn't support")] #[tokio::test] async fn block_in_place_fail_test() {