Skip to content

Commit

Permalink
Fix liveliness bugs (#1450)
Browse files Browse the repository at this point in the history
* Peers only porpagate new tokens

* Routers check failover brokering before propagating one shot tooken undeclares to peers

* Only propagate Current interests to non finalized peers

* Peers propagate Current interests as CurrentFuture to routers

* Historical replies to CurrentFuture token interests are sent with an interest_id

* Peers send initial tokens with the INITIAL_INTEREST_ID

* Add liveliness tests

* Also apply changes to linstate peers

* Fix merge

* Apply new CurrentFuture interest behavior to pubsub and queries

* Ignore received declares for unknown interests

* Fix key_expr mapping bug
  • Loading branch information
OlivierHecart authored Sep 19, 2024
1 parent 9fd0e65 commit b3b71a7
Show file tree
Hide file tree
Showing 17 changed files with 583 additions and 119 deletions.
6 changes: 3 additions & 3 deletions commons/zenoh-protocol/src/network/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ pub mod flag {
/// |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
/// | |
/// | DECL SUBSCRIBER |
/// |<------------------| -- With interest_id field not set
/// |<------------------| -- With interest_id field set
/// | DECL SUBSCRIBER |
/// |<------------------| -- With interest_id field not set
/// |<------------------| -- With interest_id field set
/// | DECL SUBSCRIBER |
/// |<------------------| -- With interest_id field not set
/// |<------------------| -- With interest_id field set
/// | |
/// | DECL FINAL |
/// |<------------------| -- With interest_id field set
Expand Down
93 changes: 47 additions & 46 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2351,60 +2351,61 @@ impl Primitives for WeakSession {
zenoh_protocol::network::DeclareBody::UndeclareQueryable(m) => {
trace!("recv UndeclareQueryable {:?}", m.id);
}
#[cfg(not(feature = "unstable"))]
zenoh_protocol::network::DeclareBody::DeclareToken(_) => {}
#[cfg(feature = "unstable")]
zenoh_protocol::network::DeclareBody::DeclareToken(m) => {
trace!("recv DeclareToken {:?}", m.id);
#[cfg(feature = "unstable")]
let mut state = zwrite!(self.state);
match state
.wireexpr_to_keyexpr(&m.wire_expr, false)
.map(|e| e.into_owned())
{
let mut state = zwrite!(self.state);
match state
.wireexpr_to_keyexpr(&m.wire_expr, false)
.map(|e| e.into_owned())
{
Ok(key_expr) => {
if let Some(interest_id) = msg.interest_id {
if let Some(query) = state.liveliness_queries.get(&interest_id) {
let reply = Reply {
result: Ok(Sample {
key_expr,
payload: ZBytes::empty(),
kind: SampleKind::Put,
encoding: Encoding::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
reliability: Reliability::Reliable,
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
attachment: None,
}),
Ok(key_expr) => {
if let Some(interest_id) = msg.interest_id {
if let Some(query) = state.liveliness_queries.get(&interest_id) {
let reply = Reply {
result: Ok(Sample {
key_expr,
payload: ZBytes::empty(),
kind: SampleKind::Put,
encoding: Encoding::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
replier_id: None,
};

query.callback.call(reply);
}
} else if let Entry::Vacant(e) = state.remote_tokens.entry(m.id) {
e.insert(key_expr.clone());
drop(state);

self.execute_subscriber_callbacks(
false,
&m.wire_expr,
None,
ZBuf::default(),
SubscriberKind::LivelinessSubscriber,
#[cfg(feature = "unstable")]
Reliability::Reliable,
reliability: Reliability::Reliable,
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
attachment: None,
}),
#[cfg(feature = "unstable")]
None,
);
replier_id: None,
};

query.callback.call(reply);
return;
}
}
Err(err) => {
tracing::error!("Received DeclareToken for unknown wire_expr: {}", err)
if let Entry::Vacant(e) = state.remote_tokens.entry(m.id) {
e.insert(key_expr.clone());
drop(state);

self.execute_subscriber_callbacks(
false,
&m.wire_expr,
None,
ZBuf::default(),
SubscriberKind::LivelinessSubscriber,
#[cfg(feature = "unstable")]
Reliability::Reliable,
#[cfg(feature = "unstable")]
None,
);
}
}
Err(err) => {
tracing::error!("Received DeclareToken for unknown wire_expr: {}", err)
}
}
}
zenoh_protocol::network::DeclareBody::UndeclareToken(m) => {
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/dispatcher/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ static INTEREST_TIMEOUT_MS: u64 = 10000;
pub(crate) struct CurrentInterest {
pub(crate) src_face: Arc<FaceState>,
pub(crate) src_interest_id: InterestId,
pub(crate) mode: InterestMode,
}

pub(crate) fn declare_final(
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/hat/client/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl HatInterestTrait for HatCode {
let interest = Arc::new(CurrentInterest {
src_face: face.clone(),
src_interest_id: id,
mode,
});

for dst_face in tables
Expand Down
44 changes: 27 additions & 17 deletions zenoh/src/net/routing/hat/client/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,25 +118,35 @@ fn declare_simple_token(
) {
if let Some(interest_id) = interest_id {
if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) {
let wire_expr = Resource::get_best_key(res, "", interest.src_face.id);
send_declare(
&interest.src_face.primitives,
RoutingContext::with_expr(
Declare {
interest_id: Some(interest.src_interest_id),
ext_qos: ext::QoSType::default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }),
},
res.expr(),
),
)
if interest.mode == InterestMode::Current {
let wire_expr = Resource::get_best_key(res, "", interest.src_face.id);
send_declare(
&interest.src_face.primitives,
RoutingContext::with_expr(
Declare {
interest_id: Some(interest.src_interest_id),
ext_qos: ext::QoSType::default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }),
},
res.expr(),
),
);
return;
}
} else if !face.local_interests.contains_key(&interest_id) {
tracing::debug!(
"Received DeclareToken for {} from {} with unknown interest_id {}. Ignore.",
res.expr(),
face,
interest_id,
);
return;
}
} else {
register_simple_token(tables, face, id, res);
propagate_simple_token(tables, res, face, send_declare);
}
register_simple_token(tables, face, id, res);
propagate_simple_token(tables, res, face, send_declare);
}

#[inline]
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ pub(super) fn declare_sub_interest(
send_declare: &mut SendDeclare,
) {
if mode.current() && face.whatami == WhatAmI::Client {
let interest_id = (!mode.future()).then_some(id);
let interest_id = Some(id);
if let Some(res) = res.as_ref() {
if aggregate {
if hat!(tables).linkstatepeer_subs.iter().any(|sub| {
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/hat/linkstate_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ pub(super) fn declare_qabl_interest(
send_declare: &mut SendDeclare,
) {
if mode.current() && face.whatami == WhatAmI::Client {
let interest_id = (!mode.future()).then_some(id);
let interest_id = Some(id);
if let Some(res) = res.as_ref() {
if aggregate {
if hat!(tables).linkstatepeer_qabls.iter().any(|qabl| {
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/hat/linkstate_peer/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ pub(crate) fn declare_token_interest(
send_declare: &mut SendDeclare,
) {
if mode.current() && face.whatami == WhatAmI::Client {
let interest_id = (!mode.future()).then_some(id);
let interest_id = Some(id);
if let Some(res) = res.as_ref() {
if aggregate {
if hat!(tables).linkstatepeer_tokens.iter().any(|token| {
Expand Down
19 changes: 13 additions & 6 deletions zenoh/src/net/routing/hat/p2p_peer/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub(super) fn interests_new_face(tables: &mut Tables, face: &mut Arc<FaceState>)
.collect::<Vec<Arc<FaceState>>>()
{
if face.whatami == WhatAmI::Router {
for (res, options) in face_hat_mut!(&mut src_face).remote_interests.values() {
for (res, _, options) in face_hat_mut!(&mut src_face).remote_interests.values() {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
get_mut_unchecked(face).local_interests.insert(
id,
Expand Down Expand Up @@ -125,16 +125,23 @@ impl HatInterestTrait for HatCode {
}
face_hat_mut!(face)
.remote_interests
.insert(id, (res.as_ref().map(|res| (*res).clone()), options));
.insert(id, (res.as_ref().map(|res| (*res).clone()), mode, options));

let interest = Arc::new(CurrentInterest {
src_face: face.clone(),
src_interest_id: id,
mode,
});

let propagated_mode = if mode.future() {
InterestMode::CurrentFuture
} else {
mode
};
for dst_face in tables.faces.values_mut().filter(|f| {
f.whatami == WhatAmI::Router
|| (options.tokens()
&& mode == InterestMode::Current
&& f.whatami == WhatAmI::Peer
&& !initial_interest(f).map(|i| i.finalized).unwrap_or(true))
}) {
Expand All @@ -144,7 +151,7 @@ impl HatInterestTrait for HatCode {
InterestState {
options,
res: res.as_ref().map(|res| (*res).clone()),
finalized: mode == InterestMode::Future,
finalized: propagated_mode == InterestMode::Future,
},
);
if mode.current() {
Expand All @@ -157,11 +164,11 @@ impl HatInterestTrait for HatCode {
}
let wire_expr = res
.as_ref()
.map(|res| Resource::decl_key(res, dst_face, dst_face.whatami == WhatAmI::Client));
.map(|res| Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client));
dst_face.primitives.send_interest(RoutingContext::with_expr(
Interest {
id,
mode,
mode: propagated_mode,
options,
wire_expr,
ext_qos: ext::QoSType::DECLARE,
Expand Down Expand Up @@ -214,7 +221,7 @@ impl HatInterestTrait for HatCode {
.collect::<Vec<InterestId>>()
{
let local_interest = dst_face.local_interests.get(&id).unwrap();
if local_interest.res == interest.0 && local_interest.options == interest.1
if local_interest.res == interest.0 && local_interest.options == interest.2
{
dst_face.primitives.send_interest(RoutingContext::with_expr(
Interest {
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use zenoh_protocol::{
queryable::ext::QueryableInfoType,
QueryableId, SubscriberId, TokenId,
},
interest::{InterestId, InterestOptions},
interest::{InterestId, InterestMode, InterestOptions},
oam::id::OAM_LINKSTATE,
Declare, DeclareBody, DeclareFinal, Oam,
},
Expand Down Expand Up @@ -407,7 +407,7 @@ impl HatContext {

struct HatFace {
next_id: AtomicU32, // @TODO: manage rollover and uniqueness
remote_interests: HashMap<InterestId, (Option<Arc<Resource>>, InterestOptions)>,
remote_interests: HashMap<InterestId, (Option<Arc<Resource>>, InterestMode, InterestOptions)>,
local_subs: HashMap<Arc<Resource>, SubscriberId>,
remote_subs: HashMap<SubscriberId, Arc<Resource>>,
local_tokens: HashMap<Arc<Resource>, TokenId>,
Expand Down
8 changes: 4 additions & 4 deletions zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ fn propagate_simple_subscription_to(
let matching_interests = face_hat!(dst_face)
.remote_interests
.values()
.filter(|(r, o)| {
.filter(|(r, _, o)| {
o.subscribers() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)
})
.cloned()
.collect::<Vec<(Option<Arc<Resource>>, InterestOptions)>>();
.collect::<Vec<(Option<Arc<Resource>>, InterestMode, InterestOptions)>>();

for (int_res, options) in matching_interests {
for (int_res, _, options) in matching_interests {
let res = if options.aggregate() {
int_res.as_ref().unwrap_or(res)
} else {
Expand Down Expand Up @@ -426,7 +426,7 @@ pub(super) fn declare_sub_interest(
send_declare: &mut SendDeclare,
) {
if mode.current() && face.whatami == WhatAmI::Client {
let interest_id = (!mode.future()).then_some(id);
let interest_id = Some(id);
if let Some(res) = res.as_ref() {
if aggregate {
if tables.faces.values().any(|src_face| {
Expand Down
6 changes: 4 additions & 2 deletions zenoh/src/net/routing/hat/p2p_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ fn propagate_simple_queryable_to(
|| face_hat!(dst_face)
.remote_interests
.values()
.any(|(r, o)| o.queryables() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)))
.any(|(r, _, o)| {
o.queryables() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)
}))
&& src_face
.as_ref()
.map(|src_face| {
Expand Down Expand Up @@ -412,7 +414,7 @@ pub(super) fn declare_qabl_interest(
send_declare: &mut SendDeclare,
) {
if mode.current() && face.whatami == WhatAmI::Client {
let interest_id = (!mode.future()).then_some(id);
let interest_id = Some(id);
if let Some(res) = res.as_ref() {
if aggregate {
if tables.faces.values().any(|src_face| {
Expand Down
Loading

0 comments on commit b3b71a7

Please sign in to comment.