diff --git a/Cargo.lock b/Cargo.lock index b943dc5a8..b69555b8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1244,7 +1244,7 @@ checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core 0.3.4", - "bitflags", + "bitflags 1.3.2", "bytes", "futures-util", "http 0.2.11", @@ -1431,7 +1431,7 @@ dependencies = [ "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.9.9", + "sha2 0.10.6", "thiserror", ] @@ -1471,6 +1471,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" + [[package]] name = "bitmaps" version = "2.1.0" @@ -1860,7 +1866,7 @@ checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" dependencies = [ "ansi_term", "atty", - "bitflags", + "bitflags 1.3.2", "strsim 0.8.0", "textwrap 0.11.0", "unicode-width", @@ -1874,7 +1880,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" dependencies = [ "atty", - "bitflags", + "bitflags 1.3.2", "clap_lex 0.2.4", "indexmap 1.9.3", "once_cell", @@ -2319,6 +2325,22 @@ dependencies = [ "zeroize", ] +[[package]] +name = "custom-tracing" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum 0.7.4", + "helium-proto", + "http 0.2.11", + "notify", + "tokio", + "tower-http", + "tower-layer", + "tracing", + "tracing-subscriber", +] + [[package]] name = "cxx" version = "1.0.82" @@ -2901,6 +2923,18 @@ dependencies = [ "uuid", ] +[[package]] +name = "filetime" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall 0.4.1", + "windows-sys 0.52.0", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -2944,6 +2978,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.28" @@ -3358,7 +3401,7 @@ dependencies = [ "bs58 0.5.0", "byteorder", "ed25519-compact", - "getrandom 0.1.16", + "getrandom 0.2.10", "k256", "lazy_static", "multihash", @@ -3804,6 +3847,7 @@ dependencies = [ "chrono", "clap 4.4.8", "config", + "custom-tracing", "file-store", "futures", "futures-util", @@ -3829,6 +3873,26 @@ dependencies = [ "triggered", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.12" @@ -4136,6 +4200,26 @@ dependencies = [ "typewit", ] +[[package]] +name = "kqueue" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lazy-distributor" version = "0.1.0" @@ -4454,6 +4538,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", + "log", "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.48.0", ] @@ -4677,7 +4762,7 @@ version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cfg-if", "libc", "memoffset 0.7.1", @@ -4694,6 +4779,34 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.5.0", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "walkdir", + "windows-sys 0.48.0", +] + +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.2.1" @@ -4917,6 +5030,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "p256" version = "0.10.1" @@ -5570,7 +5689,7 @@ version = "10.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -5613,7 +5732,7 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -5622,7 +5741,16 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" dependencies = [ - "bitflags", + "bitflags 1.3.2", +] + +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags 1.3.2", ] [[package]] @@ -6050,6 +6178,15 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.20" @@ -6144,7 +6281,7 @@ version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bc1bb97804af6631813c55739f771071e0f2ed33ee20b68c86ec505d906356c" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", @@ -6753,7 +6890,7 @@ dependencies = [ "array-bytes", "base64 0.21.7", "bincode", - "bitflags", + "bitflags 1.3.2", "blake3", "borsh 0.10.3", "borsh 0.9.3", @@ -6975,7 +7112,7 @@ dependencies = [ "assert_matches", "base64 0.21.7", "bincode", - "bitflags", + "bitflags 1.3.2", "borsh 0.10.3", "bs58 0.4.0", "bytemuck", @@ -7343,7 +7480,7 @@ dependencies = [ "ahash 0.7.6", "atoi", "base64 0.13.1", - "bitflags", + "bitflags 1.3.2", "byteorder", "bytes", "chrono", @@ -7538,7 +7675,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "system-configuration-sys", ] @@ -7882,7 +8019,7 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" dependencies = [ - "bitflags", + "bitflags 1.3.2", "bytes", "futures-core", "futures-util", @@ -7938,6 +8075,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", ] [[package]] @@ -7947,12 +8096,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ "matchers", + "nu-ansi-term", "once_cell", "regex", "sharded-slab", + "smallvec", "thread_local", "tracing", "tracing-core", + "tracing-log", ] [[package]] @@ -8005,7 +8157,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.7.3", + "rand 0.8.5", "static_assertions", ] @@ -8158,6 +8310,12 @@ dependencies = [ "serde", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcell" version = "0.1.3" @@ -8212,6 +8370,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.0" @@ -8673,7 +8841,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.9.9", + "sha2 0.10.6", "thiserror", "twox-hash", "xorf", diff --git a/Cargo.toml b/Cargo.toml index 721b4919c..64d33e0ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,25 +3,26 @@ debug = true [workspace] members = [ - "boost_manager", - "db_store", - "denylist", - "file_store", - "ingest", - "iot_config", - "iot_packet_verifier", - "iot_verifier", - "metrics", - "mobile_config", - "mobile_config_cli", - "mobile_packet_verifier", - "mobile_verifier", - "poc_entropy", - "price", - "reward_index", - "reward_scheduler", - "solana", - "task_manager", + "boost_manager", + "custom_tracing", + "db_store", + "denylist", + "file_store", + "ingest", + "iot_config", + "iot_packet_verifier", + "iot_verifier", + "metrics", + "mobile_config", + "mobile_config_cli", + "mobile_packet_verifier", + "mobile_verifier", + "poc_entropy", + "price", + "reward_index", + "reward_scheduler", + "solana", + "task_manager", ] resolver = "2" @@ -32,14 +33,14 @@ edition = "2021" [workspace.dependencies] anchor-client = "0.29.0" -anyhow = {version = "1", features = ["backtrace"]} -bs58 = {version = "0.4", features=["check"]} +anyhow = { version = "1", features = ["backtrace"] } +bs58 = { version = "0.4", features = ["check"] } thiserror = "1" -clap = {version = "4", features = ["derive"]} -serde = {version = "1", features=["derive"]} +clap = { version = "4", features = ["derive"] } +serde = { version = "1", features = ["derive"] } serde_json = "1" http-serde = "1" -chrono = {version = "0", features = ["serde"]} +chrono = { version = "0", features = ["serde"] } tokio = { version = "1", default-features = false, features = [ "fs", "macros", @@ -48,38 +49,50 @@ tokio = { version = "1", default-features = false, features = [ "rt-multi-thread", "rt", "process", - "time" + "time", ] } tokio-stream = "0" -sqlx = {version = "0", features = [ +sqlx = { version = "0", features = [ "postgres", "uuid", "decimal", "chrono", "migrate", "macros", - "runtime-tokio-rustls" -]} -helium-anchor-gen = {git = "https://github.com/helium/helium-anchor-gen.git"} -helium-crypto = {version = "0.8.4", features=["sqlx-postgres", "multisig"]} -hextree = {git = "https://github.com/jaykickliter/HexTree", branch = "main", features = ["disktree"]} -helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]} + "runtime-tokio-rustls", +] } +helium-anchor-gen = { git = "https://github.com/helium/helium-anchor-gen.git" } +helium-crypto = { version = "0.8.4", features = ["sqlx-postgres", "multisig"] } +hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [ + "disktree", +] } +helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [ + "services", +] } solana-client = "1.16" solana-sdk = "1.16" solana-program = "1.16" spl-token = "3.5.0" -reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]} +reqwest = { version = "0", default-features = false, features = [ + "gzip", + "json", + "rustls-tls", +] } beacon = { git = "https://github.com/helium/proto", branch = "master" } humantime = "2" metrics = "0.21" metrics-exporter-prometheus = "0" tracing = "0" -tracing-subscriber = { version = "0", default-features=false, features = ["env-filter", "registry", "fmt"] } +tracing-subscriber = { version = "0", default-features = false, features = [ + "env-filter", + "registry", + "fmt", +] } rust_decimal = "1" rust_decimal_macros = "1" base64 = ">=0.21" sha2 = "0.10" -tonic = {version = "0.10", features = ["tls", "tls-roots"]} +tonic = { version = "0.10", features = ["tls", "tls-roots"] } http = "<=0.2" triggered = "0" futures = "*" @@ -88,9 +101,9 @@ prost = "*" pyth-sdk-solana = "=0.8" once_cell = "1" lazy_static = "1" -config = {version="0", default-features=false, features=["toml"]} -h3o = {version = "0", features = ["serde"]} -xorf = {version = "0", features = ["serde"] } +config = { version = "0", default-features = false, features = ["toml"] } +h3o = { version = "0", features = ["serde"] } +xorf = { version = "0", features = ["serde"] } bytes = "*" bincode = "1" twox-hash = "1" @@ -100,8 +113,8 @@ retainer = "*" rand = "0.8" itertools = "*" tokio-util = "0" -uuid = {version = "1", features = ["v4", "serde"]} -tower-http = {version = "0", features = ["trace"]} +uuid = { version = "1", features = ["v4", "serde"] } +tower-http = { version = "0", features = ["trace"] } [patch.crates-io] sqlx = { git = "https://github.com/helium/sqlx.git", rev = "92a2268f02e0cac6fccb34d3e926347071dbb88d" } diff --git a/custom_tracing/Cargo.toml b/custom_tracing/Cargo.toml new file mode 100644 index 000000000..83ba8bd3a --- /dev/null +++ b/custom_tracing/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "custom-tracing" +version = "0.1.0" +authors.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +notify = { version = "6", default-features = false } +anyhow = "1" +tokio = { version = "1", features = ["rt-multi-thread", "sync", "signal"] } +tracing = "0" +tracing-subscriber = { version = "0", default-features = true, features = [ + "env-filter", + "registry", + "fmt", +] } +tower-http = { version = "0", features = ["trace"] } +tower-layer = { version = "0" } +axum = { version = "0.7", features = ["tracing"], optional = true } +helium-proto = { workspace = true, optional = true } +http = { workspace = true, optional = true } + + +[target.'cfg(target_os = "macos")'.dependencies] +notify = { version = "6", default-features = false, features = [ + "macos_fsevent", +] } + + +[features] +default = [] +http-1 = ["axum"] +grpc = ["helium-proto", "http"] diff --git a/custom_tracing/src/grpc_layer.rs b/custom_tracing/src/grpc_layer.rs new file mode 100644 index 000000000..bb9445ab0 --- /dev/null +++ b/custom_tracing/src/grpc_layer.rs @@ -0,0 +1,26 @@ +use helium_proto::services::Body; +use http::request::Request; +use tower_http::{ + classify::{GrpcErrorsAsFailures, SharedClassifier}, + trace::{DefaultOnFailure, DefaultOnResponse, TraceLayer}, + LatencyUnit, +}; +use tracing::{Level, Span}; + +type GrpcLayer = + TraceLayer, for<'a> fn(&'a http::Request) -> Span>; + +pub fn new_with_span(make_span: fn(&Request) -> Span) -> GrpcLayer { + TraceLayer::new_for_grpc() + .make_span_with(make_span) + .on_response( + DefaultOnResponse::new() + .level(Level::DEBUG) + .latency_unit(LatencyUnit::Millis), + ) + .on_failure( + DefaultOnFailure::new() + .level(Level::WARN) + .latency_unit(LatencyUnit::Millis), + ) +} diff --git a/custom_tracing/src/http_layer.rs b/custom_tracing/src/http_layer.rs new file mode 100644 index 000000000..aed429d43 --- /dev/null +++ b/custom_tracing/src/http_layer.rs @@ -0,0 +1,27 @@ +use axum::{body::Body, http::Request}; +use tower_http::{ + classify::{ServerErrorsAsFailures, SharedClassifier}, + trace::{DefaultOnFailure, DefaultOnResponse, TraceLayer}, + LatencyUnit, +}; +use tracing::{Level, Span}; + +pub fn new_with_span( + make_span: fn(&Request) -> Span, +) -> TraceLayer< + SharedClassifier, + for<'a> fn(&'a axum::http::Request) -> Span, +> { + TraceLayer::new_for_http() + .make_span_with(make_span) + .on_response( + DefaultOnResponse::new() + .level(Level::DEBUG) + .latency_unit(LatencyUnit::Millis), + ) + .on_failure( + DefaultOnFailure::new() + .level(Level::WARN) + .latency_unit(LatencyUnit::Millis), + ) +} diff --git a/custom_tracing/src/lib.rs b/custom_tracing/src/lib.rs new file mode 100644 index 000000000..03a4e03c3 --- /dev/null +++ b/custom_tracing/src/lib.rs @@ -0,0 +1,148 @@ +use anyhow::Result; +use notify::{event::DataChange, Config, RecommendedWatcher, RecursiveMode, Watcher}; +use std::{fs, path::Path}; +use tracing::Span; +use tracing_subscriber::{ + layer::SubscriberExt, + reload::{self, Handle}, + util::SubscriberInitExt, + EnvFilter, Registry, +}; + +#[cfg(feature = "grpc")] +pub mod grpc_layer; +#[cfg(feature = "http-1")] +pub mod http_layer; + +pub async fn init(og_filter: String, tracing_cfg_file: String) -> Result<()> { + let (filtered_layer, reload_handle) = + reload::Layer::new(tracing_subscriber::EnvFilter::new(og_filter.clone())); + + tracing_subscriber::registry() + .with(filtered_layer) + .with(tracing_subscriber::fmt::layer()) + .init(); + + tokio::spawn(async move { + let state = State { + og_filter: og_filter.clone(), + tracing_cfg_file, + reload_handle, + }; + if let Err(err) = state.watch().await { + tracing::warn!(?err, "tracing error watching configuration for update") + } + }); + + tracing::info!("custom tracing installed"); + + Ok(()) +} + +pub fn record(field: &str, value: T) +where + T: std::fmt::Display, +{ + Span::current().record(field, &tracing::field::display(value)); +} + +#[derive(Clone)] +pub struct State { + pub og_filter: String, + pub tracing_cfg_file: String, + pub reload_handle: Handle, +} + +impl State { + async fn watch(&self) -> Result<()> { + let (tx, mut rx) = tokio::sync::mpsc::channel(10); + + let mut watcher = RecommendedWatcher::new( + move |res| { + tx.blocking_send(res).expect("Failed to send event"); + }, + Config::default(), + )?; + + watcher.watch(".".as_ref(), RecursiveMode::NonRecursive)?; + + while let Some(res) = rx.recv().await { + match res { + Err(err) => tracing::warn!(?err, "tracing config watcher configuration file error"), + Ok(event) => match event.kind { + notify::EventKind::Modify(notify::event::ModifyKind::Data( + DataChange::Content, + )) => { + if let Some(event_path) = event.paths.first() { + if Path::new(event_path).exists() { + match fs::read_to_string(event_path) { + Err(_err) => tracing::warn!( + ?_err, + "tracing config watcher failed to read file" + ), + Ok(content) => { + if file_match(event_path, self.tracing_cfg_file.clone()) { + self.handle_change(content)?; + } + } + } + } + } + } + notify::EventKind::Remove(notify::event::RemoveKind::File) => { + if let Some(event_path) = event.paths.first() { + if file_match(event_path, self.tracing_cfg_file.clone()) { + self.handle_delete()?; + } + } + } + _event => { + tracing::debug!(?_event, "tracing config watcher ignored unhandled message") + } + }, + } + } + + Ok(()) + } + + fn handle_change(&self, content: String) -> Result<()> { + if content.is_empty() { + self.handle_delete() + } else { + match tracing_subscriber::EnvFilter::try_new(content.clone()) { + Err(_err) => tracing::warn!( + filter = content, + ?_err, + "tracing config watcher failed to parse filter" + ), + Ok(new_filter) => { + self.reload_handle.modify(|filter| *filter = new_filter)?; + tracing::info!(filter = content, "custom tracing config updated"); + } + } + Ok(()) + } + } + + fn handle_delete(&self) -> Result<()> { + let new_filter = self.og_filter.clone(); + + self.reload_handle + .modify(|filter| *filter = tracing_subscriber::EnvFilter::new(new_filter.clone()))?; + + tracing::info!( + filter = new_filter, + "tracing config watcher file deleted, reverting to rustlog filter" + ); + Ok(()) + } +} + +fn file_match(event_path: &Path, file: String) -> bool { + event_path + .to_str() + .map(|path| path.split('/')) + .map(|path| path.last().is_some_and(|file_name| file_name == file)) + .unwrap_or_default() +} diff --git a/ingest/Cargo.toml b/ingest/Cargo.toml index 2cf93230a..f6cbc5d26 100644 --- a/ingest/Cargo.toml +++ b/ingest/Cargo.toml @@ -4,24 +4,24 @@ version = "0.1.0" description = "PoC Ingest Server for the Helium Network" authors.workspace = true edition.workspace = true -license.workspace = true +license.workspace = true [dependencies] -anyhow = {workspace = true} -config = {workspace = true} -clap = {workspace = true} -thiserror = {workspace = true} -serde = {workspace = true} -serde_json = {workspace = true} -base64 = {workspace = true} -bs58 = {workspace = true} -sha2 = {workspace = true} -http = {workspace = true} -tonic = {workspace = true} -triggered = {workspace = true} -futures = {workspace = true} -futures-util = {workspace = true} -prost = {workspace = true} +anyhow = { workspace = true } +config = { workspace = true } +clap = { workspace = true } +thiserror = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +base64 = { workspace = true } +bs58 = { workspace = true } +sha2 = { workspace = true } +http = { workspace = true } +tonic = { workspace = true } +triggered = { workspace = true } +futures = { workspace = true } +futures-util = { workspace = true } +prost = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } tokio-stream = { workspace = true } @@ -32,10 +32,11 @@ helium-proto = { workspace = true } helium-crypto = { workspace = true } file-store = { path = "../file_store" } poc-metrics = { path = "../metrics" } -metrics = {workspace = true } +metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } task-manager = { path = "../task_manager" } rand = { workspace = true } +custom-tracing = { path = "../custom_tracing", features = ["grpc"] } [dev-dependencies] backon = "0" diff --git a/ingest/src/main.rs b/ingest/src/main.rs index 9f027e5bc..9bd6b117b 100644 --- a/ingest/src/main.rs +++ b/ingest/src/main.rs @@ -2,7 +2,6 @@ use anyhow::Result; use clap::Parser; use ingest::{server_iot, server_mobile, Mode, Settings}; use std::path; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Debug, clap::Parser)] #[clap(version = env!("CARGO_PKG_VERSION"))] @@ -41,10 +40,7 @@ pub struct Server {} impl Server { pub async fn run(&self, settings: &Settings) -> Result<()> { - tracing_subscriber::registry() - .with(tracing_subscriber::EnvFilter::new(&settings.log)) - .with(tracing_subscriber::fmt::layer()) - .init(); + custom_tracing::init(settings.log.clone(), settings.tracing_cfg_file.clone()).await?; // Install the prometheus metrics exporter poc_metrics::start_metrics(&settings.metrics)?; diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index fecd6e2e3..a0116eb47 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -55,6 +55,7 @@ impl ManagedTask for GrpcServer { let address = self.address; Box::pin(async move { transport::Server::builder() + .layer(custom_tracing::grpc_layer::new_with_span(make_span)) .layer(poc_metrics::request_layer!("ingest_server_grpc_connection")) .add_service(poc_mobile::Server::with_interceptor( *self, @@ -70,6 +71,14 @@ impl ManagedTask for GrpcServer { } } +fn make_span(_request: &http::request::Request) -> tracing::Span { + tracing::info_span!( + "tracing", + pub_key = tracing::field::Empty, + subscriber_id = tracing::field::Empty, + ) +} + impl GrpcServer { fn verify_network(&self, public_key: PublicKey) -> VerifyResult { if self.required_network == public_key.network { @@ -103,6 +112,8 @@ impl poc_mobile::PocMobile for GrpcServer { let timestamp: u64 = Utc::now().timestamp_millis() as u64; let event = request.into_inner(); + custom_tracing::record("pub_key", pub_key_to_b58(&event.pub_key)); + let report = self .verify_public_key(event.pub_key.as_ref()) .and_then(|public_key| self.verify_network(public_key)) @@ -125,6 +136,8 @@ impl poc_mobile::PocMobile for GrpcServer { let timestamp: u64 = Utc::now().timestamp_millis() as u64; let event = request.into_inner(); + custom_tracing::record("pub_key", pub_key_to_b58(&event.pub_key)); + let report = self .verify_public_key(event.pub_key.as_ref()) .and_then(|public_key| self.verify_network(public_key)) @@ -147,6 +160,8 @@ impl poc_mobile::PocMobile for GrpcServer { let timestamp: u64 = Utc::now().timestamp_millis() as u64; let event = request.into_inner(); + custom_tracing::record("pub_key", pub_key_to_b58(&event.pub_key)); + let report = self .verify_public_key(event.pub_key.as_ref()) .and_then(|public_key| self.verify_network(public_key)) @@ -169,6 +184,8 @@ impl poc_mobile::PocMobile for GrpcServer { let timestamp = Utc::now().timestamp_millis() as u64; let event = request.into_inner(); + custom_tracing::record("pub_key", pub_key_to_b58(&event.pub_key)); + let report = self .verify_public_key(event.pub_key.as_ref()) .and_then(|public_key| self.verify_network(public_key)) @@ -194,6 +211,13 @@ impl poc_mobile::PocMobile for GrpcServer { let subscriber_id = event.subscriber_id.clone(); let timestamp_millis = event.timestamp; + custom_tracing::record( + "pub_key", + bs58::encode(&event.carrier_pub_key).into_string(), + ); + + custom_tracing::record("subscriber_id", bs58::encode(&subscriber_id).into_string()); + let report = self .verify_public_key(event.carrier_pub_key.as_ref()) .and_then(|public_key| self.verify_network(public_key)) @@ -204,7 +228,6 @@ impl poc_mobile::PocMobile for GrpcServer { }) .map_err(|status| { tracing::debug!( - subscriber_id = ?subscriber_id, timestamp = %timestamp_millis, status = %status ); @@ -228,6 +251,8 @@ impl poc_mobile::PocMobile for GrpcServer { let cbsd_id = event.cbsd_id.clone(); let threshold_timestamp = event.threshold_timestamp; + custom_tracing::record("pub_key", pub_key_to_b58(&hotspot_pubkey)); + let report = self .verify_public_key(event.carrier_pub_key.as_ref()) .and_then(|public_key| self.verify_network(public_key)) @@ -238,7 +263,6 @@ impl poc_mobile::PocMobile for GrpcServer { }) .map_err(|status| { tracing::debug!( - hotspot_pubkey = ?hotspot_pubkey, cbsd_id = ?cbsd_id, threshold_timestamp = %threshold_timestamp, status = %status @@ -263,6 +287,8 @@ impl poc_mobile::PocMobile for GrpcServer { let cbsd_id = event.cbsd_id.clone(); let invalidated_timestamp = event.timestamp; + custom_tracing::record("pub_key", pub_key_to_b58(&hotspot_pubkey)); + let report = self .verify_public_key(event.carrier_pub_key.as_ref()) .and_then(|public_key| self.verify_network(public_key)) @@ -273,7 +299,6 @@ impl poc_mobile::PocMobile for GrpcServer { }) .map_err(|status| { tracing::debug!( - hotspot_pubkey = ?hotspot_pubkey, cbsd_id = ?cbsd_id, invalidated_timestamp = %invalidated_timestamp, status = %status @@ -298,6 +323,8 @@ impl poc_mobile::PocMobile for GrpcServer { let timestamp: u64 = Utc::now().timestamp_millis() as u64; let event = request.into_inner(); + custom_tracing::record("pub_key", pub_key_to_b58(&event.pub_key)); + let report = self .verify_public_key(event.pub_key.as_ref()) .and_then(|public_key| self.verify_network(public_key)) @@ -458,3 +485,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { .start() .await } + +fn pub_key_to_b58(pub_key: &[u8]) -> String { + bs58::encode(pub_key).into_string() +} diff --git a/ingest/src/settings.rs b/ingest/src/settings.rs index 52a98874c..6dc041a4b 100644 --- a/ingest/src/settings.rs +++ b/ingest/src/settings.rs @@ -13,6 +13,9 @@ pub struct Settings { /// "ingest=debug,poc_store=info" #[serde(default = "default_log")] pub log: String, + /// File name to be watched by custom tracing + #[serde(default = "default_tracing_cfg_file")] + pub tracing_cfg_file: String, /// Mode to run the server in (iot or mobile). Required pub mode: Mode, /// Listen address. Required. Default is 0.0.0.0:9081 @@ -54,6 +57,10 @@ pub fn default_log() -> String { "ingest=debug,poc_store=info".to_string() } +pub fn default_tracing_cfg_file() -> String { + "tracing.cfg".to_string() +} + pub fn default_sink() -> String { "/var/data/ingest".to_string() }