Skip to content

Commit

Permalink
Hide zenoh_config internals (#1419)
Browse files Browse the repository at this point in the history
* Hide `zenoh_config` internals

* Auto-enable `unstable_config` if `unstable` is enabled

* Mark questionable API items as `unstable`

* Add (De)Serialize impls

* Fix `Config::from_deserializer`

* Update `zenoh/tests/liveliness.rs`

* Add `Config::get_json`

* Add `Config::from_json5` and docs

* Fix typo

* Fix silly errors

* Fix Config::from_json5 error message

* Use `ZResult` everywhere, make `Config::get_json` stable

* Remove LookupGuard

* Add back `Notifier<Config>::get`

* Remove `validated_struct` dependency

---------

Co-authored-by: Luca Cominardi <[email protected]>
  • Loading branch information
fuzzypixelz and Mallets authored Sep 17, 2024
1 parent dfb3a40 commit 7f88264
Show file tree
Hide file tree
Showing 49 changed files with 838 additions and 705 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion commons/zenoh-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ internal = []

[dependencies]
tracing = { workspace = true }
flume = { workspace = true }
json5 = { workspace = true }
num_cpus = { workspace = true }
serde = { workspace = true, features = ["default"] }
Expand Down
2 changes: 0 additions & 2 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
//
use super::*;

pub const ENV: &str = "ZENOH_CONFIG";

macro_rules! mode_accessor {
($type:ty) => {
#[inline]
Expand Down
255 changes: 11 additions & 244 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,7 @@ pub mod wrappers;
#[allow(unused_imports)]
use std::convert::TryFrom; // This is a false positive from the rust analyser
use std::{
any::Any,
collections::HashSet,
fmt,
io::Read,
net::SocketAddr,
path::Path,
sync::{Arc, Mutex, MutexGuard, Weak},
any::Any, collections::HashSet, fmt, io::Read, net::SocketAddr, ops, path::Path, sync::Weak,
};

use include::recursive_include;
Expand All @@ -36,7 +30,6 @@ use serde_json::{Map, Value};
use validated_struct::ValidatedMapAssociatedTypes;
pub use validated_struct::{GetError, ValidatedMap};
pub use wrappers::ZenohId;
use zenoh_core::zlock;
pub use zenoh_protocol::core::{
whatami, EndPoint, Locator, WhatAmI, WhatAmIMatcher, WhatAmIMatcherVisitor,
};
Expand All @@ -57,7 +50,7 @@ pub use connection_retry::*;
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct SecretString(String);

impl Deref for SecretString {
impl ops::Deref for SecretString {
type Target = String;

fn deref(&self) -> &Self::Target {
Expand Down Expand Up @@ -229,16 +222,12 @@ fn config_keys() {
}

validated_struct::validator! {
/// The main configuration structure for Zenoh.
///
/// Most fields are optional as a way to keep defaults flexible. Some of the fields have different default values depending on the rest of the configuration.
///
/// To construct a configuration, we advise that you use a configuration file (JSON, JSON5 and YAML are currently supported, please use the proper extension for your format as the deserializer will be picked according to it).
#[derive(Default)]
#[recursive_attrs]
#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)]
#[serde(default)]
#[serde(deny_unknown_fields)]
#[doc(hidden)]
Config {
/// The Zenoh ID of the instance. This ID MUST be unique throughout your Zenoh infrastructure and cannot exceed 16 bytes of length. If left unset, a random u128 will be generated.
id: ZenohId,
Expand Down Expand Up @@ -708,10 +697,7 @@ impl Config {

pub fn remove<K: AsRef<str>>(&mut self, key: K) -> ZResult<()> {
let key = key.as_ref();
self._remove(key)
}

fn _remove(&mut self, key: &str) -> ZResult<()> {
let key = key.strip_prefix('/').unwrap_or(key);
if !key.starts_with("plugins/") {
bail!(
Expand All @@ -720,6 +706,14 @@ impl Config {
}
self.plugins.remove(&key["plugins/".len()..])
}

pub fn get_retry_config(
&self,
endpoint: Option<&EndPoint>,
listen: bool,
) -> ConnectionRetryConf {
get_retry_config(self, endpoint, listen)
}
}

#[derive(Debug)]
Expand All @@ -743,12 +737,6 @@ impl std::fmt::Display for ConfigOpenErr {
}
impl std::error::Error for ConfigOpenErr {}
impl Config {
pub fn from_env() -> ZResult<Self> {
let path = std::env::var(defaults::ENV)
.map_err(|e| zerror!("Invalid ENV variable ({}): {}", defaults::ENV, e))?;
Self::from_file(path.as_str())
}

pub fn from_file<P: AsRef<Path>>(path: P) -> ZResult<Self> {
let path = path.as_ref();
let mut config = Self::_from_file(path)?;
Expand Down Expand Up @@ -820,227 +808,6 @@ fn config_from_json() {
println!("{}", serde_json::to_string_pretty(&config).unwrap());
}

pub type Notification = Arc<str>;

struct NotifierInner<T> {
inner: Mutex<T>,
subscribers: Mutex<Vec<flume::Sender<Notification>>>,
}
pub struct Notifier<T> {
inner: Arc<NotifierInner<T>>,
}
impl<T> Clone for Notifier<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl Notifier<Config> {
pub fn remove<K: AsRef<str>>(&self, key: K) -> ZResult<()> {
let key = key.as_ref();
self._remove(key)
}

fn _remove(&self, key: &str) -> ZResult<()> {
{
let mut guard = zlock!(self.inner.inner);
guard.remove(key)?;
}
self.notify(key);
Ok(())
}
}
impl<T: ValidatedMap> Notifier<T> {
pub fn new(inner: T) -> Self {
Notifier {
inner: Arc::new(NotifierInner {
inner: Mutex::new(inner),
subscribers: Mutex::new(Vec::new()),
}),
}
}
pub fn subscribe(&self) -> flume::Receiver<Notification> {
let (tx, rx) = flume::unbounded();
{
zlock!(self.inner.subscribers).push(tx);
}
rx
}
pub fn notify<K: AsRef<str>>(&self, key: K) {
let key = key.as_ref();
self._notify(key);
}
fn _notify(&self, key: &str) {
let key: Arc<str> = Arc::from(key);
let mut marked = Vec::new();
let mut guard = zlock!(self.inner.subscribers);
for (i, sub) in guard.iter().enumerate() {
if sub.send(key.clone()).is_err() {
marked.push(i)
}
}
for i in marked.into_iter().rev() {
guard.swap_remove(i);
}
}

pub fn lock(&self) -> MutexGuard<T> {
zlock!(self.inner.inner)
}
}

impl<'a, T: 'a> ValidatedMapAssociatedTypes<'a> for Notifier<T> {
type Accessor = GetGuard<'a, T>;
}
impl<'a, T: 'a> ValidatedMapAssociatedTypes<'a> for &Notifier<T> {
type Accessor = GetGuard<'a, T>;
}
impl<T: ValidatedMap + 'static> ValidatedMap for Notifier<T>
where
T: for<'a> ValidatedMapAssociatedTypes<'a, Accessor = &'a dyn Any>,
{
fn insert<'d, D: serde::Deserializer<'d>>(
&mut self,
key: &str,
value: D,
) -> Result<(), validated_struct::InsertionError>
where
validated_struct::InsertionError: From<D::Error>,
{
{
let mut guard = zlock!(self.inner.inner);
guard.insert(key, value)?;
}
self.notify(key);
Ok(())
}
fn get<'a>(
&'a self,
key: &str,
) -> Result<<Self as validated_struct::ValidatedMapAssociatedTypes<'a>>::Accessor, GetError>
{
let guard: MutexGuard<'a, T> = zlock!(self.inner.inner);
// SAFETY: MutexGuard pins the mutex behind which the value is held.
let subref = guard.get(key.as_ref())? as *const _;
Ok(GetGuard {
_guard: guard,
subref,
})
}
fn get_json(&self, key: &str) -> Result<String, GetError> {
self.lock().get_json(key)
}
type Keys = T::Keys;
fn keys(&self) -> Self::Keys {
self.lock().keys()
}
}
impl<T: ValidatedMap + 'static> ValidatedMap for &Notifier<T>
where
T: for<'a> ValidatedMapAssociatedTypes<'a, Accessor = &'a dyn Any>,
{
fn insert<'d, D: serde::Deserializer<'d>>(
&mut self,
key: &str,
value: D,
) -> Result<(), validated_struct::InsertionError>
where
validated_struct::InsertionError: From<D::Error>,
{
{
let mut guard = zlock!(self.inner.inner);
guard.insert(key, value)?;
}
self.notify(key);
Ok(())
}
fn get<'a>(
&'a self,
key: &str,
) -> Result<<Self as validated_struct::ValidatedMapAssociatedTypes<'a>>::Accessor, GetError>
{
let guard: MutexGuard<'a, T> = zlock!(self.inner.inner);
// SAFETY: MutexGuard pins the mutex behind which the value is held.
let subref = guard.get(key.as_ref())? as *const _;
Ok(GetGuard {
_guard: guard,
subref,
})
}
fn get_json(&self, key: &str) -> Result<String, GetError> {
self.lock().get_json(key)
}
type Keys = T::Keys;
fn keys(&self) -> Self::Keys {
self.lock().keys()
}
}
impl<T: ValidatedMap + 'static> Notifier<T>
where
T: for<'a> ValidatedMapAssociatedTypes<'a, Accessor = &'a dyn Any>,
{
pub fn insert<'d, D: serde::Deserializer<'d>>(
&self,
key: &str,
value: D,
) -> Result<(), validated_struct::InsertionError>
where
validated_struct::InsertionError: From<D::Error>,
{
self.lock().insert(key, value)?;
self.notify(key);
Ok(())
}

pub fn get(
&self,
key: &str,
) -> Result<<Self as ValidatedMapAssociatedTypes>::Accessor, GetError> {
let guard = zlock!(self.inner.inner);
// SAFETY: MutexGuard pins the mutex behind which the value is held.
let subref = guard.get(key.as_ref())? as *const _;
Ok(GetGuard {
_guard: guard,
subref,
})
}

pub fn get_json(&self, key: &str) -> Result<String, GetError> {
self.lock().get_json(key)
}

pub fn insert_json5(
&self,
key: &str,
value: &str,
) -> Result<(), validated_struct::InsertionError> {
self.insert(key, &mut json5::Deserializer::from_str(value)?)
}

pub fn keys(&self) -> impl Iterator<Item = String> {
self.lock().keys().into_iter()
}
}

pub struct GetGuard<'a, T> {
_guard: MutexGuard<'a, T>,
subref: *const dyn Any,
}
use std::ops::Deref;
impl<'a, T> Deref for GetGuard<'a, T> {
type Target = dyn Any;

fn deref(&self) -> &Self::Target {
unsafe { &*self.subref }
}
}
impl<'a, T> AsRef<dyn Any> for GetGuard<'a, T> {
fn as_ref(&self) -> &dyn Any {
self.deref()
}
}

fn sequence_number_resolution_validator(b: &Bits) -> bool {
b <= &Bits::from(TransportSn::MAX)
}
Expand Down
Loading

0 comments on commit 7f88264

Please sign in to comment.