Skip to content

Commit

Permalink
Added sending system metadata in the pipeline start commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Macronic committed Aug 21, 2024
1 parent a09201c commit d38f094
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 11 deletions.
52 changes: 49 additions & 3 deletions src/events/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
// src/events/mod.rs
use crate::{debug_log::Logger, http_client::send_http_event};
use crate::{
debug_log::Logger,
http_client::{send_http_event, send_http_get},
metrics::SystemMetricsCollector,
};
use anyhow::{Context, Result};
use chrono::Utc;
use serde::Deserialize;
use serde_json::json;
use serde_json::{json, Value};
use sysinfo::System;
use tracing::info;

#[derive(Debug)]
Expand Down Expand Up @@ -58,7 +63,45 @@ pub struct RunEventOut {
pub service_name: String,
}

pub async fn send_start_run_event(service_url: &str, api_key: &str) -> Result<RunEventOut> {
const AWS_METADATA_URL: &str = "http://169.254.169.254/latest/meta-data/";

async fn get_aws_instance_metadata() -> Result<Value> {
let (status, response_text) = send_http_get(AWS_METADATA_URL, None).await?;

serde_json::from_str(&response_text).context(format!(
"Failed to get AWS instance metadata. Status: {}, Response: {}",
status, response_text
))
}

async fn gather_system_properties(system: &System) -> Value {
let aws_metadata = get_aws_instance_metadata().await.unwrap_or(json!(null));

let disk_metadata = SystemMetricsCollector::gather_disk_data();

json!(
{
"os": System::name(),
"os_version": System::os_version(),
"kernel_version": System::kernel_version(),
"arch": System::cpu_arch(),
"num_cpus": system.cpus().len(),
"hostname": System::host_name(),
"total_memory": system.total_memory(),
"total_swap": system.total_swap(),
"uptime": System::uptime(),
"aws_metadata": aws_metadata,
"is_aws_instance": !aws_metadata.is_null(),
"system_disk_io": disk_metadata,
}
)
}

pub async fn send_start_run_event(
service_url: &str,
api_key: &str,
system: &System,
) -> Result<RunEventOut> {
info!("Starting new pipeline...");

let logger = Logger::new();
Expand All @@ -80,12 +123,15 @@ pub async fn send_start_run_event(service_url: &str, api_key: &str) -> Result<Ru
result: Vec<RunLogOut>,
}

let system_properties = gather_system_properties(system).await;

let init_entry = json!({
"message": "[CLI] Starting new pipeline run",
"process_type": "pipeline",
"process_status": "new_run",
"event_type": "process_status",
"timestamp": Utc::now().timestamp_millis() as f64 / 1000.,
"attributes": system_properties,
});

let result = send_http_event(service_url, api_key, &init_entry).await?;
Expand Down
19 changes: 19 additions & 0 deletions src/http_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,25 @@ async fn record_all_outgoing_http_calls(
write_to_log_file("log_outgoing_http_calls.txt", &log_message).await
}

pub async fn send_http_get(url: &str, api_key: Option<&str>) -> Result<(u16, String)> {
let client = Client::new();
let mut response = client.get(url);

if let Some(api_key) = api_key {
response = response.header("x-api-key", api_key)
}

let response = response.send().await.context("Failed to send http get")?;

let status = response.status();
let response_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());

Ok((status.as_u16(), response_text))
}

pub async fn send_http_body(
service_url: &str,
api_key: &str,
Expand Down
20 changes: 13 additions & 7 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,7 @@ impl SystemMetricsCollector {
SystemMetricsCollector
}

pub fn gather_metrics_object_attributes(system: &mut System) -> Value {
let used_memory = system.used_memory();
let total_memory = system.total_memory();
let memory_utilization = (used_memory as f64 / total_memory as f64) * 100.0;

let cpu_usage = system.global_cpu_info().cpu_usage();

pub fn gather_disk_data() -> HashMap<String, serde_json::Value> {
let disks: Disks = Disks::new_with_refreshed_list();

let mut d_stats: HashMap<String, serde_json::Value> = HashMap::new();
Expand All @@ -46,6 +40,18 @@ impl SystemMetricsCollector {
d_stats.insert(d_name.to_string(), disk_data);
}

d_stats
}

pub fn gather_metrics_object_attributes(system: &mut System) -> Value {
let used_memory = system.used_memory();
let total_memory = system.total_memory();
let memory_utilization = (used_memory as f64 / total_memory as f64) * 100.0;

let cpu_usage = system.global_cpu_info().cpu_usage();

let d_stats = Self::gather_disk_data();

let attributes = json!({
"events_name": "global_system_metrics",
"system_memory_total": total_memory,
Expand Down
2 changes: 1 addition & 1 deletion src/tracer_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl TracerClient {
self.stop_run().await?;
}

let result = send_start_run_event(&self.service_url, &self.api_key).await?;
let result = send_start_run_event(&self.service_url, &self.api_key, &self.system).await?;

self.current_run = Some(RunMetadata {
last_interaction: Instant::now(),
Expand Down

0 comments on commit d38f094

Please sign in to comment.