Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzzypixelz committed Sep 27, 2024
1 parent 965e905 commit 2cbe20d
Show file tree
Hide file tree
Showing 12 changed files with 391 additions and 180 deletions.
19 changes: 19 additions & 0 deletions commons/zenoh-protocol/src/network/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,25 @@ pub mod flag {
/// | | This stops the transmission of subscriber declarations/undeclarations.
/// | |
///
/// And the message flow is the following for an [`Interest`] with mode [`InterestMode::Future`]:
///
/// ```text
/// A B
/// | INTEREST |
/// |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
/// | |
/// | DECL SUBSCRIBER |
/// |<------------------| -- With interest_id field not set
/// | UNDECL SUBSCRIBER |
/// |<------------------| -- With interest_id field not set
/// | |
/// | ... |
/// | |
/// | INTEREST FINAL |
/// |------------------>| -- Mode: Final
/// | | This stops the transmission of subscriber declarations/undeclarations.
/// | |
///
/// Flags:
/// - |: Mode The mode of the interest*
/// -/
Expand Down
39 changes: 33 additions & 6 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1596,21 +1596,35 @@ impl SessionInner {
let known_tokens = if history {
state
.remote_tokens
.values()
.filter(|token| key_expr.intersects(token))
.cloned()
.collect::<Vec<KeyExpr<'static>>>()
.iter()
.filter_map(|(id, token)| {
if key_expr.intersects(token) {
Some((*id, token.clone()))
} else {
None
}
})
.collect::<Vec<(TokenId, KeyExpr<'static>)>>()
} else {
vec![]
};

let primitives = state.primitives()?;
drop(state);

let zid = self.zid();

if !known_tokens.is_empty() {
self.task_controller
.spawn_with_rt(zenoh_runtime::ZRuntime::Net, async move {
for token in known_tokens {
for (id, token) in known_tokens {
dbg!();
eprintln!(
"{zid} -> {zid}: DeclareToken(id={}, expr={}, interest_id=n/a) (session local)",
id,
&token,
);
eprintln!();
callback.call(Sample {
key_expr: token,
payload: ZBytes::empty(),
Expand Down Expand Up @@ -2368,13 +2382,26 @@ impl Primitives for WeakSession {
if state.primitives.is_none() {
return; // Session closing or closed
}

match state
.wireexpr_to_keyexpr(&m.wire_expr, false)
.map(|e| e.into_owned())
{
Ok(key_expr) => {
dbg!();
eprintln!(
"{} -> {}: DeclareToken(id={}, expr={}, interest_id={:?})",
self.zid(),
self.zid(),
m.id,
key_expr,
msg.interest_id
);
eprintln!();

if let Some(interest_id) = msg.interest_id {
if let Some(query) = state.liveliness_queries.get(&interest_id) {
// dbg!(&key_expr);
let reply = Reply {
result: Ok(Sample {
key_expr,
Expand All @@ -2398,10 +2425,10 @@ impl Primitives for WeakSession {
return;
}
}
// NOTE: the token id here will never be 0 because if we're here it means we are in Future or CurrentFuture mode
if let Entry::Vacant(e) = state.remote_tokens.entry(m.id) {
e.insert(key_expr.clone());
drop(state);

self.execute_subscriber_callbacks(
false,
&m.wire_expr,
Expand Down
40 changes: 38 additions & 2 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,25 @@ impl Primitives for Face {
msg.wire_expr.as_ref(),
msg.mode,
msg.options,
&mut |p, m| declares.push((p.clone(), m)),
&mut |p, m| {
// if let DeclareBody::DeclareToken(tok) = &m.msg.body {
// // println!("{}", std::backtrace::Backtrace::force_capture());
// if let Some(mux) = p.as_any().downcast_ref::<Mux>() {
// dbg!((
// tok,
// self.state.zid,
// mux.face.get().unwrap().state.upgrade().unwrap().zid
// ));
// } else {
// if let Some(sess) = p.as_any().downcast_ref::<WeakSession>() {
// dbg!((tok, self.state.zid, sess.runtime.zid()));
// } else {
// dbg!((tok, self.state.zid));
// }
// }
// }
declares.push((p.clone(), m))
},
);
drop(ctrl_lock);
for (p, m) in declares {
Expand Down Expand Up @@ -325,7 +343,25 @@ impl Primitives for Face {
&m.wire_expr,
msg.ext_nodeid.node_id,
msg.interest_id,
&mut |p, m| declares.push((p.clone(), m)),
&mut |p, m| {
// if let DeclareBody::DeclareToken(tok) = &m.msg.body {
// // println!("{}", std::backtrace::Backtrace::force_capture());
// if let Some(mux) = p.as_any().downcast_ref::<Mux>() {
// dbg!((
// tok,
// self.state.zid,
// mux.face.get().unwrap().state.upgrade().unwrap().zid
// ));
// } else {
// if let Some(sess) = p.as_any().downcast_ref::<WeakSession>() {
// dbg!((tok, self.state.zid, sess.runtime.zid()));
// } else {
// dbg!((tok, self.state.zid));
// }
// }
// }
declares.push((p.clone(), m))
},
);
drop(ctrl_lock);
for (p, m) in declares {
Expand Down
46 changes: 46 additions & 0 deletions zenoh/src/net/routing/dispatcher/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,29 @@ pub(crate) struct CurrentInterest {
pub(crate) mode: InterestMode,
}

#[derive(PartialEq, Clone)]
pub(crate) struct RemoteInterest {
pub(crate) res: Option<Arc<Resource>>,
pub(crate) options: InterestOptions,
}

impl RemoteInterest {
pub(crate) fn matches(&self, res: &Arc<Resource>) -> bool {
self.res.as_ref().map(|r| r.matches(res)).unwrap_or(true)
}

pub(crate) fn resource<'c, 'a: 'c, 'b: 'c>(
&'a self,
default: &'b Arc<Resource>,
) -> &'c Arc<Resource> {
if self.options.aggregate() {
self.res.as_ref().unwrap_or(default)
} else {
default
}
}
}

pub(crate) fn declare_final(
face: &mut Arc<FaceState>,
id: InterestId,
Expand Down Expand Up @@ -204,6 +227,18 @@ pub(crate) fn declare_interest(
(res, wtables)
};

dbg!();
eprintln!(
"{} -> {}: Interest(id={}, expr={}, mode={:?}, tokens={})",
face.zid,
wtables.zid,
id,
res.expr(),
mode,
options.tokens(),
);
eprintln!();

hat_code.declare_interest(
&mut wtables,
tables_ref,
Expand All @@ -224,6 +259,17 @@ pub(crate) fn declare_interest(
}
} else {
let mut wtables = zwrite!(tables_ref.tables);

dbg!();
eprintln!(
"{} -> {}: Interest(id={}, expr=<none>, mode={:?}, tokens={})",
face.zid,
wtables.zid,
id,
mode,
options.tokens(),
);

hat_code.declare_interest(
&mut wtables,
tables_ref,
Expand Down
4 changes: 4 additions & 0 deletions zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ impl Tables {
}
}
}

pub(crate) fn faces(&self) -> Vec<Arc<FaceState>> {
self.faces.values().cloned().collect::<Vec<_>>()
}
}

pub fn close_face(tables: &TablesLock, face: &Weak<FaceState>) {
Expand Down
11 changes: 11 additions & 0 deletions zenoh/src/net/routing/dispatcher/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ pub(crate) fn declare_token(
(res, wtables)
};

dbg!();
eprintln!(
"{} -> {}: DeclareToken(id={}, expr={}, interest_id={:?})",
face.zid,
wtables.zid,
id,
res.expr(),
interest_id
);
eprintln!();

hat_code.declare_token(
&mut wtables,
face,
Expand Down
19 changes: 13 additions & 6 deletions zenoh/src/net/routing/hat/p2p_peer/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use super::{
use crate::net::routing::{
dispatcher::{
face::{FaceState, InterestState},
interests::{CurrentInterest, CurrentInterestCleanup},
interests::{CurrentInterest, CurrentInterestCleanup, RemoteInterest},
resource::Resource,
tables::{Tables, TablesLock},
},
Expand All @@ -47,7 +47,9 @@ pub(super) fn interests_new_face(tables: &mut Tables, face: &mut Arc<FaceState>)
.collect::<Vec<Arc<FaceState>>>()
{
if face.whatami == WhatAmI::Router {
for (res, _, options) in face_hat_mut!(&mut src_face).remote_interests.values() {
for RemoteInterest { res, options } in
face_hat_mut!(&mut src_face).remote_interests.values()
{
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
get_mut_unchecked(face).local_interests.insert(
id,
Expand Down Expand Up @@ -123,9 +125,13 @@ impl HatInterestTrait for HatCode {
send_declare,
)
}
face_hat_mut!(face)
.remote_interests
.insert(id, (res.as_ref().map(|res| (*res).clone()), mode, options));
face_hat_mut!(face).remote_interests.insert(
id,
RemoteInterest {
res: res.as_ref().map(|res| (*res).clone()),
options,
},
);

let interest = Arc::new(CurrentInterest {
src_face: face.clone(),
Expand Down Expand Up @@ -221,7 +227,8 @@ impl HatInterestTrait for HatCode {
.collect::<Vec<InterestId>>()
{
let local_interest = dst_face.local_interests.get(&id).unwrap();
if local_interest.res == interest.0 && local_interest.options == interest.2
if local_interest.res == interest.res
&& local_interest.options == interest.options
{
dst_face.primitives.send_interest(RoutingContext::with_expr(
Interest {
Expand Down
9 changes: 6 additions & 3 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use zenoh_protocol::{
queryable::ext::QueryableInfoType,
QueryableId, SubscriberId, TokenId,
},
interest::{InterestId, InterestMode, InterestOptions},
interest::{InterestId, InterestOptions},
oam::id::OAM_LINKSTATE,
Declare, DeclareBody, DeclareFinal, Oam,
},
Expand All @@ -59,7 +59,10 @@ use crate::net::{
codec::Zenoh080Routing,
protocol::linkstate::LinkStateList,
routing::{
dispatcher::face::{Face, InterestState},
dispatcher::{
face::{Face, InterestState},
interests::RemoteInterest,
},
router::{compute_data_routes, compute_query_routes, RoutesIndexes},
RoutingContext,
},
Expand Down Expand Up @@ -407,7 +410,7 @@ impl HatContext {

struct HatFace {
next_id: AtomicU32, // @TODO: manage rollover and uniqueness
remote_interests: HashMap<InterestId, (Option<Arc<Resource>>, InterestMode, InterestOptions)>,
remote_interests: HashMap<InterestId, RemoteInterest>,
local_subs: HashMap<Arc<Resource>, SubscriberId>,
remote_subs: HashMap<SubscriberId, Arc<Resource>>,
local_tokens: HashMap<Arc<Resource>, TokenId>,
Expand Down
15 changes: 9 additions & 6 deletions zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use zenoh_protocol::{
common::ext::WireExprType, ext, Declare, DeclareBody, DeclareSubscriber, SubscriberId,
UndeclareSubscriber,
},
interest::{InterestId, InterestMode, InterestOptions},
interest::{InterestId, InterestMode},
},
};
use zenoh_sync::get_mut_unchecked;
Expand All @@ -35,6 +35,7 @@ use crate::{
net::routing::{
dispatcher::{
face::FaceState,
interests::RemoteInterest,
pubsub::SubscriberInfo,
resource::{NodeId, Resource, SessionContext},
tables::{Route, RoutingExpr, Tables},
Expand Down Expand Up @@ -84,13 +85,15 @@ fn propagate_simple_subscription_to(
let matching_interests = face_hat!(dst_face)
.remote_interests
.values()
.filter(|(r, _, o)| {
o.subscribers() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)
})
.filter(|interest| interest.options.subscribers() && interest.matches(res))
.cloned()
.collect::<Vec<(Option<Arc<Resource>>, InterestMode, InterestOptions)>>();
.collect::<Vec<_>>();

for (int_res, _, options) in matching_interests {
for RemoteInterest {
res: int_res,
options,
} in matching_interests
{
let res = if options.aggregate() {
int_res.as_ref().unwrap_or(res)
} else {
Expand Down
4 changes: 1 addition & 3 deletions zenoh/src/net/routing/hat/p2p_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,7 @@ fn propagate_simple_queryable_to(
|| face_hat!(dst_face)
.remote_interests
.values()
.any(|(r, _, o)| {
o.queryables() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)
}))
.any(|interest| interest.options.queryables() && interest.matches(res)))
&& src_face
.as_ref()
.map(|src_face| {
Expand Down
Loading

0 comments on commit 2cbe20d

Please sign in to comment.