From 6aa9d2f3a38021bed31ceb18af1a61b6b12c9b9f Mon Sep 17 00:00:00 2001 From: rink1969 Date: Mon, 8 Apr 2024 21:06:27 +0800 Subject: [PATCH] fix potential dead lock --- src/core/controller.rs | 12 +++++-- src/main.rs | 72 ++++++++++++++++++++++++++---------------- 2 files changed, 54 insertions(+), 30 deletions(-) diff --git a/src/core/controller.rs b/src/core/controller.rs index bf0b0de..cd5def6 100644 --- a/src/core/controller.rs +++ b/src/core/controller.rs @@ -254,8 +254,11 @@ impl Controller { let mut f_pool = self.forward_pool.write().await; f_pool.body.push(raw_tx.clone()); if f_pool.body.len() > self.config.count_per_batch { - self.broadcast_send_txs(f_pool.clone()).await; + let txs = RawTransactions { + body: f_pool.body.clone(), + }; f_pool.body.clear(); + self.broadcast_send_txs(txs).await; } } // send to storage @@ -301,8 +304,8 @@ impl Controller { let mut hashes = Vec::new(); { let auditor = self.auditor.read().await; - let mut pool = self.pool.write().await; auditor.auditor_check_batch(&raw_txs)?; + let mut pool = self.pool.write().await; for raw_tx in raw_txs.body.clone() { let hash = get_tx_hash(&raw_tx)?.to_vec(); if pool.insert(raw_tx) { @@ -1561,8 +1564,11 @@ impl Controller { pub async fn retransmission_tx(&self) { let mut f_pool = self.forward_pool.write().await; if !f_pool.body.is_empty() { - self.broadcast_send_txs(f_pool.clone()).await; + let txs = RawTransactions { + body: f_pool.body.clone(), + }; f_pool.body.clear(); + self.broadcast_send_txs(txs).await; } } } diff --git a/src/main.rs b/src/main.rs index c58477c..a0e3e5e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -219,35 +219,53 @@ async fn run(opts: RunOpts) -> Result<(), StatusCodeEnum> { )); let mut forward_interval = time::interval(Duration::from_micros(config.buffer_duration)); + // create timer task + let controller_for_timer = controller.clone(); + let rx_signal_for_timer = rx_signal.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + _ = reconnect_interval.tick() => { + let _ = event_sender + .send(Event::BroadcastCSI) + .map_err(|e| { + warn!("send broadcast csi event failed: {}", e); + StatusCodeEnum::FatalError + }); + let _ = event_sender + .send(Event::RecordAllNode) + .map_err(|e| { + warn!("send record all node event failed: {}", e); + StatusCodeEnum::FatalError + }); + }, + _ = inner_health_check_interval.tick() => { + let _ = event_sender + .send(Event::InnerHealthCheck) + .map_err(|e| { + warn!("send inner health check event failed: {}", e); + StatusCodeEnum::FatalError + }); + }, + _ = forward_interval.tick() => { + controller_for_timer + .retransmission_tx() + .await; + }, + _ = rx_signal_for_timer.recv_async() => { + info!("timer task exit!"); + break; + }, + else => { + debug!("timer task exit!"); + break; + } + } + } + }); + loop { tokio::select! { - _ = reconnect_interval.tick() => { - event_sender - .send(Event::BroadcastCSI) - .map_err(|e| { - warn!("send broadcast csi event failed: {}", e); - StatusCodeEnum::FatalError - })?; - event_sender - .send(Event::RecordAllNode) - .map_err(|e| { - warn!("send record all node event failed: {}", e); - StatusCodeEnum::FatalError - })?; - }, - _ = inner_health_check_interval.tick() => { - event_sender - .send(Event::InnerHealthCheck) - .map_err(|e| { - warn!("send inner health check event failed: {}", e); - StatusCodeEnum::FatalError - })?; - }, - _ = forward_interval.tick() => { - controller - .retransmission_tx() - .await; - }, Ok(event) = event_receiver.recv_async() => { controller_state_machine.write().await.handle_with_context(&event, &mut controller).await; },