From 65774ef16fadb127c2f4b902b565e61d2cda3748 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Tue, 29 Oct 2024 15:31:56 -0700 Subject: [PATCH] Request tracking, middleware changes, migration folder bug fix (#12) --- Cargo.lock | 20 +++++ Cargo.toml | 1 + ROADMAP.md | 4 +- docs/docs/controllers/middleware.md | 2 +- examples/middleware/README.md | 4 +- examples/middleware/src/main.rs | 2 +- examples/request-tracking/Cargo.toml | 11 +++ examples/request-tracking/migrations/.gitkeep | 0 examples/request-tracking/rwf.toml | 6 ++ .../request-tracking/src/controllers/mod.rs | 1 + examples/request-tracking/src/main.rs | 37 +++++++++ examples/request-tracking/src/models/mod.rs | 1 + examples/request-tracking/templates/.gitkeep | 0 examples/turbo/rwf.toml | 2 +- .../src/controllers/signup/middleware.rs | 2 +- rwf-cli/src/setup.rs | 18 +++++ rwf/Cargo.toml | 2 + rwf/src/analytics/mod.rs | 3 + rwf/src/analytics/requests.rs | 69 +++++++++++++++++ rwf/src/config.rs | 19 ++++- rwf/src/controller/auth.rs | 9 +++ rwf/src/controller/middleware/mod.rs | 75 ++++++++++++++++--- rwf/src/controller/middleware/rate_limiter.rs | 2 +- .../controller/middleware/request_tracker.rs | 71 ++++++++++++++++++ rwf/src/controller/middleware/secure_id.rs | 4 +- rwf/src/controller/mod.rs | 54 ++++++++++--- rwf/src/http/path/query.rs | 4 + rwf/src/http/request.rs | 22 +++++- rwf/src/http/response.rs | 7 ++ rwf/src/http/server.rs | 40 +++++----- rwf/src/lib.rs | 1 + rwf/src/model/migrations/bootstrap.sql | 20 ++++- rwf/src/model/migrations/mod.rs | 15 +++- rwf/src/model/mod.rs | 11 +-- rwf/src/model/value.rs | 35 ++++++++- 35 files changed, 505 insertions(+), 69 deletions(-) create mode 100644 examples/request-tracking/Cargo.toml create mode 100644 examples/request-tracking/migrations/.gitkeep create mode 100644 examples/request-tracking/rwf.toml create mode 100644 examples/request-tracking/src/controllers/mod.rs create mode 100644 examples/request-tracking/src/main.rs create mode 100644 examples/request-tracking/src/models/mod.rs create mode 100644 examples/request-tracking/templates/.gitkeep create mode 100644 rwf/src/analytics/mod.rs create mode 100644 rwf/src/analytics/requests.rs create mode 100644 rwf/src/controller/middleware/request_tracker.rs diff --git a/Cargo.lock b/Cargo.lock index 5f05f2d4..7c4672c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1184,6 +1184,7 @@ dependencies = [ "serde", "serde_json", "time", + "uuid", ] [[package]] @@ -1471,6 +1472,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "request-tracking" +version = "0.1.0" +dependencies = [ + "rand 0.8.5", + "rwf 0.1.3", + "tokio", +] + [[package]] name = "rest" version = "0.1.0" @@ -1522,6 +1532,7 @@ dependencies = [ "toml", "tracing", "tracing-subscriber", + "uuid", ] [[package]] @@ -2206,6 +2217,15 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +dependencies = [ + "getrandom", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 08ba6d0b..e2a8491e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,5 @@ members = [ "rwf-macros", "rwf-tests", "examples/django", + "examples/request-tracking", ] diff --git a/ROADMAP.md b/ROADMAP.md index 3f2f2879..ca37a716 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -23,7 +23,7 @@ Rwf is brand new, but web development is ancient. Many features are missing or a ## Dynanic templates -- [ ] Better error messages, e.g. syntax errors, undefined variables, functions, etc. +- [x] Better error messages, e.g. syntax errors, undefined variables, functions, etc. - [ ] More data types support, e.g. UUIDs, timestampts, whatever Rust data types we forgot to add - [ ] More tests - [ ] Allow for extending template syntax with user-defined functions (defined at startup) @@ -56,7 +56,7 @@ Rwf is brand new, but web development is ancient. Many features are missing or a ## Built-ins - [ ] Feature flags and experiments -- [ ] Tracking (user requests) +- [x] Tracking (user requests) ## More? diff --git a/docs/docs/controllers/middleware.md b/docs/docs/controllers/middleware.md index 59866bfc..ba1bd38c 100644 --- a/docs/docs/controllers/middleware.md +++ b/docs/docs/controllers/middleware.md @@ -39,7 +39,7 @@ impl Middleware for RequiredHeaders { let header = request.headers().get(header); if header.is_none() { - return Ok(Outcome::Stop(Response::bad_request())); + return Ok(Outcome::Stop(request, Response::bad_request())); } } diff --git a/examples/middleware/README.md b/examples/middleware/README.md index 4f32eba3..bdbfb175 100644 --- a/examples/middleware/README.md +++ b/examples/middleware/README.md @@ -26,7 +26,7 @@ impl Middleware for OnlyLinuxBrowsers { } } - return Ok(Outcome::Stop(Response::redirect("https://archlinux.org"))) + return Ok(Outcome::Stop(request, Response::redirect("https://archlinux.org"))) } } ``` @@ -64,4 +64,4 @@ Middleware is evaluated in the order it's added to the middleware set. The middl ## Modifying responses -To modify responses, implement the `handle_response` method on the `Middleware` trait. See the included [request rate limiter](rwf/src/controller/middleware/rate_limiter.rs) middleware for complete example. \ No newline at end of file +To modify responses, implement the `handle_response` method on the `Middleware` trait. See the included [request rate limiter](/rwf/src/controller/middleware/rate_limiter.rs) middleware for complete example. diff --git a/examples/middleware/src/main.rs b/examples/middleware/src/main.rs index 2e08e007..ae6824a2 100644 --- a/examples/middleware/src/main.rs +++ b/examples/middleware/src/main.rs @@ -14,7 +14,7 @@ impl Middleware for BlockBadHeader { } } - Ok(Outcome::Stop(Response::bad_request())) + Ok(Outcome::Stop(request, Response::bad_request())) } } diff --git a/examples/request-tracking/Cargo.toml b/examples/request-tracking/Cargo.toml new file mode 100644 index 00000000..2295af83 --- /dev/null +++ b/examples/request-tracking/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "request-tracking" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +rwf = { path = "../../rwf" } +tokio = { version = "1", features = ["full"] } +rand = "0.8" diff --git a/examples/request-tracking/migrations/.gitkeep b/examples/request-tracking/migrations/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/examples/request-tracking/rwf.toml b/examples/request-tracking/rwf.toml new file mode 100644 index 00000000..a9ef863a --- /dev/null +++ b/examples/request-tracking/rwf.toml @@ -0,0 +1,6 @@ +[general] +track_requests = true +log_queries = true + +[database] +name = "rwf_request_tracking" diff --git a/examples/request-tracking/src/controllers/mod.rs b/examples/request-tracking/src/controllers/mod.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/examples/request-tracking/src/controllers/mod.rs @@ -0,0 +1 @@ + diff --git a/examples/request-tracking/src/main.rs b/examples/request-tracking/src/main.rs new file mode 100644 index 00000000..f7920ba0 --- /dev/null +++ b/examples/request-tracking/src/main.rs @@ -0,0 +1,37 @@ +mod controllers; +mod models; + +use rand::Rng; +use rwf::{http, prelude::*, Server}; + +#[derive(Default)] +struct Index; + +#[async_trait] +impl Controller for Index { + async fn handle(&self, _request: &Request) -> Result { + let ok = rand::thread_rng().gen::(); + + if ok { + // This is tracked. + Ok(Response::new().html(" +

All requests are tracked

+

To view requests, connect to the rwf_request_tracking database and run:

+ SELECT * FROM rwf_requests ORDER BY id + ")) + } else { + // This is tracked also. + Err(Error::HttpError(Box::new(http::Error::MissingParameter))) + } + } +} + +#[tokio::main] +async fn main() -> Result<(), http::Error> { + Logger::init(); + Migrations::migrate().await?; + + Server::new(vec![route!("/" => Index)]) + .launch("0.0.0.0:8000") + .await +} diff --git a/examples/request-tracking/src/models/mod.rs b/examples/request-tracking/src/models/mod.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/examples/request-tracking/src/models/mod.rs @@ -0,0 +1 @@ + diff --git a/examples/request-tracking/templates/.gitkeep b/examples/request-tracking/templates/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/examples/turbo/rwf.toml b/examples/turbo/rwf.toml index eb5af8ae..b5557dea 100644 --- a/examples/turbo/rwf.toml +++ b/examples/turbo/rwf.toml @@ -4,4 +4,4 @@ log_queries = true cache_templates = true [database] -name = "rwf_turbo" \ No newline at end of file +name = "rwf_turbo" diff --git a/examples/turbo/src/controllers/signup/middleware.rs b/examples/turbo/src/controllers/signup/middleware.rs index f1767d78..4a8b4023 100644 --- a/examples/turbo/src/controllers/signup/middleware.rs +++ b/examples/turbo/src/controllers/signup/middleware.rs @@ -9,7 +9,7 @@ impl Middleware for LoggedInCheck { async fn handle_request(&self, request: Request) -> Result { if let Some(session) = request.session() { if session.authenticated() { - return Ok(Outcome::Stop(Response::new().redirect("/chat"))); + return Ok(Outcome::Stop(request, Response::new().redirect("/chat"))); } } diff --git a/rwf-cli/src/setup.rs b/rwf-cli/src/setup.rs index 100f781f..23f3efde 100644 --- a/rwf-cli/src/setup.rs +++ b/rwf-cli/src/setup.rs @@ -1,5 +1,6 @@ use std::path::Path; use tokio::fs::{create_dir_all, read_to_string, File}; +use tokio::process::Command; use crate::logging::created; use rwf::colors::MaybeColorize; @@ -62,4 +63,21 @@ pub async fn setup() { break; } } + + // Add rwf dependencies + Command::new("cargo") + .arg("add") + .arg("tokio@1") + .arg("--features") + .arg("full") + .status() + .await + .unwrap(); + + Command::new("cargo") + .arg("add") + .arg("rwf") + .status() + .await + .unwrap(); } diff --git a/rwf/Cargo.toml b/rwf/Cargo.toml index 00199dc6..82e38681 100644 --- a/rwf/Cargo.toml +++ b/rwf/Cargo.toml @@ -16,6 +16,7 @@ time = { version = "0.3", features = ["formatting", "serde", "parsing"] } tokio-postgres = { version = "0.7", features = [ "with-time-0_3", "with-serde_json-1", + "with-uuid-1", ] } bytes = "1" tokio = { version = "1", features = ["full"] } @@ -38,6 +39,7 @@ sha1 = "0.10" toml = "0.8" pyo3 = { version = "0.22", features = ["auto-initialize"], optional = true } rayon = { version = "1", optional = true } +uuid = { version = "1", features = ["v4"] } [dev-dependencies] tempdir = "0.3" diff --git a/rwf/src/analytics/mod.rs b/rwf/src/analytics/mod.rs new file mode 100644 index 00000000..b3a5cc83 --- /dev/null +++ b/rwf/src/analytics/mod.rs @@ -0,0 +1,3 @@ +pub mod requests; + +pub use requests::Request; diff --git a/rwf/src/analytics/requests.rs b/rwf/src/analytics/requests.rs new file mode 100644 index 00000000..58287dcf --- /dev/null +++ b/rwf/src/analytics/requests.rs @@ -0,0 +1,69 @@ +use std::net::IpAddr; + +use crate::model::{Error, FromRow, Model, ToValue, Value}; +use time::OffsetDateTime; + +#[derive(Clone)] +pub struct Request { + id: Option, + path: String, + method: String, + query: serde_json::Value, + code: i32, + client_ip: Option, + created_at: OffsetDateTime, + duration: f32, +} + +impl FromRow for Request { + fn from_row(row: tokio_postgres::Row) -> Result { + Ok(Self { + id: row.try_get("id")?, + path: row.try_get("path")?, + method: row.try_get("method")?, + query: row.try_get("query")?, + code: row.try_get("code")?, + client_ip: row.try_get("client")?, + created_at: row.try_get("created_at")?, + duration: row.try_get("duration")?, + }) + } +} + +impl Model for Request { + fn id(&self) -> Value { + self.id.to_value() + } + + fn table_name() -> &'static str { + "rwf_requests" + } + + fn foreign_key() -> &'static str { + "rwf_request_id" + } + + fn column_names() -> &'static [&'static str] { + &[ + "path", + "method", + "query", + "code", + "client_ip", + "created_at", + "duration", + ] + } + + fn values(&self) -> Vec { + vec![ + self.path.to_value(), + self.method.to_value(), + self.query.to_value(), + self.code.to_value(), + self.client_ip.to_value(), + self.created_at.to_value(), + self.duration.to_value(), + ] + } +} diff --git a/rwf/src/config.rs b/rwf/src/config.rs index 5723eb19..2395b3a1 100644 --- a/rwf/src/config.rs +++ b/rwf/src/config.rs @@ -6,6 +6,7 @@ use std::io::IsTerminal; use std::path::{Path, PathBuf}; use time::Duration; +use crate::controller::middleware::{request_tracker::RequestTracker, Middleware}; use crate::controller::{AllowAll, AuthHandler, MiddlewareSet}; use rand::{rngs::OsRng, RngCore}; use serde::{Deserialize, Serialize}; @@ -168,7 +169,9 @@ impl Default for Config { tty: std::io::stderr().is_terminal(), default_auth: AuthHandler::new(AllowAll {}), session_duration: Duration::days(4), - default_middleware: MiddlewareSet::default(), + default_middleware: MiddlewareSet::without_default(vec![ + RequestTracker::new().middleware() + ]), cache_templates, websocket: Websocket::default(), log_queries: var("RWF_LOG_QUERIES").is_ok(), @@ -216,6 +219,14 @@ impl Config { .database .from_config_file(&config_file.database.unwrap_or_default()); + let mut middelware = vec![]; + + if config_file.general.track_requests { + middelware.push(RequestTracker::new().middleware()); + } + + config.default_middleware = MiddlewareSet::without_default(middelware); + Ok(config) } @@ -251,6 +262,8 @@ struct General { log_queries: bool, #[serde(default = "General::default_cache_templates")] cache_templates: bool, + #[serde(default = "General::default_track_requests")] + track_requests: bool, } impl General { @@ -284,6 +297,10 @@ impl General { #[cfg(not(debug_assertions))] return true; } + + fn default_track_requests() -> bool { + false + } } #[derive(Serialize, Deserialize, Default)] diff --git a/rwf/src/controller/auth.rs b/rwf/src/controller/auth.rs index dcdce848..87c8f0a9 100644 --- a/rwf/src/controller/auth.rs +++ b/rwf/src/controller/auth.rs @@ -146,6 +146,15 @@ impl SessionId { } } +impl std::fmt::Display for SessionId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SessionId::Authenticated(id) => write!(f, "{}", id), + SessionId::Guest(id) => write!(f, "{}", id), + } + } +} + impl Default for SessionId { fn default() -> Self { use rand::{distributions::Alphanumeric, thread_rng, Rng}; diff --git a/rwf/src/controller/middleware/mod.rs b/rwf/src/controller/middleware/mod.rs index b6f66028..b086b289 100644 --- a/rwf/src/controller/middleware/mod.rs +++ b/rwf/src/controller/middleware/mod.rs @@ -1,6 +1,13 @@ use super::Error; -use crate::http::{Request, Response}; +use crate::{ + colors::MaybeColorize, + config::get_config, + http::{Request, Response}, +}; use async_trait::async_trait; +use std::ops::Deref; +use std::sync::Arc; +use tracing::debug; pub mod rate_limiter; pub use rate_limiter::RateLimiter; @@ -10,6 +17,8 @@ pub mod prelude; pub mod secure_id; pub use secure_id::SecureId; +pub mod request_tracker; + /// The result of middleware processing a request. /// /// The middleware can either forward the request to the next middleware, @@ -17,7 +26,7 @@ pub use secure_id::SecureId; /// adding/removing headers or changing the body. pub enum Outcome { Forward(Request), - Stop(Response), + Stop(Request, Response), } #[async_trait] @@ -38,21 +47,32 @@ pub trait Middleware: Send + Sync { { MiddlewareHandler::new(self) } + + fn middleware_name(&self) -> &'static str { + std::any::type_name::() + } } +#[derive(Clone)] pub struct MiddlewareHandler { - middleware: Box, + middleware: Arc>, } impl MiddlewareHandler { pub fn new(middleware: impl Middleware + 'static) -> Self { Self { - middleware: Box::new(middleware), + middleware: Arc::new(Box::new(middleware)), } } async fn handle_request(&self, request: Request) -> Result { - self.middleware.handle_request(request).await + debug!( + "{} {} => {}", + "middleware".purple(), + request.path().base().purple(), + self.middleware.deref().middleware_name().green() + ); + self.middleware.deref().handle_request(request).await } async fn handle_response( @@ -60,40 +80,71 @@ impl MiddlewareHandler { request: &Request, response: Response, ) -> Result { - self.middleware.handle_response(request, response).await + debug!( + "{} {} <= {}", + "middleware".purple(), + request.path().base().purple(), + self.middleware.deref().middleware_name().green() + ); + self.middleware + .deref() + .handle_response(request, response) + .await } } -#[derive(Default)] +#[derive(Default, Clone)] pub struct MiddlewareSet { handlers: Vec, } impl MiddlewareSet { + /// Create new middleware set, including middleware that runs by default + /// on every controller. pub fn new(handlers: Vec) -> Self { + let mut default_handlers = get_config().default_middleware.handlers(); + default_handlers.extend(handlers); + + Self { + handlers: default_handlers, + } + } + + /// Create a middleware set without the default middleware that runs on every controller. + /// Your controller will _only_ run your middleware, and included features like analytics won't work on your controller. + pub fn without_default(handlers: Vec) -> Self { Self { handlers } } - pub async fn handle_request(&self, mut request: Request) -> Result { - for middleware in &self.handlers { + pub async fn handle_request(&self, mut request: Request) -> Result<(Outcome, usize), Error> { + for (idx, middleware) in self.handlers.iter().enumerate() { match middleware.handle_request(request).await? { Outcome::Forward(req) => request = req, - Outcome::Stop(response) => return Ok(Outcome::Stop(response)), + Outcome::Stop(request, response) => { + return Ok((Outcome::Stop(request, response), idx)) + } } } - Ok(Outcome::Forward(request)) + Ok((Outcome::Forward(request), self.handlers.len())) } pub async fn handle_response( &self, request: &Request, mut response: Response, + executed: usize, ) -> Result { - for middleware in self.handlers.iter().rev() { + // Skip middleware that didn't run because the request was stopped. + let skip = self.handlers.len() - executed; + for middleware in self.handlers.iter().rev().skip(skip) { response = middleware.handle_response(request, response).await?; } Ok(response) } + + pub fn handlers(&self) -> Vec { + self.handlers.clone() + } } diff --git a/rwf/src/controller/middleware/rate_limiter.rs b/rwf/src/controller/middleware/rate_limiter.rs index 69b278bf..73c437e9 100644 --- a/rwf/src/controller/middleware/rate_limiter.rs +++ b/rwf/src/controller/middleware/rate_limiter.rs @@ -132,7 +132,7 @@ impl Middleware for RateLimiter { }; if too_many { - Ok(Outcome::Stop(Response::too_many())) + Ok(Outcome::Stop(request, Response::too_many())) } else { Ok(Outcome::Forward(request)) } diff --git a/rwf/src/controller/middleware/request_tracker.rs b/rwf/src/controller/middleware/request_tracker.rs new file mode 100644 index 00000000..59abe29e --- /dev/null +++ b/rwf/src/controller/middleware/request_tracker.rs @@ -0,0 +1,71 @@ +use time::{Duration, OffsetDateTime}; +use uuid::Uuid; + +use crate::analytics::Request as AnalyticsRequest; +use crate::controller::middleware::prelude::*; +use crate::http::CookieBuilder; +use crate::model::{Model, Pool, ToValue}; + +static COOKIE_NAME: &str = "rwf_aid"; + +pub struct RequestTracker {} + +impl RequestTracker { + pub fn new() -> Self { + Self {} + } +} + +#[crate::async_trait] +impl Middleware for RequestTracker { + async fn handle_request(&self, request: Request) -> Result { + Ok(Outcome::Forward(request)) + } + + async fn handle_response( + &self, + request: &Request, + mut response: Response, + ) -> Result { + let method = request.method().to_string(); + let path = request.path().path().to_string(); + let query = request.path().query().to_json(); + let code = response.status().code() as i32; + let duration = + ((request.received_at() - OffsetDateTime::now_utc()).as_seconds_f64() * 1000.0) as f32; + let client = request.peer().ip(); + + let client_id = match request + .cookies() + .get(COOKIE_NAME) + .map(|cookie| Uuid::parse_str(cookie.value())) + { + Some(Ok(cookie)) => cookie, + _ => Uuid::new_v4(), + }; + + let cookie = CookieBuilder::new() + .name(COOKIE_NAME) + .value(client_id.to_string()) + .max_age(Duration::weeks(4)) + .build(); + + response = response.cookie(cookie); + + if let Ok(mut conn) = Pool::connection().await { + let _ = AnalyticsRequest::create(&[ + ("method", method.to_value()), + ("path", path.to_value()), + ("query", query.to_value()), + ("client_ip", client.to_value()), + ("client_id", client_id.to_value()), + ("code", code.to_value()), + ("duration", duration.to_value()), + ]) + .execute(&mut conn) + .await; + } + + Ok(response) + } +} diff --git a/rwf/src/controller/middleware/secure_id.rs b/rwf/src/controller/middleware/secure_id.rs index a22162da..a07d228f 100644 --- a/rwf/src/controller/middleware/secure_id.rs +++ b/rwf/src/controller/middleware/secure_id.rs @@ -23,7 +23,7 @@ impl Middleware for SecureId { if let Ok(Some(id)) = id { // Block requests to a numeric ID. if self.block_unencrypted && id.chars().all(|c| c.is_numeric()) { - return Ok(Outcome::Stop(Response::not_found())); + return Ok(Outcome::Stop(request, Response::not_found())); } let path = request.path().clone(); @@ -36,7 +36,7 @@ impl Middleware for SecureId { return Ok(Outcome::Forward(request)); } else { - return Ok(Outcome::Stop(Response::not_found())); + return Ok(Outcome::Stop(request, Response::not_found())); } } diff --git a/rwf/src/controller/mod.rs b/rwf/src/controller/mod.rs index f84bb41f..4848f8ee 100644 --- a/rwf/src/controller/mod.rs +++ b/rwf/src/controller/mod.rs @@ -22,12 +22,13 @@ use super::http::{ Handler, Method, Protocol, Request, Response, Stream, ToParameter, }; use super::model::{get_connection, Insert, Model, Query, ToValue, Update, Value}; +use crate::colors::MaybeColorize; use crate::comms::Comms; use crate::config::get_config; use tokio::select; use tokio::time::{interval, timeout}; -use tracing::debug; +use tracing::{debug, error, info}; use serde::{Deserialize, Serialize}; @@ -85,19 +86,43 @@ pub trait Controller: Sync + Send { return auth.auth().denied(&request).await; } - let no_session = request.session().is_none(); - let outcome = self.middleware().handle_request(request).await?; let response = match outcome { - Outcome::Forward(request) => match self.handle(&request).await { + (Outcome::Forward(request), executed) => match self.handle(&request).await { Ok(response) => { self.middleware() - .handle_response(&request, response.from_request(&request)?) + .handle_response(&request, response.from_request(&request)?, executed) + .await? + } + Err(err) => { + error!("{}", err); + + let response = match err { + Error::HttpError(err) => match err.code() { + 400 => Response::bad_request(), + 403 => Response::forbidden(), + _ => Response::internal_error(err), + }, + + Error::ViewError(err) => Response::internal_error_pretty( + "Template error", + err.to_string().as_str(), + ), + + err => Response::internal_error(err), + }; + + // Run the middleware chain on the response anyway. + self.middleware() + .handle_response(&request, response, executed) .await? } - Err(err) => return Err(err), }, - Outcome::Stop(response) => response, + (Outcome::Stop(request, response), executed) => { + self.middleware() + .handle_response(&request, response.from_request(&request)?, executed) + .await? + } }; Ok(response) @@ -424,7 +449,12 @@ pub trait WebsocketController: Controller { return Err(Error::SessionMissingError); }; - debug!("new websocket connection from session \"{:?}\"", session_id); + info!( + "{} {} {} connected", + "websocket".purple(), + request.path().path().purple(), + self.controller_name().green(), + ); let config = get_config(); let mut stream = stream.stream(); @@ -437,7 +467,7 @@ pub trait WebsocketController: Controller { loop { select! { _ = check.tick() => { - debug!("checking websocket session \"{:?}\"", session_id); + debug!("{} check session \"{}\"", "websocket".purple(), session_id); let closed = match timeout( config.websocket.ping_timeout.unsigned_abs(), @@ -457,7 +487,9 @@ pub trait WebsocketController: Controller { message = receiver.recv() => { match message { Ok(message) => { - debug!("sending {:?} to {:?}", message, receiver.session_id()); + debug!("{} sending {:?} to session \"{}\"", + "websocket".purple(), + message, receiver.session_id()); message.send(&mut stream).await?; } @@ -475,7 +507,7 @@ pub trait WebsocketController: Controller { let frame = frame?; if frame.is_pong() { - debug!("websocket session \"{:?}\" is alive", session_id); + debug!("{} session \"{}\" is alive", "websocket".purple(), session_id); lost_pings -= 1; // Protect against weird clients. diff --git a/rwf/src/http/path/query.rs b/rwf/src/http/path/query.rs index ace581c9..e82535c5 100644 --- a/rwf/src/http/path/query.rs +++ b/rwf/src/http/path/query.rs @@ -49,6 +49,10 @@ impl Query { None => None, } } + + pub fn to_json(&self) -> serde_json::Value { + serde_json::to_value(&self.query).unwrap_or(serde_json::Value::default()) + } } impl std::fmt::Display for Query { diff --git a/rwf/src/http/request.rs b/rwf/src/http/request.rs index 9976af56..9e9afd75 100644 --- a/rwf/src/http/request.rs +++ b/rwf/src/http/request.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use serde::Deserialize; use serde_json::{Deserializer, Value}; +use time::OffsetDateTime; use tokio::io::{AsyncRead, AsyncReadExt}; use super::{Cookies, Error, FormData, FromFormData, Head, Params, Response, ToParameter}; @@ -20,12 +21,25 @@ use crate::{ /// /// The request is fully loaded into memory. It's safe to clone /// since the contents are behind an [`std::sync::Arc`]. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct Request { head: Head, session: Option, inner: Arc, params: Option>, + received_at: OffsetDateTime, +} + +impl Default for Request { + fn default() -> Self { + Self { + head: Head::default(), + session: None, + inner: Arc::new(Inner::default()), + params: None, + received_at: OffsetDateTime::now_utc(), + } + } } #[derive(Debug, Default, Clone)] @@ -57,6 +71,7 @@ impl Request { peer: Some(peer), cookies, }), + received_at: OffsetDateTime::now_utc(), }) } @@ -135,6 +150,11 @@ impl Request { self.session.as_ref() } + /// When thre request was received. + pub fn received_at(&self) -> OffsetDateTime { + self.received_at + } + /// Get the session identifier. /// /// This will be a random string if it's a guest diff --git a/rwf/src/http/response.rs b/rwf/src/http/response.rs index 1439545f..8d279d7e 100644 --- a/rwf/src/http/response.rs +++ b/rwf/src/http/response.rs @@ -245,11 +245,18 @@ impl Response { &mut self.cookies } + /// Set a private (encrypted) cookie on the response. pub fn private_cookie(mut self, cookie: Cookie) -> Result { self.cookies.add_private(cookie)?; Ok(self) } + /// Set a cookie on the response. + pub fn cookie(mut self, cookie: Cookie) -> Self { + self.cookies.add(cookie); + self + } + pub fn set_session(mut self, session: Session) -> Self { self.session = Some(session); self diff --git a/rwf/src/http/server.rs b/rwf/src/http/server.rs index 95a41aaa..395de90f 100644 --- a/rwf/src/http/server.rs +++ b/rwf/src/http/server.rs @@ -5,7 +5,6 @@ //! //! The server is using Tokio, so it can support millions of concurrent clients. use super::{Error, Handler, Request, Response, Router}; -use crate::controller::Error as ControllerError; use crate::colors::MaybeColorize; @@ -97,13 +96,18 @@ impl Server { let mut stream = BufReader::new(BufWriter::new(stream)); tokio::spawn(async move { - debug!("new connection from {:?}", peer_addr); + debug!("{} new connection from {:?}", "http".purple(), peer_addr); loop { let request = match Request::read(peer_addr, &mut stream).await { Ok(request) => request, Err(err) => { - debug!("client {:?} disconnected: {:?}", peer_addr, err); + debug!( + "{} client {:?} disconnected: {:?}", + "http".purple(), + peer_addr, + err + ); return; } }; @@ -120,22 +124,7 @@ impl Server { Ok(response) => response, Err(err) => { error!("{}", err); - match err { - ControllerError::HttpError(err) => match err.code() { - 400 => Response::bad_request(), - 403 => Response::forbidden(), - _ => Response::internal_error(err), - }, - - ControllerError::ViewError(err) => { - Response::internal_error_pretty( - "Template error", - err.to_string().as_str(), - ) - } - - err => Response::internal_error(err), - } + Response::internal_error(err) } }; @@ -200,13 +189,18 @@ impl Server { } fn log(request: &Request, controller_name: &str, response: &Response, duration: Duration) { + let method = request.method().to_string(); + let path = request.path().path(); + let code = response.status().code() as i32; + let duration = (duration.as_secs_f64() * 1000.0) as f32; + info!( "{} {} {} {} ({:.3} ms)", - request.method().to_string().purple(), - request.path().path().purple(), + method.purple(), + path.purple(), controller_name.green(), - response.status().code(), - duration.as_secs_f64() * 1000.0, + code, + duration, ); } } diff --git a/rwf/src/lib.rs b/rwf/src/lib.rs index 0024a8c8..19e041bc 100644 --- a/rwf/src/lib.rs +++ b/rwf/src/lib.rs @@ -1,3 +1,4 @@ +pub mod analytics; pub mod colors; pub mod comms; pub mod config; diff --git a/rwf/src/model/migrations/bootstrap.sql b/rwf/src/model/migrations/bootstrap.sql index 425df56f..ef6b6bf8 100644 --- a/rwf/src/model/migrations/bootstrap.sql +++ b/rwf/src/model/migrations/bootstrap.sql @@ -32,4 +32,22 @@ CREATE INDEX IF NOT EXISTS rwf_jobs_runnin_idx ON rwf_jobs USING btree(start_aft AND started_at IS NOT NULL AND attempts < retries; -CREATE INDEX IF NOT EXISTS rwf_jobs_name_completed_at_idx ON rwf_jobs USING btree(name, completed_at); \ No newline at end of file +CREATE INDEX IF NOT EXISTS rwf_jobs_name_completed_at_idx ON rwf_jobs USING btree(name, completed_at); + +CREATE TABLE IF NOT EXISTS rwf_requests ( + id BIGSERIAL PRIMARY KEY, + path VARCHAR NOT NULL, + method VARCHAR NOT NULL DEFAULT 'GET', + query JSONB NOT NULL DEFAULT '{}'::jsonb, + code INTEGER NOT NULL DEFAULT 200, + client_ip INET, + client_id UUID NOT NULL DEFAULT gen_random_uuid(), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + duration REAL NOT NULL +); + +CREATE INDEX IF NOT EXISTS rwf_requests_path_created_at ON rwf_requests USING btree(created_at, path, client_id); + +CREATE INDEX IF NOT EXISTS rwf_requests_errors ON rwf_requests USING btree(created_at, code, client_id) WHERE code >= 400; + +CREATE INDEX IF NOT EXISTS rwf_requests_too_slow ON rwf_requests USING btree(created_at, duration, client_id) WHERE duration >= 1000.0; -- the unit is milliseconds diff --git a/rwf/src/model/migrations/mod.rs b/rwf/src/model/migrations/mod.rs index cb0f9b0d..d3121367 100644 --- a/rwf/src/model/migrations/mod.rs +++ b/rwf/src/model/migrations/mod.rs @@ -123,9 +123,18 @@ impl Migrations { while let Some(dir_entry) = dir_entries.next_entry().await? { let metadata = dir_entry.metadata().await?; if metadata.is_file() { - let file = MigrationFile::parse( - dir_entry.file_name().to_str().expect("migration OsString"), - )?; + let file_name = dir_entry + .file_name() + .to_str() + .expect("migration OsString") + .to_string(); + + // Skip hidden files + if file_name.starts_with(".") { + continue; + } + + let file = MigrationFile::parse(&file_name)?; let entry = checks .entry(file.name.clone()) .or_insert_with(Check::default); diff --git a/rwf/src/model/mod.rs b/rwf/src/model/mod.rs index e66d0b2f..180b6b43 100644 --- a/rwf/src/model/mod.rs +++ b/rwf/src/model/mod.rs @@ -524,7 +524,7 @@ impl Query { let result = match result { Ok(result) => result, Err(err) => { - self.log_error(); + self.log_error(&err); return Err(err); } }; @@ -542,7 +542,7 @@ impl Query { match result { Ok(rows) => Ok(rows), Err(err) => { - self.log_error(); + self.log_error(&err); Err(err) } } @@ -659,12 +659,13 @@ impl Query { ); } - fn log_error(&self) { + fn log_error(&self, err: &Error) { error!( - "{} {} {}", + "{} {} {} {}", Self::type_name().green(), self.action().purple(), - self.to_sql() + self.to_sql(), + err, ) } } diff --git a/rwf/src/model/value.rs b/rwf/src/model/value.rs index e8fbaf63..9a051198 100644 --- a/rwf/src/model/value.rs +++ b/rwf/src/model/value.rs @@ -1,8 +1,9 @@ use bytes::BytesMut; use time::{OffsetDateTime, PrimitiveDateTime}; use tokio_postgres::types::{to_sql_checked, IsNull, Type}; +use uuid::Uuid; -use std::ops::Range; +use std::{net::IpAddr, ops::Range}; use super::{Column, Error, Escape, ToSql}; @@ -28,6 +29,8 @@ pub enum Value { TimestampT(OffsetDateTime), /// Timestamp without time zone. Timestamp(PrimitiveDateTime), + IpAddr(IpAddr), + Uuid(Uuid), /// List (Postgres array) of values, e.g. `{1, 2, 3}`. List(Vec), /// Tuple (also known as "record") of values, e.g. `(1, 2, 3)`. @@ -175,6 +178,30 @@ impl ToValue for f32 { } } +impl ToValue for IpAddr { + fn to_value(&self) -> Value { + Value::IpAddr(self.clone()) + } +} + +impl ToValue for Option { + fn to_value(&self) -> Value { + Value::Optional(Box::new(self.as_ref().map(|v| v.to_value()))) + } +} + +impl ToValue for Uuid { + fn to_value(&self) -> Value { + Value::Uuid(self.clone()) + } +} + +impl ToValue for Option { + fn to_value(&self) -> Value { + Value::Optional(Box::new(self.as_ref().map(|v| v.to_value()))) + } +} + impl ToValue for Value { fn to_value(&self) -> Value { self.clone() @@ -316,6 +343,8 @@ impl tokio_postgres::types::ToSql for Value { Value::Boolean(b) => b.to_sql(ty, out), Value::TimestampT(timestamp) => timestamp.to_sql(ty, out), Value::Timestamp(timestamp) => timestamp.to_sql(ty, out), + Value::IpAddr(ip) => ip.to_sql(ty, out), + Value::Uuid(uuid) => uuid.to_sql(ty, out), Value::List(values) => values.to_sql(ty, out), Value::Json(json) => json.to_sql(ty, out), Value::Optional(value) => { @@ -350,6 +379,8 @@ impl ToSql for Value { SmallInt(integer) => integer.to_string(), Float(float) => float.to_string(), Real(float) => float.to_string(), + IpAddr(ip) => ip.to_string(), + Uuid(uuid) => uuid.to_string(), Placeholder(number) => format!("${}", number), Range((a, b)) => format!("BETWEEN {} AND {}", a.to_sql(), b.to_sql()), List(values) => format!( @@ -421,6 +452,8 @@ impl From for serde_json::Value { Value::Float(f) => serde_json::Value::Number(Number::from_f64(f).unwrap()), Value::Real(f) => serde_json::Value::Number(Number::from_f64(f as f64).unwrap()), Value::Json(json) => json, + Value::IpAddr(ip) => serde_json::Value::String(ip.to_string()), + Value::Uuid(uuid) => serde_json::Value::String(uuid.to_string()), _ => todo!("model::Value to serde_json::Value"), } }