Skip to content

Commit

Permalink
User session answers LivelinessSubscribers history with locally known…
Browse files Browse the repository at this point in the history
… tokens
  • Loading branch information
OlivierHecart committed Sep 11, 2024
1 parent c28055e commit 8b5e6b8
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 5 deletions.
35 changes: 34 additions & 1 deletion zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,7 @@ impl Session {
remote_id: id,
key_expr: key_expr.clone().into_owned(),
origin,
callback,
callback: callback.clone(),
};

let sub_state = Arc::new(sub_state);
Expand Down Expand Up @@ -1467,9 +1467,42 @@ impl Session {
}
}

let known_tokens = if history {
state
.remote_tokens
.values()
.filter(|token| key_expr.intersects(token))
.cloned()
.collect::<Vec<KeyExpr<'static>>>()
} else {
vec![]
};

let primitives = state.primitives.as_ref().unwrap().clone();
drop(state);

if !known_tokens.is_empty() {
self.task_controller
.spawn_with_rt(zenoh_runtime::ZRuntime::Net, async move {
for token in known_tokens {
callback(Sample {
key_expr: token,
payload: ZBytes::empty(),
kind: SampleKind::Put,
encoding: Encoding::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
reliability: Reliability::Reliable,
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
attachment: None,
})
}
});
}

primitives.send_interest(Interest {
id,
mode: if history {
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/net/routing/hat/client/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ pub(crate) fn declare_token_interest(
for src_face in tables
.faces
.values()
.filter(|f| mode.future() || f.whatami == WhatAmI::Client)
.filter(|f| f.whatami == WhatAmI::Client)
.cloned()
.collect::<Vec<Arc<FaceState>>>()
{
Expand Down Expand Up @@ -371,7 +371,7 @@ pub(crate) fn declare_token_interest(
for src_face in tables
.faces
.values()
.filter(|f| mode.future() || f.whatami == WhatAmI::Client)
.filter(|f| f.whatami == WhatAmI::Client)
.cloned()
.collect::<Vec<Arc<FaceState>>>()
{
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/net/routing/hat/p2p_peer/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ pub(crate) fn declare_token_interest(
for src_face in tables
.faces
.values()
.filter(|f| mode.future() || f.whatami != WhatAmI::Router)
.filter(|f| f.whatami != WhatAmI::Router)
.cloned()
.collect::<Vec<Arc<FaceState>>>()
{
Expand Down Expand Up @@ -508,7 +508,7 @@ pub(crate) fn declare_token_interest(
for src_face in tables
.faces
.values()
.filter(|f| mode.future() || f.whatami != WhatAmI::Router)
.filter(|f| f.whatami != WhatAmI::Router)
.cloned()
.collect::<Vec<Arc<FaceState>>>()
{
Expand Down

0 comments on commit 8b5e6b8

Please sign in to comment.