Skip to content

Commit

Permalink
Merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Sep 4, 2024
2 parents fc21b2f + eb67b82 commit 7273ab9
Show file tree
Hide file tree
Showing 26 changed files with 471 additions and 494 deletions.
4 changes: 3 additions & 1 deletion commons/zenoh-codec/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ where
let header: u8 = self.codec.read(&mut *reader)?;

let codec = Zenoh080Header::new(header);
codec.read(&mut *reader)
let mut msg: NetworkMessage = codec.read(&mut *reader)?;
msg.reliability = self.reliability;
Ok(msg)
}
}

Expand Down
13 changes: 6 additions & 7 deletions commons/zenoh-codec/src/transport/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,12 @@ where
fn write(self, writer: &mut W, x: (&NetworkMessage, &FrameHeader)) -> Self::Output {
let (m, f) = x;

// @TODO: m.is_reliable() always return true for the time being
// if let (Reliability::Reliable, false) | (Reliability::BestEffort, true) =
// (f.reliability, m.is_reliable())
// {
// // We are not serializing on the right frame.
// return Err(BatchError::NewFrame);
// }
if let (Reliability::Reliable, false) | (Reliability::BestEffort, true) =
(f.reliability, m.is_reliable())
{
// We are not serializing on the right frame.
return Err(BatchError::NewFrame);
}

// Mark the write operation
let mark = writer.mark();
Expand Down
4 changes: 2 additions & 2 deletions commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,12 @@ impl TryFrom<u8> for Priority {
#[repr(u8)]
pub enum Reliability {
#[default]
BestEffort,
Reliable,
BestEffort,
}

impl Reliability {
pub const DEFAULT: Self = Self::BestEffort;
pub const DEFAULT: Self = Self::Reliable;

#[cfg(feature = "test")]
pub fn rand() -> Self {
Expand Down
7 changes: 4 additions & 3 deletions commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub use push::Push;
pub use request::{AtomicRequestId, Request, RequestId};
pub use response::{Response, ResponseFinal};

use crate::core::{CongestionControl, Priority};
use crate::core::{CongestionControl, Priority, Reliability};

pub mod id {
// WARNING: it's crucial that these IDs do NOT collide with the IDs
Expand Down Expand Up @@ -83,6 +83,7 @@ pub enum NetworkBody {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NetworkMessage {
pub body: NetworkBody,
pub reliability: Reliability,
#[cfg(feature = "stats")]
pub size: Option<core::num::NonZeroUsize>,
}
Expand All @@ -109,8 +110,7 @@ impl NetworkMessage {

#[inline]
pub fn is_reliable(&self) -> bool {
// TODO
true
self.reliability == Reliability::Reliable
}

#[inline]
Expand Down Expand Up @@ -179,6 +179,7 @@ impl From<NetworkBody> for NetworkMessage {
fn from(body: NetworkBody) -> Self {
Self {
body,
reliability: Reliability::DEFAULT,
#[cfg(feature = "stats")]
size: None,
}
Expand Down
3 changes: 2 additions & 1 deletion commons/zenoh-protocol/src/transport/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ impl Frame {
let ext_qos = ext::QoSType::rand();
let mut payload = vec![];
for _ in 0..rng.gen_range(1..4) {
let m = NetworkMessage::rand();
let mut m = NetworkMessage::rand();
m.reliability = reliability;
payload.push(m);
}

Expand Down
4 changes: 3 additions & 1 deletion io/zenoh-transport/src/common/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ mod tests {
let mut batch = WBatch::new(config);

let tmsg: TransportMessage = KeepAlive.into();
let nmsg: NetworkMessage = Push {
let mut nmsg: NetworkMessage = Push {
wire_expr: WireExpr::empty(),
ext_qos: ext::QoSType::new(Priority::DEFAULT, CongestionControl::Block, false),
ext_tstamp: None,
Expand Down Expand Up @@ -601,13 +601,15 @@ mod tests {
sn: 0,
ext_qos: frame::ext::QoSType::DEFAULT,
};
nmsg.reliability = frame.reliability;

// Serialize with a frame
batch.encode((&nmsg, &frame)).unwrap();
assert_ne!(batch.len(), 0);
nmsgs_in.push(nmsg.clone());

frame.reliability = Reliability::BestEffort;
nmsg.reliability = frame.reliability;
batch.encode((&nmsg, &frame)).unwrap();
assert_ne!(batch.len(), 0);
nmsgs_in.push(nmsg.clone());
Expand Down
6 changes: 5 additions & 1 deletion io/zenoh-transport/src/common/defragmentation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ impl DefragBuffer {
pub(crate) fn push(&mut self, sn: TransportSn, zslice: ZSlice) -> ZResult<()> {
if sn != self.sn.get() {
self.clear();
bail!("Expected SN {}, received {}", self.sn.get(), sn)
bail!(
"Defragmentation SN error: expected SN {}, received {}",
self.sn.get(),
sn
)
}

let new_len = self.len + zslice.len();
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use zenoh_codec::{transport::batch::BatchError, WCodec, Zenoh080};
use zenoh_config::QueueSizeConf;
use zenoh_core::zlock;
use zenoh_protocol::{
core::{Priority, Reliability},
core::Priority,
network::NetworkMessage,
transport::{
fragment::FragmentHeader,
Expand Down Expand Up @@ -220,7 +220,7 @@ impl StageIn {

// The Frame
let frame = FrameHeader {
reliability: Reliability::Reliable, // TODO
reliability: msg.reliability,
sn,
ext_qos: frame::ext::QoSType::new(priority),
};
Expand Down
41 changes: 23 additions & 18 deletions io/zenoh-transport/src/multicast/rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,14 @@ impl TransportMulticastInner {
Reliability::BestEffort => zlock!(c.best_effort),
};

self.verify_sn(sn, &mut guard)?;

if !self.verify_sn(sn, &mut guard)? {
// Drop invalid message and continue
return Ok(());
}
for msg in payload.drain(..) {
self.trigger_callback(msg, peer)?;
}

Ok(())
}

Expand Down Expand Up @@ -202,23 +205,30 @@ impl TransportMulticastInner {
Reliability::BestEffort => zlock!(c.best_effort),
};

self.verify_sn(sn, &mut guard)?;

if !self.verify_sn(sn, &mut guard)? {
// Drop invalid message and continue
return Ok(());
}
if guard.defrag.is_empty() {
let _ = guard.defrag.sync(sn);
}
guard.defrag.push(sn, payload)?;
if let Err(e) = guard.defrag.push(sn, payload) {
// Defrag errors don't close transport
tracing::trace!("{}", e);
return Ok(());
}
if !more {
// When shared-memory feature is disabled, msg does not need to be mutable
let msg = guard.defrag.defragment().ok_or_else(|| {
zerror!(
if let Some(msg) = guard.defrag.defragment() {
return self.trigger_callback(msg, peer);
} else {
tracing::trace!(
"Transport: {}. Peer: {}. Priority: {:?}. Defragmentation error.",
self.manager.config.zid,
peer.zid,
priority
)
})?;
return self.trigger_callback(msg, peer);
);
}
}

Ok(())
Expand All @@ -228,7 +238,7 @@ impl TransportMulticastInner {
&self,
sn: TransportSn,
guard: &mut MutexGuard<'_, TransportChannelRx>,
) -> ZResult<()> {
) -> ZResult<bool> {
let precedes = guard.sn.precedes(sn)?;
if !precedes {
tracing::debug!(
Expand All @@ -237,19 +247,14 @@ impl TransportMulticastInner {
sn,
guard.sn.next()
);
// Drop the fragments if needed
if !guard.defrag.is_empty() {
guard.defrag.clear();
}
// Keep reading
return Ok(());
return Ok(false);
}

// Set will always return OK because we have already checked
// with precedes() that the sn has the right resolution
let _ = guard.sn.set(sn);

Ok(())
Ok(true)
}

pub(super) fn read_messages(
Expand Down
59 changes: 31 additions & 28 deletions io/zenoh-transport/src/unicast/universal/rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ impl TransportUnicastUniversal {
Reliability::BestEffort => zlock!(c.best_effort),
};

self.verify_sn(sn, &mut guard)?;

if !self.verify_sn(sn, &mut guard)? {
// Drop invalid message and continue
return Ok(());
}
let callback = zread!(self.callback).clone();
if let Some(callback) = callback.as_ref() {
for msg in payload.drain(..) {
Expand All @@ -111,6 +113,7 @@ impl TransportUnicastUniversal {
payload
);
}

Ok(())
}

Expand Down Expand Up @@ -140,28 +143,33 @@ impl TransportUnicastUniversal {
Reliability::BestEffort => zlock!(c.best_effort),
};

self.verify_sn(sn, &mut guard)?;

if !self.verify_sn(sn, &mut guard)? {
// Drop invalid message and continue
return Ok(());
}
if guard.defrag.is_empty() {
let _ = guard.defrag.sync(sn);
}
guard.defrag.push(sn, payload)?;
if let Err(e) = guard.defrag.push(sn, payload) {
// Defrag errors don't close transport
tracing::trace!("{}", e);
return Ok(());
}
if !more {
// When shared-memory feature is disabled, msg does not need to be mutable
let msg = guard
.defrag
.defragment()
.ok_or_else(|| zerror!("Transport: {}. Defragmentation error.", self.config.zid))?;

let callback = zread!(self.callback).clone();
if let Some(callback) = callback.as_ref() {
return self.trigger_callback(callback.as_ref(), msg);
if let Some(msg) = guard.defrag.defragment() {
let callback = zread!(self.callback).clone();
if let Some(callback) = callback.as_ref() {
return self.trigger_callback(callback.as_ref(), msg);
} else {
tracing::debug!(
"Transport: {}. No callback available, dropping messages: {:?}",
self.config.zid,
msg
);
}
} else {
tracing::debug!(
"Transport: {}. No callback available, dropping messages: {:?}",
self.config.zid,
msg
);
tracing::trace!("Transport: {}. Defragmentation error.", self.config.zid);
}
}

Expand All @@ -172,24 +180,19 @@ impl TransportUnicastUniversal {
&self,
sn: TransportSn,
guard: &mut MutexGuard<'_, TransportChannelRx>,
) -> ZResult<()> {
) -> ZResult<bool> {
let precedes = guard.sn.roll(sn)?;
if !precedes {
tracing::debug!(
tracing::trace!(
"Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.",
self.config.zid,
sn,
guard.sn.get()
guard.sn.next()
);
// Drop the fragments if needed
if !guard.defrag.is_empty() {
guard.defrag.clear();
}
// Keep reading
return Ok(());
return Ok(false);
}

Ok(())
Ok(true)
}

pub(super) fn read_messages(&self, mut batch: RBatch, link: &Link) -> ZResult<()> {
Expand Down
Loading

0 comments on commit 7273ab9

Please sign in to comment.