From c884ad161128ec3dfecb6374e93ddeaa20873846 Mon Sep 17 00:00:00 2001 From: Serge Farny Date: Thu, 28 Mar 2024 10:04:34 +0100 Subject: [PATCH] benchmark-data-update: draft a comparison tools between websocket and grpc --- Cargo.lock | 43 ++++ bin/benchmark-data-update/Cargo.toml | 54 ++++ bin/benchmark-data-update/README.md | 4 + .../conf/example-config.toml | 14 ++ .../src/configuration.rs | 23 ++ bin/benchmark-data-update/src/main.rs | 72 ++++++ .../src/processors/data.rs | 238 ++++++++++++++++++ .../src/processors/exit.rs | 39 +++ .../src/processors/logger.rs | 176 +++++++++++++ .../src/processors/mod.rs | 3 + 10 files changed, 666 insertions(+) create mode 100644 bin/benchmark-data-update/Cargo.toml create mode 100644 bin/benchmark-data-update/README.md create mode 100644 bin/benchmark-data-update/conf/example-config.toml create mode 100644 bin/benchmark-data-update/src/configuration.rs create mode 100644 bin/benchmark-data-update/src/main.rs create mode 100644 bin/benchmark-data-update/src/processors/data.rs create mode 100644 bin/benchmark-data-update/src/processors/exit.rs create mode 100644 bin/benchmark-data-update/src/processors/logger.rs create mode 100644 bin/benchmark-data-update/src/processors/mod.rs diff --git a/Cargo.lock b/Cargo.lock index c5c9a3ea5a..e2b62f843f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -778,6 +778,49 @@ dependencies = [ "serde", ] +[[package]] +name = "benchmark-data-update" +version = "0.1.0" +dependencies = [ + "anchor-client", + "anchor-lang", + "anyhow", + "async-channel", + "async-trait", + "base64 0.21.4", + "bs58 0.3.1", + "bytemuck", + "chrono", + "fixed 1.11.0 (git+https://github.com/blockworks-foundation/fixed.git?branch=v1.11.0-borsh0_10-mango)", + "futures 0.3.28", + "futures-channel", + "futures-core", + "futures-util", + "hdrhistogram", + "itertools", + "jemallocator", + "log 0.4.20", + "mango-feeds-connector", + "mango-feeds-lib", + "mango-v4", + "mango-v4-client", + "native-tls", + "rustls 0.20.9", + "serde", + 
"serde_derive", + "serde_json", + "serum_dex 0.5.10 (git+https://github.com/openbook-dex/program.git)", + "services-mango-lib", + "solana-client", + "solana-logger", + "solana-sdk", + "tokio", + "tokio-tungstenite 0.17.2", + "toml", + "tracing", + "ws", +] + [[package]] name = "bincode" version = "1.3.3" diff --git a/bin/benchmark-data-update/Cargo.toml b/bin/benchmark-data-update/Cargo.toml new file mode 100644 index 0000000000..feb0d7d890 --- /dev/null +++ b/bin/benchmark-data-update/Cargo.toml @@ -0,0 +1,54 @@ +[package] +name = "benchmark-data-update" +version = "0.1.0" +authors = ["Serge Farny "] +edition = "2021" +license = "AGPL-3.0-or-later" + +[dependencies] +mango-feeds-connector = { workspace = true } +mango-feeds-lib = { path = "../../lib/mango-feeds-lib" } +services-mango-lib = { path = "../../lib/services-mango-lib" } + +solana-client = { workspace = true } +solana-logger = { workspace = true } +solana-sdk = { workspace = true } + +anchor-lang = { workspace = true } +anchor-client = { workspace = true } + +fixed = { workspace = true, features = ["serde", "borsh"] } + +mango-v4 = { path = "../../programs/mango-v4", features = ["client"] } +mango-v4-client = { path = "../../lib/client" } + +serum_dex = { workspace = true } + +bs58 = "0.3.1" +log = "0.4" +anyhow = "1.0" +toml = "0.5" +serde = "1.0.130" +serde_derive = "1.0.130" +serde_json = "1.0.68" +futures = "0.3.17" +futures-core = "0.3" +futures-channel = "0.3" +futures-util = "0.3" +ws = "^0.9.2" +async-channel = "1.6" +async-trait = "0.1" +bytemuck = "^1.7.2" +itertools = "0.10.3" +jemallocator = "0.3.2" +chrono = "0.4.23" +base64 = "0.21" + +tokio = { version = "1", features = ["full"] } +tokio-tungstenite = "0.17" + +native-tls = "0.2" +rustls = "0.20.8" +tracing = { version = "0.1", features = ["log"] } + +hdrhistogram = "7.5.4" diff --git a/bin/benchmark-data-update/README.md b/bin/benchmark-data-update/README.md new file mode 100644 index 0000000000..495ff331d0 --- /dev/null +++ 
b/bin/benchmark-data-update/README.md @@ -0,0 +1,4 @@ +# benchmark data update + +Compare websocket and grpc connection performance + diff --git a/bin/benchmark-data-update/conf/example-config.toml b/bin/benchmark-data-update/conf/example-config.toml new file mode 100644 index 0000000000..b5172ed7b4 --- /dev/null +++ b/bin/benchmark-data-update/conf/example-config.toml @@ -0,0 +1,14 @@ +mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX" + +[source_configuration] +rpc_ws_url = "wss://mango.rpcpool.com/" +rpc_http_url = "http://mango.rpcpool.com/" +snapshot_interval_secs = 900 +use_grpc = false +dedup_queue_size = 50000 + +[[source_configuration.grpc_sources]] +name = "benchmark-data-update" +connection_string = "http://tyo64.rpcpool.com/" +#token = "" +retry_connection_sleep_secs = 30 diff --git a/bin/benchmark-data-update/src/configuration.rs b/bin/benchmark-data-update/src/configuration.rs new file mode 100644 index 0000000000..8b0f2af47f --- /dev/null +++ b/bin/benchmark-data-update/src/configuration.rs @@ -0,0 +1,23 @@ +use mango_feeds_connector::GrpcSourceConfig; +use serde_derive::Deserialize; +use services_mango_lib::env_helper::string_or_env; + +#[derive(Clone, Debug, Deserialize)] +pub struct Configuration { + #[serde(deserialize_with = "string_or_env")] + pub mango_group: String, + pub source_configuration: SourceConfiguration, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct SourceConfiguration { + #[serde(deserialize_with = "string_or_env")] + pub rpc_http_url: String, + #[serde(deserialize_with = "string_or_env")] + pub rpc_ws_url: String, + + pub snapshot_interval_secs: u64, + + pub dedup_queue_size: usize, + pub grpc_sources: Vec<GrpcSourceConfig>, +} diff --git a/bin/benchmark-data-update/src/main.rs b/bin/benchmark-data-update/src/main.rs new file mode 100644 index 0000000000..30ae2b9d8f --- /dev/null +++ b/bin/benchmark-data-update/src/main.rs @@ -0,0 +1,72 @@ +mod configuration; +mod processors; + +use futures_util::StreamExt; +// use 
mango_feeds_connector::metrics; +use mango_v4_client::tracing_subscriber_init; +use std::fs::File; +use std::io::Read; +use std::sync::atomic::Ordering; + +use crate::configuration::Configuration; +use crate::processors::data::{DataEventSource, DataProcessor}; +use crate::processors::exit::ExitProcessor; +use crate::processors::logger::LoggerProcessor; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args: Vec<String> = std::env::args().collect(); + + if args.len() < 2 { + eprintln!("Please enter a config file path argument."); + return Ok(()); + } + + let configuration: Configuration = { + let mut file = File::open(&args[1])?; + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + toml::from_str(&contents)? // a malformed config is returned as an error instead of panicking + }; + + tracing_subscriber_init(); + + let exit_processor = ExitProcessor::init().await?; + + let ws_processor: DataProcessor = DataProcessor::init( + &configuration, + DataEventSource::Websocket, + exit_processor.exit.clone(), + ) + .await?; + let grpc_processor: DataProcessor = DataProcessor::init( + &configuration, + DataEventSource::Grpc, + exit_processor.exit.clone(), + ) + .await?; + + let logger_processor = LoggerProcessor::init( + &ws_processor.channel, + &grpc_processor.channel, + exit_processor.exit.clone(), + ) + .await?; + + let jobs = vec![ + exit_processor.job, + ws_processor.job, + grpc_processor.job, + logger_processor.job, + ]; + let mut jobs: futures::stream::FuturesUnordered<_> = jobs.into_iter().collect(); + + while jobs.next().await.is_some() { + // as soon as any job exits, ask the remaining tasks to stop, then wait for them + exit_processor.exit.store(true, Ordering::Relaxed); + } + + // for now, we force exit here because the websocket connection to the RPC is not properly closed on exit + tracing::warn!("killing process"); + std::process::exit(0x0100); // NOTE(review): 0x0100 == 256; Unix keeps only the low 8 bits, so the parent sees status 0 - confirm intended +} diff --git a/bin/benchmark-data-update/src/processors/data.rs b/bin/benchmark-data-update/src/processors/data.rs new file mode 100644 index 0000000000..ef06f20a3b --- /dev/null 
+++ b/bin/benchmark-data-update/src/processors/data.rs @@ -0,0 +1,238 @@ +use crate::configuration::Configuration; +use crate::processors::data::DataEvent::{AccountUpdate, Other, Snapshot}; +use async_channel::Receiver; +use chrono::Utc; +use itertools::Itertools; +use mango_v4_client::account_update_stream::{Message, SnapshotType}; +use mango_v4_client::snapshot_source::is_mango_account; +use mango_v4_client::{ + account_update_stream, grpc_source, snapshot_source, websocket_source, MangoGroupContext, +}; +use services_mango_lib::fail_or_retry; +use services_mango_lib::retry_counter::RetryCounter; +use solana_client::nonblocking::rpc_client::RpcClient as RpcClientAsync; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::pubkey::Pubkey; +use std::fmt::{Display, Pointer}; +use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::time::Duration; +use tokio::task::JoinHandle; +use tracing::warn; + +pub struct DataProcessor { + pub channel: tokio::sync::broadcast::Sender<DataEvent>, + pub job: JoinHandle<()>, +} + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum DataEventSource { + Websocket, + Grpc, +} + +impl Display for DataEventSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +#[derive(Clone, Debug)] +pub enum DataEvent { + Other, + Snapshot(SnapshotEvent), + AccountUpdate(AccountUpdateEvent), +} + +#[derive(Clone, Debug)] +pub struct SnapshotEvent { + pub received_at: chrono::DateTime<Utc>, + pub accounts: Vec<Pubkey>, + pub source: DataEventSource, + pub slot: u64, +} + +#[derive(Clone, Debug)] +pub struct AccountUpdateEvent { + pub received_at: chrono::DateTime<Utc>, + pub account: Pubkey, + pub source: DataEventSource, + pub slot: u64, +} + +impl DataProcessor { + pub async fn init( + configuration: &Configuration, + source: DataEventSource, + exit: Arc<AtomicBool>, + ) -> anyhow::Result<DataProcessor> { + let mut retry_counter = RetryCounter::new(2); + let mango_group = 
Pubkey::from_str(&configuration.mango_group)?; + let mango_stream = fail_or_retry!( + retry_counter, + Self::init_mango_source(configuration, source, exit.clone()).await + )?; + let (sender, _) = tokio::sync::broadcast::channel(8192); + let sender_clone = sender.clone(); + + let job = tokio::spawn(async move { + loop { + if exit.load(Ordering::Relaxed) { + warn!("shutting down data processor..."); + break; + } + tokio::select! { + Ok(msg) = mango_stream.recv() => { + let received_at = Utc::now(); + if sender_clone.receiver_count() == 0 { + continue; + } + + let event = Self::parse_message(msg, source, received_at, mango_group); + + if event.is_none() { + continue; + } + + let res = sender_clone.send(event.unwrap()); + if res.is_err() { + break; + } + }, + else => { + warn!("mango update channel err"); + break; + } + } + } + }); + + let result = DataProcessor { + channel: sender, + job, + }; + + Ok(result) + } + + fn new_rpc_async(configuration: &Configuration) -> RpcClientAsync { + let commitment = CommitmentConfig::processed(); + RpcClientAsync::new_with_timeout_and_commitment( + configuration.source_configuration.rpc_http_url.clone(), + Duration::from_secs(60), + commitment, + ) + } + + fn parse_message( + message: Message, + source: DataEventSource, + received_at: chrono::DateTime<Utc>, + mango_group: Pubkey, + ) -> Option<DataEvent> { + match message { + Message::Account(account_write) => { + if is_mango_account(&account_write.account, &mango_group).is_some() { + return Some(AccountUpdate(AccountUpdateEvent { + account: account_write.pubkey, + received_at, + source, + slot: account_write.slot, + })); + } + } + Message::Snapshot(snapshot, _snapshot_type) => { + let slot = snapshot.first()?.slot; // an empty snapshot yields None (skipped by the caller) instead of panicking + let mut result = Vec::new(); + for update in snapshot.iter() { + if is_mango_account(&update.account, &mango_group).is_some() { + result.push(update.pubkey); + assert!(slot == update.slot); + } + } + + return Some(Snapshot(SnapshotEvent { + accounts: result, + received_at, + source, 
+ slot, + })); + } + _ => {} + }; + + Some(Other) + } + + async fn init_mango_source( + configuration: &Configuration, + source: DataEventSource, + exit: Arc<AtomicBool>, + ) -> anyhow::Result<Receiver<Message>> { + // + // Client setup + // + let rpc_async = Self::new_rpc_async(configuration); + + let mango_group = Pubkey::from_str(&configuration.mango_group)?; + let group_context = MangoGroupContext::new_from_rpc(&rpc_async, mango_group).await?; + + let mango_oracles = group_context + .tokens + .values() + .map(|value| value.oracle) + .chain(group_context.perp_markets.values().map(|p| p.oracle)) + .unique() + .collect::<Vec<Pubkey>>(); + + let serum_programs = group_context + .serum3_markets + .values() + .map(|s3| s3.serum_program) + .unique() + .collect_vec(); + + let (account_update_sender, account_update_receiver) = + async_channel::unbounded::<Message>(); + + if source == DataEventSource::Grpc { + let metrics_config = mango_feeds_connector::MetricsConfig { + output_stdout: false, + output_http: false, + }; + let metrics = mango_feeds_connector::metrics::start( + metrics_config, + "benchmark-data-update".to_string(), + ); + let sources = configuration.source_configuration.grpc_sources.clone(); + + grpc_source::start( + grpc_source::Config { + rpc_http_url: configuration.source_configuration.rpc_http_url.clone(), + rpc_ws_url: configuration.source_configuration.rpc_ws_url.clone(), + serum_programs, + open_orders_authority: mango_group, + grpc_sources: sources, + }, + mango_oracles.clone(), + account_update_sender.clone(), + metrics, + exit, + ); + } else { + websocket_source::start( + websocket_source::Config { + rpc_http_url: configuration.source_configuration.rpc_http_url.clone(), + rpc_ws_url: configuration.source_configuration.rpc_ws_url.clone(), + serum_programs, + open_orders_authority: mango_group, + }, + mango_oracles.clone(), + account_update_sender.clone(), + ); + } + + Ok(account_update_receiver) + } +} diff --git a/bin/benchmark-data-update/src/processors/exit.rs 
b/bin/benchmark-data-update/src/processors/exit.rs new file mode 100644 index 0000000000..8ef8d0ef49 --- /dev/null +++ b/bin/benchmark-data-update/src/processors/exit.rs @@ -0,0 +1,39 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::task::JoinHandle; +use tracing::{info, warn}; + +pub struct ExitProcessor { + pub job: JoinHandle<()>, + pub exit: Arc<AtomicBool>, +} + +impl ExitProcessor { + pub async fn init() -> anyhow::Result<ExitProcessor> { + let exit: Arc<AtomicBool> = Arc::new(AtomicBool::new(false)); + let exit_clone = exit.clone(); + + let job = tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_millis(1000)); + loop { + if exit_clone.load(Ordering::Relaxed) { + warn!("exit flag set by another job..."); + break; + } + tokio::select! { + _ = interval.tick() => {} + _ = tokio::signal::ctrl_c()=> { + info!("Received SIGINT, shutting down..."); + exit_clone.store(true, Ordering::Relaxed); + break; + } + } + } + + warn!("shutting down exit processor..."); + }); + + let result = ExitProcessor { job, exit }; + Ok(result) + } +} diff --git a/bin/benchmark-data-update/src/processors/logger.rs b/bin/benchmark-data-update/src/processors/logger.rs new file mode 100644 index 0000000000..e705c41caf --- /dev/null +++ b/bin/benchmark-data-update/src/processors/logger.rs @@ -0,0 +1,176 @@ +use crate::configuration::Configuration; +use chrono::Utc; +use hdrhistogram::Histogram; +use solana_sdk::blake3::Hash; +use solana_sdk::pubkey::Pubkey; +use std::collections::{HashMap, HashSet}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::task::JoinHandle; +use tracing::{info, warn}; + +use super::data::{AccountUpdateEvent, DataEvent, DataEventSource, SnapshotEvent}; + +pub struct LoggerProcessor { + pub job: JoinHandle<()>, +} + +impl LoggerProcessor { + /// TODO FAS + /// Remove slot from the key and compare in "min slot" mode -> an update with upd.slot >= existing.slot is needed to 
match + + pub async fn init( + data_sender_1: &tokio::sync::broadcast::Sender<DataEvent>, + data_sender_2: &tokio::sync::broadcast::Sender<DataEvent>, + exit: Arc<AtomicBool>, + ) -> anyhow::Result<LoggerProcessor> { + let mut first = true; + let mut got_1 = false; + let mut got_2 = false; + let mut data_1 = data_sender_1.subscribe(); + let mut data_2: tokio::sync::broadcast::Receiver<DataEvent> = data_sender_2.subscribe(); + + let job = tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_millis(5 * 1000)); + let mut events = HashMap::<Pubkey, AccountUpdateEvent>::new(); + let mut grpc_late = Histogram::<u64>::new(3).unwrap(); + let mut ws_late = Histogram::<u64>::new(3).unwrap(); + + loop { + if exit.load(Ordering::Relaxed) { + warn!("shutting down logger processor..."); + break; + } + tokio::select! { + _ = interval.tick() => { + if !first { + Self::print(&mut events, &mut ws_late, &mut grpc_late); + continue; + } + + ws_late.clear(); + grpc_late.clear(); + events.clear(); + first = !got_1 && !got_2; + }, + Ok(msg) = data_1.recv() => { got_1 |= Self::handle(msg, &mut events, &mut ws_late, &mut grpc_late) }, + Ok(msg) = data_2.recv() => { got_2 |= Self::handle(msg, &mut events, &mut ws_late, &mut grpc_late) }, + } + } + }); + + let result = LoggerProcessor { job }; + + Ok(result) + } + + fn handle_account( + upd: AccountUpdateEvent, + pending_events: &mut HashMap<Pubkey, AccountUpdateEvent>, + ws_late: &mut Histogram<u64>, + grpc_late: &mut Histogram<u64>, + is_snapshot: bool, + ) { + let key = upd.account; + if let Some(existing) = pending_events.get(&key) { + if existing.slot > upd.slot { + // still lagging + return; + } + + let delay = (upd.received_at - existing.received_at) + .num_nanoseconds() + .unwrap(); // can be negative: the two channels are not drained in timestamp order + match existing.source { + DataEventSource::Websocket => grpc_late.record(delay.max(0) as u64).unwrap(), + DataEventSource::Grpc => ws_late.record(delay.max(0) as u64).unwrap(), + } + + if is_snapshot { + // only match existing, + // but don't expect matching from the other source as there is probably nothing updated for the account + 
pending_events.remove(&key); + return; + } + + if upd.slot == existing.slot { + pending_events.remove(&key); + } else { + pending_events.insert(key, upd); + } + } else { + if is_snapshot { + return; // ignore + } + + pending_events.insert(key, upd); + } + } + + fn print( + events: &mut HashMap<Pubkey, AccountUpdateEvent>, + ws_late: &mut Histogram<u64>, + grpc_late: &mut Histogram<u64>, + ) { + let ws_late = format!( + "{:?}", + Duration::from_nanos(ws_late.value_at_quantile(0.99)) + ); + let grpc_late = format!( + "{:?}", + Duration::from_nanos(grpc_late.value_at_quantile(0.99)) + ); + let pending_ws_events_count = events + .iter() + .filter(|f| f.1.source == DataEventSource::Grpc) + .count(); + let pending_grpc_events_count = events.len() - pending_ws_events_count; + + for x in events { + tracing::debug!( + "{} => {} {} (got from {})", + x.0, x.1.slot, x.1.received_at, x.1.source + ) + } + + info!( + ws_lateness = %ws_late, + grpc_lateness = %grpc_late, + pending_ws_events_count = %pending_ws_events_count, + pending_grpc_events_count = %pending_grpc_events_count, + ) + } + + fn handle( + msg: DataEvent, + events: &mut HashMap<Pubkey, AccountUpdateEvent>, + ws_late: &mut Histogram<u64>, + grpc_late: &mut Histogram<u64>, + ) -> bool { + match msg { + DataEvent::Other => false, + DataEvent::Snapshot(upd) => { + for acc in upd.accounts { + Self::handle_account( + AccountUpdateEvent { + received_at: upd.received_at, + account: acc, + source: upd.source, + slot: upd.slot, + }, + events, + ws_late, + grpc_late, + true, + ); + } + false + } + DataEvent::AccountUpdate(upd) => { + Self::handle_account(upd, events, ws_late, grpc_late, false); + true + } + } + } +} diff --git a/bin/benchmark-data-update/src/processors/mod.rs b/bin/benchmark-data-update/src/processors/mod.rs new file mode 100644 index 0000000000..43e2c9983d --- /dev/null +++ b/bin/benchmark-data-update/src/processors/mod.rs @@ -0,0 +1,3 @@ +pub mod data; +pub mod exit; +pub mod logger;