Skip to content

Commit

Permalink
Align TransportLinkMulticast to TransportLinkUnicast
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Nov 24, 2023
1 parent c650a84 commit 9acebcb
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 51 deletions.
179 changes: 131 additions & 48 deletions io/zenoh-transport/src/multicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use zenoh_core::zlock;
use zenoh_link::{Link, LinkMulticast, Locator};
use zenoh_protocol::{
core::{Bits, Priority, Resolution, WhatAmI, ZenohId},
transport::{BatchSize, Join, PrioritySn, TransportMessage, TransportSn},
transport::{BatchSize, Close, Join, PrioritySn, TransportMessage, TransportSn},
};
use zenoh_result::{zerror, ZResult};
use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal};
Expand All @@ -56,25 +56,16 @@ pub(crate) struct TransportLinkMulticastConfig {
pub(crate) is_compression: bool,
}

#[derive(Clone, PartialEq, Eq, Debug)]
#[derive(Clone, PartialEq, Eq)]
pub(crate) struct TransportLinkMulticast {
pub(crate) link: LinkMulticast,
pub(crate) config: TransportLinkMulticastConfig,
#[cfg(feature = "transport_compression")]
pub(crate) buffer: Option<BBuf>,
}

impl TransportLinkMulticast {
pub fn new(link: LinkMulticast, mut config: TransportLinkMulticastConfig) -> Self {
pub(crate) fn new(link: LinkMulticast, mut config: TransportLinkMulticastConfig) -> Self {
config.mtu = link.get_mtu().min(config.mtu);
Self {
link,
config,
#[cfg(feature = "transport_compression")]
buffer: config.is_compression.then_some(BBuf::with_capacity(
lz4_flex::block::get_maximum_output_size(config.mtu as usize),
)),
}
Self { link, config }
}

const fn batch_config(&self) -> BatchConfig {
Expand All @@ -85,7 +76,82 @@ impl TransportLinkMulticast {
}
}

pub async fn send_batch(&mut self, batch: &mut WBatch) -> ZResult<()> {
pub(crate) fn tx(&self) -> TransportLinkMulticastTx {
TransportLinkMulticastTx {
inner: self.clone(),
#[cfg(feature = "transport_compression")]
buffer: self.config.is_compression.then_some(BBuf::with_capacity(
lz4_flex::block::get_maximum_output_size(self.config.mtu as usize),
)),
}
}

pub(crate) fn rx(&self) -> TransportLinkMulticastRx {
TransportLinkMulticastRx {
inner: self.clone(),
}
}

pub(crate) async fn send(&self, msg: &TransportMessage) -> ZResult<usize> {
let mut link = self.tx();
link.send(msg).await
}

// pub(crate) async fn recv(&self) -> ZResult<(TransportMessage, Locator)> {
// let mut link = self.rx();
// link.recv().await
// }

pub(crate) async fn close(&self, reason: Option<u8>) -> ZResult<()> {
if let Some(reason) = reason {
// Build the close message
let message: TransportMessage = Close {
reason,
session: false,
}
.into();
// Send the close message on the link
let _ = self.send(&message).await;
}
self.link.close().await
}
}

impl fmt::Display for TransportLinkMulticast {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.link)
}
}

impl fmt::Debug for TransportLinkMulticast {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TransportLinkMulticast")
.field("link", &self.link)
.field("config", &self.config)
.finish()
}
}

impl From<&TransportLinkMulticast> for Link {
fn from(link: &TransportLinkMulticast) -> Self {
Link::from(&link.link)
}
}

impl From<TransportLinkMulticast> for Link {
fn from(link: TransportLinkMulticast) -> Self {
Link::from(link.link)
}
}

pub(crate) struct TransportLinkMulticastTx {
pub(crate) inner: TransportLinkMulticast,
#[cfg(feature = "transport_compression")]
pub(crate) buffer: Option<BBuf>,
}

impl TransportLinkMulticastTx {
pub(crate) async fn send_batch(&mut self, batch: &mut WBatch) -> ZResult<()> {
const ERR: &str = "Write error on link: ";

let res = batch
Expand All @@ -106,21 +172,47 @@ impl TransportLinkMulticast {
};

// Send the message on the link
self.link.write_all(bytes).await?;
self.inner.link.write_all(bytes).await?;

Ok(())
}

pub async fn send(&mut self, msg: &TransportMessage) -> ZResult<usize> {
pub(crate) async fn send(&mut self, msg: &TransportMessage) -> ZResult<usize> {
const ERR: &str = "Write error on link: ";

// Create the batch for serializing the message
let mut batch = WBatch::new(self.batch_config());
let mut batch = WBatch::new(self.inner.batch_config());
batch.encode(msg).map_err(|_| zerror!("{ERR}{self}"))?;
let len = batch.len() as usize;
self.send_batch(&mut batch).await?;
Ok(len)
}
}

impl fmt::Display for TransportLinkMulticastTx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.inner)
}
}

impl fmt::Debug for TransportLinkMulticastTx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut s = f.debug_struct("TransportLinkMulticastRx");
s.field("link", &self.inner.link)
.field("config", &self.inner.config);
#[cfg(feature = "transport_compression")]
{
s.field("buffer", &self.buffer.as_ref().map(|b| b.capacity()));
}
s.finish()
}
}

pub(crate) struct TransportLinkMulticastRx {
pub(crate) inner: TransportLinkMulticast,
}

impl TransportLinkMulticastRx {
pub async fn recv_batch<C, T>(&self, buff: C) -> ZResult<(RBatch, Locator)>
where
C: Fn() -> T + Copy,
Expand All @@ -129,47 +221,37 @@ impl TransportLinkMulticast {
const ERR: &str = "Read error from link: ";

let mut into = (buff)();
let (n, locator) = self.link.read(into.as_mut_slice()).await?;
let (n, locator) = self.inner.link.read(into.as_mut_slice()).await?;
let buffer = ZSlice::make(Arc::new(into), 0, n).map_err(|_| zerror!("Error"))?;
let mut batch = RBatch::new(self.batch_config(), buffer);
let mut batch = RBatch::new(self.inner.batch_config(), buffer);
batch.initialize(buff).map_err(|_| zerror!("{ERR}{self}"))?;
Ok((batch, locator.into_owned()))
}

// pub async fn recv(&self) -> ZResult<(TransportMessage, Locator)> {
// pub async fn recv(&mut self) -> ZResult<(TransportMessage, Locator)> {
// let mtu = self.inner.config.mtu as usize;
// let (mut batch, locator) = self
// .recv_batch(|| {
// zenoh_buffers::vec::uninit(self.link.get_mtu() as usize).into_boxed_slice()
// })
// .recv_batch(|| zenoh_buffers::vec::uninit(mtu).into_boxed_slice())
// .await?;

// let msg = batch
// .decode()
// .map_err(|_| zerror!("Decode error on link: {}", self))?;

// Ok((msg, locator))
// }

pub async fn close(&self) -> ZResult<()> {
self.link.close().await
}
}

impl fmt::Display for TransportLinkMulticast {
impl fmt::Display for TransportLinkMulticastRx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.link)
}
}

impl From<&TransportLinkMulticast> for Link {
fn from(link: &TransportLinkMulticast) -> Self {
Link::from(&link.link)
write!(f, "{}", self.inner)
}
}

impl From<TransportLinkMulticast> for Link {
fn from(link: TransportLinkMulticast) -> Self {
Link::from(link.link)
impl fmt::Debug for TransportLinkMulticastRx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TransportLinkMulticastRx")
.field("link", &self.inner.link)
.field("config", &self.inner.config)
.finish()
}
}

Expand Down Expand Up @@ -263,7 +345,7 @@ impl TransportLinkMulticastUniversal {
let handle = task::spawn(async move {
let res = tx_task(
consumer,
c_link.clone(),
c_link.tx(),
config,
initial_sns,
#[cfg(feature = "stats")]
Expand Down Expand Up @@ -298,7 +380,7 @@ impl TransportLinkMulticastUniversal {
let handle = task::spawn(async move {
// Start the consume task
let res = rx_task(
c_link.clone(),
c_link.rx(),
ctransport.clone(),
c_signal.clone(),
c_rx_buffer_size,
Expand Down Expand Up @@ -337,7 +419,7 @@ impl TransportLinkMulticastUniversal {
handle_tx.await;
}

self.link.close().await
self.link.close(None).await
}
}

Expand All @@ -346,7 +428,7 @@ impl TransportLinkMulticastUniversal {
/*************************************/
async fn tx_task(
mut pipeline: TransmissionPipelineConsumer,
mut link: TransportLinkMulticast,
mut link: TransportLinkMulticastTx,
config: TransportLinkMulticastConfigUniversal,
mut last_sns: Vec<PrioritySn>,
#[cfg(feature = "stats")] stats: Arc<TransportStats>,
Expand Down Expand Up @@ -466,7 +548,7 @@ async fn tx_task(
}

async fn rx_task(
link: TransportLinkMulticast,
mut link: TransportLinkMulticastRx,
transport: TransportMulticastInner,
signal: Signal,
rx_buffer_size: usize,
Expand All @@ -478,7 +560,7 @@ async fn rx_task(
}

async fn read<T, F>(
link: &TransportLinkMulticast,
link: &mut TransportLinkMulticastRx,
pool: &RecyclingObjectPool<T, F>,
) -> ZResult<Action>
where
Expand All @@ -498,15 +580,16 @@ async fn rx_task(
}

// The pool of buffers
let mtu = link.link.get_mtu() as usize;
let mtu = link.inner.config.mtu as usize;
let mut n = rx_buffer_size / mtu;
if rx_buffer_size % mtu != 0 {
n += 1;
}

let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice());
while !signal.is_triggered() {
// Async read from the underlying link
let action = read(&link, &pool).race(stop(signal.clone())).await?;
let action = read(&mut link, &pool).race(stop(signal.clone())).await?;
match action {
Action::Read((batch, locator)) => {
#[cfg(feature = "stats")]
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-transport/src/unicast/universal/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use async_std::task;
use async_std::task::JoinHandle;
use std::{sync::Arc, time::Duration};
use zenoh_buffers::ZSliceBuffer;
use zenoh_protocol::transport::{close, KeepAlive, TransportMessage};
use zenoh_protocol::transport::{KeepAlive, TransportMessage};
use zenoh_result::{zerror, ZResult};
use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal};

Expand Down Expand Up @@ -162,7 +162,7 @@ impl TransportLinkUnicastUniversal {
handle_tx.await;
}

self.link.close(Some(close::reason::GENERIC)).await
self.link.close(None).await
}
}

Expand Down
2 changes: 1 addition & 1 deletion zenoh/tests/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ fn gossip() -> Result<()> {
async_std::task::block_on(async {
zasync_executor_init!();

let locator = String::from("tcp/127.0.0.1:17448");
let locator = String::from("tcp/127.0.0.1:17449");
let ke = String::from("testKeyExprGossip");
let msg_size = 8;

Expand Down
3 changes: 3 additions & 0 deletions zenoh/tests/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ async fn test_session_pubsub(peer01: &Session, peer02: &Session, reliability: Re
}
});

// Wait for the messages to arrive
task::sleep(SLEEP).await;

println!("[PS][03b] Unsubscribing on peer01 session");
ztimeout!(sub.undeclare().res_async()).unwrap();

Expand Down

0 comments on commit 9acebcb

Please sign in to comment.