From 8fcc11e852c491827cd18e3147ed5f8ecfa8ea24 Mon Sep 17 00:00:00 2001 From: Serge Farny Date: Thu, 28 Mar 2024 16:54:55 +0100 Subject: [PATCH] benchmark-data-update: listen to oracle/serum accounts in additions to mango accounts --- .../src/processors/data.rs | 45 ++++++++----------- .../src/processors/logger.rs | 15 +++---- 2 files changed, 24 insertions(+), 36 deletions(-) diff --git a/bin/benchmark-data-update/src/processors/data.rs b/bin/benchmark-data-update/src/processors/data.rs index ef06f20a3..76f5e38f3 100644 --- a/bin/benchmark-data-update/src/processors/data.rs +++ b/bin/benchmark-data-update/src/processors/data.rs @@ -3,20 +3,17 @@ use crate::processors::data::DataEvent::{AccountUpdate, Other, Snapshot}; use async_channel::Receiver; use chrono::Utc; use itertools::Itertools; -use mango_v4_client::account_update_stream::{Message, SnapshotType}; -use mango_v4_client::snapshot_source::is_mango_account; -use mango_v4_client::{ - account_update_stream, grpc_source, snapshot_source, websocket_source, MangoGroupContext, -}; +use mango_v4_client::account_update_stream::Message; +use mango_v4_client::{account_update_stream, grpc_source, websocket_source, MangoGroupContext}; use services_mango_lib::fail_or_retry; use services_mango_lib::retry_counter::RetryCounter; use solana_client::nonblocking::rpc_client::RpcClient as RpcClientAsync; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::pubkey::Pubkey; -use std::fmt::{Display, Pointer}; +use std::fmt::Display; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; use tokio::task::JoinHandle; use tracing::warn; @@ -68,7 +65,6 @@ impl DataProcessor { exit: Arc, ) -> anyhow::Result { let mut retry_counter = RetryCounter::new(2); - let mango_group = Pubkey::from_str(&configuration.mango_group)?; let mango_stream = fail_or_retry!( retry_counter, Self::init_mango_source(configuration, source, exit.clone()).await @@ -89,7 +85,7 @@ impl DataProcessor { continue; } - let event = Self::parse_message(msg, source, received_at, mango_group); + let event = Self::parse_message(msg, source, received_at); if event.is_none() { continue; @@ -129,27 +125,22 @@ impl DataProcessor { message: Message, source: DataEventSource, received_at: chrono::DateTime, - mango_group: Pubkey, ) -> Option { match message { Message::Account(account_write) => { - if is_mango_account(&account_write.account, &mango_group).is_some() { - return Some(AccountUpdate(AccountUpdateEvent { - account: account_write.pubkey, - received_at, - source, - slot: account_write.slot, - })); - } + return Some(AccountUpdate(AccountUpdateEvent { + account: account_write.pubkey, + received_at, + source, + slot: account_write.slot, + })); } - Message::Snapshot(snapshot, snapshot_type) => { + Message::Snapshot(snapshot, _) => { let slot = snapshot[0].slot; let mut result = Vec::new(); for update in snapshot.iter() { - if is_mango_account(&update.account, &mango_group).is_some() { - result.push(update.pubkey); - assert!(slot == update.slot); - } + result.push(update.pubkey); + assert!(slot == update.slot); } return Some(Snapshot(SnapshotEvent { @@ -215,8 +206,8 @@ impl DataProcessor { open_orders_authority: mango_group, grpc_sources: sources, }, - mango_oracles.clone(), - account_update_sender.clone(), + mango_oracles, + account_update_sender, metrics, exit, ); @@ -228,8 +219,8 @@ impl DataProcessor { serum_programs, open_orders_authority: mango_group, }, - mango_oracles.clone(), - account_update_sender.clone(), + mango_oracles, + account_update_sender, ); } diff --git a/bin/benchmark-data-update/src/processors/logger.rs b/bin/benchmark-data-update/src/processors/logger.rs index e705c41ca..9f7bb6541 100644 --- a/bin/benchmark-data-update/src/processors/logger.rs +++ b/bin/benchmark-data-update/src/processors/logger.rs @@ -1,25 +1,19 @@ -use crate::configuration::Configuration; -use chrono::Utc; use hdrhistogram::Histogram; -use solana_sdk::blake3::Hash; use solana_sdk::pubkey::Pubkey; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::task::JoinHandle; use tracing::{info, warn}; -use super::data::{AccountUpdateEvent, DataEvent, DataEventSource, SnapshotEvent}; +use super::data::{AccountUpdateEvent, DataEvent, DataEventSource}; pub struct LoggerProcessor { pub job: JoinHandle<()>, } impl LoggerProcessor { - /// TODO FAS - /// Enlever slot de la key, et comparer en mode "min slot" -> il faut un update avec upd.slot >= existing.slot pour match - pub async fn init( data_sender_1: &tokio::sync::broadcast::Sender, data_sender_2: &tokio::sync::broadcast::Sender, @@ -130,7 +124,10 @@ impl LoggerProcessor { for x in events { tracing::debug!( "{} => {} {} (got from {})", - x.0, x.1.slot, x.1.received_at, x.1.source + x.0, + x.1.slot, + x.1.received_at, + x.1.source ) }