From c650a84b5893f61b4b3fb7b12aaaf95d420114ca Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Thu, 23 Nov 2023 18:38:22 +0100 Subject: [PATCH] Fix unicast compression tests --- io/zenoh-transport/src/multicast/link.rs | 3 +- .../src/unicast/establishment/accept.rs | 5 +- .../src/unicast/establishment/open.rs | 6 +- io/zenoh-transport/src/unicast/link.rs | 177 ++++++++++++------ .../src/unicast/universal/link.rs | 16 +- .../tests/multicast_compression.rs | 4 +- .../tests/multicast_transport.rs | 4 +- .../tests/unicast_compression.rs | 12 +- 8 files changed, 149 insertions(+), 78 deletions(-) diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index 339be56220..d173bcb7e4 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -65,7 +65,8 @@ pub(crate) struct TransportLinkMulticast { } impl TransportLinkMulticast { - pub fn new(link: LinkMulticast, config: TransportLinkMulticastConfig) -> Self { + pub fn new(link: LinkMulticast, mut config: TransportLinkMulticastConfig) -> Self { + config.mtu = link.get_mtu().min(config.mtu); Self { link, config, diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index d3378aa364..338f4dd521 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -582,8 +582,9 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { } pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) -> ZResult<()> { + let mtu = link.get_mtu(); let config = TransportLinkUnicastConfig { - mtu: link.get_mtu(), + mtu, direction: TransportLinkUnicastDirection::Inbound, #[cfg(feature = "transport_compression")] is_compression: false, @@ -622,7 +623,7 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) let iack_out = { let mut state = State { transport: StateTransport { - batch_size: manager.config.batch_size, + batch_size: manager.config.batch_size.min(batch_size::UNICAST).min(mtu), resolution: manager.config.resolution, ext_qos: ext::qos::StateAccept::new(manager.config.unicast.is_qos), #[cfg(feature = "transport_multilink")] diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index b007ce0ea2..5f539e510b 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -531,7 +531,11 @@ pub(crate) async fn open_link( let mut state = State { transport: StateTransport { - batch_size: manager.config.batch_size.min(batch_size::UNICAST), + batch_size: manager + .config + .batch_size + .min(batch_size::UNICAST) + .min(link.config.mtu), resolution: manager.config.resolution, ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos), #[cfg(feature = "transport_multilink")] diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index 3f084c9708..2398e11c45 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -40,20 +40,12 @@ pub(crate) struct TransportLinkUnicastConfig { pub(crate) struct TransportLinkUnicast { pub(crate) link: LinkUnicast, pub(crate) config: TransportLinkUnicastConfig, - #[cfg(feature = "transport_compression")] - pub(crate) buffer: Option, } impl TransportLinkUnicast { - pub fn new(link: LinkUnicast, config: TransportLinkUnicastConfig) -> Self { - Self { - link, - config, - #[cfg(feature = "transport_compression")] - buffer: config.is_compression.then_some(BBuf::with_capacity( - lz4_flex::block::get_maximum_output_size(config.mtu as usize), - )), - } + pub(crate) fn new(link: LinkUnicast, mut config: TransportLinkUnicastConfig) -> Self { + config.mtu = link.get_mtu().min(config.mtu); + Self { link, config } } const fn batch_config(&self) -> BatchConfig { @@ -64,7 +56,82 @@ impl TransportLinkUnicast { } } - pub async fn send_batch(&mut self, batch: &mut WBatch) -> ZResult<()> { + pub(crate) fn tx(&self) -> TransportLinkUnicastTx { + TransportLinkUnicastTx { + inner: self.clone(), + #[cfg(feature = "transport_compression")] + buffer: self.config.is_compression.then_some(BBuf::with_capacity( + lz4_flex::block::get_maximum_output_size(self.config.mtu as usize), + )), + } + } + + pub(crate) fn rx(&self) -> TransportLinkUnicastRx { + TransportLinkUnicastRx { + inner: self.clone(), + } + } + + pub(crate) async fn send(&self, msg: &TransportMessage) -> ZResult { + let mut link = self.tx(); + link.send(msg).await + } + + pub(crate) async fn recv(&self) -> ZResult { + let mut link = self.rx(); + link.recv().await + } + + pub(crate) async fn close(&self, reason: Option) -> ZResult<()> { + if let Some(reason) = reason { + // Build the close message + let message: TransportMessage = Close { + reason, + session: false, + } + .into(); + // Send the close message on the link + let _ = self.send(&message).await; + } + self.link.close().await + } +} + +impl fmt::Display for TransportLinkUnicast { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.link) + } +} + +impl fmt::Debug for TransportLinkUnicast { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TransportLinkUnicast") + .field("link", &self.link) + .field("config", &self.config) + .finish() + } +} + +impl From<&TransportLinkUnicast> for Link { + fn from(link: &TransportLinkUnicast) -> Self { + Link::from(&link.link) + } +} + +impl From for Link { + fn from(link: TransportLinkUnicast) -> Self { + Link::from(link.link) + } +} + +pub(crate) struct TransportLinkUnicastTx { + pub(crate) inner: TransportLinkUnicast, + #[cfg(feature = "transport_compression")] + pub(crate) buffer: Option, +} + +impl TransportLinkUnicastTx { + pub(crate) async fn send_batch(&mut self, batch: &mut WBatch) -> ZResult<()> { const ERR: &str = "Write error on link: "; // log::trace!("WBatch: {:?}", batch); @@ -89,30 +156,55 @@ impl TransportLinkUnicast { // log::trace!("WBytes: {:02x?}", bytes); // Send the message on the link - if self.link.is_streamed() { + if self.inner.link.is_streamed() { let len: BatchSize = bytes .len() .try_into() .map_err(|_| zerror!("Invalid batch length"))?; let len = len.to_le_bytes(); - self.link.write_all(&len).await?; + self.inner.link.write_all(&len).await?; } - self.link.write_all(bytes).await?; + self.inner.link.write_all(bytes).await?; Ok(()) } - pub async fn send(&mut self, msg: &TransportMessage) -> ZResult { + pub(crate) async fn send(&mut self, msg: &TransportMessage) -> ZResult { const ERR: &str = "Write error on link: "; // Create the batch for serializing the message - let mut batch = WBatch::new(self.batch_config()); + let mut batch = WBatch::new(self.inner.batch_config()); batch.encode(msg).map_err(|_| zerror!("{ERR}{self}"))?; let len = batch.len() as usize; self.send_batch(&mut batch).await?; Ok(len) } +} + +impl fmt::Display for TransportLinkUnicastTx { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.inner) + } +} +impl fmt::Debug for TransportLinkUnicastTx { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut s = f.debug_struct("TransportLinkUnicastRx"); + s.field("link", &self.inner.link) + .field("config", &self.inner.config); + #[cfg(feature = "transport_compression")] + { + s.field("buffer", &self.buffer.as_ref().map(|b| b.capacity())); + } + s.finish() + } +} + +pub(crate) struct TransportLinkUnicastRx { + pub(crate) inner: TransportLinkUnicast, +} + +impl TransportLinkUnicastRx { pub async fn recv_batch(&mut self, buff: C) -> ZResult where C: Fn() -> T + Copy, @@ -121,10 +213,10 @@ impl TransportLinkUnicast { const ERR: &str = "Read error from link: "; let mut into = (buff)(); - let end = if self.link.is_streamed() { + let end = if self.inner.link.is_streamed() { // Read and decode the message length let mut len = BatchSize::MIN.to_le_bytes(); - self.link.read_exact(&mut len).await?; + self.inner.link.read_exact(&mut len).await?; let len = BatchSize::from_le_bytes(len) as usize; // Read the bytes @@ -132,18 +224,18 @@ impl TransportLinkUnicast { .as_mut_slice() .get_mut(..len) .ok_or_else(|| zerror!("{ERR}{self}. Invalid batch length or buffer size."))?; - self.link.read_exact(slice).await?; + self.inner.link.read_exact(slice).await?; len } else { // Read the bytes - self.link.read(into.as_mut_slice()).await? + self.inner.link.read(into.as_mut_slice()).await? }; // log::trace!("RBytes: {:02x?}", &into.as_slice()[0..end]); let buffer = ZSlice::make(Arc::new(into), 0, end) .map_err(|_| zerror!("{ERR}{self}. ZSlice index(es) out of bounds"))?; - let mut batch = RBatch::new(self.batch_config(), buffer); + let mut batch = RBatch::new(self.inner.batch_config(), buffer); batch .initialize(buff) .map_err(|e| zerror!("{ERR}{self}. {e}."))?; @@ -154,7 +246,7 @@ impl TransportLinkUnicast { } pub async fn recv(&mut self) -> ZResult { - let mtu = self.link.get_mtu() as usize; + let mtu = self.inner.config.mtu as usize; let mut batch = self .recv_batch(|| zenoh_buffers::vec::uninit(mtu).into_boxed_slice()) .await?; @@ -163,46 +255,19 @@ impl TransportLinkUnicast { .map_err(|_| zerror!("Decode error on link: {}", self))?; Ok(msg) } - - pub async fn close(&mut self, reason: Option) -> ZResult<()> { - if let Some(reason) = reason { - // Build the close message - let message: TransportMessage = Close { - reason, - session: false, - } - .into(); - // Send the close message on the link - let _ = self.send(&message).await; - } - self.link.close().await - } } -impl fmt::Display for TransportLinkUnicast { +impl fmt::Display for TransportLinkUnicastRx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.link) + write!(f, "{}", self.inner) } } -impl fmt::Debug for TransportLinkUnicast { +impl fmt::Debug for TransportLinkUnicastRx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TransportLinkUnicast") - .field("link", &self.link) - .field("config", &self.config) - .field("buffer", &self.buffer.as_ref().map(|b| b.capacity())) + f.debug_struct("TransportLinkUnicastRx") + .field("link", &self.inner.link) + .field("config", &self.inner.config) .finish() } } - -impl From<&TransportLinkUnicast> for Link { - fn from(link: &TransportLinkUnicast) -> Self { - Link::from(&link.link) - } -} - -impl From for Link { - fn from(link: TransportLinkUnicast) -> Self { - Link::from(link.link) - } -} diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 7a503557f9..c573dcf991 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -23,7 +23,7 @@ use crate::{ }, priority::TransportPriorityTx, }, - unicast::link::TransportLinkUnicast, + unicast::link::{TransportLinkUnicast, TransportLinkUnicastRx, TransportLinkUnicastTx}, TransportExecutor, }; use async_std::prelude::FutureExt; @@ -89,7 +89,7 @@ impl TransportLinkUnicastUniversal { let handle = executor.spawn(async move { let res = tx_task( consumer, - c_link.clone(), + c_link.tx(), keep_alive, #[cfg(feature = "stats")] c_transport.stats.clone(), @@ -123,7 +123,7 @@ impl TransportLinkUnicastUniversal { let handle = task::spawn(async move { // Start the consume task let res = rx_task( - c_link.clone(), + c_link.rx(), c_transport.clone(), lease, c_signal.clone(), @@ -171,7 +171,7 @@ impl TransportLinkUnicastUniversal { /*************************************/ async fn tx_task( mut pipeline: TransmissionPipelineConsumer, - mut link: TransportLinkUnicast, + mut link: TransportLinkUnicastTx, keep_alive: Duration, #[cfg(feature = "stats")] stats: Arc, ) -> ZResult<()> { @@ -225,7 +225,7 @@ async fn tx_task( } async fn rx_task( - mut link: TransportLinkUnicast, + mut link: TransportLinkUnicastRx, transport: TransportUnicastUniversal, lease: Duration, signal: Signal, @@ -237,7 +237,7 @@ async fn rx_task( } async fn read( - link: &mut TransportLinkUnicast, + link: &mut TransportLinkUnicastRx, pool: &RecyclingObjectPool, ) -> ZResult where @@ -257,7 +257,7 @@ async fn rx_task( } // The pool of buffers - let mtu = link.config.mtu as usize; + let mtu = link.inner.config.mtu as usize; let mut n = rx_buffer_size / mtu; if rx_buffer_size % mtu != 0 { n += 1; @@ -277,7 +277,7 @@ async fn rx_task( { transport.stats.inc_rx_bytes(2 + n); // Account for the batch len encoding (16 bits) } - transport.read_messages(batch, &link)?; + transport.read_messages(batch, &link.inner)?; } Action::Stop => break, } diff --git a/io/zenoh-transport/tests/multicast_compression.rs b/io/zenoh-transport/tests/multicast_compression.rs index c498bff27f..fafb28e642 100644 --- a/io/zenoh-transport/tests/multicast_compression.rs +++ b/io/zenoh-transport/tests/multicast_compression.rs @@ -14,7 +14,7 @@ // Restricting to macos by default because of no IPv6 support // on GitHub CI actions on Linux and Windows. -#[cfg(target_family = "unix")] +#[cfg(all(target_family = "unix", feature = "transport_compression"))] mod tests { use async_std::{prelude::FutureExt, task}; use std::{ @@ -346,7 +346,7 @@ mod tests { // Define the locator let endpoints: Vec = vec![ format!( - "udp/224.{}.{}.{}:7447", + "udp/224.{}.{}.{}:21000", rand::random::(), rand::random::(), rand::random::() diff --git a/io/zenoh-transport/tests/multicast_transport.rs b/io/zenoh-transport/tests/multicast_transport.rs index c686db4866..0822d08f58 100644 --- a/io/zenoh-transport/tests/multicast_transport.rs +++ b/io/zenoh-transport/tests/multicast_transport.rs @@ -330,7 +330,7 @@ mod tests { } } - #[cfg(feature = "transport_udp")] + #[cfg(all(feature = "transport_compression", feature = "transport_udp"))] #[test] fn transport_multicast_udp_only() { env_logger::init(); @@ -342,7 +342,7 @@ mod tests { // Define the locator let endpoints: Vec = vec![ format!( - "udp/224.{}.{}.{}:7447", + "udp/224.{}.{}.{}:20000", rand::random::(), rand::random::(), rand::random::() diff --git a/io/zenoh-transport/tests/unicast_compression.rs b/io/zenoh-transport/tests/unicast_compression.rs index 848a5bdd2f..be979fef23 100644 --- a/io/zenoh-transport/tests/unicast_compression.rs +++ b/io/zenoh-transport/tests/unicast_compression.rs @@ -435,8 +435,8 @@ mod tests { // Define the locators let endpoints: Vec = vec![ - format!("tcp/127.0.0.1:{}", 16000).parse().unwrap(), - format!("tcp/[::1]:{}", 16001).parse().unwrap(), + format!("tcp/127.0.0.1:{}", 19000).parse().unwrap(), + format!("tcp/[::1]:{}", 19001).parse().unwrap(), ]; // Define the reliability and congestion control let channel = [ @@ -467,7 +467,7 @@ mod tests { }); // Define the locators - let endpoints: Vec = vec![format!("tcp/127.0.0.1:{}", 16100).parse().unwrap()]; + let endpoints: Vec = vec![format!("tcp/127.0.0.1:{}", 19100).parse().unwrap()]; // Define the reliability and congestion control let channel = [ Channel { @@ -498,8 +498,8 @@ mod tests { // Define the locator let endpoints: Vec = vec![ - format!("udp/127.0.0.1:{}", 16010).parse().unwrap(), - format!("udp/[::1]:{}", 16011).parse().unwrap(), + format!("udp/127.0.0.1:{}", 19010).parse().unwrap(), + format!("udp/[::1]:{}", 19011).parse().unwrap(), ]; // Define the reliability and congestion control let channel = [ @@ -530,7 +530,7 @@ mod tests { }); // Define the locator - let endpoints: Vec = vec![format!("udp/127.0.0.1:{}", 16110).parse().unwrap()]; + let endpoints: Vec = vec![format!("udp/127.0.0.1:{}", 19110).parse().unwrap()]; // Define the reliability and congestion control let channel = [ Channel {