Skip to content

Commit

Permalink
snapshot works
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Aug 12, 2024
1 parent 2eba164 commit 57da6db
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 20 deletions.
50 changes: 38 additions & 12 deletions chaindata_standalone/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::env;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use async_channel::Sender;
use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{create_geyser_autoconnection_task};
use itertools::Itertools;
use log::{debug, info, trace, warn};
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;

Check warning on line 11 in chaindata_standalone/src/main.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/chaindata_standalone/src/main.rs

use tokio::sync::broadcast;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, broadcast};
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing_subscriber::EnvFilter;
Expand All @@ -20,6 +20,7 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;

use mango_feeds_connector::chain_data::{ChainData, SlotStatus};
use mango_feeds_connector::{AccountWrite, SlotUpdate};
use crate::account_write::account_write_from;
use crate::get_program_account::get_snapshot_gpa;

use crate::router_impl::{AccountOrSnapshotUpdate, spawn_updater_job};
Expand Down Expand Up @@ -55,13 +56,18 @@ pub async fn main() {
let (_jh_grpc_source, grpc_accounts_rx) = create_geyser_autoconnection_task(grpc_source_config.clone(), raydium_accounts(), exit_sender.subscribe());


let (account_write_sender, account_write_receiver) = async_channel::unbounded::<AccountOrSnapshotUpdate>();
let (account_write_sender, account_write_receiver) = mpsc::channel::<AccountOrSnapshotUpdate>(100_000);
let (slot_sender, slot_receiver) = async_channel::unbounded::<SlotUpdate>();
let (account_update_sender, _) = broadcast::channel(524288); // 524288

// TODO exit
start_plumbing_task(grpc_accounts_rx, account_write_sender.clone(), slot_sender.clone());

Check warning on line 63 in chaindata_standalone/src/main.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/chaindata_standalone/src/main.rs

let rpc_http_url = env::var("RPC_HTTP_URL").expect("need http rpc url");
start_gpa_snapshot_fetcher(
rpc_http_url, Pubkey::from_str("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8").unwrap(),

Check warning on line 67 in chaindata_standalone/src/main.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/chaindata_standalone/src/main.rs
account_write_sender.clone());


let chain_data = Arc::new(RwLock::new(ChainData::new()));
let job1 = router_impl::start_chaindata_updating(
chain_data.clone(),
Expand All @@ -71,9 +77,6 @@ pub async fn main() {
exit_sender.subscribe(),
);

let rpc_http_url = env::var("RPC_HTTP_URL").expect("need http rpc url");
start_gpa_snapshot_fetcher(rpc_http_url, Pubkey::from_str("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8").unwrap());

let job2 = spawn_updater_job(
chain_data.clone(),
account_update_sender.subscribe(),
Expand All @@ -93,13 +96,36 @@ pub async fn main() {
info!("quitting.");

Check warning on line 96 in chaindata_standalone/src/main.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/chaindata_standalone/src/main.rs
}

fn start_gpa_snapshot_fetcher(rpc_http_url: String, program_id: Pubkey) {
fn start_gpa_snapshot_fetcher(rpc_http_url: String, program_id: Pubkey, account_write_sender: mpsc::Sender<AccountOrSnapshotUpdate>) {
tokio::spawn(async move {
info!("loading snapshot from compressed gPA RPC endpoint ...");
let rpc_http_url = rpc_http_url.clone();
// 675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8 -> 796157 accounts, 50s

Check warning on line 103 in chaindata_standalone/src/main.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/chaindata_standalone/src/main.rs
let snapshot = get_snapshot_gpa(&rpc_http_url, &program_id, true).await.unwrap();
info!("downloaded snapshot for slot {} with {:?} accounts", snapshot.slot, snapshot.accounts.len());

for update_chunk in snapshot.accounts.chunks(1024) {
let chunk = update_chunk.into_iter()
.map(|update| {
let slot = update.slot;
let pubkey = Pubkey::try_from(update.pubkey.clone()).unwrap();
AccountWrite {
pubkey,
slot,
write_version: update.write_version,
lamports: update.lamports,
owner: Pubkey::try_from(update.owner.clone()).unwrap(),
executable: update.executable,

Check warning on line 118 in chaindata_standalone/src/main.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/mango-feeds/mango-feeds/chaindata_standalone/src/main.rs
rent_epoch: update.rent_epoch,
data: update.data.clone(), // TODO not nice
is_selected: false, // what is that?
}
}).collect_vec();

info!("sending snapshot chunk with {} accounts", chunk.len());
let _sent_res = account_write_sender.send(AccountOrSnapshotUpdate::SnapshotUpdate(chunk)).await;
}

});
}

Expand Down Expand Up @@ -127,9 +153,9 @@ fn debug_chaindata(chain_data: Arc<RwLock<ChainData>>, mut exit: broadcast::Rece

// this is replacing the spawn_geyser_source task from router
fn start_plumbing_task(
mut grpc_source_rx: Receiver<Message>,
account_write_sender: Sender<AccountOrSnapshotUpdate>,
slot_sender: Sender<SlotUpdate>) {
mut grpc_source_rx: mpsc::Receiver<Message>,
account_write_sender: mpsc::Sender<AccountOrSnapshotUpdate>,
slot_sender: async_channel::Sender<SlotUpdate>) {
tokio::spawn(async move {
info!("starting plumbing task");
loop {
Expand Down
28 changes: 20 additions & 8 deletions chaindata_standalone/src/router_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use mango_feeds_connector::{AccountWrite, SlotUpdate};
use solana_sdk::pubkey::Pubkey;
use std::sync::{Arc, RwLock, RwLockWriteGuard};
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::{debug, error};

pub type ChainDataArcRw = Arc<RwLock<ChainData>>;
Expand All @@ -22,7 +23,7 @@ pub enum AccountOrSnapshotUpdate {
pub fn start_chaindata_updating(
chain_data: ChainDataArcRw,
// = account_write_receiver
account_writes: async_channel::Receiver<AccountOrSnapshotUpdate>,
mut account_writes: mpsc::Receiver<AccountOrSnapshotUpdate>,
slot_updates: async_channel::Receiver<SlotUpdate>,
account_update_sender: broadcast::Sender<(Pubkey, u64)>,
mut exit: broadcast::Receiver<()>,
Expand All @@ -37,23 +38,32 @@ pub fn start_chaindata_updating(
break;
}
res = account_writes.recv() => {
let Ok(account_write) = res
let Some(update) = res
else {
warn!("account write channel err {res:?}");
continue;
};
// trace!("[account_write_receiver->chain_data] account update for {}@_slot_{} write_version={}",
// account_write.pubkey, account_write.slot, account_write.write_version);

match &update {
AccountOrSnapshotUpdate::AccountUpdate(account_update) => {
trace!("[account_write_receiver->chain_data] account update for {}@_slot_{} write_version={}",
account_update.pubkey, account_update.slot, account_update.write_version);
}
AccountOrSnapshotUpdate::SnapshotUpdate(account_writes) => {
trace!("[account_write_receiver->chain_data] account update from snapshot with {} accounts",
account_writes.len());
}
}

let mut writer = chain_data.write().unwrap();
handle_updated_account(&mut writer, account_write, &account_update_sender);
handle_updated_account(&mut writer, update, &account_update_sender);

let mut batchsize: u32 = 0;
let started_at = Instant::now();
'batch_loop: while let Ok(res) = account_writes.try_recv() {
'batch_loop: while let Ok(bupdate) = account_writes.try_recv() {
batchsize += 1;

handle_updated_account(&mut writer, res, &account_update_sender);
handle_updated_account(&mut writer, bupdate, &account_update_sender);

// budget for microbatch
if batchsize > 10 || started_at.elapsed() > Duration::from_micros(500) {
Expand All @@ -67,6 +77,7 @@ pub fn start_chaindata_updating(
warn!("slot channel err {res:?}");
continue;
};

chain_data.write().unwrap().update_slot(SlotData {
slot: slot_update.slot,
parent: slot_update.parent,
Expand Down Expand Up @@ -126,6 +137,7 @@ fn handle_updated_account(
one_update(chain_data, account_update_sender, account_write)
}
AccountOrSnapshotUpdate::SnapshotUpdate(snapshot) => {
debug!("Update from snapshot data: {}", snapshot.len());
for account_write in snapshot {
one_update(chain_data, account_update_sender, account_write)
}
Expand Down

0 comments on commit 57da6db

Please sign in to comment.