Skip to content

Commit

Permalink
Partially fix performance regression
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Dec 13, 2023
1 parent abd2720 commit 2049922
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 55 deletions.
26 changes: 1 addition & 25 deletions io/zenoh-transport/src/unicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,6 @@ pub(crate) struct TransportLinkUnicast {
pub(crate) config: TransportLinkUnicastConfig,
}

/*
pub(crate) fn new(link: LinkUnicast, mut config: TransportLinkUnicastConfig) -> Self {
config.batch.mtu = link.get_mtu().min(config.batch.mtu);
Self { link, config }
}
pub(crate) fn tx(&self) -> TransportLinkUnicastTx {
TransportLinkUnicastTx {
inner: self.clone(),
buffer: zcondfeat!(
"transport_compression",
self.config
.batch
.is_compression
.then_some(BBuf::with_capacity(
lz4_flex::block::get_maximum_output_size(self.config.batch.mtu as usize),
)),
None
),
*/

impl TransportLinkUnicast {
pub(crate) fn new(link: LinkUnicast, config: TransportLinkUnicastConfig) -> Self {
Self::init(link, config)
Expand Down Expand Up @@ -96,10 +75,7 @@ impl TransportLinkUnicast {

pub(crate) fn rx(&self) -> TransportLinkUnicastRx {
TransportLinkUnicastRx {
inner: Self {
link: self.link.clone(),
config: self.config,
},
inner: self.clone(),
}
}

Expand Down
55 changes: 26 additions & 29 deletions io/zenoh-transport/src/unicast/universal/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use async_std::prelude::FutureExt;
use async_std::task;
use async_std::task::JoinHandle;
use std::{
ops::Deref,
sync::{Arc, RwLock},
time::Duration,
};
Expand All @@ -40,29 +39,23 @@ use zenoh_protocol::transport::{KeepAlive, TransportMessage};
use zenoh_result::{zerror, ZResult};
use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal};

pub(super) struct TransportLinkUnicastUniversalInner {
// The underlying link
pub(super) link: TransportLinkUnicast,
// The transmission pipeline
pub(super) pipeline: TransmissionPipelineProducer,
// The transport this link is associated to
transport: TransportUnicastUniversal,
// The signals to stop TX/RX tasks
pub(super) struct TaskHandler {
// The handlers to stop TX/RX tasks
handle_tx: RwLock<Option<async_executor::Task<()>>>,
signal_rx: Signal,
handle_rx: RwLock<Option<JoinHandle<()>>>,
}

#[derive(Clone)]
pub(super) struct TransportLinkUnicastUniversal(Arc<TransportLinkUnicastUniversalInner>);

impl Deref for TransportLinkUnicastUniversal {
type Target = Arc<TransportLinkUnicastUniversalInner>;

#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
pub(super) struct TransportLinkUnicastUniversal {
// The underlying link
pub(super) link: TransportLinkUnicast,
// The transmission pipeline
pub(super) pipeline: TransmissionPipelineProducer,
// The transport this link is associated to
transport: TransportUnicastUniversal,
// The task handling substruct
tasks: Arc<TaskHandler>,
}

impl TransportLinkUnicastUniversal {
Expand All @@ -87,16 +80,20 @@ impl TransportLinkUnicastUniversal {
// The pipeline
let (producer, consumer) = TransmissionPipeline::make(config, priority_tx);

let inner = TransportLinkUnicastUniversalInner {
link,
pipeline: producer,
transport,
let tasks = Arc::new(TaskHandler {
handle_tx: RwLock::new(None),
signal_rx: Signal::new(),
handle_rx: RwLock::new(None),
});

let result = Self {
link,
pipeline: producer,
transport,
tasks,
};

(Self(Arc::new(inner)), consumer)
(result, consumer)
}
}

Expand All @@ -107,7 +104,7 @@ impl TransportLinkUnicastUniversal {
executor: &TransportExecutor,
keep_alive: Duration,
) {
let mut guard = zwrite!(self.handle_tx);
let mut guard = zwrite!(self.tasks.handle_tx);
if guard.is_none() {
// Spawn the TX task
let mut tx = self.link.tx();
Expand Down Expand Up @@ -137,12 +134,12 @@ impl TransportLinkUnicastUniversal {
}

pub(super) fn start_rx(&mut self, lease: Duration) {
let mut guard = zwrite!(self.handle_rx);
let mut guard = zwrite!(self.tasks.handle_rx);
if guard.is_none() {
// Spawn the RX task
let mut rx = self.link.rx();
let c_transport = self.transport.clone();
let c_signal = self.signal_rx.clone();
let c_signal = self.tasks.signal_rx.clone();
let c_rx_buffer_size = self.transport.manager.config.link_rx_buffer_size;

let handle = task::spawn(async move {
Expand All @@ -168,20 +165,20 @@ impl TransportLinkUnicastUniversal {
}

pub(super) fn stop_rx(&mut self) {
self.signal_rx.trigger();
self.tasks.signal_rx.trigger();
}

pub(super) async fn close(mut self) -> ZResult<()> {
log::trace!("{}: closing", self.link);
self.stop_tx();
self.stop_rx();

let handle_tx = zwrite!(self.handle_tx).take();
let handle_tx = zwrite!(self.tasks.handle_tx).take();
if let Some(handle) = handle_tx {
handle.await;
}

let handle_rx = zwrite!(self.handle_rx).take();
let handle_rx = zwrite!(self.tasks.handle_rx).take();
if let Some(handle) = handle_rx {
handle.await;
}
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-transport/src/unicast/universal/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ pub(crate) struct TransportUnicastUniversal {
pub(super) priority_rx: Arc<[TransportPriorityRx]>,
// The links associated to the channel
pub(super) links: Arc<RwLock<Box<[TransportLinkUnicastUniversal]>>>,
add_link_lock: Arc<AsyncMutex<()>>,
// The callback
pub(super) callback: Arc<RwLock<Option<Arc<dyn TransportPeerEventHandler>>>>,
// Lock used to ensure no race in add_link method
add_link_lock: Arc<AsyncMutex<()>>,
// Mutex for notification
pub(super) alive: Arc<AsyncMutex<bool>>,
// Transport statistics
Expand Down

0 comments on commit 2049922

Please sign in to comment.