From 4a5978235bff4e28bc7fc14a1a111ee22f346251 Mon Sep 17 00:00:00 2001 From: Wei Zhang Date: Tue, 17 Dec 2024 00:30:16 +0800 Subject: [PATCH] feat: support otel Signed-off-by: Wei Zhang --- Cargo.lock | 169 +++++++++++++++++- Cargo.toml | 5 + .../http-api-bindings/src/embedding/openai.rs | 8 +- crates/http-api-bindings/src/rate_limit.rs | 6 +- crates/tabby-index/src/code/mod.rs | 4 +- crates/tabby/Cargo.toml | 5 + crates/tabby/src/main.rs | 35 +--- crates/tabby/src/otel.rs | 109 +++++++++++ crates/tabby/src/serve.rs | 18 +- 9 files changed, 311 insertions(+), 48 deletions(-) create mode 100644 crates/tabby/src/otel.rs diff --git a/Cargo.lock b/Cargo.lock index bcd6211b7e2c..d4d433fa5be9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1541,6 +1541,12 @@ dependencies = [ "url", ] +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "globset" version = "0.4.14" @@ -3240,9 +3246,60 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69d6c3d7288a106c0a363e4b0e8d308058d56902adefb16f4936f417ffef086e" dependencies = [ "opentelemetry_api", - "opentelemetry_sdk", + "opentelemetry_sdk 0.18.0", +] + +[[package]] +name = "opentelemetry" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror", + "tracing", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76" +dependencies = [ + "async-trait", + "futures-core", + "http 1.1.0", + "opentelemetry 0.27.1", + "opentelemetry-proto", + "opentelemetry_sdk 0.27.1", + "prost", + "thiserror", + "tokio", + "tonic", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6" +dependencies = [ + "opentelemetry 0.27.1", + "opentelemetry_sdk 0.27.1", + "prost", + "tonic", ] +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc1b6902ff63b32ef6c489e8048c5e253e2e4a803ea3ea7e783914536eb15c52" + [[package]] name = "opentelemetry_api" version = "0.18.0" @@ -3276,6 +3333,26 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry_sdk" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry 0.27.1", + "percent-encoding", + "rand 0.8.5", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "overload" version = "0.1.1" @@ -3655,6 +3732,29 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bccbff07d5ed689c4087d20d7307a52ab6141edeedf487c3876a55b86cf63df" +[[package]] +name = "prost" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c0fef6c4230e4ccf618a35c59d7ede15dea37de8427500f50aff708806e42ec" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "157c5a9d7ea5c2ed2d9fb8f495b64759f7816c7eaea54ba3978f0d63000162e3" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "psm" version = "0.1.21" @@ -5124,6 +5224,10 @@ dependencies = [ "llama-cpp-server", "nvml-wrapper", "openssl", + "opentelemetry 0.27.1", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk 0.27.1", "regex", "reqwest", "reqwest-eventsource", @@ -5145,6 +5249,7 @@ dependencies = [ "tokio", "tower-http 0.5.2", "tracing", + "tracing-opentelemetry 0.28.0", "tracing-subscriber", "utoipa", "utoipa-swagger-ui", @@ -5556,7 +5661,7 @@ dependencies = [ "fnv", "futures", "humantime", - "opentelemetry", + "opentelemetry 0.18.0", "pin-project", "rand 0.8.5", "serde", @@ -5567,7 +5672,7 @@ dependencies = [ "tokio-serde", "tokio-util", "tracing", - "tracing-opentelemetry", + "tracing-opentelemetry 0.18.0", ] [[package]] @@ -5807,9 +5912,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" dependencies = [ "futures-core", "pin-project-lite", @@ -5879,6 +5984,36 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -5887,9 +6022,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand 0.8.5", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -6021,12 +6160,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21ebb87a95ea13271332df069020513ab70bdb5637ca42d6e492dc3bbbad48de" dependencies = [ "once_cell", - "opentelemetry", + "opentelemetry 0.18.0", "tracing", "tracing-core", "tracing-subscriber", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry 0.27.1", + "opentelemetry_sdk 0.27.1", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" diff --git a/Cargo.toml b/Cargo.toml index 605e5269bbab..9488a6eca551 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,11 @@ async-openai = "0.20" tracing-test = "0.2" clap = "4.3.0" ratelimit = "0.10" +tracing-opentelemetry = "0.28.0" +opentelemetry = { version = "0.27.0", features = ["trace", "metrics"] } +opentelemetry_sdk = { version = "0.27.0", default-features = false, features = ["trace", "rt-tokio"] } +opentelemetry-otlp = { version = "0.27.0" } +opentelemetry-semantic-conventions = { version = "0.27.0", features = ["semconv_experimental"] } [workspace.dependencies.uuid] version = "1.3.3" diff --git a/crates/http-api-bindings/src/embedding/openai.rs b/crates/http-api-bindings/src/embedding/openai.rs index 2a10d33fa417..ee42ec324bb1 100644 --- a/crates/http-api-bindings/src/embedding/openai.rs +++ b/crates/http-api-bindings/src/embedding/openai.rs @@ -5,6 +5,7 @@ use async_openai::{ }; use async_trait::async_trait; use tabby_inference::Embedding; +use tracing::{info_span, Instrument}; pub struct OpenAIEmbeddingEngine { client: async_openai::Client, @@ -40,7 +41,12 @@ impl Embedding for OpenAIEmbeddingEngine { user: None, dimensions: None, }; - let resp = self.client.embeddings().create(request).await?; + let resp = self + .client + .embeddings() + .create(request) + .instrument(info_span!("embed_openai")) + .await?; let data = resp .data .into_iter() diff --git a/crates/http-api-bindings/src/rate_limit.rs b/crates/http-api-bindings/src/rate_limit.rs index 167adb9a64aa..54302f2397b2 100644 --- a/crates/http-api-bindings/src/rate_limit.rs +++ b/crates/http-api-bindings/src/rate_limit.rs @@ -9,6 +9,7 @@ use futures::stream::BoxStream; use leaky_bucket::RateLimiter; use tabby_inference::{ChatCompletionStream, CompletionOptions, CompletionStream, Embedding}; use tokio::time::Duration; +use tracing::{info_span, Instrument}; fn new_rate_limiter(rpm: u64) -> RateLimiter { let rps = (rpm as f64 / 60.0).ceil() as usize; @@ -35,7 +36,10 @@ pub fn new_embedding(embedding: Box, request_per_minute: u64) -> impl Embedding for RateLimitedEmbedding { async fn embed(&self, prompt: &str) -> anyhow::Result> { self.rate_limiter.acquire(1).await; - self.embedding.embed(prompt).await + self.embedding + .embed(prompt) + .instrument(info_span!("rate_limit_embed")) + .await } } diff --git a/crates/tabby-index/src/code/mod.rs b/crates/tabby-index/src/code/mod.rs index f7d0c0fc7583..d234f706df09 100644 --- a/crates/tabby-index/src/code/mod.rs +++ b/crates/tabby-index/src/code/mod.rs @@ -11,7 +11,7 @@ use tabby_common::{ }; use tabby_inference::Embedding; use tokio::task::JoinHandle; -use tracing::warn; +use tracing::{info_span, warn, Instrument}; use self::intelligence::SourceCode; use crate::{ @@ -126,7 +126,7 @@ async fn build_binarize_embedding_tokens( embedding: Arc, body: &str, ) -> Result> { - let embedding = match embedding.embed(body).await { + let embedding = match embedding.embed(body).instrument(info_span!("embed")).await { Ok(x) => x, Err(err) => { bail!("Failed to embed chunk text: {}", err); diff --git a/crates/tabby/Cargo.toml b/crates/tabby/Cargo.toml index 2da06ef88084..319f8cf77bbe 100644 --- a/crates/tabby/Cargo.toml +++ b/crates/tabby/Cargo.toml @@ -38,6 +38,11 @@ strum = { workspace = true } strfmt = "0.2.4" tracing = { workspace = true } tracing-subscriber = { workspace = true } +tracing-opentelemetry.workspace = true +opentelemetry.workspace = true +opentelemetry_sdk.workspace = true +opentelemetry-otlp.workspace = true +opentelemetry-semantic-conventions.workspace = true tantivy = { workspace = true } anyhow = { workspace = true } sysinfo = "0.29.8" diff --git a/crates/tabby/src/main.rs b/crates/tabby/src/main.rs index bb0aa98d2deb..bde5fba9ae75 100644 --- a/crates/tabby/src/main.rs +++ b/crates/tabby/src/main.rs @@ -1,3 +1,4 @@ +mod otel; mod routes; mod services; @@ -9,8 +10,6 @@ use std::os::unix::fs::PermissionsExt; use clap::{Parser, Subcommand}; use tabby_common::config::{Config, ModelConfig}; -use tracing::level_filters::LevelFilter; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; #[derive(Parser)] #[command(author, version, about, long_about = None)] @@ -56,7 +55,7 @@ async fn main() { color_eyre::install().expect("Must be able to install color_eyre"); let cli = Cli::parse(); - init_logging(); + let _guard = otel::init_tracing_subscriber(cli.otlp_endpoint); let config = Config::load().expect("Must be able to load config"); let root = tabby_common::path::tabby_root(); @@ -91,36 +90,6 @@ macro_rules! fatal { }; } -fn init_logging() { - let mut layers = Vec::new(); - - let fmt_layer = tracing_subscriber::fmt::layer() - .with_file(true) - .with_line_number(true) - .boxed(); - - layers.push(fmt_layer); - - let mut dirs = if cfg!(feature = "prod") { - "tabby=info,otel=debug,http_api_bindings=info,llama_cpp_server=info".into() - } else { - "tabby=debug,otel=debug,http_api_bindings=debug,llama_cpp_server=debug".into() - }; - - if let Ok(env) = std::env::var(EnvFilter::DEFAULT_ENV) { - dirs = format!("{dirs},{env}") - }; - - let env_filter = EnvFilter::builder() - .with_default_directive(LevelFilter::WARN.into()) - .parse_lossy(dirs); - - tracing_subscriber::registry() - .with(layers) - .with(env_filter) - .init(); -} - fn to_local_config(model: &str, parallelism: u8, device: &Device) -> ModelConfig { let num_gpu_layers = if *device != Device::Cpu { std::env::var("LLAMA_CPP_N_GPU_LAYERS") diff --git a/crates/tabby/src/otel.rs b/crates/tabby/src/otel.rs new file mode 100644 index 000000000000..e0c0f0cd1e9c --- /dev/null +++ b/crates/tabby/src/otel.rs @@ -0,0 +1,109 @@ +use axum_prometheus::lifecycle::layer; +use opentelemetry::{ + global, + trace::{FutureExt, TracerProvider as _}, + KeyValue, +}; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::{ + runtime, + trace::{RandomIdGenerator, Sampler, TracerProvider}, + Resource, +}; +use opentelemetry_semantic_conventions::{ + attribute::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_NAME, SERVICE_VERSION}, + SCHEMA_URL, +}; +use tracing::level_filters::LevelFilter; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::{ + layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry, +}; + +// Create a Resource that captures information about the entity for which telemetry is recorded. +fn resource() -> Resource { + Resource::from_schema_url( + [ + KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")), + KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")), + KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "develop"), + ], + SCHEMA_URL, + ) +} + +// Construct TracerProvider for OpenTelemetryLayer +fn init_tracer_provider(otlp_endpoint: String) -> TracerProvider { + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(otlp_endpoint) + .with_timeout(std::time::Duration::from_secs(3)) + .build() + .unwrap(); + + TracerProvider::builder() + // Customize sampling strategy + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( + 1.0, + )))) + // If export trace to AWS X-Ray, you can use XrayIdGenerator + .with_id_generator(RandomIdGenerator::default()) + .with_resource(resource()) + .with_batch_exporter(exporter, runtime::Tokio) + .build() +} + +// Initialize tracing-subscriber and return OtelGuard for opentelemetry-related termination processing +pub fn init_tracing_subscriber(otlp_endpoint: Option) -> OtelGuard { + let mut layers: Vec + Send + Sync>> = Vec::new(); + + let tracer_provider = if let Some(endpoint) = otlp_endpoint { + let tracer_provider = init_tracer_provider(endpoint); + let tracer = tracer_provider.tracer("tracing-otel-subscriber"); + layers.push(Box::new(OpenTelemetryLayer::new(tracer))); + Some(tracer_provider) + } else { + None + }; + + let fmt_layer = tracing_subscriber::fmt::layer() + .with_file(true) + .with_line_number(true) + .boxed(); + layers.push(fmt_layer); + + let mut dirs = if cfg!(feature = "prod") { + "tabby=info,otel=debug,http_api_bindings=info,llama_cpp_server=info".into() + } else { + "tabby=debug,otel=debug,http_api_bindings=debug,llama_cpp_server=debug".into() + }; + + if let Ok(env) = std::env::var(EnvFilter::DEFAULT_ENV) { + dirs = format!("{dirs},{env}") + }; + + let env_filter = EnvFilter::builder() + .with_default_directive(LevelFilter::WARN.into()) + .parse_lossy(dirs); + + tracing_subscriber::registry() + .with(layers) + .with(env_filter) + .init(); + + OtelGuard { tracer_provider } +} + +pub struct OtelGuard { + tracer_provider: Option, +} + +impl Drop for OtelGuard { + fn drop(&mut self) { + if let Some(tracer_provider) = self.tracer_provider.take() { + if let Err(err) = tracer_provider.shutdown() { + eprintln!("{err:?}"); + } + } + } +} diff --git a/crates/tabby/src/serve.rs b/crates/tabby/src/serve.rs index b5ca96109d8e..0c9447f7cd6f 100644 --- a/crates/tabby/src/serve.rs +++ b/crates/tabby/src/serve.rs @@ -14,7 +14,7 @@ use tabby_download::ModelKind; use tabby_inference::ChatCompletionStream; use tokio::{sync::oneshot::Sender, time::sleep}; use tower_http::timeout::TimeoutLayer; -use tracing::{debug, warn}; +use tracing::{debug, info_span, warn, Instrument}; use utoipa::{ openapi::security::{HttpAuthScheme, HttpBuilder, SecurityScheme}, Modify, OpenApi, @@ -114,7 +114,9 @@ pub struct ServeArgs { pub async fn main(config: &Config, args: &ServeArgs) { let config = merge_args(config, args); - load_model(&config).await; + load_model(&config) + .instrument(info_span!("load_model")) + .await; let tx = try_run_spinner(); @@ -213,15 +215,21 @@ pub async fn main(config: &Config, args: &ServeArgs) { async fn load_model(config: &Config) { if let Some(ModelConfig::Local(ref model)) = config.model.completion { - download_model_if_needed(&model.model_id, ModelKind::Completion).await; + download_model_if_needed(&model.model_id, ModelKind::Completion) + .instrument(info_span!("completion")) + .await; } if let Some(ModelConfig::Local(ref model)) = config.model.chat { - download_model_if_needed(&model.model_id, ModelKind::Chat).await; + download_model_if_needed(&model.model_id, ModelKind::Chat) + .instrument(info_span!("chat")) + .await; } if let ModelConfig::Local(ref model) = config.model.embedding { - download_model_if_needed(&model.model_id, ModelKind::Embedding).await; + download_model_if_needed(&model.model_id, ModelKind::Embedding) + .instrument(info_span!("embedding")) + .await; } }