Skip to content

Commit

Permalink
log channel flow
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Aug 12, 2024
1 parent 8891828 commit 3894bcf
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 6 deletions.
39 changes: 39 additions & 0 deletions chaindata_standalone/SAMPLEFLOWS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@



```
2024-08-12T08:58:14.644200Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195675784
2024-08-12T08:58:14.739358Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676308
2024-08-12T08:58:14.774673Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676381
2024-08-12T08:58:14.899019Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676458
2024-08-12T08:58:14.899313Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676472
2024-08-12T08:58:14.914404Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676532
2024-08-12T08:58:14.946106Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565
2024-08-12T08:58:14.946583Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676579
2024-08-12T08:58:14.971928Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676593
2024-08-12T08:58:14.986530Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676627
2024-08-12T08:58:15.005577Z TRACE chaindata_standalone: [grpc->account_write_sender]: account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676657
2024-08-12T08:58:15.864777Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195675784
2024-08-12T08:58:15.864815Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195675784
2024-08-12T08:58:15.864856Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398
2024-08-12T08:58:17.875143Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676308
2024-08-12T08:58:17.875227Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676308
2024-08-12T08:58:17.875421Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398
2024-08-12T08:58:17.875454Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676381
2024-08-12T08:58:17.875516Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676381
2024-08-12T08:58:17.875610Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398
2024-08-12T08:58:18.880978Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676458
2024-08-12T08:58:18.881039Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676458
2024-08-12T08:58:18.881218Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398
2024-08-12T08:58:18.881235Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676472
2024-08-12T08:58:18.881297Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676472
2024-08-12T08:58:18.881378Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398
2024-08-12T08:58:19.887019Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676532
2024-08-12T08:58:19.887155Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676532
2024-08-12T08:58:19.887320Z DEBUG chaindata_standalone::router_impl: -> updater.invalidate_one for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398
2024-08-12T08:58:19.887609Z TRACE chaindata_standalone::router_impl: [account_write_receiver->chain_data] account update for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565
2024-08-12T08:58:20.892641Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565
2024-08-12T08:58:20.892897Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676565
2024-08-12T08:58:20.893127Z TRACE chaindata_standalone::router_impl: [account_writes_channel->chain_data] .update_account for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676579
2024-08-12T08:58:20.893197Z TRACE chaindata_standalone::router_impl: [account_writes_channel->account_update_sender] send write for 2F9YF2KiCX5ZfuHso5RXxc83W9eKYLnCbAcDyBGRCtd7@_slot_283111398 write_version=1390195676579
```
7 changes: 4 additions & 3 deletions chaindata_standalone/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub async fn main() {

let (account_write_sender, account_write_receiver) = async_channel::unbounded::<AccountWrite>();
let (slot_sender, slot_receiver) = async_channel::unbounded::<SlotUpdate>();
let (account_update_sender, _) = broadcast::channel(524288);
let (account_update_sender, _) = broadcast::channel(64); // 524288

Check warning on line 53 in chaindata_standalone/src/main.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/chaindata_standalone/src/main.rs
// TODO exit
start_plumbing_task(grpc_accounts_rx, account_write_sender.clone(), slot_sender.clone());
Expand All @@ -63,7 +63,6 @@ pub async fn main() {
exit_sender.subscribe(),
);


spawn_updater_job(
chain_data.clone(),
account_update_sender.subscribe(),
Expand Down Expand Up @@ -104,6 +103,7 @@ fn debug_chaindata(chain_data: Arc<RwLock<ChainData>>, mut exit: broadcast::Rece

}

// this is replacing the spawn_geyser_source task from router
fn start_plumbing_task(

Check warning on line 107 in chaindata_standalone/src/main.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/chaindata_standalone/src/main.rs
mut grpc_source_rx: Receiver<Message>,
account_write_sender: Sender<AccountWrite>,
Expand All @@ -119,7 +119,8 @@ fn start_plumbing_task(
let pubkey = Pubkey::try_from(update.pubkey.clone()).unwrap();
let owner = Pubkey::try_from(update.owner.clone()).unwrap();

trace!("get account update for {:?}@{} via grpc", pubkey, slot);
trace!("[grpc->account_write_sender]: account update for {}@_slot_{} write_version={}",
pubkey, slot, update.write_version);

account_write_sender.send(AccountWrite {
pubkey,
Expand Down
16 changes: 13 additions & 3 deletions chaindata_standalone/src/router_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use log::{info, warn};
use log::{info, trace, warn};
use mango_feeds_connector::chain_data::ChainData;
use mango_feeds_connector::{AccountWrite, SlotUpdate};
use solana_sdk::pubkey::Pubkey;
Expand All @@ -13,6 +13,7 @@ pub type ChainDataArcRw = Arc<RwLock<ChainData>>;
// from router project
pub fn start_chaindata_updating(
chain_data: ChainDataArcRw,
// = account_write_receiver
account_writes: async_channel::Receiver<AccountWrite>,
slot_updates: async_channel::Receiver<SlotUpdate>,
account_update_sender: broadcast::Sender<(Pubkey, u64)>,
Expand All @@ -33,6 +34,8 @@ pub fn start_chaindata_updating(
warn!("account write channel err {res:?}");
continue;
};
trace!("[account_write_receiver->chain_data] account update for {}@_slot_{} write_version={}",
account_write.pubkey, account_write.slot, account_write.write_version);

let mut writer = chain_data.write().unwrap();
handle_updated_account(&mut writer, account_write, &account_update_sender);
Expand Down Expand Up @@ -81,6 +84,8 @@ fn handle_updated_account(
use solana_sdk::account::WritableAccount;
use solana_sdk::clock::Epoch;

trace!("[account_writes_channel->chain_data] .update_account for {}@_slot_{} write_version={}",
account_write.pubkey, account_write.slot, account_write.write_version);
chain_data.update_account(
account_write.pubkey,
AccountData {
Expand All @@ -96,6 +101,8 @@ fn handle_updated_account(
},
);

trace!("[account_writes_channel->account_update_sender] send write for {}@_slot_{} write_version={}",
account_write.pubkey, account_write.slot, account_write.write_version);
// ignore failing sends when there are no receivers
let _err = account_update_sender.send((account_write.pubkey, account_write.slot));
}
Expand Down Expand Up @@ -126,7 +133,9 @@ pub fn spawn_updater_job(
// updater.on_metadata_update(res);
// }
res = account_updates.recv() => {
info!("-> updater.invalidate_one");
let (pubkey, slot) = res.unwrap();
trace!("-> updater.invalidate_one for {}@_slot_{}", pubkey, slot);

// if !updater.invalidate_one(res) {
// break 'drain_loop;
// }
Expand Down Expand Up @@ -159,7 +168,8 @@ pub fn spawn_updater_job(
// }
_ = refresh_one_interval.tick() => {
// updater.refresh_some();
info!("-> updater.refresh_some");
// note!
// info!("-> updater.refresh_some (10 ms tick)");
}
}
}
Expand Down

0 comments on commit 3894bcf

Please sign in to comment.