Skip to content

Commit

Permalink
fix after code review
Browse files Browse the repository at this point in the history
  • Loading branch information
farnyser committed Apr 3, 2024
1 parent 0e1cef0 commit b4dee7d
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 80 deletions.
35 changes: 5 additions & 30 deletions connector/src/account_write_filter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData},
metrics::Metrics,
FeedWrite, SlotUpdate,
AccountWrite, SlotUpdate,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -36,11 +36,11 @@ pub fn init(
routes: Vec<AccountWriteRoute>,
metrics_sender: Metrics,
) -> anyhow::Result<(
async_channel::Sender<FeedWrite>,
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
)> {
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::unbounded::<FeedWrite>();
async_channel::unbounded::<AccountWrite>();

// Slot updates flowing from the outside into this processing thread. From
// there the AccountWriteRoute::sink() callback is triggered.
Expand All @@ -61,7 +61,7 @@ pub fn init(
tokio::spawn(async move {
loop {
tokio::select! {
Ok(FeedWrite::Account(account_write)) = account_write_queue_receiver.recv() => {
Ok(account_write) = account_write_queue_receiver.recv() => {
if !all_queue_pks.contains(&account_write.pubkey) {
trace!("account write skipped {:?}", account_write.pubkey);
continue;
Expand All @@ -83,32 +83,7 @@ pub fn init(
),
},
);
}
Ok(FeedWrite::Snapshot(snapshot_write)) = account_write_queue_receiver.recv() => {
for account_write in snapshot_write.accounts {
if !all_queue_pks.contains(&account_write.pubkey) {
trace!("account write skipped {:?}", account_write.pubkey);
continue;
} else {
trace!("account write processed {:?}", account_write.pubkey);
}

chain_data.update_account(
account_write.pubkey,
AccountData {
slot: account_write.slot,
write_version: account_write.write_version,
account: WritableAccount::create(
account_write.lamports,
account_write.data.clone(),
account_write.owner,
account_write.executable,
account_write.rent_epoch as Epoch,
),
},
);
}
}
},
Ok(slot_update) = slot_queue_receiver.recv() => {
trace!("slot update processed {:?}", slot_update);
chain_data.update_slot(SlotData {
Expand Down
27 changes: 12 additions & 15 deletions connector/src/grpc_plugin_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ use yellowstone_grpc_proto::prelude::{
};

use crate::snapshot::{get_filtered_snapshot_gpa, get_snapshot_gma, get_snapshot_gpa};
use crate::FeedWrite::Snapshot;
use crate::{
chain_data::SlotStatus,
metrics::{MetricType, Metrics},
AccountWrite, FeedFilterType, FeedWrite, GrpcSourceConfig, SlotUpdate, SnapshotSourceConfig,
SnapshotWrite, SourceConfig, TlsConfig,
AccountWrite, FeedFilterType, GrpcSourceConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig,
TlsConfig,
};
use crate::{EntityFilter, FilterConfig};

Expand Down Expand Up @@ -119,7 +118,7 @@ async fn feed_data_geyser(
},
);
}
EntityFilter::FilterByProgramIdAndCustomCriteria(program_id, criteria) => {
EntityFilter::FilterByProgramIdSelective(program_id, criteria) => {
accounts.insert(
"client".to_owned(),
SubscribeRequestFilterAccounts {
Expand Down Expand Up @@ -248,7 +247,7 @@ async fn feed_data_geyser(
EntityFilter::FilterByProgramId(program_id) => {
snapshot_gpa = tokio::spawn(get_snapshot_gpa(snapshot_rpc_http_url.clone(), program_id.to_string())).fuse();
},
EntityFilter::FilterByProgramIdAndCustomCriteria(program_id,filters) => {
EntityFilter::FilterByProgramIdSelective(program_id,filters) => {
snapshot_gpa = tokio::spawn(get_filtered_snapshot_gpa(snapshot_rpc_http_url.clone(), program_id.to_string(), Some(filters.clone()))).fuse();
}
};
Expand Down Expand Up @@ -390,7 +389,7 @@ fn make_tls_config(config: &TlsConfig) -> ClientTlsConfig {
pub async fn process_events(
config: SourceConfig,
filter_config: FilterConfig,
account_write_queue_sender: async_channel::Sender<FeedWrite>,
account_write_queue_sender: async_channel::Sender<AccountWrite>,
slot_queue_sender: async_channel::Sender<SlotUpdate>,
metrics_sender: Metrics,
exit: Arc<AtomicBool>,
Expand Down Expand Up @@ -519,7 +518,7 @@ pub async fn process_events(
// let mut uncompressed: Vec<u8> = Vec::new();
// zstd_decompress(&update.data, &mut uncompressed).unwrap();
account_write_queue_sender
.send(FeedWrite::Account(AccountWrite {
.send(AccountWrite {
pubkey: Pubkey::try_from(update.pubkey.clone()).unwrap(),
slot: info.slot,
write_version: update.write_version,
Expand All @@ -530,7 +529,7 @@ pub async fn process_events(
data: update.data,
// TODO: what should this be? related to account deletes?
is_selected: true,
}))
})
.await
.expect("send success");
}
Expand Down Expand Up @@ -569,7 +568,6 @@ pub async fn process_events(
metric_snapshots.increment();
info!("processing snapshot...");

let mut to_send = vec![];
for account in update.accounts.iter() {
metric_snapshot_account_writes.increment();
metric_account_queue.set(account_write_queue_sender.len() as u64);
Expand All @@ -579,17 +577,16 @@ pub async fn process_events(
// TODO: Resnapshot on invalid data?
let pubkey = Pubkey::from_str(key).unwrap();
let account: Account = ui_account.decode().unwrap();
to_send.push(AccountWrite::from(pubkey, update.slot, 0, account));

account_write_queue_sender
.send(AccountWrite::from(pubkey, update.slot, 0, account))
.await
.expect("send success");
}
(key, None) => warn!("account not found {}", key),
}
}

account_write_queue_sender
.send(Snapshot(SnapshotWrite { accounts: to_send }))
.await
.expect("send success");

info!("processing snapshot done");
}
}
Expand Down
13 changes: 1 addition & 12 deletions connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,6 @@ impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
}
}

#[derive(Clone, PartialEq, Debug)]
pub enum FeedWrite {
Account(AccountWrite),
Snapshot(SnapshotWrite),
}

#[derive(Clone, PartialEq, Debug)]
pub struct SnapshotWrite {
pub accounts: Vec<AccountWrite>,
}

#[derive(Clone, PartialEq, Debug)]
pub struct AccountWrite {
pub pubkey: Pubkey,
Expand Down Expand Up @@ -135,7 +124,7 @@ pub struct Memcmp {
pub enum EntityFilter {
FilterByAccountIds(Vec<Pubkey>),
FilterByProgramId(Pubkey),
FilterByProgramIdAndCustomCriteria(Pubkey, Vec<FeedFilterType>),
FilterByProgramIdSelective(Pubkey, Vec<FeedFilterType>),
}
impl EntityFilter {
pub fn filter_by_program_id(program_id: &str) -> Self {
Expand Down
4 changes: 2 additions & 2 deletions connector/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub async fn get_snapshot_gpa(
pub async fn get_filtered_snapshot_gpa(
rpc_http_url: String,
program_id: String,
fitlers: Option<Vec<FeedFilterType>>,
filters: Option<Vec<FeedFilterType>>,
) -> anyhow::Result<SnapshotProgramAccounts> {
let rpc_client = http::connect_with_options::<AccountsScanClient>(&rpc_http_url, true)
.await
Expand All @@ -47,7 +47,7 @@ pub async fn get_filtered_snapshot_gpa(
min_context_slot: None,
};
let program_accounts_config = RpcProgramAccountsConfig {
filters: fitlers.map(|v| v.iter().map(|f| f.to_rpc_filter()).collect()),
filters: filters.map(|v| v.iter().map(|f| f.to_rpc_filter()).collect()),
with_context: Some(true),
account_config: account_info_config.clone(),
};
Expand Down
35 changes: 14 additions & 21 deletions connector/src/websocket_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use crate::snapshot::{
get_snapshot_gma, get_snapshot_gpa, SnapshotMultipleAccounts, SnapshotProgramAccounts,
};
use crate::{
chain_data::SlotStatus, AccountWrite, AnyhowWrap, EntityFilter, FeedFilterType, FeedWrite,
FilterConfig, SlotUpdate, SnapshotWrite, SourceConfig,
chain_data::SlotStatus, AccountWrite, AnyhowWrap, EntityFilter, FeedFilterType, FilterConfig,
SlotUpdate, SourceConfig,
};

const SNAPSHOT_REFRESH_INTERVAL: Duration = Duration::from_secs(300);
Expand Down Expand Up @@ -54,7 +54,7 @@ async fn feed_data(
EntityFilter::FilterByProgramId(program_id) => {
feed_data_by_program(config, program_id.to_string(), sender).await
}
EntityFilter::FilterByProgramIdAndCustomCriteria(program_id, filters) => {
EntityFilter::FilterByProgramIdSelective(program_id, filters) => {
feed_data_by_program_and_filters(
config,
program_id.to_string(),
Expand Down Expand Up @@ -312,7 +312,7 @@ async fn feed_data_by_program_and_filters(
pub async fn process_events(
config: SourceConfig,
filter_config: FilterConfig,
account_write_queue_sender: async_channel::Sender<FeedWrite>,
account_write_queue_sender: async_channel::Sender<AccountWrite>,
slot_queue_sender: async_channel::Sender<SlotUpdate>,
) {
// Subscribe to program account updates websocket
Expand Down Expand Up @@ -353,33 +353,26 @@ pub async fn process_events(
let account: Account = update.value.account.decode().unwrap();
let pubkey = Pubkey::from_str(&update.value.pubkey).unwrap();
account_write_queue_sender
.send(FeedWrite::Account(AccountWrite::from(
pubkey,
update.context.slot,
0,
account,
)))
.send(AccountWrite::from(pubkey, update.context.slot, 0, account))
.await
.expect("send success");
}
WebsocketMessage::SnapshotUpdate((slot, accounts)) => {
debug!("snapshot update {slot}");
let mut to_send = vec![];
for (pubkey, account) in accounts {
if let Some(account) = account {
let pubkey = Pubkey::from_str(&pubkey).unwrap();
to_send.push(AccountWrite::from(
pubkey,
slot,
0,
account.decode().unwrap(),
));
account_write_queue_sender
.send(AccountWrite::from(
pubkey,
slot,
0,
account.decode().unwrap(),
))
.await
.expect("send success");
}
}
account_write_queue_sender
.send(FeedWrite::Snapshot(SnapshotWrite { accounts: to_send }))
.await
.expect("send success");
}
WebsocketMessage::SlotUpdate(update) => {
trace!("slot update");
Expand Down

0 comments on commit b4dee7d

Please sign in to comment.