Skip to content

Commit

Permalink
feat!: make session an arc-like object (#1364)
Browse files Browse the repository at this point in the history
* feat!: bind callback subscriber/queryable to session lifetime

To determine if the entity is callback-only, the only elegant way I've found is the rule "handler is ZST means callback-only". Unless users starts writing fancy implementations, it should be correct 100% of the time.

Session entities now uses weak references, except publishers because it would impact performances. Weak references also solves the issue of mass undeclarations before closing the session (when the session is an `Arc`), except for publishers.

`Undeclarable` trait has been refactored a little bit to better match its use in the code.

* fix: fix example

* fix: fix example

* fix: add missing comment about ZST trick

* Update zenoh/src/api/key_expr.rs

Co-authored-by: Luca Cominardi <[email protected]>

* fix: formatting

* fix: don't use `Weak` when undeclared on drop

* feat!: make session an arc-like object

The refactoring is quite deep, so this is the first (dirty) iteration
which passes the tests.

* fix: use weak everywhere!

* fix: fix doc

* feat: use pseudo-weak session with the same perf than arc

* fix: fix resource cleanup

* Fix typo

* fix: align `MatchingListener` undeclaration on drop behavior

* refactor: add comments about `WeakSession`

* refactor: add comment for `Session` and `Session::close` (#1369)

* refactor: add comment for `Session` and `Session::close`

* Update zenoh/src/api/session.rs

Co-authored-by: Luca Cominardi <[email protected]>

* Update zenoh/src/api/session.rs

Co-authored-by: Luca Cominardi <[email protected]>

* Update zenoh/src/api/session.rs

Co-authored-by: Luca Cominardi <[email protected]>

* fix: don't run `Session::close` example

* fix: fix `Session` example

* fix: fix `Session` doc

---------

Co-authored-by: Luca Cominardi <[email protected]>

* feat: use builder method instead of handler type for undeclaration on drop (#1377)

* refactor: add comment for `Session` and `Session::close`

* Update zenoh/src/api/session.rs

Co-authored-by: Luca Cominardi <[email protected]>

* Update zenoh/src/api/session.rs

Co-authored-by: Luca Cominardi <[email protected]>

* Update zenoh/src/api/session.rs

Co-authored-by: Luca Cominardi <[email protected]>

* fix: don't run `Session::close` example

* fix: fix `Session` example

* fix: fix `Session` doc

* feat: use builder method instead of handler type for undeclaration on drop

---------

Co-authored-by: Luca Cominardi <[email protected]>

* chore: merge main into dev/arcsession (#1378)

* Fix bug with QueryTarget ALL_COMPLETE in clients and peers (#1358)

* Fix bug with QueryTarget ALL_COMPLETE in clients and peers

* Fix BEST_MATCHING queryable selection

* Properly fix Query targeting in non writer side filtering situations

* Improve fix

* Update zenoh/src/net/routing/hat/linkstate_peer/queries.rs

Co-authored-by: Joseph Perez <[email protected]>

* Update zenoh/src/net/routing/hat/p2p_peer/queries.rs

Co-authored-by: Joseph Perez <[email protected]>

* Update zenoh/src/net/routing/hat/router/queries.rs

Co-authored-by: Joseph Perez <[email protected]>

* Update zenoh/src/net/routing/hat/client/queries.rs

Co-authored-by: Joseph Perez <[email protected]>

* Remove non used ordered-float dependency

---------

Co-authored-by: Luca Cominardi <[email protected]>
Co-authored-by: Joseph Perez <[email protected]>

* fix: publisher should not be clonable (#1370)

* made builder traits internal (#1376)

* scaffolding macro added

* builder traits made internal

* doc corrected

* cargo fmt

* typo fix

* typo fix

* Fix bugs querying liveliness tokens (#1374)

* Fix bug in liveliness get in client

* Fix bug treating token interests replies from routers in peers

* Peers propagate current token interests to remote peers with unfinalize initial declarations push

* Don't register current interests declaration replies

* Add comments

* Add comments

* Add comments

---------

Co-authored-by: OlivierHecart <[email protected]>
Co-authored-by: Luca Cominardi <[email protected]>
Co-authored-by: Michael Ilyin <[email protected]>

* Revert "chore: merge main into dev/arcsession (#1378)" (#1379)

This reverts commit f3f5403.

* fix: various fixes

* fix: various fixes (2)

* chore: merge branch 'main' into 'dev/arcsession' (#1384)

* feat(zenoh_id): exposing into slice & try from slice

Allowing users to create a ZenohId from a slice, using TryFrom, and also
allowing users to convert a ZenohId into a [u8; 16].

* Add LivlinessSubscriber history option (#1355)

* Close #1357

* feat(zenoh_id): replacing from slice with  function

---------

Co-authored-by: Darius Maitia <[email protected]>
Co-authored-by: OlivierHecart <[email protected]>
Co-authored-by: Luca Cominardi <[email protected]>

* refactor: use `IntoHandler` associated const for undeclaration on drop

* refactor: remove prelude

* fix: fix config example

* fix: fix zenoh-ext examples

* fix: fix plugins examples

* fix: fix merge

* Fix shm test

* Fix clippy warnings

* refactor: use builder method for undeclaration on drop

* fix: remove useless lifetimes

* fix: add missing unstable flag

* feat: add tuple implementation for IntoHandler

---------

Co-authored-by: Joseph Perez <[email protected]>
Co-authored-by: OlivierHecart <[email protected]>
Co-authored-by: Michael Ilyin <[email protected]>
Co-authored-by: Darius Maitia <[email protected]>
  • Loading branch information
5 people authored Sep 12, 2024
1 parent a2eaf2e commit 77250fb
Show file tree
Hide file tree
Showing 75 changed files with 1,286 additions and 1,725 deletions.
2 changes: 1 addition & 1 deletion ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use std::time::Duration;

use zenoh::{config::Config, key_expr::KeyExpr, prelude::*};
use zenoh::{config::Config, key_expr::KeyExpr};

#[tokio::main]
async fn main() {
Expand Down
1 change: 0 additions & 1 deletion ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::{convert::TryFrom, time::Duration};
use zenoh::{
config::Config,
key_expr::KeyExpr,
prelude::*,
query::{QueryTarget, Selector},
};

Expand Down
84 changes: 81 additions & 3 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,8 @@ pub fn client<I: IntoIterator<Item = T>, T: Into<EndPoint>>(peers: I) -> Config

#[test]
fn config_keys() {
use validated_struct::ValidatedMap;
let c = Config::default();
dbg!(c.keys());
dbg!(Vec::from_iter(c.keys()));
}

validated_struct::validator! {
Expand Down Expand Up @@ -659,6 +658,40 @@ fn config_deser() {
}

impl Config {
pub fn insert<'d, D: serde::Deserializer<'d>>(
&mut self,
key: &str,
value: D,
) -> Result<(), validated_struct::InsertionError>
where
validated_struct::InsertionError: From<D::Error>,
{
<Self as ValidatedMap>::insert(self, key, value)
}

pub fn get(
&self,
key: &str,
) -> Result<<Self as ValidatedMapAssociatedTypes>::Accessor, GetError> {
<Self as ValidatedMap>::get(self, key)
}

pub fn get_json(&self, key: &str) -> Result<String, GetError> {
<Self as ValidatedMap>::get_json(self, key)
}

pub fn insert_json5(
&mut self,
key: &str,
value: &str,
) -> Result<(), validated_struct::InsertionError> {
<Self as ValidatedMap>::insert_json5(self, key, value)
}

pub fn keys(&self) -> impl Iterator<Item = String> {
<Self as ValidatedMap>::keys(self).into_iter()
}

pub fn set_plugin_validator<T: ConfigValidator + 'static>(&mut self, validator: Weak<T>) {
self.plugins.validator = validator;
}
Expand Down Expand Up @@ -778,7 +811,6 @@ impl std::fmt::Display for Config {

#[test]
fn config_from_json() {
use validated_struct::ValidatedMap;
let from_str = serde_json::Deserializer::from_str;
let mut config = Config::from_deserializer(&mut from_str(r#"{}"#)).unwrap();
config
Expand Down Expand Up @@ -944,6 +976,52 @@ where
self.lock().keys()
}
}
impl<T: ValidatedMap + 'static> Notifier<T>
where
T: for<'a> ValidatedMapAssociatedTypes<'a, Accessor = &'a dyn Any>,
{
pub fn insert<'d, D: serde::Deserializer<'d>>(
&self,
key: &str,
value: D,
) -> Result<(), validated_struct::InsertionError>
where
validated_struct::InsertionError: From<D::Error>,
{
self.lock().insert(key, value)?;
self.notify(key);
Ok(())
}

pub fn get(
&self,
key: &str,
) -> Result<<Self as ValidatedMapAssociatedTypes>::Accessor, GetError> {
let guard = zlock!(self.inner.inner);
// SAFETY: MutexGuard pins the mutex behind which the value is held.
let subref = guard.get(key.as_ref())? as *const _;
Ok(GetGuard {
_guard: guard,
subref,
})
}

pub fn get_json(&self, key: &str) -> Result<String, GetError> {
self.lock().get_json(key)
}

pub fn insert_json5(
&self,
key: &str,
value: &str,
) -> Result<(), validated_struct::InsertionError> {
self.insert(key, &mut json5::Deserializer::from_str(value)?)
}

pub fn keys(&self) -> impl Iterator<Item = String> {
self.lock().keys().into_iter()
}
}

pub struct GetGuard<'a, T> {
_guard: MutexGuard<'a, T>,
Expand Down
3 changes: 1 addition & 2 deletions examples/examples/z_alloc_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use zenoh::{
prelude::*,
shm::{
AllocAlignment, BlockOn, Deallocate, Defragment, GarbageCollect, PosixShmProviderBackend,
ShmProviderBuilder, POSIX_PROTOCOL_ID,
Expand All @@ -27,7 +26,7 @@ async fn main() {
run().await.unwrap()
}

async fn run() -> ZResult<()> {
async fn run() -> zenoh::Result<()> {
// create an SHM backend...
// NOTE: For extended PosixShmProviderBackend API please check z_posix_shm_provider.rs
let backend = PosixShmProviderBackend::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_bytes_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
//
use zenoh::{
bytes::ZBytes,
prelude::*,
shm::{
zshm, zshmmut, PosixShmProviderBackend, ShmProviderBuilder, ZShm, ZShmMut,
POSIX_PROTOCOL_ID,
},
Wait,
};

fn main() {
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;
use zenoh_ext::SubscriberForward;

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_get_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::Duration;

use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{prelude::*, session::ZenohId};
use zenoh::session::ZenohId;
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::{Duration, Instant};

use clap::Parser;
use zenoh::{bytes::ZBytes, key_expr::keyexpr, prelude::*, qos::CongestionControl, Config};
use zenoh::{bytes::ZBytes, key_expr::keyexpr, qos::CongestionControl, Config, Wait};
use zenoh_examples::CommonArgs;

fn main() {
Expand Down
1 change: 0 additions & 1 deletion examples/examples/z_ping_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use clap::Parser;
use zenoh::{
bytes::ZBytes,
key_expr::keyexpr,
prelude::*,
qos::CongestionControl,
shm::{PosixShmProviderBackend, ShmProviderBuilder, POSIX_PROTOCOL_ID},
Config, Wait,
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/z_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::keyexpr, prelude::*, qos::CongestionControl, Config};
use zenoh::{key_expr::keyexpr, qos::CongestionControl, Config, Wait};
use zenoh_examples::CommonArgs;

fn main() {
Expand All @@ -21,7 +21,7 @@ fn main() {

let (config, express) = parse_args();

let session = zenoh::open(config).wait().unwrap().into_arc();
let session = zenoh::open(config).wait().unwrap();

// The key expression to read the data from
let key_expr_ping = keyexpr::new("test/ping").unwrap();
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::Duration;

use clap::Parser;
use zenoh::{bytes::Encoding, key_expr::KeyExpr, prelude::*, Config};
use zenoh::{bytes::Encoding, key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
3 changes: 1 addition & 2 deletions examples/examples/z_pub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use clap::Parser;
use zenoh::{
key_expr::KeyExpr,
prelude::*,
shm::{
BlockOn, GarbageCollect, PosixShmProviderBackend, ShmProviderBuilder, POSIX_PROTOCOL_ID,
},
Expand All @@ -25,7 +24,7 @@ use zenoh_examples::CommonArgs;
const N: usize = 10;

#[tokio::main]
async fn main() -> Result<(), ZError> {
async fn main() -> zenoh::Result<()> {
// Initiate logging
zenoh::init_log_from_env_or("error");

Expand Down
1 change: 0 additions & 1 deletion examples/examples/z_pub_shm_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use clap::Parser;
use zenoh::{
bytes::ZBytes,
prelude::*,
qos::CongestionControl,
shm::{PosixShmProviderBackend, ShmProviderBuilder, POSIX_PROTOCOL_ID},
Config, Wait,
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_pub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::convert::TryInto;
use clap::Parser;
use zenoh::{
bytes::ZBytes,
prelude::*,
qos::{CongestionControl, Priority},
Wait,
};
use zenoh_examples::CommonArgs;

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::Duration;

use clap::Parser;
use zenoh::{handlers::RingChannel, key_expr::KeyExpr, prelude::*, Config};
use zenoh::{handlers::RingChannel, key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
1 change: 0 additions & 1 deletion examples/examples/z_queryable_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use clap::Parser;
use zenoh::{
bytes::ZBytes,
key_expr::KeyExpr,
prelude::*,
shm::{
zshm, BlockOn, GarbageCollect, PosixShmProviderBackend, ShmProviderBuilder,
POSIX_PROTOCOL_ID,
Expand Down
1 change: 0 additions & 1 deletion examples/examples/z_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use clap::Parser;
use futures::select;
use zenoh::{
key_expr::{keyexpr, KeyExpr},
prelude::*,
sample::{Sample, SampleKind},
Config,
};
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, sample::SampleKind, Config};
use zenoh::{key_expr::KeyExpr, sample::SampleKind, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use clap::Parser;
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
use zenoh::shm::zshm;
use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr, prelude::*};
use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
6 changes: 2 additions & 4 deletions examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::Instant;

use clap::Parser;
use zenoh::{prelude::*, Config};
use zenoh::{Config, Wait};
use zenoh_examples::CommonArgs;

struct Stats {
Expand Down Expand Up @@ -87,9 +87,7 @@ fn main() {
}
})
.wait()
.unwrap()
// Make the subscriber run in background, until the session is closed.
.background();
.unwrap();

println!("Press CTRL-C to quit...");
std::thread::park();
Expand Down
2 changes: 1 addition & 1 deletion examples/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! Check ../README.md for usage.
//!
use zenoh::config::{Config, ValidatedMap};
use zenoh::config::Config;

#[derive(clap::ValueEnum, Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum Wai {
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-backend-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::collections::{hash_map::Entry, HashMap};

use async_trait::async_trait;
use tokio::sync::RwLock;
use zenoh::{internal::Value, key_expr::OwnedKeyExpr, prelude::*, time::Timestamp};
use zenoh::{internal::Value, key_expr::OwnedKeyExpr, time::Timestamp, Result as ZResult};
use zenoh_backend_traits::{
config::{StorageConfig, VolumeConfig},
Capability, History, Persistence, Storage, StorageInsertionResult, StoredData, Volume,
Expand Down
Loading

0 comments on commit 77250fb

Please sign in to comment.