Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow transport defragmentation errors and fix dropping of old transport messages #1344

Merged
merged 6 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion io/zenoh-transport/src/common/defragmentation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ impl DefragBuffer {
pub(crate) fn push(&mut self, sn: TransportSn, zslice: ZSlice) -> ZResult<()> {
if sn != self.sn.get() {
self.clear();
bail!("Expected SN {}, received {}", self.sn.get(), sn)
bail!(
"Defragmentation SN error: expected SN {}, received {}",
self.sn.get(),
sn
)
}

let new_len = self.len + zslice.len();
Expand Down
41 changes: 23 additions & 18 deletions io/zenoh-transport/src/multicast/rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,14 @@ impl TransportMulticastInner {
Reliability::BestEffort => zlock!(c.best_effort),
};

self.verify_sn(sn, &mut guard)?;

if !self.verify_sn(sn, &mut guard)? {
// Drop invalid message and continue
return Ok(());
}
for msg in payload.drain(..) {
self.trigger_callback(msg, peer)?;
}

Ok(())
}

Expand Down Expand Up @@ -202,23 +205,30 @@ impl TransportMulticastInner {
Reliability::BestEffort => zlock!(c.best_effort),
};

self.verify_sn(sn, &mut guard)?;

if !self.verify_sn(sn, &mut guard)? {
// Drop invalid message and continue
return Ok(());
}
if guard.defrag.is_empty() {
let _ = guard.defrag.sync(sn);
}
guard.defrag.push(sn, payload)?;
if let Err(e) = guard.defrag.push(sn, payload) {
// Defrag errors don't close transport
tracing::trace!("{}", e);
return Ok(());
}
if !more {
// When shared-memory feature is disabled, msg does not need to be mutable
let msg = guard.defrag.defragment().ok_or_else(|| {
zerror!(
if let Some(msg) = guard.defrag.defragment() {
return self.trigger_callback(msg, peer);
} else {
tracing::trace!(
"Transport: {}. Peer: {}. Priority: {:?}. Defragmentation error.",
self.manager.config.zid,
peer.zid,
priority
)
})?;
return self.trigger_callback(msg, peer);
);
}
}

Ok(())
Expand All @@ -228,7 +238,7 @@ impl TransportMulticastInner {
&self,
sn: TransportSn,
guard: &mut MutexGuard<'_, TransportChannelRx>,
) -> ZResult<()> {
) -> ZResult<bool> {
let precedes = guard.sn.precedes(sn)?;
if !precedes {
tracing::debug!(
Expand All @@ -237,19 +247,14 @@ impl TransportMulticastInner {
sn,
guard.sn.next()
);
// Drop the fragments if needed
if !guard.defrag.is_empty() {
guard.defrag.clear();
}
// Keep reading
return Ok(());
return Ok(false);
}

// Set will always return OK because we have already checked
// with precedes() that the sn has the right resolution
let _ = guard.sn.set(sn);

Ok(())
Ok(true)
}

pub(super) fn read_messages(
Expand Down
59 changes: 31 additions & 28 deletions io/zenoh-transport/src/unicast/universal/rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ impl TransportUnicastUniversal {
Reliability::BestEffort => zlock!(c.best_effort),
};

self.verify_sn(sn, &mut guard)?;

if !self.verify_sn(sn, &mut guard)? {
Mallets marked this conversation as resolved.
Show resolved Hide resolved
// Drop invalid message and continue
return Ok(());
}
let callback = zread!(self.callback).clone();
if let Some(callback) = callback.as_ref() {
for msg in payload.drain(..) {
Expand All @@ -111,6 +113,7 @@ impl TransportUnicastUniversal {
payload
);
}

Ok(())
}

Expand Down Expand Up @@ -140,28 +143,33 @@ impl TransportUnicastUniversal {
Reliability::BestEffort => zlock!(c.best_effort),
};

self.verify_sn(sn, &mut guard)?;

if !self.verify_sn(sn, &mut guard)? {
oteffahi marked this conversation as resolved.
Show resolved Hide resolved
// Drop invalid message and continue
return Ok(());
}
if guard.defrag.is_empty() {
let _ = guard.defrag.sync(sn);
}
guard.defrag.push(sn, payload)?;
if let Err(e) = guard.defrag.push(sn, payload) {
// Defrag errors don't close transport
tracing::trace!("{}", e);
return Ok(());
}
if !more {
// When shared-memory feature is disabled, msg does not need to be mutable
let msg = guard
.defrag
.defragment()
.ok_or_else(|| zerror!("Transport: {}. Defragmentation error.", self.config.zid))?;

let callback = zread!(self.callback).clone();
if let Some(callback) = callback.as_ref() {
return self.trigger_callback(callback.as_ref(), msg);
if let Some(msg) = guard.defrag.defragment() {
let callback = zread!(self.callback).clone();
if let Some(callback) = callback.as_ref() {
return self.trigger_callback(callback.as_ref(), msg);
} else {
tracing::debug!(
"Transport: {}. No callback available, dropping messages: {:?}",
self.config.zid,
msg
);
}
} else {
tracing::debug!(
"Transport: {}. No callback available, dropping messages: {:?}",
self.config.zid,
msg
);
tracing::trace!("Transport: {}. Defragmentation error.", self.config.zid);
}
}

Expand All @@ -172,24 +180,19 @@ impl TransportUnicastUniversal {
&self,
sn: TransportSn,
guard: &mut MutexGuard<'_, TransportChannelRx>,
) -> ZResult<()> {
) -> ZResult<bool> {
let precedes = guard.sn.roll(sn)?;
if !precedes {
tracing::debug!(
tracing::trace!(
"Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.",
self.config.zid,
sn,
guard.sn.get()
guard.sn.next()
);
// Drop the fragments if needed
if !guard.defrag.is_empty() {
guard.defrag.clear();
}
// Keep reading
return Ok(());
return Ok(false);
}

Ok(())
Ok(true)
}

pub(super) fn read_messages(&self, mut batch: RBatch, link: &Link) -> ZResult<()> {
Expand Down
Loading
Loading