Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: make session an arc-like object #1364

Merged
merged 49 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
147ebc3
feat!: bind callback subscriber/queryable to session lifetime
wyfo Aug 28, 2024
82e7b16
fix: fix example
wyfo Aug 28, 2024
15fff73
fix: fix example
wyfo Aug 28, 2024
a1ff4a6
fix: add missing comment about ZST trick
wyfo Sep 2, 2024
0d4423b
Update zenoh/src/api/key_expr.rs
wyfo Sep 2, 2024
150c497
fix: formatting
wyfo Sep 2, 2024
baa2426
fix: don't use `Weak` when undeclared on drop
wyfo Sep 3, 2024
2c52a3c
feat!: make session an arc-like object
wyfo Sep 3, 2024
155c923
fix: use weak everywhere!
wyfo Sep 3, 2024
5dfd6f3
fix: fix doc
wyfo Sep 4, 2024
a938c78
feat: use pseudo-weak session with the same perf than arc
wyfo Sep 4, 2024
2ef2446
fix: fix resource cleanup
wyfo Sep 5, 2024
18f95c0
Merge pull request #1347 from ZettaScaleLabs/arc_session
Mallets Sep 6, 2024
8f159ea
Fix typo
Mallets Sep 6, 2024
bb9de5a
fix: align `MatchingListener` undeclaration on drop behavior
wyfo Sep 6, 2024
982482f
Merge pull request #1366 from ZettaScaleLabs/arc_session2
Mallets Sep 6, 2024
f2afe70
refactor: add comments about `WeakSession`
wyfo Sep 6, 2024
6762288
Merge pull request #1368 from ZettaScaleLabs/arc_session3
Mallets Sep 6, 2024
e790a43
Merge branch 'main' into dev/arcsession
Mallets Sep 6, 2024
8f15a04
refactor: add comment for `Session` and `Session::close` (#1369)
wyfo Sep 9, 2024
2febb76
feat: use builder method instead of handler type for undeclaration on…
wyfo Sep 9, 2024
357a8fd
Merge branch 'main' into dev/arcsession
wyfo Sep 9, 2024
f3f5403
chore: merge main into dev/arcsession (#1378)
wyfo Sep 9, 2024
1c7f5c1
Revert "chore: merge main into dev/arcsession (#1378)" (#1379)
Mallets Sep 9, 2024
9d448ea
Merge pull request #1380 from ZettaScaleLabs/arc_session6
Mallets Sep 9, 2024
21f544a
fix: various fixes
wyfo Sep 9, 2024
6915be7
fix: various fixes (2)
wyfo Sep 9, 2024
8cd6914
Merge pull request #1382 from ZettaScaleLabs/arc_session7
Mallets Sep 9, 2024
dff8456
chore: merge branch 'main' into 'dev/arcsession' (#1384)
wyfo Sep 9, 2024
c5a3b64
Merge branch 'main' into arc_session9
wyfo Sep 10, 2024
25a8dbe
refactor: use `IntoHandler` associated const for undeclaration on drop
wyfo Sep 10, 2024
9bfa333
Merge remote-tracking branch 'upstream/main' into arc_session9
wyfo Sep 10, 2024
91cf704
refactor: remove prelude
wyfo Sep 10, 2024
5eac34a
fix: fix config example
wyfo Sep 10, 2024
f79df29
fix: fix zenoh-ext examples
wyfo Sep 10, 2024
83d31d8
fix: fix plugins examples
wyfo Sep 11, 2024
588b641
Merge pull request #1392 from ZettaScaleLabs/arc_session9
Mallets Sep 11, 2024
a33b6d5
Merge remote-tracking branch 'upstream/main' into arc_session10
wyfo Sep 11, 2024
7eb2123
Merge remote-tracking branch 'upstream/main' into arc_session10
wyfo Sep 11, 2024
809212e
fix: fix merge
wyfo Sep 11, 2024
aa12e1d
Merge pull request #1400 from ZettaScaleLabs/arc_session10
Mallets Sep 11, 2024
e33578f
Fix shm test
Mallets Sep 11, 2024
467d01d
Merge branch 'main' into dev/arcsession
Mallets Sep 11, 2024
88b05ee
Fix clippy warnings
Mallets Sep 11, 2024
09317d6
refactor: use builder method for undeclaration on drop
wyfo Sep 11, 2024
f83eb8f
fix: remove useless lifetimes
wyfo Sep 11, 2024
acab2bf
fix: add missing unstable flag
wyfo Sep 11, 2024
9ff1756
feat: add tuple implementation for IntoHandler
wyfo Sep 11, 2024
e8fcd62
Merge pull request #1403 from ZettaScaleLabs/arc_session11
Mallets Sep 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use std::time::Duration;

use zenoh::{config::Config, key_expr::KeyExpr, prelude::*};
use zenoh::{config::Config, key_expr::KeyExpr};

#[tokio::main]
async fn main() {
Expand Down
1 change: 0 additions & 1 deletion ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::{convert::TryFrom, time::Duration};
use zenoh::{
config::Config,
key_expr::KeyExpr,
prelude::*,
query::{QueryTarget, Selector},
};

Expand Down
84 changes: 81 additions & 3 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,8 @@ pub fn client<I: IntoIterator<Item = T>, T: Into<EndPoint>>(peers: I) -> Config

#[test]
fn config_keys() {
use validated_struct::ValidatedMap;
let c = Config::default();
dbg!(c.keys());
dbg!(Vec::from_iter(c.keys()));
}

validated_struct::validator! {
Expand Down Expand Up @@ -659,6 +658,40 @@ fn config_deser() {
}

impl Config {
pub fn insert<'d, D: serde::Deserializer<'d>>(
&mut self,
key: &str,
value: D,
) -> Result<(), validated_struct::InsertionError>
where
validated_struct::InsertionError: From<D::Error>,
{
<Self as ValidatedMap>::insert(self, key, value)
}

pub fn get(
&self,
key: &str,
) -> Result<<Self as ValidatedMapAssociatedTypes>::Accessor, GetError> {
<Self as ValidatedMap>::get(self, key)
}

pub fn get_json(&self, key: &str) -> Result<String, GetError> {
<Self as ValidatedMap>::get_json(self, key)
}

pub fn insert_json5(
&mut self,
key: &str,
value: &str,
) -> Result<(), validated_struct::InsertionError> {
<Self as ValidatedMap>::insert_json5(self, key, value)
}

pub fn keys(&self) -> impl Iterator<Item = String> {
<Self as ValidatedMap>::keys(self).into_iter()
}

pub fn set_plugin_validator<T: ConfigValidator + 'static>(&mut self, validator: Weak<T>) {
self.plugins.validator = validator;
}
Expand Down Expand Up @@ -778,7 +811,6 @@ impl std::fmt::Display for Config {

#[test]
fn config_from_json() {
use validated_struct::ValidatedMap;
let from_str = serde_json::Deserializer::from_str;
let mut config = Config::from_deserializer(&mut from_str(r#"{}"#)).unwrap();
config
Expand Down Expand Up @@ -944,6 +976,52 @@ where
self.lock().keys()
}
}
impl<T: ValidatedMap + 'static> Notifier<T>
where
T: for<'a> ValidatedMapAssociatedTypes<'a, Accessor = &'a dyn Any>,
{
pub fn insert<'d, D: serde::Deserializer<'d>>(
&self,
key: &str,
value: D,
) -> Result<(), validated_struct::InsertionError>
where
validated_struct::InsertionError: From<D::Error>,
{
self.lock().insert(key, value)?;
self.notify(key);
Ok(())
}

pub fn get(
&self,
key: &str,
) -> Result<<Self as ValidatedMapAssociatedTypes>::Accessor, GetError> {
let guard = zlock!(self.inner.inner);
// SAFETY: MutexGuard pins the mutex behind which the value is held.
let subref = guard.get(key.as_ref())? as *const _;
Ok(GetGuard {
_guard: guard,
subref,
})
}

pub fn get_json(&self, key: &str) -> Result<String, GetError> {
self.lock().get_json(key)
}

pub fn insert_json5(
&self,
key: &str,
value: &str,
) -> Result<(), validated_struct::InsertionError> {
self.insert(key, &mut json5::Deserializer::from_str(value)?)
}

pub fn keys(&self) -> impl Iterator<Item = String> {
self.lock().keys().into_iter()
}
}

pub struct GetGuard<'a, T> {
_guard: MutexGuard<'a, T>,
Expand Down
3 changes: 1 addition & 2 deletions examples/examples/z_alloc_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use zenoh::{
prelude::*,
shm::{
AllocAlignment, BlockOn, Deallocate, Defragment, GarbageCollect, PosixShmProviderBackend,
ShmProviderBuilder, POSIX_PROTOCOL_ID,
Expand All @@ -27,7 +26,7 @@ async fn main() {
run().await.unwrap()
}

async fn run() -> ZResult<()> {
async fn run() -> zenoh::Result<()> {
// create an SHM backend...
// NOTE: For extended PosixShmProviderBackend API please check z_posix_shm_provider.rs
let backend = PosixShmProviderBackend::builder()
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_bytes_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
//
use zenoh::{
bytes::ZBytes,
prelude::*,
shm::{
zshm, zshmmut, PosixShmProviderBackend, ShmProviderBuilder, ZShm, ZShmMut,
POSIX_PROTOCOL_ID,
},
Wait,
};

fn main() {
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;
use zenoh_ext::SubscriberForward;

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_get_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::Duration;

use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{prelude::*, session::ZenohId};
use zenoh::session::ZenohId;
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::{Duration, Instant};

use clap::Parser;
use zenoh::{bytes::ZBytes, key_expr::keyexpr, prelude::*, qos::CongestionControl, Config};
use zenoh::{bytes::ZBytes, key_expr::keyexpr, qos::CongestionControl, Config, Wait};
use zenoh_examples::CommonArgs;

fn main() {
Expand Down
1 change: 0 additions & 1 deletion examples/examples/z_ping_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use clap::Parser;
use zenoh::{
bytes::ZBytes,
key_expr::keyexpr,
prelude::*,
qos::CongestionControl,
shm::{PosixShmProviderBackend, ShmProviderBuilder, POSIX_PROTOCOL_ID},
Config, Wait,
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/z_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::keyexpr, prelude::*, qos::CongestionControl, Config};
use zenoh::{key_expr::keyexpr, qos::CongestionControl, Config, Wait};
use zenoh_examples::CommonArgs;

fn main() {
Expand All @@ -21,7 +21,7 @@ fn main() {

let (config, express) = parse_args();

let session = zenoh::open(config).wait().unwrap().into_arc();
let session = zenoh::open(config).wait().unwrap();

// The key expression to read the data from
let key_expr_ping = keyexpr::new("test/ping").unwrap();
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::Duration;

use clap::Parser;
use zenoh::{bytes::Encoding, key_expr::KeyExpr, prelude::*, Config};
use zenoh::{bytes::Encoding, key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
3 changes: 1 addition & 2 deletions examples/examples/z_pub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use clap::Parser;
use zenoh::{
key_expr::KeyExpr,
prelude::*,
shm::{
BlockOn, GarbageCollect, PosixShmProviderBackend, ShmProviderBuilder, POSIX_PROTOCOL_ID,
},
Expand All @@ -25,7 +24,7 @@ use zenoh_examples::CommonArgs;
const N: usize = 10;

#[tokio::main]
async fn main() -> Result<(), ZError> {
async fn main() -> zenoh::Result<()> {
// Initiate logging
zenoh::init_log_from_env_or("error");

Expand Down
1 change: 0 additions & 1 deletion examples/examples/z_pub_shm_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use clap::Parser;
use zenoh::{
bytes::ZBytes,
prelude::*,
qos::CongestionControl,
shm::{PosixShmProviderBackend, ShmProviderBuilder, POSIX_PROTOCOL_ID},
Config, Wait,
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_pub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::convert::TryInto;
use clap::Parser;
use zenoh::{
bytes::ZBytes,
prelude::*,
qos::{CongestionControl, Priority},
Wait,
};
use zenoh_examples::CommonArgs;

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::Duration;

use clap::Parser;
use zenoh::{handlers::RingChannel, key_expr::KeyExpr, prelude::*, Config};
use zenoh::{handlers::RingChannel, key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
1 change: 0 additions & 1 deletion examples/examples/z_queryable_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use clap::Parser;
use zenoh::{
bytes::ZBytes,
key_expr::KeyExpr,
prelude::*,
shm::{
zshm, BlockOn, GarbageCollect, PosixShmProviderBackend, ShmProviderBuilder,
POSIX_PROTOCOL_ID,
Expand Down
1 change: 0 additions & 1 deletion examples/examples/z_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use clap::Parser;
use futures::select;
use zenoh::{
key_expr::{keyexpr, KeyExpr},
prelude::*,
sample::{Sample, SampleKind},
Config,
};
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, sample::SampleKind, Config};
use zenoh::{key_expr::KeyExpr, sample::SampleKind, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use clap::Parser;
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
use zenoh::shm::zshm;
use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr, prelude::*};
use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
6 changes: 2 additions & 4 deletions examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::Instant;

use clap::Parser;
use zenoh::{prelude::*, Config};
use zenoh::{Config, Wait};
use zenoh_examples::CommonArgs;

struct Stats {
Expand Down Expand Up @@ -87,9 +87,7 @@ fn main() {
}
})
.wait()
.unwrap()
// Make the subscriber run in background, until the session is closed.
.background();
.unwrap();

println!("Press CTRL-C to quit...");
std::thread::park();
Expand Down
2 changes: 1 addition & 1 deletion examples/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! Check ../README.md for usage.
//!

use zenoh::config::{Config, ValidatedMap};
use zenoh::config::Config;

#[derive(clap::ValueEnum, Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum Wai {
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-backend-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::collections::{hash_map::Entry, HashMap};

use async_trait::async_trait;
use tokio::sync::RwLock;
use zenoh::{internal::Value, key_expr::OwnedKeyExpr, prelude::*, time::Timestamp};
use zenoh::{internal::Value, key_expr::OwnedKeyExpr, time::Timestamp, Result as ZResult};
use zenoh_backend_traits::{
config::{StorageConfig, VolumeConfig},
Capability, History, Persistence, Storage, StorageInsertionResult, StoredData, Volume,
Expand Down
Loading