Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hide zenoh_config internals #1419

Merged
merged 16 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion commons/zenoh-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ internal = []

[dependencies]
tracing = { workspace = true }
flume = { workspace = true }
json5 = { workspace = true }
num_cpus = { workspace = true }
serde = { workspace = true, features = ["default"] }
Expand Down
249 changes: 11 additions & 238 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,7 @@ pub mod wrappers;
#[allow(unused_imports)]
use std::convert::TryFrom; // This is a false positive from the rust analyser
use std::{
any::Any,
collections::HashSet,
fmt,
io::Read,
net::SocketAddr,
path::Path,
sync::{Arc, Mutex, MutexGuard, Weak},
any::Any, collections::HashSet, fmt, io::Read, net::SocketAddr, ops, path::Path, sync::Weak,
};

use include::recursive_include;
Expand All @@ -36,7 +30,6 @@ use serde_json::{Map, Value};
use validated_struct::ValidatedMapAssociatedTypes;
pub use validated_struct::{GetError, ValidatedMap};
pub use wrappers::ZenohId;
use zenoh_core::zlock;
pub use zenoh_protocol::core::{
whatami, EndPoint, Locator, WhatAmI, WhatAmIMatcher, WhatAmIMatcherVisitor,
};
Expand All @@ -57,7 +50,7 @@ pub use connection_retry::*;
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct SecretString(String);

impl Deref for SecretString {
impl ops::Deref for SecretString {
type Target = String;

fn deref(&self) -> &Self::Target {
Expand Down Expand Up @@ -229,16 +222,12 @@ fn config_keys() {
}

validated_struct::validator! {
/// The main configuration structure for Zenoh.
///
/// Most fields are optional as a way to keep defaults flexible. Some of the fields have different default values depending on the rest of the configuration.
///
/// To construct a configuration, we advise that you use a configuration file (JSON, JSON5 and YAML are currently supported, please use the proper extension for your format as the deserializer will be picked according to it).
#[derive(Default)]
#[recursive_attrs]
#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)]
#[serde(default)]
#[serde(deny_unknown_fields)]
#[doc(hidden)]
Config {
/// The Zenoh ID of the instance. This ID MUST be unique throughout your Zenoh infrastructure and cannot exceed 16 bytes of length. If left unset, a random u128 will be generated.
id: ZenohId,
Expand Down Expand Up @@ -708,10 +697,7 @@ impl Config {

pub fn remove<K: AsRef<str>>(&mut self, key: K) -> ZResult<()> {
let key = key.as_ref();
self._remove(key)
}

fn _remove(&mut self, key: &str) -> ZResult<()> {
let key = key.strip_prefix('/').unwrap_or(key);
if !key.starts_with("plugins/") {
bail!(
Expand All @@ -720,6 +706,14 @@ impl Config {
}
self.plugins.remove(&key["plugins/".len()..])
}

pub fn get_retry_config(
&self,
endpoint: Option<&EndPoint>,
listen: bool,
) -> ConnectionRetryConf {
get_retry_config(self, endpoint, listen)
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -820,227 +814,6 @@ fn config_from_json() {
println!("{}", serde_json::to_string_pretty(&config).unwrap());
}

pub type Notification = Arc<str>;

struct NotifierInner<T> {
inner: Mutex<T>,
subscribers: Mutex<Vec<flume::Sender<Notification>>>,
}
pub struct Notifier<T> {
inner: Arc<NotifierInner<T>>,
}
impl<T> Clone for Notifier<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl Notifier<Config> {
pub fn remove<K: AsRef<str>>(&self, key: K) -> ZResult<()> {
let key = key.as_ref();
self._remove(key)
}

fn _remove(&self, key: &str) -> ZResult<()> {
{
let mut guard = zlock!(self.inner.inner);
guard.remove(key)?;
}
self.notify(key);
Ok(())
}
}
impl<T: ValidatedMap> Notifier<T> {
pub fn new(inner: T) -> Self {
Notifier {
inner: Arc::new(NotifierInner {
inner: Mutex::new(inner),
subscribers: Mutex::new(Vec::new()),
}),
}
}
pub fn subscribe(&self) -> flume::Receiver<Notification> {
let (tx, rx) = flume::unbounded();
{
zlock!(self.inner.subscribers).push(tx);
}
rx
}
pub fn notify<K: AsRef<str>>(&self, key: K) {
let key = key.as_ref();
self._notify(key);
}
fn _notify(&self, key: &str) {
let key: Arc<str> = Arc::from(key);
let mut marked = Vec::new();
let mut guard = zlock!(self.inner.subscribers);
for (i, sub) in guard.iter().enumerate() {
if sub.send(key.clone()).is_err() {
marked.push(i)
}
}
for i in marked.into_iter().rev() {
guard.swap_remove(i);
}
}

pub fn lock(&self) -> MutexGuard<T> {
zlock!(self.inner.inner)
}
}

impl<'a, T: 'a> ValidatedMapAssociatedTypes<'a> for Notifier<T> {
type Accessor = GetGuard<'a, T>;
}
impl<'a, T: 'a> ValidatedMapAssociatedTypes<'a> for &Notifier<T> {
type Accessor = GetGuard<'a, T>;
}
impl<T: ValidatedMap + 'static> ValidatedMap for Notifier<T>
where
T: for<'a> ValidatedMapAssociatedTypes<'a, Accessor = &'a dyn Any>,
{
fn insert<'d, D: serde::Deserializer<'d>>(
&mut self,
key: &str,
value: D,
) -> Result<(), validated_struct::InsertionError>
where
validated_struct::InsertionError: From<D::Error>,
{
{
let mut guard = zlock!(self.inner.inner);
guard.insert(key, value)?;
}
self.notify(key);
Ok(())
}
fn get<'a>(
&'a self,
key: &str,
) -> Result<<Self as validated_struct::ValidatedMapAssociatedTypes<'a>>::Accessor, GetError>
{
let guard: MutexGuard<'a, T> = zlock!(self.inner.inner);
// SAFETY: MutexGuard pins the mutex behind which the value is held.
let subref = guard.get(key.as_ref())? as *const _;
Ok(GetGuard {
_guard: guard,
subref,
})
}
fn get_json(&self, key: &str) -> Result<String, GetError> {
self.lock().get_json(key)
}
type Keys = T::Keys;
fn keys(&self) -> Self::Keys {
self.lock().keys()
}
}
impl<T: ValidatedMap + 'static> ValidatedMap for &Notifier<T>
where
T: for<'a> ValidatedMapAssociatedTypes<'a, Accessor = &'a dyn Any>,
{
fn insert<'d, D: serde::Deserializer<'d>>(
&mut self,
key: &str,
value: D,
) -> Result<(), validated_struct::InsertionError>
where
validated_struct::InsertionError: From<D::Error>,
{
{
let mut guard = zlock!(self.inner.inner);
guard.insert(key, value)?;
}
self.notify(key);
Ok(())
}
fn get<'a>(
&'a self,
key: &str,
) -> Result<<Self as validated_struct::ValidatedMapAssociatedTypes<'a>>::Accessor, GetError>
{
let guard: MutexGuard<'a, T> = zlock!(self.inner.inner);
// SAFETY: MutexGuard pins the mutex behind which the value is held.
let subref = guard.get(key.as_ref())? as *const _;
Ok(GetGuard {
_guard: guard,
subref,
})
}
fn get_json(&self, key: &str) -> Result<String, GetError> {
self.lock().get_json(key)
}
type Keys = T::Keys;
fn keys(&self) -> Self::Keys {
self.lock().keys()
}
}
impl<T: ValidatedMap + 'static> Notifier<T>
where
T: for<'a> ValidatedMapAssociatedTypes<'a, Accessor = &'a dyn Any>,
{
pub fn insert<'d, D: serde::Deserializer<'d>>(
&self,
key: &str,
value: D,
) -> Result<(), validated_struct::InsertionError>
where
validated_struct::InsertionError: From<D::Error>,
{
self.lock().insert(key, value)?;
self.notify(key);
Ok(())
}

pub fn get(
&self,
key: &str,
) -> Result<<Self as ValidatedMapAssociatedTypes>::Accessor, GetError> {
let guard = zlock!(self.inner.inner);
// SAFETY: MutexGuard pins the mutex behind which the value is held.
let subref = guard.get(key.as_ref())? as *const _;
Ok(GetGuard {
_guard: guard,
subref,
})
}

pub fn get_json(&self, key: &str) -> Result<String, GetError> {
self.lock().get_json(key)
}

pub fn insert_json5(
&self,
key: &str,
value: &str,
) -> Result<(), validated_struct::InsertionError> {
self.insert(key, &mut json5::Deserializer::from_str(value)?)
}

pub fn keys(&self) -> impl Iterator<Item = String> {
self.lock().keys().into_iter()
}
}

pub struct GetGuard<'a, T> {
_guard: MutexGuard<'a, T>,
subref: *const dyn Any,
}
use std::ops::Deref;
impl<'a, T> Deref for GetGuard<'a, T> {
type Target = dyn Any;

fn deref(&self) -> &Self::Target {
unsafe { &*self.subref }
}
}
impl<'a, T> AsRef<dyn Any> for GetGuard<'a, T> {
fn as_ref(&self) -> &dyn Any {
self.deref()
}
}

fn sequence_number_resolution_validator(b: &Bits) -> bool {
b <= &Bits::from(TransportSn::MAX)
}
Expand Down
20 changes: 20 additions & 0 deletions commons/zenoh-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,26 @@ pub fn unstable(attr: TokenStream, tokens: TokenStream) -> TokenStream {
TokenStream::from(item.to_token_stream())
}

// FIXME(fuzzypixelz): refactor `unstable` macro to accept arguments
#[proc_macro_attribute]
pub fn unstable_config(args: TokenStream, tokens: TokenStream) -> TokenStream {
let tokens = unstable_doc(args, tokens);
let mut item = match parse_annotable_item!(tokens) {
Ok(item) => item,
Err(err) => return err.into_compile_error().into(),
};

let attrs = match item.attributes_mut() {
Ok(attrs) => attrs,
Err(err) => return err.into_compile_error().into(),
};

let feature_gate: Attribute = parse_quote!(#[cfg(feature = "unstable_config")]);
attrs.push(feature_gate);

TokenStream::from(item.to_token_stream())
}

#[proc_macro_attribute]
/// Adds a `#[cfg(feature = "internal")]` and `#[doc(hidden)]` attributes to the item.
pub fn internal(_attr: TokenStream, tokens: TokenStream) -> TokenStream {
Expand Down
Loading