Skip to content

Commit

Permalink
Fix sleep and lock issue
Browse files Browse the repository at this point in the history
  • Loading branch information
farnyser committed Aug 12, 2024
1 parent a97183c commit 61b0bcb
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 60 deletions.
33 changes: 17 additions & 16 deletions chaindata_standalone/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::HashMap;

Check warning on line 1 in chaindata_standalone/src/main.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/chaindata_standalone/src/main.rs
use std::env;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use async_channel::Sender;
use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message};
Expand All @@ -12,6 +11,7 @@ use solana_sdk::pubkey::Pubkey;
use tokio::sync::broadcast;
use tokio::sync::mpsc::Receiver;
use tokio::task::JoinHandle;

Check warning on line 13 in chaindata_standalone/src/main.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/chaindata_standalone/src/main.rs
use tokio::time::sleep;
use tracing_subscriber::EnvFilter;
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeUpdatePing};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
Expand Down Expand Up @@ -49,58 +49,59 @@ pub async fn main() {

let (account_write_sender, account_write_receiver) = async_channel::unbounded::<AccountWrite>();
let (slot_sender, slot_receiver) = async_channel::unbounded::<SlotUpdate>();
let (account_update_sender, _) = broadcast::channel(64); // 524288
let (account_update_sender, _) = broadcast::channel(524288); // 524288

Check warning on line 53 in chaindata_standalone/src/main.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/chaindata_standalone/src/main.rs
// TODO exit
start_plumbing_task(grpc_accounts_rx, account_write_sender.clone(), slot_sender.clone());

let chain_data = Arc::new(RwLock::new(ChainData::new()));
router_impl::start_chaindata_updating(
let job1 = router_impl::start_chaindata_updating(
chain_data.clone(),
account_write_receiver,
slot_receiver,
account_update_sender.clone(),
exit_sender.subscribe(),
);

spawn_updater_job(
let job2 = spawn_updater_job(
chain_data.clone(),
account_update_sender.subscribe(),
exit_sender.subscribe(),
);

Check warning on line 71 in chaindata_standalone/src/main.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/chaindata_standalone/src/main.rs
info!("chaindata standalone started - now wait some time to let it operate ..");

debug_chaindata(chain_data.clone(), exit_sender.subscribe(),);
let job3 = debug_chaindata(chain_data.clone(), exit_sender.subscribe(),);

sleep(std::time::Duration::from_secs(7));
sleep(std::time::Duration::from_secs(7)).await;
info!("done.");

info!("send exit signal..");
exit_sender.send(()).unwrap();
sleep(std::time::Duration::from_secs(1));
sleep(std::time::Duration::from_secs(1)).await;
info!("quitting.");
}

Check warning on line 83 in chaindata_standalone/src/main.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/chaindata_standalone/src/main.rs

// TODO add exit
fn debug_chaindata(chain_data: Arc<RwLock<ChainData>>, mut exit: broadcast::Receiver<()>,) {

fn debug_chaindata(chain_data: Arc<RwLock<ChainData>>, mut exit: broadcast::Receiver<()>,) -> JoinHandle<()> {
tokio::spawn(async move {
info!("starting debug task");
loop {
if exit.try_recv().is_ok() {
info!("exit signal received - stopping task");
return;
}
let chain_data = chain_data.read().unwrap();
info!("chaindata?");
for account in chain_data.iter_accounts_rooted() {
info!("- chaindata.account {:?}", account);
{
let chain_data = chain_data.read().unwrap();
info!("chaindata?");
for account in chain_data.iter_accounts_rooted() {
info!("- chaindata.account {:?}", account);

Check warning on line 98 in chaindata_standalone/src/main.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/chaindata_standalone/src/main.rs
}
}
sleep(std::time::Duration::from_secs(1));

sleep(std::time::Duration::from_secs(1)).await;
}
});

})
}

// this is replacing the spawn_geyser_source task from router
Expand Down
47 changes: 3 additions & 44 deletions chaindata_standalone/src/router_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ pub fn start_chaindata_updating(
status: slot_update.status,
chain: 0,
});

// TODO: slot updates can significantly affect state, do we need to track what needs to be updated
// when switching to a different fork?
}
}
}
Expand Down Expand Up @@ -103,15 +100,15 @@ fn handle_updated_account(

trace!("[account_write_receiver->account_update_sender] send write for {}@_slot_{} write_version={}",
account_write.pubkey, account_write.slot, account_write.write_version);
// ignore failing sends when there are no receivers

let _err = account_update_sender.send((account_write.pubkey, account_write.slot));
}

pub fn spawn_updater_job(
chain_data: ChainDataArcRw,
mut account_updates: broadcast::Receiver<(Pubkey, u64)>,
mut exit: broadcast::Receiver<()>,
) -> Option<JoinHandle<()>> {
) -> JoinHandle<()> {

let listener_job = tokio::spawn(async move {

Expand All @@ -126,50 +123,12 @@ pub fn spawn_updater_job(
info!("shutting down update task");
break;
}
// slot = slot_updates.recv() => {
// updater.detect_and_handle_slot_lag(slot);
// }
// res = metadata_updates.recv() => {
// updater.on_metadata_update(res);
// }
res = account_updates.recv() => {
let (pubkey, slot) = res.unwrap();
trace!("[account_update_sender->...]-> updater.invalidate_one for {}@_slot_{}", pubkey, slot);

// if !updater.invalidate_one(res) {
// break 'drain_loop;
// }
//
// let mut batchsize: u32 = 0;
// let started_at = Instant::now();
// 'batch_loop: while let Ok(res) = account_updates.try_recv() {
// batchsize += 1;
// if !updater.invalidate_one(Ok(res)) {
// break 'drain_loop;
// }
//
// // budget for microbatch
// if batchsize > 10 || started_at.elapsed() > Duration::from_micros(500) {
// break 'batch_loop;
// }
// }

},
// Ok(_) = price_updates.recv() => {
// updater.state.dirty_prices = true;
// },
// _ = refresh_all_interval.tick() => {
// updater.refresh_all(&edges);
//
// if !updater.state.is_ready && snapshot_timeout < Instant::now() {
// error!("Failed to init '{}' before timeout", updater.dex.name);
// break;
// }
// }
_ = refresh_one_interval.tick() => {
// updater.refresh_some();
// note!
// info!("-> updater.refresh_some (10 ms tick)");
}
}
}
Expand All @@ -179,5 +138,5 @@ pub fn spawn_updater_job(
// let _ = updater.ready_sender.try_send(());
});

Some(listener_job)
listener_job
}

0 comments on commit 61b0bcb

Please sign in to comment.