diff --git a/chaindata_standalone/SAMPLEFLOWS.md b/chaindata_standalone/SAMPLEFLOWS.md new file mode 100644 index 0000000..b636811 --- /dev/null +++ b/chaindata_standalone/SAMPLEFLOWS.md @@ -0,0 +1,39 @@ + + + +``` +2024-08-12T08:58:14.644200Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195675784 +2024-08-12T08:58:14.739358Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676308 +2024-08-12T08:58:14.774673Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676381 +2024-08-12T08:58:14.899019Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676458 +2024-08-12T08:58:14.899313Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676472 +2024-08-12T08:58:14.914404Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676532 +2024-08-12T08:58:14.946106Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565 +2024-08-12T08:58:14.946583Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676579 +2024-08-12T08:58:14.971928Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676593 +2024-08-12T08:58:14.986530Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676627 +2024-08-12T08:58:15.005577Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676657 +2024-08-12T08:58:15.864777Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195675784 +2024-08-12T08:58:15.864815Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195675784 +2024-08-12T08:58:15.864856Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 +2024-08-12T08:58:17.875143Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676308 +2024-08-12T08:58:17.875227Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676308 +2024-08-12T08:58:17.875421Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 +2024-08-12T08:58:17.875454Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676381 +2024-08-12T08:58:17.875516Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676381 +2024-08-12T08:58:17.875610Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 +2024-08-12T08:58:18.880978Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676458 +2024-08-12T08:58:18.881039Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676458 +2024-08-12T08:58:18.881218Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 +2024-08-12T08:58:18.881235Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676472 +2024-08-12T08:58:18.881297Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676472 +2024-08-12T08:58:18.881378Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 +2024-08-12T08:58:19.887019Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676532 +2024-08-12T08:58:19.887155Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676532 +2024-08-12T08:58:19.887320Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 +2024-08-12T08:58:19.887609Z TRACE chaindata_standalone::router_impl: [account_write_receiver->chain_data] account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565 +2024-08-12T08:58:20.892641Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565 +2024-08-12T08:58:20.892897Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565 +2024-08-12T08:58:20.893127Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676579 +2024-08-12T08:58:20.893197Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676579 +``` \ No newline at end of file diff --git a/chaindata_standalone/src/main.rs b/chaindata_standalone/src/main.rs index 9042dff..4cbafd7 100644 --- a/chaindata_standalone/src/main.rs +++ b/chaindata_standalone/src/main.rs @@ -49,7 +49,7 @@ pub async fn main() { let (account_write_sender, account_write_receiver) = async_channel::unbounded::(); let (slot_sender, slot_receiver) = async_channel::unbounded::(); - let (account_update_sender, _) = broadcast::channel(524288); + let (account_update_sender, _) = broadcast::channel(64); // 524288 // TODO exit start_plumbing_task(grpc_accounts_rx, account_write_sender.clone(), slot_sender.clone()); @@ -63,7 +63,6 @@ pub async fn main() { exit_sender.subscribe(), ); - spawn_updater_job( chain_data.clone(), account_update_sender.subscribe(), @@ -104,6 +103,7 @@ fn debug_chaindata(chain_data: Arc>, mut exit: broadcast::Rece } +// this is replacing the spawn_geyser_source task from router fn start_plumbing_task( mut grpc_source_rx: Receiver, account_write_sender: Sender, @@ -119,7 +119,8 @@ fn start_plumbing_task( let pubkey = Pubkey::try_from(update.pubkey.clone()).unwrap(); let owner = Pubkey::try_from(update.owner.clone()).unwrap(); - trace!("get account update for {:?}@{} via grpc", pubkey, slot); + trace!("[grpc->account_write_sender]: account update for {}@_slot_{} write_version={}", + pubkey, slot, update.write_version); account_write_sender.send(AccountWrite { pubkey, diff --git a/chaindata_standalone/src/router_impl.rs b/chaindata_standalone/src/router_impl.rs index bce84fa..1245367 100644 --- a/chaindata_standalone/src/router_impl.rs +++ b/chaindata_standalone/src/router_impl.rs @@ -1,4 +1,4 @@ -use log::{info, warn}; +use log::{info, trace, warn}; use mango_feeds_connector::chain_data::ChainData; use mango_feeds_connector::{AccountWrite, SlotUpdate}; use solana_sdk::pubkey::Pubkey; @@ -13,6 +13,7 @@ pub type ChainDataArcRw = Arc>; // from router project pub fn start_chaindata_updating( chain_data: ChainDataArcRw, + // = account_write_receiver account_writes: async_channel::Receiver, slot_updates: async_channel::Receiver, account_update_sender: broadcast::Sender<(Pubkey, u64)>, @@ -33,6 +34,8 @@ pub fn start_chaindata_updating( warn!("account write channel err {res:?}"); continue; }; + trace!("[account_write_receiver->chain_data] account update for {}@_slot_{} write_version={}", + account_write.pubkey, account_write.slot, account_write.write_version); let mut writer = chain_data.write().unwrap(); handle_updated_account(&mut writer, account_write, &account_update_sender); @@ -81,6 +84,8 @@ fn handle_updated_account( use solana_sdk::account::WritableAccount; use solana_sdk::clock::Epoch; + trace!("[account_writes_channel->chain_data] .update_account for {}@_slot_{} write_version={}", + account_write.pubkey, account_write.slot, account_write.write_version); chain_data.update_account( account_write.pubkey, AccountData { @@ -96,6 +101,8 @@ fn handle_updated_account( }, ); + trace!("[account_writes_channel->account_update_sender] send write for {}@_slot_{} write_version={}", + account_write.pubkey, account_write.slot, account_write.write_version); // ignore failing sends when there are no receivers let _err = account_update_sender.send((account_write.pubkey, account_write.slot)); } @@ -126,7 +133,9 @@ pub fn spawn_updater_job( // updater.on_metadata_update(res); // } res = account_updates.recv() => { - info!("-> updater.invalidate_one"); + let (pubkey, slot) = res.unwrap(); + trace!("-> updater.invalidate_one for {}@_slot_{}", pubkey, slot); + // if !updater.invalidate_one(res) { // break 'drain_loop; // } @@ -159,7 +168,8 @@ pub fn spawn_updater_job( // } _ = refresh_one_interval.tick() => { // updater.refresh_some(); - info!("-> updater.refresh_some"); + // note! + // info!("-> updater.refresh_some (10 ms tick)"); } } }