diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs index ad77030ae..b62640cc5 100644 --- a/example-crates/example_bitcoind_rpc_polling/src/main.rs +++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs @@ -26,12 +26,15 @@ use example_cli::{ const DB_MAGIC: &[u8] = b"bdk_example_rpc"; const DB_PATH: &str = ".bdk_example_rpc.db"; +/// The mpsc channel bound for emissions from [`Emitter`]. const CHANNEL_BOUND: usize = 10; +/// Width of the elapsed time stdout. +const ELAPSED_WIDTH: usize = 10; /// Delay for printing status to stdout. const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6); /// Delay between mempool emissions. const MEMPOOL_EMIT_DELAY: Duration = Duration::from_secs(30); -/// Delay for commiting to persistance. +/// Delay for committing to persistance. const DB_COMMIT_DELAY: Duration = Duration::from_secs(60); type ChangeSet = ( @@ -111,18 +114,30 @@ enum RpcCommands { } fn main() -> anyhow::Result<()> { + let start = Instant::now(); + let (args, keymap, index, db, init_changeset) = example_cli::init::(DB_MAGIC, DB_PATH)?; + println!( + "[{:>ELAPSED_WIDTH$}s] loaded initial changeset from db", + start.elapsed().as_secs_f32() + ); let graph = Mutex::new({ let mut graph = IndexedTxGraph::new(index); graph.apply_changeset(init_changeset.1); graph }); - println!("loaded indexed tx graph from db"); + println!( + "[{:>ELAPSED_WIDTH$}s] loaded indexed tx graph from changeset", + start.elapsed().as_secs_f32() + ); let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0)); - println!("loaded local chain from db"); + println!( + "[{:>ELAPSED_WIDTH$}s] loaded local chain from changeset", + start.elapsed().as_secs_f32() + ); let rpc_cmd = match args.command { example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd, @@ -153,14 +168,11 @@ fn main() -> anyhow::Result<()> { .. } = rpc_args; - let mut chain = chain.lock().unwrap(); - let mut graph = graph.lock().unwrap(); - let mut db = db.lock().unwrap(); - - graph.index.set_lookahead_for_all(lookahead); + graph.lock().unwrap().index.set_lookahead_for_all(lookahead); + let chain_tip = chain.lock().unwrap().tip(); let rpc_client = rpc_args.new_client()?; - let mut emitter = match chain.tip() { + let mut emitter = match chain_tip { Some(cp) => Emitter::from_checkpoint(&rpc_client, cp), None => Emitter::from_height(&rpc_client, fallback_height), }; @@ -169,6 +181,10 @@ fn main() -> anyhow::Result<()> { let mut last_print = Instant::now(); while let Some((height, block)) = emitter.next_block()? { + let mut chain = chain.lock().unwrap(); + let mut graph = graph.lock().unwrap(); + let mut db = db.lock().unwrap(); + let chain_update = CheckPoint::from_header(&block.header, height).into_update(false); let chain_changeset = chain @@ -182,7 +198,8 @@ fn main() -> anyhow::Result<()> { last_db_commit = Instant::now(); db.commit()?; println!( - "commited to db (took {}s)", + "[{:>ELAPSED_WIDTH$}s] commited to db (took {}s)", + start.elapsed().as_secs_f32(), last_db_commit.elapsed().as_secs_f32() ); } @@ -200,7 +217,8 @@ fn main() -> anyhow::Result<()> { ) }; println!( - "synced to {} @ {} | total: {} sats", + "[{:>ELAPSED_WIDTH$}s] synced to {} @ {} | total: {} sats", + start.elapsed().as_secs_f32(), synced_to.hash(), synced_to.height(), balance.total() @@ -209,13 +227,15 @@ fn main() -> anyhow::Result<()> { } } - // mempool let mempool_txs = emitter.mempool()?; - let graph_changeset = graph.batch_insert_unconfirmed(mempool_txs); - db.stage((local_chain::ChangeSet::default(), graph_changeset)); - - // commit one last time! - db.commit()?; + let graph_changeset = graph.lock().unwrap().batch_insert_relevant_unconfirmed( + mempool_txs.iter().map(|(tx, time)| (tx, *time)), + ); + { + let mut db = db.lock().unwrap(); + db.stage((local_chain::ChangeSet::default(), graph_changeset)); + db.commit()?; // commit one last time + } } RpcCommands::Live { rpc_args } => { let RpcArgs { @@ -228,10 +248,12 @@ fn main() -> anyhow::Result<()> { graph.lock().unwrap().index.set_lookahead_for_all(lookahead); let last_cp = chain.lock().unwrap().tip(); + println!( + "[{:>ELAPSED_WIDTH$}s] starting emitter thread...", + start.elapsed().as_secs_f32() + ); let (tx, rx) = std::sync::mpsc::sync_channel::(CHANNEL_BOUND); let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> { - println!("emitter thread started..."); - let rpc_client = rpc_args.new_client()?; let mut emitter = match last_cp { Some(cp) => Emitter::from_checkpoint(&rpc_client, cp), @@ -270,15 +292,15 @@ fn main() -> anyhow::Result<()> { Ok(()) }); - let mut db = db.lock().unwrap(); - let mut graph = graph.lock().unwrap(); - let mut chain = chain.lock().unwrap(); let mut tip_height = 0_u32; - let mut last_db_commit = Instant::now(); let mut last_print = Option::::None; for emission in rx { + let mut db = db.lock().unwrap(); + let mut graph = graph.lock().unwrap(); + let mut chain = chain.lock().unwrap(); + let changeset = match emission { Emission::Block { height, block } => { let chain_update = @@ -307,7 +329,8 @@ fn main() -> anyhow::Result<()> { last_db_commit = Instant::now(); db.commit()?; println!( - "commited to db (took {}s)", + "[{:>ELAPSED_WIDTH$}s] commited to db (took {}s)", + start.elapsed().as_secs_f32(), last_db_commit.elapsed().as_secs_f32() ); } @@ -324,7 +347,8 @@ fn main() -> anyhow::Result<()> { ) }; println!( - "synced to {} @ {} / {} | total: {} sats", + "[{:>ELAPSED_WIDTH$}s] synced to {} @ {} / {} | total: {} sats", + start.elapsed().as_secs_f32(), synced_to.hash(), synced_to.height(), tip_height,