From 1096ee33a8dec3b33de588349cd7b28cad44469e Mon Sep 17 00:00:00 2001 From: Serge Farny Date: Wed, 13 Mar 2024 10:53:12 +0100 Subject: [PATCH] Add custom filtering --- .gitignore | 2 + Cargo.lock | 4 +- Cargo.toml | 2 +- connector/Cargo.toml | 4 +- .../examples/combined_example_consumer.rs | 4 +- connector/examples/geyser_example_consumer.rs | 4 +- .../examples/websocket_example_consumer.rs | 4 +- connector/src/account_write_filter.rs | 4 +- connector/src/grpc_plugin_source.rs | 44 ++++++++++++++++--- connector/src/lib.rs | 29 +++++++++++- connector/src/metrics.rs | 2 +- connector/src/snapshot.rs | 17 +++++-- connector/src/websocket_source.rs | 34 ++++++++++---- 13 files changed, 121 insertions(+), 33 deletions(-) diff --git a/.gitignore b/.gitignore index 79c1ec3..0c3d5b4 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,8 @@ .DS_Store .idea/ *.pem +.vscode +.idea node_modules dist \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index df9cdae..09b9086 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2708,7 +2708,7 @@ dependencies = [ [[package]] name = "mango-feeds-connector" -version = "0.3.0" +version = "0.3.1" dependencies = [ "anyhow", "async-channel", @@ -2718,7 +2718,6 @@ dependencies = [ "itertools 0.10.5", "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core-client", - "log 0.4.21", "rustls 0.20.9", "serde", "serde_derive", @@ -2728,6 +2727,7 @@ dependencies = [ "solana-rpc", "solana-sdk", "tokio", + "tracing", "warp", "yellowstone-grpc-client", "yellowstone-grpc-proto", diff --git a/Cargo.toml b/Cargo.toml index f2e99b9..b8757ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] } bs58 = "0.5" base64 = "0.21.0" -log = "0.4" +tracing = "0.1.40" rand = "0.7" anyhow = "1.0" toml = "0.5" diff --git a/connector/Cargo.toml b/connector/Cargo.toml index afa63b8..eec4714 100644 --- a/connector/Cargo.toml +++ b/connector/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mango-feeds-connector" -version = "0.3.0" +version = "0.3.1" authors = ["Christian Kamm "] edition = "2021" license = "AGPL-3.0-or-later" @@ -27,7 +27,6 @@ rustls = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } -log = { workspace = true } anyhow = { workspace = true } itertools = { workspace = true } @@ -40,6 +39,7 @@ async-trait = { workspace = true } warp = { workspace = true } # 1.9.0+solana.1.16.1 +tracing = { workspace = true } yellowstone-grpc-client = { workspace = true } yellowstone-grpc-proto = { workspace = true } diff --git a/connector/examples/combined_example_consumer.rs b/connector/examples/combined_example_consumer.rs index ef390c7..6697e7f 100644 --- a/connector/examples/combined_example_consumer.rs +++ b/connector/examples/combined_example_consumer.rs @@ -95,8 +95,8 @@ async fn main() -> anyhow::Result<()> { let filter_config = filter_config1; grpc_plugin_source::process_events( - &config, - &filter_config, + config, + filter_config, account_write_queue_sender, slot_queue_sender, metrics_tx.clone(), diff --git a/connector/examples/geyser_example_consumer.rs b/connector/examples/geyser_example_consumer.rs index ef390c7..6697e7f 100644 --- a/connector/examples/geyser_example_consumer.rs +++ b/connector/examples/geyser_example_consumer.rs @@ -95,8 +95,8 @@ async fn main() -> anyhow::Result<()> { let filter_config = filter_config1; grpc_plugin_source::process_events( - &config, - &filter_config, + config, + filter_config, account_write_queue_sender, slot_queue_sender, metrics_tx.clone(), diff --git a/connector/examples/websocket_example_consumer.rs b/connector/examples/websocket_example_consumer.rs index cde15b5..e466952 100644 --- a/connector/examples/websocket_example_consumer.rs +++ b/connector/examples/websocket_example_consumer.rs @@ -77,8 +77,8 @@ async fn main() -> anyhow::Result<()> { }); websocket_source::process_events( - &config, - &filter_config, + config, + filter_config, account_write_queue_sender, slot_queue_sender, ) diff --git a/connector/src/account_write_filter.rs b/connector/src/account_write_filter.rs index d3e24ab..85083d1 100644 --- a/connector/src/account_write_filter.rs +++ b/connector/src/account_write_filter.rs @@ -5,13 +5,13 @@ use crate::{ }; use async_trait::async_trait; -use log::*; use solana_sdk::{account::WritableAccount, pubkey::Pubkey, stake_history::Epoch}; use std::{ collections::{BTreeSet, HashMap}, sync::Arc, time::{Duration, Instant}, }; +use tracing::*; #[async_trait] pub trait AccountWriteSink { @@ -83,7 +83,7 @@ pub fn init( ), }, ); - } + }, Ok(slot_update) = slot_queue_receiver.recv() => { trace!("slot update processed {:?}", slot_update); chain_data.update_slot(SlotData { diff --git a/connector/src/grpc_plugin_source.rs b/connector/src/grpc_plugin_source.rs index 86a0440..4b8bcca 100644 --- a/connector/src/grpc_plugin_source.rs +++ b/connector/src/grpc_plugin_source.rs @@ -11,21 +11,26 @@ use yellowstone_grpc_proto::tonic::{ Request, }; -use log::*; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::{collections::HashMap, env, str::FromStr, time::Duration}; +use tracing::*; +use yellowstone_grpc_proto::geyser::{ + subscribe_request_filter_accounts_filter, subscribe_request_filter_accounts_filter_memcmp, + SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterMemcmp, +}; use yellowstone_grpc_proto::prelude::{ geyser_client::GeyserClient, subscribe_update, CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeUpdate, }; -use crate::snapshot::{get_snapshot_gma, get_snapshot_gpa}; +use crate::snapshot::{get_filtered_snapshot_gpa, get_snapshot_gma, get_snapshot_gpa}; use crate::{ chain_data::SlotStatus, metrics::{MetricType, Metrics}, - AccountWrite, GrpcSourceConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig, TlsConfig, + AccountWrite, FeedFilterType, GrpcSourceConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig, + TlsConfig, }; use crate::{EntityFilter, FilterConfig}; @@ -113,6 +118,29 @@ async fn feed_data_geyser( }, ); } + EntityFilter::FilterByProgramIdSelective(program_id, criteria) => { + accounts.insert( + "client".to_owned(), + SubscribeRequestFilterAccounts { + account: vec![], + owner: vec![program_id.to_string()], + filters: criteria.iter().map(|c| { + SubscribeRequestFilterAccountsFilter { + filter: Some(match c { + FeedFilterType::DataSize(ds) => subscribe_request_filter_accounts_filter::Filter::Datasize(*ds), + FeedFilterType::Memcmp(cmp) => { + subscribe_request_filter_accounts_filter::Filter::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp { + offset: cmp.offset as u64, + data: Some(subscribe_request_filter_accounts_filter_memcmp::Data::Bytes(cmp.bytes.clone())) + }) + }, + FeedFilterType::TokenAccountState => subscribe_request_filter_accounts_filter::Filter::TokenAccountState(true) + }) + } + }).collect(), + }, + ); + } } slots.insert( @@ -225,6 +253,9 @@ async fn feed_data_geyser( EntityFilter::FilterByProgramId(program_id) => { snapshot_gpa = tokio::spawn(get_snapshot_gpa(snapshot_rpc_http_url.clone(), program_id.to_string())).fuse(); }, + EntityFilter::FilterByProgramIdSelective(program_id,filters) => { + snapshot_gpa = tokio::spawn(get_filtered_snapshot_gpa(snapshot_rpc_http_url.clone(), program_id.to_string(), Some(filters.clone()))).fuse(); + } }; } } @@ -363,8 +394,8 @@ fn make_tls_config(config: &TlsConfig) -> ClientTlsConfig { } pub async fn process_events( - config: &SourceConfig, - filter_config: &FilterConfig, + config: SourceConfig, + filter_config: FilterConfig, account_write_queue_sender: async_channel::Sender, slot_queue_sender: async_channel::Sender, metrics_sender: Metrics, @@ -544,6 +575,7 @@ pub async fn process_events( Message::Snapshot(update) => { metric_snapshots.increment(); info!("processing snapshot..."); + for account in update.accounts.iter() { metric_snapshot_account_writes.increment(); metric_account_queue.set(account_write_queue_sender.len() as u64); @@ -553,6 +585,7 @@ pub async fn process_events( // TODO: Resnapshot on invalid data? let pubkey = Pubkey::from_str(key).unwrap(); let account: Account = ui_account.decode().unwrap(); + account_write_queue_sender .send(AccountWrite::from(pubkey, update.slot, 0, account)) .await @@ -561,6 +594,7 @@ pub async fn process_events( (key, None) => warn!("account not found {}", key), } } + info!("processing snapshot done"); } } diff --git a/connector/src/lib.rs b/connector/src/lib.rs index 62e2cde..bfa201a 100644 --- a/connector/src/lib.rs +++ b/connector/src/lib.rs @@ -6,9 +6,10 @@ pub mod snapshot; pub mod websocket_source; use itertools::Itertools; +use solana_client::rpc_filter::RpcFilterType; use std::str::FromStr; use { - serde_derive::Deserialize, + serde_derive::{Deserialize, Serialize}, solana_sdk::{account::Account, pubkey::Pubkey}, }; @@ -94,10 +95,36 @@ pub struct SnapshotSourceConfig { pub rpc_http_url: String, } +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum FeedFilterType { + DataSize(u64), + Memcmp(Memcmp), + TokenAccountState, +} + +impl FeedFilterType { + fn to_rpc_filter(&self) -> RpcFilterType { + match self { + FeedFilterType::Memcmp(m) => RpcFilterType::Memcmp( + solana_client::rpc_filter::Memcmp::new_raw_bytes(m.offset, m.bytes.clone()), + ), + FeedFilterType::DataSize(ds) => RpcFilterType::DataSize(*ds), + FeedFilterType::TokenAccountState => RpcFilterType::TokenAccountState, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct Memcmp { + pub offset: usize, + pub bytes: Vec, +} + #[derive(Clone, Debug, Deserialize)] pub enum EntityFilter { FilterByAccountIds(Vec), FilterByProgramId(Pubkey), + FilterByProgramIdSelective(Pubkey, Vec), } impl EntityFilter { pub fn filter_by_program_id(program_id: &str) -> Self { diff --git a/connector/src/metrics.rs b/connector/src/metrics.rs index 2a90a77..9137404 100644 --- a/connector/src/metrics.rs +++ b/connector/src/metrics.rs @@ -1,10 +1,10 @@ use { crate::MetricsConfig, - log::*, std::collections::HashMap, std::fmt, std::sync::{atomic, Arc, Mutex, RwLock}, tokio::time, + tracing::*, warp::{Filter, Rejection, Reply}, }; diff --git a/connector/src/snapshot.rs b/connector/src/snapshot.rs index 36b4ac0..99bd08b 100644 --- a/connector/src/snapshot.rs +++ b/connector/src/snapshot.rs @@ -1,14 +1,15 @@ use jsonrpc_core_client::transports::http; -use log::*; use solana_account_decoder::{UiAccount, UiAccountEncoding}; use solana_client::{ rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, rpc_response::{OptionalContext, RpcKeyedAccount}, }; use solana_rpc::rpc::rpc_accounts::AccountsDataClient; +use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient; use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot}; +use tracing::*; -use crate::AnyhowWrap; +use crate::{AnyhowWrap, FeedFilterType}; /// gPA snapshot struct pub struct SnapshotProgramAccounts { @@ -27,7 +28,15 @@ pub async fn get_snapshot_gpa( rpc_http_url: String, program_id: String, ) -> anyhow::Result { - let rpc_client = http::connect::(&rpc_http_url) + get_filtered_snapshot_gpa(rpc_http_url, program_id, None).await +} + +pub async fn get_filtered_snapshot_gpa( + rpc_http_url: String, + program_id: String, + filters: Option>, +) -> anyhow::Result { + let rpc_client = http::connect_with_options::(&rpc_http_url, true) .await .map_err_anyhow()?; @@ -38,7 +47,7 @@ pub async fn get_snapshot_gpa( min_context_slot: None, }; let program_accounts_config = RpcProgramAccountsConfig { - filters: None, + filters: filters.map(|v| v.iter().map(|f| f.to_rpc_filter()).collect()), with_context: Some(true), account_config: account_info_config.clone(), }; diff --git a/connector/src/websocket_source.rs b/connector/src/websocket_source.rs index ea27f62..401a587 100644 --- a/connector/src/websocket_source.rs +++ b/connector/src/websocket_source.rs @@ -12,7 +12,6 @@ use solana_sdk::{ }; use anyhow::Context; -use log::*; use std::ops::Sub; use std::{ str::FromStr, @@ -20,13 +19,14 @@ use std::{ time::{Duration, Instant}, }; use tokio::time::timeout; +use tracing::*; use crate::snapshot::{ get_snapshot_gma, get_snapshot_gpa, SnapshotMultipleAccounts, SnapshotProgramAccounts, }; use crate::{ - chain_data::SlotStatus, AccountWrite, AnyhowWrap, EntityFilter, FilterConfig, SlotUpdate, - SourceConfig, + chain_data::SlotStatus, AccountWrite, AnyhowWrap, EntityFilter, FeedFilterType, FilterConfig, + SlotUpdate, SourceConfig, }; const SNAPSHOT_REFRESH_INTERVAL: Duration = Duration::from_secs(300); @@ -54,6 +54,15 @@ async fn feed_data( EntityFilter::FilterByProgramId(program_id) => { feed_data_by_program(config, program_id.to_string(), sender).await } + EntityFilter::FilterByProgramIdSelective(program_id, filters) => { + feed_data_by_program_and_filters( + config, + program_id.to_string(), + sender, + Some(filters.clone()), + ) + .await + } } } @@ -182,6 +191,15 @@ async fn feed_data_by_program( config: &SourceConfig, program_id: String, sender: async_channel::Sender, +) -> anyhow::Result<()> { + feed_data_by_program_and_filters(config, program_id, sender, None).await +} + +async fn feed_data_by_program_and_filters( + config: &SourceConfig, + program_id: String, + sender: async_channel::Sender, + filters: Option>, ) -> anyhow::Result<()> { debug!("feed_data_by_program"); @@ -198,7 +216,7 @@ async fn feed_data_by_program( min_context_slot: None, }; let program_accounts_config = RpcProgramAccountsConfig { - filters: None, + filters: filters.map(|v| v.iter().map(|f| f.to_rpc_filter()).collect()), with_context: Some(true), account_config: account_info_config.clone(), }; @@ -292,16 +310,14 @@ async fn feed_data_by_program( // TODO: rename / split / rework pub async fn process_events( - config: &SourceConfig, - filter_config: &FilterConfig, + config: SourceConfig, + filter_config: FilterConfig, account_write_queue_sender: async_channel::Sender, slot_queue_sender: async_channel::Sender, ) { // Subscribe to program account updates websocket let (update_sender, update_receiver) = async_channel::unbounded::(); info!("using config {config:?}"); - let config = config.clone(); - let filter_config = filter_config.clone(); tokio::spawn(async move { // if the websocket disconnects, we get no data in a while etc, reconnect and try again loop { @@ -342,7 +358,7 @@ pub async fn process_events( .expect("send success"); } WebsocketMessage::SnapshotUpdate((slot, accounts)) => { - trace!("snapshot update {slot}"); + debug!("snapshot update {slot}"); for (pubkey, account) in accounts { if let Some(account) = account { let pubkey = Pubkey::from_str(&pubkey).unwrap();