Skip to content

Commit

Permalink
wip: feat: change request body from either to explicit enum
Browse files Browse the repository at this point in the history
  • Loading branch information
junkurihara committed Dec 12, 2023
1 parent 008b62a commit 1c18f38
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 41 deletions.
2 changes: 2 additions & 0 deletions rpxy-lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub enum RpxyError {
HyperIncomingLikeNewClosed,
#[error("New body write aborted")]
HyperNewBodyWriteAborted,
#[error("Hyper error in serving request or response body type: {0}")]
HyperBodyError(#[from] hyper::Error),

// http/3 errors
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
Expand Down
2 changes: 1 addition & 1 deletion rpxy-lib/src/forwarder/cache/cache_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl RpxyCache {
.map(|f| {
if f.is_data() {
let data_bytes = f.data_ref().unwrap().clone();
println!("ddddde");
debug!("cache data bytes of {} bytes", data_bytes.len())
// TODO: cache data bytes as file or on memory
// fileにするかmemoryにするかの判断はある程度までバッファしてやってという手を使うことになる。途中までキャッシュしたやつはどうするかとかいう判断も必要。
// ファイルとObjectのbindをどうやってするか
Expand Down
4 changes: 2 additions & 2 deletions rpxy-lib/src/forwarder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
mod cache;
mod client;

use crate::hyper_ext::body::{IncomingLike, IncomingOr};
use crate::hyper_ext::body::RequestBody;

pub(crate) type Forwarder<C> = client::Forwarder<C, IncomingOr<IncomingLike>>;
pub(crate) type Forwarder<C> = client::Forwarder<C, RequestBody>;
pub(crate) use client::ForwardRequest;

#[cfg(feature = "cache")]
Expand Down
51 changes: 31 additions & 20 deletions rpxy-lib/src/hyper_ext/body_type.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,11 @@
// use http::Response;
use http_body_util::{combinators, BodyExt, Either, Empty, Full};
use super::body::IncomingLike;
use crate::error::RpxyError;
use http_body_util::{combinators, BodyExt, Empty, Full};
use hyper::body::{Body, Bytes, Incoming};
use std::pin::Pin;

/// Type for synthetic boxed body
pub(crate) type BoxBody = combinators::BoxBody<Bytes, hyper::Error>;
/// Type for either passthrough body or given body type, specifically synthetic boxed body
pub(crate) type IncomingOr<B> = Either<Incoming, B>;

// /// helper function to build http response with passthrough body
// pub(crate) fn wrap_incoming_body_response<B>(response: Response<Incoming>) -> Response<IncomingOr<B>>
// where
// B: hyper::body::Body,
// {
// response.map(IncomingOr::Left)
// }

// /// helper function to build http response with synthetic body
// pub(crate) fn wrap_synthetic_body_response<B>(response: Response<B>) -> Response<IncomingOr<B>> {
// response.map(IncomingOr::Right)
// }

/// helper function to build a empty body
pub(crate) fn empty() -> BoxBody {
Expand All @@ -31,6 +17,30 @@ pub(crate) fn full(body: Bytes) -> BoxBody {
Full::new(body).map_err(|never| match never {}).boxed()
}

/* ------------------------------------ */
/// Request body used in this project
/// - Incoming: just a type that only forwards the downstream request body to upstream.
/// - IncomingLike: a Incoming-like type in which channel is used
pub(crate) enum RequestBody {
Incoming(Incoming),
IncomingLike(IncomingLike),
}

impl Body for RequestBody {
type Data = bytes::Bytes;
type Error = RpxyError;

fn poll_frame(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.get_mut() {
RequestBody::Incoming(incoming) => Pin::new(incoming).poll_frame(cx).map_err(RpxyError::HyperBodyError),
RequestBody::IncomingLike(incoming_like) => Pin::new(incoming_like).poll_frame(cx),
}
}
}

/* ------------------------------------ */
#[cfg(feature = "cache")]
use futures::channel::mpsc::UnboundedReceiver;
Expand All @@ -44,8 +54,8 @@ pub(crate) type UnboundedStreamBody = StreamBody<UnboundedReceiver<Result<Frame<

/// Response body use in this project
/// - Incoming: just a type that only forwards the upstream response body to downstream.
/// - BoxedCache: a type that is generated from cache, e.g.,, small byte object.
/// - StreamedCache: another type that is generated from cache as stream, e.g., large byte object.
/// - Boxed: a type that is generated from cache or synthetic response body, e.g.,, small byte object.
/// - Streamed: another type that is generated from stream, e.g., large byte object.
pub(crate) enum ResponseBody {
Incoming(Incoming),
Boxed(BoxBody),
Expand All @@ -55,7 +65,7 @@ pub(crate) enum ResponseBody {

impl Body for ResponseBody {
type Data = bytes::Bytes;
type Error = hyper::Error;
type Error = RpxyError;

fn poll_frame(
self: Pin<&mut Self>,
Expand All @@ -68,5 +78,6 @@ impl Body for ResponseBody {
#[cfg(feature = "cache")]
ResponseBody::Streamed(streamed) => Pin::new(streamed).poll_frame(cx),
}
.map_err(RpxyError::HyperBodyError)
}
}
2 changes: 1 addition & 1 deletion rpxy-lib/src/hyper_ext/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ pub(crate) mod rt {
#[allow(unused)]
pub(crate) mod body {
pub(crate) use super::body_incoming_like::IncomingLike;
pub(crate) use super::body_type::{empty, full, BoxBody, IncomingOr, ResponseBody, UnboundedStreamBody};
pub(crate) use super::body_type::{empty, full, BoxBody, RequestBody, ResponseBody, UnboundedStreamBody};
}
6 changes: 3 additions & 3 deletions rpxy-lib/src/message_handler/handler_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
error::*,
forwarder::{ForwardRequest, Forwarder},
globals::Globals,
hyper_ext::body::{IncomingLike, IncomingOr, ResponseBody},
hyper_ext::body::{RequestBody, ResponseBody},
log::*,
name_exp::ServerName,
};
Expand Down Expand Up @@ -53,7 +53,7 @@ where
/// Responsible to passthrough responses from backend applications or generate synthetic error responses.
pub async fn handle_request(
&self,
req: Request<IncomingOr<IncomingLike>>,
req: Request<RequestBody>,
client_addr: SocketAddr, // For access control
listen_addr: SocketAddr,
tls_enabled: bool,
Expand Down Expand Up @@ -94,7 +94,7 @@ where
async fn handle_request_inner(
&self,
log_data: &mut HttpMessageLog,
mut req: Request<IncomingOr<IncomingLike>>,
mut req: Request<RequestBody>,
client_addr: SocketAddr, // For access control
listen_addr: SocketAddr,
tls_enabled: bool,
Expand Down
15 changes: 3 additions & 12 deletions rpxy-lib/src/proxy/proxy_h3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::proxy_main::Proxy;
use crate::{
crypto::CryptoSource,
error::*,
hyper_ext::body::{IncomingLike, IncomingOr},
hyper_ext::body::{IncomingLike, RequestBody},
log::*,
name_exp::ServerName,
};
Expand Down Expand Up @@ -137,7 +137,7 @@ where
Ok(()) as RpxyResult<()>
});

let new_req: Request<IncomingOr<IncomingLike>> = Request::from_parts(req_parts, IncomingOr::Right(req_body));
let new_req: Request<RequestBody> = Request::from_parts(req_parts, RequestBody::IncomingLike(req_body));
let res = self
.message_handler
.handle_request(
Expand All @@ -155,6 +155,7 @@ where
match send_stream.send_response(new_res).await {
Ok(_) => {
debug!("HTTP/3 response to connection successful");
// on-demand body streaming to downstream without expanding the object onto memory.
loop {
let frame = match new_body.frame().await {
Some(frame) => frame,
Expand All @@ -175,16 +176,6 @@ where
send_stream.send_trailers(trailers).await?;
}
}
// // aggregate body without copying
// let body_data = new_body
// .collect()
// .await
// .map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?;

// // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes inside to_bytes()
// send_stream.send_data(body_data.to_bytes()).await?;

// TODO: needs handling trailer? should be included in body from handler.
}
Err(err) => {
error!("Unable to send response to connection peer: {:?}", err);
Expand Down
4 changes: 2 additions & 2 deletions rpxy-lib/src/proxy/proxy_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
error::*,
globals::Globals,
hyper_ext::{
body::{IncomingOr, ResponseBody},
body::{RequestBody, ResponseBody},
rt::LocalExecutor,
},
log::*,
Expand Down Expand Up @@ -39,7 +39,7 @@ where
{
handler
.handle_request(
req.map(IncomingOr::Left),
req.map(RequestBody::Incoming),
client_addr,
listen_addr,
tls_enabled,
Expand Down

0 comments on commit 1c18f38

Please sign in to comment.