Skip to content

Commit

Permalink
Add custom filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
farnyser committed Apr 11, 2024
1 parent 5e72a9e commit 1096ee3
Show file tree
Hide file tree
Showing 13 changed files with 121 additions and 33 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
.DS_Store
.idea/
*.pem
.vscode
.idea

node_modules
dist
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }

bs58 = "0.5"
base64 = "0.21.0"
log = "0.4"
tracing = "0.1.40"
rand = "0.7"
anyhow = "1.0"
toml = "0.5"
Expand Down
4 changes: 2 additions & 2 deletions connector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mango-feeds-connector"
version = "0.3.0"
version = "0.3.1"
authors = ["Christian Kamm <[email protected]>"]
edition = "2021"
license = "AGPL-3.0-or-later"
Expand All @@ -27,7 +27,6 @@ rustls = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }

log = { workspace = true }
anyhow = { workspace = true }

itertools = { workspace = true }
Expand All @@ -40,6 +39,7 @@ async-trait = { workspace = true }
warp = { workspace = true }

# 1.9.0+solana.1.16.1
tracing = { workspace = true }
yellowstone-grpc-client = { workspace = true }
yellowstone-grpc-proto = { workspace = true }

Expand Down
4 changes: 2 additions & 2 deletions connector/examples/combined_example_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ async fn main() -> anyhow::Result<()> {
let filter_config = filter_config1;

grpc_plugin_source::process_events(
&config,
&filter_config,
config,
filter_config,
account_write_queue_sender,
slot_queue_sender,
metrics_tx.clone(),
Expand Down
4 changes: 2 additions & 2 deletions connector/examples/geyser_example_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ async fn main() -> anyhow::Result<()> {
let filter_config = filter_config1;

grpc_plugin_source::process_events(
&config,
&filter_config,
config,
filter_config,
account_write_queue_sender,
slot_queue_sender,
metrics_tx.clone(),
Expand Down
4 changes: 2 additions & 2 deletions connector/examples/websocket_example_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ async fn main() -> anyhow::Result<()> {
});

websocket_source::process_events(
&config,
&filter_config,
config,
filter_config,
account_write_queue_sender,
slot_queue_sender,
)
Expand Down
4 changes: 2 additions & 2 deletions connector/src/account_write_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use crate::{
};

use async_trait::async_trait;
use log::*;
use solana_sdk::{account::WritableAccount, pubkey::Pubkey, stake_history::Epoch};
use std::{
collections::{BTreeSet, HashMap},
sync::Arc,
time::{Duration, Instant},
};
use tracing::*;

#[async_trait]
pub trait AccountWriteSink {
Expand Down Expand Up @@ -83,7 +83,7 @@ pub fn init(
),
},
);
}
},
Ok(slot_update) = slot_queue_receiver.recv() => {
trace!("slot update processed {:?}", slot_update);
chain_data.update_slot(SlotData {
Expand Down
44 changes: 39 additions & 5 deletions connector/src/grpc_plugin_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,26 @@ use yellowstone_grpc_proto::tonic::{
Request,
};

use log::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{collections::HashMap, env, str::FromStr, time::Duration};
use tracing::*;
use yellowstone_grpc_proto::geyser::{
subscribe_request_filter_accounts_filter, subscribe_request_filter_accounts_filter_memcmp,
SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterMemcmp,
};

use yellowstone_grpc_proto::prelude::{
geyser_client::GeyserClient, subscribe_update, CommitmentLevel, SubscribeRequest,
SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate,
};

use crate::snapshot::{get_snapshot_gma, get_snapshot_gpa};
use crate::snapshot::{get_filtered_snapshot_gpa, get_snapshot_gma, get_snapshot_gpa};
use crate::{
chain_data::SlotStatus,
metrics::{MetricType, Metrics},
AccountWrite, GrpcSourceConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig, TlsConfig,
AccountWrite, FeedFilterType, GrpcSourceConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig,
TlsConfig,
};
use crate::{EntityFilter, FilterConfig};

Expand Down Expand Up @@ -113,6 +118,29 @@ async fn feed_data_geyser(
},
);
}
EntityFilter::FilterByProgramIdSelective(program_id, criteria) => {
accounts.insert(
"client".to_owned(),
SubscribeRequestFilterAccounts {
account: vec![],
owner: vec![program_id.to_string()],
filters: criteria.iter().map(|c| {
SubscribeRequestFilterAccountsFilter {
filter: Some(match c {
FeedFilterType::DataSize(ds) => subscribe_request_filter_accounts_filter::Filter::Datasize(*ds),
FeedFilterType::Memcmp(cmp) => {
subscribe_request_filter_accounts_filter::Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp {
offset: cmp.offset as u64,
data: Some(subscribe_request_filter_accounts_filter_memcmp::Data::Bytes(cmp.bytes.clone()))
})
},
FeedFilterType::TokenAccountState => subscribe_request_filter_accounts_filter::Filter::TokenAccountState(true)
})
}
}).collect(),
},
);
}
}

slots.insert(
Expand Down Expand Up @@ -225,6 +253,9 @@ async fn feed_data_geyser(
EntityFilter::FilterByProgramId(program_id) => {
snapshot_gpa = tokio::spawn(get_snapshot_gpa(snapshot_rpc_http_url.clone(), program_id.to_string())).fuse();
},
EntityFilter::FilterByProgramIdSelective(program_id,filters) => {
snapshot_gpa = tokio::spawn(get_filtered_snapshot_gpa(snapshot_rpc_http_url.clone(), program_id.to_string(), Some(filters.clone()))).fuse();
}
};
}
}
Expand Down Expand Up @@ -363,8 +394,8 @@ fn make_tls_config(config: &TlsConfig) -> ClientTlsConfig {
}

pub async fn process_events(
config: &SourceConfig,
filter_config: &FilterConfig,
config: SourceConfig,
filter_config: FilterConfig,
account_write_queue_sender: async_channel::Sender<AccountWrite>,
slot_queue_sender: async_channel::Sender<SlotUpdate>,
metrics_sender: Metrics,
Expand Down Expand Up @@ -544,6 +575,7 @@ pub async fn process_events(
Message::Snapshot(update) => {
metric_snapshots.increment();
info!("processing snapshot...");

for account in update.accounts.iter() {
metric_snapshot_account_writes.increment();
metric_account_queue.set(account_write_queue_sender.len() as u64);
Expand All @@ -553,6 +585,7 @@ pub async fn process_events(
// TODO: Resnapshot on invalid data?
let pubkey = Pubkey::from_str(key).unwrap();
let account: Account = ui_account.decode().unwrap();

account_write_queue_sender
.send(AccountWrite::from(pubkey, update.slot, 0, account))
.await
Expand All @@ -561,6 +594,7 @@ pub async fn process_events(
(key, None) => warn!("account not found {}", key),
}
}

info!("processing snapshot done");
}
}
Expand Down
29 changes: 28 additions & 1 deletion connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ pub mod snapshot;
pub mod websocket_source;

use itertools::Itertools;
use solana_client::rpc_filter::RpcFilterType;
use std::str::FromStr;
use {
serde_derive::Deserialize,
serde_derive::{Deserialize, Serialize},
solana_sdk::{account::Account, pubkey::Pubkey},
};

Expand Down Expand Up @@ -94,10 +95,36 @@ pub struct SnapshotSourceConfig {
pub rpc_http_url: String,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum FeedFilterType {
DataSize(u64),
Memcmp(Memcmp),
TokenAccountState,
}

impl FeedFilterType {
fn to_rpc_filter(&self) -> RpcFilterType {
match self {
FeedFilterType::Memcmp(m) => RpcFilterType::Memcmp(
solana_client::rpc_filter::Memcmp::new_raw_bytes(m.offset, m.bytes.clone()),
),
FeedFilterType::DataSize(ds) => RpcFilterType::DataSize(*ds),
FeedFilterType::TokenAccountState => RpcFilterType::TokenAccountState,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Memcmp {
pub offset: usize,
pub bytes: Vec<u8>,
}

#[derive(Clone, Debug, Deserialize)]
pub enum EntityFilter {
FilterByAccountIds(Vec<Pubkey>),
FilterByProgramId(Pubkey),
FilterByProgramIdSelective(Pubkey, Vec<FeedFilterType>),
}
impl EntityFilter {
pub fn filter_by_program_id(program_id: &str) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion connector/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use {
crate::MetricsConfig,
log::*,
std::collections::HashMap,
std::fmt,
std::sync::{atomic, Arc, Mutex, RwLock},
tokio::time,
tracing::*,
warp::{Filter, Rejection, Reply},
};

Expand Down
17 changes: 13 additions & 4 deletions connector/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use jsonrpc_core_client::transports::http;
use log::*;
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::{OptionalContext, RpcKeyedAccount},
};
use solana_rpc::rpc::rpc_accounts::AccountsDataClient;
use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient;
use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot};
use tracing::*;

use crate::AnyhowWrap;
use crate::{AnyhowWrap, FeedFilterType};

/// gPA snapshot struct
pub struct SnapshotProgramAccounts {
Expand All @@ -27,7 +28,15 @@ pub async fn get_snapshot_gpa(
rpc_http_url: String,
program_id: String,
) -> anyhow::Result<SnapshotProgramAccounts> {
let rpc_client = http::connect::<crate::GetProgramAccountsClient>(&rpc_http_url)
get_filtered_snapshot_gpa(rpc_http_url, program_id, None).await
}

pub async fn get_filtered_snapshot_gpa(
rpc_http_url: String,
program_id: String,
filters: Option<Vec<FeedFilterType>>,
) -> anyhow::Result<SnapshotProgramAccounts> {
let rpc_client = http::connect_with_options::<AccountsScanClient>(&rpc_http_url, true)
.await
.map_err_anyhow()?;

Expand All @@ -38,7 +47,7 @@ pub async fn get_snapshot_gpa(
min_context_slot: None,
};
let program_accounts_config = RpcProgramAccountsConfig {
filters: None,
filters: filters.map(|v| v.iter().map(|f| f.to_rpc_filter()).collect()),
with_context: Some(true),
account_config: account_info_config.clone(),
};
Expand Down
Loading

0 comments on commit 1096ee3

Please sign in to comment.