Skip to content

Commit

Permalink
Add wait_before_close option
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Sep 27, 2024
1 parent e79c800 commit 413ab8d
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 34 deletions.
5 changes: 4 additions & 1 deletion DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,11 @@
/// Using CongestionControl::Block the caller is blocked until a batch is available and re-inserted into the queue.
/// Using CongestionControl::Drop the message might be dropped, depending on conditions configured here.
congestion_control: {
/// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available.
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available.
wait_before_drop: 1000,
/// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message
/// if still no batch is available.
wait_before_close: 5000000,
},
/// Perform batching of messages if they are smaller than the batch_size
batching: {
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ impl Default for CongestionControlConf {
fn default() -> Self {
    Self {
        // Max time (in microseconds) to wait for an available batch
        // before dropping a droppable message: 1 ms.
        wait_before_drop: 1000,
        // Max time (in microseconds) to wait for an available batch
        // before closing the transport when sending a blocking message: 5 s.
        // Underscore separators added for readability (clippy::unreadable_literal).
        wait_before_close: 5_000_000,
    }
}
}
Expand Down
7 changes: 5 additions & 2 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,11 @@ validated_struct::validator! {
/// Using CongestionControl::Block the caller is blocked until a batch is available and re-inserted into the queue.
/// Using CongestionControl::Drop the message might be dropped, depending on conditions configured here.
pub congestion_control: CongestionControlConf {
/// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available.
pub wait_before_drop: u64,
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available.
wait_before_drop: u64,
/// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message
/// if still no batch is available.
wait_before_close: u64,
},
pub batching: BatchingConf {
/// Perform adaptive batching of messages if they are smaller than the batch_size.
Expand Down
53 changes: 23 additions & 30 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl StageIn {
&mut self,
msg: &mut NetworkMessage,
priority: Priority,
deadline_before_drop: Option<Option<Instant>>,
deadline: Option<Instant>,
) -> bool {
// Lock the current serialization batch.
let mut c_guard = self.mutex.current();
Expand All @@ -154,31 +154,22 @@ impl StageIn {
None => match self.s_ref.pull() {
Some(mut batch) => {
batch.clear();
self.s_out.atomic_backoff.first_write.store(LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds, Ordering::Relaxed);
self.s_out.atomic_backoff.first_write.store(
LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds,
Ordering::Relaxed,
);
break batch;
}
None => {
drop(c_guard);
match deadline_before_drop {
Some(deadline) if !$fragment => {
// We are in the congestion scenario and message is droppable
// Wait for an available batch until deadline
if !deadline.map_or(false, |deadline| self.s_ref.wait_deadline(deadline)) {
// Still no available batch.
// Restore the sequence number and drop the message
$restore_sn;
return false
}
}
_ => {
// Block waiting for an available batch
if !self.s_ref.wait() {
// Some error prevented the queue to wait and give back an available batch
// Restore the sequence number and drop the message
$restore_sn;
return false;
}
}
// Wait for an available batch until deadline
if !deadline
.map_or(false, |deadline| self.s_ref.wait_deadline(deadline))
{
// Still no available batch.
// Restore the sequence number and drop the message
$restore_sn;
return false;
}
c_guard = self.mutex.current();
}
Expand Down Expand Up @@ -513,6 +504,7 @@ pub(crate) struct TransmissionPipelineConf {
pub(crate) batch: BatchConfig,
pub(crate) queue_size: [usize; Priority::NUM],
pub(crate) wait_before_drop: Duration,
pub(crate) wait_before_close: Duration,
pub(crate) batching_enabled: bool,
pub(crate) batching_time_limit: Duration,
}
Expand Down Expand Up @@ -597,6 +589,7 @@ impl TransmissionPipeline {
stage_in: stage_in.into_boxed_slice().into(),
active: active.clone(),
wait_before_drop: config.wait_before_drop,
wait_before_close: config.wait_before_close,
};
let consumer = TransmissionPipelineConsumer {
stage_out: stage_out.into_boxed_slice(),
Expand All @@ -614,6 +607,7 @@ pub(crate) struct TransmissionPipelineProducer {
stage_in: Arc<[Mutex<StageIn>]>,
active: Arc<AtomicBool>,
wait_before_drop: Duration,
wait_before_close: Duration,
}

impl TransmissionPipelineProducer {
Expand All @@ -627,18 +621,15 @@ impl TransmissionPipelineProducer {
(0, Priority::DEFAULT)
};
// Compute the deadline after which a droppable message is dropped (wait_before_drop)
// or, for a non-droppable message, the transport is closed (wait_before_close)
let deadline_before_drop = if msg.is_droppable() {
if self.wait_before_drop.is_zero() {
Some(None)
} else {
Some(Some(Instant::now() + self.wait_before_drop))
}
let wait_time = if msg.is_droppable() {
self.wait_before_drop
} else {
None
self.wait_before_close
};
let deadline = (!wait_time.is_zero()).then_some(Instant::now() + wait_time);
// Lock the channel. We are the only one that will be writing on it.
let mut queue = zlock!(self.stage_in[idx]);
queue.push_network_message(&mut msg, priority, deadline_before_drop)
queue.push_network_message(&mut msg, priority, deadline)
}

#[inline]
Expand Down Expand Up @@ -793,6 +784,7 @@ mod tests {
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: Duration::from_millis(1),
wait_before_close: Duration::from_secs(5),
batching_time_limit: Duration::from_micros(1),
};

Expand All @@ -806,6 +798,7 @@ mod tests {
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: Duration::from_millis(1),
wait_before_close: Duration::from_secs(5),
batching_time_limit: Duration::from_micros(1),
};

Expand Down
13 changes: 13 additions & 0 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ pub struct TransportManagerConfig {
pub batch_size: BatchSize,
pub batching: bool,
pub wait_before_drop: Duration,
pub wait_before_close: Duration,
pub queue_size: [usize; Priority::NUM],
pub queue_backoff: Duration,
pub defrag_buff_size: usize,
Expand Down Expand Up @@ -133,6 +134,7 @@ pub struct TransportManagerBuilder {
batching_enabled: bool,
batching_time_limit: Duration,
wait_before_drop: Duration,
wait_before_close: Duration,
queue_size: QueueSizeConf,
defrag_buff_size: usize,
link_rx_buffer_size: usize,
Expand Down Expand Up @@ -187,6 +189,11 @@ impl TransportManagerBuilder {
self
}

/// Sets the maximum time to wait for an available batch before closing the
/// transport session when sending a non-droppable (blocking) message.
pub fn wait_before_close(mut self, timeout: Duration) -> Self {
    self.wait_before_close = timeout;
    self
}

pub fn queue_size(mut self, queue_size: QueueSizeConf) -> Self {
self.queue_size = queue_size;
self
Expand Down Expand Up @@ -247,6 +254,9 @@ impl TransportManagerBuilder {
self = self.wait_before_drop(Duration::from_micros(
*link.tx().queue().congestion_control().wait_before_drop(),
));
self = self.wait_before_close(Duration::from_micros(
*link.tx().queue().congestion_control().wait_before_close(),
));
self = self.queue_size(link.tx().queue().size().clone());
self = self.tx_threads(*link.tx().threads());
self = self.protocols(link.protocols().clone());
Expand Down Expand Up @@ -305,6 +315,7 @@ impl TransportManagerBuilder {
batch_size: self.batch_size,
batching: self.batching_enabled,
wait_before_drop: self.wait_before_drop,
wait_before_close: self.wait_before_close,
queue_size,
queue_backoff: self.batching_time_limit,
defrag_buff_size: self.defrag_buff_size,
Expand Down Expand Up @@ -344,6 +355,7 @@ impl Default for TransportManagerBuilder {
let queue = QueueConf::default();
let backoff = *queue.batching().time_limit();
let wait_before_drop = *queue.congestion_control().wait_before_drop();
let wait_before_close = *queue.congestion_control().wait_before_close();
Self {
version: VERSION,
zid: ZenohIdProto::rand(),
Expand All @@ -352,6 +364,7 @@ impl Default for TransportManagerBuilder {
batch_size: BatchSize::MAX,
batching_enabled: true,
wait_before_drop: Duration::from_micros(wait_before_drop),
wait_before_close: Duration::from_micros(wait_before_close),
queue_size: queue.size,
batching_time_limit: Duration::from_millis(backoff),
defrag_buff_size: *link_rx.max_message_size(),
Expand Down
1 change: 1 addition & 0 deletions io/zenoh-transport/src/multicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ impl TransportLinkMulticastUniversal {
batch: self.link.config.batch,
queue_size: self.transport.manager.config.queue_size,
wait_before_drop: self.transport.manager.config.wait_before_drop,
wait_before_close: self.transport.manager.config.wait_before_close,
batching_enabled: self.transport.manager.config.batching,
batching_time_limit: self.transport.manager.config.queue_backoff,
};
Expand Down
1 change: 1 addition & 0 deletions io/zenoh-transport/src/unicast/universal/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl TransportLinkUnicastUniversal {
},
queue_size: transport.manager.config.queue_size,
wait_before_drop: transport.manager.config.wait_before_drop,
wait_before_close: transport.manager.config.wait_before_close,
batching_enabled: transport.manager.config.batching,
batching_time_limit: transport.manager.config.queue_backoff,
};
Expand Down
23 changes: 22 additions & 1 deletion io/zenoh-transport/src/unicast/universal/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use zenoh_protocol::{
core::{Priority, PriorityRange, Reliability},
network::NetworkMessage,
transport::close,
};

use super::transport::TransportUnicastUniversal;
Expand Down Expand Up @@ -110,7 +111,27 @@ impl TransportUnicastUniversal {
// the link could be congested and this operation could
// block for fairly long time
drop(transport_links);
pipeline.push_network_message(msg)
let droppable = msg.is_droppable();
let push = pipeline.push_network_message(msg);
if !push && !droppable {
tracing::error!(
"Unable to push non droppable network message to {}. Closing transport!",
self.config.zid
);
zenoh_runtime::ZRuntime::RX.spawn({
let transport = self.clone();
async move {
if let Err(e) = transport.close(close::reason::GENERIC).await {
tracing::error!(
"Error closing transport with {}: {}",
transport.config.zid,
e
);
}
}
});
}
push
}

#[allow(unused_mut)] // When feature "shared-memory" is not enabled
Expand Down

0 comments on commit 413ab8d

Please sign in to comment.