diff --git a/commons/zenoh-codec/src/network/declare.rs b/commons/zenoh-codec/src/network/declare.rs index c81514ab3e..d7a25ea0a9 100644 --- a/commons/zenoh-codec/src/network/declare.rs +++ b/commons/zenoh-codec/src/network/declare.rs @@ -95,6 +95,7 @@ where fn write(self, writer: &mut W, x: &Declare) -> Self::Output { let Declare { + interest_id, ext_qos, ext_tstamp, ext_nodeid, @@ -103,6 +104,9 @@ where // Header let mut header = id::DECLARE; + if x.interest_id.is_some() { + header |= declare::flag::I; + } let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8) + (ext_tstamp.is_some() as u8) + ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8); @@ -111,6 +115,11 @@ where } self.write(&mut *writer, header)?; + // Body + if let Some(interest_id) = interest_id { + self.write(&mut *writer, interest_id)?; + } + // Extensions if ext_qos != &declare::ext::QoSType::DEFAULT { n_exts -= 1; @@ -157,6 +166,11 @@ where return Err(DidntRead); } + let mut interest_id = None; + if imsg::has_flag(self.header, declare::flag::I) { + interest_id = Some(self.codec.read(&mut *reader)?); + } + // Extensions let mut ext_qos = declare::ext::QoSType::DEFAULT; let mut ext_tstamp = None; @@ -192,10 +206,11 @@ where let body: DeclareBody = self.codec.read(&mut *reader)?; Ok(Declare { - body, + interest_id, ext_qos, ext_tstamp, ext_nodeid, + body, }) } } diff --git a/commons/zenoh-protocol/src/network/declare.rs b/commons/zenoh-protocol/src/network/declare.rs index d41d8bf67f..10027259c2 100644 --- a/commons/zenoh-protocol/src/network/declare.rs +++ b/commons/zenoh-protocol/src/network/declare.rs @@ -25,20 +25,22 @@ pub use subscriber::*; pub use token::*; pub mod flag { - // pub const X: u8 = 1 << 5; // 0x20 Reserved - // pub const X: u8 = 1 << 6; // 0x40 Reserved + pub const I: u8 = 1 << 5; // 0x20 Interest if I==1 then the declare is in a response to an Interest with future==false + // pub const X: u8 = 1 << 6; // 0x40 Reserved pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow } /// Flags: -/// - X: Reserved +/// - I: Interest If I==1 then the declare is in a response to an Interest with future==false /// - X: Reserved /// - Z: Extension If Z==1 then at least one extension is present /// /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ -/// |Z|X|X| DECLARE | +/// |Z|X|I| DECLARE | /// +-+-+-+---------+ +/// ~interest_id:z32~ if I==1 +/// +---------------+ /// ~ [decl_exts] ~ if Z==1 /// +---------------+ /// ~ declaration ~ @@ -46,6 +48,7 @@ pub mod flag { /// #[derive(Debug, Clone, PartialEq, Eq)] pub struct Declare { + pub interest_id: Option, pub ext_qos: ext::QoSType, pub ext_tstamp: Option, pub ext_nodeid: ext::NodeIdType, @@ -132,16 +135,18 @@ impl Declare { let mut rng = rand::thread_rng(); - let body = DeclareBody::rand(); + let interest_id = rng.gen_bool(0.5).then_some(rng.gen::()); let ext_qos = ext::QoSType::rand(); let ext_tstamp = rng.gen_bool(0.5).then(ext::TimestampType::rand); let ext_nodeid = ext::NodeIdType::rand(); + let body = DeclareBody::rand(); Self { - body, + interest_id, ext_qos, ext_tstamp, ext_nodeid, + body, } } } diff --git a/zenoh/src/key_expr.rs b/zenoh/src/key_expr.rs index f340f24cf1..aaa1d13724 100644 --- a/zenoh/src/key_expr.rs +++ b/zenoh/src/key_expr.rs @@ -664,6 +664,7 @@ impl SyncResolve for KeyExprUndeclaration<'_> { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(zenoh_protocol::network::Declare { + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 0450dab38a..194b97fca8 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -452,6 +452,7 @@ impl Resource { .insert(expr_id, nonwild_prefix.clone()); face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 290f90f95f..e85bb77bf9 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -53,6 +53,7 @@ fn propagate_simple_subscription_to( let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -136,6 +137,7 @@ fn declare_client_subscription( .primitives .send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -169,6 +171,7 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -203,6 +206,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 2ac3f1b993..5c0bc5349b 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -93,6 +93,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -164,6 +165,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -414,6 +418,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -455,6 +460,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 9fba744a9c..150c12a632 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -126,6 +126,7 @@ fn send_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -169,6 +170,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -337,6 +339,7 @@ fn send_forget_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -362,6 +365,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index a722176292..b495248788 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -53,6 +53,7 @@ fn propagate_simple_subscription_to( let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -136,6 +137,7 @@ fn declare_client_subscription( .primitives .send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -169,6 +171,7 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -203,6 +206,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 38f77bec45..72c32b9217 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -93,6 +93,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -164,6 +165,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -408,6 +412,7 @@ fn propagate_forget_simple_subscription_to_peers(tables: &mut Tables, res: &Arc< if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -559,6 +564,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -600,6 +606,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -628,6 +635,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -766,6 +774,7 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: if forget { dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -791,6 +800,7 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: }; dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 61abaa7c55..99e787beb5 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -194,6 +194,7 @@ fn send_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -247,6 +248,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -471,6 +473,7 @@ fn send_forget_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -496,6 +499,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -768,6 +775,7 @@ pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -866,6 +874,7 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links if forget { dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -891,6 +900,7 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 166ff16bd0..d460ee3f1c 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -276,6 +276,8 @@ impl AdminSpace { zlock!(admin.primitives).replace(primitives.clone()); primitives.send_declare(Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -287,6 +289,7 @@ impl AdminSpace { }); primitives.send_declare(Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 516bcd0109..4067f2ad8f 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -579,6 +579,7 @@ fn client_test() { Primitives::send_declare( primitives0.as_ref(), Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -606,6 +607,7 @@ fn client_test() { Primitives::send_declare( primitives0.as_ref(), Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -627,6 +629,7 @@ fn client_test() { Primitives::send_declare( primitives1.as_ref(), Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -654,6 +657,7 @@ fn client_test() { Primitives::send_declare( primitives1.as_ref(), Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -675,6 +679,7 @@ fn client_test() { Primitives::send_declare( primitives2.as_ref(), Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index b9e20a4e68..addb757807 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -872,6 +872,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1084,6 +1085,7 @@ impl Session { // }; primitives.send_declare(Declare { + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1140,6 +1142,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1191,6 +1194,7 @@ impl Session { distance: 0, }; primitives.send_declare(Declare { + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1212,6 +1216,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1247,6 +1252,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { + interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1271,6 +1277,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { + interest_id: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT,