Skip to content

Commit

Permalink
Add express support in the pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Mar 14, 2024
1 parent cc68ffb commit ea7179f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 11 deletions.
16 changes: 14 additions & 2 deletions commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,30 @@ impl NetworkMessage {
true
}

#[inline]
pub fn is_express(&self) -> bool {
match &self.body {
NetworkBody::Push(msg) => msg.ext_qos.is_express(),
NetworkBody::Request(msg) => msg.ext_qos.is_express(),
NetworkBody::Response(msg) => msg.ext_qos.is_express(),
NetworkBody::ResponseFinal(msg) => msg.ext_qos.is_express(),
NetworkBody::Declare(msg) => msg.ext_qos.is_express(),
NetworkBody::OAM(msg) => msg.ext_qos.is_express(),
}
}

#[inline]
pub fn is_droppable(&self) -> bool {
if !self.is_reliable() {
return true;
}

let cc = match &self.body {
NetworkBody::Declare(msg) => msg.ext_qos.get_congestion_control(),
NetworkBody::Push(msg) => msg.ext_qos.get_congestion_control(),
NetworkBody::Request(msg) => msg.ext_qos.get_congestion_control(),
NetworkBody::Response(msg) => msg.ext_qos.get_congestion_control(),
NetworkBody::ResponseFinal(msg) => msg.ext_qos.get_congestion_control(),
NetworkBody::Declare(msg) => msg.ext_qos.get_congestion_control(),
NetworkBody::OAM(msg) => msg.ext_qos.get_congestion_control(),
};

Expand All @@ -131,11 +143,11 @@ impl NetworkMessage {
#[inline]
pub fn priority(&self) -> Priority {
match &self.body {
NetworkBody::Declare(msg) => msg.ext_qos.get_priority(),
NetworkBody::Push(msg) => msg.ext_qos.get_priority(),
NetworkBody::Request(msg) => msg.ext_qos.get_priority(),
NetworkBody::Response(msg) => msg.ext_qos.get_priority(),
NetworkBody::ResponseFinal(msg) => msg.ext_qos.get_priority(),
NetworkBody::Declare(msg) => msg.ext_qos.get_priority(),
NetworkBody::OAM(msg) => msg.ext_qos.get_priority(),
}
}
Expand Down
24 changes: 15 additions & 9 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,26 @@ impl StageIn {
}

macro_rules! zretok {
($batch:expr) => {{
let bytes = $batch.len();
*c_guard = Some($batch);
drop(c_guard);
self.s_out.notify(bytes);
return true;
($batch:expr, $msg:expr) => {{
if $msg.is_express() {
// Move out existing batch
self.s_out.move_batch($batch);
return true;
} else {
let bytes = $batch.len();
*c_guard = Some($batch);
drop(c_guard);
self.s_out.notify(bytes);
return true;
}
}};
}

// Get the current serialization batch.
let mut batch = zgetbatch_rets!(false);
// Attempt the serialization on the current batch
let e = match batch.encode(&*msg) {
Ok(_) => zretok!(batch),
Ok(_) => zretok!(batch, msg),
Err(e) => e,
};

Expand All @@ -194,7 +200,7 @@ impl StageIn {
if let BatchError::NewFrame = e {
// Attempt a serialization with a new frame
if batch.encode((&*msg, &frame)).is_ok() {
zretok!(batch);
zretok!(batch, msg);
}
}

Expand All @@ -206,7 +212,7 @@ impl StageIn {

// Attempt a second serialization on fully empty batch
if batch.encode((&*msg, &frame)).is_ok() {
zretok!(batch);
zretok!(batch, msg);
}

// The second serialization attempt has failed. This means that the message is
Expand Down

0 comments on commit ea7179f

Please sign in to comment.