Skip to content

Commit

Permalink
Send Declare, Request, Response, ResponseFinal and Oam messages with …
Browse files Browse the repository at this point in the history
…CongestionControl::Block
  • Loading branch information
OlivierHecart committed Sep 8, 2023
1 parent d0f08e8 commit 1829a20
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 63 deletions.
24 changes: 24 additions & 0 deletions commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,30 @@ pub mod ext {
let inner: u8 = rng.gen();
Self { inner }
}

pub fn declare_default() -> Self {
Self::new(Priority::default(), CongestionControl::Block, false)
}

pub fn push_default() -> Self {
Self::new(Priority::default(), CongestionControl::Drop, false)
}

pub fn request_default() -> Self {
Self::new(Priority::default(), CongestionControl::Block, false)
}

pub fn response_default() -> Self {
Self::new(Priority::default(), CongestionControl::Block, false)
}

pub fn response_final_default() -> Self {
Self::new(Priority::default(), CongestionControl::Block, false)
}

pub fn oam_default() -> Self {
Self::new(Priority::default(), CongestionControl::Block, false)
}
}

impl<const ID: u8> Default for QoSType<{ ID }> {
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/key_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ impl SyncResolve for KeyExprUndeclaration<'_> {
let primitives = state.primitives.as_ref().unwrap().clone();
drop(state);
primitives.send_declare(zenoh_protocol::network::Declare {
ext_qos: declare::ext::QoSType::default(),
ext_qos: declare::ext::QoSType::declare_default(),
ext_tstamp: None,
ext_nodeid: declare::ext::NodeIdType::default(),
body: DeclareBody::UndeclareKeyExpr(UndeclareKeyExpr { id: expr_id }),
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl Network {
Ok(NetworkBody::OAM(Oam {
id: OAM_LINKSTATE,
body: ZExtBody::ZBuf(buf),
ext_qos: oam::ext::QoSType::default(),
ext_qos: oam::ext::QoSType::oam_default(),
ext_tstamp: None,
})
.into())
Expand Down
32 changes: 16 additions & 16 deletions zenoh/src/net/routing/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fn send_sourced_subscription_to_net_childs(
log::debug!("Send subscription {} on {}", res.expr(), someface);

someface.primitives.send_declare(Declare {
ext_qos: ext::QoSType::default(),
ext_qos: ext::QoSType::declare_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType {
node_id: routing_context.unwrap_or(0),
Expand Down Expand Up @@ -114,7 +114,7 @@ fn propagate_simple_subscription_to(
get_mut_unchecked(dst_face).local_subs.insert(res.clone());
let key_expr = Resource::decl_key(res, dst_face);
dst_face.primitives.send_declare(Declare {
ext_qos: ext::QoSType::default(),
ext_qos: ext::QoSType::declare_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::DeclareSubscriber(DeclareSubscriber {
Expand Down Expand Up @@ -469,7 +469,7 @@ pub fn declare_client_subscription(
#[cfg(not(windows))]
for mcast_group in &wtables.mcast_groups {
mcast_group.primitives.send_declare(Declare {
ext_qos: ext::QoSType::default(),
ext_qos: ext::QoSType::declare_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::DeclareSubscriber(DeclareSubscriber {
Expand All @@ -488,7 +488,7 @@ pub fn declare_client_subscription(
#[cfg(not(windows))]
for mcast_group in &wtables.mcast_groups {
mcast_group.primitives.send_declare(Declare {
ext_qos: ext::QoSType::default(),
ext_qos: ext::QoSType::declare_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::DeclareSubscriber(DeclareSubscriber {
Expand Down Expand Up @@ -572,7 +572,7 @@ fn send_forget_sourced_subscription_to_net_childs(
log::debug!("Send forget subscription {} on {}", res.expr(), someface);

someface.primitives.send_declare(Declare {
ext_qos: ext::QoSType::default(),
ext_qos: ext::QoSType::declare_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType {
node_id: routing_context.unwrap_or(0),
Expand All @@ -595,7 +595,7 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc<Resource>
if face.local_subs.contains(res) {
let wire_expr = Resource::get_best_key(res, "", face.id);
face.primitives.send_declare(Declare {
ext_qos: ext::QoSType::default(),
ext_qos: ext::QoSType::declare_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber {
Expand Down Expand Up @@ -631,7 +631,7 @@ fn propagate_forget_simple_subscription_to_peers(tables: &mut Tables, res: &Arc<
{
let wire_expr = Resource::get_best_key(res, "", face.id);
face.primitives.send_declare(Declare {
ext_qos: ext::QoSType::default(),
ext_qos: ext::QoSType::declare_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber {
Expand Down Expand Up @@ -870,7 +870,7 @@ pub(crate) fn undeclare_client_subscription(
{
let wire_expr = Resource::get_best_key(res, "", face.id);
face.primitives.send_declare(Declare {
ext_qos: ext::QoSType::default(),
ext_qos: ext::QoSType::declare_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber {
Expand Down Expand Up @@ -930,7 +930,7 @@ pub(crate) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc<FaceState>) {
get_mut_unchecked(face).local_subs.insert(sub.clone());
let key_expr = Resource::decl_key(sub, face);
face.primitives.send_declare(Declare {
ext_qos: ext::QoSType::default(),
ext_qos: ext::QoSType::declare_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::DeclareSubscriber(DeclareSubscriber {
Expand All @@ -954,7 +954,7 @@ pub(crate) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc<FaceState>) {
get_mut_unchecked(face).local_subs.insert(sub.clone());
let key_expr = Resource::decl_key(sub, face);
face.primitives.send_declare(Declare {
ext_qos: ext::QoSType::default(),
ext_qos: ext::QoSType::declare_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::DeclareSubscriber(DeclareSubscriber {
Expand All @@ -974,7 +974,7 @@ pub(crate) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc<FaceState>) {
get_mut_unchecked(face).local_subs.insert(sub.clone());
let key_expr = Resource::decl_key(sub, face);
face.primitives.send_declare(Declare {
ext_qos: ext::QoSType::default(),
ext_qos: ext::QoSType::declare_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::DeclareSubscriber(DeclareSubscriber {
Expand Down Expand Up @@ -1167,7 +1167,7 @@ pub(crate) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links:
if forget {
let wire_expr = Resource::get_best_key(res, "", dst_face.id);
dst_face.primitives.send_declare(Declare {
ext_qos: ext::QoSType::default(),
ext_qos: ext::QoSType::declare_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::UndeclareSubscriber(
Expand All @@ -1189,7 +1189,7 @@ pub(crate) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links:
mode: Mode::Push,
};
dst_face.primitives.send_declare(Declare {
ext_qos: ext::QoSType::default(),
ext_qos: ext::QoSType::declare_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::DeclareSubscriber(DeclareSubscriber {
Expand Down Expand Up @@ -1831,7 +1831,7 @@ pub fn full_reentrant_route_data(

outface.primitives.send_push(Push {
wire_expr: key_expr,
ext_qos: ext::QoSType::default(),
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType {
node_id: context.unwrap_or(0),
Expand Down Expand Up @@ -1860,7 +1860,7 @@ pub fn full_reentrant_route_data(

outface.primitives.send_push(Push {
wire_expr: key_expr.into(),
ext_qos: ext::QoSType::default(),
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType {
node_id: context.unwrap_or(0),
Expand Down Expand Up @@ -1907,7 +1907,7 @@ pub fn pull_data(tables_ref: &RwLock<Tables>, face: &Arc<FaceState>, expr: WireE
for (key_expr, payload) in route {
face.primitives.send_push(Push {
wire_expr: key_expr,
ext_qos: ext::QoSType::default(), // TODO
ext_qos: ext::QoSType::push_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
payload,
Expand Down
Loading

0 comments on commit 1829a20

Please sign in to comment.