Skip to content

Commit

Permalink
Fix unicast compression tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Nov 23, 2023
1 parent 50a29ed commit c650a84
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 78 deletions.
3 changes: 2 additions & 1 deletion io/zenoh-transport/src/multicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ pub(crate) struct TransportLinkMulticast {
}

impl TransportLinkMulticast {
pub fn new(link: LinkMulticast, config: TransportLinkMulticastConfig) -> Self {
pub fn new(link: LinkMulticast, mut config: TransportLinkMulticastConfig) -> Self {
config.mtu = link.get_mtu().min(config.mtu);
Self {
link,
config,
Expand Down
5 changes: 3 additions & 2 deletions io/zenoh-transport/src/unicast/establishment/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,8 +582,9 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {
}

pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) -> ZResult<()> {
let mtu = link.get_mtu();
let config = TransportLinkUnicastConfig {
mtu: link.get_mtu(),
mtu,
direction: TransportLinkUnicastDirection::Inbound,
#[cfg(feature = "transport_compression")]
is_compression: false,
Expand Down Expand Up @@ -622,7 +623,7 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager)
let iack_out = {
let mut state = State {
transport: StateTransport {
batch_size: manager.config.batch_size,
batch_size: manager.config.batch_size.min(batch_size::UNICAST).min(mtu),
resolution: manager.config.resolution,
ext_qos: ext::qos::StateAccept::new(manager.config.unicast.is_qos),
#[cfg(feature = "transport_multilink")]
Expand Down
6 changes: 5 additions & 1 deletion io/zenoh-transport/src/unicast/establishment/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,11 @@ pub(crate) async fn open_link(

let mut state = State {
transport: StateTransport {
batch_size: manager.config.batch_size.min(batch_size::UNICAST),
batch_size: manager
.config
.batch_size
.min(batch_size::UNICAST)
.min(link.config.mtu),
resolution: manager.config.resolution,
ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos),
#[cfg(feature = "transport_multilink")]
Expand Down
177 changes: 121 additions & 56 deletions io/zenoh-transport/src/unicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,12 @@ pub(crate) struct TransportLinkUnicastConfig {
pub(crate) struct TransportLinkUnicast {
pub(crate) link: LinkUnicast,
pub(crate) config: TransportLinkUnicastConfig,
#[cfg(feature = "transport_compression")]
pub(crate) buffer: Option<BBuf>,
}

impl TransportLinkUnicast {
pub fn new(link: LinkUnicast, config: TransportLinkUnicastConfig) -> Self {
Self {
link,
config,
#[cfg(feature = "transport_compression")]
buffer: config.is_compression.then_some(BBuf::with_capacity(
lz4_flex::block::get_maximum_output_size(config.mtu as usize),
)),
}
pub(crate) fn new(link: LinkUnicast, mut config: TransportLinkUnicastConfig) -> Self {
config.mtu = link.get_mtu().min(config.mtu);
Self { link, config }
}

const fn batch_config(&self) -> BatchConfig {
Expand All @@ -64,7 +56,82 @@ impl TransportLinkUnicast {
}
}

pub async fn send_batch(&mut self, batch: &mut WBatch) -> ZResult<()> {
pub(crate) fn tx(&self) -> TransportLinkUnicastTx {
TransportLinkUnicastTx {
inner: self.clone(),
#[cfg(feature = "transport_compression")]
buffer: self.config.is_compression.then_some(BBuf::with_capacity(
lz4_flex::block::get_maximum_output_size(self.config.mtu as usize),
)),
}
}

pub(crate) fn rx(&self) -> TransportLinkUnicastRx {
TransportLinkUnicastRx {
inner: self.clone(),
}
}

pub(crate) async fn send(&self, msg: &TransportMessage) -> ZResult<usize> {
let mut link = self.tx();
link.send(msg).await
}

pub(crate) async fn recv(&self) -> ZResult<TransportMessage> {
let mut link = self.rx();
link.recv().await
}

pub(crate) async fn close(&self, reason: Option<u8>) -> ZResult<()> {
if let Some(reason) = reason {
// Build the close message
let message: TransportMessage = Close {
reason,
session: false,
}
.into();
// Send the close message on the link
let _ = self.send(&message).await;
}
self.link.close().await
}
}

impl fmt::Display for TransportLinkUnicast {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.link)
}
}

impl fmt::Debug for TransportLinkUnicast {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TransportLinkUnicast")
.field("link", &self.link)
.field("config", &self.config)
.finish()
}
}

impl From<&TransportLinkUnicast> for Link {
fn from(link: &TransportLinkUnicast) -> Self {
Link::from(&link.link)
}
}

impl From<TransportLinkUnicast> for Link {
fn from(link: TransportLinkUnicast) -> Self {
Link::from(link.link)
}
}

pub(crate) struct TransportLinkUnicastTx {
pub(crate) inner: TransportLinkUnicast,
#[cfg(feature = "transport_compression")]
pub(crate) buffer: Option<BBuf>,
}

impl TransportLinkUnicastTx {
pub(crate) async fn send_batch(&mut self, batch: &mut WBatch) -> ZResult<()> {
const ERR: &str = "Write error on link: ";

// log::trace!("WBatch: {:?}", batch);
Expand All @@ -89,30 +156,55 @@ impl TransportLinkUnicast {
// log::trace!("WBytes: {:02x?}", bytes);

// Send the message on the link
if self.link.is_streamed() {
if self.inner.link.is_streamed() {
let len: BatchSize = bytes
.len()
.try_into()
.map_err(|_| zerror!("Invalid batch length"))?;
let len = len.to_le_bytes();
self.link.write_all(&len).await?;
self.inner.link.write_all(&len).await?;
}
self.link.write_all(bytes).await?;
self.inner.link.write_all(bytes).await?;

Ok(())
}

pub async fn send(&mut self, msg: &TransportMessage) -> ZResult<usize> {
pub(crate) async fn send(&mut self, msg: &TransportMessage) -> ZResult<usize> {
const ERR: &str = "Write error on link: ";

// Create the batch for serializing the message
let mut batch = WBatch::new(self.batch_config());
let mut batch = WBatch::new(self.inner.batch_config());
batch.encode(msg).map_err(|_| zerror!("{ERR}{self}"))?;
let len = batch.len() as usize;
self.send_batch(&mut batch).await?;
Ok(len)
}
}

impl fmt::Display for TransportLinkUnicastTx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.inner)
}
}

impl fmt::Debug for TransportLinkUnicastTx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut s = f.debug_struct("TransportLinkUnicastRx");
s.field("link", &self.inner.link)
.field("config", &self.inner.config);
#[cfg(feature = "transport_compression")]
{
s.field("buffer", &self.buffer.as_ref().map(|b| b.capacity()));
}
s.finish()
}
}

pub(crate) struct TransportLinkUnicastRx {
pub(crate) inner: TransportLinkUnicast,
}

impl TransportLinkUnicastRx {
pub async fn recv_batch<C, T>(&mut self, buff: C) -> ZResult<RBatch>
where
C: Fn() -> T + Copy,
Expand All @@ -121,29 +213,29 @@ impl TransportLinkUnicast {
const ERR: &str = "Read error from link: ";

let mut into = (buff)();
let end = if self.link.is_streamed() {
let end = if self.inner.link.is_streamed() {
// Read and decode the message length
let mut len = BatchSize::MIN.to_le_bytes();
self.link.read_exact(&mut len).await?;
self.inner.link.read_exact(&mut len).await?;
let len = BatchSize::from_le_bytes(len) as usize;

// Read the bytes
let slice = into
.as_mut_slice()
.get_mut(..len)
.ok_or_else(|| zerror!("{ERR}{self}. Invalid batch length or buffer size."))?;
self.link.read_exact(slice).await?;
self.inner.link.read_exact(slice).await?;
len
} else {
// Read the bytes
self.link.read(into.as_mut_slice()).await?
self.inner.link.read(into.as_mut_slice()).await?
};

// log::trace!("RBytes: {:02x?}", &into.as_slice()[0..end]);

let buffer = ZSlice::make(Arc::new(into), 0, end)
.map_err(|_| zerror!("{ERR}{self}. ZSlice index(es) out of bounds"))?;
let mut batch = RBatch::new(self.batch_config(), buffer);
let mut batch = RBatch::new(self.inner.batch_config(), buffer);
batch
.initialize(buff)
.map_err(|e| zerror!("{ERR}{self}. {e}."))?;
Expand All @@ -154,7 +246,7 @@ impl TransportLinkUnicast {
}

pub async fn recv(&mut self) -> ZResult<TransportMessage> {
let mtu = self.link.get_mtu() as usize;
let mtu = self.inner.config.mtu as usize;
let mut batch = self
.recv_batch(|| zenoh_buffers::vec::uninit(mtu).into_boxed_slice())
.await?;
Expand All @@ -163,46 +255,19 @@ impl TransportLinkUnicast {
.map_err(|_| zerror!("Decode error on link: {}", self))?;
Ok(msg)
}

pub async fn close(&mut self, reason: Option<u8>) -> ZResult<()> {
if let Some(reason) = reason {
// Build the close message
let message: TransportMessage = Close {
reason,
session: false,
}
.into();
// Send the close message on the link
let _ = self.send(&message).await;
}
self.link.close().await
}
}

impl fmt::Display for TransportLinkUnicast {
impl fmt::Display for TransportLinkUnicastRx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.link)
write!(f, "{}", self.inner)
}
}

impl fmt::Debug for TransportLinkUnicast {
impl fmt::Debug for TransportLinkUnicastRx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TransportLinkUnicast")
.field("link", &self.link)
.field("config", &self.config)
.field("buffer", &self.buffer.as_ref().map(|b| b.capacity()))
f.debug_struct("TransportLinkUnicastRx")
.field("link", &self.inner.link)
.field("config", &self.inner.config)
.finish()
}
}

impl From<&TransportLinkUnicast> for Link {
fn from(link: &TransportLinkUnicast) -> Self {
Link::from(&link.link)
}
}

impl From<TransportLinkUnicast> for Link {
fn from(link: TransportLinkUnicast) -> Self {
Link::from(link.link)
}
}
16 changes: 8 additions & 8 deletions io/zenoh-transport/src/unicast/universal/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
},
priority::TransportPriorityTx,
},
unicast::link::TransportLinkUnicast,
unicast::link::{TransportLinkUnicast, TransportLinkUnicastRx, TransportLinkUnicastTx},
TransportExecutor,
};
use async_std::prelude::FutureExt;
Expand Down Expand Up @@ -89,7 +89,7 @@ impl TransportLinkUnicastUniversal {
let handle = executor.spawn(async move {
let res = tx_task(
consumer,
c_link.clone(),
c_link.tx(),
keep_alive,
#[cfg(feature = "stats")]
c_transport.stats.clone(),
Expand Down Expand Up @@ -123,7 +123,7 @@ impl TransportLinkUnicastUniversal {
let handle = task::spawn(async move {
// Start the consume task
let res = rx_task(
c_link.clone(),
c_link.rx(),
c_transport.clone(),
lease,
c_signal.clone(),
Expand Down Expand Up @@ -171,7 +171,7 @@ impl TransportLinkUnicastUniversal {
/*************************************/
async fn tx_task(
mut pipeline: TransmissionPipelineConsumer,
mut link: TransportLinkUnicast,
mut link: TransportLinkUnicastTx,
keep_alive: Duration,
#[cfg(feature = "stats")] stats: Arc<TransportStats>,
) -> ZResult<()> {
Expand Down Expand Up @@ -225,7 +225,7 @@ async fn tx_task(
}

async fn rx_task(
mut link: TransportLinkUnicast,
mut link: TransportLinkUnicastRx,
transport: TransportUnicastUniversal,
lease: Duration,
signal: Signal,
Expand All @@ -237,7 +237,7 @@ async fn rx_task(
}

async fn read<T, F>(
link: &mut TransportLinkUnicast,
link: &mut TransportLinkUnicastRx,
pool: &RecyclingObjectPool<T, F>,
) -> ZResult<Action>
where
Expand All @@ -257,7 +257,7 @@ async fn rx_task(
}

// The pool of buffers
let mtu = link.config.mtu as usize;
let mtu = link.inner.config.mtu as usize;
let mut n = rx_buffer_size / mtu;
if rx_buffer_size % mtu != 0 {
n += 1;
Expand All @@ -277,7 +277,7 @@ async fn rx_task(
{
transport.stats.inc_rx_bytes(2 + n); // Account for the batch len encoding (16 bits)
}
transport.read_messages(batch, &link)?;
transport.read_messages(batch, &link.inner)?;
}
Action::Stop => break,
}
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-transport/tests/multicast_compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

// Restricting to macos by default because of no IPv6 support
// on GitHub CI actions on Linux and Windows.
#[cfg(target_family = "unix")]
#[cfg(all(target_family = "unix", feature = "transport_compression"))]
mod tests {
use async_std::{prelude::FutureExt, task};
use std::{
Expand Down Expand Up @@ -346,7 +346,7 @@ mod tests {
// Define the locator
let endpoints: Vec<EndPoint> = vec![
format!(
"udp/224.{}.{}.{}:7447",
"udp/224.{}.{}.{}:21000",
rand::random::<u8>(),
rand::random::<u8>(),
rand::random::<u8>()
Expand Down
Loading

0 comments on commit c650a84

Please sign in to comment.