Fix latency regression #619

Merged · 1 commit · Dec 12, 2023
269 changes: 180 additions & 89 deletions io/zenoh-transport/src/common/batch.rs

Large diffs are not rendered by default.
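Since the batch.rs diff is collapsed, the `BatchConfig` type that the remaining hunks rely on is not shown here. The sketch below reconstructs its likely shape from the usages in this PR (the field names, the `Default` impl, and the `max_buffer_size()` helper all appear in the hunks further down); the `BatchSize` alias and the body of `max_buffer_size()` are assumptions for illustration only, not the actual implementation.

```rust
// Sketch only: reconstructed from how BatchConfig is used elsewhere in this diff.
// The real definition lives in io/zenoh-transport/src/common/batch.rs.

/// Assumed alias; zenoh batches are sized with a 16-bit length.
pub type BatchSize = u16;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct BatchConfig {
    /// Maximum size of a serialized batch.
    pub mtu: BatchSize,
    /// Whether the link is stream-oriented (batches carry a length prefix).
    pub is_streamed: bool,
    /// Whether compression is enabled on the link.
    #[cfg(feature = "transport_compression")]
    pub is_compression: bool,
}

impl Default for BatchConfig {
    fn default() -> Self {
        Self {
            mtu: BatchSize::MAX,
            is_streamed: false,
            #[cfg(feature = "transport_compression")]
            is_compression: false,
        }
    }
}

impl BatchConfig {
    /// Hypothetical body: the largest buffer a batch with this configuration may
    /// need, e.g. the MTU plus the 2-byte length prefix used on streamed links.
    pub fn max_buffer_size(&self) -> usize {
        self.mtu as usize + if self.is_streamed { 2 } else { 0 }
    }
}
```

Grouping `mtu`, `is_streamed`, and `is_compression` into one value lets `TransmissionPipelineConf` and the link configs carry a single `batch` field instead of three separately feature-gated ones, which is what most of the hunks below amount to.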

34 changes: 15 additions & 19 deletions io/zenoh-transport/src/common/pipeline.rs
@@ -482,21 +482,20 @@ impl StageOut {

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct TransmissionPipelineConf {
pub(crate) is_streamed: bool,
#[cfg(feature = "transport_compression")]
pub(crate) is_compression: bool,
pub(crate) batch_size: BatchSize,
pub(crate) batch: BatchConfig,
pub(crate) queue_size: [usize; Priority::NUM],
pub(crate) backoff: Duration,
}

impl Default for TransmissionPipelineConf {
fn default() -> Self {
Self {
is_streamed: false,
#[cfg(feature = "transport_compression")]
is_compression: false,
batch_size: BatchSize::MAX,
batch: BatchConfig {
mtu: BatchSize::MAX,
is_streamed: false,
#[cfg(feature = "transport_compression")]
is_compression: false,
},
queue_size: [1; Priority::NUM],
backoff: Duration::from_micros(1),
}
@@ -533,12 +532,7 @@ impl TransmissionPipeline {
let (mut s_ref_w, s_ref_r) = RingBuffer::<WBatch, RBLEN>::init();
// Fill the refill ring buffer with batches
for _ in 0..*num {
let bc = BatchConfig {
mtu: config.batch_size,
#[cfg(feature = "transport_compression")]
is_compression: config.is_compression,
};
let batch = WBatch::new(bc);
let batch = WBatch::new(config.batch);
assert!(s_ref_w.push(batch).is_none());
}
// Create the channel for notifying that new batches are in the refill ring buffer
@@ -736,10 +730,12 @@ mod tests {
const TIMEOUT: Duration = Duration::from_secs(60);

const CONFIG: TransmissionPipelineConf = TransmissionPipelineConf {
is_streamed: true,
#[cfg(feature = "transport_compression")]
is_compression: true,
batch_size: BatchSize::MAX,
batch: BatchConfig {
mtu: BatchSize::MAX,
is_streamed: true,
#[cfg(feature = "transport_compression")]
is_compression: true,
},
queue_size: [1; Priority::NUM],
backoff: Duration::from_micros(1),
};
@@ -874,7 +870,7 @@ mod tests {
// Make sure to put only one message per batch: set the payload size
// to half of the batch in such a way the serialized zenoh message
// will be larger than half of the batch size (header + payload).
let payload_size = (CONFIG.batch_size / 2) as usize;
let payload_size = (CONFIG.batch.mtu / 2) as usize;

// Send reliable messages
let key = "test".into();
11 changes: 7 additions & 4 deletions io/zenoh-transport/src/multicast/establishment.rs
@@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{
common::seq_num,
common::{batch::BatchConfig, seq_num},
multicast::{
link::{TransportLinkMulticast, TransportLinkMulticastConfig},
transport::TransportMulticastInner,
@@ -62,9 +62,12 @@ pub(crate) async fn open_link(
// Create the transport
let locator = link.get_dst().to_owned();
let config = TransportLinkMulticastConfig {
mtu: link.get_mtu(),
#[cfg(feature = "transport_compression")]
is_compression: manager.config.multicast.is_compression,
batch: BatchConfig {
mtu: link.get_mtu(),
#[cfg(feature = "transport_compression")]
is_compression: manager.config.multicast.is_compression,
..Default::default()
},
};
let link = TransportLinkMulticast::new(link, config);

57 changes: 22 additions & 35 deletions io/zenoh-transport/src/multicast/link.rs
@@ -34,10 +34,8 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
#[cfg(feature = "transport_compression")]
use zenoh_buffers::BBuf;
use zenoh_buffers::{ZSlice, ZSliceBuffer};
use zenoh_core::zlock;
use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer};
use zenoh_core::{zcondfeat, zlock};
use zenoh_link::{Link, LinkMulticast, Locator};
use zenoh_protocol::{
core::{Bits, Priority, Resolution, WhatAmI, ZenohId},
@@ -51,11 +49,7 @@ use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal};
/****************************/
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) struct TransportLinkMulticastConfig {
// MTU
pub(crate) mtu: BatchSize,
// Compression is active on the link
#[cfg(feature = "transport_compression")]
pub(crate) is_compression: bool,
pub(crate) batch: BatchConfig,
}

#[derive(Clone, PartialEq, Eq)]
Expand All @@ -66,25 +60,26 @@ pub(crate) struct TransportLinkMulticast {

impl TransportLinkMulticast {
pub(crate) fn new(link: LinkMulticast, mut config: TransportLinkMulticastConfig) -> Self {
config.mtu = link.get_mtu().min(config.mtu);
config.batch.mtu = link.get_mtu().min(config.batch.mtu);
config.batch.is_streamed = false;
Self { link, config }
}

const fn batch_config(&self) -> BatchConfig {
BatchConfig {
mtu: self.config.mtu,
#[cfg(feature = "transport_compression")]
is_compression: self.config.is_compression,
}
}

pub(crate) fn tx(&self) -> TransportLinkMulticastTx {
TransportLinkMulticastTx {
inner: self.clone(),
#[cfg(feature = "transport_compression")]
buffer: self.config.is_compression.then_some(BBuf::with_capacity(
lz4_flex::block::get_maximum_output_size(self.config.mtu as usize),
)),
buffer: zcondfeat!(
"transport_compression",
self.config
.batch
.is_compression
.then_some(BBuf::with_capacity(
lz4_flex::block::get_maximum_output_size(
self.config.batch.max_buffer_size()
),
)),
None
),
}
}

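The `tx()` hunk above replaces an explicit `#[cfg(feature = "transport_compression")]` attribute with the `zcondfeat!` macro from zenoh_core, which selects one of two expressions depending on whether the named cargo feature is enabled. A rough, hypothetical illustration of such a macro is sketched below (the name and expansion are assumptions, not the zenoh_core definition):

```rust
// Hypothetical sketch of a feature-conditional expression macro, in the spirit of
// zcondfeat!: evaluate to the first expression when the feature is enabled and to
// the second one otherwise.
macro_rules! condfeat {
    ($feature:literal, $yes:expr, $no:expr) => {{
        #[cfg(feature = $feature)]
        {
            $yes
        }
        #[cfg(not(feature = $feature))]
        {
            $no
        }
    }};
}

fn main() {
    // Mirrors the buffer initialization above: Some(buffer) only when compression
    // is compiled in, None otherwise.
    let buffer: Option<Vec<u8>> =
        condfeat!("transport_compression", Some(Vec::with_capacity(1024)), None);
    println!("compression buffer present: {}", buffer.is_some());
}
```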
@@ -148,7 +143,6 @@ impl From<TransportLinkMulticast> for Link {

pub(crate) struct TransportLinkMulticastTx {
pub(crate) inner: TransportLinkMulticast,
#[cfg(feature = "transport_compression")]
pub(crate) buffer: Option<BBuf>,
}

@@ -157,15 +151,11 @@ impl TransportLinkMulticastTx {
const ERR: &str = "Write error on link: ";

let res = batch
.finalize(
#[cfg(feature = "transport_compression")]
self.buffer.as_mut(),
)
.finalize(self.buffer.as_mut())
.map_err(|_| zerror!("{ERR}{self}"))?;

let bytes = match res {
Finalize::Batch => batch.as_slice(),
#[cfg(feature = "transport_compression")]
Finalize::Buffer => self
.buffer
.as_ref()
@@ -183,7 +173,7 @@ impl TransportLinkMulticastTx {
const ERR: &str = "Write error on link: ";

// Create the batch for serializing the message
let mut batch = WBatch::new(self.inner.batch_config());
let mut batch = WBatch::new(self.inner.config.batch);
batch.encode(msg).map_err(|_| zerror!("{ERR}{self}"))?;
let len = batch.len() as usize;
self.send_batch(&mut batch).await?;
@@ -225,7 +215,7 @@ impl TransportLinkMulticastRx {
let mut into = (buff)();
let (n, locator) = self.inner.link.read(into.as_mut_slice()).await?;
let buffer = ZSlice::make(Arc::new(into), 0, n).map_err(|_| zerror!("Error"))?;
let mut batch = RBatch::new(self.inner.batch_config(), buffer);
let mut batch = RBatch::new(self.inner.config.batch, buffer);
batch.initialize(buff).map_err(|_| zerror!("{ERR}{self}"))?;
Ok((batch, locator.into_owned()))
}
@@ -330,10 +320,7 @@ impl TransportLinkMulticastUniversal {

if self.handle_tx.is_none() {
let tpc = TransmissionPipelineConf {
is_streamed: false,
#[cfg(feature = "transport_compression")]
is_compression: self.link.config.is_compression,
batch_size: config.batch_size,
batch: self.link.config.batch,
queue_size: self.transport.manager.config.queue_size,
backoff: self.transport.manager.config.queue_backoff,
};
@@ -582,7 +569,7 @@ async fn rx_task(
}

// The pool of buffers
let mtu = link.inner.config.mtu as usize;
let mtu = link.inner.config.batch.max_buffer_size();
let mut n = rx_buffer_size / mtu;
if rx_buffer_size % mtu != 0 {
n += 1;
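In the `rx_task` hunk above, the pool sizing now uses `config.batch.max_buffer_size()` instead of the raw `mtu`, presumably so the receive buffers are sized to the largest buffer the batch configuration can require rather than to the link MTU alone. The ceiling-division logic itself is unchanged; a minimal, self-contained restatement:

```rust
// Minimal restatement of the buffer-pool sizing above: allocate enough fixed-size
// buffers to cover rx_buffer_size bytes, rounding the division up.
fn pool_len(rx_buffer_size: usize, max_buffer_size: usize) -> usize {
    let mut n = rx_buffer_size / max_buffer_size;
    if rx_buffer_size % max_buffer_size != 0 {
        n += 1;
    }
    n
}

fn main() {
    // e.g. a 64 KiB receive budget with 16 KiB buffers needs 4 buffers,
    // and one extra byte pushes it to 5.
    assert_eq!(pool_len(64 * 1024, 16 * 1024), 4);
    assert_eq!(pool_len(64 * 1024 + 1, 16 * 1024), 5);
}
```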
20 changes: 14 additions & 6 deletions io/zenoh-transport/src/unicast/establishment/accept.rs
@@ -14,6 +14,7 @@
#[cfg(feature = "shared-memory")]
use crate::unicast::shared_memory_unicast::Challenge;
use crate::{
common::batch::BatchConfig,
unicast::{
establishment::{
compute_sn, ext, finalize_transport, AcceptFsm, Cookie, InputFinalize, Zenoh080Cookie,
@@ -586,11 +587,15 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {

pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) -> ZResult<()> {
let mtu = link.get_mtu();
let is_streamed = link.is_streamed();
let config = TransportLinkUnicastConfig {
mtu,
direction: TransportLinkUnicastDirection::Inbound,
#[cfg(feature = "transport_compression")]
is_compression: false,
batch: BatchConfig {
mtu,
is_streamed,
#[cfg(feature = "transport_compression")]
is_compression: false,
},
};
let mut link = TransportLinkUnicast::new(link.clone(), config);
let mut fsm = AcceptLink {
@@ -705,10 +710,13 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager)
};

let a_config = TransportLinkUnicastConfig {
mtu: state.transport.batch_size,
direction: TransportLinkUnicastDirection::Inbound,
#[cfg(feature = "transport_compression")]
is_compression: state.link.ext_compression.is_compression(),
batch: BatchConfig {
mtu: state.transport.batch_size,
is_streamed,
#[cfg(feature = "transport_compression")]
is_compression: state.link.ext_compression.is_compression(),
},
};
let a_link = TransportLinkUnicast::new(link.link.clone(), a_config);
let s_link = format!("{:?}", a_link);
22 changes: 15 additions & 7 deletions io/zenoh-transport/src/unicast/establishment/open.rs
@@ -14,6 +14,7 @@
#[cfg(feature = "shared-memory")]
use crate::unicast::shared_memory_unicast::Challenge;
use crate::{
common::batch::BatchConfig,
unicast::{
establishment::{compute_sn, ext, finalize_transport, InputFinalize, OpenFsm},
link::{TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection},
@@ -511,11 +512,15 @@ pub(crate) async fn open_link(
link: LinkUnicast,
manager: &TransportManager,
) -> ZResult<TransportUnicast> {
let is_streamed = link.is_streamed();
let config = TransportLinkUnicastConfig {
direction: TransportLinkUnicastDirection::Outbound,
mtu: link.get_mtu(),
#[cfg(feature = "transport_compression")]
is_compression: false, // Perform the exchange Init/Open exchange with no compression
batch: BatchConfig {
mtu: link.get_mtu(),
is_streamed,
#[cfg(feature = "transport_compression")]
is_compression: false, // Perform the Init/Open exchange with no compression
},
};
let mut link = TransportLinkUnicast::new(link, config);
let mut fsm = OpenLink {
@@ -537,7 +542,7 @@ pub(crate) async fn open_link(
.config
.batch_size
.min(batch_size::UNICAST)
.min(link.config.mtu),
.min(link.config.batch.mtu),
resolution: manager.config.resolution,
ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos),
#[cfg(feature = "transport_multilink")]
@@ -616,10 +621,13 @@ pub(crate) async fn open_link(
};

let o_config = TransportLinkUnicastConfig {
mtu: state.transport.batch_size,
direction: TransportLinkUnicastDirection::Outbound,
#[cfg(feature = "transport_compression")]
is_compression: state.link.ext_compression.is_compression(),
batch: BatchConfig {
mtu: state.transport.batch_size,
is_streamed,
#[cfg(feature = "transport_compression")]
is_compression: state.link.ext_compression.is_compression(),
},
};
let o_link = TransportLinkUnicast::new(link.link.clone(), o_config);
let s_link = format!("{:?}", o_link);