diff --git a/chaindata_standalone/src/main.rs b/chaindata_standalone/src/main.rs index c80f523..eae909a 100644 --- a/chaindata_standalone/src/main.rs +++ b/chaindata_standalone/src/main.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::env; use std::sync::{Arc, RwLock}; -use std::thread::sleep; use std::time::Duration; use async_channel::Sender; use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message}; @@ -12,6 +11,7 @@ use solana_sdk::pubkey::Pubkey; use tokio::sync::broadcast; use tokio::sync::mpsc::Receiver; use tokio::task::JoinHandle; +use tokio::time::sleep; use tracing_subscriber::EnvFilter; use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeUpdatePing}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; @@ -49,13 +49,13 @@ pub async fn main() { let (account_write_sender, account_write_receiver) = async_channel::unbounded::(); let (slot_sender, slot_receiver) = async_channel::unbounded::(); - let (account_update_sender, _) = broadcast::channel(64); // 524288 + let (account_update_sender, _) = broadcast::channel(524288); // 524288 // TODO exit start_plumbing_task(grpc_accounts_rx, account_write_sender.clone(), slot_sender.clone()); let chain_data = Arc::new(RwLock::new(ChainData::new())); - router_impl::start_chaindata_updating( + let job1 = router_impl::start_chaindata_updating( chain_data.clone(), account_write_receiver, slot_receiver, @@ -63,7 +63,7 @@ pub async fn main() { exit_sender.subscribe(), ); - spawn_updater_job( + let job2 = spawn_updater_job( chain_data.clone(), account_update_sender.subscribe(), exit_sender.subscribe(), @@ -71,20 +71,19 @@ pub async fn main() { info!("chaindata standalone started - now wait some time to let it operate .."); - debug_chaindata(chain_data.clone(), exit_sender.subscribe(),); + let job3 = debug_chaindata(chain_data.clone(), exit_sender.subscribe(),); - sleep(std::time::Duration::from_secs(7)); + sleep(std::time::Duration::from_secs(7)).await; info!("done."); info!("send exit signal.."); exit_sender.send(()).unwrap(); - sleep(std::time::Duration::from_secs(1)); + sleep(std::time::Duration::from_secs(1)).await; info!("quitting."); } // TODO add exit -fn debug_chaindata(chain_data: Arc>, mut exit: broadcast::Receiver<()>,) { - +fn debug_chaindata(chain_data: Arc>, mut exit: broadcast::Receiver<()>,) -> JoinHandle<()> { tokio::spawn(async move { info!("starting debug task"); loop { @@ -92,15 +91,17 @@ fn debug_chaindata(chain_data: Arc>, mut exit: broadcast::Rece info!("exit signal received - stopping task"); return; } - let chain_data = chain_data.read().unwrap(); - info!("chaindata?"); - for account in chain_data.iter_accounts_rooted() { - info!("- chaindata.account {:?}", account); + { + let chain_data = chain_data.read().unwrap(); + info!("chaindata?"); + for account in chain_data.iter_accounts_rooted() { + info!("- chaindata.account {:?}", account); + } } - sleep(std::time::Duration::from_secs(1)); + + sleep(std::time::Duration::from_secs(1)).await; } - }); - + }) } // this is replacing the spawn_geyser_source task from router diff --git a/chaindata_standalone/src/router_impl.rs b/chaindata_standalone/src/router_impl.rs index 08911a9..5f54e91 100644 --- a/chaindata_standalone/src/router_impl.rs +++ b/chaindata_standalone/src/router_impl.rs @@ -65,9 +65,6 @@ pub fn start_chaindata_updating( status: slot_update.status, chain: 0, }); - - // TODO: slot updates can significantly affect state, do we need to track what needs to be updated - // when switching to a different fork? } } } @@ -103,7 +100,7 @@ fn handle_updated_account( trace!("[account_write_receiver->account_update_sender] send write for {}@_slot_{} write_version={}", account_write.pubkey, account_write.slot, account_write.write_version); - // ignore failing sends when there are no receivers + let _err = account_update_sender.send((account_write.pubkey, account_write.slot)); } @@ -111,7 +108,7 @@ pub fn spawn_updater_job( chain_data: ChainDataArcRw, mut account_updates: broadcast::Receiver<(Pubkey, u64)>, mut exit: broadcast::Receiver<()>, -) -> Option> { +) -> JoinHandle<()> { let listener_job = tokio::spawn(async move { @@ -126,50 +123,12 @@ pub fn spawn_updater_job( info!("shutting down update task"); break; } - // slot = slot_updates.recv() => { - // updater.detect_and_handle_slot_lag(slot); - // } - // res = metadata_updates.recv() => { - // updater.on_metadata_update(res); - // } res = account_updates.recv() => { let (pubkey, slot) = res.unwrap(); trace!("[account_update_sender->...]-> updater.invalidate_one for {}@_slot_{}", pubkey, slot); - // if !updater.invalidate_one(res) { - // break 'drain_loop; - // } - // - // let mut batchsize: u32 = 0; - // let started_at = Instant::now(); - // 'batch_loop: while let Ok(res) = account_updates.try_recv() { - // batchsize += 1; - // if !updater.invalidate_one(Ok(res)) { - // break 'drain_loop; - // } - // - // // budget for microbatch - // if batchsize > 10 || started_at.elapsed() > Duration::from_micros(500) { - // break 'batch_loop; - // } - // } - }, - // Ok(_) = price_updates.recv() => { - // updater.state.dirty_prices = true; - // }, - // _ = refresh_all_interval.tick() => { - // updater.refresh_all(&edges); - // - // if !updater.state.is_ready && snapshot_timeout < Instant::now() { - // error!("Failed to init '{}' before timeout", updater.dex.name); - // break; - // } - // } _ = refresh_one_interval.tick() => { - // updater.refresh_some(); - // note! - // info!("-> updater.refresh_some (10 ms tick)"); } } } @@ -179,5 +138,5 @@ pub fn spawn_updater_job( // let _ = updater.ready_sender.try_send(()); }); - Some(listener_job) + listener_job } \ No newline at end of file