Skip to content

Commit

Permalink
Merge pull request #19 from cita-cloud/potential_dead_lock
Browse files Browse the repository at this point in the history
fix potential dead lock
  • Loading branch information
rink1969 authored Apr 10, 2024
2 parents 04de162 + 6aa9d2f commit 7e7a54c
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 30 deletions.
12 changes: 9 additions & 3 deletions src/core/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,11 @@ impl Controller {
let mut f_pool = self.forward_pool.write().await;
f_pool.body.push(raw_tx.clone());
if f_pool.body.len() > self.config.count_per_batch {
self.broadcast_send_txs(f_pool.clone()).await;
let txs = RawTransactions {
body: f_pool.body.clone(),
};
f_pool.body.clear();
self.broadcast_send_txs(txs).await;
}
}
// send to storage
Expand Down Expand Up @@ -301,8 +304,8 @@ impl Controller {
let mut hashes = Vec::new();
{
let auditor = self.auditor.read().await;
let mut pool = self.pool.write().await;
auditor.auditor_check_batch(&raw_txs)?;
let mut pool = self.pool.write().await;
for raw_tx in raw_txs.body.clone() {
let hash = get_tx_hash(&raw_tx)?.to_vec();
if pool.insert(raw_tx) {
Expand Down Expand Up @@ -1561,8 +1564,11 @@ impl Controller {
pub async fn retransmission_tx(&self) {
let mut f_pool = self.forward_pool.write().await;
if !f_pool.body.is_empty() {
self.broadcast_send_txs(f_pool.clone()).await;
let txs = RawTransactions {
body: f_pool.body.clone(),
};
f_pool.body.clear();
self.broadcast_send_txs(txs).await;
}
}
}
72 changes: 45 additions & 27 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,35 +219,53 @@ async fn run(opts: RunOpts) -> Result<(), StatusCodeEnum> {
));
let mut forward_interval = time::interval(Duration::from_micros(config.buffer_duration));

// create timer task
let controller_for_timer = controller.clone();
let rx_signal_for_timer = rx_signal.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = reconnect_interval.tick() => {
let _ = event_sender
.send(Event::BroadcastCSI)
.map_err(|e| {
warn!("send broadcast csi event failed: {}", e);
StatusCodeEnum::FatalError
});
let _ = event_sender
.send(Event::RecordAllNode)
.map_err(|e| {
warn!("send record all node event failed: {}", e);
StatusCodeEnum::FatalError
});
},
_ = inner_health_check_interval.tick() => {
let _ = event_sender
.send(Event::InnerHealthCheck)
.map_err(|e| {
warn!("send inner health check event failed: {}", e);
StatusCodeEnum::FatalError
});
},
_ = forward_interval.tick() => {
controller_for_timer
.retransmission_tx()
.await;
},
_ = rx_signal_for_timer.recv_async() => {
info!("timer task exit!");
break;
},
else => {
debug!("timer task exit!");
break;
}
}
}
});

loop {
tokio::select! {
_ = reconnect_interval.tick() => {
event_sender
.send(Event::BroadcastCSI)
.map_err(|e| {
warn!("send broadcast csi event failed: {}", e);
StatusCodeEnum::FatalError
})?;
event_sender
.send(Event::RecordAllNode)
.map_err(|e| {
warn!("send record all node event failed: {}", e);
StatusCodeEnum::FatalError
})?;
},
_ = inner_health_check_interval.tick() => {
event_sender
.send(Event::InnerHealthCheck)
.map_err(|e| {
warn!("send inner health check event failed: {}", e);
StatusCodeEnum::FatalError
})?;
},
_ = forward_interval.tick() => {
controller
.retransmission_tx()
.await;
},
Ok(event) = event_receiver.recv_async() => {
controller_state_machine.write().await.handle_with_context(&event, &mut controller).await;
},
Expand Down

0 comments on commit 7e7a54c

Please sign in to comment.