Skip to content

Commit

Permalink
benchmark-data-update: draft a comparison tools between websocket and…
Browse files Browse the repository at this point in the history
… grpc
  • Loading branch information
farnyser committed Mar 28, 2024
1 parent 81d6285 commit c884ad1
Show file tree
Hide file tree
Showing 10 changed files with 666 additions and 0 deletions.
43 changes: 43 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 54 additions & 0 deletions bin/benchmark-data-update/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
[package]
name = "benchmark-data-update"
version = "0.1.0"
authors = ["Serge Farny <[email protected]>"]
edition = "2021"
license = "AGPL-3.0-or-later"

[dependencies]
mango-feeds-connector = { workspace = true }
mango-feeds-lib = { path = "../../lib/mango-feeds-lib" }
services-mango-lib = { path = "../../lib/services-mango-lib" }

solana-client = { workspace = true }
solana-logger = { workspace = true }
solana-sdk = { workspace = true }

anchor-lang = { workspace = true }
anchor-client = { workspace = true }

fixed = { workspace = true, features = ["serde", "borsh"] }

mango-v4 = { path = "../../programs/mango-v4", features = ["client"] }
mango-v4-client = { path = "../../lib/client" }

serum_dex = { workspace = true }

bs58 = "0.3.1"
log = "0.4"
anyhow = "1.0"
toml = "0.5"
serde = "1.0.130"
serde_derive = "1.0.130"
serde_json = "1.0.68"
futures = "0.3.17"
futures-core = "0.3"
futures-channel = "0.3"
futures-util = "0.3"
ws = "^0.9.2"
async-channel = "1.6"
async-trait = "0.1"
bytemuck = "^1.7.2"
itertools = "0.10.3"
jemallocator = "0.3.2"
chrono = "0.4.23"
base64 = "0.21"

tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.17"

native-tls = "0.2"
rustls = "0.20.8"
tracing = { version = "0.1", features = ["log"] }

hdrhistogram = "7.5.4"
4 changes: 4 additions & 0 deletions bin/benchmark-data-update/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# benchmark data update

Compare websocket and grpc connection performance

14 changes: 14 additions & 0 deletions bin/benchmark-data-update/conf/example-config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX"

[source_configuration]
rpc_ws_url = "wss://mango.rpcpool.com/<TOKEN>"
rpc_http_url = "http://mango.rpcpool.com/<TOKEN>"
snapshot_interval_secs = 900
use_grpc = false
dedup_queue_size = 50000

[[source_configuration.grpc_sources]]
name = "benchmark-data-update"
connection_string = "http://tyo64.rpcpool.com/"
#token = "<TOKEN>"
retry_connection_sleep_secs = 30
23 changes: 23 additions & 0 deletions bin/benchmark-data-update/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use mango_feeds_connector::GrpcSourceConfig;
use serde_derive::Deserialize;
use services_mango_lib::env_helper::string_or_env;

#[derive(Clone, Debug, Deserialize)]
pub struct Configuration {
#[serde(deserialize_with = "string_or_env")]
pub mango_group: String,
pub source_configuration: SourceConfiguration,
}

#[derive(Clone, Debug, Deserialize)]
pub struct SourceConfiguration {
#[serde(deserialize_with = "string_or_env")]
pub rpc_http_url: String,
#[serde(deserialize_with = "string_or_env")]
pub rpc_ws_url: String,

pub snapshot_interval_secs: u64,

pub dedup_queue_size: usize,
pub grpc_sources: Vec<GrpcSourceConfig>,
}
72 changes: 72 additions & 0 deletions bin/benchmark-data-update/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
mod configuration;
mod processors;

use futures_util::StreamExt;
// use mango_feeds_connector::metrics;
use mango_v4_client::tracing_subscriber_init;
use std::fs::File;
use std::io::Read;
use std::sync::atomic::Ordering;

use crate::configuration::Configuration;
use crate::processors::data::{DataEventSource, DataProcessor};
use crate::processors::exit::ExitProcessor;
use crate::processors::logger::LoggerProcessor;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args: Vec<String> = std::env::args().collect();

if args.len() < 2 {
eprintln!("Please enter a config file path argument.");
return Ok(());
}

let configuration: Configuration = {
let mut file = File::open(&args[1])?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
toml::from_str(&contents).unwrap()
};

tracing_subscriber_init();

let exit_processor = ExitProcessor::init().await?;

let ws_processor: DataProcessor = DataProcessor::init(
&configuration,
DataEventSource::Websocket,
exit_processor.exit.clone(),
)
.await?;
let grpc_processor: DataProcessor = DataProcessor::init(
&configuration,
DataEventSource::Grpc,
exit_processor.exit.clone(),
)
.await?;

let logger_processor = LoggerProcessor::init(
&ws_processor.channel,
&grpc_processor.channel,
exit_processor.exit.clone(),
)
.await?;

let jobs = vec![
exit_processor.job,
ws_processor.job,
grpc_processor.job,
logger_processor.job,
];
let mut jobs: futures::stream::FuturesUnordered<_> = jobs.into_iter().collect();

while let Some(_) = jobs.next().await {
// if any job exit, stop the others threads & wait
exit_processor.exit.store(true, Ordering::Relaxed);
}

// for now, we force exit here because websocket connection to RPC is not properly closed on exit
tracing::warn!("killing process");
std::process::exit(0x0100);
}
Loading

0 comments on commit c884ad1

Please sign in to comment.