diff --git a/Cargo.toml b/Cargo.toml index c512b187..78680885 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] -members = ["rpxy-bin", "rpxy-lib"] +members = ["rpxy-bin", "rpxy-lib", "legacy-lib"] exclude = ["submodules"] resolver = "2" diff --git a/legacy-lib/Cargo.toml b/legacy-lib/Cargo.toml new file mode 100644 index 00000000..c975fb6c --- /dev/null +++ b/legacy-lib/Cargo.toml @@ -0,0 +1,89 @@ +[package] +name = "rpxy-lib-legacy" +version = "0.6.2" +authors = ["Jun Kurihara"] +homepage = "https://github.com/junkurihara/rust-rpxy" +repository = "https://github.com/junkurihara/rust-rpxy" +license = "MIT" +readme = "../README.md" +edition = "2021" +publish = false + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[features] +default = ["http3-s2n", "sticky-cookie", "cache"] +http3-quinn = ["quinn", "h3", "h3-quinn", "socket2"] +http3-s2n = ["h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"] +sticky-cookie = ["base64", "sha2", "chrono"] +cache = ["http-cache-semantics", "lru"] +native-roots = ["hyper-rustls/native-tokio"] + +[dependencies] +rand = "0.8.5" +rustc-hash = "1.1.0" +bytes = "1.5.0" +derive_builder = "0.12.0" +futures = { version = "0.3.29", features = ["alloc", "async-await"] } +tokio = { version = "1.34.0", default-features = false, features = [ + "net", + "rt-multi-thread", + "time", + "sync", + "macros", + "fs", +] } +async-trait = "0.1.74" +hot_reload = "0.1.4" # reloading certs + +# Error handling +anyhow = "1.0.75" +thiserror = "1.0.50" + +# http and tls +http = "1.0.0" +http-body-util = "0.1.0" +hyper = { version = "1.0.1", default-features = false } +hyper-util = { version = "0.1.1", features = ["full"] } +hyper-rustls = { version = "0.24.2", default-features = false, features = [ + "tokio-runtime", + "webpki-tokio", + "http1", + "http2", +] } +tokio-rustls = { version = "0.24.1", features = ["early-data"] } +rustls = { version = "0.21.9", default-features = false } +webpki = "0.22.4" +x509-parser = "0.15.1" + +# logging +tracing = { version = "0.1.40" } + +# http/3 +quinn = { version = "0.10.2", optional = true } +h3 = { path = "../submodules/h3/h3/", optional = true } +h3-quinn = { path = "../submodules/h3/h3-quinn/", optional = true } +s2n-quic = { version = "1.31.0", default-features = false, features = [ + "provider-tls-rustls", +], optional = true } +s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true } +s2n-quic-rustls = { version = "0.31.0", optional = true } +# for UDP socket wit SO_REUSEADDR when h3 with quinn +socket2 = { version = "0.5.5", features = ["all"], optional = true } + +# cache +http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true } +lru = { version = "0.12.0", optional = true } + +# cookie handling for sticky cookie +chrono = { version = "0.4.31", default-features = false, features = [ + "unstable-locales", + "alloc", + "clock", +], optional = true } +base64 = { version = "0.21.5", optional = true } +sha2 = { version = "0.10.8", default-features = false, optional = true } + + +[dev-dependencies] +# http and tls diff --git a/rpxy-lib/src/backend/load_balance.rs b/legacy-lib/src/backend/load_balance.rs similarity index 100% rename from rpxy-lib/src/backend/load_balance.rs rename to legacy-lib/src/backend/load_balance.rs diff --git a/rpxy-lib/src/backend/load_balance_sticky.rs b/legacy-lib/src/backend/load_balance_sticky.rs similarity index 100% rename from rpxy-lib/src/backend/load_balance_sticky.rs rename to 
legacy-lib/src/backend/load_balance_sticky.rs diff --git a/rpxy-lib/src/backend/mod.rs b/legacy-lib/src/backend/mod.rs similarity index 100% rename from rpxy-lib/src/backend/mod.rs rename to legacy-lib/src/backend/mod.rs diff --git a/rpxy-lib/src/backend/sticky_cookie.rs b/legacy-lib/src/backend/sticky_cookie.rs similarity index 100% rename from rpxy-lib/src/backend/sticky_cookie.rs rename to legacy-lib/src/backend/sticky_cookie.rs diff --git a/rpxy-lib/src/backend/upstream.rs b/legacy-lib/src/backend/upstream.rs similarity index 100% rename from rpxy-lib/src/backend/upstream.rs rename to legacy-lib/src/backend/upstream.rs diff --git a/rpxy-lib/src/backend/upstream_opts.rs b/legacy-lib/src/backend/upstream_opts.rs similarity index 100% rename from rpxy-lib/src/backend/upstream_opts.rs rename to legacy-lib/src/backend/upstream_opts.rs diff --git a/legacy-lib/src/certs.rs b/legacy-lib/src/certs.rs new file mode 100644 index 00000000..c9cfafd5 --- /dev/null +++ b/legacy-lib/src/certs.rs @@ -0,0 +1,91 @@ +use async_trait::async_trait; +use rustc_hash::FxHashSet as HashSet; +use rustls::{ + sign::{any_supported_type, CertifiedKey}, + Certificate, OwnedTrustAnchor, PrivateKey, +}; +use std::io; +use x509_parser::prelude::*; + +#[async_trait] +// Trait to read certs and keys anywhere from KVS, file, sqlite, etc. +pub trait CryptoSource { + type Error; + + /// read crypto materials from source + async fn read(&self) -> Result; + + /// Returns true when mutual tls is enabled + fn is_mutual_tls(&self) -> bool; +} + +/// Certificates and private keys in rustls loaded from files +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct CertsAndKeys { + pub certs: Vec, + pub cert_keys: Vec, + pub client_ca_certs: Option>, +} + +impl CertsAndKeys { + pub fn parse_server_certs_and_keys(&self) -> Result { + // for (server_name_bytes_exp, certs_and_keys) in self.inner.iter() { + let signing_key = self + .cert_keys + .iter() + .find_map(|k| { + if let Ok(sk) = any_supported_type(k) { + Some(sk) + } else { + None + } + }) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "Unable to find a valid certificate and key", + ) + })?; + Ok(CertifiedKey::new(self.certs.clone(), signing_key)) + } + + pub fn parse_client_ca_certs(&self) -> Result<(Vec, HashSet>), anyhow::Error> { + let certs = self.client_ca_certs.as_ref().ok_or(anyhow::anyhow!("No client cert"))?; + + let owned_trust_anchors: Vec<_> = certs + .iter() + .map(|v| { + // let trust_anchor = tokio_rustls::webpki::TrustAnchor::try_from_cert_der(&v.0).unwrap(); + let trust_anchor = webpki::TrustAnchor::try_from_cert_der(&v.0).unwrap(); + rustls::OwnedTrustAnchor::from_subject_spki_name_constraints( + trust_anchor.subject, + trust_anchor.spki, + trust_anchor.name_constraints, + ) + }) + .collect(); + + // TODO: SKID is not used currently + let subject_key_identifiers: HashSet<_> = certs + .iter() + .filter_map(|v| { + // retrieve ca key id (subject key id) + let cert = parse_x509_certificate(&v.0).unwrap().1; + let subject_key_ids = cert + .iter_extensions() + .filter_map(|ext| match ext.parsed_extension() { + ParsedExtension::SubjectKeyIdentifier(skid) => Some(skid), + _ => None, + }) + .collect::>(); + if !subject_key_ids.is_empty() { + Some(subject_key_ids[0].0.to_owned()) + } else { + None + } + }) + .collect(); + + Ok((owned_trust_anchors, subject_key_identifiers)) + } +} diff --git a/legacy-lib/src/constants.rs b/legacy-lib/src/constants.rs new file mode 100644 index 00000000..ebec1fc0 --- /dev/null +++ b/legacy-lib/src/constants.rs @@ 
-0,0 +1,45 @@
+pub const RESPONSE_HEADER_SERVER: &str = "rpxy";
+// pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"];
+// pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"];
+pub const TCP_LISTEN_BACKLOG: u32 = 1024;
+// pub const HTTP_LISTEN_PORT: u16 = 8080;
+// pub const HTTPS_LISTEN_PORT: u16 = 8443;
+pub const PROXY_TIMEOUT_SEC: u64 = 60;
+pub const UPSTREAM_TIMEOUT_SEC: u64 = 60;
+pub const TLS_HANDSHAKE_TIMEOUT_SEC: u64 = 15; // default as with firefox browser
+pub const MAX_CLIENTS: usize = 512;
+pub const MAX_CONCURRENT_STREAMS: u32 = 64;
+pub const CERTS_WATCH_DELAY_SECS: u32 = 60;
+pub const LOAD_CERTS_ONLY_WHEN_UPDATED: bool = true;
+
+// #[cfg(feature = "http3")]
+// pub const H3_RESPONSE_BUF_SIZE: usize = 65_536; // 64KB
+// #[cfg(feature = "http3")]
+// pub const H3_REQUEST_BUF_SIZE: usize = 65_536; // 64KB // handled by quinn
+
+#[allow(non_snake_case)]
+#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
+pub mod H3 {
+  pub const ALT_SVC_MAX_AGE: u32 = 3600;
+  pub const REQUEST_MAX_BODY_SIZE: usize = 268_435_456; // 256MB
+  pub const MAX_CONCURRENT_CONNECTIONS: u32 = 4096;
+  pub const MAX_CONCURRENT_BIDISTREAM: u32 = 64;
+  pub const MAX_CONCURRENT_UNISTREAM: u32 = 64;
+  pub const MAX_IDLE_TIMEOUT: u64 = 10; // secs
+}
+
+#[cfg(feature = "sticky-cookie")]
+/// For load-balancing with sticky cookie
+pub const STICKY_COOKIE_NAME: &str = "rpxy_srv_id";
+
+#[cfg(feature = "cache")]
+// # of entries in cache
+pub const MAX_CACHE_ENTRY: usize = 1_000;
+#[cfg(feature = "cache")]
+// max size for each file in bytes
+pub const MAX_CACHE_EACH_SIZE: usize = 65_535;
+#[cfg(feature = "cache")]
+// on memory cache if less than or equal to
+pub const MAX_CACHE_EACH_SIZE_ON_MEMORY: usize = 4_096;
+
+// TODO: max cache size in total
diff --git a/legacy-lib/src/error.rs b/legacy-lib/src/error.rs
new file mode 100644
index 00000000..c672682d
--- /dev/null
+++ b/legacy-lib/src/error.rs
@@ -0,0 +1,86 @@
+pub use anyhow::{anyhow, bail, ensure, Context};
+use std::io;
+use thiserror::Error;
+
+pub type Result<T> = std::result::Result<T, RpxyError>;
+
+/// Describes things that can go wrong in the Rpxy
+#[derive(Debug, Error)]
+pub enum RpxyError {
+  #[error("Proxy build error: {0}")]
+  ProxyBuild(#[from] crate::proxy::ProxyBuilderError),
+
+  #[error("Backend build error: {0}")]
+  BackendBuild(#[from] crate::backend::BackendBuilderError),
+
+  #[error("MessageHandler build error: {0}")]
+  HandlerBuild(#[from] crate::handler::HttpMessageHandlerBuilderError),
+
+  #[error("Config builder error: {0}")]
+  ConfigBuild(&'static str),
+
+  #[error("Http Message Handler Error: {0}")]
+  Handler(&'static str),
+
+  #[error("Cache Error: {0}")]
+  Cache(&'static str),
+
+  #[error("Http Request Message Error: {0}")]
+  Request(&'static str),
+
+  #[error("TCP/UDP Proxy Layer Error: {0}")]
+  Proxy(String),
+
+  #[allow(unused)]
+  #[error("LoadBalance Layer Error: {0}")]
+  LoadBalance(String),
+
+  #[error("I/O Error: {0}")]
+  Io(#[from] io::Error),
+
+  // #[error("Toml Deserialization Error")]
+  // TomlDe(#[from] toml::de::Error),
+  #[cfg(feature = "http3-quinn")]
+  #[error("Quic Connection Error [quinn]: {0}")]
+  QuicConn(#[from] quinn::ConnectionError),
+
+  #[cfg(feature = "http3-s2n")]
+  #[error("Quic Connection Error [s2n-quic]: {0}")]
+  QUicConn(#[from] s2n_quic::connection::Error),
+
+  #[cfg(feature = "http3-quinn")]
+  #[error("H3 Error [quinn]: {0}")]
+  H3(#[from] h3::Error),
+
+  #[cfg(feature = "http3-s2n")]
+  #[error("H3 Error [s2n-quic]: {0}")]
+  H3(#[from] s2n_quic_h3::h3::Error),
+
+  #[error("rustls Connection Error: {0}")]
+  Rustls(#[from] rustls::Error),
+
+  #[error("Hyper Error: {0}")]
+  Hyper(#[from] hyper::Error),
+
+  #[error("Hyper Http Error: {0}")]
+  HyperHttp(#[from] hyper::http::Error),
+
+  #[error("Hyper Http HeaderValue Error: {0}")]
+  HyperHeaderValue(#[from] hyper::header::InvalidHeaderValue),
+
+  #[error("Hyper Http HeaderName Error: {0}")]
+  HyperHeaderName(#[from] hyper::header::InvalidHeaderName),
+
+  #[error(transparent)]
+  Other(#[from] anyhow::Error),
+}
+
+#[allow(dead_code)]
+#[derive(Debug, Error, Clone)]
+pub enum ClientCertsError {
+  #[error("TLS Client Certificate is Required for Given SNI: {0}")]
+  ClientCertRequired(String),
+
+  #[error("Inconsistent TLS Client Certificate for Given SNI: {0}")]
+  InconsistentClientCert(String),
+}
diff --git a/legacy-lib/src/globals.rs b/legacy-lib/src/globals.rs
new file mode 100644
index 00000000..02605a60
--- /dev/null
+++ b/legacy-lib/src/globals.rs
@@ -0,0 +1,325 @@
+use crate::{
+  backend::{
+    Backend, BackendBuilder, Backends, ReverseProxy, Upstream, UpstreamGroup, UpstreamGroupBuilder, UpstreamOption,
+  },
+  certs::CryptoSource,
+  constants::*,
+  error::RpxyError,
+  log::*,
+  utils::{BytesName, PathNameBytesExp},
+};
+use rustc_hash::FxHashMap as HashMap;
+use std::net::SocketAddr;
+use std::sync::{
+  atomic::{AtomicUsize, Ordering},
+  Arc,
+};
+use tokio::time::Duration;
+
+/// Global object containing proxy configurations and shared object like counters.
+/// But note that in Globals, we do not have Mutex and RwLock. It is indeed, the context shared among async tasks.
+pub struct Globals<T>
+where
+  T: CryptoSource,
+{
+  /// Configuration parameters for proxy transport and request handlers
+  pub proxy_config: ProxyConfig, // TODO: wrap the proxy config in an Arc so only that is reused everywhere; maybe backends too?
+
+  /// Backend application objects to which http request handler forward incoming requests
+  pub backends: Backends<T>,
+
+  /// Shared context - Counter for serving requests
+  pub request_count: RequestCount,
+
+  /// Shared context - Async task runtime handler
+  pub runtime_handle: tokio::runtime::Handle,
+
+  /// Shared context - Notify object to stop async tasks
+  pub term_notify: Option<Arc<tokio::sync::Notify>>,
+}
+
+/// Configuration parameters for proxy transport and request handlers
+#[derive(PartialEq, Eq, Clone)]
+pub struct ProxyConfig {
+  pub listen_sockets: Vec<SocketAddr>, // when instantiate server
+  pub http_port: Option<u16>,          // when instantiate server
+  pub https_port: Option<u16>,         // when instantiate server
+  pub tcp_listen_backlog: u32,         // when instantiate server
+
+  pub proxy_timeout: Duration,    // when serving requests at Proxy
+  pub upstream_timeout: Duration, // when serving requests at Handler
+
+  pub max_clients: usize,          // when serving requests
+  pub max_concurrent_streams: u32, // when instantiate server
+  pub keepalive: bool,             // when instantiate server
+
+  // experimentals
+  pub sni_consistency: bool, // Handler
+
+  #[cfg(feature = "cache")]
+  pub cache_enabled: bool,
+  #[cfg(feature = "cache")]
+  pub cache_dir: Option<std::path::PathBuf>,
+  #[cfg(feature = "cache")]
+  pub cache_max_entry: usize,
+  #[cfg(feature = "cache")]
+  pub cache_max_each_size: usize,
+  #[cfg(feature = "cache")]
+  pub cache_max_each_size_on_memory: usize,
+
+  // All need to make packet acceptor
+  #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
+  pub http3: bool,
+  #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
+  pub h3_alt_svc_max_age: u32,
+  #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
+  pub h3_request_max_body_size: usize,
+  #[cfg(any(feature = "http3-quinn", feature = 
"http3-s2n"))] + pub h3_max_concurrent_bidistream: u32, + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + pub h3_max_concurrent_unistream: u32, + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + pub h3_max_concurrent_connections: u32, + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + pub h3_max_idle_timeout: Option, +} + +impl Default for ProxyConfig { + fn default() -> Self { + Self { + listen_sockets: Vec::new(), + http_port: None, + https_port: None, + tcp_listen_backlog: TCP_LISTEN_BACKLOG, + + // TODO: Reconsider each timeout values + proxy_timeout: Duration::from_secs(PROXY_TIMEOUT_SEC), + upstream_timeout: Duration::from_secs(UPSTREAM_TIMEOUT_SEC), + + max_clients: MAX_CLIENTS, + max_concurrent_streams: MAX_CONCURRENT_STREAMS, + keepalive: true, + + sni_consistency: true, + + #[cfg(feature = "cache")] + cache_enabled: false, + #[cfg(feature = "cache")] + cache_dir: None, + #[cfg(feature = "cache")] + cache_max_entry: MAX_CACHE_ENTRY, + #[cfg(feature = "cache")] + cache_max_each_size: MAX_CACHE_EACH_SIZE, + #[cfg(feature = "cache")] + cache_max_each_size_on_memory: MAX_CACHE_EACH_SIZE_ON_MEMORY, + + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + http3: false, + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + h3_alt_svc_max_age: H3::ALT_SVC_MAX_AGE, + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + h3_request_max_body_size: H3::REQUEST_MAX_BODY_SIZE, + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + h3_max_concurrent_connections: H3::MAX_CONCURRENT_CONNECTIONS, + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + h3_max_concurrent_bidistream: H3::MAX_CONCURRENT_BIDISTREAM, + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + h3_max_concurrent_unistream: H3::MAX_CONCURRENT_UNISTREAM, + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + h3_max_idle_timeout: Some(Duration::from_secs(H3::MAX_IDLE_TIMEOUT)), + } + } +} + +/// Configuration parameters for backend applications +#[derive(PartialEq, Eq, Clone)] +pub struct AppConfigList +where + T: CryptoSource, +{ + pub inner: Vec>, + pub default_app: Option, +} +impl TryInto> for AppConfigList +where + T: CryptoSource + Clone, +{ + type Error = RpxyError; + + fn try_into(self) -> Result, Self::Error> { + let mut backends = Backends::new(); + for app_config in self.inner.iter() { + let backend = app_config.try_into()?; + backends + .apps + .insert(app_config.server_name.clone().to_server_name_vec(), backend); + info!( + "Registering application {} ({})", + &app_config.server_name, &app_config.app_name + ); + } + + // default backend application for plaintext http requests + if let Some(d) = self.default_app { + let d_sn: Vec<&str> = backends + .apps + .iter() + .filter(|(_k, v)| v.app_name == d) + .map(|(_, v)| v.server_name.as_ref()) + .collect(); + if !d_sn.is_empty() { + info!( + "Serving plaintext http for requests to unconfigured server_name by app {} (server_name: {}).", + d, d_sn[0] + ); + backends.default_server_name_bytes = Some(d_sn[0].to_server_name_vec()); + } + } + Ok(backends) + } +} + +/// Configuration parameters for single backend application +#[derive(PartialEq, Eq, Clone)] +pub struct AppConfig +where + T: CryptoSource, +{ + pub app_name: String, + pub server_name: String, + pub reverse_proxy: Vec, + pub tls: Option>, +} +impl TryInto> for &AppConfig +where + T: CryptoSource + Clone, +{ + type Error = RpxyError; + + fn try_into(self) -> Result, Self::Error> { + // backend builder + let mut 
backend_builder = BackendBuilder::default(); + // reverse proxy settings + let reverse_proxy = self.try_into()?; + + backend_builder + .app_name(self.app_name.clone()) + .server_name(self.server_name.clone()) + .reverse_proxy(reverse_proxy); + + // TLS settings and build backend instance + let backend = if self.tls.is_none() { + backend_builder.build().map_err(RpxyError::BackendBuild)? + } else { + let tls = self.tls.as_ref().unwrap(); + + backend_builder + .https_redirection(Some(tls.https_redirection)) + .crypto_source(Some(tls.inner.clone())) + .build()? + }; + Ok(backend) + } +} +impl TryInto for &AppConfig +where + T: CryptoSource + Clone, +{ + type Error = RpxyError; + + fn try_into(self) -> Result { + let mut upstream: HashMap = HashMap::default(); + + self.reverse_proxy.iter().for_each(|rpo| { + let upstream_vec: Vec = rpo.upstream.iter().map(|x| x.try_into().unwrap()).collect(); + // let upstream_iter = rpo.upstream.iter().map(|x| x.to_upstream().unwrap()); + // let lb_upstream_num = vec_upstream.len(); + let elem = UpstreamGroupBuilder::default() + .upstream(&upstream_vec) + .path(&rpo.path) + .replace_path(&rpo.replace_path) + .lb(&rpo.load_balance, &upstream_vec, &self.server_name, &rpo.path) + .opts(&rpo.upstream_options) + .build() + .unwrap(); + + upstream.insert(elem.path.clone(), elem); + }); + if self.reverse_proxy.iter().filter(|rpo| rpo.path.is_none()).count() >= 2 { + error!("Multiple default reverse proxy setting"); + return Err(RpxyError::ConfigBuild("Invalid reverse proxy setting")); + } + + if !(upstream.iter().all(|(_, elem)| { + !(elem.opts.contains(&UpstreamOption::ForceHttp11Upstream) + && elem.opts.contains(&UpstreamOption::ForceHttp2Upstream)) + })) { + error!("Either one of force_http11 or force_http2 can be enabled"); + return Err(RpxyError::ConfigBuild("Invalid upstream option setting")); + } + + Ok(ReverseProxy { upstream }) + } +} + +/// Configuration parameters for single reverse proxy corresponding to the path +#[derive(PartialEq, Eq, Clone)] +pub struct ReverseProxyConfig { + pub path: Option, + pub replace_path: Option, + pub upstream: Vec, + pub upstream_options: Option>, + pub load_balance: Option, +} + +/// Configuration parameters for single upstream destination from a reverse proxy +#[derive(PartialEq, Eq, Clone)] +pub struct UpstreamUri { + pub inner: hyper::Uri, +} +impl TryInto for &UpstreamUri { + type Error = anyhow::Error; + + fn try_into(self) -> std::result::Result { + Ok(Upstream { + uri: self.inner.clone(), + }) + } +} + +/// Configuration parameters on TLS for a single backend application +#[derive(PartialEq, Eq, Clone)] +pub struct TlsConfig +where + T: CryptoSource, +{ + pub inner: T, + pub https_redirection: bool, +} + +#[derive(Debug, Clone, Default)] +/// Counter for serving requests +pub struct RequestCount(Arc); + +impl RequestCount { + pub fn current(&self) -> usize { + self.0.load(Ordering::Relaxed) + } + + pub fn increment(&self) -> usize { + self.0.fetch_add(1, Ordering::Relaxed) + } + + pub fn decrement(&self) -> usize { + let mut count; + while { + count = self.0.load(Ordering::Relaxed); + count > 0 + && self + .0 + .compare_exchange(count, count - 1, Ordering::Relaxed, Ordering::Relaxed) + != Ok(count) + } {} + count + } +} diff --git a/rpxy-lib/src/handler/cache.rs b/legacy-lib/src/handler/cache.rs similarity index 100% rename from rpxy-lib/src/handler/cache.rs rename to legacy-lib/src/handler/cache.rs diff --git a/rpxy-lib/src/handler/error.rs b/legacy-lib/src/handler/error.rs similarity index 100% rename from 
rpxy-lib/src/handler/error.rs rename to legacy-lib/src/handler/error.rs diff --git a/rpxy-lib/src/handler/forwarder.rs b/legacy-lib/src/handler/forwarder.rs similarity index 100% rename from rpxy-lib/src/handler/forwarder.rs rename to legacy-lib/src/handler/forwarder.rs diff --git a/rpxy-lib/src/handler/handler_main.rs b/legacy-lib/src/handler/handler_main.rs similarity index 100% rename from rpxy-lib/src/handler/handler_main.rs rename to legacy-lib/src/handler/handler_main.rs diff --git a/rpxy-lib/src/handler/mod.rs b/legacy-lib/src/handler/mod.rs similarity index 100% rename from rpxy-lib/src/handler/mod.rs rename to legacy-lib/src/handler/mod.rs diff --git a/rpxy-lib/src/handler/utils_headers.rs b/legacy-lib/src/handler/utils_headers.rs similarity index 100% rename from rpxy-lib/src/handler/utils_headers.rs rename to legacy-lib/src/handler/utils_headers.rs diff --git a/rpxy-lib/src/handler/utils_request.rs b/legacy-lib/src/handler/utils_request.rs similarity index 100% rename from rpxy-lib/src/handler/utils_request.rs rename to legacy-lib/src/handler/utils_request.rs diff --git a/rpxy-lib/src/handler/utils_synth_response.rs b/legacy-lib/src/handler/utils_synth_response.rs similarity index 100% rename from rpxy-lib/src/handler/utils_synth_response.rs rename to legacy-lib/src/handler/utils_synth_response.rs diff --git a/rpxy-lib/src/hyper_executor.rs b/legacy-lib/src/hyper_executor.rs similarity index 100% rename from rpxy-lib/src/hyper_executor.rs rename to legacy-lib/src/hyper_executor.rs diff --git a/legacy-lib/src/lib.rs b/legacy-lib/src/lib.rs new file mode 100644 index 00000000..a9f48abc --- /dev/null +++ b/legacy-lib/src/lib.rs @@ -0,0 +1,112 @@ +mod backend; +mod certs; +mod constants; +mod error; +mod globals; +mod handler; +mod hyper_executor; +mod log; +mod proxy; +mod utils; + +use crate::{error::*, globals::Globals, handler::HttpMessageHandlerBuilder, log::*, proxy::ProxyBuilder}; +use futures::future::select_all; +use hyper_executor::build_http_server; +use std::sync::Arc; + +pub use crate::{ + certs::{CertsAndKeys, CryptoSource}, + globals::{AppConfig, AppConfigList, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri}, +}; +pub mod reexports { + pub use hyper::Uri; + pub use rustls::{Certificate, PrivateKey}; +} + +#[cfg(all(feature = "http3-quinn", feature = "http3-s2n"))] +compile_error!("feature \"http3-quinn\" and feature \"http3-s2n\" cannot be enabled at the same time"); + +/// Entrypoint that creates and spawns tasks of reverse proxy services +pub async fn entrypoint( + proxy_config: &ProxyConfig, + app_config_list: &AppConfigList, + runtime_handle: &tokio::runtime::Handle, + term_notify: Option>, +) -> Result<()> +where + T: CryptoSource + Clone + Send + Sync + 'static, +{ + // For initial message logging + if proxy_config.listen_sockets.iter().any(|addr| addr.is_ipv6()) { + info!("Listen both IPv4 and IPv6") + } else { + info!("Listen IPv4") + } + if proxy_config.http_port.is_some() { + info!("Listen port: {}", proxy_config.http_port.unwrap()); + } + if proxy_config.https_port.is_some() { + info!("Listen port: {} (for TLS)", proxy_config.https_port.unwrap()); + } + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + if proxy_config.http3 { + info!("Experimental HTTP/3.0 is enabled. Note it is still very unstable."); + } + if !proxy_config.sni_consistency { + info!("Ignore consistency between TLS SNI and Host header (or Request line). 
Note it violates RFC."); + } + #[cfg(feature = "cache")] + if proxy_config.cache_enabled { + info!( + "Cache is enabled: cache dir = {:?}", + proxy_config.cache_dir.as_ref().unwrap() + ); + } else { + info!("Cache is disabled") + } + + // build global + let globals = Arc::new(Globals { + proxy_config: proxy_config.clone(), + backends: app_config_list.clone().try_into()?, + request_count: Default::default(), + runtime_handle: runtime_handle.clone(), + term_notify: term_notify.clone(), + }); + + // build message handler including a request forwarder + let msg_handler = Arc::new( + HttpMessageHandlerBuilder::default() + // .forwarder(Arc::new(Forwarder::new(&globals).await)) + .globals(globals.clone()) + .build()?, + ); + + let http_server = Arc::new(build_http_server(&globals)); + + let addresses = globals.proxy_config.listen_sockets.clone(); + let futures = select_all(addresses.into_iter().map(|addr| { + let mut tls_enabled = false; + if let Some(https_port) = globals.proxy_config.https_port { + tls_enabled = https_port == addr.port() + } + + let proxy = ProxyBuilder::default() + .globals(globals.clone()) + .listening_on(addr) + .tls_enabled(tls_enabled) + .http_server(http_server.clone()) + .msg_handler(msg_handler.clone()) + .build() + .unwrap(); + + globals.runtime_handle.spawn(async move { proxy.start().await }) + })); + + // wait for all future + if let (Ok(Err(e)), _, _) = futures.await { + error!("Some proxy services are down: {}", e); + }; + + Ok(()) +} diff --git a/legacy-lib/src/log.rs b/legacy-lib/src/log.rs new file mode 100644 index 00000000..6b8afbec --- /dev/null +++ b/legacy-lib/src/log.rs @@ -0,0 +1,98 @@ +use crate::utils::ToCanonical; +use hyper::header; +use std::net::SocketAddr; +pub use tracing::{debug, error, info, warn}; + +#[derive(Debug, Clone)] +pub struct MessageLog { + // pub tls_server_name: String, + pub client_addr: String, + pub method: String, + pub host: String, + pub p_and_q: String, + pub version: hyper::Version, + pub uri_scheme: String, + pub uri_host: String, + pub ua: String, + pub xff: String, + pub status: String, + pub upstream: String, +} + +impl From<&hyper::Request> for MessageLog { + fn from(req: &hyper::Request) -> Self { + let header_mapper = |v: header::HeaderName| { + req + .headers() + .get(v) + .map_or_else(|| "", |s| s.to_str().unwrap_or("")) + .to_string() + }; + Self { + // tls_server_name: "".to_string(), + client_addr: "".to_string(), + method: req.method().to_string(), + host: header_mapper(header::HOST), + p_and_q: req + .uri() + .path_and_query() + .map_or_else(|| "", |v| v.as_str()) + .to_string(), + version: req.version(), + uri_scheme: req.uri().scheme_str().unwrap_or("").to_string(), + uri_host: req.uri().host().unwrap_or("").to_string(), + ua: header_mapper(header::USER_AGENT), + xff: header_mapper(header::HeaderName::from_static("x-forwarded-for")), + status: "".to_string(), + upstream: "".to_string(), + } + } +} + +impl MessageLog { + pub fn client_addr(&mut self, client_addr: &SocketAddr) -> &mut Self { + self.client_addr = client_addr.to_canonical().to_string(); + self + } + // pub fn tls_server_name(&mut self, tls_server_name: &str) -> &mut Self { + // self.tls_server_name = tls_server_name.to_string(); + // self + // } + pub fn status_code(&mut self, status_code: &hyper::StatusCode) -> &mut Self { + self.status = status_code.to_string(); + self + } + pub fn xff(&mut self, xff: &Option<&header::HeaderValue>) -> &mut Self { + self.xff = xff.map_or_else(|| "", |v| v.to_str().unwrap_or("")).to_string(); + self + } + pub fn 
upstream(&mut self, upstream: &hyper::Uri) -> &mut Self { + self.upstream = upstream.to_string(); + self + } + + pub fn output(&self) { + info!( + "{} <- {} -- {} {} {:?} -- {} -- {} \"{}\", \"{}\" \"{}\"", + if !self.host.is_empty() { + self.host.as_str() + } else { + self.uri_host.as_str() + }, + self.client_addr, + self.method, + self.p_and_q, + self.version, + self.status, + if !self.uri_scheme.is_empty() && !self.uri_host.is_empty() { + format!("{}://{}", self.uri_scheme, self.uri_host) + } else { + "".to_string() + }, + self.ua, + self.xff, + self.upstream, + // self.tls_server_name + ); + } +} diff --git a/rpxy-lib/src/proxy/crypto_service.rs b/legacy-lib/src/proxy/crypto_service.rs similarity index 100% rename from rpxy-lib/src/proxy/crypto_service.rs rename to legacy-lib/src/proxy/crypto_service.rs diff --git a/rpxy-lib/src/proxy/mod.rs b/legacy-lib/src/proxy/mod.rs similarity index 100% rename from rpxy-lib/src/proxy/mod.rs rename to legacy-lib/src/proxy/mod.rs diff --git a/rpxy-lib/src/proxy/proxy_client_cert.rs b/legacy-lib/src/proxy/proxy_client_cert.rs similarity index 100% rename from rpxy-lib/src/proxy/proxy_client_cert.rs rename to legacy-lib/src/proxy/proxy_client_cert.rs diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/legacy-lib/src/proxy/proxy_h3.rs similarity index 100% rename from rpxy-lib/src/proxy/proxy_h3.rs rename to legacy-lib/src/proxy/proxy_h3.rs diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/legacy-lib/src/proxy/proxy_main.rs similarity index 100% rename from rpxy-lib/src/proxy/proxy_main.rs rename to legacy-lib/src/proxy/proxy_main.rs diff --git a/rpxy-lib/src/proxy/proxy_quic_quinn.rs b/legacy-lib/src/proxy/proxy_quic_quinn.rs similarity index 100% rename from rpxy-lib/src/proxy/proxy_quic_quinn.rs rename to legacy-lib/src/proxy/proxy_quic_quinn.rs diff --git a/rpxy-lib/src/proxy/proxy_quic_s2n.rs b/legacy-lib/src/proxy/proxy_quic_s2n.rs similarity index 100% rename from rpxy-lib/src/proxy/proxy_quic_s2n.rs rename to legacy-lib/src/proxy/proxy_quic_s2n.rs diff --git a/rpxy-lib/src/proxy/proxy_tls.rs b/legacy-lib/src/proxy/proxy_tls.rs similarity index 100% rename from rpxy-lib/src/proxy/proxy_tls.rs rename to legacy-lib/src/proxy/proxy_tls.rs diff --git a/rpxy-lib/src/proxy/socket.rs b/legacy-lib/src/proxy/socket.rs similarity index 100% rename from rpxy-lib/src/proxy/socket.rs rename to legacy-lib/src/proxy/socket.rs diff --git a/rpxy-lib/src/utils/bytes_name.rs b/legacy-lib/src/utils/bytes_name.rs similarity index 100% rename from rpxy-lib/src/utils/bytes_name.rs rename to legacy-lib/src/utils/bytes_name.rs diff --git a/rpxy-lib/src/utils/mod.rs b/legacy-lib/src/utils/mod.rs similarity index 100% rename from rpxy-lib/src/utils/mod.rs rename to legacy-lib/src/utils/mod.rs diff --git a/rpxy-lib/src/utils/socket_addr.rs b/legacy-lib/src/utils/socket_addr.rs similarity index 100% rename from rpxy-lib/src/utils/socket_addr.rs rename to legacy-lib/src/utils/socket_addr.rs diff --git a/rpxy-bin/Cargo.toml b/rpxy-bin/Cargo.toml index dd6e744e..0d72e561 100644 --- a/rpxy-bin/Cargo.toml +++ b/rpxy-bin/Cargo.toml @@ -12,15 +12,15 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["http3-s2n", "cache"] -http3-quinn = ["rpxy-lib/http3-quinn"] -http3-s2n = ["rpxy-lib/http3-s2n"] -cache = ["rpxy-lib/cache"] -native-roots = ["rpxy-lib/native-roots"] +# default = ["http3-s2n", "cache"] +# http3-quinn = ["rpxy-lib/http3-quinn"] +# http3-s2n = ["rpxy-lib/http3-s2n"] +# cache = 
["rpxy-lib/cache"] +# native-roots = ["rpxy-lib/native-roots"] [dependencies] rpxy-lib = { path = "../rpxy-lib/", default-features = false, features = [ - "sticky-cookie", + # "sticky-cookie", ] } anyhow = "1.0.75" diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index fae0c3c2..3db2b425 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -12,19 +12,19 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["http3-s2n", "sticky-cookie", "cache"] -http3-quinn = ["quinn", "h3", "h3-quinn", "socket2"] -http3-s2n = ["h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"] -sticky-cookie = ["base64", "sha2", "chrono"] -cache = ["http-cache-semantics", "lru"] -native-roots = ["hyper-rustls/native-tokio"] +# default = ["http3-s2n", "sticky-cookie", "cache"] +# http3-quinn = ["quinn", "h3", "h3-quinn", "socket2"] +# http3-s2n = ["h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"] +# sticky-cookie = ["base64", "sha2", "chrono"] +# cache = ["http-cache-semantics", "lru"] +# native-roots = ["hyper-rustls/native-tokio"] [dependencies] -rand = "0.8.5" -rustc-hash = "1.1.0" -bytes = "1.5.0" -derive_builder = "0.12.0" -futures = { version = "0.3.29", features = ["alloc", "async-await"] } +# rand = "0.8.5" +# rustc-hash = "1.1.0" +# bytes = "1.5.0" +# derive_builder = "0.12.0" +# futures = { version = "0.3.29", features = ["alloc", "async-await"] } tokio = { version = "1.34.0", default-features = false, features = [ "net", "rt-multi-thread", @@ -34,7 +34,7 @@ tokio = { version = "1.34.0", default-features = false, features = [ "fs", ] } async-trait = "0.1.74" -hot_reload = "0.1.4" # reloading certs +# hot_reload = "0.1.4" # reloading certs # Error handling anyhow = "1.0.75" @@ -42,48 +42,48 @@ thiserror = "1.0.50" # http and tls http = "1.0.0" -http-body-util = "0.1.0" +# http-body-util = "0.1.0" hyper = { version = "1.0.1", default-features = false } -hyper-util = { version = "0.1.1", features = ["full"] } -hyper-rustls = { version = "0.24.2", default-features = false, features = [ - "tokio-runtime", - "webpki-tokio", - "http1", - "http2", -] } -tokio-rustls = { version = "0.24.1", features = ["early-data"] } +# hyper-util = { version = "0.1.1", features = ["full"] } +# hyper-rustls = { version = "0.24.2", default-features = false, features = [ +# "tokio-runtime", +# "webpki-tokio", +# "http1", +# "http2", +# ] } +# tokio-rustls = { version = "0.24.1", features = ["early-data"] } rustls = { version = "0.21.9", default-features = false } -webpki = "0.22.4" -x509-parser = "0.15.1" +# webpki = "0.22.4" +# x509-parser = "0.15.1" # logging tracing = { version = "0.1.40" } -# http/3 -quinn = { version = "0.10.2", optional = true } -h3 = { path = "../submodules/h3/h3/", optional = true } -h3-quinn = { path = "../submodules/h3/h3-quinn/", optional = true } -s2n-quic = { version = "1.31.0", default-features = false, features = [ - "provider-tls-rustls", -], optional = true } -s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true } -s2n-quic-rustls = { version = "0.31.0", optional = true } -# for UDP socket wit SO_REUSEADDR when h3 with quinn -socket2 = { version = "0.5.5", features = ["all"], optional = true } +# # http/3 +# quinn = { version = "0.10.2", optional = true } +# h3 = { path = "../submodules/h3/h3/", optional = true } +# h3-quinn = { path = "../submodules/h3/h3-quinn/", optional = true } +# s2n-quic = { version = "1.31.0", default-features = false, features = [ +# "provider-tls-rustls", +# ], 
optional = true } +# s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true } +# s2n-quic-rustls = { version = "0.31.0", optional = true } +# # for UDP socket wit SO_REUSEADDR when h3 with quinn +# socket2 = { version = "0.5.5", features = ["all"], optional = true } -# cache -http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true } -lru = { version = "0.12.0", optional = true } +# # cache +# http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true } +# lru = { version = "0.12.0", optional = true } -# cookie handling for sticky cookie -chrono = { version = "0.4.31", default-features = false, features = [ - "unstable-locales", - "alloc", - "clock", -], optional = true } -base64 = { version = "0.21.5", optional = true } -sha2 = { version = "0.10.8", default-features = false, optional = true } +# # cookie handling for sticky cookie +# chrono = { version = "0.4.31", default-features = false, features = [ +# "unstable-locales", +# "alloc", +# "clock", +# ], optional = true } +# base64 = { version = "0.21.5", optional = true } +# sha2 = { version = "0.10.8", default-features = false, optional = true } -[dev-dependencies] -# http and tls +# [dev-dependencies] +# # http and tls diff --git a/rpxy-lib/src/certs.rs b/rpxy-lib/src/certs.rs index c9cfafd5..b93aa8f8 100644 --- a/rpxy-lib/src/certs.rs +++ b/rpxy-lib/src/certs.rs @@ -1,11 +1,5 @@ use async_trait::async_trait; -use rustc_hash::FxHashSet as HashSet; -use rustls::{ - sign::{any_supported_type, CertifiedKey}, - Certificate, OwnedTrustAnchor, PrivateKey, -}; -use std::io; -use x509_parser::prelude::*; +use rustls::{Certificate, PrivateKey}; #[async_trait] // Trait to read certs and keys anywhere from KVS, file, sqlite, etc. 
@@ -26,66 +20,3 @@ pub struct CertsAndKeys { pub cert_keys: Vec, pub client_ca_certs: Option>, } - -impl CertsAndKeys { - pub fn parse_server_certs_and_keys(&self) -> Result { - // for (server_name_bytes_exp, certs_and_keys) in self.inner.iter() { - let signing_key = self - .cert_keys - .iter() - .find_map(|k| { - if let Ok(sk) = any_supported_type(k) { - Some(sk) - } else { - None - } - }) - .ok_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "Unable to find a valid certificate and key", - ) - })?; - Ok(CertifiedKey::new(self.certs.clone(), signing_key)) - } - - pub fn parse_client_ca_certs(&self) -> Result<(Vec, HashSet>), anyhow::Error> { - let certs = self.client_ca_certs.as_ref().ok_or(anyhow::anyhow!("No client cert"))?; - - let owned_trust_anchors: Vec<_> = certs - .iter() - .map(|v| { - // let trust_anchor = tokio_rustls::webpki::TrustAnchor::try_from_cert_der(&v.0).unwrap(); - let trust_anchor = webpki::TrustAnchor::try_from_cert_der(&v.0).unwrap(); - rustls::OwnedTrustAnchor::from_subject_spki_name_constraints( - trust_anchor.subject, - trust_anchor.spki, - trust_anchor.name_constraints, - ) - }) - .collect(); - - // TODO: SKID is not used currently - let subject_key_identifiers: HashSet<_> = certs - .iter() - .filter_map(|v| { - // retrieve ca key id (subject key id) - let cert = parse_x509_certificate(&v.0).unwrap().1; - let subject_key_ids = cert - .iter_extensions() - .filter_map(|ext| match ext.parsed_extension() { - ParsedExtension::SubjectKeyIdentifier(skid) => Some(skid), - _ => None, - }) - .collect::>(); - if !subject_key_ids.is_empty() { - Some(subject_key_ids[0].0.to_owned()) - } else { - None - } - }) - .collect(); - - Ok((owned_trust_anchors, subject_key_identifiers)) - } -} diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index c672682d..34769dc7 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -1,86 +1,8 @@ pub use anyhow::{anyhow, bail, ensure, Context}; -use std::io; use thiserror::Error; pub type Result = std::result::Result; /// Describes things that can go wrong in the Rpxy #[derive(Debug, Error)] -pub enum RpxyError { - #[error("Proxy build error: {0}")] - ProxyBuild(#[from] crate::proxy::ProxyBuilderError), - - #[error("Backend build error: {0}")] - BackendBuild(#[from] crate::backend::BackendBuilderError), - - #[error("MessageHandler build error: {0}")] - HandlerBuild(#[from] crate::handler::HttpMessageHandlerBuilderError), - - #[error("Config builder error: {0}")] - ConfigBuild(&'static str), - - #[error("Http Message Handler Error: {0}")] - Handler(&'static str), - - #[error("Cache Error: {0}")] - Cache(&'static str), - - #[error("Http Request Message Error: {0}")] - Request(&'static str), - - #[error("TCP/UDP Proxy Layer Error: {0}")] - Proxy(String), - - #[allow(unused)] - #[error("LoadBalance Layer Error: {0}")] - LoadBalance(String), - - #[error("I/O Error: {0}")] - Io(#[from] io::Error), - - // #[error("Toml Deserialization Error")] - // TomlDe(#[from] toml::de::Error), - #[cfg(feature = "http3-quinn")] - #[error("Quic Connection Error [quinn]: {0}")] - QuicConn(#[from] quinn::ConnectionError), - - #[cfg(feature = "http3-s2n")] - #[error("Quic Connection Error [s2n-quic]: {0}")] - QUicConn(#[from] s2n_quic::connection::Error), - - #[cfg(feature = "http3-quinn")] - #[error("H3 Error [quinn]: {0}")] - H3(#[from] h3::Error), - - #[cfg(feature = "http3-s2n")] - #[error("H3 Error [s2n-quic]: {0}")] - H3(#[from] s2n_quic_h3::h3::Error), - - #[error("rustls Connection Error: {0}")] - Rustls(#[from] rustls::Error), 
- - #[error("Hyper Error: {0}")] - Hyper(#[from] hyper::Error), - - #[error("Hyper Http Error: {0}")] - HyperHttp(#[from] hyper::http::Error), - - #[error("Hyper Http HeaderValue Error: {0}")] - HyperHeaderValue(#[from] hyper::header::InvalidHeaderValue), - - #[error("Hyper Http HeaderName Error: {0}")] - HyperHeaderName(#[from] hyper::header::InvalidHeaderName), - - #[error(transparent)] - Other(#[from] anyhow::Error), -} - -#[allow(dead_code)] -#[derive(Debug, Error, Clone)] -pub enum ClientCertsError { - #[error("TLS Client Certificate is Required for Given SNI: {0}")] - ClientCertRequired(String), - - #[error("Inconsistent TLS Client Certificate for Given SNI: {0}")] - InconsistentClientCert(String), -} +pub enum RpxyError {} diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs index 02605a60..2db2805d 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -1,42 +1,5 @@ -use crate::{ - backend::{ - Backend, BackendBuilder, Backends, ReverseProxy, Upstream, UpstreamGroup, UpstreamGroupBuilder, UpstreamOption, - }, - certs::CryptoSource, - constants::*, - error::RpxyError, - log::*, - utils::{BytesName, PathNameBytesExp}, -}; -use rustc_hash::FxHashMap as HashMap; -use std::net::SocketAddr; -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; -use tokio::time::Duration; - -/// Global object containing proxy configurations and shared object like counters. -/// But note that in Globals, we do not have Mutex and RwLock. It is indeed, the context shared among async tasks. -pub struct Globals -where - T: CryptoSource, -{ - /// Configuration parameters for proxy transport and request handlers - pub proxy_config: ProxyConfig, // TODO: proxy configはarcに包んでこいつだけ使いまわせばいいように変えていく。backendsも? - - /// Backend application objects to which http request handler forward incoming requests - pub backends: Backends, - - /// Shared context - Counter for serving requests - pub request_count: RequestCount, - - /// Shared context - Async task runtime handler - pub runtime_handle: tokio::runtime::Handle, - - /// Shared context - Notify object to stop async tasks - pub term_notify: Option>, -} +use crate::{certs::CryptoSource, constants::*}; +use std::{net::SocketAddr, time::Duration}; /// Configuration parameters for proxy transport and request handlers #[derive(PartialEq, Eq, Clone)] @@ -140,44 +103,6 @@ where pub inner: Vec>, pub default_app: Option, } -impl TryInto> for AppConfigList -where - T: CryptoSource + Clone, -{ - type Error = RpxyError; - - fn try_into(self) -> Result, Self::Error> { - let mut backends = Backends::new(); - for app_config in self.inner.iter() { - let backend = app_config.try_into()?; - backends - .apps - .insert(app_config.server_name.clone().to_server_name_vec(), backend); - info!( - "Registering application {} ({})", - &app_config.server_name, &app_config.app_name - ); - } - - // default backend application for plaintext http requests - if let Some(d) = self.default_app { - let d_sn: Vec<&str> = backends - .apps - .iter() - .filter(|(_k, v)| v.app_name == d) - .map(|(_, v)| v.server_name.as_ref()) - .collect(); - if !d_sn.is_empty() { - info!( - "Serving plaintext http for requests to unconfigured server_name by app {} (server_name: {}).", - d, d_sn[0] - ); - backends.default_server_name_bytes = Some(d_sn[0].to_server_name_vec()); - } - } - Ok(backends) - } -} /// Configuration parameters for single backend application #[derive(PartialEq, Eq, Clone)] @@ -190,77 +115,6 @@ where pub reverse_proxy: Vec, pub tls: Option>, } -impl TryInto> for 
&AppConfig -where - T: CryptoSource + Clone, -{ - type Error = RpxyError; - - fn try_into(self) -> Result, Self::Error> { - // backend builder - let mut backend_builder = BackendBuilder::default(); - // reverse proxy settings - let reverse_proxy = self.try_into()?; - - backend_builder - .app_name(self.app_name.clone()) - .server_name(self.server_name.clone()) - .reverse_proxy(reverse_proxy); - - // TLS settings and build backend instance - let backend = if self.tls.is_none() { - backend_builder.build().map_err(RpxyError::BackendBuild)? - } else { - let tls = self.tls.as_ref().unwrap(); - - backend_builder - .https_redirection(Some(tls.https_redirection)) - .crypto_source(Some(tls.inner.clone())) - .build()? - }; - Ok(backend) - } -} -impl TryInto for &AppConfig -where - T: CryptoSource + Clone, -{ - type Error = RpxyError; - - fn try_into(self) -> Result { - let mut upstream: HashMap = HashMap::default(); - - self.reverse_proxy.iter().for_each(|rpo| { - let upstream_vec: Vec = rpo.upstream.iter().map(|x| x.try_into().unwrap()).collect(); - // let upstream_iter = rpo.upstream.iter().map(|x| x.to_upstream().unwrap()); - // let lb_upstream_num = vec_upstream.len(); - let elem = UpstreamGroupBuilder::default() - .upstream(&upstream_vec) - .path(&rpo.path) - .replace_path(&rpo.replace_path) - .lb(&rpo.load_balance, &upstream_vec, &self.server_name, &rpo.path) - .opts(&rpo.upstream_options) - .build() - .unwrap(); - - upstream.insert(elem.path.clone(), elem); - }); - if self.reverse_proxy.iter().filter(|rpo| rpo.path.is_none()).count() >= 2 { - error!("Multiple default reverse proxy setting"); - return Err(RpxyError::ConfigBuild("Invalid reverse proxy setting")); - } - - if !(upstream.iter().all(|(_, elem)| { - !(elem.opts.contains(&UpstreamOption::ForceHttp11Upstream) - && elem.opts.contains(&UpstreamOption::ForceHttp2Upstream)) - })) { - error!("Either one of force_http11 or force_http2 can be enabled"); - return Err(RpxyError::ConfigBuild("Invalid upstream option setting")); - } - - Ok(ReverseProxy { upstream }) - } -} /// Configuration parameters for single reverse proxy corresponding to the path #[derive(PartialEq, Eq, Clone)] @@ -275,16 +129,7 @@ pub struct ReverseProxyConfig { /// Configuration parameters for single upstream destination from a reverse proxy #[derive(PartialEq, Eq, Clone)] pub struct UpstreamUri { - pub inner: hyper::Uri, -} -impl TryInto for &UpstreamUri { - type Error = anyhow::Error; - - fn try_into(self) -> std::result::Result { - Ok(Upstream { - uri: self.inner.clone(), - }) - } + pub inner: http::Uri, } /// Configuration parameters on TLS for a single backend application @@ -296,30 +141,3 @@ where pub inner: T, pub https_redirection: bool, } - -#[derive(Debug, Clone, Default)] -/// Counter for serving requests -pub struct RequestCount(Arc); - -impl RequestCount { - pub fn current(&self) -> usize { - self.0.load(Ordering::Relaxed) - } - - pub fn increment(&self) -> usize { - self.0.fetch_add(1, Ordering::Relaxed) - } - - pub fn decrement(&self) -> usize { - let mut count; - while { - count = self.0.load(Ordering::Relaxed); - count > 0 - && self - .0 - .compare_exchange(count, count - 1, Ordering::Relaxed, Ordering::Relaxed) - != Ok(count) - } {} - count - } -} diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index 7f7ade27..b201b526 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -1,19 +1,11 @@ -mod backend; mod certs; mod constants; mod error; mod globals; -mod handler; -mod hyper_executor; mod log; -mod proxy; -mod utils; -use crate::{error::*, 
globals::Globals, handler::HttpMessageHandlerBuilder, log::*, proxy::ProxyBuilder}; -use futures::future::select_all; -use hyper_executor::build_http_server; -// use hyper_trust_dns::TrustDnsResolver; -use std::{sync::Arc, time::Duration}; +use crate::{error::*, log::*}; +use std::sync::Arc; pub use crate::{ certs::{CertsAndKeys, CryptoSource}, @@ -66,48 +58,48 @@ where info!("Cache is disabled") } - // build global - let globals = Arc::new(Globals { - proxy_config: proxy_config.clone(), - backends: app_config_list.clone().try_into()?, - request_count: Default::default(), - runtime_handle: runtime_handle.clone(), - term_notify: term_notify.clone(), - }); + // // build global + // let globals = Arc::new(Globals { + // proxy_config: proxy_config.clone(), + // backends: app_config_list.clone().try_into()?, + // request_count: Default::default(), + // runtime_handle: runtime_handle.clone(), + // term_notify: term_notify.clone(), + // }); - // build message handler including a request forwarder - let msg_handler = Arc::new( - HttpMessageHandlerBuilder::default() - // .forwarder(Arc::new(Forwarder::new(&globals).await)) - .globals(globals.clone()) - .build()?, - ); + // // build message handler including a request forwarder + // let msg_handler = Arc::new( + // HttpMessageHandlerBuilder::default() + // // .forwarder(Arc::new(Forwarder::new(&globals).await)) + // .globals(globals.clone()) + // .build()?, + // ); - let http_server = Arc::new(build_http_server(&globals)); + // let http_server = Arc::new(build_http_server(&globals)); - let addresses = globals.proxy_config.listen_sockets.clone(); - let futures = select_all(addresses.into_iter().map(|addr| { - let mut tls_enabled = false; - if let Some(https_port) = globals.proxy_config.https_port { - tls_enabled = https_port == addr.port() - } + // let addresses = globals.proxy_config.listen_sockets.clone(); + // let futures = select_all(addresses.into_iter().map(|addr| { + // let mut tls_enabled = false; + // if let Some(https_port) = globals.proxy_config.https_port { + // tls_enabled = https_port == addr.port() + // } - let proxy = ProxyBuilder::default() - .globals(globals.clone()) - .listening_on(addr) - .tls_enabled(tls_enabled) - .http_server(http_server.clone()) - .msg_handler(msg_handler.clone()) - .build() - .unwrap(); + // let proxy = ProxyBuilder::default() + // .globals(globals.clone()) + // .listening_on(addr) + // .tls_enabled(tls_enabled) + // .http_server(http_server.clone()) + // .msg_handler(msg_handler.clone()) + // .build() + // .unwrap(); - globals.runtime_handle.spawn(async move { proxy.start().await }) - })); + // globals.runtime_handle.spawn(async move { proxy.start().await }) + // })); - // wait for all future - if let (Ok(Err(e)), _, _) = futures.await { - error!("Some proxy services are down: {}", e); - }; + // // wait for all future + // if let (Ok(Err(e)), _, _) = futures.await { + // error!("Some proxy services are down: {}", e); + // }; Ok(()) } diff --git a/rpxy-lib/src/log.rs b/rpxy-lib/src/log.rs index 6b8afbec..c55b5c2b 100644 --- a/rpxy-lib/src/log.rs +++ b/rpxy-lib/src/log.rs @@ -1,98 +1 @@ -use crate::utils::ToCanonical; -use hyper::header; -use std::net::SocketAddr; pub use tracing::{debug, error, info, warn}; - -#[derive(Debug, Clone)] -pub struct MessageLog { - // pub tls_server_name: String, - pub client_addr: String, - pub method: String, - pub host: String, - pub p_and_q: String, - pub version: hyper::Version, - pub uri_scheme: String, - pub uri_host: String, - pub ua: String, - pub xff: String, - pub 
status: String, - pub upstream: String, -} - -impl From<&hyper::Request> for MessageLog { - fn from(req: &hyper::Request) -> Self { - let header_mapper = |v: header::HeaderName| { - req - .headers() - .get(v) - .map_or_else(|| "", |s| s.to_str().unwrap_or("")) - .to_string() - }; - Self { - // tls_server_name: "".to_string(), - client_addr: "".to_string(), - method: req.method().to_string(), - host: header_mapper(header::HOST), - p_and_q: req - .uri() - .path_and_query() - .map_or_else(|| "", |v| v.as_str()) - .to_string(), - version: req.version(), - uri_scheme: req.uri().scheme_str().unwrap_or("").to_string(), - uri_host: req.uri().host().unwrap_or("").to_string(), - ua: header_mapper(header::USER_AGENT), - xff: header_mapper(header::HeaderName::from_static("x-forwarded-for")), - status: "".to_string(), - upstream: "".to_string(), - } - } -} - -impl MessageLog { - pub fn client_addr(&mut self, client_addr: &SocketAddr) -> &mut Self { - self.client_addr = client_addr.to_canonical().to_string(); - self - } - // pub fn tls_server_name(&mut self, tls_server_name: &str) -> &mut Self { - // self.tls_server_name = tls_server_name.to_string(); - // self - // } - pub fn status_code(&mut self, status_code: &hyper::StatusCode) -> &mut Self { - self.status = status_code.to_string(); - self - } - pub fn xff(&mut self, xff: &Option<&header::HeaderValue>) -> &mut Self { - self.xff = xff.map_or_else(|| "", |v| v.to_str().unwrap_or("")).to_string(); - self - } - pub fn upstream(&mut self, upstream: &hyper::Uri) -> &mut Self { - self.upstream = upstream.to_string(); - self - } - - pub fn output(&self) { - info!( - "{} <- {} -- {} {} {:?} -- {} -- {} \"{}\", \"{}\" \"{}\"", - if !self.host.is_empty() { - self.host.as_str() - } else { - self.uri_host.as_str() - }, - self.client_addr, - self.method, - self.p_and_q, - self.version, - self.status, - if !self.uri_scheme.is_empty() && !self.uri_host.is_empty() { - format!("{}://{}", self.uri_scheme, self.uri_host) - } else { - "".to_string() - }, - self.ua, - self.xff, - self.upstream, - // self.tls_server_name - ); - } -}
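
A note on the `CryptoSource` trait that this patch keeps exported from `rpxy-lib` and duplicates into `rpxy-lib-legacy`: implementors only provide `read()` and `is_mutual_tls()`. Below is a minimal illustrative sketch of an external implementation, assuming `read()` returns `Result<CertsAndKeys, Self::Error>` and the `CertsAndKeys` fields are `Vec<Certificate>`, `Vec<PrivateKey>`, and `Option<Vec<Certificate>>` (as in upstream rpxy); `StaticCryptoSource` is a hypothetical name and is not part of this patch.

// Illustrative only: a CryptoSource that serves certificates and a key already held in memory.
use async_trait::async_trait;
use rpxy_lib::{
  reexports::{Certificate, PrivateKey},
  CertsAndKeys, CryptoSource,
};

#[derive(Clone)]
pub struct StaticCryptoSource {
  certs: Vec<Certificate>,
  key: PrivateKey,
}

#[async_trait]
impl CryptoSource for StaticCryptoSource {
  type Error = std::io::Error;

  // Hand back clones of the DER blobs loaded at startup; no hot reload here.
  async fn read(&self) -> Result<CertsAndKeys, Self::Error> {
    Ok(CertsAndKeys {
      certs: self.certs.clone(),
      cert_keys: vec![self.key.clone()],
      client_ca_certs: None, // no client CA list, so no mutual TLS
    })
  }

  fn is_mutual_tls(&self) -> bool {
    false
  }
}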