diff --git a/Cargo.lock b/Cargo.lock index 891eaee2..83268877 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1910,9 +1910,9 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.12.1" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", "parking_lot_core", @@ -1998,10 +1998,18 @@ dependencies = [ "config", "criterion", "entrystore", + "humantime", "logger", "metriken", + "parking_lot", + "protocol-common", "protocol-memcache", + "serde", "server", + "session", + "tokio", + "toml", + "warp", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0dbe361d..1417cc6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ switchboard = "0.2.1" syn = "2.0.38" thiserror = "1.0.49" tiny_http = "0.12.0" +tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread"] } toml = "0.8.2" twox-hash = { version = "1.6.3", default-features = false } urlencoding = "2.1.3" diff --git a/config/segcache.toml b/config/segcache.toml index 4eb3452c..2438e016 100644 --- a/config/segcache.toml +++ b/config/segcache.toml @@ -1,4 +1,6 @@ -daemonize = false +[general] +# choose between 'mio' and 'tokio' +engine = "mio" [admin] # interfaces listening on diff --git a/src/server/pingserver/Cargo.toml b/src/server/pingserver/Cargo.toml index cf9cef33..ba52597d 100644 --- a/src/server/pingserver/Cargo.toml +++ b/src/server/pingserver/Cargo.toml @@ -45,7 +45,7 @@ rustls-native-certs = "0.8.0" serde = { workspace = true, features = ["derive"] } server = { path = "../../core/server" } session = { path = "../../session" } -tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } toml = { workspace = true } tonic = { version = "0.12.2" } warp = "0.3.7" diff --git a/src/server/segcache/Cargo.toml b/src/server/segcache/Cargo.toml index 8eba4f70..cd9bb30f 100644 --- a/src/server/segcache/Cargo.toml +++ b/src/server/segcache/Cargo.toml @@ -9,31 +9,11 @@ homepage = { workspace = true } repository = { workspace = true } license = { workspace = true } -[lib] -name = "pelikan_segcache_rs" -path = "src/lib.rs" -doc = true - [[bin]] name = "pelikan_segcache_rs" path = "src/main.rs" doc = false -[[test]] -name = "integration" -path = "tests/integration.rs" -harness = false - -[[test]] -name = "integration_multi" -path = "tests/integration_multi.rs" -harness = false - -[[bench]] -name = "benchmark" -path = "benches/benchmark.rs" -harness = false - [features] debug = ["entrystore/debug"] @@ -43,10 +23,18 @@ clap = { workspace = true } common = { path = "../../common" } config = { path = "../../config" } entrystore = { path = "../../entrystore" } +humantime = "2.1.0" logger = { path = "../../logger" } metriken = { workspace = true } +parking_lot = "0.12.3" +protocol-common = { path = "../../protocol/common" } protocol-memcache = { path = "../../protocol/memcache" } +serde = { workspace = true, features = ["derive"] } server = { path = "../../core/server", features = ["boringssl"] } +session = { path = "../../session" } +tokio = { workspace = true, features = ["full"] } +toml = { workspace = true } +warp = "0.3.7" [dev-dependencies] criterion = "0.5.1" diff --git a/src/server/segcache/benches/benchmark.rs b/src/server/segcache/benches/benchmark.rs deleted file mode 100644 index 0db17d4c..00000000 --- a/src/server/segcache/benches/benchmark.rs +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright 2021 Twitter, Inc. -// Licensed under the Apache License, Version 2.0 -// http://www.apache.org/licenses/LICENSE-2.0 - -//! This is a very basic benchmark which tests only get requests with a few -//! different key and value sizes. It's only using one connection and a very -//! primitive blocking client, so these results do not reflect the true -//! performance of the server when under load. It can be used to get a rough -//! idea of how changes may impact performance. -//! -//! For formal performance testing, it is recommended to use -//! [rpc-perf](https://github.com/twitter/rpc-perf) or another cache -//! benchmarking tool which supports the Memcache ASCII protocol. - -use config::SegcacheConfig; -use criterion::{criterion_group, criterion_main, Criterion, Throughput}; -use pelikan_segcache_rs::Segcache; - -use std::io::{Read, Write}; -use std::net::TcpStream; -use std::time::Duration; - -fn get_benchmark(c: &mut Criterion) { - // use the default config - let config = SegcacheConfig::default(); - - // launch the server - let server = Segcache::new(config).expect("failed to launch segcache"); - - // wait for server to startup. duration is chosen to be longer than we'd - // expect startup to take in a slow ci environment. - std::thread::sleep(Duration::from_secs(10)); - - // connect and initialize an empty buffer - let mut stream = TcpStream::connect("127.0.0.1:12321").expect("failed to connect"); - let mut buffer = vec![0; 1024 * 1024]; - - // define a benchmarking group - let mut group = c.benchmark_group("request"); - group.throughput(Throughput::Elements(1)); - - let mut key_id = 0; - - // benchmark for a few key lengths - for klen in [1, 16, 64, 255].iter() { - // benchmark getting empty value - let bench_name = format!("get/{klen}b/0b"); - let key = format!("{:01$}", 0, klen); - let msg = format!("get {key}\r\n"); - group.bench_function(&bench_name, |b| { - b.iter(|| { - assert!(stream.write_all(msg.as_bytes()).is_ok()); - if let Ok(bytes) = stream.read(&mut buffer) { - assert_eq!(&buffer[0..bytes], b"END\r\n", "invalid response"); - } else { - panic!("read error"); - } - }) - }); - - // benchmark across a few value lengths - for vlen in [1, 64, 1024, 4096].iter() { - let key = format!("{key_id:0klen$}"); - let value = format!("{:A>1$}", 0, vlen); - let msg = format!("set {key} 0 0 {vlen}\r\n{value}\r\n"); - assert!(stream.write_all(msg.as_bytes()).is_ok()); - if let Ok(bytes) = stream.read(&mut buffer) { - assert_eq!(&buffer[0..bytes], b"STORED\r\n", "invalid response"); - } else { - panic!("read error"); - } - - let bench_name = format!("get/{klen}b/{vlen}b"); - let msg = format!("get {key}\r\n"); - let response = format!("VALUE {key} 0 {vlen}\r\n{value}\r\nEND\r\n"); - group.bench_function(&bench_name, |b| { - b.iter(|| { - assert!(stream.write_all(msg.as_bytes()).is_ok()); - if let Ok(bytes) = stream.read(&mut buffer) { - assert_eq!(&buffer[0..bytes], response.as_bytes(), "invalid response"); - } else { - panic!("read error"); - } - }) - }); - - key_id += 1; - } - } - - // shutdown the server - server.shutdown(); -} - -criterion_group!(benches, get_benchmark); -criterion_main!(benches); diff --git a/src/server/segcache/src/config.rs b/src/server/segcache/src/config.rs new file mode 100644 index 00000000..6a529d24 --- /dev/null +++ b/src/server/segcache/src/config.rs @@ -0,0 +1,195 @@ +use config::*; + +use serde::{Deserialize, Serialize}; + +use std::io::Read; +use std::net::SocketAddr; +use std::time::Duration; + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct Config { + #[serde(default)] + pub general: General, + #[serde(default)] + pub metrics: Metrics, + + // application modules + #[serde(default)] + pub admin: Admin, + #[serde(default)] + pub server: Server, + #[serde(default)] + pub worker: Worker, + #[serde(default)] + pub time: Time, + #[serde(default)] + pub tls: Tls, + #[serde(default)] + pub seg: Seg, + + // ccommon + #[serde(default)] + pub buf: Buf, + #[serde(default)] + pub debug: Debug, + #[serde(default)] + pub klog: Klog, + #[serde(default)] + pub sockio: Sockio, + #[serde(default)] + pub tcp: Tcp, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct General { + pub engine: Engine, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Metrics { + #[serde(default = "interval")] + pub interval: String, +} + +impl Default for Metrics { + fn default() -> Self { + Self { + interval: interval(), + } + } +} + +impl Metrics { + pub fn interval(&self) -> Duration { + self.interval.parse::().unwrap().into() + } +} + +fn interval() -> String { + "1s".into() +} + +#[derive(Serialize, Deserialize, Debug, Default, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum Engine { + #[default] + Mio, + Tokio, +} + +impl Config { + pub fn load(file: &str) -> Result { + let mut file = std::fs::File::open(file)?; + let mut content = String::new(); + file.read_to_string(&mut content)?; + + let config: Config = match toml::from_str(&content) { + Ok(t) => t, + Err(e) => { + error!("{}", e); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Error parsing config", + )); + } + }; + + match config.metrics.interval.parse::() { + Ok(interval) => { + if Into::::into(interval) < Duration::from_millis(10) { + eprintln!("metrics interval cannot be less than 10ms"); + std::process::exit(1); + } + } + Err(e) => { + eprintln!("metrics interval is not valid: {e}"); + std::process::exit(1); + } + } + + Ok(config) + } + + pub fn listen(&self) -> SocketAddr { + self.server + .socket_addr() + .map_err(|e| { + error!("{}", e); + std::io::Error::new(std::io::ErrorKind::Other, "Bad listen address") + }) + .map_err(|_| { + std::process::exit(1); + }) + .unwrap() + } +} + +impl AdminConfig for Config { + fn admin(&self) -> &Admin { + &self.admin + } +} + +impl BufConfig for Config { + fn buf(&self) -> &Buf { + &self.buf + } +} + +impl DebugConfig for Config { + fn debug(&self) -> &Debug { + &self.debug + } +} + +impl KlogConfig for Config { + fn klog(&self) -> &Klog { + &self.klog + } +} + +impl SegConfig for Config { + fn seg(&self) -> &Seg { + &self.seg + } +} + +impl ServerConfig for Config { + fn server(&self) -> &Server { + &self.server + } +} + +impl SockioConfig for Config { + fn sockio(&self) -> &Sockio { + &self.sockio + } +} + +impl TcpConfig for Config { + fn tcp(&self) -> &Tcp { + &self.tcp + } +} + +impl TimeConfig for Config { + fn time(&self) -> &Time { + &self.time + } +} + +impl TlsConfig for Config { + fn tls(&self) -> &Tls { + &self.tls + } +} + +impl WorkerConfig for Config { + fn worker(&self) -> &Worker { + &self.worker + } + + fn worker_mut(&mut self) -> &mut Worker { + &mut self.worker + } +} diff --git a/src/server/segcache/src/lib.rs b/src/server/segcache/src/lib.rs deleted file mode 100644 index 04b7fb03..00000000 --- a/src/server/segcache/src/lib.rs +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2021 Twitter, Inc. -// Licensed under the Apache License, Version 2.0 -// http://www.apache.org/licenses/LICENSE-2.0 - -//! Segcache is a cache implementation which used segment based storage and uses -//! a subset of the Memcache protocol. Segment based storage allows us to -//! perform efficient eager expiration of items. - -use config::*; -use entrystore::Seg; -use logger::*; -use protocol_memcache::{Request, RequestParser, Response}; -use server::{Process, ProcessBuilder}; - -type Parser = RequestParser; -type Storage = Seg; - -/// This structure represents a running `Segcache` process. -#[allow(dead_code)] -pub struct Segcache { - process: Process, -} - -impl Segcache { - /// Creates a new `Segcache` process from the given `SegcacheConfig`. - pub fn new(config: SegcacheConfig) -> Result { - // initialize logging - let log_drain = configure_logging(&config); - - // initialize metrics - common::metrics::init(); - - // initialize storage - let storage = Storage::new(&config)?; - - // initialize parser - let parser = Parser::new() - .max_value_size(config.seg().segment_size() as usize) - .time_type(config.time().time_type()); - - // initialize process - let process_builder = ProcessBuilder::::new( - &config, log_drain, parser, storage, - )? - .version(env!("CARGO_PKG_VERSION")); - - // spawn threads - let process = process_builder.spawn(); - - Ok(Self { process }) - } - - /// Wait for all threads to complete. Blocks until the process has fully - /// terminated. Under normal conditions, this will block indefinitely. - pub fn wait(self) { - self.process.wait() - } - - /// Triggers a shutdown of the process and blocks until the process has - /// fully terminated. This is more likely to be used for running integration - /// tests or other automated testing. - pub fn shutdown(self) { - self.process.shutdown() - } -} - -common::metrics::test_no_duplicates!(); diff --git a/src/server/segcache/src/main.rs b/src/server/segcache/src/main.rs index 5bad855b..47d50ffa 100644 --- a/src/server/segcache/src/main.rs +++ b/src/server/segcache/src/main.rs @@ -15,13 +15,32 @@ #[macro_use] extern crate logger; +use entrystore::Seg; +use protocol_memcache::Request; +use protocol_memcache::RequestParser; +use protocol_memcache::Response; +use server::ProcessBuilder; +use std::sync::atomic::AtomicBool; +// use protocol_memcache::Storage; +use crate::config::Engine; use backtrace::Backtrace; use clap::{Arg, Command}; -use config::SegcacheConfig; +use logger::configure_logging; +// use config::SegcacheConfig; use metriken::*; -use pelikan_segcache_rs::Segcache; +// use pelikan_segcache_rs::Segcache; use server::PERCENTILES; +type Parser = RequestParser; +type Storage = Seg; + +mod config; +mod tokio; + +use crate::config::*; + +static RUNNING: AtomicBool = AtomicBool::new(true); + /// The entry point into the running Segcache instance. This function parses the /// command line options, loads the configuration, and launches the core /// threads. @@ -104,7 +123,7 @@ fn main() { // load config from file let config = if let Some(file) = matches.get_one::("CONFIG") { debug!("loading config: {}", file); - match SegcacheConfig::load(file) { + match Config::load(file) { Ok(c) => c, Err(error) => { eprintln!("error loading config file: {file}\n{error}"); @@ -116,16 +135,38 @@ fn main() { }; if matches.get_flag("print-config") { - config.print(); - std::process::exit(0); + todo!("not implemented"); + // config.print(); + // std::process::exit(0); } - // launch segcache - match Segcache::new(config) { - Ok(segcache) => segcache.wait(), - Err(e) => { - eprintln!("error launching segcache: {e}"); - std::process::exit(1); + // initialize logging + let log = configure_logging(&config); + + // initialize metrics + common::metrics::init(); + + // launch the server + match config.general.engine { + Engine::Mio => { + // initialize storage + let storage = Storage::new(&config).expect("failed to initialize storage"); + + // initialize parser + let parser = Parser::new() + .max_value_size(config.seg.segment_size() as usize) + .time_type(config.time.time_type()); + + // initialize process + let process_builder = ProcessBuilder::::new( + &config, log, parser, storage, + ) + .expect("failed to initialize process"); + + // spawn threads + let process = process_builder.spawn(); + process.wait(); } + Engine::Tokio => tokio::spawn(config, log), } } diff --git a/src/server/segcache/src/tokio/admin.rs b/src/server/segcache/src/tokio/admin.rs new file mode 100644 index 00000000..985cdbf3 --- /dev/null +++ b/src/server/segcache/src/tokio/admin.rs @@ -0,0 +1,233 @@ +use crate::tokio::METRICS_SNAPSHOT; +use crate::*; + +use ::config::AdminConfig; + +use metriken::Value; + +use std::net::ToSocketAddrs; +use std::sync::Arc; + +/// The HTTP admin server. +pub async fn http(config: Arc) { + let admin = filters::admin(); + + let addr = format!("{}:{}", config.admin().host(), config.admin().port()); + + let addr = addr + .to_socket_addrs() + .expect("bad listen address") + .next() + .expect("couldn't determine listen address"); + + warp::serve(admin).run(addr).await; +} + +mod filters { + use super::*; + use warp::Filter; + + /// The combined set of admin endpoint filters + pub fn admin() -> impl Filter + Clone { + prometheus_stats().or(human_stats()).or(json_stats()) + } + + /// Serves Prometheus / OpenMetrics text format metrics. + /// + /// GET /metrics + pub fn prometheus_stats( + ) -> impl Filter + Clone { + warp::path!("metrics") + .and(warp::get()) + .and_then(handlers::prometheus_stats) + } + + /// Serves a human readable metrics output. + /// + /// GET /vars + pub fn human_stats( + ) -> impl Filter + Clone { + warp::path!("vars") + .and(warp::get()) + .and_then(handlers::human_stats) + } + + /// Serves JSON metrics output that is compatible with Twitter Server / + /// Finagle metrics endpoints. Multiple paths are provided for enhanced + /// compatibility with metrics collectors. + /// + /// GET /metrics.json + /// GET /vars.json + /// GET /admin/metrics.json + pub fn json_stats( + ) -> impl Filter + Clone { + warp::path!("metrics.json") + .and(warp::get()) + .and_then(handlers::json_stats) + .or(warp::path!("vars.json") + .and(warp::get()) + .and_then(handlers::json_stats)) + .or(warp::path!("admin" / "metrics.json") + .and(warp::get()) + .and_then(handlers::json_stats)) + } +} + +pub mod handlers { + + use super::*; + use core::convert::Infallible; + use std::time::UNIX_EPOCH; + + /// Serves Prometheus / OpenMetrics text format metrics. All metrics have + /// type information, some have descriptions as well. Percentiles read from + /// heatmaps are exposed with a `percentile` label where the value + /// corresponds to the percentile in the range of 0.0 - 100.0. + /// + /// See: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md + /// + /// ```text + /// # TYPE some_counter counter + /// # HELP some_counter An unsigned 64bit monotonic counter. + /// counter 0 + /// # TYPE some_gauge gauge + /// # HELP some_gauge A signed 64bit gauge. + /// some_gauge 0 + /// # TYPE some_distribution{percentile="50.0"} gauge + /// some_distribution{percentile="50.0"} 0 + /// ``` + pub async fn prometheus_stats() -> Result { + let mut data = Vec::new(); + + let metrics_snapshot = METRICS_SNAPSHOT.read().await; + + let timestamp = metrics_snapshot + .current + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(); + + for metric in &metriken::metrics() { + if metric.name().starts_with("log_") { + continue; + } + + let name = metric.name(); + + match metric.value() { + Some(Value::Counter(value)) => { + if let Some(description) = metric.description() { + data.push(format!( + "# TYPE {name} counter\n# HELP {name} {description}\n{name} {value}" + )); + } else { + data.push(format!("# TYPE {name} counter\n{name} {value}")); + } + } + Some(Value::Gauge(value)) => { + if let Some(description) = metric.description() { + data.push(format!( + "# TYPE {name} gauge\n# HELP {name} {description}\n{name} {value}" + )); + } else { + data.push(format!("# TYPE {name} gauge\n{name} {value}")); + } + } + Some(Value::Other(_)) => { + let percentiles = metrics_snapshot.percentiles(metric.name()); + + for (_label, percentile, value) in percentiles { + if let Some(description) = metric.description() { + data.push(format!( + "# TYPE {name} gauge\n# HELP {name} {description}\n{name}{{percentile=\"{:02}\"}} {value} {timestamp}", + percentile, + )); + } else { + data.push(format!( + "# TYPE {name} gauge\n{name}{{percentile=\"{:02}\"}} {value} {timestamp}", + percentile, + )); + } + } + } + _ => continue, + } + } + + data.sort(); + let mut content = data.join("\n"); + content += "\n"; + let parts: Vec<&str> = content.split('/').collect(); + Ok(parts.join("_")) + } + + /// Serves JSON formatted metrics following the conventions of Finagle / + /// TwitterServer. Percentiles read from heatmaps will have a percentile + /// label appended to the metric name in the form `/p999` which would be the + /// 99.9th percentile. + /// + /// ```text + /// {"get/ok": 0,"client/request/p999": 0, ... } + /// ``` + pub async fn json_stats() -> Result { + let data = human_formatted_stats().await; + + let mut content = "{".to_string(); + content += &data.join(","); + content += "}"; + + Ok(content) + } + + /// Serves human readable stats. One metric per line with a `LF` as the + /// newline character (Unix-style). Percentiles will have percentile labels + /// appened with a `/` as a separator. + /// + /// ``` + /// get/ok: 0 + /// client/request/latency/p50: 0, + /// ``` + pub async fn human_stats() -> Result { + let data = human_formatted_stats().await; + + let mut content = data.join("\n"); + content += "\n"; + Ok(content) + } +} + +// human formatted stats that can be exposed as human stats or converted to json +pub async fn human_formatted_stats() -> Vec { + let mut data = Vec::new(); + + let metrics_snapshot = METRICS_SNAPSHOT.read().await; + + for metric in &metriken::metrics() { + if metric.name().starts_with("log_") { + continue; + } + + let name = metric.name(); + + match metric.value() { + Some(Value::Counter(value)) => { + data.push(format!("\"{name}\": {value}")); + } + Some(Value::Gauge(value)) => { + data.push(format!("\"{name}\": {value}")); + } + Some(Value::Other(_)) => { + let percentiles = metrics_snapshot.percentiles(metric.name()); + + for (label, _percentile, value) in percentiles { + data.push(format!("\"{name}/{label}\": {value}",)); + } + } + _ => continue, + } + } + + data.sort(); + + data +} diff --git a/src/server/segcache/src/tokio/ascii.rs b/src/server/segcache/src/tokio/ascii.rs new file mode 100644 index 00000000..b2c9079e --- /dev/null +++ b/src/server/segcache/src/tokio/ascii.rs @@ -0,0 +1,105 @@ +use crate::Config; +use crate::Parser; +use crate::Storage; + +use ::config::BufConfig; +use protocol_common::Execute; +use protocol_memcache::{Compose, Parse}; +use session::{Buf, BufMut, Buffer}; + +use parking_lot::Mutex; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; + +use std::borrow::{Borrow, BorrowMut}; +use std::io::ErrorKind; +use std::sync::Arc; + +pub async fn run(config: Arc, storage: Storage, parser: Parser) { + let listener = std::net::TcpListener::bind(config.listen()).unwrap(); + let listener = TcpListener::from_std(listener).unwrap(); + + let storage = Arc::new(Mutex::new(storage)); + + loop { + if let Ok((mut socket, _)) = listener.accept().await { + if socket.set_nodelay(true).is_err() { + continue; + } + + let buf_size = config.buf().size(); + let storage = storage.clone(); + + tokio::spawn(async move { + // initialize parser and the read and write bufs + // let parser = protocol_memcache::RequestParser::new(); + let mut read_buffer = Buffer::new(buf_size); + let mut write_buffer = Buffer::new(buf_size); + + loop { + // read from the socket + match socket.read(read_buffer.borrow_mut()).await { + Ok(0) => { + // socket was closed, return to close + return; + } + Ok(n) => { + // bytes received, advance read buffer + // to make them available for parsing + unsafe { + read_buffer.advance_mut(n); + } + } + Err(_) => { + // some other error occurred, return to + // close + return; + } + }; + + // parse the read buffer + let request = match parser.parse(read_buffer.borrow()) { + Ok(request) => { + // got a complete request, consume the + // bytes for the request by advancing + // the read buffer + let consumed = request.consumed(); + read_buffer.advance(consumed); + + request + } + Err(e) => match e.kind() { + ErrorKind::WouldBlock => { + // incomplete request, loop to read + // again + continue; + } + _ => { + // some parse error, return to close + return; + } + }, + }; + + // execute the request + let response = { + let mut storage = storage.lock(); + (*storage).execute(&request.into_inner()) + }; + + // write the response into the buffer + response.compose(&mut write_buffer); + + // flush the write buffer, return to close on + // error + if socket.write_all(write_buffer.borrow()).await.is_err() { + return; + } + + // clear the write buffer + write_buffer.clear(); + } + }); + } + } +} diff --git a/src/server/segcache/src/tokio/grpc.rs b/src/server/segcache/src/tokio/grpc.rs new file mode 100644 index 00000000..cad2319a --- /dev/null +++ b/src/server/segcache/src/tokio/grpc.rs @@ -0,0 +1,41 @@ +use crate::{Config, RUNNING}; + +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use tonic::transport::Server as TonicServer; +use tonic::{Request as TonicRequest, Response as TonicResponse, Status as TonicStatus}; + +pub mod pingpong { + tonic::include_proto!("pingpong"); +} + +use pingpong::ping_server::{Ping, PingServer}; +use pingpong::{PingRequest, PongResponse}; + +#[derive(Debug, Default)] +pub struct Server {} + +#[tonic::async_trait] +impl Ping for Server { + async fn ping( + &self, + _request: TonicRequest, + ) -> Result, TonicStatus> { + Ok(TonicResponse::new(PongResponse {})) + } +} + +pub async fn run(config: Arc) { + tokio::spawn(async move { + if let Err(e) = TonicServer::builder() + .add_service(PingServer::new(Server::default())) + .serve(config.listen()) + .await + { + error!("{e}"); + }; + + RUNNING.store(false, Ordering::Relaxed); + }); +} diff --git a/src/server/segcache/src/tokio/http2.rs b/src/server/segcache/src/tokio/http2.rs new file mode 100644 index 00000000..285e6bbf --- /dev/null +++ b/src/server/segcache/src/tokio/http2.rs @@ -0,0 +1,106 @@ +use crate::Config; + +use session::REQUEST_LATENCY; + +use bytes::BytesMut; +use chrono::Utc; +use http::{HeaderMap, HeaderValue, Version}; +use tokio::net::TcpListener; + +use std::sync::Arc; +use std::time::Instant; + +pub async fn run(config: Arc) { + let listener = TcpListener::bind(config.listen()).await.unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = stream.set_nodelay(true).is_err(); + + tokio::task::spawn(async move { + match ::h2::server::handshake(stream).await { + Ok(mut conn) => { + loop { + match conn.accept().await { + Some(Ok((request, mut sender))) => { + let start = Instant::now(); + + tokio::spawn(async move { + let (_parts, mut body) = request.into_parts(); + + let mut content = BytesMut::new(); + + // receive all request body content + while let Some(data) = body.data().await { + if data.is_err() { + // TODO(bmartin): increment error stats + return; + } + + let data = data.unwrap(); + + content.extend_from_slice(&data); + let _ = + body.flow_control().release_capacity(data.len()); + } + + // we don't need the trailers, but read them here + if body.trailers().await.is_err() { + // TODO(bmartin): increment error stats + return; + } + + let mut date = + HeaderValue::from_str(&Utc::now().to_rfc2822()) + .unwrap(); + date.set_sensitive(true); + + // build our response + let response = http::response::Builder::new() + .status(200) + .version(Version::HTTP_2) + .header("content-type", "application/grpc") + .header("date", date) + .body(()) + .unwrap(); + + let content = BytesMut::zeroed(5); + + let mut trailers = HeaderMap::new(); + trailers.append("grpc-status", 0.into()); + + // send the response + if let Ok(mut stream) = + sender.send_response(response, false) + { + if stream.send_data(content.into(), false).is_ok() + && stream.send_trailers(trailers).is_ok() + { + let stop = Instant::now(); + let latency = stop.duration_since(start).as_nanos(); + + let _ = REQUEST_LATENCY.increment(latency as _); + } + } + + // TODO(bmartin): increment error stats + }); + } + Some(Err(e)) => { + eprintln!("error: {e}"); + break; + } + None => { + continue; + } + } + } + } + Err(e) => { + eprintln!("error during handshake: {e}"); + } + } + }); + } + } +} diff --git a/src/server/segcache/src/tokio/http3.rs b/src/server/segcache/src/tokio/http3.rs new file mode 100644 index 00000000..1f95ba77 --- /dev/null +++ b/src/server/segcache/src/tokio/http3.rs @@ -0,0 +1,130 @@ +use crate::Config; + +use common::ssl::TlsConfig as TlsConfigTrait; +use config::TlsConfig; +use session::REQUEST_LATENCY; + +use bytes::{Buf, BytesMut}; +use chrono::Utc; +use http::{HeaderMap, HeaderValue, Version}; +use quinn::crypto::rustls::QuicServerConfig; +use rustls::pki_types::{CertificateDer, PrivateKeyDer}; + +use std::sync::Arc; +use std::time::Instant; + +pub async fn run(config: Arc) { + let cert_file = config + .tls() + .certificate() + .expect("no certificate configured"); + let cert_content = std::fs::read(cert_file).expect("failed to read cert"); + let cert = CertificateDer::from(cert_content); + + let key_file = config + .tls() + .private_key() + .expect("no private key configured"); + let key_content = std::fs::read(key_file).expect("failed to read private key"); + let key = PrivateKeyDer::try_from(key_content).expect("failed to load private key"); + + let mut tls_config = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(vec![cert], key) + .expect("error configuring tls"); + + tls_config.max_early_data_size = u32::MAX; + tls_config.alpn_protocols = vec!["h3".into()]; + + let quic_config = QuicServerConfig::try_from(tls_config).expect("failed to configure quic"); + + let server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_config)); + let endpoint = quinn::Endpoint::server(server_config, config.listen()) + .expect("failed to start quic endpoint"); + + loop { + if let Some(incoming_conn) = endpoint.accept().await { + tokio::spawn(async move { + if let Ok(conn) = incoming_conn.await { + if let Ok(mut conn) = + h3::server::Connection::new(h3_quinn::Connection::new(conn)).await + { + loop { + match conn.accept().await { + Ok(Some((request, mut stream))) => { + let start = Instant::now(); + + tokio::spawn(async move { + let (_parts, _body) = request.into_parts(); + + let mut content = BytesMut::new(); + + while let Ok(data) = stream.recv_data().await { + if let Some(mut data) = data { + while data.has_remaining() { + let chunk: &[u8] = data.chunk(); + content.extend_from_slice(chunk); + data.advance(chunk.len()); + } + } else { + break; + } + } + + if let Ok(_trailers) = stream.recv_trailers().await { + let date = + HeaderValue::from_str(&Utc::now().to_rfc2822()) + .unwrap(); + + let response = http::response::Builder::new() + .status(200) + .version(Version::HTTP_3) + .header("content-type", "application/grpc") + .header("date", date) + .body(()) + .unwrap(); + + let content = BytesMut::zeroed(5); + + let mut trailers = HeaderMap::new(); + trailers.append("grpc-status", 0.into()); + + if stream.send_response(response).await.is_err() { + return; + } + + if stream.send_data(content).await.is_err() { + return; + } + + if stream.send_trailers(trailers).await.is_err() { + return; + } + + let stop = Instant::now(); + let latency = stop.duration_since(start).as_nanos(); + + let _ = REQUEST_LATENCY.increment(latency as _); + } + }); + } + Ok(None) => { + // break if no Request is accepted + break; + } + Err(err) => { + match err.get_error_level() { + // break on connection errors + h3::error::ErrorLevel::ConnectionError => break, + // continue on stream errors + h3::error::ErrorLevel::StreamError => continue, + } + } + } + } + } + } + }); + } + } +} diff --git a/src/server/segcache/src/tokio/metrics.rs b/src/server/segcache/src/tokio/metrics.rs new file mode 100644 index 00000000..cf8e951d --- /dev/null +++ b/src/server/segcache/src/tokio/metrics.rs @@ -0,0 +1,198 @@ +use metriken::{histogram, AtomicHistogram, RwLockHistogram, Value}; + +use std::collections::HashMap; +use std::time::SystemTime; + +pub static PERCENTILES: &[(&str, f64)] = &[ + ("p25", 25.0), + ("p50", 50.0), + ("p75", 75.0), + ("p90", 90.0), + ("p99", 99.0), + ("p999", 99.9), + ("p9999", 99.99), +]; + +pub struct MetricsSnapshot { + pub current: SystemTime, + pub previous: SystemTime, + pub counters: CountersSnapshot, + pub histograms: HistogramsSnapshot, +} + +impl Default for MetricsSnapshot { + fn default() -> Self { + Self::new() + } +} + +impl MetricsSnapshot { + pub fn new() -> Self { + let now = SystemTime::now(); + + Self { + current: now, + previous: now, + counters: Default::default(), + histograms: Default::default(), + } + } + + pub fn update(&mut self) { + self.previous = self.current; + self.current = SystemTime::now(); + + self.counters.update(); + self.histograms.update(); + } + + pub fn percentiles(&self, name: &str) -> Vec<(String, f64, u64)> { + self.histograms.percentiles(name) + } +} + +pub struct HistogramsSnapshot { + pub previous: HashMap, + pub deltas: HashMap, +} + +impl Default for HistogramsSnapshot { + fn default() -> Self { + Self::new() + } +} + +impl HistogramsSnapshot { + pub fn new() -> Self { + let mut current = HashMap::new(); + + for metric in &metriken::metrics() { + match metric.value() { + Some(Value::Other(other)) => { + let histogram = if let Some(histogram) = other.downcast_ref::() + { + histogram.load() + } else if let Some(histogram) = other.downcast_ref::() { + histogram.load() + } else { + None + }; + + if let Some(histogram) = histogram { + current.insert(metric.name().to_string(), histogram); + } + } + _ => continue, + } + } + + let deltas = current.clone(); + + Self { + previous: current, + deltas, + } + } + + pub fn update(&mut self) { + for metric in &metriken::metrics() { + match metric.value() { + Some(Value::Other(other)) => { + let histogram = if let Some(histogram) = other.downcast_ref::() + { + histogram.load() + } else if let Some(histogram) = other.downcast_ref::() { + histogram.load() + } else { + None + }; + + if let Some(histogram) = histogram { + let name = metric.name().to_string(); + + if let Some(previous) = self.previous.get(&name) { + self.deltas + .insert(name.clone(), histogram.wrapping_sub(previous).unwrap()); + } + + self.previous.insert(name, histogram); + } + } + _ => continue, + } + } + } + + pub fn percentiles(&self, metric: &str) -> Vec<(String, f64, u64)> { + let mut result = Vec::new(); + + let percentiles: Vec = PERCENTILES + .iter() + .map(|(_, percentile)| *percentile) + .collect(); + + if let Some(snapshot) = self.deltas.get(metric) { + if let Ok(Some(percentiles)) = snapshot.percentiles(&percentiles) { + for ((label, _), (percentile, bucket)) in PERCENTILES.iter().zip(percentiles.iter()) + { + result.push((label.to_string(), *percentile, bucket.end())); + } + } + } + + result + } +} + +#[derive(Clone)] +pub struct CountersSnapshot { + pub current: HashMap, + pub previous: HashMap, +} + +impl Default for CountersSnapshot { + fn default() -> Self { + Self::new() + } +} + +impl CountersSnapshot { + pub fn new() -> Self { + let mut current = HashMap::new(); + let previous = HashMap::new(); + + for metric in metriken::metrics().iter() { + let any = if let Some(any) = metric.as_any() { + any + } else { + continue; + }; + + let metric = metric.name().to_string(); + + if let Some(_counter) = any.downcast_ref::() { + current.insert(metric.clone(), 0); + } + } + Self { current, previous } + } + + pub fn update(&mut self) { + for metric in metriken::metrics().iter() { + let any = if let Some(any) = metric.as_any() { + any + } else { + continue; + }; + + if let Some(counter) = any.downcast_ref::() { + if let Some(old_value) = self + .current + .insert(metric.name().to_string(), counter.value()) + { + self.previous.insert(metric.name().to_string(), old_value); + } + } + } + } +} diff --git a/src/server/segcache/src/tokio/mod.rs b/src/server/segcache/src/tokio/mod.rs new file mode 100644 index 00000000..0716815b --- /dev/null +++ b/src/server/segcache/src/tokio/mod.rs @@ -0,0 +1,83 @@ +use crate::config::Config; +use crate::*; +use logger::Drain; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use ::tokio::runtime::Builder; +use ::tokio::sync::RwLock; +use ::tokio::time::sleep; +use metriken::Lazy; + +use std::sync::Arc; + +mod admin; +mod ascii; +mod metrics; + +static METRICS_SNAPSHOT: Lazy>> = + Lazy::new(|| Arc::new(RwLock::new(Default::default()))); + +pub fn spawn(config: Config, mut log: Box) { + let config = Arc::new(config); + + // initialize async runtime for control plane + let control_runtime = Builder::new_multi_thread() + .enable_all() + .worker_threads(1) + .build() + .expect("failed to initialize tokio runtime"); + + // spawn logging thread + control_runtime.spawn(async move { + while RUNNING.load(Ordering::Relaxed) { + sleep(Duration::from_millis(1)).await; + let _ = log.flush(); + } + let _ = log.flush(); + }); + + // spawn thread to maintain histogram snapshots + { + let interval = config.metrics.interval(); + control_runtime.spawn(async move { + while RUNNING.load(Ordering::Relaxed) { + // acquire a lock and update the snapshots + { + let mut snapshots = METRICS_SNAPSHOT.write().await; + snapshots.update(); + } + + // delay until next update + sleep(interval).await; + } + }); + } + + // spawn the admin thread + control_runtime.spawn(admin::http(config.clone())); + + // initialize storage + let storage = Storage::new(&*config).expect("failed to initialize storage"); + + // initialize parser + let parser = Parser::new() + .max_value_size(config.seg.segment_size() as usize) + .time_type(config.time.time_type()); + + // initialize async runtime for the data plane + let data_runtime = Builder::new_multi_thread() + .enable_all() + .worker_threads(config.worker.threads()) + .build() + .expect("failed to initialize tokio runtime"); + + data_runtime.block_on(async move { ascii::run(config, storage, parser).await }); + + while RUNNING.load(Ordering::Relaxed) { + std::thread::sleep(Duration::from_millis(250)); + } + + data_runtime.shutdown_timeout(std::time::Duration::from_millis(100)); + control_runtime.shutdown_timeout(std::time::Duration::from_millis(100)); +} diff --git a/src/server/segcache/tests/common.rs b/src/server/segcache/tests/common.rs deleted file mode 100644 index 9f336817..00000000 --- a/src/server/segcache/tests/common.rs +++ /dev/null @@ -1,352 +0,0 @@ -// Copyright 2021 Twitter, Inc. -// Licensed under the Apache License, Version 2.0 -// http://www.apache.org/licenses/LICENSE-2.0 - -//! This module provides a set of integration tests and a function to run the -//! tests against a Segcache instance. This allows us to run the same test suite -//! for multiple server configurations. - -use logger::*; - -use std::io::{Read, Write}; -use std::net::TcpStream; -use std::time::Duration; - -pub fn tests() { - debug!("beginning tests"); - println!(); - - // get and gets on a key that is not in the cache results in a miss - test("get miss", &[("get 0\r\n", Some("END\r\n"))]); - test("gets miss", &[("gets 0\r\n", Some("END\r\n"))]); - - // check that we can store and retrieve a key - test( - "set and get", - &[ - // store the key - ("set 1 0 0 1\r\n1\r\n", Some("STORED\r\n")), - // retrieve the key - ("get 1\r\n", Some("VALUE 1 0 1\r\n1\r\nEND\r\n")), - ], - ); - - test( - "cas not_found", - &[ - // try to cas on key that is not in the cache - ("cas 2 0 0 1 0\r\n0\r\n", Some("NOT_FOUND\r\n")), - // confirm that the key is still not in the cache - ("get 2\r\n", Some("END\r\n")), - ], - ); - - test( - "cas exists", - &[ - // store the key - ("set 3 0 0 1\r\n3\r\n", Some("STORED\r\n")), - // try to cas with a bad cas value - ("cas 3 0 0 1 0\r\n0\r\n", Some("EXISTS\r\n")), - // check that it was not updated - ("get 3\r\n", Some("VALUE 3 0 1\r\n3\r\nEND\r\n")), - ], - ); - - test( - "cas stored", - &[ - // store the key - ("set 4 0 0 1\r\n4\r\n", Some("STORED\r\n")), - // cas with the correct cas value - ("cas 4 0 0 1 1\r\n0\r\n", Some("STORED\r\n")), - // check that the value was updated - ("get 4\r\n", Some("VALUE 4 0 1\r\n0\r\nEND\r\n")), - ], - ); - - test( - "add not_stored", - &[ - // store the key - ("set 5 0 0 1\r\n5\r\n", Some("STORED\r\n")), - // try to add a key that exists - ("add 5 0 0 1\r\n0\r\n", Some("NOT_STORED\r\n")), - // check that the value was not updated - ("get 5\r\n", Some("VALUE 5 0 1\r\n5\r\nEND\r\n")), - ], - ); - - test( - "add stored", - &[ - // try to add a new key - ("add 6 0 0 1\r\n6\r\n", Some("STORED\r\n")), - // check that the key exists now - ("get 6\r\n", Some("VALUE 6 0 1\r\n6\r\nEND\r\n")), - ], - ); - - test( - "replace not_stored", - &[ - // try to replace a key that does not exist - ("replace 7 0 0 1\r\n7\r\n", Some("NOT_STORED\r\n")), - // check that the value was not stored - ("get 7\r\n", Some("END\r\n")), - ], - ); - - test( - "replace stored", - &[ - // store the key - ("set 8 0 0 1\r\n8\r\n", Some("STORED\r\n")), - // replace a key that does exist - ("replace 8 0 0 1\r\n0\r\n", Some("STORED\r\n")), - // check that the value was updated - ("get 8\r\n", Some("VALUE 8 0 1\r\n0\r\nEND\r\n")), - ], - ); - - test( - "set flags", - &[ - // store the key - ("set 9 42 0 1\r\n1\r\n", Some("STORED\r\n")), - // retrieve with correct flags - ("get 9\r\n", Some("VALUE 9 42 1\r\n1\r\nEND\r\n")), - ], - ); - - // test pipelined commands - test( - "pipelined get (key: 4 depth: 2)", - &[("get 10\r\nget 10\r\n", Some("END\r\nEND\r\n"))], - ); - test( - "pipelined get and invalid (key 4, depth 2)", - &[("get 11\r\n ", Some("END\r\n"))], - ); - test( - "pipelined get and add (key 4, depth 2)", - &[( - "get 12 \r\nadd 12 0 0 1\r\n1\r\n", - Some("END\r\nSTORED\r\n"), - )], - ); - test( - "pipelined get and set (key 5, depth 2)", - &[( - "get 13 \r\nset 13 0 0 1 \r\n1\r\n", - Some("END\r\nSTORED\r\n"), - )], - ); - test( - "pipelined set and get (key 6, depth 3)", - &[( - "set 14 0 0 2 \r\nhi\r\nset 14 0 0 6\r\nhello!\r\nget 14 \r\n", - Some("STORED\r\nSTORED\r\nVALUE 14 0 6\r\nhello!\r\nEND\r\n"), - )], - ); - - // test increment - test( - "incr not_found", - &[("incr 15 1\r\n", Some("NOT_FOUND\r\n"))], - ); - test( - "incr stored", - &[ - // set the key - ("set 15 0 0 1\r\n0\r\n", Some("STORED\r\n")), - // increment it - ("incr 15 1\r\n", Some("1\r\n")), - // increment it again - ("incr 15 2\r\n", Some("3\r\n")), - ], - ); - test( - "incr error", - &[ - // set the key - ("set 16 0 0 1\r\na\r\n", Some("STORED\r\n")), - // increment non-numeric value is an error - ("incr 16 1\r\n", Some("ERROR\r\n")), - ], - ); - - // test decrement - test( - "decr not_found", - &[("decr 17 1\r\n", Some("NOT_FOUND\r\n"))], - ); - test( - "decr stored", - &[ - // set the key - ("set 18 0 0 2\r\n10\r\n", Some("STORED\r\n")), - // decrement it - ("decr 18 1\r\n", Some("9\r\n")), - // decrement it again - ("decr 18 2\r\n", Some("7\r\n")), - // decrement it again, saturates at zero - ("decr 18 255\r\n", Some("0\r\n")), - ], - ); - - // test unsupported commands - test("append", &[("append 7 0 0 1\r\n0\r\n", Some("ERROR\r\n"))]); - test( - "prepend", - &[("prepend 8 0 0 1\r\n0\r\n", Some("ERROR\r\n"))], - ); - - std::thread::sleep(Duration::from_millis(500)); -} - -// opens a new connection, operating on request + response pairs from the -// provided data. -fn test(name: &str, data: &[(&str, Option<&str>)]) { - info!("testing: {}", name); - debug!("connecting to server"); - let mut stream = TcpStream::connect("127.0.0.1:12321").expect("failed to connect"); - stream - .set_read_timeout(Some(Duration::from_millis(250))) - .expect("failed to set read timeout"); - stream - .set_write_timeout(Some(Duration::from_millis(250))) - .expect("failed to set write timeout"); - - debug!("sending request"); - for (request, response) in data { - match stream.write(request.as_bytes()) { - Ok(bytes) => { - if bytes == request.len() { - debug!("full request sent"); - } else { - error!("incomplete write"); - panic!("status: failed\n"); - } - } - Err(_) => { - error!("error sending request"); - panic!("status: failed\n"); - } - } - - std::thread::sleep(Duration::from_millis(10)); - let mut buf = vec![0; 4096]; - - if let Some(response) = response { - if stream.read(&mut buf).is_err() { - std::thread::sleep(Duration::from_millis(500)); - panic!("error reading response"); - } else if response.as_bytes() != &buf[0..response.len()] { - error!("expected: {:?}", response.as_bytes()); - error!("received: {:?}", &buf[0..response.len()]); - std::thread::sleep(Duration::from_millis(500)); - panic!("status: failed\n"); - } else { - debug!("correct response"); - } - assert_eq!(response.as_bytes(), &buf[0..response.len()]); - } else if let Err(e) = stream.read(&mut buf) { - if e.kind() == std::io::ErrorKind::WouldBlock { - debug!("got no response"); - } else { - error!("error reading response"); - std::thread::sleep(Duration::from_millis(500)); - panic!("status: failed\n"); - } - } else { - error!("expected no response"); - std::thread::sleep(Duration::from_millis(500)); - panic!("status: failed\n"); - } - - if data.len() > 1 { - std::thread::sleep(Duration::from_millis(10)); - } - } - info!("status: passed\n"); -} - -pub fn admin_tests() { - debug!("beginning admin tests"); - println!(); - - admin_test( - "version", - &[( - "version\r\n", - Some(&format!("VERSION {}\r\n", env!("CARGO_PKG_VERSION"))), - )], - ); -} - -// opens a new connection to the admin port, sends a request, and checks the response. -fn admin_test(name: &str, data: &[(&str, Option<&str>)]) { - info!("testing: {}", name); - debug!("connecting to server"); - let mut stream = TcpStream::connect("127.0.0.1:9999").expect("failed to connect"); - stream - .set_read_timeout(Some(Duration::from_millis(250))) - .expect("failed to set read timeout"); - stream - .set_write_timeout(Some(Duration::from_millis(250))) - .expect("failed to set write timeout"); - - debug!("sending request"); - for (request, response) in data { - match stream.write(request.as_bytes()) { - Ok(bytes) => { - if bytes == request.len() { - debug!("full request sent"); - } else { - error!("incomplete write"); - panic!("status: failed\n"); - } - } - Err(_) => { - error!("error sending request"); - panic!("status: failed\n"); - } - } - - std::thread::sleep(Duration::from_millis(10)); - let mut buf = vec![0; 4096]; - - if let Some(response) = response { - if stream.read(&mut buf).is_err() { - std::thread::sleep(Duration::from_millis(500)); - panic!("error reading response"); - } else if response.as_bytes() != &buf[0..response.len()] { - error!("expected: {:?}", response.as_bytes()); - error!("received: {:?}", &buf[0..response.len()]); - std::thread::sleep(Duration::from_millis(500)); - panic!("status: failed\n"); - } else { - debug!("correct response"); - } - assert_eq!(response.as_bytes(), &buf[0..response.len()]); - } else if let Err(e) = stream.read(&mut buf) { - if e.kind() == std::io::ErrorKind::WouldBlock { - debug!("got no response"); - } else { - error!("error reading response"); - std::thread::sleep(Duration::from_millis(500)); - panic!("status: failed\n"); - } - } else { - error!("expected no response"); - std::thread::sleep(Duration::from_millis(500)); - panic!("status: failed\n"); - } - - if data.len() > 1 { - std::thread::sleep(Duration::from_millis(10)); - } - } - info!("status: passed\n"); -} diff --git a/src/server/segcache/tests/integration.rs b/src/server/segcache/tests/integration.rs deleted file mode 100644 index 481860bc..00000000 --- a/src/server/segcache/tests/integration.rs +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2021 Twitter, Inc. -// Licensed under the Apache License, Version 2.0 -// http://www.apache.org/licenses/LICENSE-2.0 - -//! This test module runs the integration test suite against a single-threaded -//! instance of Segcache. - -mod common; - -#[macro_use] -extern crate logger; - -use crate::common::*; - -use config::SegcacheConfig; -use pelikan_segcache_rs::Segcache; - -use std::time::Duration; - -fn main() { - debug!("launching server"); - let server = Segcache::new(SegcacheConfig::default()).expect("failed to launch segcache"); - - // wait for server to startup. duration is chosen to be longer than we'd - // expect startup to take in a slow ci environment. - std::thread::sleep(Duration::from_secs(10)); - - tests(); - - admin_tests(); - - // shutdown server and join - info!("shutdown..."); - server.shutdown(); - - info!("passed!"); -} diff --git a/src/server/segcache/tests/integration_multi.rs b/src/server/segcache/tests/integration_multi.rs deleted file mode 100644 index fb0cfa7b..00000000 --- a/src/server/segcache/tests/integration_multi.rs +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2021 Twitter, Inc. -// Licensed under the Apache License, Version 2.0 -// http://www.apache.org/licenses/LICENSE-2.0 - -//! This test module runs the integration test suite against a multi-threaded -//! instance of Segcache. - -#[macro_use] -extern crate logger; - -mod common; - -use crate::common::*; - -use config::{SegcacheConfig, WorkerConfig}; -use pelikan_segcache_rs::Segcache; - -use std::time::Duration; - -fn main() { - debug!("launching multi-worker server"); - let mut config = SegcacheConfig::default(); - config.worker_mut().set_threads(2); - let server = Segcache::new(config).expect("failed to launch segcache"); - - // wait for server to startup. duration is chosen to be longer than we'd - // expect startup to take in a slow ci environment. - std::thread::sleep(Duration::from_secs(10)); - - tests(); - - admin_tests(); - - // shutdown server and join - info!("shutdown..."); - server.shutdown(); - - info!("passed!"); -} diff --git a/src/session/Cargo.toml b/src/session/Cargo.toml index d10d4623..2df5b4fd 100644 --- a/src/session/Cargo.toml +++ b/src/session/Cargo.toml @@ -14,5 +14,5 @@ clocksource = { workspace = true } common = { path = "../common", default-features = false } log = { workspace = true } metriken = { workspace = true } -pelikan-net = { workspace = true, default-features = false } +pelikan-net = { workspace = true } protocol-common = { path = "../protocol/common", default-features = false }