diff --git a/commons/zenoh-protocol/src/network/mod.rs b/commons/zenoh-protocol/src/network/mod.rs index 6af7fef243..0e198ddf0f 100644 --- a/commons/zenoh-protocol/src/network/mod.rs +++ b/commons/zenoh-protocol/src/network/mod.rs @@ -110,6 +110,18 @@ impl NetworkMessage { true } + #[inline] + pub fn is_express(&self) -> bool { + match &self.body { + NetworkBody::Push(msg) => msg.ext_qos.is_express(), + NetworkBody::Request(msg) => msg.ext_qos.is_express(), + NetworkBody::Response(msg) => msg.ext_qos.is_express(), + NetworkBody::ResponseFinal(msg) => msg.ext_qos.is_express(), + NetworkBody::Declare(msg) => msg.ext_qos.is_express(), + NetworkBody::OAM(msg) => msg.ext_qos.is_express(), + } + } + #[inline] pub fn is_droppable(&self) -> bool { if !self.is_reliable() { @@ -117,11 +129,11 @@ impl NetworkMessage { } let cc = match &self.body { - NetworkBody::Declare(msg) => msg.ext_qos.get_congestion_control(), NetworkBody::Push(msg) => msg.ext_qos.get_congestion_control(), NetworkBody::Request(msg) => msg.ext_qos.get_congestion_control(), NetworkBody::Response(msg) => msg.ext_qos.get_congestion_control(), NetworkBody::ResponseFinal(msg) => msg.ext_qos.get_congestion_control(), + NetworkBody::Declare(msg) => msg.ext_qos.get_congestion_control(), NetworkBody::OAM(msg) => msg.ext_qos.get_congestion_control(), }; @@ -131,11 +143,11 @@ impl NetworkMessage { #[inline] pub fn priority(&self) -> Priority { match &self.body { - NetworkBody::Declare(msg) => msg.ext_qos.get_priority(), NetworkBody::Push(msg) => msg.ext_qos.get_priority(), NetworkBody::Request(msg) => msg.ext_qos.get_priority(), NetworkBody::Response(msg) => msg.ext_qos.get_priority(), NetworkBody::ResponseFinal(msg) => msg.ext_qos.get_priority(), + NetworkBody::Declare(msg) => msg.ext_qos.get_priority(), NetworkBody::OAM(msg) => msg.ext_qos.get_priority(), } } diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 3968eabdf5..516834fa41 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -161,12 +161,18 @@ impl StageIn { } macro_rules! zretok { - ($batch:expr) => {{ - let bytes = $batch.len(); - *c_guard = Some($batch); - drop(c_guard); - self.s_out.notify(bytes); - return true; + ($batch:expr, $msg:expr) => {{ + if $msg.is_express() { + // Move out existing batch + self.s_out.move_batch($batch); + return true; + } else { + let bytes = $batch.len(); + *c_guard = Some($batch); + drop(c_guard); + self.s_out.notify(bytes); + return true; + } }}; } @@ -174,7 +180,7 @@ impl StageIn { let mut batch = zgetbatch_rets!(false); // Attempt the serialization on the current batch let e = match batch.encode(&*msg) { - Ok(_) => zretok!(batch), + Ok(_) => zretok!(batch, msg), Err(e) => e, }; @@ -194,7 +200,7 @@ impl StageIn { if let BatchError::NewFrame = e { // Attempt a serialization with a new frame if batch.encode((&*msg, &frame)).is_ok() { - zretok!(batch); + zretok!(batch, msg); } } @@ -206,7 +212,7 @@ impl StageIn { // Attempt a second serialization on fully empty batch if batch.encode((&*msg, &frame)).is_ok() { - zretok!(batch); + zretok!(batch, msg); } // The second serialization attempt has failed. This means that the message is