Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: make session an arc-like object #1364

Merged
merged 49 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
147ebc3
feat!: bind callback subscriber/queryable to session lifetime
wyfo Aug 28, 2024
82e7b16
fix: fix example
wyfo Aug 28, 2024
15fff73
fix: fix example
wyfo Aug 28, 2024
a1ff4a6
fix: add missing comment about ZST trick
wyfo Sep 2, 2024
0d4423b
Update zenoh/src/api/key_expr.rs
wyfo Sep 2, 2024
150c497
fix: formatting
wyfo Sep 2, 2024
baa2426
fix: don't use `Weak` when undeclared on drop
wyfo Sep 3, 2024
2c52a3c
feat!: make session an arc-like object
wyfo Sep 3, 2024
155c923
fix: use weak everywhere!
wyfo Sep 3, 2024
5dfd6f3
fix: fix doc
wyfo Sep 4, 2024
a938c78
feat: use pseudo-weak session with the same perf than arc
wyfo Sep 4, 2024
2ef2446
fix: fix resource cleanup
wyfo Sep 5, 2024
18f95c0
Merge pull request #1347 from ZettaScaleLabs/arc_session
Mallets Sep 6, 2024
8f159ea
Fix typo
Mallets Sep 6, 2024
bb9de5a
fix: align `MatchingListener` undeclaration on drop behavior
wyfo Sep 6, 2024
982482f
Merge pull request #1366 from ZettaScaleLabs/arc_session2
Mallets Sep 6, 2024
f2afe70
refactor: add comments about `WeakSession`
wyfo Sep 6, 2024
6762288
Merge pull request #1368 from ZettaScaleLabs/arc_session3
Mallets Sep 6, 2024
e790a43
Merge branch 'main' into dev/arcsession
Mallets Sep 6, 2024
8f15a04
refactor: add comment for `Session` and `Session::close` (#1369)
wyfo Sep 9, 2024
2febb76
feat: use builder method instead of handler type for undeclaration on…
wyfo Sep 9, 2024
357a8fd
Merge branch 'main' into dev/arcsession
wyfo Sep 9, 2024
f3f5403
chore: merge main into dev/arcsession (#1378)
wyfo Sep 9, 2024
1c7f5c1
Revert "chore: merge main into dev/arcsession (#1378)" (#1379)
Mallets Sep 9, 2024
9d448ea
Merge pull request #1380 from ZettaScaleLabs/arc_session6
Mallets Sep 9, 2024
21f544a
fix: various fixes
wyfo Sep 9, 2024
6915be7
fix: various fixes (2)
wyfo Sep 9, 2024
8cd6914
Merge pull request #1382 from ZettaScaleLabs/arc_session7
Mallets Sep 9, 2024
dff8456
chore: merge branch 'main' into 'dev/arcsession' (#1384)
wyfo Sep 9, 2024
c5a3b64
Merge branch 'main' into arc_session9
wyfo Sep 10, 2024
25a8dbe
refactor: use `IntoHandler` associated const for undeclaration on drop
wyfo Sep 10, 2024
9bfa333
Merge remote-tracking branch 'upstream/main' into arc_session9
wyfo Sep 10, 2024
91cf704
refactor: remove prelude
wyfo Sep 10, 2024
5eac34a
fix: fix config example
wyfo Sep 10, 2024
f79df29
fix: fix zenoh-ext examples
wyfo Sep 10, 2024
83d31d8
fix: fix plugins examples
wyfo Sep 11, 2024
588b641
Merge pull request #1392 from ZettaScaleLabs/arc_session9
Mallets Sep 11, 2024
a33b6d5
Merge remote-tracking branch 'upstream/main' into arc_session10
wyfo Sep 11, 2024
7eb2123
Merge remote-tracking branch 'upstream/main' into arc_session10
wyfo Sep 11, 2024
809212e
fix: fix merge
wyfo Sep 11, 2024
aa12e1d
Merge pull request #1400 from ZettaScaleLabs/arc_session10
Mallets Sep 11, 2024
e33578f
Fix shm test
Mallets Sep 11, 2024
467d01d
Merge branch 'main' into dev/arcsession
Mallets Sep 11, 2024
88b05ee
Fix clippy warnings
Mallets Sep 11, 2024
09317d6
refactor: use builder method for undeclaration on drop
wyfo Sep 11, 2024
f83eb8f
fix: remove useless lifetimes
wyfo Sep 11, 2024
acab2bf
fix: add missing unstable flag
wyfo Sep 11, 2024
9ff1756
feat: add tuple implementation for IntoHandler
wyfo Sep 11, 2024
e8fcd62
Merge pull request #1403 from ZettaScaleLabs/arc_session11
Mallets Sep 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use std::time::Duration;

use zenoh::{config::Config, key_expr::KeyExpr, prelude::*};
use zenoh::{config::Config, key_expr::KeyExpr};

#[tokio::main]
async fn main() {
Expand Down
1 change: 0 additions & 1 deletion ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::{convert::TryFrom, time::Duration};
use zenoh::{
config::Config,
key_expr::KeyExpr,
prelude::*,
query::{QueryTarget, Selector},
};

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;
use zenoh_ext::SubscriberForward;

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_get_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::Duration;

use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{prelude::*, session::ZenohId};
use zenoh::session::ZenohId;
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn main() {

let (config, express) = parse_args();

let session = zenoh::open(config).wait().unwrap().into_arc();
let session = zenoh::open(config).wait().unwrap();

// The key expression to read the data from
let key_expr_ping = keyexpr::new("test/ping").unwrap();
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::Duration;

use clap::Parser;
use zenoh::{handlers::RingChannel, key_expr::KeyExpr, prelude::*, Config};
use zenoh::{handlers::RingChannel, key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
1 change: 0 additions & 1 deletion examples/examples/z_queryable_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use clap::Parser;
use zenoh::{
bytes::ZBytes,
key_expr::KeyExpr,
prelude::*,
shm::{
zshm, BlockOn, GarbageCollect, PosixShmProviderBackend, ShmProviderBuilder,
POSIX_PROTOCOL_ID,
Expand Down
1 change: 0 additions & 1 deletion examples/examples/z_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use clap::Parser;
use futures::select;
use zenoh::{
key_expr::{keyexpr, KeyExpr},
prelude::*,
sample::{Sample, SampleKind},
Config,
};
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, sample::SampleKind, Config};
use zenoh::{key_expr::KeyExpr, sample::SampleKind, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use clap::Parser;
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
use zenoh::shm::zshm;
use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr, prelude::*};
use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
4 changes: 1 addition & 3 deletions examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ fn main() {
}
})
.wait()
.unwrap()
// Make the subscriber run in background, until the session is closed.
.background();
.unwrap();

println!("Press CTRL-C to quit...");
std::thread::park();
Expand Down
1 change: 0 additions & 1 deletion plugins/zenoh-plugin-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use zenoh::{
key_expr::{keyexpr, KeyExpr},
prelude::ZResult,
sample::Sample,
session::SessionDeclarations,
};
use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};

Expand Down
1 change: 0 additions & 1 deletion plugins/zenoh-plugin-rest/examples/z_serve_sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use zenoh::{
config::Config,
key_expr::keyexpr,
qos::{CongestionControl, QoSBuilderTrait},
session::SessionDeclarations,
};

const HTML: &str = r#"
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use zenoh::{
prelude::*,
query::{Parameters, QueryConsolidation, Reply, Selector, ZenohParameters},
sample::{Sample, SampleKind},
session::{Session, SessionDeclarations},
session::Session,
};
use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};

Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-storage-manager/src/replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{
use flume::{Receiver, Sender};
use futures::{pin_mut, select, FutureExt};
use tokio::{sync::RwLock, time::interval};
use zenoh::{key_expr::keyexpr, prelude::*};
use zenoh::key_expr::keyexpr;
use zenoh_backend_traits::config::{ReplicaConfig, StorageConfig};

use crate::{backends_mgt::StoreIntercept, storages_mgt::StorageMessage};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use zenoh::{
},
query::{ConsolidationMode, QueryTarget},
sample::{Sample, SampleBuilder, SampleKind, TimestampBuilderTrait},
session::{Session, SessionDeclarations},
session::Session,
time::{Timestamp, NTP64},
};
use zenoh_backend_traits::{
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ext/examples/examples/z_query_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::{arg, Parser};
use zenoh::{config::Config, prelude::*, query::ReplyKeyExpr};
use zenoh::{config::Config, query::ReplyKeyExpr};
use zenoh_ext::*;
use zenoh_ext_examples::CommonArgs;

Expand Down
25 changes: 12 additions & 13 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ use zenoh::{
pubsub::FlumeSubscriber,
query::{Query, Queryable, ZenohParameters},
sample::{Locality, Sample},
session::{SessionDeclarations, SessionRef},
Error, Resolvable, Resolve, Result as ZResult,
Error, Resolvable, Resolve, Result as ZResult, Session,
};

/// The builder of PublicationCache, allowing to configure it.
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
pub struct PublicationCacheBuilder<'a, 'b, 'c> {
session: SessionRef<'a>,
session: &'a Session,
pub_key_expr: ZResult<KeyExpr<'b>>,
queryable_prefix: Option<ZResult<KeyExpr<'c>>>,
queryable_origin: Option<Locality>,
Expand All @@ -43,7 +42,7 @@ pub struct PublicationCacheBuilder<'a, 'b, 'c> {

impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
pub(crate) fn new(
session: SessionRef<'a>,
session: &'a Session,
pub_key_expr: ZResult<KeyExpr<'b>>,
) -> PublicationCacheBuilder<'a, 'b, 'c> {
PublicationCacheBuilder {
Expand Down Expand Up @@ -95,8 +94,8 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
}
}

impl<'a> Resolvable for PublicationCacheBuilder<'a, '_, '_> {
type To = ZResult<PublicationCache<'a>>;
impl Resolvable for PublicationCacheBuilder<'_, '_, '_> {
type To = ZResult<PublicationCache>;
}

impl Wait for PublicationCacheBuilder<'_, '_, '_> {
Expand All @@ -105,7 +104,7 @@ impl Wait for PublicationCacheBuilder<'_, '_, '_> {
}
}

impl<'a> IntoFuture for PublicationCacheBuilder<'a, '_, '_> {
impl IntoFuture for PublicationCacheBuilder<'_, '_, '_> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;

Expand All @@ -114,14 +113,14 @@ impl<'a> IntoFuture for PublicationCacheBuilder<'a, '_, '_> {
}
}

pub struct PublicationCache<'a> {
local_sub: FlumeSubscriber<'a>,
_queryable: Queryable<'a, flume::Receiver<Query>>,
pub struct PublicationCache {
local_sub: FlumeSubscriber,
_queryable: Queryable<flume::Receiver<Query>>,
task: TerminatableTask,
}

impl<'a> PublicationCache<'a> {
fn new(conf: PublicationCacheBuilder<'a, '_, '_>) -> ZResult<PublicationCache<'a>> {
impl PublicationCache {
fn new(conf: PublicationCacheBuilder<'_, '_, '_>) -> ZResult<PublicationCache> {
let key_expr = conf.pub_key_expr?;
// the queryable_prefix (optional), and the key_expr for PublicationCache's queryable ("[<queryable_prefix>]/<pub_key_expr>")
let (queryable_prefix, queryable_key_expr): (Option<OwnedKeyExpr>, KeyExpr) =
Expand Down Expand Up @@ -258,7 +257,7 @@ impl<'a> PublicationCache<'a> {

/// Undeclare this [`PublicationCache`]`.
#[inline]
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
ResolveFuture::new(async move {
let PublicationCache {
_queryable,
Expand Down
24 changes: 12 additions & 12 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,16 @@ use zenoh::{
pubsub::{Reliability, Subscriber},
query::{QueryConsolidation, QueryTarget, ReplyKeyExpr, Selector},
sample::{Locality, Sample, SampleBuilder, TimestampBuilderTrait},
session::{SessionDeclarations, SessionRef},
time::Timestamp,
Error, Resolvable, Resolve, Result as ZResult,
Error, Resolvable, Resolve, Result as ZResult, Session,
};

use crate::ExtractSample;

/// The builder of [`FetchingSubscriber`], allowing to configure it.
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> {
pub(crate) session: SessionRef<'a>,
pub(crate) session: &'a Session,
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
pub(crate) key_space: KeySpace,
pub(crate) reliability: Reliability,
Expand Down Expand Up @@ -224,7 +223,7 @@ where
Handler: IntoHandler<'static, Sample>,
Handler::Handler: Send,
{
type To = ZResult<FetchingSubscriber<'a, Handler::Handler>>;
type To = ZResult<FetchingSubscriber<Handler::Handler>>;
}

impl<KeySpace, Handler> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
Expand Down Expand Up @@ -362,7 +361,7 @@ pub struct FetchingSubscriberBuilder<
> where
TryIntoSample: ExtractSample,
{
pub(crate) session: SessionRef<'a>,
pub(crate) session: &'a Session,
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
pub(crate) key_space: KeySpace,
pub(crate) reliability: Reliability,
Expand Down Expand Up @@ -548,7 +547,7 @@ where
Handler::Handler: Send,
TryIntoSample: ExtractSample,
{
type To = ZResult<FetchingSubscriber<'a, Handler::Handler>>;
type To = ZResult<FetchingSubscriber<Handler::Handler>>;
}

impl<
Expand Down Expand Up @@ -620,28 +619,29 @@ where
/// }
/// # }
/// ```
pub struct FetchingSubscriber<'a, Handler> {
subscriber: Subscriber<'a, ()>,
pub struct FetchingSubscriber<Handler> {
subscriber: Subscriber<()>,
callback: Arc<dyn Fn(Sample) + Send + Sync + 'static>,
state: Arc<Mutex<InnerState>>,
handler: Handler,
}

impl<Handler> std::ops::Deref for FetchingSubscriber<'_, Handler> {
impl<Handler> std::ops::Deref for FetchingSubscriber<Handler> {
type Target = Handler;
fn deref(&self) -> &Self::Target {
&self.handler
}
}

impl<Handler> std::ops::DerefMut for FetchingSubscriber<'_, Handler> {
impl<Handler> std::ops::DerefMut for FetchingSubscriber<Handler> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.handler
}
}

impl<'a, Handler> FetchingSubscriber<'a, Handler> {
impl<Handler> FetchingSubscriber<Handler> {
fn new<
'a,
KeySpace,
InputHandler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
Expand Down Expand Up @@ -724,7 +724,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {

/// Undeclare this [`FetchingSubscriber`]`.
#[inline]
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
self.subscriber.undeclare()
}

Expand Down
Loading
Loading