Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow transport defragmentation errors and fix dropping of old transport messages #1344

Merged
merged 6 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion io/zenoh-transport/src/common/defragmentation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ impl DefragBuffer {
pub(crate) fn push(&mut self, sn: TransportSn, zslice: ZSlice) -> ZResult<()> {
if sn != self.sn.get() {
self.clear();
bail!("Expected SN {}, received {}", self.sn.get(), sn)
bail!(
"Defragmentation SN error: expected SN {}, received {}",
self.sn.get(),
sn
)
}

let new_len = self.len + zslice.len();
Expand Down
55 changes: 26 additions & 29 deletions io/zenoh-transport/src/multicast/rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,10 @@ impl TransportMulticastInner {
Reliability::BestEffort => zlock!(c.best_effort),
};

self.verify_sn(sn, &mut guard)?;

for msg in payload.drain(..) {
self.trigger_callback(msg, peer)?;
if self.verify_sn(sn, &mut guard)? {
for msg in payload.drain(..) {
self.trigger_callback(msg, peer)?;
}
}
Ok(())
}
Expand Down Expand Up @@ -202,23 +202,25 @@ impl TransportMulticastInner {
Reliability::BestEffort => zlock!(c.best_effort),
};

self.verify_sn(sn, &mut guard)?;

if guard.defrag.is_empty() {
let _ = guard.defrag.sync(sn);
}
guard.defrag.push(sn, payload)?;
if !more {
// When shared-memory feature is disabled, msg does not need to be mutable
let msg = guard.defrag.defragment().ok_or_else(|| {
zerror!(
"Transport: {}. Peer: {}. Priority: {:?}. Defragmentation error.",
self.manager.config.zid,
peer.zid,
priority
)
})?;
return self.trigger_callback(msg, peer);
if self.verify_sn(sn, &mut guard)? {
if guard.defrag.is_empty() {
let _ = guard.defrag.sync(sn);
}
if let Err(e) = guard.defrag.push(sn, payload) {
tracing::trace!("{}", e);
} else if !more {
// When shared-memory feature is disabled, msg does not need to be mutable
if let Some(msg) = guard.defrag.defragment() {
return self.trigger_callback(msg, peer);
} else {
tracing::trace!(
"Transport: {}. Peer: {}. Priority: {:?}. Defragmentation error.",
self.manager.config.zid,
peer.zid,
priority
);
}
}
}

Ok(())
Expand All @@ -228,7 +230,7 @@ impl TransportMulticastInner {
&self,
sn: TransportSn,
guard: &mut MutexGuard<'_, TransportChannelRx>,
) -> ZResult<()> {
) -> ZResult<bool> {
let precedes = guard.sn.precedes(sn)?;
if !precedes {
tracing::debug!(
Expand All @@ -237,19 +239,14 @@ impl TransportMulticastInner {
sn,
guard.sn.next()
);
// Drop the fragments if needed
if !guard.defrag.is_empty() {
guard.defrag.clear();
}
// Keep reading
return Ok(());
return Ok(false);
}

// Set will always return OK because we have already checked
// with precedes() that the sn has the right resolution
let _ = guard.sn.set(sn);

Ok(())
Ok(true)
}

pub(super) fn read_messages(
Expand Down
83 changes: 39 additions & 44 deletions io/zenoh-transport/src/unicast/universal/rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,19 @@ impl TransportUnicastUniversal {
Reliability::BestEffort => zlock!(c.best_effort),
};

self.verify_sn(sn, &mut guard)?;

let callback = zread!(self.callback).clone();
if let Some(callback) = callback.as_ref() {
for msg in payload.drain(..) {
self.trigger_callback(callback.as_ref(), msg)?;
if self.verify_sn(sn, &mut guard)? {
oteffahi marked this conversation as resolved.
Show resolved Hide resolved
let callback = zread!(self.callback).clone();
if let Some(callback) = callback.as_ref() {
for msg in payload.drain(..) {
self.trigger_callback(callback.as_ref(), msg)?;
}
} else {
tracing::debug!(
"Transport: {}. No callback available, dropping messages: {:?}",
self.config.zid,
payload
);
}
} else {
oteffahi marked this conversation as resolved.
Show resolved Hide resolved
tracing::debug!(
"Transport: {}. No callback available, dropping messages: {:?}",
self.config.zid,
payload
);
}
Ok(())
}
Expand Down Expand Up @@ -140,28 +140,28 @@ impl TransportUnicastUniversal {
Reliability::BestEffort => zlock!(c.best_effort),
};

self.verify_sn(sn, &mut guard)?;

if guard.defrag.is_empty() {
let _ = guard.defrag.sync(sn);
}
guard.defrag.push(sn, payload)?;
if !more {
// When shared-memory feature is disabled, msg does not need to be mutable
let msg = guard
.defrag
.defragment()
.ok_or_else(|| zerror!("Transport: {}. Defragmentation error.", self.config.zid))?;

let callback = zread!(self.callback).clone();
if let Some(callback) = callback.as_ref() {
return self.trigger_callback(callback.as_ref(), msg);
} else {
tracing::debug!(
"Transport: {}. No callback available, dropping messages: {:?}",
self.config.zid,
msg
);
if self.verify_sn(sn, &mut guard)? {
if guard.defrag.is_empty() {
let _ = guard.defrag.sync(sn);
}
if let Err(e) = guard.defrag.push(sn, payload) {
tracing::trace!("{}", e);
} else if !more {
// When shared-memory feature is disabled, msg does not need to be mutable
if let Some(msg) = guard.defrag.defragment() {
let callback = zread!(self.callback).clone();
if let Some(callback) = callback.as_ref() {
return self.trigger_callback(callback.as_ref(), msg);
} else {
tracing::debug!(
"Transport: {}. No callback available, dropping messages: {:?}",
self.config.zid,
msg
);
}
} else {
tracing::trace!("Transport: {}. Defragmentation error.", self.config.zid);
}
}
}

Expand All @@ -172,24 +172,19 @@ impl TransportUnicastUniversal {
&self,
sn: TransportSn,
guard: &mut MutexGuard<'_, TransportChannelRx>,
) -> ZResult<()> {
) -> ZResult<bool> {
let precedes = guard.sn.roll(sn)?;
if !precedes {
tracing::debug!(
tracing::trace!(
"Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.",
self.config.zid,
sn,
guard.sn.get()
guard.sn.next()
);
// Drop the fragments if needed
if !guard.defrag.is_empty() {
guard.defrag.clear();
}
// Keep reading
return Ok(());
return Ok(false);
}

Ok(())
Ok(true)
}

pub(super) fn read_messages(&self, mut batch: RBatch, link: &Link) -> ZResult<()> {
Expand Down
15 changes: 2 additions & 13 deletions io/zenoh-transport/tests/unicast_defragmentation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,8 @@ async fn run(endpoint: &EndPoint, channel: Channel, msg_size: usize) {
);
client_transport.schedule(message.clone()).unwrap();

// Wait that the client transport has been closed
ztimeout!(async {
while client_transport.get_zid().is_ok() {
tokio::time::sleep(SLEEP).await;
}
});

// Wait on the router manager that the transport has been closed
ztimeout!(async {
while !router_manager.get_transports_unicast().await.is_empty() {
tokio::time::sleep(SLEEP).await;
}
});
// wait a little bit for the message to be sent
tokio::time::sleep(SLEEP).await;
oteffahi marked this conversation as resolved.
Show resolved Hide resolved

// Stop the locators on the manager
println!("Del locator: {endpoint}");
Expand Down
Loading