diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000000..bff29e6e17 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] diff --git a/Cargo.lock b/Cargo.lock index 457c5215cc..1a9eaab365 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1330,6 +1330,44 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" +[[package]] +name = "console-api" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a257c22cd7e487dd4a13d413beabc512c5052f0bc048db0da6a84c3d8a6142fd" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31c4cc54bae66f7d9188996404abdf7fdfa23034ef8e43478c8810828abad758" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "prost", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -1480,6 +1518,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -2311,6 +2358,19 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.7", + "byteorder", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "headers" version = "0.3.9" @@ -2549,6 +2609,18 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -3196,6 +3268,7 @@ name = "observe" version = "0.1.0" dependencies = [ "atty", + "console-subscriber", "futures", "once_cell", "pin-project-lite", @@ -3605,6 +3678,38 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn 2.0.66", +] + +[[package]] +name = "prost-types" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +dependencies = [ + "prost", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -5002,9 +5107,20 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.3.0" @@ -5117,6 +5233,33 @@ dependencies = [ "winnow 0.6.13", ] +[[package]] +name = "tonic" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.7", + "bytes", + "h2", + "http 0.2.12", + "http-body 0.4.6", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -5125,9 +5268,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", diff --git a/Cargo.toml b/Cargo.toml index eea3c60994..7e02f8dbf2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ tempfile = "3.10.1" time = { version = "0.3.36", features = ["macros"] } thiserror = "1.0.61" toml = "0.8.14" -tokio = "1.38.0" +tokio = { version = "1.38.0", features = ["tracing"] } tokio-stream = { version = "0.1.15", features = ["sync"] } tracing = "0.1.40" tracing-subscriber = "0.3.18" diff --git a/README.md b/README.md index 8f56685bc2..a379927358 100644 --- a/README.md +++ b/README.md @@ -117,6 +117,27 @@ ANVIL_IP_ADDR=0.0.0.0 anvil \ --timestamp 1577836800 ``` +### Profiling + +The most important binaries support [tokio-console](https://github.com/tokio-rs/console) to allow you a could look inside the tokio runtime. + +Simply enable the feature by passing `--enable-tokio-console true` when running a binary and then in another shell, run + +``` +cargo install --locked tokio-console +tokio-console +``` + + +### Changing Log Filters + +It's possible to change the tracing log filter while the process is running. This can be useful to debug an error that requires more verbose logs but which might no longer appear after restarting the system. + +Each process opens a UNIX socket at `/tmp/log_filter_override__.sock`. To change the log filter connect to it with `nc -U ` and enter a new log filter. +You can also reset the log filter to the filter the program was initially started with by entering `reset`. + +See [here](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives) for documentation on the supported log filter format. + ## Running the Services Locally ### Prerequisites diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index c5e5895d3a..b04bd16417 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -20,6 +20,7 @@ use { ::observe::metrics, anyhow::Result, database::order_events::OrderEventLabel, + itertools::Itertools, model::solver_competition::{ CompetitionAuction, Order, @@ -130,6 +131,14 @@ impl RunLoop { }; let competition_simulation_block = self.eth.current_block().borrow().number; + let considered_orders = solutions + .iter() + .flat_map(|solution| solution.solution.order_ids().copied()) + .unique() + .collect(); + self.persistence + .store_order_events(considered_orders, OrderEventLabel::Considered); + // TODO: Keep going with other solutions until some deadline. if let Some(Participant { driver, solution }) = solutions.last() { tracing::info!(driver = %driver.name, solution = %solution.id(), "winner"); @@ -146,10 +155,6 @@ impl RunLoop { } }; - let order_uids = solution.order_ids().copied().collect(); - self.persistence - .store_order_events(order_uids, OrderEventLabel::Considered); - let winner = solution.solver().into(); let winning_score = solution.score().get().0; let reference_score = solutions diff --git a/crates/observe/Cargo.toml b/crates/observe/Cargo.toml index e4db7decc4..3fb1588a39 100644 --- a/crates/observe/Cargo.toml +++ b/crates/observe/Cargo.toml @@ -6,7 +6,8 @@ edition = "2021" license = "MIT OR Apache-2.0" [dependencies] -atty = "0.2.14" +atty = "0.2" +console-subscriber = "0.3.0" futures = { workspace = true } once_cell = { workspace = true } pin-project-lite = "0.2.14" diff --git a/crates/observe/build.rs b/crates/observe/build.rs new file mode 100644 index 0000000000..b0ce3de9b9 --- /dev/null +++ b/crates/observe/build.rs @@ -0,0 +1,4 @@ +fn main() { + // Make build system aware of custom config flags to avoid clippy warnings + println!("cargo::rustc-check-cfg=cfg(tokio_unstable)"); +} diff --git a/crates/observe/src/lib.rs b/crates/observe/src/lib.rs index e54bd43dc0..2362e62415 100644 --- a/crates/observe/src/lib.rs +++ b/crates/observe/src/lib.rs @@ -6,3 +6,6 @@ pub mod metrics; pub mod panic_hook; pub mod request_id; pub mod tracing; + +#[cfg(unix)] +mod tracing_reload_handler; diff --git a/crates/observe/src/tracing.rs b/crates/observe/src/tracing.rs index 606052e34c..6efe47f414 100644 --- a/crates/observe/src/tracing.rs +++ b/crates/observe/src/tracing.rs @@ -1,8 +1,15 @@ use { + crate::tracing_reload_handler::spawn_reload_handler, std::{panic::PanicInfo, sync::Once}, time::macros::format_description, tracing::level_filters::LevelFilter, - tracing_subscriber::fmt::{time::UtcTime, writer::MakeWriterExt as _}, + tracing_subscriber::{ + fmt::{time::UtcTime, writer::MakeWriterExt as _}, + prelude::*, + util::SubscriberInitExt, + EnvFilter, + Layer, + }, }; /// Initializes tracing setup that is shared between the binaries. @@ -28,22 +35,67 @@ pub fn initialize_reentrant(env_filter: &str) { } fn set_tracing_subscriber(env_filter: &str, stderr_threshold: LevelFilter) { - // This is what kibana uses to separate multi line log messages. - let subscriber_builder = tracing_subscriber::fmt::fmt() - .with_timer(UtcTime::new(format_description!( - "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z" - ))) - .with_env_filter(env_filter) - .with_ansi(atty::is(atty::Stream::Stdout)); - match stderr_threshold.into_level() { - Some(threshold) => subscriber_builder - .with_writer( - std::io::stderr - .with_max_level(threshold) - .or_else(std::io::stdout), - ) - .init(), - None => subscriber_builder.init(), + let initial_filter = env_filter.to_string(); + + // The `tracing` APIs are heavily generic to enable zero overhead. Unfortunately + // this leads to very annoying type constraints which can only be satisfied + // by literally copy and pasting the code so the compiler doesn't try to + // infer types that satisfy both the tokio-console and the regular case. + // It's tempting to resolve this mess by first configuring the `fmt_layer` and + // only then the `console_subscriber`. However, this setup was the only way + // I found that: + // 1. actually makes `tokio-console` work + // 2. prints logs if `tokio-console` is disabled + // 3. does NOT skip the next log following a `tracing::event!()`. These calls + // happen for example under the hood in `sqlx`. I don't understand what's + // actually causing that but at this point I'm just happy if all the features + // work correctly. + macro_rules! fmt_layer { + ($env_filter:expr, $stderr_threshold:expr) => {{ + tracing_subscriber::fmt::layer() + .with_writer( + std::io::stdout + .with_min_level( + $stderr_threshold + .into_level() + .unwrap_or(tracing::Level::ERROR), + ) + .or_else(std::io::stderr), + ) + .with_timer(UtcTime::new(format_description!( + "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z" + ))) + .with_ansi(atty::is(atty::Stream::Stdout)) + .with_filter($env_filter) + }}; + } + + if cfg!(tokio_unstable) { + let (env_filter, reload_handle) = + tracing_subscriber::reload::Layer::new(EnvFilter::new(&initial_filter)); + + tracing_subscriber::registry() + .with(console_subscriber::spawn()) + .with(fmt_layer!(env_filter, stderr_threshold)) + .init(); + + if cfg!(unix) { + spawn_reload_handler(initial_filter, reload_handle); + } + } else { + let (env_filter, reload_handle) = + tracing_subscriber::reload::Layer::new(EnvFilter::new(&initial_filter)); + + tracing_subscriber::registry() + // Without this the subscriber ignores the next log after an `tracing::event!()` which + // `sqlx` uses under the hood. + .with(tracing::level_filters::LevelFilter::TRACE) + .with(fmt_layer!(env_filter, stderr_threshold)) + .init(); + + if cfg!(unix) { + spawn_reload_handler(initial_filter, reload_handle); + } } } diff --git a/crates/observe/src/tracing_reload_handler.rs b/crates/observe/src/tracing_reload_handler.rs new file mode 100644 index 0000000000..71d5f3c817 --- /dev/null +++ b/crates/observe/src/tracing_reload_handler.rs @@ -0,0 +1,113 @@ +use { + tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + net::{UnixListener, UnixStream}, + }, + tracing_subscriber::{reload, EnvFilter}, +}; + +/// Spawns a new thread that listens for connections to a UNIX socket +/// at "/tmp/log_filter_override__". +/// Whenever a line gets writtedn to that socket the reload handler +/// uses it as the new log filter. +/// To reset to the original log filter send the message "reset". +pub(crate) fn spawn_reload_handler( + initial_filter: String, + reload_handle: reload::Handle, +) { + tokio::spawn(async move { + let id = std::process::id(); + let name = binary_name().unwrap_or_default(); + + let socket_path = format!("/tmp/log_filter_override_{name}_{id}.sock"); + tracing::warn!(file = socket_path, "open log filter reload socket"); + let handle = SocketHandle { + listener: UnixListener::bind(&socket_path).expect("socket handle is unique"), + socket_path, + }; + + loop { + handle_connection(&handle.listener, &initial_filter, &reload_handle).await; + } + }); +} + +struct SocketHandle { + socket_path: String, + listener: UnixListener, +} + +impl Drop for SocketHandle { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.socket_path); + } +} + +fn binary_name() -> Option { + Some( + std::env::current_exe() + .ok()? + .file_name()? + .to_str()? + .to_string(), + ) +} + +async fn handle_connection( + listener: &UnixListener, + initial_filter: &str, + reload_handle: &reload::Handle, +) { + let Ok((mut socket, _addr)) = listener.accept().await else { + tracing::warn!("failed to accept UNIX socket connection"); + return; + }; + + let _ = socket + .write_all(format!("log filter on process startup was: {initial_filter:?}\n",).as_bytes()) + .await; + + loop { + let message = read_line(&mut socket).await; + + let filter = match message.as_deref() { + Some("") => { + log(&mut socket, "client terminated connection".into()).await; + break; + } + None => { + log(&mut socket, "failed to read message from socket".into()).await; + continue; + } + Some("reset") => initial_filter, + Some(message) => message, + }; + + let Ok(env_filter) = EnvFilter::try_new(filter) else { + log(&mut socket, format!("failed to parse filter: {filter:?}")).await; + continue; + }; + + match reload_handle.reload(env_filter) { + Ok(_) => log(&mut socket, format!("applied new filter: {filter:?}")).await, + Err(err) => log(&mut socket, format!("failed to apply filter: {err:?}")).await, + } + } +} + +async fn read_line(socket: &mut UnixStream) -> Option { + let mut reader = BufReader::new(socket); + let mut buffer = String::new(); + reader.read_line(&mut buffer).await.ok()?; + Some(buffer.trim().to_owned()) +} + +/// Logs the message in this process' logs and reports it back to the +/// connected socket. +async fn log(socket: &mut UnixStream, message: String) { + // Use a fairly high log level to improve chances that this actually gets logged + // when somebody messed with the log filter. + tracing::warn!(message); + let _ = socket.write_all(message.as_bytes()).await; + let _ = socket.write_all(b"\n").await; +} diff --git a/crates/orderbook/src/run.rs b/crates/orderbook/src/run.rs index 24b4e01e44..bf435e797b 100644 --- a/crates/orderbook/src/run.rs +++ b/crates/orderbook/src/run.rs @@ -412,9 +412,8 @@ pub async fn run(args: Arguments) { shutdown_sender.send(()).expect("failed to send shutdown signal"); match tokio::time::timeout(Duration::from_secs(10), serve_api).await { Ok(inner) => inner.expect("API failed during shutdown"), - Err(_) => tracing::error!("API shutdown exceeded timeout"), + Err(_) => panic!("API shutdown exceeded timeout"), } - std::process::exit(0); } }; }