diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 2e059710c9..0231a808f0 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1436,7 +1436,7 @@ impl Session { remote_id: id, key_expr: key_expr.clone().into_owned(), origin, - callback, + callback: callback.clone(), }; let sub_state = Arc::new(sub_state); @@ -1467,9 +1467,42 @@ impl Session { } } + let known_tokens = if history { + state + .remote_tokens + .values() + .filter(|token| key_expr.intersects(token)) + .cloned() + .collect::>>() + } else { + vec![] + }; + let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); + if !known_tokens.is_empty() { + self.task_controller + .spawn_with_rt(zenoh_runtime::ZRuntime::Net, async move { + for token in known_tokens { + callback(Sample { + key_expr: token, + payload: ZBytes::empty(), + kind: SampleKind::Put, + encoding: Encoding::default(), + timestamp: None, + qos: QoS::default(), + #[cfg(feature = "unstable")] + reliability: Reliability::Reliable, + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment: None, + }) + } + }); + } + primitives.send_interest(Interest { id, mode: if history { diff --git a/zenoh/src/net/routing/hat/client/token.rs b/zenoh/src/net/routing/hat/client/token.rs index bca5dca323..18a202cd82 100644 --- a/zenoh/src/net/routing/hat/client/token.rs +++ b/zenoh/src/net/routing/hat/client/token.rs @@ -339,7 +339,7 @@ pub(crate) fn declare_token_interest( for src_face in tables .faces .values() - .filter(|f| mode.future() || f.whatami == WhatAmI::Client) + .filter(|f| f.whatami == WhatAmI::Client) .cloned() .collect::>>() { @@ -371,7 +371,7 @@ pub(crate) fn declare_token_interest( for src_face in tables .faces .values() - .filter(|f| mode.future() || f.whatami == WhatAmI::Client) + .filter(|f| f.whatami == WhatAmI::Client) .cloned() .collect::>>() { diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index 5d85ebbe13..3b81a3ce57 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -475,7 +475,7 @@ pub(crate) fn declare_token_interest( for src_face in tables .faces .values() - .filter(|f| mode.future() || f.whatami != WhatAmI::Router) + .filter(|f| f.whatami != WhatAmI::Router) .cloned() .collect::>>() { @@ -508,7 +508,7 @@ pub(crate) fn declare_token_interest( for src_face in tables .faces .values() - .filter(|f| mode.future() || f.whatami != WhatAmI::Router) + .filter(|f| f.whatami != WhatAmI::Router) .cloned() .collect::>>() {