diff --git a/crates/block-producer/src/block_producer.rs b/crates/block-producer/src/block_producer.rs index 9ff354c1f..91f7fb30f 100644 --- a/crates/block-producer/src/block_producer.rs +++ b/crates/block-producer/src/block_producer.rs @@ -165,7 +165,7 @@ impl BlockProducer { self.last_submitted_tx_hash.clone() } - #[instrument(skip_all, fields(event = %event))] + #[instrument(skip_all, name = "block producer handle_event")] pub async fn handle_event(&mut self, event: ChainEvent) -> Result<()> { if let Some(ref tests_control) = self.tests_control { match tests_control.payload().await { diff --git a/crates/block-producer/src/challenger.rs b/crates/block-producer/src/challenger.rs index ab7aa751a..047ec9cb2 100644 --- a/crates/block-producer/src/challenger.rs +++ b/crates/block-producer/src/challenger.rs @@ -37,6 +37,7 @@ use gw_utils::genesis_info::CKBGenesisInfo; use gw_utils::transaction_skeleton::TransactionSkeleton; use gw_utils::wallet::Wallet; use tokio::sync::Mutex; +use tracing::instrument; use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; @@ -110,6 +111,7 @@ impl Challenger { } } + #[instrument(skip_all, name = "challenger handle_event")] pub async fn handle_event(&mut self, event: ChainEvent) -> Result<()> { if let Some(ref tests_control) = self.tests_control { match tests_control.payload().await { diff --git a/crates/block-producer/src/cleaner.rs b/crates/block-producer/src/cleaner.rs index 14cc8d9dd..9c00dac29 100644 --- a/crates/block-producer/src/cleaner.rs +++ b/crates/block-producer/src/cleaner.rs @@ -12,6 +12,7 @@ use gw_types::core::Status; use gw_types::offchain::{global_state_from_slice, CellInfo, InputCellInfo, TxStatus}; use gw_types::packed::{CellDep, CellInput, Transaction, WitnessArgs}; use gw_types::prelude::Unpack; +use tracing::instrument; use std::collections::HashSet; use std::convert::TryFrom; @@ -72,6 +73,7 @@ impl Cleaner { } } + #[instrument(skip_all, name = "cleaner handle_event")] pub async fn handle_event(&self, _event: ChainEvent) -> Result<()> { if matches!(self.query_rollup_status().await?, Status::Halting) { return Ok(()); diff --git a/crates/block-producer/src/poller.rs b/crates/block-producer/src/poller.rs index 1a474b113..77b18d2b0 100644 --- a/crates/block-producer/src/poller.rs +++ b/crates/block-producer/src/poller.rs @@ -91,7 +91,7 @@ impl ChainUpdater { } // Start syncing - #[instrument(skip_all, fields(event = %_event))] + #[instrument(skip_all, name = "chain updater handle_event")] pub async fn handle_event(&mut self, _event: ChainEvent) -> Result<()> { let initial_syncing = !self.initialized; // Always start from last valid tip on l1 diff --git a/crates/block-producer/src/runner.rs b/crates/block-producer/src/runner.rs index 05e966dd5..1580932f9 100644 --- a/crates/block-producer/src/runner.rs +++ b/crates/block-producer/src/runner.rs @@ -65,6 +65,7 @@ use tokio::{ spawn, sync::{broadcast, mpsc, Mutex}, }; +use tracing::{info_span, instrument}; const MIN_CKB_VERSION: &str = "0.40.0"; const EVENT_TIMEOUT_SECONDS: u64 = 30; @@ -76,6 +77,21 @@ struct ChainTaskContext { withdrawal_unlocker: Option, cleaner: Option>, } + +struct ChainTaskRunStatus { + opt_tip_number_hash: Option<(u64, H256)>, + last_event_time: Instant, +} + +impl Default for ChainTaskRunStatus { + fn default() -> Self { + ChainTaskRunStatus { + opt_tip_number_hash: None, + last_event_time: Instant::now(), + } + } +} + struct ChainTask { rpc_client: RPCClient, poll_interval: Duration, @@ -137,6 +153,7 @@ impl ChainTask { } } + #[instrument(skip_all, fields(tip_number = tip_number, tip_hash = %tip_hash.pack()))] async fn sync_next( &self, tip_number: u64, @@ -303,36 +320,32 @@ impl ChainTask { } } - async fn run(&mut self, backoff: &mut ExponentialBackoff) -> Result<()> { + // How to get tip_number and tip_hash only once? then loop chain task run only? + #[instrument(skip_all, err(Debug))] + async fn run(&mut self, status: &ChainTaskRunStatus) -> Result { // get tip - let (mut tip_number, mut tip_hash) = { - let tip = self.rpc_client.get_tip().await?; - let tip_number: u64 = tip.number().unpack(); - let tip_hash: H256 = tip.block_hash().unpack(); - (tip_number, tip_hash) + let (tip_number, tip_hash) = match status.opt_tip_number_hash { + Some((number, hash)) => (number, hash), + None => { + let tip = self.rpc_client.get_tip().await?; + let tip_number: u64 = tip.number().unpack(); + let tip_hash: H256 = tip.block_hash().unpack(); + (tip_number, tip_hash) + } }; - let mut last_event_time = Instant::now(); + let opt_tip_number_hash = self + .metrics_monitor + .instrument(self.sync_next(tip_number, tip_hash, &status.last_event_time)) + .await?; - loop { - // Exit if shutdown event is received. - if self.shutdown_event.try_recv().is_ok() { - log::info!("ChainTask existed successfully"); - return Ok(()); - } + let updated_status = ChainTaskRunStatus { + opt_tip_number_hash: opt_tip_number_hash + .or_else(|| status.opt_tip_number_hash.to_owned()), + last_event_time: Instant::now(), + }; - if let Some((_tip_number, _tip_hash)) = self - .metrics_monitor - .instrument(self.sync_next(tip_number, tip_hash, &last_event_time)) - .await? - { - tip_number = _tip_number; - tip_hash = _tip_hash; - last_event_time = Instant::now(); - } - backoff.reset(); - tokio::time::sleep(self.poll_interval).await; - } + Ok(updated_status) } } @@ -870,6 +883,8 @@ pub async fn run(config: Config, skip_config_check: bool) -> Result<()> { let shutdown_send = shutdown_send.clone(); move || { rt_handle.block_on(async move { + use tracing::Instrument; + let _tx = chain_task_ended_tx; let ctx = ChainTaskContext { chain_updater, @@ -886,13 +901,51 @@ pub async fn run(config: Config, skip_config_check: bool) -> Result<()> { shutdown_send, shutdown_event_recv, ); - while let Err(err) = chain_task.run(&mut backoff).await { - if err.is::() { - log::error!("chain polling loop request error, will retry: {}", err); - tokio::time::sleep(backoff.next_sleep()).await; - } else { - log::error!("chain polling loop exit unexpected, error: {}", err); - break; + + let mut run_status = ChainTaskRunStatus::default(); + loop { + // Exit if shutdown event is received. + if chain_task.shutdown_event.try_recv().is_ok() { + log::info!("ChainTask existed successfully"); + return; + } + + let run_span = info_span!("chain_task_run"); + match chain_task + .run(&run_status) + .instrument(run_span.clone()) + .await + { + Ok(updated_status) => { + run_status = updated_status; + backoff.reset(); + + let sleep_span = + info_span!(parent: &run_span, "chain_task interval sleep"); + tokio::time::sleep(chain_task.poll_interval) + .instrument(sleep_span) + .await; + } + Err(err) if err.is::() => { + // Reset status and refresh tip number hash + run_status = ChainTaskRunStatus::default(); + let backoff_sleep = backoff.next_sleep(); + log::error!( + "chain polling loop request error, will retry in {}s: {}", + backoff_sleep.as_secs(), + err + ); + + let sleep_span = + info_span!(parent: &run_span, "chain_task backoff sleep"); + tokio::time::sleep(backoff_sleep) + .instrument(sleep_span) + .await; + } + Err(err) => { + log::error!("chain polling loop exit unexpected, error: {}", err); + break; + } } } }); diff --git a/crates/block-producer/src/withdrawal_unlocker.rs b/crates/block-producer/src/withdrawal_unlocker.rs index d14d3b000..3fa0fed2e 100644 --- a/crates/block-producer/src/withdrawal_unlocker.rs +++ b/crates/block-producer/src/withdrawal_unlocker.rs @@ -16,6 +16,7 @@ use gw_utils::fee::fill_tx_fee; use gw_utils::genesis_info::CKBGenesisInfo; use gw_utils::transaction_skeleton::TransactionSkeleton; use gw_utils::wallet::Wallet; +use tracing::instrument; use crate::types::ChainEvent; use crate::utils; @@ -50,6 +51,7 @@ impl FinalizedWithdrawalUnlocker { } } + #[instrument(skip_all, name = "withdrawal unlocker handle_event")] pub async fn handle_event(&mut self, _event: &ChainEvent) -> Result<()> { let unlocked = &self.unlocked_set; let rpc_client = &self.unlocker.rpc_client;