diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index d52c7b55c..945b1ac67 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -797,7 +797,7 @@ impl TransmissionPipelineProducer { let (wait_time, max_wait_time) = if msg.is_droppable() { // Checked if we are blocked on the priority queue and we drop directly the message if self.status.is_congested(priority) { - return false; + return Ok(false); } (self.wait_before_drop.0, Some(self.wait_before_drop.1)) } else { @@ -806,11 +806,11 @@ impl TransmissionPipelineProducer { let mut deadline = Deadline::new(wait_time, max_wait_time); // Lock the channel. We are the only one that will be writing on it. let mut queue = zlock!(self.stage_in[idx]); - let sent = queue.push_network_message(&mut msg, priority, &mut deadline); + let sent = queue.push_network_message(&mut msg, priority, &mut deadline)?; if !sent { self.status.set_congested(priority, true); } - sent + Ok(sent) } #[inline] @@ -1286,4 +1286,104 @@ mod tests { tokio::time::sleep(Duration::from_millis(500)).await; } } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn tx_pipeline_closed() -> ZResult<()> { + fn schedule(queue: TransmissionPipelineProducer, counter: Arc, id: usize) { + // Make sure to put only one message per batch: set the payload size + // to half of the batch in such a way the serialized zenoh message + // will be larger then half of the batch size (header + payload). + let payload_size = (CONFIG_STREAMED.batch.mtu / 2) as usize; + + // Send reliable messages + let key = "test".into(); + let payload = ZBuf::from(vec![0_u8; payload_size]); + + let message: NetworkMessage = Push { + wire_expr: key, + ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Block, false), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + payload: PushBody::Put(Put { + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: None, + ext_unknown: vec![], + payload, + }), + } + .into(); + + // The last push should block since there shouldn't any more batches + // available for serialization. + let num_msg = 1 + CONFIG_STREAMED.queue_size[0]; + for i in 0..num_msg { + println!( + "Pipeline Blocking [>>>]: ({id}) Scheduling message #{i} with payload size of {payload_size} bytes" + ); + queue.push_network_message(message.clone()).unwrap(); + let c = counter.fetch_add(1, Ordering::AcqRel); + println!( + "Pipeline Blocking [>>>]: ({}) Scheduled message #{} (tot {}) with payload size of {} bytes", + id, i, c + 1, + payload_size + ); + } + } + + // Pipeline + let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?; + let priorities = vec![tct]; + let (producer, mut consumer) = + TransmissionPipeline::make(CONFIG_NOT_STREAMED, priorities.as_slice()); + + let counter = Arc::new(AtomicUsize::new(0)); + + let c_producer = producer.clone(); + let c_counter = counter.clone(); + let h1 = task::spawn_blocking(move || { + schedule(c_producer, c_counter, 1); + }); + + let c_counter = counter.clone(); + let h2 = task::spawn_blocking(move || { + schedule(producer, c_counter, 2); + }); + + // Wait to have sent enough messages and to have blocked + println!( + "Pipeline Blocking [---]: waiting to have {} messages being scheduled", + CONFIG_STREAMED.queue_size[Priority::MAX as usize] + ); + let check = async { + while counter.load(Ordering::Acquire) + < CONFIG_STREAMED.queue_size[Priority::MAX as usize] + { + tokio::time::sleep(SLEEP).await; + } + }; + + timeout(TIMEOUT, check).await?; + + // Disable and drain the queue + timeout( + TIMEOUT, + task::spawn_blocking(move || { + println!("Pipeline Blocking [---]: draining the queue"); + let _ = consumer.drain(); + }), + ) + .await??; + + // Make sure that the tasks scheduling have been unblocked + println!("Pipeline Blocking [---]: waiting for schedule (1) to be unblocked"); + timeout(TIMEOUT, h1).await??; + println!("Pipeline Blocking [---]: waiting for schedule (2) to be unblocked"); + timeout(TIMEOUT, h2).await??; + + Ok(()) + } }