diff --git a/io/zenoh-transport/src/multicast/rx.rs b/io/zenoh-transport/src/multicast/rx.rs index ca607be493..8562d5b3eb 100644 --- a/io/zenoh-transport/src/multicast/rx.rs +++ b/io/zenoh-transport/src/multicast/rx.rs @@ -166,11 +166,14 @@ impl TransportMulticastInner { Reliability::BestEffort => zlock!(c.best_effort), }; - if self.verify_sn(sn, &mut guard)? { - for msg in payload.drain(..) { - self.trigger_callback(msg, peer)?; - } + if !self.verify_sn(sn, &mut guard)? { + // Drop invalid message and continue + return Ok(()); + } + for msg in payload.drain(..) { + self.trigger_callback(msg, peer)?; } + Ok(()) } @@ -202,24 +205,29 @@ impl TransportMulticastInner { Reliability::BestEffort => zlock!(c.best_effort), }; - if self.verify_sn(sn, &mut guard)? { - if guard.defrag.is_empty() { - let _ = guard.defrag.sync(sn); - } - if let Err(e) = guard.defrag.push(sn, payload) { - tracing::trace!("{}", e); - } else if !more { - // When shared-memory feature is disabled, msg does not need to be mutable - if let Some(msg) = guard.defrag.defragment() { - return self.trigger_callback(msg, peer); - } else { - tracing::trace!( - "Transport: {}. Peer: {}. Priority: {:?}. Defragmentation error.", - self.manager.config.zid, - peer.zid, - priority - ); - } + if !self.verify_sn(sn, &mut guard)? { + // Drop invalid message and continue + return Ok(()); + } + if guard.defrag.is_empty() { + let _ = guard.defrag.sync(sn); + } + if let Err(e) = guard.defrag.push(sn, payload) { + // Defrag errors don't close transport + tracing::trace!("{}", e); + return Ok(()); + } + if !more { + // When shared-memory feature is disabled, msg does not need to be mutable + if let Some(msg) = guard.defrag.defragment() { + return self.trigger_callback(msg, peer); + } else { + tracing::trace!( + "Transport: {}. Peer: {}. Priority: {:?}. Defragmentation error.", + self.manager.config.zid, + peer.zid, + priority + ); } } diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index f826c054d4..e69a305876 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -97,20 +97,23 @@ impl TransportUnicastUniversal { Reliability::BestEffort => zlock!(c.best_effort), }; - if self.verify_sn(sn, &mut guard)? { - let callback = zread!(self.callback).clone(); - if let Some(callback) = callback.as_ref() { - for msg in payload.drain(..) { - self.trigger_callback(callback.as_ref(), msg)?; - } - } else { - tracing::debug!( - "Transport: {}. No callback available, dropping messages: {:?}", - self.config.zid, - payload - ); + if !self.verify_sn(sn, &mut guard)? { + // Drop invalid message and continue + return Ok(()); + } + let callback = zread!(self.callback).clone(); + if let Some(callback) = callback.as_ref() { + for msg in payload.drain(..) { + self.trigger_callback(callback.as_ref(), msg)?; } + } else { + tracing::debug!( + "Transport: {}. No callback available, dropping messages: {:?}", + self.config.zid, + payload + ); } + Ok(()) } @@ -140,28 +143,33 @@ impl TransportUnicastUniversal { Reliability::BestEffort => zlock!(c.best_effort), }; - if self.verify_sn(sn, &mut guard)? { - if guard.defrag.is_empty() { - let _ = guard.defrag.sync(sn); - } - if let Err(e) = guard.defrag.push(sn, payload) { - tracing::trace!("{}", e); - } else if !more { - // When shared-memory feature is disabled, msg does not need to be mutable - if let Some(msg) = guard.defrag.defragment() { - let callback = zread!(self.callback).clone(); - if let Some(callback) = callback.as_ref() { - return self.trigger_callback(callback.as_ref(), msg); - } else { - tracing::debug!( - "Transport: {}. No callback available, dropping messages: {:?}", - self.config.zid, - msg - ); - } + if !self.verify_sn(sn, &mut guard)? { + // Drop invalid message and continue + return Ok(()); + } + if guard.defrag.is_empty() { + let _ = guard.defrag.sync(sn); + } + if let Err(e) = guard.defrag.push(sn, payload) { + // Defrag errors don't close transport + tracing::trace!("{}", e); + return Ok(()); + } + if !more { + // When shared-memory feature is disabled, msg does not need to be mutable + if let Some(msg) = guard.defrag.defragment() { + let callback = zread!(self.callback).clone(); + if let Some(callback) = callback.as_ref() { + return self.trigger_callback(callback.as_ref(), msg); } else { - tracing::trace!("Transport: {}. Defragmentation error.", self.config.zid); + tracing::debug!( + "Transport: {}. No callback available, dropping messages: {:?}", + self.config.zid, + msg + ); } + } else { + tracing::trace!("Transport: {}. Defragmentation error.", self.config.zid); } }