From 92ca014313d4e26c066cb73cf3eaef84891a99f4 Mon Sep 17 00:00:00 2001 From: Serge Farnye Date: Wed, 3 Apr 2024 11:04:56 +0200 Subject: [PATCH] fixup after review (first round) --- Cargo.lock | 2 +- .../src/transaction_builder.rs | 4 +-- .../src/fill_event_filter.rs | 30 +++--------------- .../src/orderbook_filter.rs | 30 +++--------------- bin/service-mango-pnl/src/memory_target.rs | 25 ++------------- lib/client/src/grpc_source.rs | 31 ++++++------------- lib/client/src/websocket_source.rs | 31 ++++++------------- 7 files changed, 32 insertions(+), 121 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f4998e8be..82610ad25b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3392,7 +3392,7 @@ dependencies = [ [[package]] name = "mango-feeds-connector" version = "0.2.2" -source = "git+https://github.com/blockworks-foundation/mango-feeds.git?branch=serge/custom_filtering#0e1cef027c68a261b0fdbd78cffef602eed09cdd" +source = "git+https://github.com/blockworks-foundation/mango-feeds.git?branch=serge/custom_filtering#7165e7dfd3bbddd0c00a39f91e845bae1ce618ed" dependencies = [ "anyhow", "async-channel", diff --git a/bin/service-mango-crank/src/transaction_builder.rs b/bin/service-mango-crank/src/transaction_builder.rs index 45c5cee5b3..4dd9cc6811 100644 --- a/bin/service-mango-crank/src/transaction_builder.rs +++ b/bin/service-mango-crank/src/transaction_builder.rs @@ -1,7 +1,7 @@ use mango_feeds_connector::{ account_write_filter::{self, AccountWriteRoute}, metrics::Metrics, - FeedWrite, SlotUpdate, + AccountWrite, SlotUpdate, }; use solana_sdk::{instruction::Instruction, pubkey::Pubkey}; @@ -18,7 +18,7 @@ pub fn init( group_pk: Pubkey, metrics_sender: Metrics, ) -> anyhow::Result<( - async_channel::Sender, + async_channel::Sender, async_channel::Sender, async_channel::Receiver>, )> { diff --git a/bin/service-mango-fills/src/fill_event_filter.rs b/bin/service-mango-fills/src/fill_event_filter.rs index 8e392532da..127b9f3f12 100644 --- a/bin/service-mango-fills/src/fill_event_filter.rs +++ b/bin/service-mango-fills/src/fill_event_filter.rs @@ -2,7 +2,7 @@ use log::*; use mango_feeds_connector::{ chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData}, metrics::{MetricType, Metrics}, - FeedWrite, SlotUpdate, + AccountWrite, SlotUpdate, }; use mango_feeds_lib::serum::SerumEventQueueHeader; use mango_feeds_lib::MarketConfig; @@ -400,7 +400,7 @@ pub async fn init( metrics_sender: Metrics, exit: Arc, ) -> anyhow::Result<( - async_channel::Sender, + async_channel::Sender, async_channel::Sender, async_channel::Receiver, )> { @@ -423,7 +423,7 @@ pub async fn init( // The actual message may want to also contain a retry count, if it self-reinserts on failure? let (account_write_queue_sender, account_write_queue_receiver) = - async_channel::unbounded::(); + async_channel::unbounded::(); // Slot updates flowing from the outside into the single processing thread. From // there they'll flow into the postgres sending thread. @@ -463,7 +463,7 @@ pub async fn init( break; } tokio::select! { - Ok(FeedWrite::Account(account_write)) = account_write_queue_receiver_c.recv() => { + Ok(account_write) = account_write_queue_receiver_c.recv() => { if !all_queue_pks.contains(&account_write.pubkey) { continue; } @@ -483,28 +483,6 @@ pub async fn init( }, ); } - Ok(FeedWrite::Snapshot(snapshot_write)) = account_write_queue_receiver_c.recv() => { - for account_write in snapshot_write.accounts { - if !all_queue_pks.contains(&account_write.pubkey) { - continue; - } - - chain_cache.update_account( - account_write.pubkey, - AccountData { - slot: account_write.slot, - write_version: account_write.write_version, - account: WritableAccount::create( - account_write.lamports, - account_write.data.clone(), - account_write.owner, - account_write.executable, - account_write.rent_epoch as Epoch, - ), - }, - ); - } - } Ok(slot_update) = slot_queue_receiver.recv() => { chain_cache.update_slot(SlotData { slot: slot_update.slot, diff --git a/bin/service-mango-orderbook/src/orderbook_filter.rs b/bin/service-mango-orderbook/src/orderbook_filter.rs index 41ed93a85a..185f305fd1 100644 --- a/bin/service-mango-orderbook/src/orderbook_filter.rs +++ b/bin/service-mango-orderbook/src/orderbook_filter.rs @@ -3,7 +3,7 @@ use fixed::types::I80F48; use itertools::Itertools; use log::*; use mango_feeds_connector::metrics::MetricU64; -use mango_feeds_connector::FeedWrite; +use mango_feeds_connector::AccountWrite; use mango_feeds_connector::{ chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData}, metrics::{MetricType, Metrics}, @@ -244,7 +244,7 @@ pub async fn init( metrics_sender: Metrics, exit: Arc, ) -> anyhow::Result<( - async_channel::Sender, + async_channel::Sender, async_channel::Sender, async_channel::Receiver, )> { @@ -255,7 +255,7 @@ pub async fn init( // The actual message may want to also contain a retry count, if it self-reinserts on failure? let (account_write_queue_sender, account_write_queue_receiver) = - async_channel::unbounded::(); + async_channel::unbounded::(); // Slot updates flowing from the outside into the single processing thread. From // there they'll flow into the postgres sending thread. @@ -288,7 +288,7 @@ pub async fn init( break; } tokio::select! { - Ok(FeedWrite::Account(account_write)) = account_write_queue_receiver.recv() => { + Ok(account_write) = account_write_queue_receiver.recv() => { if !relevant_pubkeys.contains(&account_write.pubkey) { continue; } @@ -307,27 +307,6 @@ pub async fn init( }, ); }, - Ok(FeedWrite::Snapshot(snapshot_write)) = account_write_queue_receiver.recv() => { - for account_write in snapshot_write.accounts { - if !relevant_pubkeys.contains(&account_write.pubkey) { - continue; - } - chain_cache.update_account( - account_write.pubkey, - AccountData { - slot: account_write.slot, - write_version: account_write.write_version, - account: WritableAccount::create( - account_write.lamports, - account_write.data.clone(), - account_write.owner, - account_write.executable, - account_write.rent_epoch as Epoch, - ), - }, - ); - } - }, Ok(slot_update) = slot_queue_receiver.recv() => { chain_cache.update_slot(SlotData { slot: slot_update.slot, @@ -335,7 +314,6 @@ pub async fn init( status: slot_update.status, chain: 0, }); - } } diff --git a/bin/service-mango-pnl/src/memory_target.rs b/bin/service-mango-pnl/src/memory_target.rs index bedfc8b39d..ff717c9c2d 100644 --- a/bin/service-mango-pnl/src/memory_target.rs +++ b/bin/service-mango-pnl/src/memory_target.rs @@ -6,11 +6,11 @@ use std::sync::{Arc, RwLock}; pub async fn init( chain_data: Arc>, ) -> anyhow::Result<( - async_channel::Sender, + async_channel::Sender, async_channel::Sender, )> { let (account_write_queue_sender, account_write_queue_receiver) = - async_channel::unbounded::(); + async_channel::unbounded::(); let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::(); @@ -18,7 +18,7 @@ pub async fn init( tokio::spawn(async move { loop { tokio::select! { - Ok(FeedWrite::Account(account_write)) = account_write_queue_receiver.recv() => { + Ok(account_write) = account_write_queue_receiver.recv() => { let mut chain = chain_data.write().unwrap(); chain.update_account( account_write.pubkey, @@ -35,25 +35,6 @@ pub async fn init( }, ); } - Ok(FeedWrite::Snapshot(snapshot_write)) = account_write_queue_receiver.recv() => { - let mut chain = chain_data.write().unwrap(); - for account_write in snapshot_write.accounts { - chain.update_account( - account_write.pubkey, - AccountData { - slot: account_write.slot, - write_version: account_write.write_version, - account: WritableAccount::create( - account_write.lamports, - account_write.data.clone(), - account_write.owner, - account_write.executable, - account_write.rent_epoch as Epoch, - ), - }, - ); - } - } Ok(slot_update) = slot_queue_receiver.recv() => { let mut chain = chain_data.write().unwrap(); chain.update_slot(SlotData { diff --git a/lib/client/src/grpc_source.rs b/lib/client/src/grpc_source.rs index 01bb6dde08..374b1155a4 100644 --- a/lib/client/src/grpc_source.rs +++ b/lib/client/src/grpc_source.rs @@ -7,7 +7,7 @@ use solana_sdk::pubkey::Pubkey; use anyhow::Context; use async_channel::{RecvError, Sender}; use mango_feeds_connector::{ - EntityFilter, FeedFilterType, FeedWrite, FilterConfig, GrpcSourceConfig, Memcmp, + AccountWrite, EntityFilter, FeedFilterType, FilterConfig, GrpcSourceConfig, Memcmp, SnapshotSourceConfig, SourceConfig, }; use solana_rpc::rpc_pubsub::RpcSolPubSubClient; @@ -99,7 +99,7 @@ async fn feed_data( let (serum3_oo_sender, serum3_oo_receiver) = async_channel::unbounded(); let (serum3_oo_slot_sender, serum3_oo_slot_receiver) = async_channel::unbounded(); let filters = FilterConfig { - entity_filter: EntityFilter::FilterByProgramIdAndCustomCriteria( + entity_filter: EntityFilter::FilterByProgramIdSelective( *serum_program, serum3_oo_custom_filters.clone(), ), @@ -122,7 +122,7 @@ async fn feed_data( // Make sure the serum3_oo_sub_map does not exit when there's no serum_programs let _unused_serum_sender; if config.serum_programs.is_empty() { - let (sender, receiver) = async_channel::unbounded::(); + let (sender, receiver) = async_channel::unbounded::(); _unused_serum_sender = sender; serum3_oo_sub_map.insert(Pubkey::default(), receiver); } @@ -168,7 +168,7 @@ async fn feed_data( async fn handle_message( name: &str, - message: Result, + message: Result, sender: &Sender, ) -> bool { if let Ok(data) = message { @@ -180,24 +180,11 @@ async fn handle_message( } } -async fn handle_feed_write(sender: &Sender, data: FeedWrite) { - match data { - FeedWrite::Account(account) => sender - .send(Message::Account(AccountUpdate::from_feed(account))) - .await - .expect("sending must succeed"), - FeedWrite::Snapshot(mut snapshot) => sender - .send(Message::Snapshot( - snapshot - .accounts - .drain(0..) - .map(|a| AccountUpdate::from_feed(a)) - .collect(), - crate::account_update_stream::SnapshotType::Partial, - )) - .await - .expect("sending must succeed"), - } +async fn handle_feed_write(sender: &Sender, account: AccountWrite) { + sender + .send(Message::Account(AccountUpdate::from_feed(account))) + .await + .expect("sending must succeed"); } pub fn start( diff --git a/lib/client/src/websocket_source.rs b/lib/client/src/websocket_source.rs index 50c9c0e201..68749b2209 100644 --- a/lib/client/src/websocket_source.rs +++ b/lib/client/src/websocket_source.rs @@ -6,7 +6,7 @@ use solana_sdk::pubkey::Pubkey; use anyhow::Context; use async_channel::{RecvError, Sender}; use mango_feeds_connector::{ - EntityFilter, FeedFilterType, FeedWrite, FilterConfig, Memcmp, SnapshotSourceConfig, + AccountWrite, EntityFilter, FeedFilterType, FilterConfig, Memcmp, SnapshotSourceConfig, SourceConfig, }; use solana_rpc::rpc_pubsub::RpcSolPubSubClient; @@ -89,7 +89,7 @@ async fn feed_data( let (serum3_oo_sender, serum3_oo_receiver) = async_channel::unbounded(); let (serum3_oo_slot_sender, serum3_oo_slot_receiver) = async_channel::unbounded(); let filters = FilterConfig { - entity_filter: EntityFilter::FilterByProgramIdAndCustomCriteria( + entity_filter: EntityFilter::FilterByProgramIdSelective( *serum_program, serum3_oo_custom_filters.clone(), ), @@ -110,7 +110,7 @@ async fn feed_data( // Make sure the serum3_oo_sub_map does not exit when there's no serum_programs let _unused_serum_sender; if config.serum_programs.is_empty() { - let (sender, receiver) = async_channel::unbounded::(); + let (sender, receiver) = async_channel::unbounded::(); _unused_serum_sender = sender; serum3_oo_sub_map.insert(Pubkey::default(), receiver); } @@ -156,7 +156,7 @@ async fn feed_data( async fn handle_message( name: &str, - message: Result, + message: Result, sender: &Sender, ) -> bool { if let Ok(data) = message { @@ -168,24 +168,11 @@ async fn handle_message( } } -async fn handle_feed_write(sender: &Sender, data: FeedWrite) { - match data { - FeedWrite::Account(account) => sender - .send(Message::Account(AccountUpdate::from_feed(account))) - .await - .expect("sending must succeed"), - FeedWrite::Snapshot(mut snapshot) => sender - .send(Message::Snapshot( - snapshot - .accounts - .drain(0..) - .map(|a| AccountUpdate::from_feed(a)) - .collect(), - crate::account_update_stream::SnapshotType::Partial, - )) - .await - .expect("sending must succeed"), - } +async fn handle_feed_write(sender: &Sender, account: AccountWrite) { + sender + .send(Message::Account(AccountUpdate::from_feed(account))) + .await + .expect("sending must succeed"); } pub fn start(config: Config, mango_oracles: Vec, sender: async_channel::Sender) {