Skip to content

Commit

Permalink
feat: Add prometheus direct interface and update opentelemetry
Browse files Browse the repository at this point in the history
Signed-off-by: Jeremy HERGAULT <[email protected]>
  • Loading branch information
reneca committed Oct 6, 2024
1 parent 5a07249 commit fc6f5c5
Show file tree
Hide file tree
Showing 11 changed files with 332 additions and 72 deletions.
15 changes: 15 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,19 @@ prosa-macros = { version = "0.1.1", path = "prosa_macros" }
thiserror = "1"
aquamarine = "0.5"
bytes = "1"
url = { version = "2", features = ["serde"] }
tokio = "1"

# Config Observability
log = "0.4"
tracing-core = "0.1"
tracing-subscriber = "0.3"
tracing-opentelemetry = "0.25"
opentelemetry = { version = "0.24", features = ["metrics", "trace", "logs"] }
opentelemetry_sdk = { version = "0.24", features = ["metrics", "trace", "logs", "rt-tokio"] }
opentelemetry-stdout = { version = "0.5", features = ["metrics", "trace", "logs"]}
opentelemetry-otlp = { version = "0.17", features = ["metrics", "trace", "logs"]}
prometheus = { version = "0.13" }
prometheus_exporter = { version = "0.8", features = ["logging"] }
opentelemetry-prometheus = { version = "0.17" }
opentelemetry-appender-log = "0.5"
2 changes: 1 addition & 1 deletion cargo-prosa/src/cargo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct ComponentVersion<'a> {
pub version: &'a String,
}

impl<'a> fmt::Display for ComponentVersion<'a> {
impl fmt::Display for ComponentVersion<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if f.alternate() {
writeln!(f, "{}", self.name)?;
Expand Down
15 changes: 9 additions & 6 deletions prosa/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,12 @@ glob = { version = "0.3" }
toml = "0.8"
serde_yaml = "0.9"

tracing-opentelemetry = "0.22"

opentelemetry = { version = "0.21", features = ["metrics", "trace", "logs"] }
opentelemetry_sdk = { version = "0.21", features = ["metrics", "trace", "logs", "rt-tokio"] }
opentelemetry-stdout = { version = "0.2", features = ["metrics", "trace", "logs"]}
opentelemetry-otlp = { version = "0.14", features = ["metrics", "trace", "logs"]}
log.workspace = true
tracing-opentelemetry.workspace = true

opentelemetry.workspace = true
opentelemetry_sdk.workspace = true
opentelemetry-stdout.workspace = true
opentelemetry-otlp.workspace = true
opentelemetry-appender-log.workspace = true
memory-stats = "1"
6 changes: 3 additions & 3 deletions prosa/my_prosa_settings.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
name: my-prosa

otel:
observability:
level: DEBUG
metrics:
stdout:
level: 0
prometheus:
endpoint: 0.0.0.0:9100
traces:
stdout:
level: debug
Expand Down
7 changes: 0 additions & 7 deletions prosa/proc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::time::Duration;

use config::Config;
use opentelemetry::global;
use prosa::core::main::{MainProc, MainRunnable};
use prosa::core::msg::{InternalMsg, Msg, RequestMsg};
use prosa::core::proc::{proc, Proc, ProcBusParam, ProcConfig};
Expand Down Expand Up @@ -128,12 +127,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let my_settings = config.try_deserialize::<MySettings>()?;
println!("My ProSA settings: {:?}", my_settings);

// metrics
global::set_meter_provider(my_settings.get_observability().build_meter_provider());

// logs
global::set_logger_provider(my_settings.get_observability().build_logger_provider());

// traces
let telemetry_filter = TelemetryFilter::new(LevelFilter::DEBUG);
my_settings
Expand Down
99 changes: 92 additions & 7 deletions prosa/src/core/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ use super::proc::ProcBusParam;
use super::service::{ProcService, ServiceTable};
use super::settings::Settings;
use opentelemetry::logs::LoggerProvider as _;
use opentelemetry::metrics::MeterProvider;
use opentelemetry::metrics::{Meter, MeterProvider};
use opentelemetry::trace::TracerProvider as _;
use opentelemetry::KeyValue;
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use prosa_utils::msg::tvf::{Tvf, TvfError};
use std::borrow::Cow;
use std::sync::Arc;
Expand Down Expand Up @@ -99,7 +101,7 @@ where
{
internal_tx_queue: mpsc::Sender<InternalMainMsg<M>>,
name: String,
meter_provider: opentelemetry_sdk::metrics::MeterProvider,
meter_provider: opentelemetry_sdk::metrics::SdkMeterProvider,
logger_provider: opentelemetry_sdk::logs::LoggerProvider,
tracer_provider: opentelemetry_sdk::trace::TracerProvider,
}
Expand All @@ -123,11 +125,16 @@ where
internal_tx_queue: mpsc::Sender<InternalMainMsg<M>>,
settings: &S,
) -> Main<M> {
let logger_provider = settings.get_observability().build_logger_provider();
let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider);
let _ = log::set_boxed_logger(Box::new(otel_log_appender));
log::set_max_level(settings.get_observability().get_logger_level().into());

Main {
internal_tx_queue,
name: settings.get_prosa_name(),
meter_provider: settings.get_observability().build_meter_provider(),
logger_provider: settings.get_observability().build_logger_provider(),
logger_provider,
tracer_provider: settings.get_observability().build_tracer_provider(),
}
}
Expand Down Expand Up @@ -246,7 +253,11 @@ where

/// Provide the opentelemetry Tracer based on ProSA settings
pub fn tracer(&self, name: impl Into<Cow<'static, str>>) -> opentelemetry_sdk::trace::Tracer {
self.tracer_provider.tracer(name)
self.tracer_provider
.tracer_builder(name)
.with_version(env!("CARGO_PKG_VERSION"))
.with_attributes([KeyValue::new("prosa_name", self.name.clone())])
.build()
}
}

Expand All @@ -255,9 +266,11 @@ pub struct MainProc<M>
where
M: Sized + Clone + Tvf,
{
name: String,
processors: HashMap<u32, HashMap<u32, ProcService<M>>>,
services: Arc<ServiceTable<M>>,
internal_rx_queue: mpsc::Receiver<InternalMainMsg<M>>,
meter: Meter,
}

impl<M> ProcBusParam for MainProc<M>
Expand All @@ -282,6 +295,16 @@ impl<M> MainProc<M>
where
M: Sized + Clone + Debug + Tvf + Default + 'static + std::marker::Send + std::marker::Sync,
{
/// Count every queue registered across all processors.
///
/// Sums the size of each per-processor queue map held in `processors`.
fn get_proc_queue_len(&self) -> usize {
    self.processors
        .values()
        .map(|queues| queues.len())
        .sum::<usize>()
}

async fn remove_proc(&mut self, proc_id: u32) -> Option<HashMap<u32, ProcService<M>>> {
if let Some(proc) = self.processors.remove(&proc_id) {
let mut new_services = (*self.services).clone();
Expand Down Expand Up @@ -366,6 +389,47 @@ where
}

async fn internal_run(&mut self) -> Result<(), BusError> {
let prosa_name = self.name.clone();

// Monitor RAM usage
self.meter
.u64_observable_gauge("prosa_main_ram")
.with_description("RAM consumed by ProSA")
.with_unit("bytes")
.with_callback(move |observer| {
if let Some(usage) = memory_stats::memory_stats() {
observer.observe(
usage.physical_mem as u64,
&[
KeyValue::new("prosa_name", prosa_name.clone()),
KeyValue::new("type", "physical"),
],
);
observer.observe(
usage.virtual_mem as u64,
&[
KeyValue::new("prosa_name", prosa_name.clone()),
KeyValue::new("type", "virtual"),
],
);
}
})
.init();

// Monitor services
let services_meter = self
.meter
.u64_gauge("prosa_main_services")
.with_description("Services declared to the main task")
.init();
// Monitor processors objects
let processors_meter = self
.meter
.u64_gauge("prosa_main_processors")
.with_description("Processors declared to the main task")
.init();

let prosa_name = self.name.clone();
loop {
tokio::select! {
Some(msg) = self.internal_rx_queue.recv() => {
Expand All @@ -390,16 +454,25 @@ where
let _ = self.processors.remove(&proc_id);
}
}

processors_meter.record(self.processors.len() as u64, &[KeyValue::new("prosa_name", prosa_name.clone()), KeyValue::new("type", "tasks")]);
processors_meter.record(self.get_proc_queue_len() as u64, &[KeyValue::new("prosa_name", prosa_name.clone()), KeyValue::new("type", "queues")]);
},
InternalMainMsg::DeleteProc(proc_id) => {
if self.remove_proc(proc_id).await.is_some() {
prosa_main_update_srv!(self);
}

processors_meter.record(self.processors.len() as u64, &[KeyValue::new("prosa_name", prosa_name.clone()), KeyValue::new("type", "tasks")]);
processors_meter.record(self.get_proc_queue_len() as u64, &[KeyValue::new("prosa_name", prosa_name.clone()), KeyValue::new("type", "queues")]);
},
InternalMainMsg::DeleteProcQueue(proc_id, queue_id) => {
if self.remove_proc_queue(proc_id, queue_id).await.is_some() {
self.notify_srv_proc().await;
}

processors_meter.record(self.processors.len() as u64, &[KeyValue::new("prosa_name", prosa_name.clone()), KeyValue::new("type", "tasks")]);
processors_meter.record(self.get_proc_queue_len() as u64, &[KeyValue::new("prosa_name", prosa_name.clone()), KeyValue::new("type", "queues")]);
},
InternalMainMsg::NewProcService(names, proc_id) => {
if let Some(proc_service) = self.processors.get(&proc_id) {
Expand All @@ -410,6 +483,7 @@ where
}
}
self.services = Arc::new(new_services);
services_meter.record(self.services.len() as u64, &[KeyValue::new("prosa_name", prosa_name.clone())]);
self.notify_srv_proc().await;
}
},
Expand All @@ -421,6 +495,7 @@ where
new_services.add_service(&name, proc_queue.clone());
}
self.services = Arc::new(new_services);
services_meter.record(self.services.len() as u64, &[KeyValue::new("prosa_name", prosa_name.clone())]);
self.notify_srv_proc().await;
}
}
Expand All @@ -431,6 +506,7 @@ where
new_services.rm_service_proc(&name, proc_id);
}
self.services = Arc::new(new_services);
services_meter.record(self.services.len() as u64, &[KeyValue::new("prosa_name", prosa_name.clone())]);
self.notify_srv_proc().await;
},
InternalMainMsg::DeleteService(names, proc_id, queue_id) => {
Expand All @@ -439,6 +515,7 @@ where
new_services.rm_service(&name, proc_id, queue_id);
}
self.services = Arc::new(new_services);
services_meter.record(self.services.len() as u64, &[KeyValue::new("prosa_name", prosa_name.clone())]);
self.notify_srv_proc().await;
},
InternalMainMsg::Command(cmd)=> {
Expand All @@ -465,29 +542,37 @@ where
}
}

/// Name given to the main task of ProSA
///
/// Used by `run` both as the name of the spawned thread and as the thread
/// name of the runtime built inside that thread.
pub(crate) const MAIN_TASK_NAME: &str = "main";

impl<M> MainRunnable<M> for MainProc<M>
where
M: Sized + Clone + Debug + Tvf + Default + 'static + std::marker::Send + std::marker::Sync,
{
fn create<S: Settings>(settings: &S) -> (Main<M>, MainProc<M>) {
let (internal_tx_queue, internal_rx_queue) = mpsc::channel(2048);
let main = Main::new(internal_tx_queue, settings);
let name = main.name().clone();
let meter = main.meter("prosa_main_task_meter");
(
Main::new(internal_tx_queue, settings),
main,
MainProc {
name,
processors: Default::default(),
services: Arc::new(ServiceTable::default()),
internal_rx_queue,
meter,
},
)
}

fn run(mut self) -> std::thread::JoinHandle<()> {
std::thread::Builder::new()
.name("main".into())
.name(MAIN_TASK_NAME.into())
.spawn(move || {
let rt: Runtime = Builder::new_current_thread()
.enable_all()
.thread_name("main")
.thread_name(MAIN_TASK_NAME)
.build()
.unwrap();
rt.block_on(self.internal_run()).unwrap();
Expand Down
10 changes: 10 additions & 0 deletions prosa/src/core/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ impl<M> ServiceTable<M>
where
M: Sized + Clone + Tvf,
{
/// Getter to know if the service table is empty
///
/// Delegates to `is_empty` on the underlying `table`.
pub fn is_empty(&self) -> bool {
self.table.is_empty()
}

/// Getter of the length of the service table (used for metrics)
///
/// Returns the number of entries held by the underlying `table`
/// (presumably one entry per service name; confirm against the `table` definition).
pub fn len(&self) -> usize {
self.table.len()
}

/// Method to know if the service is available from a processor
///
/// Called by the processor to know if a service is available (service test)
Expand Down
7 changes: 2 additions & 5 deletions prosa/src/inj/proc.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::time::Duration;

use opentelemetry::{
metrics::{Histogram, Unit},
KeyValue,
};
use opentelemetry::{metrics::Histogram, KeyValue};
use prosa_macros::{proc, proc_settings};
use serde::{Deserialize, Serialize};
use tracing::debug;
Expand Down Expand Up @@ -192,7 +189,7 @@ where
let meter_trans_duration = meter
.f64_histogram("prosa_inj_request_duration")
.with_description("inj transaction processing duration")
.with_unit(Unit::new("seconds"))
.with_unit("seconds")
.init();

// Declare the processor
Expand Down
Loading

0 comments on commit fc6f5c5

Please sign in to comment.