Skip to content

Commit

Permalink
fix metrics collecting
Browse files Browse the repository at this point in the history
  • Loading branch information
longfangsong committed Aug 23, 2024
1 parent 19a7dd5 commit 29495de
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 54 deletions.
13 changes: 5 additions & 8 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@ name: Rust

on:
push:
branches: [ "master" ]
branches: ["master"]
pull_request:
branches: [ "master" ]
branches: ["master"]

env:
CARGO_TERM_COLOR: always

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- name: Build
run: cargo build --verbose
# - name: Run tests
# run: cargo test --verbose
- uses: actions/checkout@v4
- name: Test
run: cargo test
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ toml = "0.5"
once_cell = "1.8"

[dev-dependencies]
hyper = "1.4.1"
sqllogictest = "0.13.0"
tower = "0.4.13"
117 changes: 71 additions & 46 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
#![feature(let_chains)]

#[macro_use]
extern crate lazy_static;

use std::{collections::HashMap, mem, process::id, sync::Arc, time::Duration};
use std::{collections::HashMap, mem, sync::Arc, time::Duration};

use axum::{
extract::{self},
response::IntoResponse,
routing::{delete, get, post},
Json, Router,
};
use log::{info, LevelFilter};
use prometheus::{Encoder, IntGauge, Registry, TextEncoder};
use prometheus::{Encoder, TextEncoder};
use rumqttc::QoS;
use serde::{Deserialize, Serialize};
use sysinfo::{CpuExt, Pid, ProcessExt, System, SystemExt};
use tokio::{
sync::{mpsc, Mutex},
time,
};
use sysinfo::{CpuExt, System, SystemExt};
use tokio::{sync::Mutex, time};

use catalog::Catalog;
use core::Tuple;
Expand All @@ -31,6 +29,7 @@ mod catalog;
mod config;
mod connector;
mod core;
mod metrics;
mod sql;
mod storage;
mod util;
Expand All @@ -42,14 +41,11 @@ struct AppState {
views: HashMap<usize, View>,
next_id: usize,
dummy_subscribers: HashMap<usize, Vec<Tuple>>,
registry: Registry,
}

impl AppState {
pub fn new(registry: Registry) -> Self {
let mut app_state = Self::default();
app_state.registry = registry;
return app_state;
pub fn new() -> Self {
Self::default()
}
}

Expand Down Expand Up @@ -92,11 +88,10 @@ async fn execute_sql(
}
}
});
let view_manager = state.clone();
tokio::spawn(async move {
while let Ok(Ok(result)) = receiver.recv().await {
if !result.is_none() {
let data = result.unwrap().parse_into_json().unwrap();
if let Some(result) = result {
let data = result.parse_into_json().unwrap();
sender
.client
.publish("/yisa/data2", QoS::AtLeastOnce, false, data)
Expand Down Expand Up @@ -145,48 +140,30 @@ async fn ping() -> &'static str {
"pong"
}

async fn metrics_handler(extract::State(state): extract::State<Arc<Mutex<AppState>>>) -> String {
let mut state = state.lock().await;
async fn metrics_handler() -> String {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metric_families = state.registry.gather();
let metric_families = prometheus::gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}

#[tokio::main]
pub async fn main() {
config::initialize_config();
log::set_logger(&LOGGER)
.map(|()| log::set_max_level(LevelFilter::Info))
.unwrap();

// initialize Prometheus registry
let registry = Registry::new();

// cpu and memory gauge
let cpu_gauge = IntGauge::new("cpu_usage", "CPU usage in percentage").unwrap();
let memory_gauge = IntGauge::new("memory_usage", "Memory usage in bytes").unwrap();

// register gauge
registry.register(Box::new(cpu_gauge.clone())).unwrap();
registry.register(Box::new(memory_gauge.clone())).unwrap();
fn start_collecting_metric() {
let mut sys = System::new();
let current_pid = id() as i32;
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
sys.refresh_all();
if let Some(process) = sys.process(Pid::from(current_pid)) {
cpu_gauge.set(process.cpu_usage() as i64);
memory_gauge.set(process.memory() as i64);
}
let cpu_usage = sys.global_cpu_info().cpu_usage() as i64;
let memory_usage = sys.used_memory() as i64;
metrics::CPU.set(cpu_usage);
metrics::MEMORY.set(memory_usage);
}
});
}

let app_state = AppState::new(registry);

fn app() -> Router {
// Initialize database
let catalog = Catalog::new();
let storage_mgr = StorageManager::default();
Expand All @@ -196,8 +173,8 @@ pub async fn main() {
storage_mgr,
};
let session = Arc::new(Mutex::new(Session::new(query_ctx)));
let http_addr = "127.0.0.1:3030";
let app = Router::new()
let app_state = AppState::new();
Router::new()
.route("/metrics", get(metrics_handler))
.route(
"/view",
Expand All @@ -206,8 +183,56 @@ pub async fn main() {
.route("/view", get(poll_view))
.route("/view", delete(delete_view))
.route("/ping", get(ping))
.with_state(Arc::new(Mutex::new(app_state)));
.with_state(Arc::new(Mutex::new(app_state)))
}

#[tokio::main]
pub async fn main() {
config::initialize_config();
log::set_logger(&LOGGER)
.map(|()| log::set_max_level(LevelFilter::Info))
.unwrap();
start_collecting_metric();
let app = app();
let http_addr = "127.0.0.1:3030";
let listener = tokio::net::TcpListener::bind(http_addr).await.unwrap();
info!("HTTP listening to {}", http_addr);
axum::serve(listener, app).await.unwrap();
}

#[cfg(test)]
mod tests {
use axum::{
body::Body,
http::{Request, StatusCode},
};
use futures::StreamExt;
use tower::ServiceExt;

use super::*;
#[tokio::test]
async fn metric_works() {
start_collecting_metric();
let app = app();
tokio::time::sleep(Duration::from_secs(1)).await;
let response = app
.oneshot(
Request::builder()
.uri("/metrics")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();

assert_eq!(response.status(), StatusCode::OK);
let mut body = response.into_body().into_data_stream();
let mut body_bytes = Vec::new();
while let Some(Ok(bytes)) = body.next().await {
body_bytes.extend_from_slice(&bytes);
}
let body_string = String::from_utf8(body_bytes).unwrap();
assert!(body_string.contains("cpu_usage"));
assert!(body_string.contains("memory_usage"));
}
}
8 changes: 8 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use prometheus::{register_int_gauge, IntGauge};

lazy_static! {
pub static ref CPU: IntGauge =
register_int_gauge!("cpu_usage", "CPU usage in percentage").unwrap();
pub static ref MEMORY: IntGauge =
register_int_gauge!("memory_usage", "Memory usage in bytes").unwrap();
}

0 comments on commit 29495de

Please sign in to comment.