-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Sven Pfennig <[email protected]>
- Loading branch information
Showing
1 changed file
with
167 additions
and
0 deletions.
There are no files selected for viewing
167 changes: 167 additions & 0 deletions
167
podtato-head/podtato-head-microservices/crates/podtato-entry/src/main_metrics.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
#![deny(warnings)] | ||
|
||
use std::convert::Infallible; | ||
use std::env; | ||
use std::net::SocketAddr; | ||
use std::path::Path; | ||
|
||
use bytes::Bytes; | ||
use http_body_util::Full; | ||
use hyper::header::CONTENT_TYPE; | ||
use hyper::server::conn::http1; | ||
use hyper::service::service_fn; | ||
use hyper::{ Method, Request, Response}; | ||
use hyper_util::rt::TokioIo; | ||
use tokio::net::TcpListener; | ||
|
||
mod assets; | ||
use assets::Assets; | ||
|
||
use std::time::SystemTime; | ||
use std::sync::Arc; | ||
use once_cell::sync::Lazy; | ||
use opentelemetry::{ | ||
metrics::{Counter, Histogram, MeterProvider as _, Unit}, | ||
KeyValue, | ||
}; | ||
use opentelemetry_sdk::metrics::SdkMeterProvider; | ||
use prometheus::{Encoder, Registry, TextEncoder}; | ||
|
||
static HANDLER_ALL: Lazy<[KeyValue; 1]> = Lazy::new(|| [KeyValue::new("handler", "all")]); | ||
|
||
struct AppState { | ||
registry: Registry, | ||
http_counter: Counter<u64>, | ||
http_body_gauge: Histogram<u64>, | ||
http_req_histogram: Histogram<f64>, | ||
} | ||
|
||
async fn serve_req(req: Request<impl hyper::body::Body>, state: Arc<AppState>) -> Result<Response<Full<Bytes>>, Infallible> { | ||
let request_start = SystemTime::now(); | ||
|
||
state.http_counter.add(1, HANDLER_ALL.as_ref()); | ||
if req.method() == &Method::GET && req.uri().path().starts_with("/parts") { | ||
let path = Path::new(req.uri().path()); | ||
let url = match path.file_name().unwrap().to_str() { | ||
Some("hat.svg") => "http://podtato-head-hat:9001/images/hat.svg", | ||
Some("left-leg.svg") => "http://podtato-head-left-leg:9002/images/left-leg.svg", | ||
Some("left-arm.svg") => "http://podtato-head-left-arm:9003/images/left-arm.svg", | ||
Some("right-leg.svg") => "http://podtato-head-right-leg:9004/images/right-leg.svg", | ||
Some("right-arm.svg") => "http://podtato-head-right-arm:9005/images/right-arm.svg", | ||
Some(&_) => todo!(), | ||
None => todo!(), | ||
}; | ||
let content_data = match fetch_data(&url).await { | ||
Ok(data) => data, | ||
Err(_) => String::new() | ||
}; | ||
let mut response = Response::new(Full::new(Bytes::from(content_data))); | ||
if url.ends_with(".svg") { | ||
response | ||
.headers_mut() | ||
.insert(CONTENT_TYPE, "image/svg+xml".parse().unwrap()); | ||
} | ||
state.http_req_histogram.record( | ||
request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()), | ||
&[], | ||
); | ||
return Ok(response); | ||
} else if req.method() == &Method::GET && req.uri().path().starts_with("/assets/") { | ||
let path = req.uri().path().strip_prefix("/assets/").unwrap(); | ||
let content = Assets::get(&path); | ||
return match content { | ||
Some(content) => { | ||
let content_data = content.data.to_vec(); | ||
let mut response = Response::new(Full::new(Bytes::from(content_data))); | ||
if path.ends_with(".svg") { | ||
response | ||
.headers_mut() | ||
.insert(CONTENT_TYPE, "image/svg+xml".parse().unwrap()); | ||
} | ||
Ok(response) | ||
} | ||
None => Ok(Response::new(Full::new(Bytes::from("Not Found!")))), | ||
}; | ||
} else if req.method() == &Method::GET && req.uri().path() == "/metrics" { | ||
let mut buffer = vec![]; | ||
let encoder = TextEncoder::new(); | ||
let metric_families = state.registry.gather(); | ||
encoder.encode(&metric_families, &mut buffer).unwrap(); | ||
state | ||
.http_body_gauge | ||
.record(buffer.len() as u64, HANDLER_ALL.as_ref()); | ||
|
||
// return Ok(Response::builder() | ||
// .status(200) | ||
// .header(CONTENT_TYPE, encoder.format_type()) | ||
// .body(Body::from(buffer))); | ||
return Ok(Response::new(Full::new(Bytes::from(buffer)))); | ||
} else if req.method() == &Method::GET && req.uri().path() == "/" { | ||
// TODO: Handle file not found | ||
let content = Assets::get("html/podtato-home.html").unwrap(); | ||
let content_data = String::from_utf8(content.data.to_vec()) | ||
.unwrap() | ||
.replace("{{ . }}", "0.1.0-WasmEdge"); | ||
return Ok(Response::new(Full::new(Bytes::from(content_data)))); | ||
} else { | ||
return Ok(Response::new(Full::new(Bytes::from("Hello World!")))); | ||
} | ||
} | ||
|
||
async fn fetch_data(url: &str) -> Result<String, reqwest::Error> { | ||
Ok(reqwest::get(url).await?.text().await?) | ||
} | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { | ||
let registry = Registry::new(); | ||
let exporter = opentelemetry_prometheus::exporter() | ||
.with_registry(registry.clone()) | ||
.build()?; | ||
let provider = SdkMeterProvider::builder().with_reader(exporter).build(); | ||
|
||
let meter = provider.meter("hyper-example"); | ||
let state = Arc::new(AppState { | ||
registry, | ||
http_counter: meter | ||
.u64_counter("http_requests_total") | ||
.with_description("Total number of HTTP requests made.") | ||
.init(), | ||
http_body_gauge: meter | ||
.u64_histogram("example.http_response_size") | ||
.with_unit(Unit::new("By")) | ||
.with_description("The metrics HTTP response sizes in bytes.") | ||
.init(), | ||
http_req_histogram: meter | ||
.f64_histogram("example.http_request_duration") | ||
.with_unit(Unit::new("ms")) | ||
.with_description("The HTTP request latencies in milliseconds.") | ||
.init(), | ||
}); | ||
|
||
let port = env::var("PODTATO_PORT").unwrap_or("9000".to_string()); | ||
let port = port | ||
.parse::<u16>() | ||
.expect(format!("Failed to parse port {}", port).as_str()); | ||
|
||
println!("going to serve on port {}", port); | ||
|
||
let addr: SocketAddr = ([0, 0, 0, 0], port).into(); | ||
|
||
// Bind to the port and listen for incoming TCP connections | ||
let listener = TcpListener::bind(addr).await?; | ||
loop { | ||
let (tcp, _) = listener.accept().await?; | ||
let io = TokioIo::new(tcp); | ||
let state = state.clone(); | ||
tokio::task::spawn(async move { | ||
if let Err(err) = http1::Builder::new() | ||
// .timer(TokioTimer) | ||
.serve_connection(io, service_fn(move |req| serve_req(req, state.clone()))) | ||
.await | ||
{ | ||
println!("Error serving connection: {:?}", err); | ||
} | ||
}); | ||
} | ||
} |