WIP fix server
CAGS295 committed Aug 14, 2024
1 parent b662df2 commit 1392070
Showing 10 changed files with 91 additions and 55 deletions.
File renamed without changes.
18 changes: 12 additions & 6 deletions Cargo.toml
@@ -4,13 +4,13 @@ version = "0.1.0"
edition = "2021"

[dependencies]
axum = { version = "0.6.18", features = ["macros"] }
axum = { version = "^0.7", features = ["macros"] }
async-trait = "0.1.57"
clap = { version = "4.2.1", features = ["derive"] }
color-eyre = { version = "0.6.2", default-features = false }
ctrlc = "3.2.3"
futures-util = "0.3.21"
reqwest = { version = "0.11.16", default-features = false, features = [
reqwest = { version = "^0.12", default-features = false, features = [
"json",
"native-tls-vendored",
] }
@@ -29,19 +29,22 @@ serde = "*"
left-right = "0.11.5"
tonic = { version = "*", optional = true }
prost = { version = "*", optional = true }
hyper = { version = "0.14.26", features = ["stream"] }
http = "0.2.9"
tower-http = { version = "0.4.3", default-features = false, features = [
hyper = { version = "^1" }
http = "^1"
tower-http = { version = "^0", default-features = false, features = [
"compression-gzip",
] }
flate2 = "1.0.26"
flate2 = "^1.0"
opentelemetry = "0.19.0"
tracing-opentelemetry = "0.19.0"
opentelemetry-jaeger = "0.18.0"
+hyper-util = { version = "0.1.6", features = ["server", "http2", "tokio"] }
+tower = "0.4.13"

[dev-dependencies]
rand = { version = "*" }
criterion = "*"
+http-body-util = "0.1.2"

[features]
default = ["grpc", "codec"]
@@ -55,3 +58,6 @@ harness = false
[[bench]]
name = "depth_monitor_scale"
harness = false

+#[patch."https://github.com/CAGS295/lob.git"]
+#lob = { path = "../lob" }
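
Note (not part of the commit): the dependency bumps above move the crate from the hyper 0.14 / http 0.2 / axum 0.6 stack to hyper 1.x, http 1, and axum 0.7, where `hyper::Body` and `hyper::body::to_bytes` no longer exist; `hyper-util` supplies the client/server glue and `http-body-util` (added as a dev-dependency) supplies the body helpers. A minimal sketch of the replacement for `to_bytes`, assuming a `hyper::body::Incoming` response body from a hyper 1.x client:

// Sketch only: buffering a response body with http-body-util's BodyExt,
// which replaces hyper 0.14's `hyper::body::to_bytes`.
use http_body_util::BodyExt;

async fn body_to_vec(
    response: http::Response<hyper::body::Incoming>,
) -> Result<Vec<u8>, hyper::Error> {
    // `collect()` drains all data frames; `to_bytes()` flattens them into one buffer.
    let collected = response.into_body().collect().await?;
    Ok(collected.to_bytes().to_vec())
}
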
31 changes: 17 additions & 14 deletions benches/depth_monitor_scale.rs
@@ -1,40 +1,43 @@
+use axum::body::Body;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use flate2::read::GzDecoder;
use http::header::ACCEPT_ENCODING;
use http::Request;
-use hyper::body::to_bytes;
-use hyper::Body;
-use hyper::Client;
+use http::Response;
+use http_body_util::BodyExt;
+use hyper_util::client::legacy::Client;
+use hyper_util::rt::TokioExecutor;
+use hyper_util::rt::TokioTimer;
use lob::Decode;
use lob::LimitOrderBook;
-use std::io::Read;
-use std::time::Duration;
+use std::{io::Read, time::Duration};

fn random_gets(c: &mut Criterion) {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed building the Runtime");
-let client = Client::builder()
+let client = Client::builder(TokioExecutor::new())
.http2_only(true)
-.http2_keep_alive_interval(Some(Duration::from_millis(20)))
-.build_http::<Body>();
+.http2_keep_alive_interval(Some(Duration::from_millis(200)))
+.timer(TokioTimer::new())
+.build_http();

c.bench_function("depth scale", |b| {
b.iter(|| {
let req = Request::builder()
.header(ACCEPT_ENCODING, "gzip")
.method("GET")
.uri("http://[::1]:50051/scale/depth/btcusdt")
-.body(Body::default())
+.body(Body::empty())
.unwrap();
-let response = rt.block_on(client.request(req)).unwrap();
+let response: Response<_> = rt.block_on(client.request(req)).unwrap();
let body = response.into_body();
-let bytes = rt.block_on(to_bytes(body)).unwrap();
-let mut x = Vec::new();
-GzDecoder::new(&bytes[..]).read_to_end(&mut x).unwrap();
+let bytes: Vec<u8> = rt.block_on(body.collect()).unwrap().to_bytes().into();
+let mut buf = Vec::new();
+GzDecoder::new(&bytes[..]).read_to_end(&mut buf).unwrap();

-let book_snapshot = LimitOrderBook::decode(&mut x.as_slice()).unwrap();
+let book_snapshot = LimitOrderBook::decode(&mut buf.as_slice()).unwrap();

black_box(book_snapshot);
})
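
Aside (not from the diff): the bench now builds its HTTP/2 client through hyper-util's legacy client, and the added `.timer(TokioTimer::new())` call matters because hyper 1.x no longer assumes a Tokio runtime, so HTTP/2 keep-alive needs an explicit timer. A condensed sketch of the same client setup, with the target URI and runtime builder taken from the bench above:

// Sketch only: the hyper 1.x / hyper-util client construction used by the bench.
// TokioExecutor drives connection tasks; TokioTimer backs the HTTP/2 keep-alive
// interval, which would otherwise have no timer to run on under hyper 1.x.
use std::time::Duration;

use axum::body::Body;
use hyper_util::client::legacy::Client;
use hyper_util::rt::{TokioExecutor, TokioTimer};

fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("Failed building the Runtime");

    let client = Client::builder(TokioExecutor::new())
        .http2_only(true)
        .http2_keep_alive_interval(Some(Duration::from_millis(200)))
        .timer(TokioTimer::new())
        .build_http();

    let req = http::Request::builder()
        .method("GET")
        .uri("http://[::1]:50051/scale/depth/btcusdt")
        .body(Body::empty())
        .unwrap();

    // The real bench unwraps the response and gunzips the body before decoding.
    let _response = rt.block_on(client.request(req));
}
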
20 changes: 8 additions & 12 deletions examples/depth_sampler_scale.rs
@@ -1,12 +1,11 @@
use flate2::read::GzDecoder;
use http::header::{ACCEPT_ENCODING, CONTENT_ENCODING};
-use http::{Request, StatusCode};
-use hyper::body::to_bytes;
-use hyper::{Body, Client};
+use http::StatusCode;
use lob::Decode;
use lob::LimitOrderBook;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
+use reqwest::{Client, Method};
use std::io::Read;
use std::time::Duration;
use std::{println, thread};
@@ -30,19 +29,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
b / 2
);

-let client = Client::builder().http2_only(true).build_http::<Body>();
+let client = Client::new();

while !signal.is_terminated() {
-let request = Request::builder()
+let req = client
+.request(Method::GET, "http://[::1]:50051/scale/depth/btcusdt")
.header(ACCEPT_ENCODING, "gzip")
.method("GET")
.uri("http://[::1]:50051/scale/depth/btcusdt")
.body(Body::default())
.unwrap();
.build()?;

-let future = client.request(request);
+let response = client.execute(req).await?;

-let response = future.await?;
match response.status() {
StatusCode::OK => {}
err => {
@@ -63,7 +59,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
None => false,
};
-let bytes = to_bytes(response.into_body()).await?;
+let bytes = response.bytes().await?;
let mut decode_output_buffer;
let mut bytes = if decode {
decode_output_buffer = Vec::new();
4 changes: 2 additions & 2 deletions src/connectors/handler.rs
@@ -15,12 +15,12 @@ pub(crate) trait EventHandler<Monitorable> {

/// A handler should be identifiable from a Self::Update.
/// This is necessary to route updates to their handler when processing multiple subscriptions from the same source.
-fn to_id<'a>(event: &'a Self::Update) -> &'a str;
+fn to_id(event: &Self::Update) -> &str;

/// Use the Result to break out of the handler loop;
/// Handle a raw message.
fn handle(&mut self, msg: Message) -> Result<(), ()> {
-let Some(update) = Self::parse_update(msg)? else{
+let Some(update) = Self::parse_update(msg)? else {
//Skip if not a relevant update.
return Ok(());
};
4 changes: 2 additions & 2 deletions src/connectors/multiplexor.rs
@@ -24,14 +24,14 @@ where
H::parse_update(event)
}

-fn to_id<'b>(event: &'b Self::Update) -> &'b str {
+fn to_id(event: &Self::Update) -> &str {
H::to_id(event)
}

fn handle_update(&mut self, update: Self::Update) -> Result<(), ()> {
let id = Self::to_id(&update);

-let Some(handle) = self.writers.get_mut(id)else{
+let Some(handle) = self.writers.get_mut(id) else {
error!("Unknown handle {id}");
return Ok(());
};
4 changes: 2 additions & 2 deletions src/monitor/order_book.rs
@@ -95,7 +95,7 @@ impl EventHandler<Depth> for OrderBook {
}
}

-fn to_id<'a>(update: &'a DepthUpdate) -> &'a str {
+fn to_id(update: &DepthUpdate) -> &str {
&update.event.symbol
}

@@ -176,7 +176,7 @@ mod test {
};

let x = DepthUpdate {
-asks: asks,
+asks,
..Default::default()
};
//Skip the first publish optimization.
2 changes: 1 addition & 1 deletion src/providers/binance.rs
@@ -39,7 +39,7 @@ mod test {

#[test]
fn subscription_serialization() {
-let symbols = vec!["a", "b"];
+let symbols = ["a", "b"];
let value = Binance.ws_subscriptions(symbols.iter());
assert_eq!(
value,
57 changes: 41 additions & 16 deletions src/servers/mod.rs
@@ -1,15 +1,18 @@
pub mod grpc;
pub mod scale;

-use axum::Router;
+use axum::{extract::Request, Router};
+use futures_util::TryFutureExt;
#[cfg(feature = "grpc")]
pub use grpc::{limit_order_book_service_client, Pair};
-use hyper::server::conn::AddrIncoming;
+use hyper::body::Incoming;
+use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
use left_right::ReadHandleFactory;
use std::collections::HashMap;
use std::net::Ipv6Addr;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedReceiver;
+use tower::Service;
use tracing::{error, info, warn};

#[derive(Clone)]
@@ -53,28 +56,50 @@ async fn inner_start(

#[cfg(feature = "codec")]
let router = {
-use axum::{handler::Handler, routing::get};
use tower_http::compression::CompressionLayer;

router.route(
"/scale/depth/:symbol",
-get(scale::serve_book.layer(CompressionLayer::new())).with_state(Hook(factory)),
+axum::routing::get(scale::serve_book)
+.with_state(Hook(factory))
+.layer(CompressionLayer::new()),
)
};

-let app = router.into_make_service();
-let incoming = AddrIncoming::bind(&addr).unwrap();
-axum::Server::builder(incoming)
-.tcp_keepalive_interval(Some(Duration::from_millis(500)))
-.tcp_nodelay(true)
-.http2_only(true)
-.serve(app)
-.await
-.map_err(|e| {
-error!("{e}");
-})?;
+let listener = tokio::net::TcpListener::bind(addr).await.unwrap();

-Ok(())
+loop {
+let (stream, addr) = match listener.accept().await {
+Ok(conn) => conn,
+Err(e) => {
+error!("{e}");
+continue;
+}
+};
+
+info!("Connection {addr}, accepted");
+
+stream.set_nodelay(true).unwrap();
+
+let tower_service = router.clone();
+tokio::spawn(async move {
+let stream = TokioIo::new(stream);
+let hyper_service = hyper::service::service_fn(move |req: Request<Incoming>| {
+tower_service.clone().call(req)
+});
+
+let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
+//builder.keep_alive_interval(Some(Duration::from_millis(500)));
+//builder.timer(TokioTimer::new());
+
+let x = builder
+.serve_connection(stream, hyper_service)
+.map_err(|e| {
+error!("{e}");
+});
+x.await.unwrap();
+});
+}
}

pub fn start(
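
Aside (not from the diff): the accept loop above replaces axum 0.6's `axum::Server` (removed in axum 0.7) with a hand-rolled hyper-util connection loop, which keeps per-connection control such as `set_nodelay` and the still-commented-out keep-alive settings. For comparison, a minimal sketch of the higher-level path axum 0.7 offers when that control is not needed (assuming tokio's macros feature for `#[tokio::main]`; the `/health` route and the reuse of the bind address are placeholders):

// Sketch only: serving an axum 0.7 Router with the built-in axum::serve helper
// instead of a manual hyper-util accept loop. No per-connection tuning.
use axum::{routing::get, Router};

#[tokio::main]
async fn main() {
    // Placeholder route; the real router is assembled in inner_start above.
    let router = Router::new().route("/health", get(|| async { "ok" }));

    let listener = tokio::net::TcpListener::bind("[::1]:50051").await.unwrap();
    axum::serve(listener, router).await.unwrap();
}
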
6 changes: 6 additions & 0 deletions src/signals.rs
@@ -4,6 +4,12 @@ use std::sync::{atomic::AtomicBool, Arc};
#[derive(Clone)]
pub struct Terminate(Arc<AtomicBool>);

+impl Default for Terminate {
+fn default() -> Self {
+Self::new()
+}
+}

impl Terminate {
pub fn new() -> Terminate {
let flag = Terminate(Arc::new(AtomicBool::new(false)));
Expand Down
