fix: don't emit an error log if pipeline is closed #1658

Merged
93 changes: 73 additions & 20 deletions io/zenoh-transport/src/common/pipeline.rs
@@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use std::{
+ fmt,
ops::Add,
sync::{
atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
@@ -40,7 +41,7 @@ use zenoh_protocol::{
AtomicBatchSize, BatchSize, TransportMessage,
},
};
- use zenoh_sync::{event, Notifier, Waiter};
+ use zenoh_sync::{event, Notifier, WaitDeadlineError, Waiter};

use super::{
batch::{Encode, WBatch},
@@ -56,6 +57,15 @@ struct StageInRefill {
s_ref_r: RingBufferReader<WBatch, RBLEN>,
}

+ #[derive(Debug)]
+ pub(crate) struct TransportClosed;
+ impl fmt::Display for TransportClosed {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "transport closed")
+ }
+ }
+ impl std::error::Error for TransportClosed {}
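
Note: implementing `Display` and `std::error::Error` is what lets `TransportClosed` propagate with `?` into boxed-error results further up the stack; whether `ZResult` accepts it directly depends on zenoh_result's blanket conversions (an assumption here). A minimal self-contained sketch of the pattern, with a hypothetical `schedule` caller:

```rust
use std::{error::Error, fmt};

#[derive(Debug)]
struct TransportClosed;

impl fmt::Display for TransportClosed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "transport closed")
    }
}
impl Error for TransportClosed {}

fn push() -> Result<(), TransportClosed> {
    Err(TransportClosed)
}

// Because TransportClosed implements Error + Send + Sync, `?` converts it
// into a boxed error via the std blanket From impl.
fn schedule() -> Result<(), Box<dyn Error + Send + Sync>> {
    push()?;
    Ok(())
}
```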

impl StageInRefill {
fn pull(&mut self) -> Option<WBatch> {
self.s_ref_r.pull()
@@ -65,8 +75,12 @@ impl StageInRefill {
self.n_ref_r.wait().is_ok()
}

- fn wait_deadline(&self, instant: Instant) -> bool {
- self.n_ref_r.wait_deadline(instant).is_ok()
+ fn wait_deadline(&self, instant: Instant) -> Result<bool, TransportClosed> {
+ match self.n_ref_r.wait_deadline(instant) {
+ Ok(()) => Ok(true),
+ Err(WaitDeadlineError::Deadline) => Ok(false),
+ Err(WaitDeadlineError::WaitError) => Err(TransportClosed),
+ }
}
}
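
The key behavioral split is this mapping: deadline expiry stays a soft `Ok(false)` (the message is dropped), while a failed wait now means the consumer side is gone. A runnable sketch of the same tri-state contract with stand-in types (`WaitDeadlineError` mirrors the zenoh-sync name; everything else is hypothetical):

```rust
#[derive(Debug)]
enum WaitDeadlineError {
    Deadline,  // the timer fired before a notification arrived
    WaitError, // the notifier was dropped: the pipeline is closed
}

#[derive(Debug)]
struct TransportClosed;

fn map_wait(res: Result<(), WaitDeadlineError>) -> Result<bool, TransportClosed> {
    match res {
        Ok(()) => Ok(true),                            // batch refilled in time
        Err(WaitDeadlineError::Deadline) => Ok(false), // over deadline: drop the message
        Err(WaitDeadlineError::WaitError) => Err(TransportClosed),
    }
}

fn main() {
    assert_eq!(map_wait(Ok(())).unwrap(), true);
    assert_eq!(map_wait(Err(WaitDeadlineError::Deadline)).unwrap(), false);
    assert!(map_wait(Err(WaitDeadlineError::WaitError)).is_err());
}
```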

@@ -214,9 +228,9 @@ impl Deadline {
}

#[inline]
- fn wait(&mut self, s_ref: &StageInRefill) -> bool {
+ fn wait(&mut self, s_ref: &StageInRefill) -> Result<bool, TransportClosed> {
match self.lazy_deadline.deadline() {
- DeadlineSetting::Immediate => false,
+ DeadlineSetting::Immediate => Ok(false),
DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant),
}
}
@@ -243,7 +257,7 @@ impl StageIn {
msg: &mut NetworkMessage,
priority: Priority,
deadline: &mut Deadline,
- ) -> bool {
+ ) -> Result<bool, TransportClosed> {
// Lock the current serialization batch.
let mut c_guard = self.mutex.current();

@@ -264,15 +278,15 @@
None => {
drop(c_guard);
// Wait for an available batch until deadline
- if !deadline.wait(&self.s_ref) {
+ if !deadline.wait(&self.s_ref)? {
// Still no available batch.
// Restore the sequence number and drop the message
$($restore_sn)?
tracing::trace!(
"Zenoh message dropped because it's over the deadline {:?}: {:?}",
deadline.lazy_deadline.wait_time, msg
);
- return false;
+ return Ok(false);
}
c_guard = self.mutex.current();
}
@@ -287,13 +301,13 @@
if !self.batching || $msg.is_express() {
// Move out existing batch
self.s_out.move_batch($batch);
- return true;
+ return Ok(true);
} else {
let bytes = $batch.len();
*c_guard = Some($batch);
drop(c_guard);
self.s_out.notify(bytes);
- return true;
+ return Ok(true);
}
}};
}
@@ -404,7 +418,7 @@ impl StageIn {
// Clean the fragbuf
self.fragbuf.clear();

- true
+ Ok(true)
}

#[inline]
Expand Down Expand Up @@ -767,7 +781,10 @@ pub(crate) struct TransmissionPipelineProducer {

impl TransmissionPipelineProducer {
#[inline]
- pub(crate) fn push_network_message(&self, mut msg: NetworkMessage) -> bool {
+ pub(crate) fn push_network_message(
+ &self,
+ mut msg: NetworkMessage,
+ ) -> Result<bool, TransportClosed> {
// If the queue is not QoS, it means that we only have one priority with index 0.
let (idx, priority) = if self.stage_in.len() > 1 {
let priority = msg.priority();
@@ -780,7 +797,7 @@
let (wait_time, max_wait_time) = if msg.is_droppable() {
// Check if we are blocked on the priority queue and drop the message directly
if self.status.is_congested(priority) {
- return false;
+ return Ok(false);
}
(self.wait_before_drop.0, Some(self.wait_before_drop.1))
} else {
@@ -789,11 +806,11 @@ impl TransmissionPipelineProducer {
let mut deadline = Deadline::new(wait_time, max_wait_time);
// Lock the channel. We are the only one that will be writing on it.
let mut queue = zlock!(self.stage_in[idx]);
- let sent = queue.push_network_message(&mut msg, priority, &mut deadline);
+ let sent = queue.push_network_message(&mut msg, priority, &mut deadline)?;
if !sent {
self.status.set_congested(priority, true);
}
- sent
+ Ok(sent)
}
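
With this change `push_network_message` has three outcomes instead of two. A hedged sketch of how a caller is expected to branch (the function below is a stand-in for the real producer, not this PR's API):

```rust
#[derive(Debug)]
struct TransportClosed;

// Stand-in mirroring the shape of TransmissionPipelineProducer::push_network_message.
fn push_network_message(state: u8) -> Result<bool, TransportClosed> {
    match state {
        0 => Ok(true),             // serialized or batched
        1 => Ok(false),            // congested or over deadline: dropped
        _ => Err(TransportClosed), // consumer side was dropped
    }
}

fn main() {
    for state in 0..3 {
        match push_network_message(state) {
            Ok(true) => println!("sent"),
            Ok(false) => println!("dropped: set the congested flag"),
            Err(TransportClosed) => println!("pipeline closed: stop without an error log"),
        }
    }
}
```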

#[inline]
@@ -1002,7 +1019,7 @@ mod tests {
"Pipeline Flow [>>>]: Pushed {} msgs ({payload_size} bytes)",
i + 1
);
- queue.push_network_message(message.clone());
+ queue.push_network_message(message.clone()).unwrap();
}
}

@@ -1129,7 +1146,7 @@
println!(
"Pipeline Blocking [>>>]: ({id}) Scheduling message #{i} with payload size of {payload_size} bytes"
);
- queue.push_network_message(message.clone());
+ queue.push_network_message(message.clone()).unwrap();
let c = counter.fetch_add(1, Ordering::AcqRel);
println!(
"Pipeline Blocking [>>>]: ({}) Scheduled message #{} (tot {}) with payload size of {} bytes",
@@ -1173,12 +1190,13 @@

timeout(TIMEOUT, check).await?;

- // Disable and drain the queue
- timeout(
+ // Drain the queue (but don't drop it to avoid dropping the messages)
+ let _consumer = timeout(
TIMEOUT,
task::spawn_blocking(move || {
println!("Pipeline Blocking [---]: draining the queue");
let _ = consumer.drain();
+ consumer
}),
)
.await??;
@@ -1242,7 +1260,7 @@ mod tests {
let duration = Duration::from_millis(5_500);
let start = Instant::now();
while start.elapsed() < duration {
- producer.push_network_message(message.clone());
+ producer.push_network_message(message.clone()).unwrap();
}
}
}
@@ -1269,4 +1287,39 @@
tokio::time::sleep(Duration::from_millis(500)).await;
}
}

+ #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+ async fn tx_pipeline_closed() -> ZResult<()> {
+ // Pipeline
+ let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?;
+ let priorities = vec![tct];
+ let (producer, consumer) =
+ TransmissionPipeline::make(CONFIG_NOT_STREAMED, priorities.as_slice());
+ // Drop consumer to close the pipeline
+ drop(consumer);
+
+ let message: NetworkMessage = Push {
+ wire_expr: "test".into(),
+ ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Block, true),
+ ext_tstamp: None,
+ ext_nodeid: ext::NodeIdType::DEFAULT,
+ payload: PushBody::Put(Put {
+ timestamp: None,
+ encoding: Encoding::empty(),
+ ext_sinfo: None,
+ #[cfg(feature = "shared-memory")]
+ ext_shm: None,
+ ext_attachment: None,
+ ext_unknown: vec![],
+ payload: vec![42u8].into(),
+ }),
+ }
+ .into();
+ // First message should not be rejected, as there is one batch available in the queue
+ assert!(producer.push_network_message(message.clone()).is_ok());
+ // Second message should be rejected
+ assert!(producer.push_network_message(message.clone()).is_err());
+
+ Ok(())
+ }
}
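
Why the first push in `tx_pipeline_closed` still succeeds: the stage-in ring buffer starts pre-filled with write batches, so the first message needs no wait; only the second push has to wait on the now-closed refill channel. The mechanism is loosely analogous to a bounded std channel whose receiver has been dropped (an analogy only, not the zenoh-sync implementation):

```rust
use std::sync::mpsc;

fn main() {
    // One pre-allocated slot, like the pipeline's initial batch.
    let (tx, rx) = mpsc::sync_channel::<u32>(1);
    tx.send(1).unwrap(); // first push: free capacity, succeeds
    drop(rx);            // consumer dropped: "pipeline closed"
    // Second push: fails fast instead of blocking forever.
    assert!(tx.send(2).is_err());
}
```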
2 changes: 1 addition & 1 deletion io/zenoh-transport/src/multicast/mod.rs
@@ -113,7 +113,7 @@ impl TransportMulticast {
#[inline(always)]
pub fn schedule(&self, message: NetworkMessage) -> ZResult<()> {
let transport = self.get_transport()?;
- transport.schedule(message);
+ transport.schedule(message)?;
Ok(())
}

15 changes: 8 additions & 7 deletions io/zenoh-transport/src/multicast/tx.rs
@@ -13,22 +13,23 @@
//
use zenoh_core::zread;
use zenoh_protocol::network::NetworkMessage;
+ use zenoh_result::ZResult;

use super::transport::TransportMulticastInner;
#[cfg(feature = "shared-memory")]
use crate::shm::map_zmsg_to_partner;

//noinspection ALL
impl TransportMulticastInner {
- fn schedule_on_link(&self, msg: NetworkMessage) -> bool {
+ fn schedule_on_link(&self, msg: NetworkMessage) -> ZResult<bool> {
macro_rules! zpush {
($guard:expr, $pipeline:expr, $msg:expr) => {
// Drop the guard before the push_network_message call since
// the link could be congested and this operation could
// block for a fairly long time
let pl = $pipeline.clone();
drop($guard);
- return pl.push_network_message($msg);
+ return Ok(pl.push_network_message($msg)?);
};
}

@@ -47,22 +48,22 @@ impl TransportMulticastInner {
}
}

- false
+ Ok(false)
}

#[allow(unused_mut)] // When feature "shared-memory" is not enabled
#[allow(clippy::let_and_return)] // When feature "stats" is not enabled
#[inline(always)]
- pub(super) fn schedule(&self, mut msg: NetworkMessage) -> bool {
+ pub(super) fn schedule(&self, mut msg: NetworkMessage) -> ZResult<bool> {
#[cfg(feature = "shared-memory")]
{
if let Err(e) = map_zmsg_to_partner(&mut msg, &self.shm) {
tracing::trace!("Failed SHM conversion: {}", e);
- return false;
+ return Ok(false);
}
}

- let res = self.schedule_on_link(msg);
+ let res = self.schedule_on_link(msg)?;

#[cfg(feature = "stats")]
if res {
@@ -71,6 +72,6 @@
self.stats.inc_tx_n_dropped(1);
}

- res
+ Ok(res)
}
}
5 changes: 1 addition & 4 deletions io/zenoh-transport/src/unicast/universal/transport.rs
@@ -398,10 +398,7 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
/* TX */
/*************************************/
fn schedule(&self, msg: NetworkMessage) -> ZResult<()> {
- match self.internal_schedule(msg) {
- true => Ok(()),
- false => bail!("error scheduling message!"),
- }
+ self.internal_schedule(msg).map(|_| ())
}
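
This simplification also changes semantics: previously a `false` from `internal_schedule` (message dropped) bailed with "error scheduling message!"; now a drop is a plain `Ok(())` and only real errors such as `TransportClosed` propagate. A minimal illustration of the `.map(|_| ())` shape, with stand-in types:

```rust
fn internal_schedule(sent: bool) -> Result<bool, &'static str> {
    Ok(sent) // Ok(false) means "dropped", which is no longer an error
}

fn schedule(sent: bool) -> Result<(), &'static str> {
    // Discard the sent/dropped flag; keep only error propagation.
    internal_schedule(sent).map(|_| ())
}

fn main() {
    assert!(schedule(false).is_ok()); // a drop no longer fails the call
}
```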

fn add_debug_fields<'a, 'b: 'a, 'c>(
17 changes: 9 additions & 8 deletions io/zenoh-transport/src/unicast/universal/tx.rs
@@ -16,6 +16,7 @@ use zenoh_protocol::{
network::NetworkMessage,
transport::close,
};
+ use zenoh_result::ZResult;

use super::transport::TransportUnicastUniversal;
#[cfg(feature = "shared-memory")]
@@ -68,7 +69,7 @@ impl TransportUnicastUniversal {
match_.full.or(match_.partial).or(match_.any)
}

- fn schedule_on_link(&self, msg: NetworkMessage) -> bool {
+ fn schedule_on_link(&self, msg: NetworkMessage) -> ZResult<bool> {
let transport_links = self
.links
.read()
@@ -93,7 +94,7 @@
);

// No Link found
- return false;
+ return Ok(false);
};

let transport_link = transport_links
@@ -112,7 +113,7 @@
// block for a fairly long time
drop(transport_links);
let droppable = msg.is_droppable();
- let push = pipeline.push_network_message(msg);
+ let push = pipeline.push_network_message(msg)?;
if !push && !droppable {
tracing::error!(
"Unable to push non droppable network message to {}. Closing transport!",
Expand All @@ -131,22 +132,22 @@ impl TransportUnicastUniversal {
}
});
}
- push
+ Ok(push)
}
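
This hunk is where the PR title pays off: a closed pipeline now returns early through the `?` on `push_network_message`, so the "Unable to push non droppable network message" error log and the transport close below it are reached only on a genuine congestion timeout. A condensed sketch of the resulting control flow (all names are stand-ins, not the real API):

```rust
#[derive(Debug)]
struct TransportClosed;

fn schedule_on_link(
    push: Result<bool, TransportClosed>,
    droppable: bool,
) -> Result<bool, TransportClosed> {
    // After this PR: a closed pipeline exits here, before any error logging.
    let pushed = push?;
    if !pushed && !droppable {
        // Before this PR, a closed pipeline also landed here and logged a
        // spurious error while closing the transport.
        eprintln!("unable to push non droppable message: closing transport");
    }
    Ok(pushed)
}

fn main() {
    assert!(schedule_on_link(Err(TransportClosed), false).is_err());
    assert_eq!(schedule_on_link(Ok(true), false).unwrap(), true);
}
```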

#[allow(unused_mut)] // When feature "shared-memory" is not enabled
#[allow(clippy::let_and_return)] // When feature "stats" is not enabled
#[inline(always)]
- pub(crate) fn internal_schedule(&self, mut msg: NetworkMessage) -> bool {
+ pub(crate) fn internal_schedule(&self, mut msg: NetworkMessage) -> ZResult<bool> {
#[cfg(feature = "shared-memory")]
{
if let Err(e) = map_zmsg_to_partner(&mut msg, &self.config.shm) {
tracing::trace!("Failed SHM conversion: {}", e);
- return false;
+ return Ok(false);
}
}

- let res = self.schedule_on_link(msg);
+ let res = self.schedule_on_link(msg)?;

#[cfg(feature = "stats")]
if res {
@@ -155,7 +156,7 @@
self.stats.inc_tx_n_dropped(1);
}

- res
+ Ok(res)
}
}
