Skip to content

Commit

Permalink
wip: tested with synthetic echo response from h3
Browse files Browse the repository at this point in the history
  • Loading branch information
junkurihara committed Nov 24, 2023
1 parent 5576389 commit 1dc88ce
Show file tree
Hide file tree
Showing 11 changed files with 732 additions and 37 deletions.
2 changes: 1 addition & 1 deletion legacy-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["http3-s2n", "sticky-cookie", "cache"]
default = ["http3-quinn", "sticky-cookie", "cache"]
http3-quinn = ["quinn", "h3", "h3-quinn", "socket2"]
http3-s2n = ["h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"]
sticky-cookie = ["base64", "sha2", "chrono"]
Expand Down
6 changes: 3 additions & 3 deletions rpxy-bin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rpxy"
version = "0.6.2"
version = "0.7.0"
authors = ["Jun Kurihara"]
homepage = "https://github.com/junkurihara/rust-rpxy"
repository = "https://github.com/junkurihara/rust-rpxy"
Expand All @@ -12,15 +12,15 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["http3-s2n", "cache"]
default = ["http3-quinn", "cache"]
http3-quinn = ["rpxy-lib/http3-quinn"]
http3-s2n = ["rpxy-lib/http3-s2n"]
cache = ["rpxy-lib/cache"]
native-roots = ["rpxy-lib/native-roots"]

[dependencies]
rpxy-lib = { path = "../rpxy-lib/", default-features = false, features = [
# "sticky-cookie",
"sticky-cookie",
] }

anyhow = "1.0.75"
Expand Down
23 changes: 15 additions & 8 deletions rpxy-lib/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rpxy-lib"
version = "0.6.2"
version = "0.7.0"
authors = ["Jun Kurihara"]
homepage = "https://github.com/junkurihara/rust-rpxy"
repository = "https://github.com/junkurihara/rust-rpxy"
Expand All @@ -12,17 +12,23 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["http3-s2n", "sticky-cookie", "cache"]
default = ["http3-quinn", "sticky-cookie", "cache"]
http3-quinn = ["socket2", "quinn", "h3", "h3-quinn"]
http3-s2n = ["h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"]
http3-s2n = [
"h3",
"s2n-quic",
"s2n-quic-core",
"s2n-quic-rustls",
"s2n-quic-h3",
]
sticky-cookie = ["base64", "sha2", "chrono"]
cache = [] #"http-cache-semantics", "lru"]
native-roots = [] #"hyper-rustls/native-tokio"]
cache = [] #"http-cache-semantics", "lru"]
native-roots = [] #"hyper-rustls/native-tokio"]

[dependencies]
rand = "0.8.5"
rustc-hash = "1.1.0"
# bytes = "1.5.0"
bytes = "1.5.0"
derive_builder = "0.12.0"
futures = { version = "0.3.29", features = ["alloc", "async-await"] }
tokio = { version = "1.34.0", default-features = false, features = [
Expand All @@ -41,7 +47,7 @@ thiserror = "1.0.50"

# http
http = "1.0.0"
# http-body-util = "0.1.0"
http-body-util = "0.1.0"
hyper = { version = "1.0.1", default-features = false }
hyper-util = { version = "0.1.1", features = ["full"] }
# hyper-rustls = { version = "0.24.2", default-features = false, features = [
Expand All @@ -50,11 +56,11 @@ hyper-util = { version = "0.1.1", features = ["full"] }
# "http1",
# "http2",
# ] }
# tokio-rustls = { version = "0.24.1", features = ["early-data"] }

# tls and cert management
hot_reload = "0.1.4"
rustls = { version = "0.21.9", default-features = false }
tokio-rustls = { version = "0.24.1", features = ["early-data"] }
webpki = "0.22.4"
x509-parser = "0.15.1"

Expand All @@ -68,6 +74,7 @@ h3-quinn = { path = "../submodules/h3/h3-quinn/", optional = true }
s2n-quic = { version = "1.31.0", default-features = false, features = [
"provider-tls-rustls",
], optional = true }
s2n-quic-core = { version = "0.31.0", default-features = false, optional = true }
s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true }
s2n-quic-rustls = { version = "0.31.0", optional = true }
# for UDP socket wit SO_REUSEADDR when h3 with quinn
Expand Down
2 changes: 1 addition & 1 deletion rpxy-lib/src/crypto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use service::CryptoReloader;
use std::sync::Arc;

pub use certs::{CertsAndKeys, CryptoSource};
pub use service::ServerCryptoBase;
pub use service::{ServerCrypto, ServerCryptoBase, SniServerCryptoMap};

/// Result type inner of certificate reloader service
type ReloaderServiceResultInner<T> = (
Expand Down
47 changes: 47 additions & 0 deletions rpxy-lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,51 @@ pub type RpxyResult<T> = std::result::Result<T, RpxyError>;
/// Describes things that can go wrong in the Rpxy
#[derive(Debug, Error)]
pub enum RpxyError {
// general errors
#[error("IO error: {0}")]
Io(#[from] std::io::Error),

// TLS errors
#[error("Failed to build TLS acceptor: {0}")]
FailedToTlsHandshake(String),
#[error("No server name in ClientHello")]
NoServerNameInClientHello,
#[error("No TLS serving app: {0}")]
NoTlsServingApp(String),
#[error("Failed to update server crypto: {0}")]
FailedToUpdateServerCrypto(String),
#[error("No server crypto: {0}")]
NoServerCrypto(String),

// hyper errors
#[error("hyper body manipulation error: {0}")]
HyperBodyManipulationError(String),

// http/3 errors
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
#[error("H3 error: {0}")]
H3Error(#[from] h3::Error),

#[cfg(feature = "http3-quinn")]
#[error("Invalid rustls TLS version: {0}")]
QuinnInvalidTlsProtocolVersion(String),
#[cfg(feature = "http3-quinn")]
#[error("Quinn connection error: {0}")]
QuinnConnectionFailed(#[from] quinn::ConnectionError),

#[cfg(feature = "http3-s2n")]
#[error("s2n-quic validation error: {0}")]
S2nQuicValidationError(#[from] s2n_quic_core::transport::parameters::ValidationError),
#[cfg(feature = "http3-s2n")]
#[error("s2n-quic connection error: {0}")]
S2nQuicConnectionError(#[from] s2n_quic_core::connection::Error),
#[cfg(feature = "http3-s2n")]
#[error("s2n-quic start error: {0}")]
S2nQuicStartError(#[from] s2n_quic::provider::StartError),

// certificate reloader errors
#[error("No certificate reloader when building a proxy for TLS")]
NoCertificateReloader,
#[error("Certificate reload error: {0}")]
CertificateReloadError(#[from] hot_reload::ReloaderError<crate::crypto::ServerCryptoBase>),

Expand All @@ -20,6 +62,11 @@ pub enum RpxyError {
#[error("Failed to build backend app: {0}")]
FailedToBuildBackendApp(#[from] crate::backend::BackendAppBuilderError),

// Upstream connection setting errors
#[error("Unsupported upstream option")]
UnsupportedUpstreamOption,

// Others
#[error("Infallible")]
Infallible(#[from] std::convert::Infallible),
}
6 changes: 5 additions & 1 deletion rpxy-lib/src/proxy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
mod proxy_h3;
mod proxy_main;
mod proxy_tls;
#[cfg(feature = "http3-quinn")]
mod proxy_quic_quinn;
#[cfg(feature = "http3-s2n")]
mod proxy_quic_s2n;
mod socket;

use crate::{globals::Globals, hyper_executor::LocalExecutor};
Expand Down
205 changes: 205 additions & 0 deletions rpxy-lib/src/proxy/proxy_h3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
use super::proxy_main::Proxy;
use crate::{error::*, log::*, name_exp::ServerName};
use bytes::Bytes;
use http::{Request, Response};
use http_body_util::BodyExt;
use std::{net::SocketAddr, time::Duration};
use tokio::time::timeout;

#[cfg(feature = "http3-quinn")]
use h3::{quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream};
#[cfg(feature = "http3-s2n")]
use s2n_quic_h3::h3::{self, quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream};

// use crate::{certs::CryptoSource, error::*, log::*, utils::ServerNameBytesExp};
// use futures::Stream;
// use hyper_util::client::legacy::connect::Connect;

// impl<U> Proxy<U>
// where
// // T: Connect + Clone + Sync + Send + 'static,
// U: CryptoSource + Clone + Sync + Send + 'static,
// {

impl Proxy {
pub(super) async fn h3_serve_connection<C>(
&self,
quic_connection: C,
tls_server_name: ServerName,
client_addr: SocketAddr,
) -> RpxyResult<()>
where
C: ConnectionQuic<Bytes>,
<C as ConnectionQuic<Bytes>>::BidiStream: BidiStream<Bytes> + Send + 'static,
<<C as ConnectionQuic<Bytes>>::BidiStream as BidiStream<Bytes>>::RecvStream: Send,
<<C as ConnectionQuic<Bytes>>::BidiStream as BidiStream<Bytes>>::SendStream: Send,
{
let mut h3_conn = h3::server::Connection::<_, Bytes>::new(quic_connection).await?;
info!(
"QUIC/HTTP3 connection established from {:?} {}",
client_addr,
<&ServerName as TryInto<String>>::try_into(&tls_server_name).unwrap_or_default()
);

// TODO: Is here enough to fetch server_name from NewConnection?
// to avoid deep nested call from listener_service_h3
loop {
// this routine follows hyperium/h3 examples https://github.com/hyperium/h3/blob/master/examples/server.rs
match h3_conn.accept().await {
Ok(None) => {
break;
}
Err(e) => {
warn!("HTTP/3 error on accept incoming connection: {}", e);
match e.get_error_level() {
h3::error::ErrorLevel::ConnectionError => break,
h3::error::ErrorLevel::StreamError => continue,
}
}
Ok(Some((req, stream))) => {
// We consider the connection count separately from the stream count.
// Max clients for h1/h2 = max 'stream' for h3.
let request_count = self.globals.request_count.clone();
if request_count.increment() > self.globals.proxy_config.max_clients {
request_count.decrement();
h3_conn.shutdown(0).await?;
break;
}
debug!("Request incoming: current # {}", request_count.current());

let self_inner = self.clone();
let tls_server_name_inner = tls_server_name.clone();
self.globals.runtime_handle.spawn(async move {
if let Err(e) = timeout(
self_inner.globals.proxy_config.proxy_timeout + Duration::from_secs(1), // timeout per stream are considered as same as one in http2
self_inner.h3_serve_stream(req, stream, client_addr, tls_server_name_inner),
)
.await
{
error!("HTTP/3 failed to process stream: {}", e);
}
request_count.decrement();
debug!("Request processed: current # {}", request_count.current());
});
}
}
}

Ok(())
}

/// Serves a request stream from a client
/// TODO: TODO: TODO: TODO:
/// TODO: Body in hyper-0.14 was changed to Incoming in hyper-1.0, and it is not accessible from outside.
/// Thus, we need to implement IncomingLike trait using channel. Also, the backend handler must feed the body in the form of
/// Either<Incoming, IncomingLike> as body.
/// Also, the downstream from the backend handler could be Incoming, but will be wrapped as Either<Incoming, ()/Empty> as well due to H3.
/// Result<Either<_,_>, E> type includes E as HttpError to generate the status code and related Response<BoxBody>.
/// Thus to handle synthetic error messages in BoxBody, the serve() function outputs Response<Either<Either<Incoming, ()/Empty>, BoxBody>>>.
async fn h3_serve_stream<S>(
&self,
req: Request<()>,
stream: RequestStream<S, Bytes>,
client_addr: SocketAddr,
tls_server_name: ServerName,
) -> RpxyResult<()>
where
S: BidiStream<Bytes> + Send + 'static,
<S as BidiStream<Bytes>>::RecvStream: Send,
{
let (req_parts, _) = req.into_parts();
// split stream and async body handling
let (mut send_stream, mut recv_stream) = stream.split();

// let max_body_size = self.globals.proxy_config.h3_request_max_body_size;
// // let max = body_stream.size_hint().upper().unwrap_or(u64::MAX);
// // if max > max_body_size as u64 {
// // return Err(HttpError::TooLargeRequestBody);
// // }

// let new_req = Request::from_parts(req_parts, body_stream);

// // generate streamed body with trailers using channel
// let (body_sender, req_body) = Incoming::channel();

// // Buffering and sending body through channel for protocol conversion like h3 -> h2/http1.1
// // The underling buffering, i.e., buffer given by the API recv_data.await?, is handled by quinn.
// let max_body_size = self.globals.proxy_config.h3_request_max_body_size;
// self.globals.runtime_handle.spawn(async move {
// // let mut sender = body_sender;
// let mut size = 0usize;
// while let Some(mut body) = recv_stream.recv_data().await? {
// debug!("HTTP/3 incoming request body: remaining {}", body.remaining());
// size += body.remaining();
// if size > max_body_size {
// error!(
// "Exceeds max request body size for HTTP/3: received {}, maximum_allowd {}",
// size, max_body_size
// );
// return Err(RpxyError::Proxy("Exceeds max request body size for HTTP/3".to_string()));
// }
// // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes
// // sender.send_data(body.copy_to_bytes(body.remaining())).await?;
// }

// // trailers: use inner for work around. (directly get trailer)
// let trailers = recv_stream.as_mut().recv_trailers().await?;
// if trailers.is_some() {
// debug!("HTTP/3 incoming request trailers");
// // sender.send_trailers(trailers.unwrap()).await?;
// }
// Ok(())
// });

// let new_req: Request<Incoming> = Request::from_parts(req_parts, req_body);
// let res = self
// .msg_handler
// .clone()
// .handle_request(
// new_req,
// client_addr,
// self.listening_on,
// self.tls_enabled,
// Some(tls_server_name),
// )
// .await?;

// TODO: TODO: TODO: remove later
let body = full(hyper::body::Bytes::from("hello h3 echo"));
let res = Response::builder().body(body).unwrap();
/////////////////

let (new_res_parts, new_body) = res.into_parts();
let new_res = Response::from_parts(new_res_parts, ());

match send_stream.send_response(new_res).await {
Ok(_) => {
debug!("HTTP/3 response to connection successful");
// aggregate body without copying
let body_data = new_body
.collect()
.await
.map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?;

// create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes inside to_bytes()
send_stream.send_data(body_data.to_bytes()).await?;

// TODO: needs handling trailer? should be included in body from handler.
}
Err(err) => {
error!("Unable to send response to connection peer: {:?}", err);
}
}
Ok(send_stream.finish().await?)
}
}

//////////////
/// TODO: remove later
/// helper function to build a full body
use http_body_util::Full;
pub(crate) type BoxBody = http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>;
pub fn full(body: hyper::body::Bytes) -> BoxBody {
Full::new(body).map_err(|never| match never {}).boxed()
}
//////////////
Loading

0 comments on commit 1dc88ce

Please sign in to comment.