diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs
index 5305d0a50c..d3cd38684f 100644
--- a/io/zenoh-transport/src/common/batch.rs
+++ b/io/zenoh-transport/src/common/batch.rs
@@ -11,7 +11,7 @@
 // Contributors:
 //   ZettaScale Zenoh Team,
 //
-use std::num::{NonZeroU8, NonZeroUsize};
+use std::num::NonZeroUsize;
 use zenoh_buffers::{
     buffer::Buffer,
     reader::{DidntRead, HasReader},
@@ -26,62 +26,125 @@ use zenoh_protocol::{
     network::NetworkMessage,
     transport::{fragment::FragmentHeader, frame::FrameHeader, BatchSize, TransportMessage},
 };
-use zenoh_result::ZResult;
+use zenoh_result::{zerror, ZResult};
 #[cfg(feature = "transport_compression")]
-use {std::sync::Arc, zenoh_protocol::common::imsg, zenoh_result::zerror};
+use {std::sync::Arc, zenoh_protocol::common::imsg};
+
+const L_LEN: usize = (BatchSize::BITS / 8) as usize;
+const H_LEN: usize = BatchHeader::SIZE;
 
 // Split the inner buffer into (length, header, payload) inmutable slices
-#[cfg(feature = "transport_compression")]
 macro_rules! zsplit {
-    ($slice:expr, $header:expr) => {{
-        match $header.get() {
-            Some(_) => $slice.split_at(BatchHeader::INDEX + 1),
-            None => (&[], $slice),
+    ($slice:expr, $config:expr) => {{
+        match ($config.is_streamed, $config.has_header()) {
+            (true, true) => {
+                let (l, s) = $slice.split_at(L_LEN);
+                let (h, p) = s.split_at(H_LEN);
+                (l, h, p)
+            }
+            (true, false) => {
+                let (l, p) = $slice.split_at(L_LEN);
+                (l, &[], p)
+            }
+            (false, true) => {
+                let (h, p) = $slice.split_at(H_LEN);
+                (&[], h, p)
+            }
+            (false, false) => (&[], &[], $slice),
+        }
+    }};
+}
+
+macro_rules! zsplit_mut {
+    ($slice:expr, $config:expr) => {{
+        match ($config.is_streamed, $config.has_header()) {
+            (true, true) => {
+                let (l, s) = $slice.split_at_mut(L_LEN);
+                let (h, p) = s.split_at_mut(H_LEN);
+                (l, h, p)
+            }
+            (true, false) => {
+                let (l, p) = $slice.split_at_mut(L_LEN);
+                (l, &mut [], p)
+            }
+            (false, true) => {
+                let (h, p) = $slice.split_at_mut(H_LEN);
+                (&mut [], h, p)
+            }
+            (false, false) => (&mut [], &mut [], $slice),
         }
     }};
 }
 
 // Batch config
-#[derive(Copy, Clone, Debug)]
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
 pub struct BatchConfig {
     pub mtu: BatchSize,
+    pub is_streamed: bool,
     #[cfg(feature = "transport_compression")]
     pub is_compression: bool,
 }
 
+impl Default for BatchConfig {
+    fn default() -> Self {
+        BatchConfig {
+            mtu: BatchSize::MAX,
+            is_streamed: false,
+            #[cfg(feature = "transport_compression")]
+            is_compression: false,
+        }
+    }
+}
+
 impl BatchConfig {
-    fn header(&self) -> BatchHeader {
-        #[allow(unused_mut)] // No need for mut when "transport_compression" is disabled
-        let mut h = 0;
+    const fn has_header(&self) -> bool {
+        #[cfg(not(feature = "transport_compression"))]
+        {
+            false
+        }
+        #[cfg(feature = "transport_compression")]
+        {
+            self.is_compression
+        }
+    }
+
+    fn header(&self) -> Option<BatchHeader> {
+        #[cfg(not(feature = "transport_compression"))]
+        {
+            None
+        }
         #[cfg(feature = "transport_compression")]
-        if self.is_compression {
-            h |= BatchHeader::COMPRESSION;
+        {
+            self.is_compression
+                .then_some(BatchHeader::new(BatchHeader::COMPRESSION))
+        }
+    }
+
+    pub fn max_buffer_size(&self) -> usize {
+        let mut len = self.mtu as usize;
+        if self.is_streamed {
+            len += BatchSize::BITS as usize / 8;
         }
-        BatchHeader::new(h)
+        len
     }
 }
 
 // Batch header
 #[repr(transparent)]
 #[derive(Copy, Clone, Debug)]
-pub struct BatchHeader(Option<NonZeroU8>);
+pub struct BatchHeader(u8);
 
 impl BatchHeader {
+    const SIZE: usize = 1;
     #[cfg(feature = "transport_compression")]
-    const INDEX: usize = 0;
-    #[cfg(feature = "transport_compression")]
-    const COMPRESSION: u8 = 1;
-
-    fn new(h: u8) -> Self {
-        Self(NonZeroU8::new(h))
-    }
+    const COMPRESSION: u8 = 1; // 1 << 0
 
     #[cfg(feature = "transport_compression")]
-    const fn is_empty(&self) -> bool {
-        self.0.is_none()
+    const fn new(h: u8) -> Self {
+        Self(h)
     }
 
-    const fn get(&self) -> Option<NonZeroU8> {
+    const fn as_u8(&self) -> u8 {
         self.0
     }
@@ -90,8 +153,7 @@ impl BatchHeader {
     #[cfg(feature = "transport_compression")]
     #[inline(always)]
     pub fn is_compression(&self) -> bool {
-        self.0
-            .is_some_and(|h| imsg::has_flag(h.get(), Self::COMPRESSION))
+        imsg::has_flag(self.as_u8(), Self::COMPRESSION)
     }
 }
@@ -113,7 +175,6 @@ impl WBatchStats {
 #[derive(Debug)]
 pub enum Finalize {
     Batch,
-    #[cfg(feature = "transport_compression")]
     Buffer,
 }
@@ -143,7 +204,7 @@ pub struct WBatch {
     // The batch codec
     pub codec: Zenoh080Batch,
     // It contains 1 byte as additional header, e.g. to signal the batch is compressed
-    pub header: BatchHeader,
+    pub config: BatchConfig,
     // Statistics related to this batch
     #[cfg(feature = "stats")]
     pub stats: WBatchStats,
 }
@@ -152,9 +213,9 @@ impl WBatch {
     pub fn new(config: BatchConfig) -> Self {
         let mut batch = Self {
-            buffer: BBuf::with_capacity(config.mtu as usize),
+            buffer: BBuf::with_capacity(config.max_buffer_size()),
             codec: Zenoh080Batch::new(),
-            header: config.header(),
+            config,
             #[cfg(feature = "stats")]
             stats: WBatchStats::default(),
         };
@@ -174,7 +235,8 @@ impl WBatch {
     /// Get the total number of bytes that have been serialized on the [`WBatch`][WBatch].
     #[inline(always)]
     pub fn len(&self) -> BatchSize {
-        self.buffer.len() as BatchSize
+        let (_l, _h, p) = Self::split(self.buffer.as_slice(), &self.config);
+        p.len() as BatchSize
     }
 
     /// Clear the [`WBatch`][WBatch] memory buffer and related internal state.
@@ -186,10 +248,7 @@ impl WBatch {
         {
             self.stats.clear();
         }
-        if let Some(h) = self.header.get() {
-            let mut writer = self.buffer.writer();
-            let _ = writer.write_u8(h.get());
-        }
+        Self::init(&mut self.buffer, &self.config);
     }
 
     /// Get a `&[u8]` to access the internal memory buffer, usually for transmitting it on the network.
@@ -198,37 +257,70 @@ impl WBatch {
         self.buffer.as_slice()
     }
 
+    fn init(buffer: &mut BBuf, config: &BatchConfig) {
+        let mut writer = buffer.writer();
+        if config.is_streamed {
+            let _ = writer.write_exact(&BatchSize::MIN.to_be_bytes());
+        }
+        if let Some(h) = config.header() {
+            let _ = writer.write_u8(h.as_u8());
+        }
+    }
+
     // Split (length, header, payload) internal buffer slice
     #[inline(always)]
-    #[cfg(feature = "transport_compression")]
-    fn split(&self) -> (&[u8], &[u8]) {
-        zsplit!(self.buffer.as_slice(), self.header)
+    fn split<'a>(buffer: &'a [u8], config: &BatchConfig) -> (&'a [u8], &'a [u8], &'a [u8]) {
+        zsplit!(buffer, config)
+    }
+
+    // Split (length, header, payload) internal buffer slice
+    #[inline(always)]
+    fn split_mut<'a>(
+        buffer: &'a mut [u8],
+        config: &BatchConfig,
+    ) -> (&'a mut [u8], &'a mut [u8], &'a mut [u8]) {
+        zsplit_mut!(buffer, config)
     }
 
-    pub fn finalize(
-        &mut self,
-        #[cfg(feature = "transport_compression")] buffer: Option<&mut BBuf>,
-    ) -> ZResult<Finalize> {
+    pub fn finalize(&mut self, mut buffer: Option<&mut BBuf>) -> ZResult<Finalize> {
+        #[allow(unused_mut)]
+        let mut res = Finalize::Batch;
+
         #[cfg(feature = "transport_compression")]
-        if self.header.is_compression() {
-            let buffer = buffer.ok_or_else(|| zerror!("Support buffer not provided"))?;
-            buffer.clear();
-            return self.compress(buffer);
+        if let Some(h) = self.config.header() {
+            if h.is_compression() {
+                let buffer = buffer
+                    .as_mut()
+                    .ok_or_else(|| zerror!("Support buffer not provided"))?;
+                res = self.compress(buffer)?;
+            }
+        }
+
+        if self.config.is_streamed {
+            let buff = match res {
+                Finalize::Batch => self.buffer.as_mut_slice(),
+                Finalize::Buffer => buffer
+                    .as_mut()
+                    .ok_or_else(|| zerror!("Support buffer not provided"))?
+                    .as_mut_slice(),
+            };
+            let (length, header, payload) = Self::split_mut(buff, &self.config);
+            let len: BatchSize = (header.len() as BatchSize) + (payload.len() as BatchSize);
+            length.copy_from_slice(&len.to_le_bytes());
         }
 
-        Ok(Finalize::Batch)
+        Ok(res)
     }
 
     #[cfg(feature = "transport_compression")]
     fn compress(&mut self, support: &mut BBuf) -> ZResult<Finalize> {
         // Write the initial bytes for the batch
-        let mut writer = support.writer();
-        if let Some(h) = self.header.get() {
-            let _ = writer.write_u8(h.get());
-        }
+        support.clear();
+        Self::init(support, &self.config);
 
         // Compress the actual content
-        let (_header, payload) = self.split();
+        let (_length, _header, payload) = Self::split(self.buffer.as_slice(), &self.config);
+        let mut writer = support.writer();
         writer
             .with_slot(writer.remaining(), |b| {
                 lz4_flex::block::compress_into(payload, b).unwrap_or(0)
@@ -240,11 +332,8 @@ impl WBatch {
             Ok(Finalize::Buffer)
         } else {
             // Keep the original uncompressed buffer and unset the compression flag from the header
-            let h = self
-                .buffer
-                .as_mut_slice()
-                .get_mut(BatchHeader::INDEX)
-                .ok_or_else(|| zerror!("Header not present"))?;
+            let (_l, h, _p) = Self::split_mut(self.buffer.as_mut_slice(), &self.config);
+            let h = h.first_mut().ok_or_else(|| zerror!("Empty BatchHeader"))?;
             *h &= !BatchHeader::COMPRESSION;
             Ok(Finalize::Batch)
         }
@@ -300,21 +389,19 @@ pub struct RBatch {
     buffer: ZSlice,
     // The batch codec
     codec: Zenoh080Batch,
-    // It contains 1 byte as additional header, e.g. to signal the batch is compressed
-    #[cfg(feature = "transport_compression")]
-    header: BatchHeader,
+    // The batch config
+    config: BatchConfig,
 }
 
 impl RBatch {
-    pub fn new<T>(#[allow(unused_variables)] config: BatchConfig, buffer: T) -> Self
+    pub fn new<T>(config: BatchConfig, buffer: T) -> Self
     where
         T: Into<ZSlice>,
     {
         Self {
             buffer: buffer.into(),
             codec: Zenoh080Batch::new(),
-            #[cfg(feature = "transport_compression")]
-            header: config.header(),
+            config,
         }
     }
@@ -329,9 +416,8 @@ impl RBatch {
 
     // Split (length, header, payload) internal buffer slice
     #[inline(always)]
-    #[cfg(feature = "transport_compression")]
-    fn split(&self) -> (&[u8], &[u8]) {
-        zsplit!(self.buffer.as_slice(), self.header)
+    fn split<'a>(buffer: &'a [u8], config: &BatchConfig) -> (&'a [u8], &'a [u8], &'a [u8]) {
+        zsplit!(buffer, config)
     }
 
     pub fn initialize<C, T>(&mut self, #[allow(unused_variables)] buff: C) -> ZResult<()>
@@ -339,41 +425,44 @@ impl RBatch {
         C: Fn() -> T + Copy,
         T: ZSliceBuffer + 'static,
     {
+        #[allow(unused_variables)]
+        let (l, h, p) = Self::split(self.buffer.as_slice(), &self.config);
+
         #[cfg(feature = "transport_compression")]
-        if !self.header.is_empty() {
-            let h = *self
-                .buffer
-                .get(BatchHeader::INDEX)
-                .ok_or_else(|| zerror!("Batch header not present"))?;
-            let header = BatchHeader::new(h);
-
-            if header.is_compression() {
-                self.decompress(buff)?;
-            } else {
-                self.buffer = self
-                    .buffer
-                    .subslice(BatchHeader::INDEX + 1, self.buffer.len())
-                    .ok_or_else(|| zerror!("Invalid batch length"))?;
+        {
+            if self.config.has_header() {
+                let b = *h
+                    .first()
+                    .ok_or_else(|| zerror!("Batch header not present"))?;
+                let header = BatchHeader::new(b);
+
+                if header.is_compression() {
+                    let zslice = self.decompress(p, buff)?;
+                    self.buffer = zslice;
+                    return Ok(());
+                }
             }
         }
 
+        self.buffer = self
+            .buffer
+            .subslice(l.len() + h.len(), self.buffer.len())
+            .ok_or_else(|| zerror!("Invalid batch length"))?;
+
         Ok(())
     }
 
     #[cfg(feature = "transport_compression")]
-    fn decompress<T>(&mut self, mut buff: impl FnMut() -> T) -> ZResult<()>
+    fn decompress<T>(&self, payload: &[u8], mut buff: impl FnMut() -> T) -> ZResult<ZSlice>
     where
         T: ZSliceBuffer + 'static,
     {
-        let (_h, p) = self.split();
-
         let mut into = (buff)();
-        let n = lz4_flex::block::decompress_into(p, into.as_mut_slice())
+        let n = lz4_flex::block::decompress_into(payload, into.as_mut_slice())
            .map_err(|_| zerror!("Decompression error"))?;
-        self.buffer = ZSlice::make(Arc::new(into), 0, n)
+        let zslice = ZSlice::make(Arc::new(into), 0, n)
            .map_err(|_| zerror!("Invalid decompression buffer length"))?;
-
-        Ok(())
+        Ok(zslice)
     }
 }
@@ -422,6 +511,7 @@ mod tests {
         for msg_in in msg_ins {
             let config = BatchConfig {
                 mtu: BatchSize::MAX,
+                is_streamed: rng.gen_bool(0.5),
                 #[cfg(feature = "transport_compression")]
                 is_compression: rng.gen_bool(0.5),
             };
@@ -465,6 +555,7 @@ fn serialization_batch() {
         let config = BatchConfig {
             mtu: BatchSize::MAX,
+            is_streamed: false,
            #[cfg(feature = "transport_compression")]
            is_compression: false,
        };
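Note on the layout used above: a batch buffer is always viewed as [ length | header | payload ]. The two-byte little-endian length prefix exists only when is_streamed is set and, as written by WBatch::finalize, counts header plus payload bytes; the optional one-byte header carries flags such as COMPRESSION. The snippet below is a self-contained sketch of the four (is_streamed, has_header) cases handled by zsplit!/zsplit_mut!; the names and the hard-coded 2-byte/1-byte widths are illustrative stand-ins, not the zenoh-transport API.

// Standalone sketch of the (length | header | payload) split; illustrative only.
const L_LEN: usize = 2; // length prefix: a 16-bit batch size, little-endian in this sketch
const H_LEN: usize = 1; // optional 1-byte batch header (e.g. the compression flag)

struct Config {
    is_streamed: bool,
    has_header: bool,
}

// Mirrors what the zsplit! macro does for the four (is_streamed, has_header) cases.
fn split<'a>(buf: &'a [u8], cfg: &Config) -> (&'a [u8], &'a [u8], &'a [u8]) {
    match (cfg.is_streamed, cfg.has_header) {
        (true, true) => {
            let (l, rest) = buf.split_at(L_LEN);
            let (h, p) = rest.split_at(H_LEN);
            (l, h, p)
        }
        (true, false) => {
            let (l, p) = buf.split_at(L_LEN);
            (l, &[], p)
        }
        (false, true) => {
            let (h, p) = buf.split_at(H_LEN);
            (&[], h, p)
        }
        (false, false) => (&[], &[], buf),
    }
}

fn main() {
    // 2-byte length = header + payload = 6, 1-byte header (0x01 = compression), 5 payload bytes.
    let frame = [0x06, 0x00, 0x01, b'h', b'e', b'l', b'l', b'o'];
    let cfg = Config { is_streamed: true, has_header: true };
    let (l, h, p) = split(&frame, &cfg);
    assert_eq!(u16::from_le_bytes([l[0], l[1]]), 6);
    assert_eq!(h, &[0x01]);
    assert_eq!(p, b"hello");
}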
diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs
index 19e7a47289..256dfbef47 100644
--- a/io/zenoh-transport/src/common/pipeline.rs
+++ b/io/zenoh-transport/src/common/pipeline.rs
@@ -482,10 +482,7 @@ impl StageOut {
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub(crate) struct TransmissionPipelineConf {
-    pub(crate) is_streamed: bool,
-    #[cfg(feature = "transport_compression")]
-    pub(crate) is_compression: bool,
-    pub(crate) batch_size: BatchSize,
+    pub(crate) batch: BatchConfig,
     pub(crate) queue_size: [usize; Priority::NUM],
     pub(crate) backoff: Duration,
 }
@@ -493,10 +490,12 @@ pub(crate) struct TransmissionPipelineConf {
 impl Default for TransmissionPipelineConf {
     fn default() -> Self {
         Self {
-            is_streamed: false,
-            #[cfg(feature = "transport_compression")]
-            is_compression: false,
-            batch_size: BatchSize::MAX,
+            batch: BatchConfig {
+                mtu: BatchSize::MAX,
+                is_streamed: false,
+                #[cfg(feature = "transport_compression")]
+                is_compression: false,
+            },
             queue_size: [1; Priority::NUM],
             backoff: Duration::from_micros(1),
         }
@@ -533,12 +532,7 @@ impl TransmissionPipeline {
         let (mut s_ref_w, s_ref_r) = RingBuffer::<WBatch, RBLEN>::init();
         // Fill the refill ring buffer with batches
         for _ in 0..*num {
-            let bc = BatchConfig {
-                mtu: config.batch_size,
-                #[cfg(feature = "transport_compression")]
-                is_compression: config.is_compression,
-            };
-            let batch = WBatch::new(bc);
+            let batch = WBatch::new(config.batch);
             assert!(s_ref_w.push(batch).is_none());
         }
         // Create the channel for notifying that new batches are in the refill ring buffer
@@ -736,10 +730,12 @@ mod tests {
     const TIMEOUT: Duration = Duration::from_secs(60);
 
     const CONFIG: TransmissionPipelineConf = TransmissionPipelineConf {
-        is_streamed: true,
-        #[cfg(feature = "transport_compression")]
-        is_compression: true,
-        batch_size: BatchSize::MAX,
+        batch: BatchConfig {
+            mtu: BatchSize::MAX,
+            is_streamed: true,
+            #[cfg(feature = "transport_compression")]
+            is_compression: true,
+        },
         queue_size: [1; Priority::NUM],
         backoff: Duration::from_micros(1),
     };
@@ -874,7 +870,7 @@ mod tests {
         // Make sure to put only one message per batch: set the payload size
        // to half of the batch in such a way the serialized zenoh message
        // will be larger then half of the batch size (header + payload).
-        let payload_size = (CONFIG.batch_size / 2) as usize;
+        let payload_size = (CONFIG.batch.mtu / 2) as usize;
 
        // Send reliable messages
        let key = "test".into();
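Note: the pipeline-level configuration now embeds all batch settings as a single BatchConfig value instead of duplicating mtu/is_streamed/is_compression. A minimal sketch of that nesting follows (illustrative types and values, not the zenoh-transport definitions): callers override only the fields they care about and hand the Copy config straight to the batch constructor.

// Sketch of nesting a batch-level config inside a pipeline-level config.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct BatchConfig {
    mtu: u16,
    is_streamed: bool,
    is_compression: bool,
}

impl Default for BatchConfig {
    fn default() -> Self {
        BatchConfig { mtu: u16::MAX, is_streamed: false, is_compression: false }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct PipelineConf {
    batch: BatchConfig,
    queue_size: usize,
}

fn main() {
    // Override only what is needed; everything batch-related travels as one value.
    let conf = PipelineConf {
        batch: BatchConfig { is_streamed: true, ..Default::default() },
        queue_size: 1,
    };
    assert_eq!(conf.batch.mtu, u16::MAX);
    assert_eq!(conf.queue_size, 1);
    // BatchConfig is Copy, so passing it to a batch constructor is a plain copy.
    let for_batch: BatchConfig = conf.batch;
    assert!(for_batch.is_streamed);
    assert!(!for_batch.is_compression);
}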
diff --git a/io/zenoh-transport/src/multicast/establishment.rs b/io/zenoh-transport/src/multicast/establishment.rs
index e31ab05d30..cec09ebdf2 100644
--- a/io/zenoh-transport/src/multicast/establishment.rs
+++ b/io/zenoh-transport/src/multicast/establishment.rs
@@ -12,7 +12,7 @@
 //   ZettaScale Zenoh Team,
 //
 use crate::{
-    common::seq_num,
+    common::{batch::BatchConfig, seq_num},
     multicast::{
         link::{TransportLinkMulticast, TransportLinkMulticastConfig},
         transport::TransportMulticastInner,
@@ -62,9 +62,12 @@ pub(crate) async fn open_link(
     // Create the transport
     let locator = link.get_dst().to_owned();
     let config = TransportLinkMulticastConfig {
-        mtu: link.get_mtu(),
-        #[cfg(feature = "transport_compression")]
-        is_compression: manager.config.multicast.is_compression,
+        batch: BatchConfig {
+            mtu: link.get_mtu(),
+            #[cfg(feature = "transport_compression")]
+            is_compression: manager.config.multicast.is_compression,
+            ..Default::default()
+        },
     };
     let link = TransportLinkMulticast::new(link, config);
diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs
index fbb917c281..8e1d17fefe 100644
--- a/io/zenoh-transport/src/multicast/link.rs
+++ b/io/zenoh-transport/src/multicast/link.rs
@@ -34,10 +34,8 @@ use std::{
     sync::Arc,
     time::{Duration, Instant},
 };
-#[cfg(feature = "transport_compression")]
-use zenoh_buffers::BBuf;
-use zenoh_buffers::{ZSlice, ZSliceBuffer};
-use zenoh_core::zlock;
+use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer};
+use zenoh_core::{zcondfeat, zlock};
 use zenoh_link::{Link, LinkMulticast, Locator};
 use zenoh_protocol::{
     core::{Bits, Priority, Resolution, WhatAmI, ZenohId},
@@ -51,11 +49,7 @@ use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal};
 /****************************/
 #[derive(Clone, Copy, PartialEq, Eq, Debug)]
 pub(crate) struct TransportLinkMulticastConfig {
-    // MTU
-    pub(crate) mtu: BatchSize,
-    // Compression is active on the link
-    #[cfg(feature = "transport_compression")]
-    pub(crate) is_compression: bool,
+    pub(crate) batch: BatchConfig,
 }
 
 #[derive(Clone, PartialEq, Eq)]
@@ -66,25 +60,26 @@ pub(crate) struct TransportLinkMulticast {
 
 impl TransportLinkMulticast {
     pub(crate) fn new(link: LinkMulticast, mut config: TransportLinkMulticastConfig) -> Self {
-        config.mtu = link.get_mtu().min(config.mtu);
+        config.batch.mtu = link.get_mtu().min(config.batch.mtu);
+        config.batch.is_streamed = false;
         Self { link, config }
     }
 
-    const fn batch_config(&self) -> BatchConfig {
-        BatchConfig {
-            mtu: self.config.mtu,
-            #[cfg(feature = "transport_compression")]
-            is_compression: self.config.is_compression,
-        }
-    }
-
     pub(crate) fn tx(&self) -> TransportLinkMulticastTx {
         TransportLinkMulticastTx {
             inner: self.clone(),
-            #[cfg(feature = "transport_compression")]
-            buffer: self.config.is_compression.then_some(BBuf::with_capacity(
-                lz4_flex::block::get_maximum_output_size(self.config.mtu as usize),
-            )),
+            buffer: zcondfeat!(
+                "transport_compression",
+                self.config
+                    .batch
+                    .is_compression
+                    .then_some(BBuf::with_capacity(
+                        lz4_flex::block::get_maximum_output_size(
+                            self.config.batch.max_buffer_size()
+                        ),
+                    )),
+                None
+            ),
         }
     }
 
@@ -148,7 +143,6 @@ impl From<TransportLinkMulticast> for Link {
 
 pub(crate) struct TransportLinkMulticastTx {
     pub(crate) inner: TransportLinkMulticast,
-    #[cfg(feature = "transport_compression")]
     pub(crate) buffer: Option<BBuf>,
 }
 
@@ -157,15 +151,11 @@ impl TransportLinkMulticastTx {
         const ERR: &str = "Write error on link: ";
 
         let res = batch
-            .finalize(
-                #[cfg(feature = "transport_compression")]
-                self.buffer.as_mut(),
-            )
+            .finalize(self.buffer.as_mut())
             .map_err(|_| zerror!("{ERR}{self}"))?;
 
         let bytes = match res {
             Finalize::Batch => batch.as_slice(),
-            #[cfg(feature = "transport_compression")]
             Finalize::Buffer => self
                 .buffer
                 .as_ref()
@@ -183,7 +173,7 @@ impl TransportLinkMulticastTx {
         const ERR: &str = "Write error on link: ";
 
         // Create the batch for serializing the message
-        let mut batch = WBatch::new(self.inner.batch_config());
+        let mut batch = WBatch::new(self.inner.config.batch);
         batch.encode(msg).map_err(|_| zerror!("{ERR}{self}"))?;
         let len = batch.len() as usize;
         self.send_batch(&mut batch).await?;
@@ -225,7 +215,7 @@ impl TransportLinkMulticastRx {
         let mut into = (buff)();
         let (n, locator) = self.inner.link.read(into.as_mut_slice()).await?;
         let buffer = ZSlice::make(Arc::new(into), 0, n).map_err(|_| zerror!("Error"))?;
-        let mut batch = RBatch::new(self.inner.batch_config(), buffer);
+        let mut batch = RBatch::new(self.inner.config.batch, buffer);
         batch.initialize(buff).map_err(|_| zerror!("{ERR}{self}"))?;
         Ok((batch, locator.into_owned()))
     }
@@ -330,10 +320,7 @@ impl TransportLinkMulticastUniversal {
 
         if self.handle_tx.is_none() {
             let tpc = TransmissionPipelineConf {
-                is_streamed: false,
-                #[cfg(feature = "transport_compression")]
-                is_compression: self.link.config.is_compression,
-                batch_size: config.batch_size,
+                batch: self.link.config.batch,
                 queue_size: self.transport.manager.config.queue_size,
                 backoff: self.transport.manager.config.queue_backoff,
             };
@@ -582,7 +569,7 @@ async fn rx_task(
     }
 
     // The pool of buffers
-    let mtu = link.inner.config.mtu as usize;
+    let mtu = link.inner.config.batch.max_buffer_size();
     let mut n = rx_buffer_size / mtu;
     if rx_buffer_size % mtu != 0 {
         n += 1;
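Note: on the TX side the compression support buffer is now sized with lz4_flex::block::get_maximum_output_size() over the full batch buffer, because an LZ4 block can end up larger than its input on incompressible data; zcondfeat!, as used above, simply selects the first expression when the named feature is enabled and the fallback otherwise. Below is a small standalone sketch of the sizing argument; it assumes only the lz4_flex crate (already a dependency of the code in this diff).

// Why the scratch buffer is sized for the LZ4 worst case rather than the MTU alone.
fn main() {
    let mtu = 65_535usize;
    let worst_case = lz4_flex::block::get_maximum_output_size(mtu);
    // The worst case is strictly larger than the input size.
    assert!(worst_case > mtu);

    // compress_into returns the number of bytes written into `out`; the batch code
    // compares that against the original size and keeps the uncompressed buffer
    // when compression does not pay off (the fallback branch in WBatch::compress).
    let payload = vec![0xABu8; 1024];
    let mut out = vec![0u8; lz4_flex::block::get_maximum_output_size(payload.len())];
    let n = lz4_flex::block::compress_into(&payload, &mut out).unwrap();
    assert!(n <= out.len());
}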
diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs
index 112b471b9e..a3e5651bdb 100644
--- a/io/zenoh-transport/src/unicast/establishment/accept.rs
+++ b/io/zenoh-transport/src/unicast/establishment/accept.rs
@@ -14,6 +14,7 @@
 #[cfg(feature = "shared-memory")]
 use crate::unicast::shared_memory_unicast::Challenge;
 use crate::{
+    common::batch::BatchConfig,
     unicast::{
         establishment::{
             compute_sn, ext, finalize_transport, AcceptFsm, Cookie, InputFinalize, Zenoh080Cookie,
@@ -586,11 +587,15 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {
 
 pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) -> ZResult<()> {
     let mtu = link.get_mtu();
+    let is_streamed = link.is_streamed();
     let config = TransportLinkUnicastConfig {
-        mtu,
         direction: TransportLinkUnicastDirection::Inbound,
-        #[cfg(feature = "transport_compression")]
-        is_compression: false,
+        batch: BatchConfig {
+            mtu,
+            is_streamed,
+            #[cfg(feature = "transport_compression")]
+            is_compression: false,
+        },
     };
     let mut link = TransportLinkUnicast::new(link.clone(), config);
     let mut fsm = AcceptLink {
@@ -705,10 +710,13 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager)
     };
 
     let a_config = TransportLinkUnicastConfig {
-        mtu: state.transport.batch_size,
         direction: TransportLinkUnicastDirection::Inbound,
-        #[cfg(feature = "transport_compression")]
-        is_compression: state.link.ext_compression.is_compression(),
+        batch: BatchConfig {
+            mtu: state.transport.batch_size,
+            is_streamed,
+            #[cfg(feature = "transport_compression")]
+            is_compression: state.link.ext_compression.is_compression(),
+        },
     };
     let a_link = TransportLinkUnicast::new(link.link.clone(), a_config);
     let s_link = format!("{:?}", a_link);
diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs
index 4c1314dd29..6e10509d69 100644
--- a/io/zenoh-transport/src/unicast/establishment/open.rs
+++ b/io/zenoh-transport/src/unicast/establishment/open.rs
@@ -14,6 +14,7 @@
 #[cfg(feature = "shared-memory")]
 use crate::unicast::shared_memory_unicast::Challenge;
 use crate::{
+    common::batch::BatchConfig,
     unicast::{
         establishment::{compute_sn, ext, finalize_transport, InputFinalize, OpenFsm},
         link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection},
@@ -511,11 +512,15 @@ pub(crate) async fn open_link(
     link: LinkUnicast,
     manager: &TransportManager,
 ) -> ZResult<TransportUnicast> {
+    let is_streamed = link.is_streamed();
     let config = TransportLinkUnicastConfig {
         direction: TransportLinkUnicastDirection::Outbound,
-        mtu: link.get_mtu(),
-        #[cfg(feature = "transport_compression")]
-        is_compression: false, // Perform the exchange Init/Open exchange with no compression
+        batch: BatchConfig {
+            mtu: link.get_mtu(),
+            is_streamed,
+            #[cfg(feature = "transport_compression")]
+            is_compression: false, // Perform the exchange Init/Open exchange with no compression
+        },
     };
     let mut link = TransportLinkUnicast::new(link, config);
     let mut fsm = OpenLink {
@@ -537,7 +542,7 @@ pub(crate) async fn open_link(
             .config
             .batch_size
             .min(batch_size::UNICAST)
-            .min(link.config.mtu),
+            .min(link.config.batch.mtu),
         resolution: manager.config.resolution,
         ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos),
         #[cfg(feature = "transport_multilink")]
@@ -616,10 +621,13 @@ pub(crate) async fn open_link(
     };
 
     let o_config = TransportLinkUnicastConfig {
-        mtu: state.transport.batch_size,
         direction: TransportLinkUnicastDirection::Outbound,
-        #[cfg(feature = "transport_compression")]
-        is_compression: state.link.ext_compression.is_compression(),
+        batch: BatchConfig {
+            mtu: state.transport.batch_size,
+            is_streamed,
+            #[cfg(feature = "transport_compression")]
+            is_compression: state.link.ext_compression.is_compression(),
+        },
     };
     let o_link = TransportLinkUnicast::new(link.link.clone(), o_config);
     let s_link = format!("{:?}", o_link);
diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs
index afc12bc87d..5b4da7365b 100644
--- a/io/zenoh-transport/src/unicast/link.rs
+++ b/io/zenoh-transport/src/unicast/link.rs
@@ -14,9 +14,8 @@ use crate::common::batch::{BatchConfig, Decode, Encode, Finalize, RBatch, WBatch};
 use std::fmt;
 use std::sync::Arc;
-#[cfg(feature = "transport_compression")]
-use zenoh_buffers::BBuf;
-use zenoh_buffers::{ZSlice, ZSliceBuffer};
+use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer};
+use zenoh_core::zcondfeat;
 use zenoh_link::{Link, LinkUnicast};
 use zenoh_protocol::transport::{BatchSize, Close, TransportMessage};
 use zenoh_result::{zerror, ZResult};
@@ -31,11 +30,7 @@ pub(crate) enum TransportLinkUnicastDirection {
 pub(crate) struct TransportLinkUnicastConfig {
     // Inbound / outbound
     pub(crate) direction: TransportLinkUnicastDirection,
-    // MTU
-    pub(crate) mtu: BatchSize,
-    // Compression is active on the link
-    #[cfg(feature = "transport_compression")]
-    pub(crate) is_compression: bool,
+    pub(crate) batch: BatchConfig,
 }
 
 #[derive(Clone, PartialEq, Eq)]
@@ -46,25 +41,23 @@ pub(crate) struct TransportLinkUnicast {
 
 impl TransportLinkUnicast {
     pub(crate) fn new(link: LinkUnicast, mut config: TransportLinkUnicastConfig) -> Self {
-        config.mtu = link.get_mtu().min(config.mtu);
+        config.batch.mtu = link.get_mtu().min(config.batch.mtu);
         Self { link, config }
     }
 
-    const fn batch_config(&self) -> BatchConfig {
-        BatchConfig {
-            mtu: self.config.mtu,
-            #[cfg(feature = "transport_compression")]
-            is_compression: self.config.is_compression,
-        }
-    }
-
     pub(crate) fn tx(&self) -> TransportLinkUnicastTx {
         TransportLinkUnicastTx {
             inner: self.clone(),
-            #[cfg(feature = "transport_compression")]
-            buffer: self.config.is_compression.then_some(BBuf::with_capacity(
-                lz4_flex::block::get_maximum_output_size(self.config.mtu as usize),
-            )),
+            buffer: zcondfeat!(
+                "transport_compression",
+                self.config
+                    .batch
+                    .is_compression
+                    .then_some(BBuf::with_capacity(
+                        lz4_flex::block::get_maximum_output_size(self.config.batch.mtu as usize),
+                    )),
+                None
+            ),
         }
     }
 
@@ -128,7 +121,6 @@ impl From<TransportLinkUnicast> for Link {
 
 pub(crate) struct TransportLinkUnicastTx {
     pub(crate) inner: TransportLinkUnicast,
-    #[cfg(feature = "transport_compression")]
     pub(crate) buffer: Option<BBuf>,
 }
 
@@ -139,15 +131,11 @@ impl TransportLinkUnicastTx {
         // log::trace!("WBatch: {:?}", batch);
 
         let res = batch
-            .finalize(
-                #[cfg(feature = "transport_compression")]
-                self.buffer.as_mut(),
-            )
+            .finalize(self.buffer.as_mut())
             .map_err(|_| zerror!("{ERR}{self}"))?;
 
         let bytes = match res {
             Finalize::Batch => batch.as_slice(),
-            #[cfg(feature = "transport_compression")]
             Finalize::Buffer => self
                 .buffer
                 .as_ref()
@@ -158,14 +146,6 @@ impl TransportLinkUnicastTx {
         // log::trace!("WBytes: {:02x?}", bytes);
 
         // Send the message on the link
-        if self.inner.link.is_streamed() {
-            let len: BatchSize = bytes
-                .len()
-                .try_into()
-                .map_err(|_| zerror!("Invalid batch length"))?;
-            let len = len.to_le_bytes();
-            self.inner.link.write_all(&len).await?;
-        }
         self.inner.link.write_all(bytes).await?;
 
         Ok(())
@@ -175,7 +155,7 @@ impl TransportLinkUnicastTx {
         const ERR: &str = "Write error on link: ";
 
         // Create the batch for serializing the message
-        let mut batch = WBatch::new(self.inner.batch_config());
+        let mut batch = WBatch::new(self.inner.config.batch);
         batch.encode(msg).map_err(|_| zerror!("{ERR}{self}"))?;
         let len = batch.len() as usize;
         self.send_batch(&mut batch).await?;
@@ -191,14 +171,11 @@ impl fmt::Display for TransportLinkUnicastTx {
 
 impl fmt::Debug for TransportLinkUnicastTx {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        let mut s = f.debug_struct("TransportLinkUnicastRx");
-        s.field("link", &self.inner.link)
-            .field("config", &self.inner.config);
-        #[cfg(feature = "transport_compression")]
-        {
-            s.field("buffer", &self.buffer.as_ref().map(|b| b.capacity()));
-        }
-        s.finish()
+        f.debug_struct("TransportLinkUnicastRx")
+            .field("link", &self.inner.link)
+            .field("config", &self.inner.config)
+            .field("buffer", &self.buffer.as_ref().map(|b| b.capacity()))
+            .finish()
     }
 }
 
@@ -219,15 +196,15 @@ impl TransportLinkUnicastRx {
             // Read and decode the message length
             let mut len = BatchSize::MIN.to_le_bytes();
             self.inner.link.read_exact(&mut len).await?;
-            let len = BatchSize::from_le_bytes(len) as usize;
+            let l = BatchSize::from_le_bytes(len) as usize;
 
             // Read the bytes
             let slice = into
                 .as_mut_slice()
-                .get_mut(..len)
+                .get_mut(len.len()..len.len() + l)
                 .ok_or_else(|| zerror!("{ERR}{self}. Invalid batch length or buffer size."))?;
             self.inner.link.read_exact(slice).await?;
-            len
+            len.len() + l
         } else {
             // Read the bytes
             self.inner.link.read(into.as_mut_slice()).await?
@@ -237,7 +214,7 @@ impl TransportLinkUnicastRx {
 
         let buffer = ZSlice::make(Arc::new(into), 0, end)
             .map_err(|_| zerror!("{ERR}{self}. ZSlice index(es) out of bounds"))?;
-        let mut batch = RBatch::new(self.inner.batch_config(), buffer);
+        let mut batch = RBatch::new(self.inner.config.batch, buffer);
         batch
             .initialize(buff)
             .map_err(|e| zerror!("{ERR}{self}. {e}."))?;
@@ -248,7 +225,7 @@ impl TransportLinkUnicastRx {
     }
 
     pub async fn recv(&mut self) -> ZResult<TransportMessage> {
-        let mtu = self.inner.config.mtu as usize;
+        let mtu = self.inner.config.batch.mtu as usize;
         let mut batch = self
             .recv_batch(|| zenoh_buffers::vec::uninit(mtu).into_boxed_slice())
             .await?;
diff --git a/io/zenoh-transport/src/unicast/lowlatency/link.rs b/io/zenoh-transport/src/unicast/lowlatency/link.rs
index 437e9c4fa4..4cfbbee115 100644
--- a/io/zenoh-transport/src/unicast/lowlatency/link.rs
+++ b/io/zenoh-transport/src/unicast/lowlatency/link.rs
@@ -216,7 +216,7 @@ async fn rx_task_stream(
     }
 
     // The pool of buffers
-    let mtu = link.config.mtu as usize;
+    let mtu = link.config.batch.mtu as usize;
     let mut n = rx_buffer_size / mtu;
     if rx_buffer_size % mtu != 0 {
         n += 1;
@@ -248,7 +248,7 @@ async fn rx_task_dgram(
     rx_buffer_size: usize,
 ) -> ZResult<()> {
     // The pool of buffers
-    let mtu = link.config.mtu as usize;
+    let mtu = link.config.batch.max_buffer_size();
     let mut n = rx_buffer_size / mtu;
     if rx_buffer_size % mtu != 0 {
         n += 1;
diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs
index 74db7f751e..aba680bc43 100644
--- a/io/zenoh-transport/src/unicast/universal/link.rs
+++ b/io/zenoh-transport/src/unicast/universal/link.rs
@@ -16,7 +16,7 @@ use super::transport::TransportUnicastUniversal;
 use crate::common::stats::TransportStats;
 use crate::{
     common::{
-        batch::RBatch,
+        batch::{BatchConfig, RBatch},
         pipeline::{
             TransmissionPipeline, TransmissionPipelineConf, TransmissionPipelineConsumer,
             TransmissionPipelineProducer,
@@ -71,10 +71,12 @@ impl TransportLinkUnicastUniversal {
     ) {
         if self.handle_tx.is_none() {
             let config = TransmissionPipelineConf {
-                is_streamed: self.link.link.is_streamed(),
-                #[cfg(feature = "transport_compression")]
-                is_compression: self.link.config.is_compression,
-                batch_size: self.link.config.mtu,
+                batch: BatchConfig {
+                    mtu: self.link.config.batch.mtu,
+                    is_streamed: self.link.link.is_streamed(),
+                    #[cfg(feature = "transport_compression")]
+                    is_compression: self.link.config.batch.is_compression,
+                },
                 queue_size: self.transport.manager.config.queue_size,
                 backoff: self.transport.manager.config.queue_backoff,
             };
@@ -257,7 +259,7 @@ async fn rx_task(
     }
 
     // The pool of buffers
-    let mtu = link.inner.config.mtu as usize;
+    let mtu = link.inner.config.batch.max_buffer_size();
     let mut n = rx_buffer_size / mtu;
     if rx_buffer_size % mtu != 0 {
         n += 1;