From 39befc3c9fc4bbe31f77fd99be09b4a7778e1972 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Mon, 4 Mar 2024 11:49:33 +0800 Subject: [PATCH] Refine the TODO comments --- commons/zenoh-sync/src/object_pool.rs | 1 - io/zenoh-link-commons/src/listener.rs | 2 +- .../zenoh-link-unixpipe/src/unix/unicast.rs | 3 ++- io/zenoh-transport/src/manager.rs | 2 +- .../src/multicast/establishment.rs | 3 +-- io/zenoh-transport/src/multicast/link.rs | 5 ++--- io/zenoh-transport/src/multicast/transport.rs | 10 +++++----- .../src/unicast/lowlatency/link.rs | 17 +++++++---------- .../src/unicast/lowlatency/transport.rs | 1 - .../src/unicast/universal/link.rs | 13 +++++-------- zenoh/src/net/runtime/adminspace.rs | 4 +--- zenoh/src/session.rs | 1 - zenoh/tests/routing.rs | 3 +-- 13 files changed, 26 insertions(+), 39 deletions(-) diff --git a/commons/zenoh-sync/src/object_pool.rs b/commons/zenoh-sync/src/object_pool.rs index c918da1fee..83b673c449 100644 --- a/commons/zenoh-sync/src/object_pool.rs +++ b/commons/zenoh-sync/src/object_pool.rs @@ -108,7 +108,6 @@ impl From for RecyclingObject { } } -// TODO: Check this necessity impl Drop for RecyclingObject { fn drop(&mut self) { if let Some(pool) = self.pool.upgrade() { diff --git a/io/zenoh-link-commons/src/listener.rs b/io/zenoh-link-commons/src/listener.rs index d7cfd43313..be61e9cf89 100644 --- a/io/zenoh-link-commons/src/listener.rs +++ b/io/zenoh-link-commons/src/listener.rs @@ -49,7 +49,7 @@ impl ListenerUnicastIP { } pub struct ListenersUnicastIP { - // TODO: should we change this to AsyncRwLock? + // TODO(yuyuan): should we change this to AsyncRwLock? listeners: Arc>>, pub token: CancellationToken, } diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index f790463635..8f3577a8e9 100644 --- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -300,8 +300,9 @@ impl UnicastPipeListener { let token = CancellationToken::new(); let c_token = token.clone(); + + // WARN: The spawn_blocking is mandatory verified by the ping/pong test // create listening task - // TODO: Check the necessity of this spawn_blocking tokio::task::spawn_blocking(move || { ZRuntime::Acceptor.block_on(async move { loop { diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 0feb981b90..9bf40e5fd3 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -428,7 +428,7 @@ impl TransportManager { lsu } - // TODO: Can we make this async as above? + // TODO(yuyuan): Can we make this async as above? pub fn get_locators(&self) -> Vec { let mut lsu = zenoh_runtime::ZRuntime::TX.block_in_place(self.get_locators_unicast()); let mut lsm = zenoh_runtime::ZRuntime::TX.block_in_place(self.get_locators_multicast()); diff --git a/io/zenoh-transport/src/multicast/establishment.rs b/io/zenoh-transport/src/multicast/establishment.rs index cbec529471..a0b7576f03 100644 --- a/io/zenoh-transport/src/multicast/establishment.rs +++ b/io/zenoh-transport/src/multicast/establishment.rs @@ -91,11 +91,10 @@ pub(crate) async fn open_link( w_guard.insert(locator.clone(), ti.clone()); drop(w_guard); - // TODO: resolve the structure entanglement below + // TODO(yuyuan): resolve the structure entanglement below // Notify the transport event handler let transport: TransportMulticast = (&ti).into(); - // TODO: also check the dyn trait implementation in callback let callback = match manager.config.handler.new_multicast(transport.clone()) { Ok(c) => c, Err(e) => { diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index 4714babea3..3565f747a0 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -257,7 +257,7 @@ pub(super) struct TransportLinkMulticastConfigUniversal { pub(super) batch_size: BatchSize, } -// TODO: Introduce TaskTracker and retire handle_tx, handle_rx, and signal_rx. +// TODO(yuyuan): Introduce TaskTracker or JoinSet and retire handle_tx, handle_rx, and signal_rx. #[derive(Clone)] pub(super) struct TransportLinkMulticastUniversal { // The underlying link @@ -329,6 +329,7 @@ impl TransportLinkMulticastUniversal { // Spawn the TX task let c_link = self.link.clone(); let c_transport = self.transport.clone(); + let handle = zenoh_runtime::ZRuntime::TX.spawn(async move { let res = tx_task( consumer, @@ -343,7 +344,6 @@ impl TransportLinkMulticastUniversal { log::debug!("{}", e); // Spawn a task to avoid a deadlock waiting for this same task // to finish in the close() joining its handle - // TODO: check which ZRuntime should be used zenoh_runtime::ZRuntime::Net.spawn(async move { c_transport.delete().await }); } }); @@ -380,7 +380,6 @@ impl TransportLinkMulticastUniversal { log::debug!("{}", e); // Spawn a task to avoid a deadlock waiting for this same task // to finish in the close() joining its handle - // TODO: check which ZRuntime should be used zenoh_runtime::ZRuntime::Net.spawn(async move { c_transport.delete().await }); } }); diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index 460e66d002..449c713700 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -183,7 +183,7 @@ impl TransportMulticastInner { cb.closed(); } - // TODO: unify the termination with the above + // TODO(yuyuan): use CancellationToken to unify the termination with the above self.token.cancel(); Ok(()) @@ -366,7 +366,7 @@ impl TransportMulticastInner { ); // Create lease event - // TODO: refine the clone behaviors + // TODO(yuyuan): refine the clone behaviors let is_active = Arc::new(AtomicBool::new(false)); let c_is_active = is_active.clone(); let token = self.token.child_token(); @@ -389,10 +389,10 @@ impl TransportMulticastInner { let _ = c_self.del_peer(&c_locator, close::reason::EXPIRED); }; - // TODO: Put it into TaskTracker properly + // TODO(yuyuan): Put it into TaskTracker or store as JoinHandle zenoh_runtime::ZRuntime::Acceptor.spawn(task); - // TODO: Integrate the above async task into TransportMulticastPeer + // TODO(yuyuan): Integrate the above async task into TransportMulticastPeer // Store the new peer let peer = TransportMulticastPeer { version: join.version, @@ -423,7 +423,7 @@ impl TransportMulticastInner { reason ); - // TODO: Unify the termination + // TODO(yuyuan): Unify the termination peer.token.cancel(); peer.handler.closing(); drop(guard); diff --git a/io/zenoh-transport/src/unicast/lowlatency/link.rs b/io/zenoh-transport/src/unicast/lowlatency/link.rs index 2af6b2b383..43e4516aa5 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/link.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/link.rs @@ -23,19 +23,20 @@ use tokio_util::sync::CancellationToken; use zenoh_buffers::{writer::HasWriter, ZSlice}; use zenoh_codec::*; use zenoh_core::{zasyncread, zasyncwrite}; +use zenoh_link::LinkUnicast; use zenoh_protocol::transport::TransportMessageLowLatency; use zenoh_protocol::transport::{KeepAlive, TransportBodyLowLatency}; use zenoh_result::{zerror, ZResult}; use zenoh_runtime::ZRuntime; pub(crate) async fn send_with_link( - link: &TransportLinkUnicast, + link: &LinkUnicast, msg: TransportMessageLowLatency, #[cfg(feature = "stats")] stats: &Arc, ) -> ZResult<()> { let len; let codec = Zenoh080::new(); - if link.link.is_streamed() { + if link.is_streamed() { let mut buffer = vec![0, 0, 0, 0]; let mut writer = buffer.writer(); codec @@ -47,7 +48,7 @@ pub(crate) async fn send_with_link( buffer[0..4].copy_from_slice(&le); - link.link.write_all(&buffer).await?; + link.write_all(&buffer).await?; } else { let mut buffer = vec![]; let mut writer = buffer.writer(); @@ -59,7 +60,7 @@ pub(crate) async fn send_with_link( { len = buffer.len() as u32; } - link.link.write_all(&buffer).await?; + link.write_all(&buffer).await?; } log::trace!("Sent: {:?}", msg); @@ -99,7 +100,7 @@ impl TransportUnicastLowlatency { pub(super) async fn send_async(&self, msg: TransportMessageLowLatency) -> ZResult<()> { let guard = zasyncwrite!(self.link); - let link = guard.as_ref().ok_or_else(|| zerror!("No link"))?; + let link = &guard.as_ref().ok_or_else(|| zerror!("No link"))?.link; send_with_link( link, msg, @@ -139,18 +140,15 @@ impl TransportUnicastLowlatency { } pub(super) fn internal_start_rx(&self, lease: Duration) { - // TODO: Tidy the complex dependencies let rx_buffer_size = self.manager.config.link_rx_buffer_size; let token = self.token.child_token(); - // TODO: This can be improved to minimal let c_transport = self.clone(); let task = async move { let guard = zasyncread!(c_transport.link); let link_rx = guard.as_ref().unwrap().rx(); drop(guard); - // TODO: link_rx.link and link.link let is_streamed = link_rx.link.is_streamed(); // The pool of buffers @@ -221,7 +219,6 @@ async fn keepalive_task( token: CancellationToken, #[cfg(feature = "stats")] stats: Arc, ) -> ZResult<()> { - // TODO: check this necessity let mut interval = tokio::time::interval_at(tokio::time::Instant::now() + keep_alive, keep_alive); @@ -233,7 +230,7 @@ async fn keepalive_task( }; let guard = zasyncwrite!(link); - let link = guard.as_ref().ok_or_else(|| zerror!("No link"))?; + let link = &guard.as_ref().ok_or_else(|| zerror!("No link"))?.link; let _ = send_with_link( link, keepailve, diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs index cdd2e4b261..283c143499 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -223,7 +223,6 @@ impl TransportUnicastTrait for TransportUnicastLowlatency { /*************************************/ /* LINK */ /*************************************/ - // TODO: Check the correctness: is this called at most once? async fn add_link( &self, link: LinkUnicastWithOpenAck, diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index c58d0e3320..5c07b69738 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -75,7 +75,6 @@ impl TransportLinkUnicastUniversal { (result, consumer) } - // TODO: Not yet guaranteed is called at most once pub(super) fn start_tx( &mut self, transport: TransportUnicastUniversal, @@ -96,12 +95,12 @@ impl TransportLinkUnicastUniversal { ) .await; - // TODO: improve this callback if let Err(e) = res { log::debug!("{}", e); // Spawn a task to avoid a deadlock waiting for this same task // to finish in the close() joining its handle - // TODO: check which ZRuntime should be used + // TODO(yuyuan): do more study to check which ZRuntime should be used or refine the + // termination zenoh_runtime::ZRuntime::TX .spawn(async move { transport.del_link(tx.inner.link()).await }); } @@ -109,7 +108,6 @@ impl TransportLinkUnicastUniversal { self.tracker.spawn_on(task, &zenoh_runtime::ZRuntime::TX); } - // TODO: Not yet guaranteed is called at most once pub(super) fn start_rx(&mut self, transport: TransportUnicastUniversal, lease: Duration) { let mut rx = self.link.rx(); let token = self.token.clone(); @@ -124,13 +122,13 @@ impl TransportLinkUnicastUniversal { ) .await; - // TODO: improve this callback + // TODO(yuyuan): improve this callback if let Err(e) = res { log::debug!("{}", e); // Spawn a task to avoid a deadlock waiting for this same task // to finish in the close() joining its handle - // TODO: check which ZRuntime should be used + // WARN: Must be spawned on RX zenoh_runtime::ZRuntime::RX .spawn(async move { transport.del_link((&rx.link).into()).await }); @@ -138,7 +136,7 @@ impl TransportLinkUnicastUniversal { // zenoh_runtime::ZRuntime::Net // .spawn(async move { transport.del_link((&rx.link).into()).await }); - // // WARN: Don't worry. This fix doesn't work + // // WARN: This cloud block // transport.del_link((&rx.link).into()).await; } }; @@ -168,7 +166,6 @@ async fn tx_task( token: CancellationToken, #[cfg(feature = "stats")] stats: Arc, ) -> ZResult<()> { - // TODO: check this necessity let mut interval = tokio::time::interval_at(tokio::time::Instant::now() + keep_alive, keep_alive); loop { diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index bc36b0ac2c..dfe5d78be2 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -528,10 +528,8 @@ fn router_data(context: &AdminContext, query: Query) { } json }; - // TODO: Avoid this block_on - // TODO: Check this block_in_place let transports: Vec = zenoh_runtime::ZRuntime::Net - .block_on(transport_mgr.get_transports_unicast()) + .block_in_place(transport_mgr.get_transports_unicast()) .iter() .map(transport_to_json) .collect(); diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 14e19afa66..18a1438870 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -1755,7 +1755,6 @@ impl Session { _ => 1, }; - // TODO: check which ZRuntime should be used zenoh_runtime::ZRuntime::Net.spawn({ let state = self.state.clone(); let zid = self.runtime.zid(); diff --git a/zenoh/tests/routing.rs b/zenoh/tests/routing.rs index 14c97f284b..6c5afe0673 100644 --- a/zenoh/tests/routing.rs +++ b/zenoh/tests/routing.rs @@ -85,7 +85,7 @@ impl Task { tokio::select! { _ = token.cancelled() => break, - // TODO: this won't yield after a timeout raised from recipe + // WARN: this won't yield after a timeout since the put is a blocking call res = tokio::time::timeout(std::time::Duration::from_secs(1), session .put(ke, value.clone()) .congestion_control(CongestionControl::Block) @@ -599,7 +599,6 @@ async fn three_node_combination() -> Result<()> { ) .collect(); - // TODO: It should be able to run concurrently for chunks in recipe_list.chunks(4).map(|x| x.to_vec()) { let mut join_set = tokio::task::JoinSet::new(); for (pubsub, getqueryable) in chunks {