From 3c6e4e575722e192bf99a4059c8eb47f50bce19e Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Fri, 24 Nov 2023 17:57:33 +0900 Subject: [PATCH] wip: implemented backend --- rpxy-lib/Cargo.toml | 26 +- rpxy-lib/src/backend/backend_main.rs | 136 +++++++++ .../backend/load_balance/load_balance_main.rs | 135 +++++++++ .../load_balance/load_balance_sticky.rs | 137 +++++++++ rpxy-lib/src/backend/load_balance/mod.rs | 41 +++ .../src/backend/load_balance/sticky_cookie.rs | 205 ++++++++++++++ rpxy-lib/src/backend/mod.rs | 14 + rpxy-lib/src/backend/upstream.rs | 266 ++++++++++++++++++ rpxy-lib/src/backend/upstream_opts.rs | 22 ++ rpxy-lib/src/error.rs | 11 + rpxy-lib/src/lib.rs | 6 +- rpxy-lib/src/name_exp.rs | 160 +++++++++++ rpxy-lib/src/proxy/crypto_service.rs | 0 rpxy-lib/src/proxy/mod.rs | 1 + rpxy-lib/src/proxy/proxy_main.rs | 30 +- rpxy-lib/src/proxy/proxy_tls.rs | 6 + 16 files changed, 1173 insertions(+), 23 deletions(-) create mode 100644 rpxy-lib/src/backend/backend_main.rs create mode 100644 rpxy-lib/src/backend/load_balance/load_balance_main.rs create mode 100644 rpxy-lib/src/backend/load_balance/load_balance_sticky.rs create mode 100644 rpxy-lib/src/backend/load_balance/mod.rs create mode 100644 rpxy-lib/src/backend/load_balance/sticky_cookie.rs create mode 100644 rpxy-lib/src/backend/mod.rs create mode 100644 rpxy-lib/src/backend/upstream.rs create mode 100644 rpxy-lib/src/backend/upstream_opts.rs create mode 100644 rpxy-lib/src/name_exp.rs create mode 100644 rpxy-lib/src/proxy/crypto_service.rs create mode 100644 rpxy-lib/src/proxy/proxy_tls.rs diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index cb6b5b6c..2de993de 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -15,15 +15,15 @@ publish = false default = ["http3-s2n", "sticky-cookie", "cache"] http3-quinn = ["socket2"] #"quinn", "h3", "h3-quinn", ] http3-s2n = [] #"h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"] -sticky-cookie = [] #"base64", "sha2", "chrono"] +sticky-cookie = ["base64", "sha2", "chrono"] cache = [] #"http-cache-semantics", "lru"] native-roots = [] #"hyper-rustls/native-tokio"] [dependencies] -# rand = "0.8.5" -# rustc-hash = "1.1.0" +rand = "0.8.5" +rustc-hash = "1.1.0" # bytes = "1.5.0" -# derive_builder = "0.12.0" +derive_builder = "0.12.0" futures = { version = "0.3.29", features = ["alloc", "async-await"] } tokio = { version = "1.34.0", default-features = false, features = [ "net", @@ -34,13 +34,13 @@ tokio = { version = "1.34.0", default-features = false, features = [ "fs", ] } async-trait = "0.1.74" -# hot_reload = "0.1.4" # reloading certs # Error handling anyhow = "1.0.75" thiserror = "1.0.50" # http and tls +hot_reload = "0.1.4" # reloading certs http = "1.0.0" # http-body-util = "0.1.0" hyper = { version = "1.0.1", default-features = false } @@ -75,14 +75,14 @@ socket2 = { version = "0.5.5", features = ["all"], optional = true } # http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true } # lru = { version = "0.12.0", optional = true } -# # cookie handling for sticky cookie -# chrono = { version = "0.4.31", default-features = false, features = [ -# "unstable-locales", -# "alloc", -# "clock", -# ], optional = true } -# base64 = { version = "0.21.5", optional = true } -# sha2 = { version = "0.10.8", default-features = false, optional = true } +# cookie handling for sticky cookie +chrono = { version = "0.4.31", default-features = false, features = [ + "unstable-locales", + "alloc", + "clock", +], optional = true } +base64 = { version = "0.21.5", optional = true } +sha2 = { version = "0.10.8", default-features = false, optional = true } # [dev-dependencies] diff --git a/rpxy-lib/src/backend/backend_main.rs b/rpxy-lib/src/backend/backend_main.rs new file mode 100644 index 00000000..695a0631 --- /dev/null +++ b/rpxy-lib/src/backend/backend_main.rs @@ -0,0 +1,136 @@ +use crate::{ + certs::CryptoSource, + error::*, + log::*, + name_exp::{ByteName, ServerName}, + AppConfig, AppConfigList, +}; +use derive_builder::Builder; +use rustc_hash::FxHashMap as HashMap; +use std::borrow::Cow; + +use super::upstream::PathManager; + +/// Struct serving information to route incoming connections, like server name to be handled and tls certs/keys settings. +#[derive(Builder)] +pub struct BackendApp +where + T: CryptoSource, +{ + #[builder(setter(into))] + /// backend application name, e.g., app1 + pub app_name: String, + #[builder(setter(custom))] + /// server name, e.g., example.com, in [[ServerName]] object + pub server_name: ServerName, + /// struct of reverse proxy serving incoming request + pub path_manager: PathManager, + /// tls settings: https redirection with 30x + #[builder(default)] + pub https_redirection: Option, + /// TLS settings: source meta for server cert, key, client ca cert + #[builder(default)] + pub crypto_source: Option, +} +impl<'a, T> BackendAppBuilder +where + T: CryptoSource, +{ + pub fn server_name(&mut self, server_name: impl Into>) -> &mut Self { + self.server_name = Some(server_name.to_server_name()); + self + } +} + +/// HashMap and some meta information for multiple Backend structs. +pub struct BackendAppManager +where + T: CryptoSource, +{ + /// HashMap of Backend structs, key is server name + pub apps: HashMap>, + /// for plaintext http + pub default_server_name: Option, +} + +impl Default for BackendAppManager +where + T: CryptoSource, +{ + fn default() -> Self { + Self { + apps: HashMap::>::default(), + default_server_name: None, + } + } +} + +impl TryFrom<&AppConfig> for BackendApp +where + T: CryptoSource + Clone, +{ + type Error = RpxyError; + + fn try_from(app_config: &AppConfig) -> Result { + let mut backend_builder = BackendAppBuilder::default(); + let path_manager = PathManager::try_from(app_config)?; + backend_builder + .app_name(app_config.app_name.clone()) + .server_name(app_config.server_name.clone()) + .path_manager(path_manager); + // TLS settings and build backend instance + let backend = if app_config.tls.is_none() { + backend_builder.build()? + } else { + let tls = app_config.tls.as_ref().unwrap(); + backend_builder + .https_redirection(Some(tls.https_redirection)) + .crypto_source(Some(tls.inner.clone())) + .build()? + }; + Ok(backend) + } +} + +impl TryFrom<&AppConfigList> for BackendAppManager +where + T: CryptoSource + Clone, +{ + type Error = RpxyError; + + fn try_from(config_list: &AppConfigList) -> Result { + let mut manager = Self::default(); + for app_config in config_list.inner.iter() { + let backend: BackendApp = BackendApp::try_from(app_config)?; + manager + .apps + .insert(app_config.server_name.clone().to_server_name(), backend); + + info!( + "Registering application {} ({})", + &app_config.server_name, &app_config.app_name + ); + } + + // default backend application for plaintext http requests + if let Some(default_app_name) = &config_list.default_app { + let default_server_name = manager + .apps + .iter() + .filter(|(_k, v)| &v.app_name == default_app_name) + .map(|(_, v)| v.server_name.clone()) + .collect::>(); + + if !default_server_name.is_empty() { + info!( + "Serving plaintext http for requests to unconfigured server_name by app {} (server_name: {}).", + &default_app_name, + (&default_server_name[0]).try_into().unwrap_or_else(|_| "".to_string()) + ); + + manager.default_server_name = Some(default_server_name[0].clone()); + } + } + Ok(manager) + } +} diff --git a/rpxy-lib/src/backend/load_balance/load_balance_main.rs b/rpxy-lib/src/backend/load_balance/load_balance_main.rs new file mode 100644 index 00000000..8ee1600b --- /dev/null +++ b/rpxy-lib/src/backend/load_balance/load_balance_main.rs @@ -0,0 +1,135 @@ +#[cfg(feature = "sticky-cookie")] +pub use super::{ + load_balance_sticky::{LoadBalanceSticky, LoadBalanceStickyBuilder}, + sticky_cookie::StickyCookie, +}; +use derive_builder::Builder; +use rand::Rng; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +/// Constants to specify a load balance option +pub mod load_balance_options { + pub const FIX_TO_FIRST: &str = "none"; + pub const ROUND_ROBIN: &str = "round_robin"; + pub const RANDOM: &str = "random"; + #[cfg(feature = "sticky-cookie")] + pub const STICKY_ROUND_ROBIN: &str = "sticky"; +} + +#[derive(Debug, Clone)] +/// Pointer to upstream serving the incoming request. +/// If 'sticky cookie'-based LB is enabled and cookie must be updated/created, the new cookie is also given. +pub struct PointerToUpstream { + pub ptr: usize, + pub context: Option, +} +/// Trait for LB +pub(super) trait LoadBalanceWithPointer { + fn get_ptr(&self, req_info: Option<&LoadBalanceContext>) -> PointerToUpstream; +} + +#[derive(Debug, Clone, Builder)] +/// Round Robin LB object as a pointer to the current serving upstream destination +pub struct LoadBalanceRoundRobin { + #[builder(default)] + /// Pointer to the index of the last served upstream destination + ptr: Arc, + #[builder(setter(custom), default)] + /// Number of upstream destinations + num_upstreams: usize, +} +impl LoadBalanceRoundRobinBuilder { + pub fn num_upstreams(&mut self, v: &usize) -> &mut Self { + self.num_upstreams = Some(*v); + self + } +} +impl LoadBalanceWithPointer for LoadBalanceRoundRobin { + /// Increment the count of upstream served up to the max value + fn get_ptr(&self, _info: Option<&LoadBalanceContext>) -> PointerToUpstream { + // Get a current count of upstream served + let current_ptr = self.ptr.load(Ordering::Relaxed); + + let ptr = if current_ptr < self.num_upstreams - 1 { + self.ptr.fetch_add(1, Ordering::Relaxed) + } else { + // Clear the counter + self.ptr.fetch_and(0, Ordering::Relaxed) + }; + PointerToUpstream { ptr, context: None } + } +} + +#[derive(Debug, Clone, Builder)] +/// Random LB object to keep the object of random pools +pub struct LoadBalanceRandom { + #[builder(setter(custom), default)] + /// Number of upstream destinations + num_upstreams: usize, +} +impl LoadBalanceRandomBuilder { + pub fn num_upstreams(&mut self, v: &usize) -> &mut Self { + self.num_upstreams = Some(*v); + self + } +} +impl LoadBalanceWithPointer for LoadBalanceRandom { + /// Returns the random index within the range + fn get_ptr(&self, _info: Option<&LoadBalanceContext>) -> PointerToUpstream { + let mut rng = rand::thread_rng(); + let ptr = rng.gen_range(0..self.num_upstreams); + PointerToUpstream { ptr, context: None } + } +} + +#[derive(Debug, Clone)] +/// Load Balancing Option +pub enum LoadBalance { + /// Fix to the first upstream. Use if only one upstream destination is specified + FixToFirst, + /// Randomly chose one upstream server + Random(LoadBalanceRandom), + /// Simple round robin without session persistance + RoundRobin(LoadBalanceRoundRobin), + #[cfg(feature = "sticky-cookie")] + /// Round robin with session persistance using cookie + StickyRoundRobin(LoadBalanceSticky), +} +impl Default for LoadBalance { + fn default() -> Self { + Self::FixToFirst + } +} + +impl LoadBalance { + /// Get the index of the upstream serving the incoming request + pub fn get_context(&self, _context_to_lb: &Option) -> PointerToUpstream { + match self { + LoadBalance::FixToFirst => PointerToUpstream { + ptr: 0usize, + context: None, + }, + LoadBalance::RoundRobin(ptr) => ptr.get_ptr(None), + LoadBalance::Random(ptr) => ptr.get_ptr(None), + #[cfg(feature = "sticky-cookie")] + LoadBalance::StickyRoundRobin(ptr) => { + // Generate new context if sticky round robin is enabled. + ptr.get_ptr(_context_to_lb.as_ref()) + } + } + } +} + +#[derive(Debug, Clone)] +/// Struct to handle the sticky cookie string, +/// - passed from Rp module (http handler) to LB module, manipulated from req, only StickyCookieValue exists. +/// - passed from LB module to Rp module (http handler), will be inserted into res, StickyCookieValue and Info exist. +pub struct LoadBalanceContext { + #[cfg(feature = "sticky-cookie")] + pub sticky_cookie: StickyCookie, + #[cfg(not(feature = "sticky-cookie"))] + pub sticky_cookie: (), +} diff --git a/rpxy-lib/src/backend/load_balance/load_balance_sticky.rs b/rpxy-lib/src/backend/load_balance/load_balance_sticky.rs new file mode 100644 index 00000000..d7a97953 --- /dev/null +++ b/rpxy-lib/src/backend/load_balance/load_balance_sticky.rs @@ -0,0 +1,137 @@ +use super::{ + load_balance_main::{LoadBalanceContext, LoadBalanceWithPointer, PointerToUpstream}, + sticky_cookie::StickyCookieConfig, + Upstream, +}; +use crate::{constants::STICKY_COOKIE_NAME, log::*}; +use derive_builder::Builder; +use rustc_hash::FxHashMap as HashMap; +use std::{ + borrow::Cow, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; + +#[derive(Debug, Clone, Builder)] +/// Round Robin LB object in the sticky cookie manner +pub struct LoadBalanceSticky { + #[builder(default)] + /// Pointer to the index of the last served upstream destination + ptr: Arc, + #[builder(setter(custom), default)] + /// Number of upstream destinations + num_upstreams: usize, + #[builder(setter(custom))] + /// Information to build the cookie to stick clients to specific backends + pub sticky_config: StickyCookieConfig, + #[builder(setter(custom))] + /// Hashmaps: + /// - Hashmap that maps server indices to server id (string) + /// - Hashmap that maps server ids (string) to server indices, for fast reverse lookup + upstream_maps: UpstreamMap, +} +#[derive(Debug, Clone)] +pub struct UpstreamMap { + /// Hashmap that maps server indices to server id (string) + upstream_index_map: Vec, + /// Hashmap that maps server ids (string) to server indices, for fast reverse lookup + upstream_id_map: HashMap, +} +impl LoadBalanceStickyBuilder { + /// Set the number of upstream destinations + pub fn num_upstreams(&mut self, v: &usize) -> &mut Self { + self.num_upstreams = Some(*v); + self + } + /// Set the information to build the cookie to stick clients to specific backends + pub fn sticky_config(&mut self, server_name: &str, path_opt: &Option) -> &mut Self { + self.sticky_config = Some(StickyCookieConfig { + name: STICKY_COOKIE_NAME.to_string(), // TODO: config等で変更できるように + domain: server_name.to_ascii_lowercase(), + path: if let Some(v) = path_opt { + v.to_ascii_lowercase() + } else { + "/".to_string() + }, + duration: 300, // TODO: config等で変更できるように + }); + self + } + /// Set the hashmaps: upstream_index_map and upstream_id_map + pub fn upstream_maps(&mut self, upstream_vec: &[Upstream]) -> &mut Self { + let upstream_index_map: Vec = upstream_vec + .iter() + .enumerate() + .map(|(i, v)| v.calculate_id_with_index(i)) + .collect(); + let mut upstream_id_map = HashMap::default(); + for (i, v) in upstream_index_map.iter().enumerate() { + upstream_id_map.insert(v.to_string(), i); + } + self.upstream_maps = Some(UpstreamMap { + upstream_index_map, + upstream_id_map, + }); + self + } +} +impl<'a> LoadBalanceSticky { + /// Increment the count of upstream served up to the max value + fn simple_increment_ptr(&self) -> usize { + // Get a current count of upstream served + let current_ptr = self.ptr.load(Ordering::Relaxed); + + if current_ptr < self.num_upstreams - 1 { + self.ptr.fetch_add(1, Ordering::Relaxed) + } else { + // Clear the counter + self.ptr.fetch_and(0, Ordering::Relaxed) + } + } + /// This is always called only internally. So 'unwrap()' is executed. + fn get_server_id_from_index(&self, index: usize) -> String { + self.upstream_maps.upstream_index_map.get(index).unwrap().to_owned() + } + /// This function takes value passed from outside. So 'result' is used. + fn get_server_index_from_id(&self, id: impl Into>) -> Option { + let id_str = id.into().to_string(); + self.upstream_maps.upstream_id_map.get(&id_str).map(|v| v.to_owned()) + } +} +impl LoadBalanceWithPointer for LoadBalanceSticky { + /// Get the pointer to the upstream server to serve the incoming request. + fn get_ptr(&self, req_info: Option<&LoadBalanceContext>) -> PointerToUpstream { + // If given context is None or invalid (not contained), get_ptr() is invoked to increment the pointer. + // Otherwise, get the server index indicated by the server_id inside the cookie + let ptr = match req_info { + None => { + debug!("No sticky cookie"); + self.simple_increment_ptr() + } + Some(context) => { + let server_id = &context.sticky_cookie.value.value; + if let Some(server_index) = self.get_server_index_from_id(server_id) { + debug!("Valid sticky cookie: id={}, index={}", server_id, server_index); + server_index + } else { + debug!("Invalid sticky cookie: id={}", server_id); + self.simple_increment_ptr() + } + } + }; + + // Get the server id from the ptr. + // TODO: This should be simplified and optimized if ptr is not changed (id value exists in cookie). + let upstream_id = self.get_server_id_from_index(ptr); + let new_cookie = self.sticky_config.build_sticky_cookie(upstream_id).unwrap(); + let new_context = Some(LoadBalanceContext { + sticky_cookie: new_cookie, + }); + PointerToUpstream { + ptr, + context: new_context, + } + } +} diff --git a/rpxy-lib/src/backend/load_balance/mod.rs b/rpxy-lib/src/backend/load_balance/mod.rs new file mode 100644 index 00000000..d8765172 --- /dev/null +++ b/rpxy-lib/src/backend/load_balance/mod.rs @@ -0,0 +1,41 @@ +mod load_balance_main; +#[cfg(feature = "sticky-cookie")] +mod load_balance_sticky; +#[cfg(feature = "sticky-cookie")] +mod sticky_cookie; + +use super::upstream::Upstream; +use thiserror::Error; + +pub use load_balance_main::{ + load_balance_options, LoadBalance, LoadBalanceContext, LoadBalanceRandomBuilder, LoadBalanceRoundRobinBuilder, +}; +#[cfg(feature = "sticky-cookie")] +pub use load_balance_sticky::LoadBalanceStickyBuilder; + +/// Result type for load balancing +type LoadBalanceResult = std::result::Result; +/// Describes things that can go wrong in the Load Balance +#[derive(Debug, Error)] +pub enum LoadBalanceError { + // backend load balance errors + #[cfg(feature = "sticky-cookie")] + #[error("Failed to cookie conversion to/from string")] + FailedToConversionStickyCookie, + + #[cfg(feature = "sticky-cookie")] + #[error("Invalid cookie structure")] + InvalidStickyCookieStructure, + + #[cfg(feature = "sticky-cookie")] + #[error("No sticky cookie value")] + NoStickyCookieValue, + + #[cfg(feature = "sticky-cookie")] + #[error("Failed to cookie conversion into string: no meta information")] + NoStickyCookieNoMetaInfo, + + #[cfg(feature = "sticky-cookie")] + #[error("Failed to build sticky cookie from config")] + FailedToBuildStickyCookie, +} diff --git a/rpxy-lib/src/backend/load_balance/sticky_cookie.rs b/rpxy-lib/src/backend/load_balance/sticky_cookie.rs new file mode 100644 index 00000000..28572b50 --- /dev/null +++ b/rpxy-lib/src/backend/load_balance/sticky_cookie.rs @@ -0,0 +1,205 @@ +use super::{LoadBalanceError, LoadBalanceResult}; +use chrono::{TimeZone, Utc}; +use derive_builder::Builder; +use std::borrow::Cow; + +#[derive(Debug, Clone, Builder)] +/// Cookie value only, used for COOKIE in req +pub struct StickyCookieValue { + #[builder(setter(custom))] + /// Field name indicating sticky cookie + pub name: String, + #[builder(setter(custom))] + /// Upstream server_id + pub value: String, +} +impl<'a> StickyCookieValueBuilder { + pub fn name(&mut self, v: impl Into>) -> &mut Self { + self.name = Some(v.into().to_ascii_lowercase()); + self + } + pub fn value(&mut self, v: impl Into>) -> &mut Self { + self.value = Some(v.into().to_string()); + self + } +} +impl StickyCookieValue { + pub fn try_from(value: &str, expected_name: &str) -> LoadBalanceResult { + if !value.starts_with(expected_name) { + return Err(LoadBalanceError::FailedToConversionStickyCookie); + }; + let kv = value.split('=').map(|v| v.trim()).collect::>(); + if kv.len() != 2 { + return Err(LoadBalanceError::InvalidStickyCookieStructure); + }; + if kv[1].is_empty() { + return Err(LoadBalanceError::NoStickyCookieValue); + } + Ok(StickyCookieValue { + name: expected_name.to_string(), + value: kv[1].to_string(), + }) + } +} + +#[derive(Debug, Clone, Builder)] +/// Struct describing sticky cookie meta information used for SET-COOKIE in res +pub struct StickyCookieInfo { + #[builder(setter(custom))] + /// Unix time + pub expires: i64, + + #[builder(setter(custom))] + /// Domain + pub domain: String, + + #[builder(setter(custom))] + /// Path + pub path: String, +} +impl<'a> StickyCookieInfoBuilder { + pub fn domain(&mut self, v: impl Into>) -> &mut Self { + self.domain = Some(v.into().to_ascii_lowercase()); + self + } + pub fn path(&mut self, v: impl Into>) -> &mut Self { + self.path = Some(v.into().to_ascii_lowercase()); + self + } + pub fn expires(&mut self, duration_secs: i64) -> &mut Self { + let current = Utc::now().timestamp(); + self.expires = Some(current + duration_secs); + self + } +} + +#[derive(Debug, Clone, Builder)] +/// Struct describing sticky cookie +pub struct StickyCookie { + #[builder(setter(custom))] + /// Upstream server_id + pub value: StickyCookieValue, + #[builder(setter(custom), default)] + /// Upstream server_id + pub info: Option, +} + +impl<'a> StickyCookieBuilder { + /// Set the value of sticky cookie + pub fn value(&mut self, n: impl Into>, v: impl Into>) -> &mut Self { + self.value = Some(StickyCookieValueBuilder::default().name(n).value(v).build().unwrap()); + self + } + /// Set the meta information of sticky cookie + pub fn info( + &mut self, + domain: impl Into>, + path: impl Into>, + duration_secs: i64, + ) -> &mut Self { + let info = StickyCookieInfoBuilder::default() + .domain(domain) + .path(path) + .expires(duration_secs) + .build() + .unwrap(); + self.info = Some(Some(info)); + self + } +} + +impl TryInto for StickyCookie { + type Error = LoadBalanceError; + + fn try_into(self) -> LoadBalanceResult { + if self.info.is_none() { + return Err(LoadBalanceError::NoStickyCookieNoMetaInfo); + } + let info = self.info.unwrap(); + let chrono::LocalResult::Single(expires_timestamp) = Utc.timestamp_opt(info.expires, 0) else { + return Err(LoadBalanceError::FailedToConversionStickyCookie); + }; + let exp_str = expires_timestamp.format("%a, %d-%b-%Y %T GMT").to_string(); + let max_age = info.expires - Utc::now().timestamp(); + + Ok(format!( + "{}={}; expires={}; Max-Age={}; path={}; domain={}", + self.value.name, self.value.value, exp_str, max_age, info.path, info.domain + )) + } +} + +#[derive(Debug, Clone)] +/// Configuration to serve incoming requests in the manner of "sticky cookie". +/// Including a dictionary to map Ids included in cookie and upstream destinations, +/// and expiration of cookie. +/// "domain" and "path" in the cookie will be the same as the reverse proxy options. +pub struct StickyCookieConfig { + pub name: String, + pub domain: String, + pub path: String, + pub duration: i64, +} +impl<'a> StickyCookieConfig { + pub fn build_sticky_cookie(&self, v: impl Into>) -> LoadBalanceResult { + StickyCookieBuilder::default() + .value(self.name.clone(), v) + .info(&self.domain, &self.path, self.duration) + .build() + .map_err(|_| LoadBalanceError::FailedToBuildStickyCookie) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::constants::STICKY_COOKIE_NAME; + + #[test] + fn config_works() { + let config = StickyCookieConfig { + name: STICKY_COOKIE_NAME.to_string(), + domain: "example.com".to_string(), + path: "/path".to_string(), + duration: 100, + }; + let expires_unix = Utc::now().timestamp() + 100; + let sc_string: LoadBalanceResult = config.build_sticky_cookie("test_value").unwrap().try_into(); + let expires_date_string = Utc + .timestamp_opt(expires_unix, 0) + .unwrap() + .format("%a, %d-%b-%Y %T GMT") + .to_string(); + assert_eq!( + sc_string.unwrap(), + format!( + "{}=test_value; expires={}; Max-Age={}; path=/path; domain=example.com", + STICKY_COOKIE_NAME, expires_date_string, 100 + ) + ); + } + #[test] + fn to_string_works() { + let sc = StickyCookie { + value: StickyCookieValue { + name: STICKY_COOKIE_NAME.to_string(), + value: "test_value".to_string(), + }, + info: Some(StickyCookieInfo { + expires: 1686221173i64, + domain: "example.com".to_string(), + path: "/path".to_string(), + }), + }; + let sc_string: LoadBalanceResult = sc.try_into(); + let max_age = 1686221173i64 - Utc::now().timestamp(); + assert!(sc_string.is_ok()); + assert_eq!( + sc_string.unwrap(), + format!( + "{}=test_value; expires=Thu, 08-Jun-2023 10:46:13 GMT; Max-Age={}; path=/path; domain=example.com", + STICKY_COOKIE_NAME, max_age + ) + ); + } +} diff --git a/rpxy-lib/src/backend/mod.rs b/rpxy-lib/src/backend/mod.rs new file mode 100644 index 00000000..68f97a85 --- /dev/null +++ b/rpxy-lib/src/backend/mod.rs @@ -0,0 +1,14 @@ +mod backend_main; +mod load_balance; +mod upstream; +mod upstream_opts; + +pub use backend_main::{BackendAppBuilderError, BackendAppManager}; +pub use upstream::Upstream; +// #[cfg(feature = "sticky-cookie")] +// pub use sticky_cookie::{StickyCookie, StickyCookieValue}; +// pub use self::{ +// load_balance::{LbContext, LoadBalance}, +// upstream::{ReverseProxy, Upstream, UpstreamGroup, UpstreamGroupBuilder}, +// upstream_opts::UpstreamOption, +// }; diff --git a/rpxy-lib/src/backend/upstream.rs b/rpxy-lib/src/backend/upstream.rs new file mode 100644 index 00000000..91e392d7 --- /dev/null +++ b/rpxy-lib/src/backend/upstream.rs @@ -0,0 +1,266 @@ +#[cfg(feature = "sticky-cookie")] +use super::load_balance::LoadBalanceStickyBuilder; +use super::load_balance::{ + load_balance_options as lb_opts, LoadBalance, LoadBalanceContext, LoadBalanceRandomBuilder, + LoadBalanceRoundRobinBuilder, +}; +// use super::{BytesName, LbContext, PathNameBytesExp, UpstreamOption}; +use super::upstream_opts::UpstreamOption; +use crate::{ + certs::CryptoSource, + error::RpxyError, + globals::{AppConfig, UpstreamUri}, + log::*, + name_exp::{ByteName, PathName}, +}; +#[cfg(feature = "sticky-cookie")] +use base64::{engine::general_purpose, Engine as _}; +use derive_builder::Builder; +use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet}; +#[cfg(feature = "sticky-cookie")] +use sha2::{Digest, Sha256}; +use std::borrow::Cow; + +#[derive(Debug, Clone)] +/// Handler for given path to route incoming request to path's corresponding upstream server(s). +pub struct PathManager { + /// HashMap of upstream candidate server info, key is path name + /// TODO: HashMapでいいのかは疑問。max_by_keyでlongest prefix matchしてるのも無駄っぽいが。。。 + inner: HashMap, +} + +impl TryFrom<&AppConfig> for PathManager +where + T: CryptoSource, +{ + type Error = RpxyError; + fn try_from(app_config: &AppConfig) -> Result { + let mut inner: HashMap = HashMap::default(); + + app_config.reverse_proxy.iter().for_each(|rpc| { + let upstream_vec: Vec = rpc.upstream.iter().map(Upstream::from).collect(); + let elem = UpstreamCandidatesBuilder::default() + .upstream(&upstream_vec) + .path(&rpc.path) + .replace_path(&rpc.replace_path) + .load_balance(&rpc.load_balance, &upstream_vec, &app_config.server_name, &rpc.path) + .options(&rpc.upstream_options) + .build() + .unwrap(); + inner.insert(elem.path.clone(), elem); + }); + + if app_config.reverse_proxy.iter().filter(|rpc| rpc.path.is_none()).count() >= 2 { + error!("Multiple default reverse proxy setting"); + return Err(RpxyError::InvalidReverseProxyConfig); + } + + if !(inner.iter().all(|(_, elem)| { + !(elem.options.contains(&UpstreamOption::ForceHttp11Upstream) + && elem.options.contains(&UpstreamOption::ForceHttp2Upstream)) + })) { + error!("Either one of force_http11 or force_http2 can be enabled"); + return Err(RpxyError::InvalidUpstreamOptionSetting); + } + + Ok(PathManager { inner }) + } +} + +impl PathManager { + /// Get an appropriate upstream destinations for given path string. + /// trie使ってlongest prefix match させてもいいけどルート記述は少ないと思われるので、 + /// コスト的にこの程度で十分では。 + pub fn get<'a>(&self, path_str: impl Into>) -> Option<&UpstreamCandidates> { + let path_name = &path_str.to_path_name(); + + let matched_upstream = self + .inner + .iter() + .filter(|(route_bytes, _)| { + match path_name.starts_with(route_bytes) { + true => { + route_bytes.len() == 1 // route = '/', i.e., default + || match path_name.get(route_bytes.len()) { + None => true, // exact case + Some(p) => p == &b'/', // sub-path case + } + } + _ => false, + } + }) + .max_by_key(|(route_bytes, _)| route_bytes.len()); + if let Some((path, u)) = matched_upstream { + debug!( + "Found upstream: {:?}", + path.try_into().unwrap_or_else(|_| "".to_string()) + ); + Some(u) + } else { + None + } + } +} + +#[derive(Debug, Clone)] +/// Upstream struct just containing uri without path +pub struct Upstream { + /// Base uri without specific path + pub uri: hyper::Uri, +} +impl From<&UpstreamUri> for Upstream { + fn from(value: &UpstreamUri) -> Self { + Self { + uri: value.inner.clone(), + } + } +} +impl Upstream { + #[cfg(feature = "sticky-cookie")] + /// Hashing uri with index to avoid collision + pub fn calculate_id_with_index(&self, index: usize) -> String { + let mut hasher = Sha256::new(); + let uri_string = format!("{}&index={}", self.uri.clone(), index); + hasher.update(uri_string.as_bytes()); + let digest = hasher.finalize(); + general_purpose::URL_SAFE_NO_PAD.encode(digest) + } +} +#[derive(Debug, Clone, Builder)] +/// Struct serving multiple upstream servers for, e.g., load balancing. +pub struct UpstreamCandidates { + #[builder(setter(custom))] + /// Upstream server(s) + pub inner: Vec, + + #[builder(setter(custom), default)] + /// Path like "/path" in [[PathName]] associated with the upstream server(s) + pub path: PathName, + + #[builder(setter(custom), default)] + /// Path in [[PathName]] that will be used to replace the "path" part of incoming url + pub replace_path: Option, + + #[builder(setter(custom), default)] + /// Load balancing option + pub load_balance: LoadBalance, + + #[builder(setter(custom), default)] + /// Activated upstream options defined in [[UpstreamOption]] + pub options: HashSet, +} + +impl UpstreamCandidatesBuilder { + /// Set the upstream server(s) + pub fn upstream(&mut self, upstream_vec: &[Upstream]) -> &mut Self { + self.inner = Some(upstream_vec.to_vec()); + self + } + /// Set the path like "/path" in [[PathName]] associated with the upstream server(s), default is "/" + pub fn path(&mut self, v: &Option) -> &mut Self { + let path = match v { + Some(p) => p.to_path_name(), + None => "/".to_path_name(), + }; + self.path = Some(path); + self + } + /// Set the path in [[PathName]] that will be used to replace the "path" part of incoming url + pub fn replace_path(&mut self, v: &Option) -> &mut Self { + self.replace_path = Some(v.to_owned().as_ref().map_or_else(|| None, |v| Some(v.to_path_name()))); + self + } + /// Set the load balancing option + pub fn load_balance( + &mut self, + v: &Option, + // upstream_num: &usize, + upstream_vec: &Vec, + _server_name: &str, + _path_opt: &Option, + ) -> &mut Self { + let upstream_num = &upstream_vec.len(); + let lb = if let Some(x) = v { + match x.as_str() { + lb_opts::FIX_TO_FIRST => LoadBalance::FixToFirst, + lb_opts::RANDOM => LoadBalance::Random( + LoadBalanceRandomBuilder::default() + .num_upstreams(upstream_num) + .build() + .unwrap(), + ), + lb_opts::ROUND_ROBIN => LoadBalance::RoundRobin( + LoadBalanceRoundRobinBuilder::default() + .num_upstreams(upstream_num) + .build() + .unwrap(), + ), + #[cfg(feature = "sticky-cookie")] + lb_opts::STICKY_ROUND_ROBIN => LoadBalance::StickyRoundRobin( + LoadBalanceStickyBuilder::default() + .num_upstreams(upstream_num) + .sticky_config(_server_name, _path_opt) + .upstream_maps(upstream_vec) // TODO: + .build() + .unwrap(), + ), + _ => { + error!("Specified load balancing option is invalid."); + LoadBalance::default() + } + } + } else { + LoadBalance::default() + }; + self.load_balance = Some(lb); + self + } + /// Set the activated upstream options defined in [[UpstreamOption]] + pub fn options(&mut self, v: &Option>) -> &mut Self { + let opts = if let Some(opts) = v { + opts + .iter() + .filter_map(|str| UpstreamOption::try_from(str.as_str()).ok()) + .collect::>() + } else { + Default::default() + }; + self.options = Some(opts); + self + } +} + +impl UpstreamCandidates { + /// Get an enabled option of load balancing [[LoadBalance]] + pub fn get(&self, context_to_lb: &Option) -> (Option<&Upstream>, Option) { + let pointer_to_upstream = self.load_balance.get_context(context_to_lb); + debug!("Upstream of index {} is chosen.", pointer_to_upstream.ptr); + debug!("Context to LB (Cookie in Request): {:?}", context_to_lb); + debug!( + "Context from LB (Set-Cookie in Response): {:?}", + pointer_to_upstream.context + ); + (self.inner.get(pointer_to_upstream.ptr), pointer_to_upstream.context) + } +} + +#[cfg(test)] +mod test { + #[allow(unused)] + use super::*; + + #[cfg(feature = "sticky-cookie")] + #[test] + fn calc_id_works() { + let uri = "https://www.rust-lang.org".parse::().unwrap(); + let upstream = Upstream { uri }; + assert_eq!( + "eGsjoPbactQ1eUJjafYjPT3ekYZQkaqJnHdA_FMSkgM", + upstream.calculate_id_with_index(0) + ); + assert_eq!( + "tNVXFJ9eNCT2mFgKbYq35XgH5q93QZtfU8piUiiDxVA", + upstream.calculate_id_with_index(1) + ); + } +} diff --git a/rpxy-lib/src/backend/upstream_opts.rs b/rpxy-lib/src/backend/upstream_opts.rs new file mode 100644 index 00000000..3f5fbc86 --- /dev/null +++ b/rpxy-lib/src/backend/upstream_opts.rs @@ -0,0 +1,22 @@ +use crate::error::*; + +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +pub enum UpstreamOption { + OverrideHost, + UpgradeInsecureRequests, + ForceHttp11Upstream, + ForceHttp2Upstream, + // TODO: Adds more options for heder override +} +impl TryFrom<&str> for UpstreamOption { + type Error = RpxyError; + fn try_from(val: &str) -> RpxyResult { + match val { + "override_host" => Ok(Self::OverrideHost), + "upgrade_insecure_requests" => Ok(Self::UpgradeInsecureRequests), + "force_http11_upstream" => Ok(Self::ForceHttp11Upstream), + "force_http2_upstream" => Ok(Self::ForceHttp2Upstream), + _ => Err(RpxyError::UnsupportedUpstreamOption), + } + } +} diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index 7662a9d8..bc730a93 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -8,4 +8,15 @@ pub type RpxyResult = std::result::Result; pub enum RpxyError { #[error("IO error: {0}")] Io(#[from] std::io::Error), + + // backend errors + #[error("Invalid reverse proxy setting")] + InvalidReverseProxyConfig, + #[error("Invalid upstream option setting")] + InvalidUpstreamOptionSetting, + #[error("Failed to build backend app")] + FailedToBuildBackendApp(#[from] crate::backend::BackendAppBuilderError), + + #[error("Unsupported upstream option")] + UnsupportedUpstreamOption, } diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index e0a9d219..28e08c09 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -1,3 +1,4 @@ +mod backend; mod certs; mod constants; mod count; @@ -5,6 +6,7 @@ mod error; mod globals; mod hyper_executor; mod log; +mod name_exp; mod proxy; use crate::{error::*, globals::Globals, log::*, proxy::Proxy}; @@ -70,8 +72,8 @@ where term_notify: term_notify.clone(), }); - // TODO: 1. build backends, and make it contained in Arc - // app_config_list: app_config_list.clone(), + // 1. build backends, and make it contained in Arc + let app_manager = Arc::new(backend::BackendAppManager::try_from(app_config_list)?); // TODO: 2. build message handler with Arc-ed http_client and backends, and make it contained in Arc as well // // build message handler including a request forwarder diff --git a/rpxy-lib/src/name_exp.rs b/rpxy-lib/src/name_exp.rs new file mode 100644 index 00000000..8ed17e29 --- /dev/null +++ b/rpxy-lib/src/name_exp.rs @@ -0,0 +1,160 @@ +use std::borrow::Cow; + +/// Server name (hostname or ip address) representation in bytes-based struct +/// for searching hashmap or key list by exact or longest-prefix matching +#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)] +pub struct ServerName { + inner: Vec, // lowercase ascii bytes +} +impl From<&str> for ServerName { + fn from(s: &str) -> Self { + let name = s.bytes().collect::>().to_ascii_lowercase(); + Self { inner: name } + } +} +impl From<&[u8]> for ServerName { + fn from(b: &[u8]) -> Self { + Self { + inner: b.to_ascii_lowercase(), + } + } +} +impl TryInto for &ServerName { + type Error = anyhow::Error; + fn try_into(self) -> Result { + let s = std::str::from_utf8(&self.inner)?; + Ok(s.to_string()) + } +} +impl AsRef<[u8]> for ServerName { + fn as_ref(&self) -> &[u8] { + self.inner.as_ref() + } +} + +/// Path name, like "/path/ok", represented in bytes-based struct +/// for searching hashmap or key list by exact or longest-prefix matching +#[derive(Clone, Debug, PartialEq, Eq, Hash, Default)] +pub struct PathName { + inner: Vec, // lowercase ascii bytes +} +impl From<&str> for PathName { + fn from(s: &str) -> Self { + let name = s.bytes().collect::>().to_ascii_lowercase(); + Self { inner: name } + } +} +impl From<&[u8]> for PathName { + fn from(b: &[u8]) -> Self { + Self { + inner: b.to_ascii_lowercase(), + } + } +} +impl TryInto for &PathName { + type Error = anyhow::Error; + fn try_into(self) -> Result { + let s = std::str::from_utf8(&self.inner)?; + Ok(s.to_string()) + } +} +impl AsRef<[u8]> for PathName { + fn as_ref(&self) -> &[u8] { + self.inner.as_ref() + } +} +impl PathName { + pub fn len(&self) -> usize { + self.inner.len() + } + pub fn is_empty(&self) -> bool { + self.inner.len() == 0 + } + pub fn get(&self, index: I) -> Option<&I::Output> + where + I: std::slice::SliceIndex<[u8]>, + { + self.inner.get(index) + } + pub fn starts_with(&self, needle: &Self) -> bool { + self.inner.starts_with(&needle.inner) + } +} + +/// Trait to express names in ascii-lowercased bytes +pub trait ByteName { + type OutputServer: Send + Sync + 'static; + type OutputPath; + fn to_server_name(self) -> Self::OutputServer; + fn to_path_name(self) -> Self::OutputPath; +} + +impl<'a, T: Into>> ByteName for T { + type OutputServer = ServerName; + type OutputPath = PathName; + + fn to_server_name(self) -> Self::OutputServer { + ServerName::from(self.into().as_ref()) + } + + fn to_path_name(self) -> Self::OutputPath { + PathName::from(self.into().as_ref()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn bytes_name_str_works() { + let s = "OK_string"; + let bn = s.to_path_name(); + let bn_lc = s.to_server_name(); + + assert_eq!("ok_string".as_bytes(), bn.as_ref()); + assert_eq!("ok_string".as_bytes(), bn_lc.as_ref()); + } + + #[test] + fn from_works() { + let s = "OK_string".to_server_name(); + let m = ServerName::from("OK_strinG".as_bytes()); + assert_eq!(s, m); + assert_eq!(s.as_ref(), "ok_string".as_bytes()); + assert_eq!(m.as_ref(), "ok_string".as_bytes()); + } + + #[test] + fn get_works() { + let s = "OK_str".to_path_name(); + let i = s.get(0); + assert_eq!(Some(&"o".as_bytes()[0]), i); + let i = s.get(1); + assert_eq!(Some(&"k".as_bytes()[0]), i); + let i = s.get(2); + assert_eq!(Some(&"_".as_bytes()[0]), i); + let i = s.get(3); + assert_eq!(Some(&"s".as_bytes()[0]), i); + let i = s.get(4); + assert_eq!(Some(&"t".as_bytes()[0]), i); + let i = s.get(5); + assert_eq!(Some(&"r".as_bytes()[0]), i); + let i = s.get(6); + assert_eq!(None, i); + } + + #[test] + fn start_with_works() { + let s = "OK_str".to_path_name(); + let correct = "OK".to_path_name(); + let incorrect = "KO".to_path_name(); + assert!(s.starts_with(&correct)); + assert!(!s.starts_with(&incorrect)); + } + + #[test] + fn as_ref_works() { + let s = "OK_str".to_path_name(); + assert_eq!(s.as_ref(), "ok_str".as_bytes()); + } +} diff --git a/rpxy-lib/src/proxy/crypto_service.rs b/rpxy-lib/src/proxy/crypto_service.rs new file mode 100644 index 00000000..e69de29b diff --git a/rpxy-lib/src/proxy/mod.rs b/rpxy-lib/src/proxy/mod.rs index 9b55e1da..9718cc11 100644 --- a/rpxy-lib/src/proxy/mod.rs +++ b/rpxy-lib/src/proxy/mod.rs @@ -1,5 +1,6 @@ mod proxy_main; mod socket; +mod proxy_tls; use crate::{globals::Globals, hyper_executor::LocalExecutor}; use hyper_util::server::{self, conn::auto::Builder as ConnectionBuilder}; diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index 416ecac1..a024bd7d 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -1,10 +1,11 @@ use super::socket::bind_tcp_socket; -use crate::{error::RpxyResult, globals::Globals, hyper_executor::LocalExecutor, log::*}; +use crate::{error::RpxyResult, globals::Globals, log::*}; +use hot_reload::{ReloaderReceiver, ReloaderService}; use hyper_util::server::conn::auto::Builder as ConnectionBuilder; use std::{net::SocketAddr, sync::Arc}; /// Proxy main object responsible to serve requests received from clients at the given socket address. -pub(crate) struct Proxy { +pub(crate) struct Proxy { /// global context shared among async tasks pub globals: Arc, /// listen socket address @@ -15,7 +16,7 @@ pub(crate) struct Proxy { pub connection_builder: Arc>, } -impl Proxy { +impl Proxy { /// Start without TLS (HTTP cleartext) async fn start_without_tls(&self) -> RpxyResult<()> { let listener_service = async { @@ -31,14 +32,27 @@ impl Proxy { Ok(()) } + /// Start with TLS (HTTPS) + pub(super) async fn start_with_tls(&self) -> RpxyResult<()> { + // let (cert_reloader_service, cert_reloader_rx) = ReloaderService::, ServerCryptoBase>::new( + // &self.globals.clone(), + // CERTS_WATCH_DELAY_SECS, + // !LOAD_CERTS_ONLY_WHEN_UPDATED, + // ) + // .await + // .map_err(|e| anyhow::anyhow!(e))?; + loop {} + Ok(()) + } + /// Entrypoint for HTTP/1.1, 2 and 3 servers pub async fn start(&self) -> RpxyResult<()> { let proxy_service = async { - // if self.tls_enabled { - // self.start_with_tls().await - // } else { - self.start_without_tls().await - // } + if self.tls_enabled { + self.start_with_tls().await + } else { + self.start_without_tls().await + } }; match &self.globals.term_notify { diff --git a/rpxy-lib/src/proxy/proxy_tls.rs b/rpxy-lib/src/proxy/proxy_tls.rs new file mode 100644 index 00000000..f67ad8d5 --- /dev/null +++ b/rpxy-lib/src/proxy/proxy_tls.rs @@ -0,0 +1,6 @@ +use super::proxy_main::Proxy; +use crate::{log::*, error::*}; + +impl Proxy{ + +}