diff --git a/legacy-lib/Cargo.toml b/legacy-lib/Cargo.toml index c975fb6c..00f1edb3 100644 --- a/legacy-lib/Cargo.toml +++ b/legacy-lib/Cargo.toml @@ -12,7 +12,7 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["http3-s2n", "sticky-cookie", "cache"] +default = ["http3-quinn", "sticky-cookie", "cache"] http3-quinn = ["quinn", "h3", "h3-quinn", "socket2"] http3-s2n = ["h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"] sticky-cookie = ["base64", "sha2", "chrono"] diff --git a/rpxy-bin/Cargo.toml b/rpxy-bin/Cargo.toml index 8cddc71e..00c2ef41 100644 --- a/rpxy-bin/Cargo.toml +++ b/rpxy-bin/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rpxy" -version = "0.6.2" +version = "0.7.0" authors = ["Jun Kurihara"] homepage = "https://github.com/junkurihara/rust-rpxy" repository = "https://github.com/junkurihara/rust-rpxy" @@ -12,7 +12,7 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["http3-s2n", "cache"] +default = ["http3-quinn", "cache"] http3-quinn = ["rpxy-lib/http3-quinn"] http3-s2n = ["rpxy-lib/http3-s2n"] cache = ["rpxy-lib/cache"] @@ -20,7 +20,7 @@ native-roots = ["rpxy-lib/native-roots"] [dependencies] rpxy-lib = { path = "../rpxy-lib/", default-features = false, features = [ - # "sticky-cookie", + "sticky-cookie", ] } anyhow = "1.0.75" diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index c0cb403b..847d3fed 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rpxy-lib" -version = "0.6.2" +version = "0.7.0" authors = ["Jun Kurihara"] homepage = "https://github.com/junkurihara/rust-rpxy" repository = "https://github.com/junkurihara/rust-rpxy" @@ -12,17 +12,23 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["http3-s2n", "sticky-cookie", "cache"] +default = ["http3-quinn", "sticky-cookie", "cache"] http3-quinn = ["socket2", "quinn", "h3", "h3-quinn"] -http3-s2n = ["h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"] +http3-s2n = [ + "h3", + "s2n-quic", + "s2n-quic-core", + "s2n-quic-rustls", + "s2n-quic-h3", +] sticky-cookie = ["base64", "sha2", "chrono"] -cache = [] #"http-cache-semantics", "lru"] -native-roots = [] #"hyper-rustls/native-tokio"] +cache = [] #"http-cache-semantics", "lru"] +native-roots = [] #"hyper-rustls/native-tokio"] [dependencies] rand = "0.8.5" rustc-hash = "1.1.0" -# bytes = "1.5.0" +bytes = "1.5.0" derive_builder = "0.12.0" futures = { version = "0.3.29", features = ["alloc", "async-await"] } tokio = { version = "1.34.0", default-features = false, features = [ @@ -41,7 +47,7 @@ thiserror = "1.0.50" # http http = "1.0.0" -# http-body-util = "0.1.0" +http-body-util = "0.1.0" hyper = { version = "1.0.1", default-features = false } hyper-util = { version = "0.1.1", features = ["full"] } # hyper-rustls = { version = "0.24.2", default-features = false, features = [ @@ -50,11 +56,11 @@ hyper-util = { version = "0.1.1", features = ["full"] } # "http1", # "http2", # ] } -# tokio-rustls = { version = "0.24.1", features = ["early-data"] } # tls and cert management hot_reload = "0.1.4" rustls = { version = "0.21.9", default-features = false } +tokio-rustls = { version = "0.24.1", features = ["early-data"] } webpki = "0.22.4" x509-parser = "0.15.1" @@ -68,6 +74,7 @@ h3-quinn = { path = "../submodules/h3/h3-quinn/", optional = true } s2n-quic = { version = "1.31.0", default-features = false, features = [ "provider-tls-rustls", ], optional = true } +s2n-quic-core = { version = "0.31.0", default-features = false, optional = true } s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true } s2n-quic-rustls = { version = "0.31.0", optional = true } # for UDP socket wit SO_REUSEADDR when h3 with quinn diff --git a/rpxy-lib/src/crypto/mod.rs b/rpxy-lib/src/crypto/mod.rs index 1f6566d8..7b8935c9 100644 --- a/rpxy-lib/src/crypto/mod.rs +++ b/rpxy-lib/src/crypto/mod.rs @@ -11,7 +11,7 @@ use service::CryptoReloader; use std::sync::Arc; pub use certs::{CertsAndKeys, CryptoSource}; -pub use service::ServerCryptoBase; +pub use service::{ServerCrypto, ServerCryptoBase, SniServerCryptoMap}; /// Result type inner of certificate reloader service type ReloaderServiceResultInner = ( diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index 05e5db1a..8152e0d0 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -6,9 +6,51 @@ pub type RpxyResult = std::result::Result; /// Describes things that can go wrong in the Rpxy #[derive(Debug, Error)] pub enum RpxyError { + // general errors #[error("IO error: {0}")] Io(#[from] std::io::Error), + // TLS errors + #[error("Failed to build TLS acceptor: {0}")] + FailedToTlsHandshake(String), + #[error("No server name in ClientHello")] + NoServerNameInClientHello, + #[error("No TLS serving app: {0}")] + NoTlsServingApp(String), + #[error("Failed to update server crypto: {0}")] + FailedToUpdateServerCrypto(String), + #[error("No server crypto: {0}")] + NoServerCrypto(String), + + // hyper errors + #[error("hyper body manipulation error: {0}")] + HyperBodyManipulationError(String), + + // http/3 errors + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + #[error("H3 error: {0}")] + H3Error(#[from] h3::Error), + + #[cfg(feature = "http3-quinn")] + #[error("Invalid rustls TLS version: {0}")] + QuinnInvalidTlsProtocolVersion(String), + #[cfg(feature = "http3-quinn")] + #[error("Quinn connection error: {0}")] + QuinnConnectionFailed(#[from] quinn::ConnectionError), + + #[cfg(feature = "http3-s2n")] + #[error("s2n-quic validation error: {0}")] + S2nQuicValidationError(#[from] s2n_quic_core::transport::parameters::ValidationError), + #[cfg(feature = "http3-s2n")] + #[error("s2n-quic connection error: {0}")] + S2nQuicConnectionError(#[from] s2n_quic_core::connection::Error), + #[cfg(feature = "http3-s2n")] + #[error("s2n-quic start error: {0}")] + S2nQuicStartError(#[from] s2n_quic::provider::StartError), + + // certificate reloader errors + #[error("No certificate reloader when building a proxy for TLS")] + NoCertificateReloader, #[error("Certificate reload error: {0}")] CertificateReloadError(#[from] hot_reload::ReloaderError), @@ -20,6 +62,11 @@ pub enum RpxyError { #[error("Failed to build backend app: {0}")] FailedToBuildBackendApp(#[from] crate::backend::BackendAppBuilderError), + // Upstream connection setting errors #[error("Unsupported upstream option")] UnsupportedUpstreamOption, + + // Others + #[error("Infallible")] + Infallible(#[from] std::convert::Infallible), } diff --git a/rpxy-lib/src/proxy/mod.rs b/rpxy-lib/src/proxy/mod.rs index 2ca21e43..5b1ad618 100644 --- a/rpxy-lib/src/proxy/mod.rs +++ b/rpxy-lib/src/proxy/mod.rs @@ -1,5 +1,9 @@ +mod proxy_h3; mod proxy_main; -mod proxy_tls; +#[cfg(feature = "http3-quinn")] +mod proxy_quic_quinn; +#[cfg(feature = "http3-s2n")] +mod proxy_quic_s2n; mod socket; use crate::{globals::Globals, hyper_executor::LocalExecutor}; diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs new file mode 100644 index 00000000..056cd4bd --- /dev/null +++ b/rpxy-lib/src/proxy/proxy_h3.rs @@ -0,0 +1,205 @@ +use super::proxy_main::Proxy; +use crate::{error::*, log::*, name_exp::ServerName}; +use bytes::Bytes; +use http::{Request, Response}; +use http_body_util::BodyExt; +use std::{net::SocketAddr, time::Duration}; +use tokio::time::timeout; + +#[cfg(feature = "http3-quinn")] +use h3::{quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream}; +#[cfg(feature = "http3-s2n")] +use s2n_quic_h3::h3::{self, quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream}; + +// use crate::{certs::CryptoSource, error::*, log::*, utils::ServerNameBytesExp}; +// use futures::Stream; +// use hyper_util::client::legacy::connect::Connect; + +// impl Proxy +// where +// // T: Connect + Clone + Sync + Send + 'static, +// U: CryptoSource + Clone + Sync + Send + 'static, +// { + +impl Proxy { + pub(super) async fn h3_serve_connection( + &self, + quic_connection: C, + tls_server_name: ServerName, + client_addr: SocketAddr, + ) -> RpxyResult<()> + where + C: ConnectionQuic, + >::BidiStream: BidiStream + Send + 'static, + <>::BidiStream as BidiStream>::RecvStream: Send, + <>::BidiStream as BidiStream>::SendStream: Send, + { + let mut h3_conn = h3::server::Connection::<_, Bytes>::new(quic_connection).await?; + info!( + "QUIC/HTTP3 connection established from {:?} {}", + client_addr, + <&ServerName as TryInto>::try_into(&tls_server_name).unwrap_or_default() + ); + + // TODO: Is here enough to fetch server_name from NewConnection? + // to avoid deep nested call from listener_service_h3 + loop { + // this routine follows hyperium/h3 examples https://github.com/hyperium/h3/blob/master/examples/server.rs + match h3_conn.accept().await { + Ok(None) => { + break; + } + Err(e) => { + warn!("HTTP/3 error on accept incoming connection: {}", e); + match e.get_error_level() { + h3::error::ErrorLevel::ConnectionError => break, + h3::error::ErrorLevel::StreamError => continue, + } + } + Ok(Some((req, stream))) => { + // We consider the connection count separately from the stream count. + // Max clients for h1/h2 = max 'stream' for h3. + let request_count = self.globals.request_count.clone(); + if request_count.increment() > self.globals.proxy_config.max_clients { + request_count.decrement(); + h3_conn.shutdown(0).await?; + break; + } + debug!("Request incoming: current # {}", request_count.current()); + + let self_inner = self.clone(); + let tls_server_name_inner = tls_server_name.clone(); + self.globals.runtime_handle.spawn(async move { + if let Err(e) = timeout( + self_inner.globals.proxy_config.proxy_timeout + Duration::from_secs(1), // timeout per stream are considered as same as one in http2 + self_inner.h3_serve_stream(req, stream, client_addr, tls_server_name_inner), + ) + .await + { + error!("HTTP/3 failed to process stream: {}", e); + } + request_count.decrement(); + debug!("Request processed: current # {}", request_count.current()); + }); + } + } + } + + Ok(()) + } + + /// Serves a request stream from a client + /// TODO: TODO: TODO: TODO: + /// TODO: Body in hyper-0.14 was changed to Incoming in hyper-1.0, and it is not accessible from outside. + /// Thus, we need to implement IncomingLike trait using channel. Also, the backend handler must feed the body in the form of + /// Either as body. + /// Also, the downstream from the backend handler could be Incoming, but will be wrapped as Either as well due to H3. + /// Result, E> type includes E as HttpError to generate the status code and related Response. + /// Thus to handle synthetic error messages in BoxBody, the serve() function outputs Response, BoxBody>>>. + async fn h3_serve_stream( + &self, + req: Request<()>, + stream: RequestStream, + client_addr: SocketAddr, + tls_server_name: ServerName, + ) -> RpxyResult<()> + where + S: BidiStream + Send + 'static, + >::RecvStream: Send, + { + let (req_parts, _) = req.into_parts(); + // split stream and async body handling + let (mut send_stream, mut recv_stream) = stream.split(); + + // let max_body_size = self.globals.proxy_config.h3_request_max_body_size; + // // let max = body_stream.size_hint().upper().unwrap_or(u64::MAX); + // // if max > max_body_size as u64 { + // // return Err(HttpError::TooLargeRequestBody); + // // } + + // let new_req = Request::from_parts(req_parts, body_stream); + + // // generate streamed body with trailers using channel + // let (body_sender, req_body) = Incoming::channel(); + + // // Buffering and sending body through channel for protocol conversion like h3 -> h2/http1.1 + // // The underling buffering, i.e., buffer given by the API recv_data.await?, is handled by quinn. + // let max_body_size = self.globals.proxy_config.h3_request_max_body_size; + // self.globals.runtime_handle.spawn(async move { + // // let mut sender = body_sender; + // let mut size = 0usize; + // while let Some(mut body) = recv_stream.recv_data().await? { + // debug!("HTTP/3 incoming request body: remaining {}", body.remaining()); + // size += body.remaining(); + // if size > max_body_size { + // error!( + // "Exceeds max request body size for HTTP/3: received {}, maximum_allowd {}", + // size, max_body_size + // ); + // return Err(RpxyError::Proxy("Exceeds max request body size for HTTP/3".to_string())); + // } + // // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes + // // sender.send_data(body.copy_to_bytes(body.remaining())).await?; + // } + + // // trailers: use inner for work around. (directly get trailer) + // let trailers = recv_stream.as_mut().recv_trailers().await?; + // if trailers.is_some() { + // debug!("HTTP/3 incoming request trailers"); + // // sender.send_trailers(trailers.unwrap()).await?; + // } + // Ok(()) + // }); + + // let new_req: Request = Request::from_parts(req_parts, req_body); + // let res = self + // .msg_handler + // .clone() + // .handle_request( + // new_req, + // client_addr, + // self.listening_on, + // self.tls_enabled, + // Some(tls_server_name), + // ) + // .await?; + + // TODO: TODO: TODO: remove later + let body = full(hyper::body::Bytes::from("hello h3 echo")); + let res = Response::builder().body(body).unwrap(); + ///////////////// + + let (new_res_parts, new_body) = res.into_parts(); + let new_res = Response::from_parts(new_res_parts, ()); + + match send_stream.send_response(new_res).await { + Ok(_) => { + debug!("HTTP/3 response to connection successful"); + // aggregate body without copying + let body_data = new_body + .collect() + .await + .map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?; + + // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes inside to_bytes() + send_stream.send_data(body_data.to_bytes()).await?; + + // TODO: needs handling trailer? should be included in body from handler. + } + Err(err) => { + error!("Unable to send response to connection peer: {:?}", err); + } + } + Ok(send_stream.finish().await?) + } +} + +////////////// +/// TODO: remove later +/// helper function to build a full body +use http_body_util::Full; +pub(crate) type BoxBody = http_body_util::combinators::BoxBody; +pub fn full(body: hyper::body::Bytes) -> BoxBody { + Full::new(body).map_err(|never| match never {}).boxed() +} +////////////// diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index 5aea172d..0d6eb83b 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -1,10 +1,59 @@ use super::socket::bind_tcp_socket; -use crate::{error::RpxyResult, globals::Globals, log::*}; -use hyper_util::server::conn::auto::Builder as ConnectionBuilder; -use std::{net::SocketAddr, sync::Arc}; +use crate::{ + constants::TLS_HANDSHAKE_TIMEOUT_SEC, + crypto::{ServerCrypto, SniServerCryptoMap}, + error::*, + globals::Globals, + hyper_executor::LocalExecutor, + log::*, + name_exp::ServerName, +}; +use futures::{select, FutureExt}; +use http::{Request, Response}; +use hyper::{ + body::Incoming, + rt::{Read, Write}, + service::service_fn, +}; +use hyper_util::{rt::TokioIo, server::conn::auto::Builder as ConnectionBuilder}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; +use tokio::time::timeout; +/// Wrapper function to handle request for HTTP/1.1 and HTTP/2 +/// HTTP/3 is handled in proxy_h3.rs which directly calls the message handler +async fn serve_request( + req: Request, + // handler: Arc>, + // handler: Arc>, + client_addr: SocketAddr, + listen_addr: SocketAddr, + tls_enabled: bool, + tls_server_name: Option, +) -> RpxyResult> { + // match handler + // .handle_request(req, client_addr, listen_addr, tls_enabled, tls_server_name) + // .await? + // { + // Ok(res) => passthrough_response(res), + // Err(e) => synthetic_error_response(StatusCode::from(e)), + // } + let body = full(hyper::body::Bytes::from("hello")); + let res = Response::builder().body(body).unwrap(); + Ok(res) +} +////////////// +/// TODO: remove later +/// helper function to build a full body +use http_body_util::{BodyExt, Full}; +pub(crate) type BoxBody = http_body_util::combinators::BoxBody; +pub fn full(body: hyper::body::Bytes) -> BoxBody { + Full::new(body).map_err(|never| match never {}).boxed() +} +////////////// + +#[derive(Clone)] /// Proxy main object responsible to serve requests received from clients at the given socket address. -pub(crate) struct Proxy { +pub(crate) struct Proxy { /// global context shared among async tasks pub globals: Arc, /// listen socket address @@ -15,7 +64,49 @@ pub(crate) struct Proxy { pub connection_builder: Arc>, } -impl Proxy { +impl Proxy { + /// Serves requests from clients + fn serve_connection(&self, stream: I, peer_addr: SocketAddr, tls_server_name: Option) + where + I: Read + Write + Send + Unpin + 'static, + { + let request_count = self.globals.request_count.clone(); + if request_count.increment() > self.globals.proxy_config.max_clients { + request_count.decrement(); + return; + } + debug!("Request incoming: current # {}", request_count.current()); + + let server_clone = self.connection_builder.clone(); + // let msg_handler_clone = self.msg_handler.clone(); + let timeout_sec = self.globals.proxy_config.proxy_timeout; + let tls_enabled = self.tls_enabled; + let listening_on = self.listening_on; + self.globals.runtime_handle.clone().spawn(async move { + timeout( + timeout_sec + Duration::from_secs(1), + server_clone.serve_connection_with_upgrades( + stream, + service_fn(move |req: Request| { + serve_request( + req, + // msg_handler_clone.clone(), + peer_addr, + listening_on, + tls_enabled, + tls_server_name.clone(), + ) + }), + ), + ) + .await + .ok(); + + request_count.decrement(); + debug!("Request processed: current # {}", request_count.current()); + }); + } + /// Start without TLS (HTTP cleartext) async fn start_without_tls(&self) -> RpxyResult<()> { let listener_service = async { @@ -23,7 +114,7 @@ impl Proxy { let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?; info!("Start TCP proxy serving with HTTP request for configured host names"); while let Ok((stream, client_addr)) = tcp_listener.accept().await { - // self.serve_connection(TokioIo::new(stream), client_addr, None); + self.serve_connection(TokioIo::new(stream), client_addr, None); } Ok(()) as RpxyResult<()> }; @@ -33,14 +124,108 @@ impl Proxy { /// Start with TLS (HTTPS) pub(super) async fn start_with_tls(&self) -> RpxyResult<()> { - // let (cert_reloader_service, cert_reloader_rx) = ReloaderService::, ServerCryptoBase>::new( - // &self.globals.clone(), - // CERTS_WATCH_DELAY_SECS, - // !LOAD_CERTS_ONLY_WHEN_UPDATED, - // ) - // .await - // .map_err(|e| anyhow::anyhow!(e))?; - loop {} + #[cfg(not(any(feature = "http3-quinn", feature = "http3-s2n")))] + { + self.tls_listener_service().await?; + error!("TCP proxy service for TLS exited"); + Ok(()) + } + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + { + if self.globals.proxy_config.http3 { + select! { + _ = self.tls_listener_service().fuse() => { + error!("TCP proxy service for TLS exited"); + }, + _ = self.h3_listener_service().fuse() => { + error!("UDP proxy service for QUIC exited"); + } + }; + Ok(()) + } else { + self.tls_listener_service().await?; + error!("TCP proxy service for TLS exited"); + Ok(()) + } + } + } + + // TCP Listener Service, i.e., http/2 and http/1.1 + async fn tls_listener_service(&self) -> RpxyResult<()> { + let Some(mut server_crypto_rx) = self.globals.cert_reloader_rx.clone() else { + return Err(RpxyError::NoCertificateReloader); + }; + let tcp_socket = bind_tcp_socket(&self.listening_on)?; + let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?; + info!("Start TCP proxy serving with HTTPS request for configured host names"); + + let mut server_crypto_map: Option> = None; + loop { + select! { + tcp_cnx = tcp_listener.accept().fuse() => { + if tcp_cnx.is_err() || server_crypto_map.is_none() { + continue; + } + let (raw_stream, client_addr) = tcp_cnx.unwrap(); + let sc_map_inner = server_crypto_map.clone(); + let self_inner = self.clone(); + + // spawns async handshake to avoid blocking thread by sequential handshake. + let handshake_fut = async move { + let acceptor = tokio_rustls::LazyConfigAcceptor::new(tokio_rustls::rustls::server::Acceptor::default(), raw_stream).await; + if let Err(e) = acceptor { + return Err(RpxyError::FailedToTlsHandshake(e.to_string())); + } + let start = acceptor.unwrap(); + let client_hello = start.client_hello(); + let sni = client_hello.server_name(); + debug!("HTTP/2 or 1.1: SNI in ClientHello: {:?}", sni.unwrap_or("None")); + let server_name = sni.map(ServerName::from); + if server_name.is_none(){ + return Err(RpxyError::NoServerNameInClientHello); + } + let server_crypto = sc_map_inner.as_ref().unwrap().get(server_name.as_ref().unwrap()); + if server_crypto.is_none() { + return Err(RpxyError::NoTlsServingApp(server_name.as_ref().unwrap().try_into().unwrap_or_default())); + } + let stream = match start.into_stream(server_crypto.unwrap().clone()).await { + Ok(s) => TokioIo::new(s), + Err(e) => { + return Err(RpxyError::FailedToTlsHandshake(e.to_string())); + } + }; + self_inner.serve_connection(stream, client_addr, server_name); + Ok(()) as RpxyResult<()> + }; + + self.globals.runtime_handle.spawn( async move { + // timeout is introduced to avoid get stuck here. + let Ok(v) = timeout( + Duration::from_secs(TLS_HANDSHAKE_TIMEOUT_SEC), + handshake_fut + ).await else { + error!("Timeout to handshake TLS"); + return; + }; + if let Err(e) = v { + error!("{}", e); + } + }); + } + _ = server_crypto_rx.changed().fuse() => { + if server_crypto_rx.borrow().is_none() { + error!("Reloader is broken"); + break; + } + let cert_keys_map = server_crypto_rx.borrow().clone().unwrap(); + let Some(server_crypto): Option> = (&cert_keys_map).try_into().ok() else { + error!("Failed to update server crypto"); + break; + }; + server_crypto_map = Some(server_crypto.inner_local_map.clone()); + } + } + } Ok(()) } @@ -56,11 +241,11 @@ impl Proxy { match &self.globals.term_notify { Some(term) => { - tokio::select! { - _ = proxy_service => { + select! { + _ = proxy_service.fuse() => { warn!("Proxy service got down"); } - _ = term.notified() => { + _ = term.notified().fuse() => { info!("Proxy service listening on {} receives term signal", self.listening_on); } } diff --git a/rpxy-lib/src/proxy/proxy_quic_quinn.rs b/rpxy-lib/src/proxy/proxy_quic_quinn.rs new file mode 100644 index 00000000..bde3b00c --- /dev/null +++ b/rpxy-lib/src/proxy/proxy_quic_quinn.rs @@ -0,0 +1,121 @@ +use super::proxy_main::Proxy; +use super::socket::bind_udp_socket; +use crate::{crypto::ServerCrypto, error::*, log::*, name_exp::ByteName}; +// use hyper_util::client::legacy::connect::Connect; +use quinn::{crypto::rustls::HandshakeData, Endpoint, ServerConfig as QuicServerConfig, TransportConfig}; +use rustls::ServerConfig; +use std::sync::Arc; + +impl Proxy +// where +// // T: Connect + Clone + Sync + Send + 'static, +// U: CryptoSource + Clone + Sync + Send + 'static, +{ + pub(super) async fn h3_listener_service(&self) -> RpxyResult<()> { + let Some(mut server_crypto_rx) = self.globals.cert_reloader_rx.clone() else { + return Err(RpxyError::NoCertificateReloader); + }; + info!("Start UDP proxy serving with HTTP/3 request for configured host names [quinn]"); + // first set as null config server + let rustls_server_config = ServerConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_protocol_versions(&[&rustls::version::TLS13]) + .map_err(|e| RpxyError::QuinnInvalidTlsProtocolVersion(e.to_string()))? + .with_no_client_auth() + .with_cert_resolver(Arc::new(rustls::server::ResolvesServerCertUsingSni::new())); + + let mut transport_config_quic = TransportConfig::default(); + transport_config_quic + .max_concurrent_bidi_streams(self.globals.proxy_config.h3_max_concurrent_bidistream.into()) + .max_concurrent_uni_streams(self.globals.proxy_config.h3_max_concurrent_unistream.into()) + .max_idle_timeout( + self + .globals + .proxy_config + .h3_max_idle_timeout + .map(|v| quinn::IdleTimeout::try_from(v).unwrap()), + ); + + let mut server_config_h3 = QuicServerConfig::with_crypto(Arc::new(rustls_server_config)); + server_config_h3.transport = Arc::new(transport_config_quic); + server_config_h3.concurrent_connections(self.globals.proxy_config.h3_max_concurrent_connections); + + // To reuse address + let udp_socket = bind_udp_socket(&self.listening_on)?; + let runtime = quinn::default_runtime() + .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "No async runtime found"))?; + let endpoint = Endpoint::new( + quinn::EndpointConfig::default(), + Some(server_config_h3), + udp_socket, + runtime, + )?; + + let mut server_crypto: Option> = None; + loop { + tokio::select! { + new_conn = endpoint.accept() => { + if server_crypto.is_none() || new_conn.is_none() { + continue; + } + let mut conn: quinn::Connecting = new_conn.unwrap(); + let Ok(hsd) = conn.handshake_data().await else { + continue + }; + + let Ok(hsd_downcast) = hsd.downcast::() else { + continue + }; + let Some(new_server_name) = hsd_downcast.server_name else { + warn!("HTTP/3 no SNI is given"); + continue; + }; + debug!( + "HTTP/3 connection incoming (SNI {:?})", + new_server_name + ); + // TODO: server_nameをここで出してどんどん深く投げていくのは効率が悪い。connecting -> connectionsの後でいいのでは? + // TODO: 通常のTLSと同じenumか何かにまとめたい + let self_clone = self.clone(); + self.globals.runtime_handle.spawn(async move { + let client_addr = conn.remote_address(); + let quic_connection = match conn.await { + Ok(new_conn) => { + info!("New connection established"); + h3_quinn::Connection::new(new_conn) + }, + Err(e) => { + warn!("QUIC accepting connection failed: {:?}", e); + return Err(RpxyError::QuinnConnectionFailed(e)); + } + }; + // Timeout is based on underlying quic + if let Err(e) = self_clone.h3_serve_connection(quic_connection, new_server_name.to_server_name(), client_addr).await { + warn!("QUIC or HTTP/3 connection failed: {}", e); + }; + Ok(()) + }); + } + _ = server_crypto_rx.changed() => { + if server_crypto_rx.borrow().is_none() { + error!("Reloader is broken"); + break; + } + let cert_keys_map = server_crypto_rx.borrow().clone().unwrap(); + + server_crypto = (&cert_keys_map).try_into().ok(); + let Some(inner) = server_crypto.clone() else { + error!("Failed to update server crypto for h3"); + break; + }; + endpoint.set_server_config(Some(QuicServerConfig::with_crypto(inner.clone().inner_global_no_client_auth.clone()))); + + } + else => break + } + } + endpoint.wait_idle().await; + Ok(()) as RpxyResult<()> + } +} diff --git a/rpxy-lib/src/proxy/proxy_quic_s2n.rs b/rpxy-lib/src/proxy/proxy_quic_s2n.rs new file mode 100644 index 00000000..32be6193 --- /dev/null +++ b/rpxy-lib/src/proxy/proxy_quic_s2n.rs @@ -0,0 +1,132 @@ +use super::proxy_main::Proxy; +use crate::{ + crypto::{ServerCrypto, ServerCryptoBase}, + error::*, + log::*, + name_exp::ByteName, +}; +use hot_reload::ReloaderReceiver; +use std::sync::Arc; +// use hyper_util::client::legacy::connect::Connect; +use s2n_quic::provider; + +impl Proxy { + /// Start UDP proxy serving with HTTP/3 request for configured host names + pub(super) async fn h3_listener_service(&self) -> RpxyResult<()> { + let Some(mut server_crypto_rx) = self.globals.cert_reloader_rx.clone() else { + return Err(RpxyError::NoCertificateReloader); + }; + info!("Start UDP proxy serving with HTTP/3 request for configured host names [s2n-quic]"); + + // initially wait for receipt + let mut server_crypto: Option> = { + let _ = server_crypto_rx.changed().await; + let sc = self.receive_server_crypto(server_crypto_rx.clone())?; + Some(sc) + }; + + // event loop + loop { + tokio::select! { + v = self.h3_listener_service_inner(&server_crypto) => { + if let Err(e) = v { + error!("Quic connection event loop illegally shutdown [s2n-quic] {e}"); + break; + } + } + _ = server_crypto_rx.changed() => { + server_crypto = match self.receive_server_crypto(server_crypto_rx.clone()) { + Ok(sc) => Some(sc), + Err(e) => { + error!("{e}"); + break; + } + }; + } + else => break + } + } + + Ok(()) + } + + /// Receive server crypto from reloader + fn receive_server_crypto( + &self, + server_crypto_rx: ReloaderReceiver, + ) -> RpxyResult> { + let cert_keys_map = server_crypto_rx.borrow().clone().ok_or_else(|| { + error!("Reloader is broken"); + RpxyError::CertificateReloadError(anyhow!("Reloader is broken").into()) + })?; + + let server_crypto: Option> = (&cert_keys_map).try_into().ok(); + server_crypto.ok_or_else(|| { + error!("Failed to update server crypto for h3 [s2n-quic]"); + RpxyError::FailedToUpdateServerCrypto("Failed to update server crypto for h3 [s2n-quic]".to_string()) + }) + } + + /// Event loop for UDP proxy serving with HTTP/3 request for configured host names + async fn h3_listener_service_inner(&self, server_crypto: &Option>) -> RpxyResult<()> { + // setup UDP socket + let io = provider::io::tokio::Builder::default() + .with_receive_address(self.listening_on)? + .with_reuse_port()? + .build()?; + + // setup limits + let mut limits = provider::limits::Limits::default() + .with_max_open_local_bidirectional_streams(self.globals.proxy_config.h3_max_concurrent_bidistream as u64)? + .with_max_open_remote_bidirectional_streams(self.globals.proxy_config.h3_max_concurrent_bidistream as u64)? + .with_max_open_local_unidirectional_streams(self.globals.proxy_config.h3_max_concurrent_unistream as u64)? + .with_max_open_remote_unidirectional_streams(self.globals.proxy_config.h3_max_concurrent_unistream as u64)? + .with_max_active_connection_ids(self.globals.proxy_config.h3_max_concurrent_connections as u64)?; + limits = if let Some(v) = self.globals.proxy_config.h3_max_idle_timeout { + limits.with_max_idle_timeout(v)? + } else { + limits + }; + + // setup tls + let Some(server_crypto) = server_crypto else { + warn!("No server crypto is given [s2n-quic]"); + return Err(RpxyError::NoServerCrypto( + "No server crypto is given [s2n-quic]".to_string(), + )); + }; + let tls = server_crypto.inner_global_no_client_auth.clone(); + + let mut server = s2n_quic::Server::builder() + .with_tls(tls)? + .with_io(io)? + .with_limits(limits)? + .start()?; + + // quic event loop. this immediately cancels when crypto is updated by tokio::select! + while let Some(new_conn) = server.accept().await { + debug!("New QUIC connection established"); + let Ok(Some(new_server_name)) = new_conn.server_name() else { + warn!("HTTP/3 no SNI is given"); + continue; + }; + debug!("HTTP/3 connection incoming (SNI {:?})", new_server_name); + let self_clone = self.clone(); + + self.globals.runtime_handle.spawn(async move { + let client_addr = new_conn.remote_addr()?; + let quic_connection = s2n_quic_h3::Connection::new(new_conn); + // Timeout is based on underlying quic + if let Err(e) = self_clone + .h3_serve_connection(quic_connection, new_server_name.to_server_name(), client_addr) + .await + { + warn!("QUIC or HTTP/3 connection failed: {}", e); + }; + Ok(()) as RpxyResult<()> + }); + } + + Ok(()) + } +} diff --git a/rpxy-lib/src/proxy/proxy_tls.rs b/rpxy-lib/src/proxy/proxy_tls.rs deleted file mode 100644 index f67ad8d5..00000000 --- a/rpxy-lib/src/proxy/proxy_tls.rs +++ /dev/null @@ -1,6 +0,0 @@ -use super::proxy_main::Proxy; -use crate::{log::*, error::*}; - -impl Proxy{ - -}