diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index b1eeca261f..41e1b26e72 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -108,11 +108,42 @@ macro_rules! face_hat_mut { } use face_hat_mut; +struct TreesComputationWorker { + _task: TerminatableTask, + tx: flume::Sender>, +} + +impl TreesComputationWorker { + fn new() -> Self { + let (tx, rx) = flume::bounded::>(1); + let task = TerminatableTask::spawn_abortable(zenoh_runtime::ZRuntime::Net, async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis( + *TREES_COMPUTATION_DELAY_MS, + )) + .await; + if let Ok(tables_ref) = rx.recv_async().await { + let mut tables = zwrite!(tables_ref.tables); + + tracing::trace!("Compute trees"); + let new_children = hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(); + + tracing::trace!("Compute routes"); + pubsub::pubsub_tree_change(&mut tables, &new_children); + queries::queries_tree_change(&mut tables, &new_children); + drop(tables); + } + } + }); + Self { _task: task, tx } + } +} + struct HatTables { peer_subs: HashSet>, peer_qabls: HashSet>, peers_net: Option, - peers_trees_task: Option, + peers_trees_worker: TreesComputationWorker, } impl HatTables { @@ -121,36 +152,13 @@ impl HatTables { peer_subs: HashSet::new(), peer_qabls: HashSet::new(), peers_net: None, - peers_trees_task: None, + peers_trees_worker: TreesComputationWorker::new(), } } fn schedule_compute_trees(&mut self, tables_ref: Arc) { - tracing::trace!("Schedule computations"); - if self.peers_trees_task.is_none() { - let task = TerminatableTask::spawn( - zenoh_runtime::ZRuntime::Net, - async move { - tokio::time::sleep(std::time::Duration::from_millis( - *TREES_COMPUTATION_DELAY_MS, - )) - .await; - let mut tables = zwrite!(tables_ref.tables); - - tracing::trace!("Compute trees"); - let new_children = hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(); - - tracing::trace!("Compute routes"); - pubsub::pubsub_tree_change(&mut tables, &new_children); - queries::queries_tree_change(&mut tables, &new_children); - - tracing::trace!("Computations completed"); - hat_mut!(tables).peers_trees_task = None; - }, - TerminatableTask::create_cancellation_token(), - ); - self.peers_trees_task = Some(task); - } + tracing::trace!("Schedule trees computation"); + let _ = self.peers_trees_worker.tx.try_send(tables_ref); } } diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index b4b88d66e9..407562425e 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -113,6 +113,44 @@ macro_rules! face_hat_mut { } use face_hat_mut; +struct TreesComputationWorker { + _task: TerminatableTask, + tx: flume::Sender>, +} + +impl TreesComputationWorker { + fn new(net_type: WhatAmI) -> Self { + let (tx, rx) = flume::bounded::>(1); + let task = TerminatableTask::spawn_abortable(zenoh_runtime::ZRuntime::Net, async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis( + *TREES_COMPUTATION_DELAY_MS, + )) + .await; + if let Ok(tables_ref) = rx.recv_async().await { + let mut tables = zwrite!(tables_ref.tables); + + tracing::trace!("Compute trees"); + let new_children = match net_type { + WhatAmI::Router => hat_mut!(tables) + .routers_net + .as_mut() + .unwrap() + .compute_trees(), + _ => hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(), + }; + + tracing::trace!("Compute routes"); + pubsub::pubsub_tree_change(&mut tables, &new_children, net_type); + queries::queries_tree_change(&mut tables, &new_children, net_type); + drop(tables); + } + } + }); + Self { _task: task, tx } + } +} + struct HatTables { router_subs: HashSet>, peer_subs: HashSet>, @@ -121,8 +159,8 @@ struct HatTables { routers_net: Option, peers_net: Option, shared_nodes: Vec, - routers_trees_task: Option, - peers_trees_task: Option, + routers_trees_worker: TreesComputationWorker, + peers_trees_worker: TreesComputationWorker, router_peers_failover_brokering: bool, } @@ -136,8 +174,8 @@ impl HatTables { routers_net: None, peers_net: None, shared_nodes: vec![], - routers_trees_task: None, - peers_trees_task: None, + routers_trees_worker: TreesComputationWorker::new(WhatAmI::Router), + peers_trees_worker: TreesComputationWorker::new(WhatAmI::Peer), router_peers_failover_brokering, } } @@ -240,45 +278,15 @@ impl HatTables { } fn schedule_compute_trees(&mut self, tables_ref: Arc, net_type: WhatAmI) { - tracing::trace!("Schedule computations"); - if (net_type == WhatAmI::Router && self.routers_trees_task.is_none()) - || (net_type == WhatAmI::Peer && self.peers_trees_task.is_none()) - { - let task = TerminatableTask::spawn( - zenoh_runtime::ZRuntime::Net, - async move { - tokio::time::sleep(std::time::Duration::from_millis( - *TREES_COMPUTATION_DELAY_MS, - )) - .await; - let mut tables = zwrite!(tables_ref.tables); - - tracing::trace!("Compute trees"); - let new_children = match net_type { - WhatAmI::Router => hat_mut!(tables) - .routers_net - .as_mut() - .unwrap() - .compute_trees(), - _ => hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(), - }; - - tracing::trace!("Compute routes"); - pubsub::pubsub_tree_change(&mut tables, &new_children, net_type); - queries::queries_tree_change(&mut tables, &new_children, net_type); - - tracing::trace!("Computations completed"); - match net_type { - WhatAmI::Router => hat_mut!(tables).routers_trees_task = None, - _ => hat_mut!(tables).peers_trees_task = None, - }; - }, - TerminatableTask::create_cancellation_token(), - ); - match net_type { - WhatAmI::Router => self.routers_trees_task = Some(task), - _ => self.peers_trees_task = Some(task), - }; + tracing::trace!("Schedule trees computation"); + match net_type { + WhatAmI::Router => { + let _ = self.routers_trees_worker.tx.try_send(tables_ref); + } + WhatAmI::Peer => { + let _ = self.peers_trees_worker.tx.try_send(tables_ref); + } + _ => (), } } }