From b44edeb60d234d49c45828395108f7519a048d4b Mon Sep 17 00:00:00 2001 From: Ruben2424 <61056653+Ruben2424@users.noreply.github.com> Date: Wed, 20 Mar 2024 20:05:16 +0100 Subject: [PATCH] split `server.rs` and `client.rs` into multiple files in thair own module (#229) * fix warnings * move client and server to new modules * split server.rs * split client.rs * fix docs warnings * clippy run * re-export server stuff * re-export client stuff --- h3-quinn/src/lib.rs | 4 +- h3-webtransport/src/server.rs | 5 +- h3/src/client/builder.rs | 129 +++++++ h3/src/{client.rs => client/connection.rs} | 347 +---------------- h3/src/client/mod.rs | 12 + h3/src/client/stream.rs | 219 +++++++++++ h3/src/lib.rs | 3 +- h3/src/qpack/mod.rs | 7 +- h3/src/qpack/prefix_string/mod.rs | 2 +- h3/src/qpack/stream.rs | 6 +- h3/src/qpack/tests.rs | 4 +- h3/src/server/builder.rs | 140 +++++++ h3/src/{server.rs => server/connection.rs} | 424 ++------------------- h3/src/server/mod.rs | 62 +++ h3/src/{ => server}/request.rs | 4 +- h3/src/server/stream.rs | 243 ++++++++++++ h3/src/tests/connection.rs | 4 +- 17 files changed, 864 insertions(+), 751 deletions(-) create mode 100644 h3/src/client/builder.rs rename h3/src/{client.rs => client/connection.rs} (58%) create mode 100644 h3/src/client/mod.rs create mode 100644 h3/src/client/stream.rs create mode 100644 h3/src/server/builder.rs rename h3/src/{server.rs => server/connection.rs} (55%) create mode 100644 h3/src/server/mod.rs rename h3/src/{ => server}/request.rs (97%) create mode 100644 h3/src/server/stream.rs diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index 54c39f04..573fa823 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -20,9 +20,7 @@ use futures::{ StreamExt, }; use quinn::ReadDatagram; -pub use quinn::{ - self, crypto::Session, AcceptBi, AcceptUni, Endpoint, OpenBi, OpenUni, VarInt, WriteError, -}; +pub use quinn::{self, AcceptBi, AcceptUni, Endpoint, OpenBi, OpenUni, VarInt, WriteError}; use h3::{ ext::Datagram, diff --git a/h3-webtransport/src/server.rs b/h3-webtransport/src/server.rs index 8ca23aca..435963e9 100644 --- a/h3-webtransport/src/server.rs +++ b/h3-webtransport/src/server.rs @@ -16,7 +16,8 @@ use h3::{ frame::FrameStream, proto::frame::Frame, quic::{self, OpenStreams, RecvDatagramExt, SendDatagramExt, WriteBuf}, - server::{self, Connection, RequestStream}, + server::Connection, + server::RequestStream, Error, }; use h3::{ @@ -397,7 +398,7 @@ where C: quic::Connection, B: Buf, { - conn: &'a Mutex>, + conn: &'a Mutex>, } impl<'a, C, B> Future for AcceptUni<'a, C, B> diff --git a/h3/src/client/builder.rs b/h3/src/client/builder.rs new file mode 100644 index 00000000..d5d875cb --- /dev/null +++ b/h3/src/client/builder.rs @@ -0,0 +1,129 @@ +//! HTTP/3 client builder + +use std::{ + marker::PhantomData, + sync::{atomic::AtomicUsize, Arc}, + task::Poll, +}; + +use bytes::{Buf, Bytes}; +use futures_util::future; + +use crate::{ + config::Config, + connection::{ConnectionInner, SharedStateRef}, + error::Error, + quic::{self}, +}; + +use super::connection::{Connection, SendRequest}; + +/// Start building a new HTTP/3 client +pub fn builder() -> Builder { + Builder::new() +} + +/// Create a new HTTP/3 client with default settings +pub async fn new(conn: C) -> Result<(Connection, SendRequest), Error> +where + C: quic::Connection, + O: quic::OpenStreams, +{ + //= https://www.rfc-editor.org/rfc/rfc9114#section-3.3 + //= type=implication + //# Clients SHOULD NOT open more than one HTTP/3 connection to a given IP + //# address and UDP port, where the IP address and port might be derived + //# from a URI, a selected alternative service ([ALTSVC]), a configured + //# proxy, or name resolution of any of these. + Builder::new().build(conn).await +} + +/// HTTP/3 client builder +/// +/// Set the configuration for a new client. +/// +/// # Examples +/// ```rust +/// # use h3::quic; +/// # async fn doc(quic: C) +/// # where +/// # C: quic::Connection, +/// # O: quic::OpenStreams, +/// # B: bytes::Buf, +/// # { +/// let h3_conn = h3::client::builder() +/// .max_field_section_size(8192) +/// .build(quic) +/// .await +/// .expect("Failed to build connection"); +/// # } +/// ``` +pub struct Builder { + config: Config, +} + +impl Builder { + pub(super) fn new() -> Self { + Builder { + config: Default::default(), + } + } + + #[cfg(test)] + pub fn send_settings(&mut self, value: bool) -> &mut Self { + self.config.send_settings = value; + self + } + + /// Set the maximum header size this client is willing to accept + /// + /// See [header size constraints] section of the specification for details. + /// + /// [header size constraints]: https://www.rfc-editor.org/rfc/rfc9114.html#name-header-size-constraints + pub fn max_field_section_size(&mut self, value: u64) -> &mut Self { + self.config.settings.max_field_section_size = value; + self + } + + /// Just like in HTTP/2, HTTP/3 also uses the concept of "grease" + /// to prevent potential interoperability issues in the future. + /// In HTTP/3, the concept of grease is used to ensure that the protocol can evolve + /// and accommodate future changes without breaking existing implementations. + pub fn send_grease(&mut self, enabled: bool) -> &mut Self { + self.config.send_grease = enabled; + self + } + + /// Create a new HTTP/3 client from a `quic` connection + pub async fn build( + &mut self, + quic: C, + ) -> Result<(Connection, SendRequest), Error> + where + C: quic::Connection, + O: quic::OpenStreams, + B: Buf, + { + let open = quic.opener(); + let conn_state = SharedStateRef::default(); + + let conn_waker = Some(future::poll_fn(|cx| Poll::Ready(cx.waker().clone())).await); + + Ok(( + Connection { + inner: ConnectionInner::new(quic, conn_state.clone(), self.config).await?, + sent_closing: None, + recv_closing: None, + }, + SendRequest { + open, + conn_state, + conn_waker, + max_field_section_size: self.config.settings.max_field_section_size, + sender_count: Arc::new(AtomicUsize::new(1)), + send_grease_frame: self.config.send_grease, + _buf: PhantomData, + }, + )) + } +} diff --git a/h3/src/client.rs b/h3/src/client/connection.rs similarity index 58% rename from h3/src/client.rs rename to h3/src/client/connection.rs index 83b745e3..4fb53cd7 100644 --- a/h3/src/client.rs +++ b/h3/src/client/connection.rs @@ -1,19 +1,17 @@ //! Client implementation of the HTTP/3 protocol use std::{ - convert::TryFrom, marker::PhantomData, sync::{atomic::AtomicUsize, Arc}, task::{Context, Poll, Waker}, }; -use bytes::{Buf, Bytes, BytesMut}; +use bytes::{Buf, BytesMut}; use futures_util::future; -use http::{request, HeaderMap, Response}; +use http::request; use tracing::{info, trace}; use crate::{ - config::Config, connection::{self, ConnectionInner, ConnectionState, SharedStateRef}, error::{Code, Error, ErrorLevel}, frame::FrameStream, @@ -23,25 +21,7 @@ use crate::{ stream::{self, BufRecvStream}, }; -/// Start building a new HTTP/3 client -pub fn builder() -> Builder { - Builder::new() -} - -/// Create a new HTTP/3 client with default settings -pub async fn new(conn: C) -> Result<(Connection, SendRequest), Error> -where - C: quic::Connection, - O: quic::OpenStreams, -{ - //= https://www.rfc-editor.org/rfc/rfc9114#section-3.3 - //= type=implication - //# Clients SHOULD NOT open more than one HTTP/3 connection to a given IP - //# address and UDP port, where the IP address and port might be derived - //# from a URI, a selected alternative service ([ALTSVC]), a configured - //# proxy, or name resolution of any of these. - Builder::new().build(conn).await -} +use super::stream::RequestStream; /// HTTP/3 request sender /// @@ -125,14 +105,14 @@ where T: quic::OpenStreams, B: Buf, { - open: T, - conn_state: SharedStateRef, - max_field_section_size: u64, // maximum size for a header we receive + pub(super) open: T, + pub(super) conn_state: SharedStateRef, + pub(super) max_field_section_size: u64, // maximum size for a header we receive // counts instances of SendRequest to close the connection when the last is dropped. - sender_count: Arc, - conn_waker: Option, - _buf: PhantomData, - send_grease_frame: bool, + pub(super) sender_count: Arc, + pub(super) conn_waker: Option, + pub(super) _buf: PhantomData, + pub(super) send_grease_frame: bool, } impl SendRequest @@ -306,7 +286,9 @@ where /// ```rust /// # use bytes::Buf; /// # use futures_util::future; -/// # use h3::{client::*, quic}; +/// # use h3::quic; +/// # use h3::client::Connection; +/// # use h3::client::SendRequest; /// # use tokio::{self, sync::oneshot, task::JoinHandle}; /// # async fn doc(mut connection: Connection) /// # -> Result<(), Box> @@ -351,11 +333,11 @@ where C: quic::Connection, B: Buf, { - inner: ConnectionInner, + pub(super) inner: ConnectionInner, // Has a GOAWAY frame been sent? If so, this PushId is the last we are willing to accept. - sent_closing: Option, + pub(super) sent_closing: Option, // Has a GOAWAY frame been received? If so, this is StreamId the last the remote will accept. - recv_closing: Option, + pub(super) recv_closing: Option, } impl Connection @@ -462,300 +444,3 @@ where Poll::Pending } } - -/// HTTP/3 client builder -/// -/// Set the configuration for a new client. -/// -/// # Examples -/// ```rust -/// # use h3::quic; -/// # async fn doc(quic: C) -/// # where -/// # C: quic::Connection, -/// # O: quic::OpenStreams, -/// # B: bytes::Buf, -/// # { -/// let h3_conn = h3::client::builder() -/// .max_field_section_size(8192) -/// .build(quic) -/// .await -/// .expect("Failed to build connection"); -/// # } -/// ``` -pub struct Builder { - config: Config, -} - -impl Builder { - pub(super) fn new() -> Self { - Builder { - config: Default::default(), - } - } - - #[cfg(test)] - pub fn send_settings(&mut self, value: bool) -> &mut Self { - self.config.send_settings = value; - self - } - - /// Set the maximum header size this client is willing to accept - /// - /// See [header size constraints] section of the specification for details. - /// - /// [header size constraints]: https://www.rfc-editor.org/rfc/rfc9114.html#name-header-size-constraints - pub fn max_field_section_size(&mut self, value: u64) -> &mut Self { - self.config.settings.max_field_section_size = value; - self - } - - /// Just like in HTTP/2, HTTP/3 also uses the concept of "grease" - /// to prevent potential interoperability issues in the future. - /// In HTTP/3, the concept of grease is used to ensure that the protocol can evolve - /// and accommodate future changes without breaking existing implementations. - pub fn send_grease(&mut self, enabled: bool) -> &mut Self { - self.config.send_grease = enabled; - self - } - - /// Create a new HTTP/3 client from a `quic` connection - pub async fn build( - &mut self, - quic: C, - ) -> Result<(Connection, SendRequest), Error> - where - C: quic::Connection, - O: quic::OpenStreams, - B: Buf, - { - let open = quic.opener(); - let conn_state = SharedStateRef::default(); - - let conn_waker = Some(future::poll_fn(|cx| Poll::Ready(cx.waker().clone())).await); - - Ok(( - Connection { - inner: ConnectionInner::new(quic, conn_state.clone(), self.config).await?, - sent_closing: None, - recv_closing: None, - }, - SendRequest { - open, - conn_state, - conn_waker, - max_field_section_size: self.config.settings.max_field_section_size, - sender_count: Arc::new(AtomicUsize::new(1)), - send_grease_frame: self.config.send_grease, - _buf: PhantomData, - }, - )) - } -} - -/// Manage request bodies transfer, response and trailers. -/// -/// Once a request has been sent via [`send_request()`], a response can be awaited by calling -/// [`recv_response()`]. A body for this request can be sent with [`send_data()`], then the request -/// shall be completed by either sending trailers with [`send_trailers()`], or [`finish()`]. -/// -/// After receiving the response's headers, it's body can be read by [`recv_data()`] until it returns -/// `None`. Then the trailers will eventually be available via [`recv_trailers()`]. -/// -/// TODO: If data is polled before the response has been received, an error will be thrown. -/// -/// TODO: If trailers are polled but the body hasn't been fully received, an UNEXPECT_FRAME error will be -/// thrown -/// -/// Whenever the client wants to cancel this request, it can call [`stop_sending()`], which will -/// put an end to any transfer concerning it. -/// -/// # Examples -/// -/// ```rust -/// # use h3::{quic, client::*}; -/// # use http::{Request, Response}; -/// # use bytes::Buf; -/// # use tokio::io::AsyncWriteExt; -/// # async fn doc(mut req_stream: RequestStream) -> Result<(), Box> -/// # where -/// # T: quic::RecvStream, -/// # { -/// // Prepare the HTTP request to send to the server -/// let request = Request::get("https://www.example.com/").body(())?; -/// -/// // Receive the response -/// let response = req_stream.recv_response().await?; -/// // Receive the body -/// while let Some(mut chunk) = req_stream.recv_data().await? { -/// let mut out = tokio::io::stdout(); -/// out.write_all_buf(&mut chunk).await?; -/// out.flush().await?; -/// } -/// # Ok(()) -/// # } -/// # pub fn main() {} -/// ``` -/// -/// [`send_request()`]: struct.SendRequest.html#method.send_request -/// [`recv_response()`]: #method.recv_response -/// [`recv_data()`]: #method.recv_data -/// [`send_data()`]: #method.send_data -/// [`send_trailers()`]: #method.send_trailers -/// [`recv_trailers()`]: #method.recv_trailers -/// [`finish()`]: #method.finish -/// [`stop_sending()`]: #method.stop_sending -pub struct RequestStream { - inner: connection::RequestStream, -} - -impl ConnectionState for RequestStream { - fn shared_state(&self) -> &SharedStateRef { - &self.inner.conn_state - } -} - -impl RequestStream -where - S: quic::RecvStream, -{ - /// Receive the HTTP/3 response - /// - /// This should be called before trying to receive any data with [`recv_data()`]. - /// - /// [`recv_data()`]: #method.recv_data - pub async fn recv_response(&mut self) -> Result, Error> { - let mut frame = future::poll_fn(|cx| self.inner.stream.poll_next(cx)) - .await - .map_err(|e| self.maybe_conn_err(e))? - .ok_or_else(|| { - Code::H3_GENERAL_PROTOCOL_ERROR.with_reason( - "Did not receive response headers", - ErrorLevel::ConnectionError, - ) - })?; - - //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5 - //= type=TODO - //# A client MUST treat - //# receipt of a PUSH_PROMISE frame that contains a larger push ID than - //# the client has advertised as a connection error of H3_ID_ERROR. - - //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5 - //= type=TODO - //# If a client - //# receives a push ID that has already been promised and detects a - //# mismatch, it MUST respond with a connection error of type - //# H3_GENERAL_PROTOCOL_ERROR. - - let decoded = if let Frame::Headers(ref mut encoded) = frame { - match qpack::decode_stateless(encoded, self.inner.max_field_section_size) { - //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2 - //# An HTTP/3 implementation MAY impose a limit on the maximum size of - //# the message header it will accept on an individual HTTP message. - Err(qpack::DecoderError::HeaderTooLong(cancel_size)) => { - self.inner.stop_sending(Code::H3_REQUEST_CANCELLED); - return Err(Error::header_too_big( - cancel_size, - self.inner.max_field_section_size, - )); - } - Ok(decoded) => decoded, - Err(e) => return Err(e.into()), - } - } else { - return Err(Code::H3_FRAME_UNEXPECTED.with_reason( - "First response frame is not headers", - ErrorLevel::ConnectionError, - )); - }; - - let qpack::Decoded { fields, .. } = decoded; - - let (status, headers) = Header::try_from(fields)?.into_response_parts()?; - let mut resp = Response::new(()); - *resp.status_mut() = status; - *resp.headers_mut() = headers; - *resp.version_mut() = http::Version::HTTP_3; - - Ok(resp) - } - - /// Receive some of the request body. - // TODO what if called before recv_response ? - pub async fn recv_data(&mut self) -> Result, Error> { - self.inner.recv_data().await - } - - /// Receive an optional set of trailers for the response. - pub async fn recv_trailers(&mut self) -> Result, Error> { - let res = self.inner.recv_trailers().await; - if let Err(ref e) = res { - if e.is_header_too_big() { - self.inner.stream.stop_sending(Code::H3_REQUEST_CANCELLED); - } - } - res - } - - /// Tell the peer to stop sending into the underlying QUIC stream - pub fn stop_sending(&mut self, error_code: crate::error::Code) { - // TODO take by value to prevent any further call as this request is cancelled - // rename `cancel()` ? - self.inner.stream.stop_sending(error_code) - } -} - -impl RequestStream -where - S: quic::SendStream, - B: Buf, -{ - /// Send some data on the request body. - pub async fn send_data(&mut self, buf: B) -> Result<(), Error> { - self.inner.send_data(buf).await - } - - /// Send a set of trailers to end the request. - /// - /// Either [`RequestStream::finish`] or - /// [`RequestStream::send_trailers`] must be called to finalize a - /// request. - pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> { - self.inner.send_trailers(trailers).await - } - - /// End the request without trailers. - /// - /// Either [`RequestStream::finish`] or - /// [`RequestStream::send_trailers`] must be called to finalize a - /// request. - pub async fn finish(&mut self) -> Result<(), Error> { - self.inner.finish().await - } - - //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1.1 - //= type=TODO - //# Implementations SHOULD cancel requests by abruptly terminating any - //# directions of a stream that are still open. To do so, an - //# implementation resets the sending parts of streams and aborts reading - //# on the receiving parts of streams; see Section 2.4 of - //# [QUIC-TRANSPORT]. -} - -impl RequestStream -where - S: quic::BidiStream, - B: Buf, -{ - /// Split this stream into two halves that can be driven independently. - pub fn split( - self, - ) -> ( - RequestStream, - RequestStream, - ) { - let (send, recv) = self.inner.split(); - (RequestStream { inner: send }, RequestStream { inner: recv }) - } -} diff --git a/h3/src/client/mod.rs b/h3/src/client/mod.rs new file mode 100644 index 00000000..2435a4c3 --- /dev/null +++ b/h3/src/client/mod.rs @@ -0,0 +1,12 @@ +//! HTTP/3 client + +mod connection; +mod stream; + +mod builder; + +pub use builder::builder; +pub use builder::new; +pub use builder::Builder; +pub use connection::{Connection, SendRequest}; +pub use stream::RequestStream; diff --git a/h3/src/client/stream.rs b/h3/src/client/stream.rs new file mode 100644 index 00000000..e2b4b8e6 --- /dev/null +++ b/h3/src/client/stream.rs @@ -0,0 +1,219 @@ +use bytes::Buf; +use futures_util::future; +use http::{HeaderMap, Response}; + +use crate::{ + connection::{self, ConnectionState, SharedStateRef}, + error::{Code, Error, ErrorLevel}, + proto::{frame::Frame, headers::Header}, + qpack, + quic::{self}, +}; +use std::convert::TryFrom; + +/// Manage request bodies transfer, response and trailers. +/// +/// Once a request has been sent via [`send_request()`], a response can be awaited by calling +/// [`recv_response()`]. A body for this request can be sent with [`send_data()`], then the request +/// shall be completed by either sending trailers with [`send_trailers()`], or [`finish()`]. +/// +/// After receiving the response's headers, it's body can be read by [`recv_data()`] until it returns +/// `None`. Then the trailers will eventually be available via [`recv_trailers()`]. +/// +/// TODO: If data is polled before the response has been received, an error will be thrown. +/// +/// TODO: If trailers are polled but the body hasn't been fully received, an UNEXPECT_FRAME error will be +/// thrown +/// +/// Whenever the client wants to cancel this request, it can call [`stop_sending()`], which will +/// put an end to any transfer concerning it. +/// +/// # Examples +/// +/// ```rust +/// # use h3::{quic, client::*}; +/// # use http::{Request, Response}; +/// # use bytes::Buf; +/// # use tokio::io::AsyncWriteExt; +/// # async fn doc(mut req_stream: RequestStream) -> Result<(), Box> +/// # where +/// # T: quic::RecvStream, +/// # { +/// // Prepare the HTTP request to send to the server +/// let request = Request::get("https://www.example.com/").body(())?; +/// +/// // Receive the response +/// let response = req_stream.recv_response().await?; +/// // Receive the body +/// while let Some(mut chunk) = req_stream.recv_data().await? { +/// let mut out = tokio::io::stdout(); +/// out.write_all_buf(&mut chunk).await?; +/// out.flush().await?; +/// } +/// # Ok(()) +/// # } +/// # pub fn main() {} +/// ``` +/// +/// [`send_request()`]: struct.SendRequest.html#method.send_request +/// [`recv_response()`]: #method.recv_response +/// [`recv_data()`]: #method.recv_data +/// [`send_data()`]: #method.send_data +/// [`send_trailers()`]: #method.send_trailers +/// [`recv_trailers()`]: #method.recv_trailers +/// [`finish()`]: #method.finish +/// [`stop_sending()`]: #method.stop_sending +pub struct RequestStream { + pub(super) inner: connection::RequestStream, +} + +impl ConnectionState for RequestStream { + fn shared_state(&self) -> &SharedStateRef { + &self.inner.conn_state + } +} + +impl RequestStream +where + S: quic::RecvStream, +{ + /// Receive the HTTP/3 response + /// + /// This should be called before trying to receive any data with [`recv_data()`]. + /// + /// [`recv_data()`]: #method.recv_data + pub async fn recv_response(&mut self) -> Result, Error> { + let mut frame = future::poll_fn(|cx| self.inner.stream.poll_next(cx)) + .await + .map_err(|e| self.maybe_conn_err(e))? + .ok_or_else(|| { + Code::H3_GENERAL_PROTOCOL_ERROR.with_reason( + "Did not receive response headers", + ErrorLevel::ConnectionError, + ) + })?; + + //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5 + //= type=TODO + //# A client MUST treat + //# receipt of a PUSH_PROMISE frame that contains a larger push ID than + //# the client has advertised as a connection error of H3_ID_ERROR. + + //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5 + //= type=TODO + //# If a client + //# receives a push ID that has already been promised and detects a + //# mismatch, it MUST respond with a connection error of type + //# H3_GENERAL_PROTOCOL_ERROR. + + let decoded = if let Frame::Headers(ref mut encoded) = frame { + match qpack::decode_stateless(encoded, self.inner.max_field_section_size) { + //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2 + //# An HTTP/3 implementation MAY impose a limit on the maximum size of + //# the message header it will accept on an individual HTTP message. + Err(qpack::DecoderError::HeaderTooLong(cancel_size)) => { + self.inner.stop_sending(Code::H3_REQUEST_CANCELLED); + return Err(Error::header_too_big( + cancel_size, + self.inner.max_field_section_size, + )); + } + Ok(decoded) => decoded, + Err(e) => return Err(e.into()), + } + } else { + return Err(Code::H3_FRAME_UNEXPECTED.with_reason( + "First response frame is not headers", + ErrorLevel::ConnectionError, + )); + }; + + let qpack::Decoded { fields, .. } = decoded; + + let (status, headers) = Header::try_from(fields)?.into_response_parts()?; + let mut resp = Response::new(()); + *resp.status_mut() = status; + *resp.headers_mut() = headers; + *resp.version_mut() = http::Version::HTTP_3; + + Ok(resp) + } + + /// Receive some of the request body. + // TODO what if called before recv_response ? + pub async fn recv_data(&mut self) -> Result, Error> { + self.inner.recv_data().await + } + + /// Receive an optional set of trailers for the response. + pub async fn recv_trailers(&mut self) -> Result, Error> { + let res = self.inner.recv_trailers().await; + if let Err(ref e) = res { + if e.is_header_too_big() { + self.inner.stream.stop_sending(Code::H3_REQUEST_CANCELLED); + } + } + res + } + + /// Tell the peer to stop sending into the underlying QUIC stream + pub fn stop_sending(&mut self, error_code: crate::error::Code) { + // TODO take by value to prevent any further call as this request is cancelled + // rename `cancel()` ? + self.inner.stream.stop_sending(error_code) + } +} + +impl RequestStream +where + S: quic::SendStream, + B: Buf, +{ + /// Send some data on the request body. + pub async fn send_data(&mut self, buf: B) -> Result<(), Error> { + self.inner.send_data(buf).await + } + + /// Send a set of trailers to end the request. + /// + /// Either [`RequestStream::finish`] or + /// [`RequestStream::send_trailers`] must be called to finalize a + /// request. + pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> { + self.inner.send_trailers(trailers).await + } + + /// End the request without trailers. + /// + /// Either [`RequestStream::finish`] or + /// [`RequestStream::send_trailers`] must be called to finalize a + /// request. + pub async fn finish(&mut self) -> Result<(), Error> { + self.inner.finish().await + } + + //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1.1 + //= type=TODO + //# Implementations SHOULD cancel requests by abruptly terminating any + //# directions of a stream that are still open. To do so, an + //# implementation resets the sending parts of streams and aborts reading + //# on the receiving parts of streams; see Section 2.4 of + //# [QUIC-TRANSPORT]. +} + +impl RequestStream +where + S: quic::BidiStream, + B: Buf, +{ + /// Split this stream into two halves that can be driven independently. + pub fn split( + self, + ) -> ( + RequestStream, + RequestStream, + ) { + let (send, recv) = self.inner.split(); + (RequestStream { inner: send }, RequestStream { inner: recv }) + } +} diff --git a/h3/src/lib.rs b/h3/src/lib.rs index 7fb6496a..7dc10006 100644 --- a/h3/src/lib.rs +++ b/h3/src/lib.rs @@ -3,11 +3,12 @@ #![allow(clippy::derive_partial_eq_without_eq)] pub mod client; + mod config; pub mod error; pub mod ext; pub mod quic; -pub(crate) mod request; + pub mod server; pub use error::Error; diff --git a/h3/src/qpack/mod.rs b/h3/src/qpack/mod.rs index 7189dc3a..ed12af96 100644 --- a/h3/src/qpack/mod.rs +++ b/h3/src/qpack/mod.rs @@ -1,9 +1,6 @@ pub use self::{ - decoder::{ - ack_header, decode_stateless, stream_canceled, Decoded, Decoder, Error as DecoderError, - }, - dynamic::Error as DynamicTableError, - encoder::{encode_stateless, Encoder, Error as EncoderError}, + decoder::{decode_stateless, Decoded, Error as DecoderError}, + encoder::{encode_stateless, Error as EncoderError}, field::HeaderField, }; diff --git a/h3/src/qpack/prefix_string/mod.rs b/h3/src/qpack/prefix_string/mod.rs index 7583568a..4a0967ba 100644 --- a/h3/src/qpack/prefix_string/mod.rs +++ b/h3/src/qpack/prefix_string/mod.rs @@ -11,7 +11,7 @@ use bytes::{Buf, BufMut}; pub use self::bitwin::BitWindow; pub use self::{ - decode::{DecodeIter, Error as HuffmanDecodingError, HpackStringDecode}, + decode::{Error as HuffmanDecodingError, HpackStringDecode}, encode::{Error as HuffmanEncodingError, HpackStringEncode}, }; diff --git a/h3/src/qpack/stream.rs b/h3/src/qpack/stream.rs index 792fbf9c..d2ec4c8f 100644 --- a/h3/src/qpack/stream.rs +++ b/h3/src/qpack/stream.rs @@ -99,7 +99,7 @@ impl InsertWithNameRef { }; let index: usize = index .try_into() - .map_err(|e| ParseError::Integer(crate::qpack::prefix_int::Error::Overflow))?; + .map_err(|_e| ParseError::Integer(crate::qpack::prefix_int::Error::Overflow))?; let value = match prefix_string::decode(8, buf) { Ok(x) => x, @@ -294,7 +294,7 @@ pub struct HeaderAck(pub u64); impl HeaderAck { pub fn decode(buf: &mut R) -> Result, ParseError> { let stream_id = match prefix_int::decode(7, buf) { - Ok((0b1, x)) => x as u64, + Ok((0b1, x)) => x, Ok((f, _)) => return Err(ParseError::InvalidPrefix(f)), Err(IntError::UnexpectedEnd) => return Ok(None), Err(e) => return Err(e.into()), @@ -313,7 +313,7 @@ pub struct StreamCancel(pub u64); impl StreamCancel { pub fn decode(buf: &mut R) -> Result, ParseError> { let stream_id = match prefix_int::decode(6, buf) { - Ok((0b01, x)) => x as u64, + Ok((0b01, x)) => x, Ok((f, _)) => return Err(ParseError::InvalidPrefix(f)), Err(IntError::UnexpectedEnd) => return Ok(None), Err(e) => return Err(e.into()), diff --git a/h3/src/qpack/tests.rs b/h3/src/qpack/tests.rs index 5ae34d51..8228dea5 100644 --- a/h3/src/qpack/tests.rs +++ b/h3/src/qpack/tests.rs @@ -1,4 +1,6 @@ -use crate::qpack::{dynamic::DynamicTable, Decoded, Decoder, DecoderError, Encoder, HeaderField}; +use crate::qpack::decoder::Decoder; +use crate::qpack::encoder::Encoder; +use crate::qpack::{dynamic::DynamicTable, Decoded, DecoderError, HeaderField}; use std::io::Cursor; pub mod helpers { diff --git a/h3/src/server/builder.rs b/h3/src/server/builder.rs new file mode 100644 index 00000000..1836fd1b --- /dev/null +++ b/h3/src/server/builder.rs @@ -0,0 +1,140 @@ +//! Builder of HTTP/3 server connections. +//! +//! Use this struct to create a new [`Connection`]. +//! Settings for the [`Connection`] can be provided here. +//! +//! # Example +//! +//! ```rust +//! fn doc(conn: C) +//! where +//! C: h3::quic::Connection, +//! B: bytes::Buf, +//! { +//! let mut server_builder = h3::server::builder(); +//! // Set the maximum header size +//! server_builder.max_field_section_size(1000); +//! // do not send grease types +//! server_builder.send_grease(false); +//! // Build the Connection +//! let mut h3_conn = server_builder.build(conn); +//! } +//! ``` + +use std::{collections::HashSet, result::Result}; + +use bytes::Buf; + +use tokio::sync::mpsc; + +use crate::{ + config::Config, + connection::{ConnectionInner, SharedStateRef}, + error::Error, + quic::{self}, +}; + +use super::connection::Connection; + +/// Create a builder of HTTP/3 server connections +/// +/// This function creates a [`Builder`] that carries settings that can +/// be shared between server connections. +pub fn builder() -> Builder { + Builder::new() +} + +/// Builder of HTTP/3 server connections. +pub struct Builder { + pub(crate) config: Config, +} + +impl Builder { + /// Creates a new [`Builder`] with default settings. + pub(super) fn new() -> Self { + Builder { + config: Default::default(), + } + } + + #[cfg(test)] + pub fn send_settings(&mut self, value: bool) -> &mut Self { + self.config.send_settings = value; + self + } + + /// Set the maximum header size this client is willing to accept + /// + /// See [header size constraints] section of the specification for details. + /// + /// [header size constraints]: https://www.rfc-editor.org/rfc/rfc9114.html#name-header-size-constraints + pub fn max_field_section_size(&mut self, value: u64) -> &mut Self { + self.config.settings.max_field_section_size = value; + self + } + + /// Send grease values to the Client. + /// See [setting](https://www.rfc-editor.org/rfc/rfc9114.html#settings-parameters), [frame](https://www.rfc-editor.org/rfc/rfc9114.html#frame-reserved) and [stream](https://www.rfc-editor.org/rfc/rfc9114.html#stream-grease) for more information. + #[inline] + pub fn send_grease(&mut self, value: bool) -> &mut Self { + self.config.send_grease = value; + self + } + + /// Indicates to the peer that WebTransport is supported. + /// + /// See: [establishing a webtransport session](https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3/#section-3.1) + /// + /// + /// **Server**: + /// Supporting for webtransport also requires setting `enable_connect` `enable_datagram` + /// and `max_webtransport_sessions`. + #[inline] + pub fn enable_webtransport(&mut self, value: bool) -> &mut Self { + self.config.settings.enable_webtransport = value; + self + } + + /// Enables the CONNECT protocol + pub fn enable_connect(&mut self, value: bool) -> &mut Self { + self.config.settings.enable_extended_connect = value; + self + } + + /// Limits the maximum number of WebTransport sessions + pub fn max_webtransport_sessions(&mut self, value: u64) -> &mut Self { + self.config.settings.max_webtransport_sessions = value; + self + } + + /// Indicates that the client or server supports HTTP/3 datagrams + /// + /// See: + pub fn enable_datagram(&mut self, value: bool) -> &mut Self { + self.config.settings.enable_datagram = value; + self + } +} + +impl Builder { + /// Build an HTTP/3 connection from a QUIC connection + /// + /// This method creates a [`Connection`] instance with the settings in the [`Builder`]. + pub async fn build(&self, conn: C) -> Result, Error> + where + C: quic::Connection, + B: Buf, + { + let (sender, receiver) = mpsc::unbounded_channel(); + Ok(Connection { + inner: ConnectionInner::new(conn, SharedStateRef::default(), self.config).await?, + max_field_section_size: self.config.settings.max_field_section_size, + request_end_send: sender, + request_end_recv: receiver, + ongoing_streams: HashSet::new(), + sent_closing: None, + recv_closing: None, + last_accepted_stream: None, + }) + } +} diff --git a/h3/src/server.rs b/h3/src/server/connection.rs similarity index 55% rename from h3/src/server.rs rename to h3/src/server/connection.rs index 0963430c..b28c8d6c 100644 --- a/h3/src/server.rs +++ b/h3/src/server/connection.rs @@ -1,54 +1,6 @@ -//! This module provides methods to create a http/3 Server. +//! HTTP/3 server connection //! -//! It allows to accept incoming requests, and send responses. -//! -//! # Examples -//! -//! ## Simple example -//! ```rust -//! async fn doc(conn: C) -//! where -//! C: h3::quic::Connection, -//! >::BidiStream: Send + 'static -//! { -//! let mut server_builder = h3::server::builder(); -//! // Build the Connection -//! let mut h3_conn = server_builder.build(conn).await.unwrap(); -//! loop { -//! // Accept incoming requests -//! match h3_conn.accept().await { -//! Ok(Some((req, mut stream))) => { -//! // spawn a new task to handle the request -//! tokio::spawn(async move { -//! // build a http response -//! let response = http::Response::builder().status(http::StatusCode::OK).body(()).unwrap(); -//! // send the response to the wire -//! stream.send_response(response).await.unwrap(); -//! // send some date -//! stream.send_data(bytes::Bytes::from("test")).await.unwrap(); -//! // finnish the stream -//! stream.finish().await.unwrap(); -//! }); -//! } -//! Ok(None) => { -//! // break if no Request is accepted -//! break; -//! } -//! Err(err) => { -//! match err.get_error_level() { -//! // break on connection errors -//! h3::error::ErrorLevel::ConnectionError => break, -//! // continue on stream errors -//! h3::error::ErrorLevel::StreamError => continue, -//! } -//! } -//! } -//! } -//! } -//! ``` -//! -//! ## File server -//! A ready-to-use example of a file server is available [here](https://github.com/hyperium/h3/blob/master/examples/client.rs) +//! The [`Connection`] struct manages a connection from the side of the HTTP/3 server use std::{ collections::HashSet, @@ -59,42 +11,35 @@ use std::{ task::{Context, Poll}, }; -use bytes::{Buf, BytesMut}; +use bytes::Buf; use futures_util::{ - future::{self, Future}, + future::{self}, ready, }; -use http::{response, HeaderMap, Request, Response}; -use pin_project_lite::pin_project; +use http::Request; use quic::RecvStream; use quic::StreamId; use tokio::sync::mpsc; use crate::{ - config::Config, connection::{self, ConnectionInner, ConnectionState, SharedStateRef}, error::{Code, Error, ErrorLevel}, ext::Datagram, frame::{FrameStream, FrameStreamError}, proto::{ frame::{Frame, PayloadLen}, - headers::Header, push::PushId, }, qpack, quic::{self, RecvDatagramExt, SendDatagramExt, SendStream as _}, - request::ResolveRequest, - stream::{self, BufRecvStream}, + stream::BufRecvStream, }; -use tracing::{error, trace, warn}; -/// Create a builder of HTTP/3 server connections -/// -/// This function creates a [`Builder`] that carries settings that can -/// be shared between server connections. -pub fn builder() -> Builder { - Builder::new() -} +use crate::server::request::ResolveRequest; + +use tracing::{trace, warn}; + +use super::stream::{ReadDatagram, RequestStream}; /// Server connection driver /// @@ -110,18 +55,18 @@ where { /// TODO: temporarily break encapsulation for `WebTransportSession` pub inner: ConnectionInner, - max_field_section_size: u64, + pub(super) max_field_section_size: u64, // List of all incoming streams that are currently running. - ongoing_streams: HashSet, + pub(super) ongoing_streams: HashSet, // Let the streams tell us when they are no longer running. - request_end_recv: mpsc::UnboundedReceiver, - request_end_send: mpsc::UnboundedSender, + pub(super) request_end_recv: mpsc::UnboundedReceiver, + pub(super) request_end_send: mpsc::UnboundedSender, // Has a GOAWAY frame been sent? If so, this StreamId is the last we are willing to accept. - sent_closing: Option, + pub(super) sent_closing: Option, // Has a GOAWAY frame been received? If so, this is PushId the last the remote will accept. - recv_closing: Option, + pub(super) recv_closing: Option, // The id of the last stream received by this connection. - last_accepted_stream: Option, + pub(super) last_accepted_stream: Option, } impl ConnectionState for Connection @@ -141,11 +86,11 @@ where { /// Create a new HTTP/3 server connection with default settings /// - /// Use a custom [`Builder`] with [`builder()`] to create a connection + /// Use a custom [`super::builder::Builder`] with [`super::builder::builder()`] to create a connection /// with different settings. /// Provide a Connection which implements [`quic::Connection`]. pub async fn new(conn: C) -> Result { - builder().build(conn).await + super::builder::builder().build(conn).await } /// Closes the connection with a code and a reason. @@ -522,330 +467,7 @@ where //# parallelism, at least 100 request streams SHOULD be permitted at a //# time. -/// Builder of HTTP/3 server connections. -/// -/// Use this struct to create a new [`Connection`]. -/// Settings for the [`Connection`] can be provided here. -/// -/// # Example -/// -/// ```rust -/// fn doc(conn: C) -/// where -/// C: h3::quic::Connection, -/// B: bytes::Buf, -/// { -/// let mut server_builder = h3::server::builder(); -/// // Set the maximum header size -/// server_builder.max_field_section_size(1000); -/// // do not send grease types -/// server_builder.send_grease(false); -/// // Build the Connection -/// let mut h3_conn = server_builder.build(conn); -/// } -/// ``` -pub struct Builder { - pub(crate) config: Config, -} - -impl Builder { - /// Creates a new [`Builder`] with default settings. - pub(super) fn new() -> Self { - Builder { - config: Default::default(), - } - } - - #[cfg(test)] - pub fn send_settings(&mut self, value: bool) -> &mut Self { - self.config.send_settings = value; - self - } - - /// Set the maximum header size this client is willing to accept - /// - /// See [header size constraints] section of the specification for details. - /// - /// [header size constraints]: https://www.rfc-editor.org/rfc/rfc9114.html#name-header-size-constraints - pub fn max_field_section_size(&mut self, value: u64) -> &mut Self { - self.config.settings.max_field_section_size = value; - self - } - - /// Send grease values to the Client. - /// See [setting](https://www.rfc-editor.org/rfc/rfc9114.html#settings-parameters), [frame](https://www.rfc-editor.org/rfc/rfc9114.html#frame-reserved) and [stream](https://www.rfc-editor.org/rfc/rfc9114.html#stream-grease) for more information. - #[inline] - pub fn send_grease(&mut self, value: bool) -> &mut Self { - self.config.send_grease = value; - self - } - - /// Indicates to the peer that WebTransport is supported. - /// - /// See: [establishing a webtransport session](https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3/#section-3.1) - /// - /// - /// **Server**: - /// Supporting for webtransport also requires setting `enable_connect` `enable_datagram` - /// and `max_webtransport_sessions`. - #[inline] - pub fn enable_webtransport(&mut self, value: bool) -> &mut Self { - self.config.settings.enable_webtransport = value; - self - } - - /// Enables the CONNECT protocol - pub fn enable_connect(&mut self, value: bool) -> &mut Self { - self.config.settings.enable_extended_connect = value; - self - } - - /// Limits the maximum number of WebTransport sessions - pub fn max_webtransport_sessions(&mut self, value: u64) -> &mut Self { - self.config.settings.max_webtransport_sessions = value; - self - } - - /// Indicates that the client or server supports HTTP/3 datagrams - /// - /// See: - pub fn enable_datagram(&mut self, value: bool) -> &mut Self { - self.config.settings.enable_datagram = value; - self - } -} - -impl Builder { - /// Build an HTTP/3 connection from a QUIC connection - /// - /// This method creates a [`Connection`] instance with the settings in the [`Builder`]. - pub async fn build(&self, conn: C) -> Result, Error> - where - C: quic::Connection, - B: Buf, - { - let (sender, receiver) = mpsc::unbounded_channel(); - Ok(Connection { - inner: ConnectionInner::new(conn, SharedStateRef::default(), self.config).await?, - max_field_section_size: self.config.settings.max_field_section_size, - request_end_send: sender, - request_end_recv: receiver, - ongoing_streams: HashSet::new(), - sent_closing: None, - recv_closing: None, - last_accepted_stream: None, - }) - } -} - -struct RequestEnd { - request_end: mpsc::UnboundedSender, - stream_id: StreamId, -} - -/// Manage request and response transfer for an incoming request -/// -/// The [`RequestStream`] struct is used to send and/or receive -/// information from the client. -pub struct RequestStream { - inner: connection::RequestStream, - request_end: Arc, -} - -impl AsMut> for RequestStream { - fn as_mut(&mut self) -> &mut connection::RequestStream { - &mut self.inner - } -} - -impl ConnectionState for RequestStream { - fn shared_state(&self) -> &SharedStateRef { - &self.inner.conn_state - } -} - -impl RequestStream -where - S: quic::RecvStream, - B: Buf, -{ - /// Receive data sent from the client - pub async fn recv_data(&mut self) -> Result, Error> { - self.inner.recv_data().await - } - - /// Poll for data sent from the client - pub fn poll_recv_data( - &mut self, - cx: &mut Context<'_>, - ) -> Poll, Error>> { - self.inner.poll_recv_data(cx) - } - - /// Receive an optional set of trailers for the request - pub async fn recv_trailers(&mut self) -> Result, Error> { - self.inner.recv_trailers().await - } - - /// Tell the peer to stop sending into the underlying QUIC stream - pub fn stop_sending(&mut self, error_code: crate::error::Code) { - self.inner.stream.stop_sending(error_code) - } - - /// Returns the underlying stream id - pub fn id(&self) -> StreamId { - self.inner.stream.id() - } -} - -impl RequestStream -where - S: quic::SendStream, - B: Buf, -{ - /// Send the HTTP/3 response - /// - /// This should be called before trying to send any data with - /// [`RequestStream::send_data`]. - pub async fn send_response(&mut self, resp: Response<()>) -> Result<(), Error> { - let (parts, _) = resp.into_parts(); - let response::Parts { - status, headers, .. - } = parts; - let headers = Header::response(status, headers); - - let mut block = BytesMut::new(); - let mem_size = qpack::encode_stateless(&mut block, headers)?; - - let max_mem_size = self - .inner - .conn_state - .read("send_response") - .peer_config - .max_field_section_size; - - //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2 - //# An implementation that - //# has received this parameter SHOULD NOT send an HTTP message header - //# that exceeds the indicated size, as the peer will likely refuse to - //# process it. - if mem_size > max_mem_size { - return Err(Error::header_too_big(mem_size, max_mem_size)); - } - - stream::write(&mut self.inner.stream, Frame::Headers(block.freeze())) - .await - .map_err(|e| self.maybe_conn_err(e))?; - - Ok(()) - } - - /// Send some data on the response body. - pub async fn send_data(&mut self, buf: B) -> Result<(), Error> { - self.inner.send_data(buf).await - } - - /// Stop a stream with an error code - /// - /// The code can be [`Code::H3_NO_ERROR`]. - pub fn stop_stream(&mut self, error_code: Code) { - self.inner.stop_stream(error_code); - } - - /// Send a set of trailers to end the response. - /// - /// Either [`RequestStream::finish`] or - /// [`RequestStream::send_trailers`] must be called to finalize a - /// request. - pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> { - self.inner.send_trailers(trailers).await - } - - /// End the response without trailers. - /// - /// Either [`RequestStream::finish`] or - /// [`RequestStream::send_trailers`] must be called to finalize a - /// request. - pub async fn finish(&mut self) -> Result<(), Error> { - self.inner.finish().await - } - - //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1.1 - //= type=TODO - //# Implementations SHOULD cancel requests by abruptly terminating any - //# directions of a stream that are still open. To do so, an - //# implementation resets the sending parts of streams and aborts reading - //# on the receiving parts of streams; see Section 2.4 of - //# [QUIC-TRANSPORT]. - - /// Returns the underlying stream id - pub fn send_id(&self) -> StreamId { - self.inner.stream.send_id() - } -} - -impl RequestStream -where - S: quic::BidiStream, - B: Buf, -{ - /// Splits the Request-Stream into send and receive. - /// This can be used the send and receive data on different tasks. - pub fn split( - self, - ) -> ( - RequestStream, - RequestStream, - ) { - let (send, recv) = self.inner.split(); - ( - RequestStream { - inner: send, - request_end: self.request_end.clone(), - }, - RequestStream { - inner: recv, - request_end: self.request_end, - }, - ) - } -} - -impl Drop for RequestEnd { - fn drop(&mut self) { - if let Err(e) = self.request_end.send(self.stream_id) { - error!( - "failed to notify connection of request end: {} {}", - self.stream_id, e - ); - } - } -} - -pin_project! { - /// Future for [`Connection::read_datagram`] - pub struct ReadDatagram<'a, C, B> - where - C: quic::Connection, - B: Buf, - { - conn: &'a mut Connection, - _marker: PhantomData, - } -} - -impl<'a, C, B> Future for ReadDatagram<'a, C, B> -where - C: quic::Connection + RecvDatagramExt, - B: Buf, -{ - type Output = Result>, Error>; - - fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - tracing::trace!("poll: read_datagram"); - match ready!(self.conn.inner.conn.poll_accept_datagram(cx))? { - Some(v) => Poll::Ready(Ok(Some(Datagram::decode(v)?))), - None => Poll::Ready(Ok(None)), - } - } +pub(super) struct RequestEnd { + pub(super) request_end: mpsc::UnboundedSender, + pub(super) stream_id: StreamId, } diff --git a/h3/src/server/mod.rs b/h3/src/server/mod.rs new file mode 100644 index 00000000..f9d62764 --- /dev/null +++ b/h3/src/server/mod.rs @@ -0,0 +1,62 @@ +//! This module provides methods to create a http/3 Server. +//! +//! It allows to accept incoming requests, and send responses. +//! +//! # Examples +//! +//! ## Simple example +//! ```rust +//! async fn doc(conn: C) +//! where +//! C: h3::quic::Connection, +//! >::BidiStream: Send + 'static +//! { +//! let mut server_builder = h3::server::builder(); +//! // Build the Connection +//! let mut h3_conn = server_builder.build(conn).await.unwrap(); +//! loop { +//! // Accept incoming requests +//! match h3_conn.accept().await { +//! Ok(Some((req, mut stream))) => { +//! // spawn a new task to handle the request +//! tokio::spawn(async move { +//! // build a http response +//! let response = http::Response::builder().status(http::StatusCode::OK).body(()).unwrap(); +//! // send the response to the wire +//! stream.send_response(response).await.unwrap(); +//! // send some date +//! stream.send_data(bytes::Bytes::from("test")).await.unwrap(); +//! // finnish the stream +//! stream.finish().await.unwrap(); +//! }); +//! } +//! Ok(None) => { +//! // break if no Request is accepted +//! break; +//! } +//! Err(err) => { +//! match err.get_error_level() { +//! // break on connection errors +//! h3::error::ErrorLevel::ConnectionError => break, +//! // continue on stream errors +//! h3::error::ErrorLevel::StreamError => continue, +//! } +//! } +//! } +//! } +//! } +//! ``` +//! +//! ## File server +//! A ready-to-use example of a file server is available [here](https://github.com/hyperium/h3/blob/master/examples/server.rs) + +mod builder; +mod connection; +mod request; +mod stream; + +pub use builder::builder; +pub use builder::Builder; +pub use connection::Connection; +pub use stream::ReadDatagram; +pub use stream::RequestStream; diff --git a/h3/src/request.rs b/h3/src/server/request.rs similarity index 97% rename from h3/src/request.rs rename to h3/src/server/request.rs index f705efbc..b70e3dfa 100644 --- a/h3/src/request.rs +++ b/h3/src/server/request.rs @@ -3,7 +3,9 @@ use std::convert::TryFrom; use bytes::Buf; use http::{Request, StatusCode}; -use crate::{error::Code, proto::headers::Header, qpack, quic, server::RequestStream, Error}; +use crate::{error::Code, proto::headers::Header, qpack, quic, Error}; + +use super::stream::RequestStream; pub struct ResolveRequest, B: Buf> { request_stream: RequestStream, diff --git a/h3/src/server/stream.rs b/h3/src/server/stream.rs new file mode 100644 index 00000000..610b83b6 --- /dev/null +++ b/h3/src/server/stream.rs @@ -0,0 +1,243 @@ +//! Server-side HTTP/3 stream management + +use bytes::Buf; + +use crate::{ + connection::{ConnectionState, SharedStateRef}, + ext::Datagram, + quic::{self, RecvDatagramExt}, + Error, +}; +use pin_project_lite::pin_project; + +use super::connection::{Connection, RequestEnd}; +use std::{marker::PhantomData, sync::Arc}; + +use std::{ + option::Option, + result::Result, + task::{Context, Poll}, +}; + +use bytes::BytesMut; +use futures_util::{future::Future, ready}; +use http::{response, HeaderMap, Response}; + +use quic::StreamId; + +use crate::{ + error::Code, + proto::{frame::Frame, headers::Header}, + qpack, + quic::SendStream as _, + stream::{self}, +}; + +use tracing::error; + +/// Manage request and response transfer for an incoming request +/// +/// The [`RequestStream`] struct is used to send and/or receive +/// information from the client. +pub struct RequestStream { + pub(super) inner: crate::connection::RequestStream, + pub(super) request_end: Arc, +} + +impl AsMut> for RequestStream { + fn as_mut(&mut self) -> &mut crate::connection::RequestStream { + &mut self.inner + } +} + +impl ConnectionState for RequestStream { + fn shared_state(&self) -> &SharedStateRef { + &self.inner.conn_state + } +} + +impl RequestStream +where + S: quic::RecvStream, + B: Buf, +{ + /// Receive data sent from the client + pub async fn recv_data(&mut self) -> Result, Error> { + self.inner.recv_data().await + } + + /// Poll for data sent from the client + pub fn poll_recv_data( + &mut self, + cx: &mut Context<'_>, + ) -> Poll, Error>> { + self.inner.poll_recv_data(cx) + } + + /// Receive an optional set of trailers for the request + pub async fn recv_trailers(&mut self) -> Result, Error> { + self.inner.recv_trailers().await + } + + /// Tell the peer to stop sending into the underlying QUIC stream + pub fn stop_sending(&mut self, error_code: crate::error::Code) { + self.inner.stream.stop_sending(error_code) + } + + /// Returns the underlying stream id + pub fn id(&self) -> StreamId { + self.inner.stream.id() + } +} + +impl RequestStream +where + S: quic::SendStream, + B: Buf, +{ + /// Send the HTTP/3 response + /// + /// This should be called before trying to send any data with + /// [`RequestStream::send_data`]. + pub async fn send_response(&mut self, resp: Response<()>) -> Result<(), Error> { + let (parts, _) = resp.into_parts(); + let response::Parts { + status, headers, .. + } = parts; + let headers = Header::response(status, headers); + + let mut block = BytesMut::new(); + let mem_size = qpack::encode_stateless(&mut block, headers)?; + + let max_mem_size = self + .inner + .conn_state + .read("send_response") + .peer_config + .max_field_section_size; + + //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2 + //# An implementation that + //# has received this parameter SHOULD NOT send an HTTP message header + //# that exceeds the indicated size, as the peer will likely refuse to + //# process it. + if mem_size > max_mem_size { + return Err(Error::header_too_big(mem_size, max_mem_size)); + } + + stream::write(&mut self.inner.stream, Frame::Headers(block.freeze())) + .await + .map_err(|e| self.maybe_conn_err(e))?; + + Ok(()) + } + + /// Send some data on the response body. + pub async fn send_data(&mut self, buf: B) -> Result<(), Error> { + self.inner.send_data(buf).await + } + + /// Stop a stream with an error code + /// + /// The code can be [`Code::H3_NO_ERROR`]. + pub fn stop_stream(&mut self, error_code: Code) { + self.inner.stop_stream(error_code); + } + + /// Send a set of trailers to end the response. + /// + /// Either [`RequestStream::finish`] or + /// [`RequestStream::send_trailers`] must be called to finalize a + /// request. + pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> { + self.inner.send_trailers(trailers).await + } + + /// End the response without trailers. + /// + /// Either [`RequestStream::finish`] or + /// [`RequestStream::send_trailers`] must be called to finalize a + /// request. + pub async fn finish(&mut self) -> Result<(), Error> { + self.inner.finish().await + } + + //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1.1 + //= type=TODO + //# Implementations SHOULD cancel requests by abruptly terminating any + //# directions of a stream that are still open. To do so, an + //# implementation resets the sending parts of streams and aborts reading + //# on the receiving parts of streams; see Section 2.4 of + //# [QUIC-TRANSPORT]. + + /// Returns the underlying stream id + pub fn send_id(&self) -> StreamId { + self.inner.stream.send_id() + } +} + +impl RequestStream +where + S: quic::BidiStream, + B: Buf, +{ + /// Splits the Request-Stream into send and receive. + /// This can be used the send and receive data on different tasks. + pub fn split( + self, + ) -> ( + RequestStream, + RequestStream, + ) { + let (send, recv) = self.inner.split(); + ( + RequestStream { + inner: send, + request_end: self.request_end.clone(), + }, + RequestStream { + inner: recv, + request_end: self.request_end, + }, + ) + } +} + +impl Drop for RequestEnd { + fn drop(&mut self) { + if let Err(e) = self.request_end.send(self.stream_id) { + error!( + "failed to notify connection of request end: {} {}", + self.stream_id, e + ); + } + } +} + +pin_project! { + /// Future for [`Connection::read_datagram`] + pub struct ReadDatagram<'a, C, B> + where + C: quic::Connection, + B: Buf, + { + pub(super) conn: &'a mut Connection, + pub(super) _marker: PhantomData, + } +} + +impl<'a, C, B> Future for ReadDatagram<'a, C, B> +where + C: quic::Connection + RecvDatagramExt, + B: Buf, +{ + type Output = Result>, Error>; + + fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + tracing::trace!("poll: read_datagram"); + match ready!(self.conn.inner.conn.poll_accept_datagram(cx))? { + Some(v) => Poll::Ready(Ok(Some(Datagram::decode(v)?))), + None => Poll::Ready(Ok(None)), + } + } +} diff --git a/h3/src/tests/connection.rs b/h3/src/tests/connection.rs index 186a7f12..0125368a 100644 --- a/h3/src/tests/connection.rs +++ b/h3/src/tests/connection.rs @@ -8,8 +8,9 @@ use bytes::{Buf, Bytes, BytesMut}; use futures_util::future; use http::{Request, Response, StatusCode}; +use crate::client::SendRequest; +use crate::{client, server}; use crate::{ - client::{self, SendRequest}, connection::ConnectionState, error::{Code, Error, Kind}, proto::{ @@ -20,7 +21,6 @@ use crate::{ varint::VarInt, }, quic::{self, SendStream}, - server, }; use super::h3_quinn;