Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix liveliness bugs #1450

Merged
merged 14 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions commons/zenoh-protocol/src/network/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ pub mod flag {
/// |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
/// | |
/// | DECL SUBSCRIBER |
/// |<------------------| -- With interest_id field not set
/// |<------------------| -- With interest_id field set
/// | DECL SUBSCRIBER |
/// |<------------------| -- With interest_id field not set
/// |<------------------| -- With interest_id field set
/// | DECL SUBSCRIBER |
/// |<------------------| -- With interest_id field not set
/// |<------------------| -- With interest_id field set
/// | |
/// | DECL FINAL |
/// |<------------------| -- With interest_id field set
Expand Down
93 changes: 47 additions & 46 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2351,60 +2351,61 @@ impl Primitives for WeakSession {
zenoh_protocol::network::DeclareBody::UndeclareQueryable(m) => {
trace!("recv UndeclareQueryable {:?}", m.id);
}
#[cfg(not(feature = "unstable"))]
zenoh_protocol::network::DeclareBody::DeclareToken(_) => {}
#[cfg(feature = "unstable")]
zenoh_protocol::network::DeclareBody::DeclareToken(m) => {
trace!("recv DeclareToken {:?}", m.id);
#[cfg(feature = "unstable")]
let mut state = zwrite!(self.state);
match state
.wireexpr_to_keyexpr(&m.wire_expr, false)
.map(|e| e.into_owned())
{
let mut state = zwrite!(self.state);
match state
.wireexpr_to_keyexpr(&m.wire_expr, false)
.map(|e| e.into_owned())
{
Ok(key_expr) => {
if let Some(interest_id) = msg.interest_id {
if let Some(query) = state.liveliness_queries.get(&interest_id) {
let reply = Reply {
result: Ok(Sample {
key_expr,
payload: ZBytes::empty(),
kind: SampleKind::Put,
encoding: Encoding::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
reliability: Reliability::Reliable,
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
attachment: None,
}),
Ok(key_expr) => {
if let Some(interest_id) = msg.interest_id {
if let Some(query) = state.liveliness_queries.get(&interest_id) {
let reply = Reply {
result: Ok(Sample {
key_expr,
payload: ZBytes::empty(),
kind: SampleKind::Put,
encoding: Encoding::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
replier_id: None,
};

query.callback.call(reply);
}
} else if let Entry::Vacant(e) = state.remote_tokens.entry(m.id) {
e.insert(key_expr.clone());
drop(state);

self.execute_subscriber_callbacks(
false,
&m.wire_expr,
None,
ZBuf::default(),
SubscriberKind::LivelinessSubscriber,
#[cfg(feature = "unstable")]
Reliability::Reliable,
reliability: Reliability::Reliable,
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
attachment: None,
}),
#[cfg(feature = "unstable")]
None,
);
replier_id: None,
};

query.callback.call(reply);
return;
}
}
Err(err) => {
tracing::error!("Received DeclareToken for unknown wire_expr: {}", err)
if let Entry::Vacant(e) = state.remote_tokens.entry(m.id) {
e.insert(key_expr.clone());
drop(state);

self.execute_subscriber_callbacks(
false,
&m.wire_expr,
None,
ZBuf::default(),
SubscriberKind::LivelinessSubscriber,
#[cfg(feature = "unstable")]
Reliability::Reliable,
#[cfg(feature = "unstable")]
None,
);
}
}
Err(err) => {
tracing::error!("Received DeclareToken for unknown wire_expr: {}", err)
}
}
}
zenoh_protocol::network::DeclareBody::UndeclareToken(m) => {
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/dispatcher/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ static INTEREST_TIMEOUT_MS: u64 = 10000;
pub(crate) struct CurrentInterest {
pub(crate) src_face: Arc<FaceState>,
pub(crate) src_interest_id: InterestId,
pub(crate) mode: InterestMode,
}

pub(crate) fn declare_final(
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/hat/client/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl HatInterestTrait for HatCode {
let interest = Arc::new(CurrentInterest {
src_face: face.clone(),
src_interest_id: id,
mode,
});

for dst_face in tables
Expand Down
44 changes: 27 additions & 17 deletions zenoh/src/net/routing/hat/client/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,25 +118,35 @@ fn declare_simple_token(
) {
if let Some(interest_id) = interest_id {
if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) {
let wire_expr = Resource::get_best_key(res, "", interest.src_face.id);
send_declare(
&interest.src_face.primitives,
RoutingContext::with_expr(
Declare {
interest_id: Some(interest.src_interest_id),
ext_qos: ext::QoSType::default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }),
},
res.expr(),
),
)
if interest.mode == InterestMode::Current {
let wire_expr = Resource::get_best_key(res, "", interest.src_face.id);
send_declare(
&interest.src_face.primitives,
RoutingContext::with_expr(
Declare {
interest_id: Some(interest.src_interest_id),
ext_qos: ext::QoSType::default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }),
},
res.expr(),
),
);
return;
}
} else if !face.local_interests.contains_key(&interest_id) {
tracing::debug!(
"Received DeclareToken for {} from {} with unknown interest_id {}. Ignore.",
res.expr(),
face,
interest_id,
);
return;
}
} else {
register_simple_token(tables, face, id, res);
propagate_simple_token(tables, res, face, send_declare);
}
register_simple_token(tables, face, id, res);
propagate_simple_token(tables, res, face, send_declare);
}

#[inline]
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ pub(super) fn declare_sub_interest(
send_declare: &mut SendDeclare,
) {
if mode.current() && face.whatami == WhatAmI::Client {
let interest_id = (!mode.future()).then_some(id);
let interest_id = Some(id);
if let Some(res) = res.as_ref() {
if aggregate {
if hat!(tables).linkstatepeer_subs.iter().any(|sub| {
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/hat/linkstate_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ pub(super) fn declare_qabl_interest(
send_declare: &mut SendDeclare,
) {
if mode.current() && face.whatami == WhatAmI::Client {
let interest_id = (!mode.future()).then_some(id);
let interest_id = Some(id);
if let Some(res) = res.as_ref() {
if aggregate {
if hat!(tables).linkstatepeer_qabls.iter().any(|qabl| {
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/hat/linkstate_peer/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ pub(crate) fn declare_token_interest(
send_declare: &mut SendDeclare,
) {
if mode.current() && face.whatami == WhatAmI::Client {
let interest_id = (!mode.future()).then_some(id);
let interest_id = Some(id);
if let Some(res) = res.as_ref() {
if aggregate {
if hat!(tables).linkstatepeer_tokens.iter().any(|token| {
Expand Down
19 changes: 13 additions & 6 deletions zenoh/src/net/routing/hat/p2p_peer/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub(super) fn interests_new_face(tables: &mut Tables, face: &mut Arc<FaceState>)
.collect::<Vec<Arc<FaceState>>>()
{
if face.whatami == WhatAmI::Router {
for (res, options) in face_hat_mut!(&mut src_face).remote_interests.values() {
for (res, _, options) in face_hat_mut!(&mut src_face).remote_interests.values() {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
get_mut_unchecked(face).local_interests.insert(
id,
Expand Down Expand Up @@ -125,16 +125,23 @@ impl HatInterestTrait for HatCode {
}
face_hat_mut!(face)
.remote_interests
.insert(id, (res.as_ref().map(|res| (*res).clone()), options));
.insert(id, (res.as_ref().map(|res| (*res).clone()), mode, options));

let interest = Arc::new(CurrentInterest {
src_face: face.clone(),
src_interest_id: id,
mode,
});

let propagated_mode = if mode.future() {
InterestMode::CurrentFuture
} else {
mode
};
for dst_face in tables.faces.values_mut().filter(|f| {
f.whatami == WhatAmI::Router
|| (options.tokens()
&& mode == InterestMode::Current
&& f.whatami == WhatAmI::Peer
&& !initial_interest(f).map(|i| i.finalized).unwrap_or(true))
}) {
Expand All @@ -144,7 +151,7 @@ impl HatInterestTrait for HatCode {
InterestState {
options,
res: res.as_ref().map(|res| (*res).clone()),
finalized: mode == InterestMode::Future,
finalized: propagated_mode == InterestMode::Future,
},
);
if mode.current() {
Expand All @@ -157,11 +164,11 @@ impl HatInterestTrait for HatCode {
}
let wire_expr = res
.as_ref()
.map(|res| Resource::decl_key(res, dst_face, dst_face.whatami == WhatAmI::Client));
.map(|res| Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client));
dst_face.primitives.send_interest(RoutingContext::with_expr(
Interest {
id,
mode,
mode: propagated_mode,
options,
wire_expr,
ext_qos: ext::QoSType::DECLARE,
Expand Down Expand Up @@ -214,7 +221,7 @@ impl HatInterestTrait for HatCode {
.collect::<Vec<InterestId>>()
{
let local_interest = dst_face.local_interests.get(&id).unwrap();
if local_interest.res == interest.0 && local_interest.options == interest.1
if local_interest.res == interest.0 && local_interest.options == interest.2
{
dst_face.primitives.send_interest(RoutingContext::with_expr(
Interest {
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use zenoh_protocol::{
queryable::ext::QueryableInfoType,
QueryableId, SubscriberId, TokenId,
},
interest::{InterestId, InterestOptions},
interest::{InterestId, InterestMode, InterestOptions},
oam::id::OAM_LINKSTATE,
Declare, DeclareBody, DeclareFinal, Oam,
},
Expand Down Expand Up @@ -407,7 +407,7 @@ impl HatContext {

struct HatFace {
next_id: AtomicU32, // @TODO: manage rollover and uniqueness
remote_interests: HashMap<InterestId, (Option<Arc<Resource>>, InterestOptions)>,
remote_interests: HashMap<InterestId, (Option<Arc<Resource>>, InterestMode, InterestOptions)>,
local_subs: HashMap<Arc<Resource>, SubscriberId>,
remote_subs: HashMap<SubscriberId, Arc<Resource>>,
local_tokens: HashMap<Arc<Resource>, TokenId>,
Expand Down
8 changes: 4 additions & 4 deletions zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ fn propagate_simple_subscription_to(
let matching_interests = face_hat!(dst_face)
.remote_interests
.values()
.filter(|(r, o)| {
.filter(|(r, _, o)| {
o.subscribers() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)
})
.cloned()
.collect::<Vec<(Option<Arc<Resource>>, InterestOptions)>>();
.collect::<Vec<(Option<Arc<Resource>>, InterestMode, InterestOptions)>>();

for (int_res, options) in matching_interests {
for (int_res, _, options) in matching_interests {
let res = if options.aggregate() {
int_res.as_ref().unwrap_or(res)
} else {
Expand Down Expand Up @@ -426,7 +426,7 @@ pub(super) fn declare_sub_interest(
send_declare: &mut SendDeclare,
) {
if mode.current() && face.whatami == WhatAmI::Client {
let interest_id = (!mode.future()).then_some(id);
let interest_id = Some(id);
if let Some(res) = res.as_ref() {
if aggregate {
if tables.faces.values().any(|src_face| {
Expand Down
6 changes: 4 additions & 2 deletions zenoh/src/net/routing/hat/p2p_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ fn propagate_simple_queryable_to(
|| face_hat!(dst_face)
.remote_interests
.values()
.any(|(r, o)| o.queryables() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)))
.any(|(r, _, o)| {
o.queryables() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)
}))
&& src_face
.as_ref()
.map(|src_face| {
Expand Down Expand Up @@ -412,7 +414,7 @@ pub(super) fn declare_qabl_interest(
send_declare: &mut SendDeclare,
) {
if mode.current() && face.whatami == WhatAmI::Client {
let interest_id = (!mode.future()).then_some(id);
let interest_id = Some(id);
if let Some(res) = res.as_ref() {
if aggregate {
if tables.faces.values().any(|src_face| {
Expand Down
Loading
Loading