Skip to content

Commit

Permalink
Use Enum type to specify which runtime to be handed over and hence re…
Browse files Browse the repository at this point in the history
…move the unneeded lifetime.
  • Loading branch information
YuanYuYuan committed Apr 11, 2024
1 parent d8f842b commit cf08c08
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 55 deletions.
41 changes: 24 additions & 17 deletions commons/zenoh-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use core::panic;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use std::{
collections::HashMap,
env,
Expand All @@ -32,33 +32,33 @@ use zenoh_runtime_derive::{ConfigureZRuntime, GenericRuntimeParam};

const ZENOH_RUNTIME_ENV: &str = "ZENOH_RUNTIME";

trait DefaultParam<'a> {
fn param() -> RuntimeParam<'a>;
trait DefaultParam {
fn param() -> RuntimeParam;
}

#[derive(Serialize, Deserialize, Debug, GenericRuntimeParam)]
#[derive(Deserialize, Debug, GenericRuntimeParam)]
#[serde(deny_unknown_fields, default)]
pub struct RuntimeParam<'a> {
pub struct RuntimeParam {
pub worker_threads: usize,
pub max_blocking_threads: usize,
pub handover: &'a str,
pub handover: Option<ZRuntime>,
}

impl<'a> Default for RuntimeParam<'a> {
impl Default for RuntimeParam {
fn default() -> Self {
Self {
worker_threads: 1,
max_blocking_threads: 50,
handover: "",
handover: None,
}
}
}

pub trait RuntimeParamTrait {
fn param(&self) -> &RuntimeParam<'_>;
fn param(&self) -> &RuntimeParam;
}

impl RuntimeParam<'_> {
impl RuntimeParam {
pub fn build(&self, zrt: ZRuntime) -> Result<Runtime> {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(self.worker_threads)
Expand All @@ -77,21 +77,21 @@ impl RuntimeParam<'_> {
}
}

#[derive(Hash, Eq, PartialEq, Clone, Copy, Debug, ConfigureZRuntime)]
#[derive(Hash, Eq, PartialEq, Clone, Copy, Debug, ConfigureZRuntime, Deserialize)]
pub enum ZRuntime {
#[alias(app)]
#[serde(rename = "app")]
#[param(worker_threads = 1)]
Application,
#[alias(acc)]
#[serde(rename = "acc")]
#[param(worker_threads = 1)]
Acceptor,
#[alias(tx)]
#[serde(rename = "tx")]
#[param(worker_threads = 1)]
TX,
#[alias(rx)]
#[serde(rename = "rx")]
#[param(worker_threads = 1)]
RX,
#[alias(net)]
#[serde(rename = "net")]
#[param(worker_threads = 1)]
Net,
}
Expand Down Expand Up @@ -155,8 +155,15 @@ impl ZRuntimePool {
}

pub fn get(&self, zrt: &ZRuntime) -> &Handle {
// Although the ZRuntime is called to use `zrt`, it may be handover to another one
// specified via the environmental variable.
let zrt = match zrt.param().handover {
Some(handover) => handover,
None => *zrt,
};

self.0
.get(zrt)
.get(&zrt)
.expect("The hashmap should contains {zrt} after initialization")
.get_or_init(|| zrt.init().expect("Failed to init {zrt}"))
.handle()
Expand Down
117 changes: 79 additions & 38 deletions commons/zenoh-runtime/zenoh-runtime-derive/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,38 @@
use proc_macro2::TokenStream;
use quote::{format_ident, quote, ToTokens};
use quote::{format_ident, quote};
use syn::{
punctuated::Punctuated, token::Comma, Data, DataEnum, DataStruct, DeriveInput, Fields, Ident,
Meta, Variant,
parse::{Parse, ParseStream},
punctuated::Punctuated,
spanned::Spanned,
token::Comma,
Data, DataEnum, DataStruct, DeriveInput, Expr, ExprLit, Fields, Ident, Lit, LitStr, Meta,
MetaNameValue, Token, Variant,
};

struct SerdeAttribute {
alias: LitStr,
}

impl Parse for SerdeAttribute {
fn parse(tokens: ParseStream) -> syn::Result<Self> {
let parsed = Punctuated::<MetaNameValue, Token![,]>::parse_terminated(tokens)?;
for kv in parsed {
if kv.path.is_ident("rename") {
if let Expr::Lit(ExprLit {
lit: Lit::Str(str), ..
}) = kv.value
{
return Ok(SerdeAttribute { alias: str });
}
}
}
Err(syn::Error::new(
tokens.span(),
"Invalid alias detected, expect #[serde(rename = \"name\")]",
))
}
}

fn parse_variants(
variants: &Punctuated<Variant, Comma>,
) -> (Vec<&Ident>, Vec<TokenStream>, Vec<TokenStream>) {
Expand All @@ -17,12 +45,22 @@ fn parse_variants(
let alias = var
.attrs
.iter()
.find(|attr| attr.path().is_ident("alias"))
.map(|attr| match &attr.meta {
Meta::List(list) => list.tokens.to_token_stream(),
_ => panic!("Invalid"),
.find(|attr| attr.path().is_ident("serde"))
.map(|attr| {
attr.parse_args::<SerdeAttribute>()
.map(|x| {
x.alias
.value()
.parse::<TokenStream>()
.expect("Failed to convert LitStr to Ident TokenStream")
})
.unwrap_or_else(syn::Error::into_compile_error)
})
.unwrap_or(name.to_string().to_lowercase().parse().unwrap());
.ok_or(syn::Error::new(
var.span(),
"#[serde(alias = \"name\")] is missing",
))
.unwrap_or_else(syn::Error::into_compile_error);
let param = var
.attrs
.iter()
Expand Down Expand Up @@ -63,8 +101,8 @@ fn declare_param(
#[derive(Debug, Clone, Copy)]
struct #helper_names;

impl<'a> DefaultParam<'a> for #helper_names {
fn param() -> RuntimeParam<'a> {
impl DefaultParam for #helper_names {
fn param() -> RuntimeParam {
RuntimeParam {
#params_with_default
}
Expand All @@ -73,30 +111,29 @@ fn declare_param(
)*

// pub is needed within lazy_static
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
#[derive(Deserialize, Debug, Clone, Copy)]
#[serde(deny_unknown_fields)]
pub struct AbstractRuntimeParam<'a> {
pub struct AbstractRuntimeParam {
#(
#[serde(borrow, default)]
#aliases: RuntimeParamHelper<'a, #helper_names>,
#[serde(default)]
#aliases: RuntimeParamHelper<#helper_names>,
)*
}

impl<'a> From<AbstractRuntimeParam<'a>> for GlobalRuntimeParam<'a> {
fn from(value: AbstractRuntimeParam<'a>) -> Self {
impl From<AbstractRuntimeParam> for GlobalRuntimeParam {
fn from(value: AbstractRuntimeParam) -> Self {
Self {
#(
#aliases: value.#aliases.into(),
// #aliases: todo!(),
)*
}
}
}

/// A global runtime parameter for zenoh runtimes
pub struct GlobalRuntimeParam<'a> {
pub struct GlobalRuntimeParam {
#(
#aliases: RuntimeParam<'a>,
#aliases: RuntimeParam,
)*
}

Expand All @@ -117,11 +154,18 @@ pub fn enum_iter(input: proc_macro::TokenStream) -> proc_macro::TokenStream {

let expanded = quote! {

use ron::{extensions::Extensions, options::Options};
use #enum_name::*;

lazy_static! {
// We need to hold the reference of ZENOH_RUNTIME_ENV_STRING to prevent the issue of
// "returning a value referencing data owned by the current function"
pub static ref ZENOH_RUNTIME_ENV_STRING: String = env::var(ZENOH_RUNTIME_ENV).unwrap_or("()".to_string());
pub static ref ZRUNTIME_PARAM: GlobalRuntimeParam<'static> = ron::from_str::<AbstractRuntimeParam>(&ZENOH_RUNTIME_ENV_STRING).unwrap().into();
pub static ref ZRUNTIME_PARAM: GlobalRuntimeParam = Options::default()
.with_default_extension(Extensions::IMPLICIT_SOME)
.from_str::<AbstractRuntimeParam>(&ZENOH_RUNTIME_ENV_STRING)
.unwrap()
.into();
// pub static ref ZRUNTIME_CONFIG: AbstractRuntimeParam<'static> = ron::from_str(&ZENOH_RUNTIME_ENV_STRING).unwrap();
}

Expand All @@ -130,14 +174,11 @@ pub fn enum_iter(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
impl #enum_name {
/// Create an iterator from enum
pub fn iter() -> impl Iterator<Item = #enum_name> {
use #enum_name::*;
[#(#variant_names,)*].into_iter()
}

/// Initialize the tokio runtime according to the given config
fn init(&self) -> Result<Runtime> {
use #enum_name::*;

match self {
#(
#variant_names => {
Expand All @@ -151,9 +192,7 @@ pub fn enum_iter(input: proc_macro::TokenStream) -> proc_macro::TokenStream {

impl RuntimeParamTrait for #enum_name {

fn param(&self) -> &RuntimeParam<'_> {
use #enum_name::*;

fn param(&self) -> &RuntimeParam {
match self {
#(
#variant_names => {
Expand All @@ -167,7 +206,6 @@ pub fn enum_iter(input: proc_macro::TokenStream) -> proc_macro::TokenStream {

impl std::fmt::Display for #enum_name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use #enum_name::*;
match self {
#(
#variant_names => {
Expand Down Expand Up @@ -200,24 +238,24 @@ pub fn build_generic_runtime_param(input: proc_macro::TokenStream) -> proc_macro

use std::marker::PhantomData;

#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
#[derive(Deserialize, Debug, Clone, Copy)]
#[serde(deny_unknown_fields, default)]
struct #helper_name<'a, T>
struct #helper_name<T>
where
T: DefaultParam<'a>,
T: DefaultParam,
{
#(
#field_names: #field_types,
)*
#[serde(skip)]
_phantom: PhantomData<&'a T>,
_phantom: PhantomData<T>,
}

impl<'a, T> From<RuntimeParam<'a>> for #helper_name<'a, T>
impl<T> From<RuntimeParam> for #helper_name<T>
where
T: DefaultParam<'a>,
T: DefaultParam,
{
fn from(value: RuntimeParam<'a>) -> Self {
fn from(value: RuntimeParam) -> Self {
let RuntimeParam { #(#field_names,)* } = value;
Self {
#(#field_names,)*
Expand All @@ -226,19 +264,22 @@ pub fn build_generic_runtime_param(input: proc_macro::TokenStream) -> proc_macro
}
}

impl<'a, T> From<#helper_name<'a, T>> for RuntimeParam<'a>
impl<T> From<#helper_name<T>> for RuntimeParam
where
T: DefaultParam<'a>,
T: DefaultParam,
{
fn from(value: #helper_name<'a, T>) -> Self {
fn from(value: #helper_name<T>) -> Self {
let #helper_name { #(#field_names,)* .. } = value;
Self {
#(#field_names,)*
}
}
}

impl<'a, T: DefaultParam<'a>> Default for #helper_name<'a, T> {
impl<T> Default for #helper_name<T>
where
T: DefaultParam,
{
fn default() -> Self {
T::param().into()
}
Expand Down

0 comments on commit cf08c08

Please sign in to comment.