diff --git a/.github/workflows/bencher.yaml b/.github/workflows/bencher.yaml index 43e2c060..a004ff2a 100644 --- a/.github/workflows/bencher.yaml +++ b/.github/workflows/bencher.yaml @@ -6,17 +6,16 @@ on: jobs: benchmark_with_bencher: - continue-on-error: true - name: Continuous Benchmarking with Bencher runs-on: ubuntu-latest steps: + - if: always() - uses: actions/checkout@v4 - uses: bencherdev/bencher@main - - name: Track Benchmarks with Bencher run: | bencher run \ --branch main \ --err \ --project wtx \ --testbed ubuntu-latest \ + --token "${{ secrets.BENCHER_API_TOKEN }}" \ "cargo bench --all-features" \ No newline at end of file diff --git a/.scripts/internal-tests.sh b/.scripts/internal-tests.sh index d49f3ca9..7a5becde 100755 --- a/.scripts/internal-tests.sh +++ b/.scripts/internal-tests.sh @@ -39,7 +39,6 @@ $rt test-with-features wtx httparse $rt test-with-features wtx md-5 $rt test-with-features wtx memchr $rt test-with-features wtx miniserde -$rt test-with-features wtx nightly $rt test-with-features wtx orm $rt test-with-features wtx parking_lot $rt test-with-features wtx pool diff --git a/.scripts/podman-stop.sh b/.scripts/podman-stop.sh deleted file mode 100755 index dc0e7899..00000000 --- a/.scripts/podman-stop.sh +++ /dev/null @@ -1,2 +0,0 @@ -podman container stop wtx_postgres_md5 -podman container stop wtx_postgres_scram diff --git a/README.md b/README.md index 40d19e0e..ef4bf663 100644 --- a/README.md +++ b/README.md @@ -12,9 +12,9 @@ A collection of different transport implementations and related tools focused pr 2. [Database Client](https://c410-f3r.github.io/wtx/database/client-connection.html) 3. [Database Object–Relational Mapping](https://c410-f3r.github.io/wtx/database/object%E2%80%93relational-mapping.html) 4. [Database Schema Manager](https://c410-f3r.github.io/wtx/database/schema-management.html) -6. [HTTP2 Client/Server](https://c410-f3r.github.io/wtx/http2/index.html) -7. [Pool Manager](https://c410-f3r.github.io/wtx/pool_manager/index.html) -8. [WebSocket Client/Server](https://c410-f3r.github.io/wtx/web-socket/index.html) +5. [HTTP2 Client/Server](https://c410-f3r.github.io/wtx/http2/index.html) +6. [Pool Manager](https://c410-f3r.github.io/wtx/pool_manager/index.html) +7. [WebSocket Client/Server](https://c410-f3r.github.io/wtx/web-socket/index.html) Embedded devices with a working heap allocator can use this `no_std` crate. diff --git a/wtx-docs/src/http2/README.md b/wtx-docs/src/http2/README.md index 95c88878..675d1ad0 100644 --- a/wtx-docs/src/http2/README.md +++ b/wtx-docs/src/http2/README.md @@ -6,15 +6,19 @@ Provides low and high level abstractions to interact with clients and servers. use std::net::ToSocketAddrs; use tokio::net::TcpStream; use wtx::{ - http::{Headers, Method, Request}, - http2::{Http2Buffer, Http2Params, Http2Tokio, ReqResBuffer}, - misc::{from_utf8_basic, UriString}, + http::Method, + http2::{ErrorCode, Http2Buffer, Http2Params, Http2Tokio, ReqResBuffer}, + misc::{from_utf8_basic, UriRef}, rng::StaticRng, }; #[tokio::main] async fn main() { - let uri = UriString::new("127.0.0.1:9000"); + let mut rrb = ReqResBuffer::default(); + rrb.data.reserve(6); + rrb.data.extend_from_slice(b"Hello!").unwrap(); + rrb.uri.push_str("127.0.0.1:9000").unwrap(); + let uri = UriRef::new(&rrb.uri); let mut http2 = Http2Tokio::connect( Http2Buffer::new(StaticRng::default()), Http2Params::default(), @@ -22,15 +26,11 @@ async fn main() { ) .await .unwrap(); - let mut rrb = ReqResBuffer::default(); let mut stream = http2.stream().await.unwrap(); - let res = stream - .send_req_recv_res( - Request::http2(&"Hello!", &Headers::new(0), Method::Get, uri.to_ref()), - &mut rrb, - ) - .await - .unwrap(); - println!("{}", from_utf8_basic(res.resource().unwrap().body()).unwrap()) + stream.send_req(rrb.as_http2_request_ref(Method::Get)).await.unwrap(); + let res = stream.recv_res(&mut rrb).await.unwrap(); + println!("{}", from_utf8_basic(res.resource().unwrap().body()).unwrap()); + http2.send_go_away(ErrorCode::NoError).await.unwrap(); } + ``` \ No newline at end of file diff --git a/wtx-ui/src/embed_migrations.rs b/wtx-ui/src/embed_migrations.rs index 0cb42a72..eb7c7d2d 100644 --- a/wtx-ui/src/embed_migrations.rs +++ b/wtx-ui/src/embed_migrations.rs @@ -58,7 +58,7 @@ pub(crate) async fn embed_migrations(input: &str, output: &str) -> wtx::Result<( buffer.write_fmt(format_args!("];({mg_name},{mg_name}_MIGRATIONS)}},"))?; } - buffer.push_str("];"); + buffer.push_str("];\n"); OpenOptions::new() .create(true) diff --git a/wtx/Cargo.toml b/wtx/Cargo.toml index 749ecc7e..1d0a22b5 100644 --- a/wtx/Cargo.toml +++ b/wtx/Cargo.toml @@ -112,7 +112,6 @@ glommio = ["futures-lite", "dep:glommio", "std"] http1 = ["httparse"] http2 = ["ahash", "hashbrown", "tokio"] miniserde = ["dep:miniserde", "std"] -nightly = [] optimization = ["atoi", "memchr", "simdutf8"] orm = ["database", "dep:smallvec"] pool = [] diff --git a/wtx/examples/http2-client-tokio.rs b/wtx/examples/http2-client-tokio.rs index 895d510f..d20f2bf1 100644 --- a/wtx/examples/http2-client-tokio.rs +++ b/wtx/examples/http2-client-tokio.rs @@ -6,15 +6,19 @@ mod common; use std::net::ToSocketAddrs; use tokio::net::TcpStream; use wtx::{ - http::{Headers, Method, Request}, - http2::{Http2Buffer, Http2Params, Http2Tokio, ReqResBuffer}, - misc::{from_utf8_basic, UriString}, + http::Method, + http2::{ErrorCode, Http2Buffer, Http2Params, Http2Tokio, ReqResBuffer}, + misc::{from_utf8_basic, UriRef}, rng::StaticRng, }; #[tokio::main] async fn main() { - let uri = UriString::new(common::_uri_from_args()); + let mut rrb = ReqResBuffer::default(); + rrb.data.reserve(6); + rrb.data.extend_from_slice(b"Hello!").unwrap(); + rrb.uri.push_str(&common::_uri_from_args()).unwrap(); + let uri = UriRef::new(&rrb.uri); let mut http2 = Http2Tokio::connect( Http2Buffer::new(StaticRng::default()), Http2Params::default(), @@ -22,14 +26,9 @@ async fn main() { ) .await .unwrap(); - let mut rrb = ReqResBuffer::default(); let mut stream = http2.stream().await.unwrap(); - let res = stream - .send_req_recv_res( - Request::http2(&"Hello!", &Headers::new(0), Method::Get, uri.to_ref()), - &mut rrb, - ) - .await - .unwrap(); - println!("{}", from_utf8_basic(res.resource().unwrap().body()).unwrap()) + stream.send_req(rrb.as_http2_request_ref(Method::Get)).await.unwrap(); + let res = stream.recv_res(&mut rrb).await.unwrap(); + println!("{}", from_utf8_basic(res.resource().unwrap().body()).unwrap()); + http2.send_go_away(ErrorCode::NoError).await.unwrap(); } diff --git a/wtx/examples/http2-server-tokio.rs b/wtx/examples/http2-server-tokio.rs index 25c96ea4..958b5d66 100644 --- a/wtx/examples/http2-server-tokio.rs +++ b/wtx/examples/http2-server-tokio.rs @@ -25,7 +25,6 @@ async fn handle<'buffer>( req: RequestMut<'buffer, 'buffer, 'buffer, ByteVector>, ) -> Result, ()> { req.headers.clear(); - println!("{:?}", core::str::from_utf8(req.data)); Ok(match (req.uri.path(), req.method) { ("/", Method::Get) => Response::http2((req.data, req.headers), StatusCode::Ok), _ => { diff --git a/wtx/src/client_api_framework/macros.rs b/wtx/src/client_api_framework/macros.rs index 91dc6273..dcfa443e 100644 --- a/wtx/src/client_api_framework/macros.rs +++ b/wtx/src/client_api_framework/macros.rs @@ -96,13 +96,6 @@ macro_rules! _create_set_of_request_throttling { }; } -macro_rules! _debug { - ($($tt:tt)+) => { - #[cfg(feature = "tracing")] - tracing::debug!($($tt)+); - }; -} - macro_rules! generic_data_format_doc { ($ty:literal) => { concat!("Wrapper used in every generic ", $ty, " to manage different internal implementations.") diff --git a/wtx/src/database/client/postgres/executor/fetch.rs b/wtx/src/database/client/postgres/executor/fetch.rs index 827258b3..7bc2cc95 100644 --- a/wtx/src/database/client/postgres/executor/fetch.rs +++ b/wtx/src/database/client/postgres/executor/fetch.rs @@ -108,7 +108,7 @@ where ) -> crate::Result<()> { let mut is_payload_filled = false; nb._expand_following(len); - for _ in 0..len { + for _ in 0..=len { if *read >= len { is_payload_filled = true; break; diff --git a/wtx/src/http/abstract_headers.rs b/wtx/src/http/abstract_headers.rs index 6be90980..3ac25a8c 100644 --- a/wtx/src/http/abstract_headers.rs +++ b/wtx/src/http/abstract_headers.rs @@ -78,17 +78,18 @@ where value: &[u8], is_sensitive: bool, cb: impl FnMut(M, &mut [u8]), - ) { + ) -> crate::Result<()> { let local_len = name.len().wrapping_add(value.len()); if local_len > self.max_bytes { self.clear(); - return; + return Ok(()); } self.remove_until_max_bytes(local_len, cb); - self.bq.push_front_within_cap( + self.bq.push_front( [name, value], Metadata { is_active: true, is_sensitive, misc, name_len: name.len() }, - ); + )?; + Ok(()) } #[inline] @@ -98,7 +99,7 @@ where Some(()) } - #[inline] + #[inline(always)] pub(crate) fn reserve(&mut self, bytes: usize, headers: usize) { self.bq.reserve(headers, bytes.min(self.max_bytes)); } diff --git a/wtx/src/http/client.rs b/wtx/src/http/client.rs index b44041e9..f8aeed0c 100644 --- a/wtx/src/http/client.rs +++ b/wtx/src/http/client.rs @@ -19,12 +19,12 @@ where ResourceManager = SimpleRM< crate::Error, (), - (ReqResBuffer, Http2, S, SDC, true>), + (ReqResBuffer, Http2), >, >, S: Stream, SDC: RefCounter, - SDC::Item: Lock, S, true>>, + SDC::Item: Lock>, { #[cfg(feature = "tokio-rustls")] #[inline] @@ -41,7 +41,7 @@ where Response< as LockGuard< 'this, - (ReqResBuffer, Http2, S, SDC, true>), + (ReqResBuffer, Http2), >>::Mapped, >, > { diff --git a/wtx/src/http/headers.rs b/wtx/src/http/headers.rs index 4813de51..1ad9154c 100644 --- a/wtx/src/http/headers.rs +++ b/wtx/src/http/headers.rs @@ -92,8 +92,8 @@ impl Headers { /// If the sum of `name` and `value` is greater than the maximum number of bytes, then the first /// inserted entries will be deleted accordantly. #[inline] - pub fn push_front(&mut self, name: &[u8], value: &[u8], is_sensitive: bool) { - self.ab.push_front((), name, value, is_sensitive, |_, _| {}); + pub fn push_front(&mut self, name: &[u8], value: &[u8], is_sensitive: bool) -> crate::Result<()> { + self.ab.push_front((), name, value, is_sensitive, |_, _| {}) } /// Removes all a pair referenced by `idx`. @@ -106,7 +106,7 @@ impl Headers { /// to the number of headers. /// /// Bytes are capped according to the specified `max_bytes`. - #[inline] + #[inline(always)] pub fn reserve(&mut self, bytes: usize, headers: usize) { self.ab.reserve(bytes, headers); } diff --git a/wtx/src/http/request.rs b/wtx/src/http/request.rs index 4996d745..d4b77f19 100644 --- a/wtx/src/http/request.rs +++ b/wtx/src/http/request.rs @@ -35,16 +35,4 @@ where pub fn http2(data: D, headers: H, method: Method, uri: Uri) -> Self { Self { data, headers, method, uri, version: Version::Http2 } } - - /// See [RequestRef]. - #[inline] - pub fn to_ref(&self) -> RequestRef<'_, '_, '_, D> { - RequestRef { - data: &self.data, - headers: self.headers.lease(), - method: self.method, - uri: self.uri.to_ref(), - version: self.version, - } - } } diff --git a/wtx/src/http2.rs b/wtx/src/http2.rs index 0dbbca29..1260c442 100644 --- a/wtx/src/http2.rs +++ b/wtx/src/http2.rs @@ -40,18 +40,17 @@ mod tests; mod u31; mod uri_buffer; mod window_update_frame; +mod write_stream; use crate::{ - http2::misc::{ - apply_initial_params, default_stream_frames, read_frame_until_cb_unknown_id, write_to_stream, - }, + http2::misc::{apply_initial_params, default_stream_frames, read_frame_until_cb_unknown_id}, misc::{ConnectionState, LeaseMut, Lock, RefCounter, Stream}, }; pub use client_stream::ClientStream; pub(crate) use continuation_frame::ContinuationFrame; use core::marker::PhantomData; pub(crate) use data_frame::DataFrame; -pub(crate) use error_code::ErrorCode; +pub use error_code::ErrorCode; pub(crate) use frame_init::{FrameHeaderTy, FrameInit}; pub(crate) use go_away_frame::GoAwayFrame; pub(crate) use headers_frame::HeadersFrame; @@ -78,6 +77,7 @@ pub(crate) use window_update_frame::WindowUpdateFrame; pub(crate) const BODY_LEN_DEFAULT: u32 = 4_194_304; pub(crate) const BUFFERED_FRAMES_NUM_DEFAULT: u8 = 16; pub(crate) const CACHED_HEADERS_LEN_DEFAULT: u32 = 8_192; +pub(crate) const CACHED_HEADERS_LEN_LOWER_BOUND: u32 = 4_096; pub(crate) const EXPANDED_HEADERS_LEN_DEFAULT: u32 = 4_096; pub(crate) const FRAME_LEN_DEFAULT: u32 = 1_048_576; pub(crate) const FRAME_LEN_LOWER_BOUND: u32 = 16_384; @@ -108,7 +108,7 @@ pub struct Http2 { impl Http2 where - HB: LeaseMut>, + HB: LeaseMut, HD: Lock>, S: Stream, { @@ -133,7 +133,7 @@ where // `Send` bounds that makes anyone feel happy and delightful for uncountable hours. impl Http2 where - HB: LeaseMut>, + HB: LeaseMut, HD: RefCounter, for<'guard> HD::Item: Lock< Guard<'guard> = MutexGuard<'guard, Http2Data>, @@ -150,7 +150,7 @@ where if &buffer != PREFACE { return Err(crate::Error::NoPreface); } - write_to_stream([hp.to_settings_frame().bytes(&mut [0; 45])], true, &mut stream).await?; + stream.write_all(hp.to_settings_frame().bytes(&mut [0; 45])).await?; apply_initial_params(hb.lease_mut(), &hp)?; Ok(Self { phantom: PhantomData, @@ -165,6 +165,7 @@ where &mut self, rrb: &mut ReqResBuffer, ) -> crate::Result>> { + rrb.clear(); let mut guard = self.hd.lock().await; if *guard.streams_num_mut() >= guard.hp().max_streams_num() { return Err(crate::Error::ExceedAmountOfActiveConcurrentStreams); @@ -186,13 +187,18 @@ where ); let stream_frame_entry = guard.hb_mut().streams_frames.entry(rfi.stream_id); let _ = stream_frame_entry.or_insert_with(default_stream_frames); - Ok(ReadFrameRslt::Resource(ServerStream::new(self.hd.clone(), rfi, stream_state))) + Ok(ReadFrameRslt::Resource(ServerStream::new( + self.hd.clone(), + _trace_span!("Creating server stream", stream_id = rfi.stream_id.u32()), + rfi, + stream_state, + ))) } } impl Http2 where - HB: LeaseMut>, + HB: LeaseMut, HD: RefCounter, HD::Item: Lock>, S: Stream, @@ -202,7 +208,7 @@ where pub async fn connect(mut hb: HB, hp: Http2Params, mut stream: S) -> crate::Result { hb.lease_mut().clear(); stream.write_all(PREFACE).await?; - write_to_stream([hp.to_settings_frame().bytes(&mut [0; 45])], true, &mut stream).await?; + stream.write_all(hp.to_settings_frame().bytes(&mut [0; 45])).await?; apply_initial_params(hb.lease_mut(), &hp)?; Ok(Self { phantom: PhantomData, @@ -220,14 +226,11 @@ where *guard.streams_num_mut() = guard.streams_num_mut().wrapping_add(1); let stream_id = self.stream_id; self.stream_id = self.stream_id.wrapping_add(U31::TWO); - let _ = self - .hd - .lock() - .await - .hb_mut() - .streams_frames - .entry(stream_id) - .or_insert_with(default_stream_frames); - Ok(ClientStream::idle(self.hd.clone(), stream_id)) + let _ = guard.hb_mut().streams_frames.entry(stream_id).or_insert_with(default_stream_frames); + Ok(ClientStream::idle( + self.hd.clone(), + _trace_span!("Creating client stream", stream_id = stream_id.u32()), + stream_id, + )) } } diff --git a/wtx/src/http2/client_stream.rs b/wtx/src/http2/client_stream.rs index 373f741d..8f8eea23 100644 --- a/wtx/src/http2/client_stream.rs +++ b/wtx/src/http2/client_stream.rs @@ -1,11 +1,12 @@ use crate::{ http::{RequestRef, ResponseMut}, http2::{ - misc::{send_go_away, send_reset, write_init_headers}, + misc::{send_go_away, send_reset}, + write_stream::write_stream, ErrorCode, GoAwayFrame, HpackStaticRequestHeaders, HpackStaticResponseHeaders, Http2Buffer, Http2Data, ReadFrameRslt, ReqResBuffer, ResetStreamFrame, StreamState, U31, }, - misc::{Lease, LeaseMut, Lock, RefCounter, Stream, Usize}, + misc::{AsyncBounds, Lease, LeaseMut, Lock, RefCounter, Stream, _Span}, }; use core::marker::PhantomData; @@ -14,22 +15,23 @@ use core::marker::PhantomData; pub struct ClientStream { hd: HD, phantom: PhantomData<(HB, S)>, + span: _Span, stream_id: U31, stream_state: StreamState, } impl ClientStream { - pub(crate) fn idle(hd: HD, stream_id: U31) -> Self { - Self { phantom: PhantomData, hd, stream_id, stream_state: StreamState::Idle } + pub(crate) fn idle(hd: HD, span: _Span, stream_id: U31) -> Self { + Self { phantom: PhantomData, hd, span, stream_id, stream_state: StreamState::Idle } } } impl ClientStream where - HB: LeaseMut>, + HB: LeaseMut, HD: RefCounter, HD::Item: Lock>, - S: Stream, + S: AsyncBounds + Stream, { /// Receive Response /// @@ -41,6 +43,8 @@ where &mut self, rrb: &'rrb mut ReqResBuffer, ) -> crate::Result>> { + let _e = self.span._enter(); + _trace!("Receiving response"); rrb.clear(); let status_code = rfr_until_resource_with_guard!(self.hd, |guard| { guard @@ -71,14 +75,13 @@ where #[inline] pub async fn send_req(&mut self, req: RequestRef<'_, '_, '_, D>) -> crate::Result<()> where - D: Lease<[u8]>, + D: Lease<[u8]> + ?Sized, { + let _e = self.span._enter(); + _trace!("Sending request"); let mut guard = self.hd.lock().await; - if req.data.lease().len() > *Usize::from(guard.send_params_mut().max_expanded_headers_len) { - return Err(crate::Error::VeryLargeHeadersLen); - } let (hb, is_conn_open, send_params, stream) = guard.parts_mut(); - write_init_headers::<_, true>( + write_stream::<_, true>( req.data.lease(), req.headers, &mut hb.hpack_enc, @@ -97,26 +100,13 @@ where send_params, stream, self.stream_id, + &mut self.stream_state, ) .await?; self.stream_state = StreamState::HalfClosedLocal; Ok(()) } - /// Groups [Self::send_req] and [Self::recv_res] in a single method. - #[inline] - pub async fn send_req_recv_res<'rrb, D>( - &mut self, - req: RequestRef<'_, '_, '_, D>, - rrb: &'rrb mut ReqResBuffer, - ) -> crate::Result>> - where - D: Lease<[u8]>, - { - self.send_req(req).await?; - self.recv_res(rrb).await - } - /// Sends a RST_STREAM frame to the peer, which cancels this stream. pub async fn send_reset(&mut self, error_code: ErrorCode) -> crate::Result<()> { send_reset( diff --git a/wtx/src/http2/continuation_frame.rs b/wtx/src/http2/continuation_frame.rs index 19ca5471..471fd511 100644 --- a/wtx/src/http2/continuation_frame.rs +++ b/wtx/src/http2/continuation_frame.rs @@ -35,30 +35,33 @@ impl ContinuationFrame { return Err(crate::http2::ErrorCode::FrameSizeError.into()); } let max_header_list_size = *Usize::from(hpack_dec.max_bytes()); - hpack_dec.decode(data, |(elem, name, value)| match elem { - HpackHeaderBasic::Field => match HeaderName::new(name) { - HeaderName::CONNECTION - | HeaderName::KEEP_ALIVE - | HeaderName::PROXY_CONNECTION - | HeaderName::TRANSFER_ENCODING - | HeaderName::UPGRADE => { - is_malformed = true; - } - HeaderName::TE if value != b"trailers" => { - is_malformed = true; - } - _ => { - let len = decoded_header_size(name.len(), value.len()); - *headers_size = headers_size.wrapping_add(len); - let is_over_size = *headers_size >= max_header_list_size; - if !is_over_size { - headers.push_front(name, value, false); + hpack_dec.decode(data, |(elem, name, value)| { + match elem { + HpackHeaderBasic::Field => match HeaderName::new(name) { + HeaderName::CONNECTION + | HeaderName::KEEP_ALIVE + | HeaderName::PROXY_CONNECTION + | HeaderName::TRANSFER_ENCODING + | HeaderName::UPGRADE => { + is_malformed = true; } + HeaderName::TE if value != b"trailers" => { + is_malformed = true; + } + _ => { + let len = decoded_header_size(name.len(), value.len()); + *headers_size = headers_size.wrapping_add(len); + let is_over_size = *headers_size >= max_header_list_size; + if !is_over_size { + headers.push_front(name, value, false)?; + } + } + }, + _ => { + is_malformed = true; } - }, - _ => { - is_malformed = true; } + Ok(()) })?; Ok(Self { flag: fi.flags, stream_id: fi.stream_id }) diff --git a/wtx/src/http2/data_frame.rs b/wtx/src/http2/data_frame.rs index 5a9b7061..48cc2c18 100644 --- a/wtx/src/http2/data_frame.rs +++ b/wtx/src/http2/data_frame.rs @@ -15,10 +15,8 @@ pub(crate) struct DataFrame<'data> { } impl<'data> DataFrame<'data> { - pub(crate) fn eos(data: &'data [u8], data_len: u32, stream_id: U31) -> Self { - let mut this = Self { data, data_len, flag: 0, pad_len: None, stream_id }; - this.set_eos(); - this + pub(crate) fn new(data: &'data [u8], data_len: u32, stream_id: U31) -> Self { + Self { data, data_len, flag: 0, pad_len: None, stream_id } } pub(crate) fn bytes(&self) -> [u8; 9] { diff --git a/wtx/src/http2/headers_frame.rs b/wtx/src/http2/headers_frame.rs index bd3febab..18e0adc3 100644 --- a/wtx/src/http2/headers_frame.rs +++ b/wtx/src/http2/headers_frame.rs @@ -2,29 +2,27 @@ use crate::{ http::{HeaderName, Headers}, http2::{ misc::trim_frame_pad, uri_buffer::MAX_URI_LEN, FrameHeaderTy, FrameInit, HpackDecoder, - HpackEncoder, HpackHeaderBasic, HpackStaticRequestHeaders, HpackStaticResponseHeaders, - Http2Params, UriBuffer, EOH_MASK, EOS_MASK, U31, + HpackHeaderBasic, HpackStaticRequestHeaders, HpackStaticResponseHeaders, Http2Params, + UriBuffer, EOH_MASK, EOS_MASK, U31, }, - misc::{from_utf8_basic, ArrayString, ByteVector, Usize}, + misc::{from_utf8_basic, ArrayString, Usize}, }; #[derive(Debug)] -pub(crate) struct HeadersFrame<'data, 'headers> { +pub(crate) struct HeadersFrame<'data> { flag: u8, - headers: &'headers Headers, hsreqh: HpackStaticRequestHeaders<'data>, hsresh: HpackStaticResponseHeaders, is_over_size: bool, stream_id: U31, } -impl<'data, 'headers> HeadersFrame<'data, 'headers> { +impl<'data> HeadersFrame<'data> { pub(crate) fn new( - headers: &'headers Headers, (hsreqh, hsresh): (HpackStaticRequestHeaders<'data>, HpackStaticResponseHeaders), stream_id: U31, ) -> Self { - Self { flag: 0, headers, hsreqh, hsresh, is_over_size: false, stream_id } + Self { flag: 0, hsreqh, hsresh, is_over_size: false, stream_id } } pub(crate) fn bytes(&self) -> [u8; 9] { @@ -54,7 +52,7 @@ impl<'data, 'headers> HeadersFrame<'data, 'headers> { pub(crate) fn read( mut data: &[u8], fi: FrameInit, - headers: &'headers mut Headers, + headers: &mut Headers, hp: &Http2Params, hpack_dec: &mut HpackDecoder, uri: &mut ArrayString, @@ -76,119 +74,122 @@ impl<'data, 'headers> HeadersFrame<'data, 'headers> { let mut protocol = None; let mut status = None; - hpack_dec.decode(data, |(elem, name, value)| match elem { - HpackHeaderBasic::Authority => { - push_uri( - &mut uri_buffer.authority, - &mut expanded_headers_len, - &mut has_fields, - &mut is_malformed, - &mut is_over_size, - max_expanded_headers_len, - name, - value, - ); - } - HpackHeaderBasic::Field => match HeaderName::new(name) { - HeaderName::CONNECTION - | HeaderName::KEEP_ALIVE - | HeaderName::PROXY_CONNECTION - | HeaderName::TRANSFER_ENCODING - | HeaderName::UPGRADE => { - is_malformed = true; - } - HeaderName::TE if value != b"trailers" => { - is_malformed = true; + hpack_dec.decode(data, |(elem, name, value)| { + match elem { + HpackHeaderBasic::Authority => { + push_uri( + &mut uri_buffer.authority, + &mut expanded_headers_len, + &mut has_fields, + &mut is_malformed, + &mut is_over_size, + max_expanded_headers_len, + name, + value, + ); } - _ => { - has_fields = true; - let len = decoded_header_size(name.len(), value.len()); - expanded_headers_len = expanded_headers_len.wrapping_add(len); - is_over_size = expanded_headers_len >= max_expanded_headers_len; - if !is_over_size { - headers.push_front(name, value, false); + HpackHeaderBasic::Field => match HeaderName::new(name) { + HeaderName::CONNECTION + | HeaderName::KEEP_ALIVE + | HeaderName::PROXY_CONNECTION + | HeaderName::TRANSFER_ENCODING + | HeaderName::UPGRADE => { + is_malformed = true; + } + HeaderName::TE if value != b"trailers" => { + is_malformed = true; + } + _ => { + has_fields = true; + let len = decoded_header_size(name.len(), value.len()); + expanded_headers_len = expanded_headers_len.wrapping_add(len); + is_over_size = expanded_headers_len >= max_expanded_headers_len; + if !is_over_size { + headers.reserve(name.len().wrapping_add(value.len()), 1); + headers.push_front(name, value, false)?; + } + } + }, + HpackHeaderBasic::Method(local_method) => { + if push_enum( + &mut expanded_headers_len, + &mut has_fields, + &mut is_malformed, + &mut is_over_size, + method.is_some(), + max_expanded_headers_len, + name, + value, + ) { + method = Some(local_method); } } - }, - HpackHeaderBasic::Method(local_method) => { - if push_enum( - &mut expanded_headers_len, - &mut has_fields, - &mut is_malformed, - &mut is_over_size, - method.is_some(), - max_expanded_headers_len, - name, - value, - ) { - method = Some(local_method); + HpackHeaderBasic::Path => { + push_uri( + &mut uri_buffer.path, + &mut expanded_headers_len, + &mut has_fields, + &mut is_malformed, + &mut is_over_size, + max_expanded_headers_len, + name, + value, + ); } - } - HpackHeaderBasic::Path => { - push_uri( - &mut uri_buffer.path, - &mut expanded_headers_len, - &mut has_fields, - &mut is_malformed, - &mut is_over_size, - max_expanded_headers_len, - name, - value, - ); - } - HpackHeaderBasic::Protocol(local_protocol) => { - if push_enum( - &mut expanded_headers_len, - &mut has_fields, - &mut is_malformed, - &mut is_over_size, - protocol.is_some(), - max_expanded_headers_len, - name, - value, - ) { - protocol = Some(local_protocol); + HpackHeaderBasic::Protocol(local_protocol) => { + if push_enum( + &mut expanded_headers_len, + &mut has_fields, + &mut is_malformed, + &mut is_over_size, + protocol.is_some(), + max_expanded_headers_len, + name, + value, + ) { + protocol = Some(local_protocol); + } } - } - HpackHeaderBasic::Scheme => { - push_uri( - &mut uri_buffer.scheme, - &mut expanded_headers_len, - &mut has_fields, - &mut is_malformed, - &mut is_over_size, - max_expanded_headers_len, - name, - value, - ); - } - HpackHeaderBasic::StatusCode(local_status) => { - if push_enum( - &mut expanded_headers_len, - &mut has_fields, - &mut is_malformed, - &mut is_over_size, - status.is_some(), - max_expanded_headers_len, - name, - value, - ) { - status = Some(local_status); + HpackHeaderBasic::Scheme => { + push_uri( + &mut uri_buffer.scheme, + &mut expanded_headers_len, + &mut has_fields, + &mut is_malformed, + &mut is_over_size, + max_expanded_headers_len, + name, + value, + ); + } + HpackHeaderBasic::StatusCode(local_status) => { + if push_enum( + &mut expanded_headers_len, + &mut has_fields, + &mut is_malformed, + &mut is_over_size, + status.is_some(), + max_expanded_headers_len, + name, + value, + ) { + status = Some(local_status); + } } } + Ok(()) })?; if DO_NOT_PUSH_URI { uri.clear(); - uri.try_push_str(uri_buffer.scheme.as_str())?; - uri.try_push_str(uri_buffer.authority.as_str())?; - uri.try_push_str(uri_buffer.path.as_str())?; + uri.push_str(uri_buffer.scheme.as_str())?; + uri.push_str(uri_buffer.authority.as_str())?; + uri.push_str(uri_buffer.path.as_str())?; } Ok(( Self { flag: fi.flags, - headers, hsreqh: HpackStaticRequestHeaders { authority: &[], method, @@ -211,31 +212,6 @@ impl<'data, 'headers> HeadersFrame<'data, 'headers> { pub(crate) fn set_eos(&mut self) { self.flag |= EOS_MASK; } - - /// Does not write frame headers, instead, a set of opaque bytes are initially written for - /// posterior overwritten. - pub(crate) fn write( - &self, - hpack_enc: &mut HpackEncoder, - wb: &mut ByteVector, - ) -> crate::Result<()> { - let before_init: usize = wb.len(); - wb.extend_from_slice(&[0; 9]); - let after_init = wb.len(); - if IS_CLIENT { - hpack_enc.encode(wb, self.hsreqh.iter(), self.headers.iter())?; - } else { - hpack_enc.encode(wb, self.hsresh.iter(), self.headers.iter())?; - } - let headers_len = wb.len().wrapping_sub(after_init); - if let Some([a, b, c, ..]) = wb.get_mut(before_init..) { - let [_, d, e, f] = u32::try_from(headers_len).unwrap_or_default().to_be_bytes(); - *a = d; - *b = e; - *c = f; - } - Ok(()) - } } #[inline] @@ -283,7 +259,7 @@ fn push_uri( *expanded_headers_len = expanded_headers_len.wrapping_add(len); *is_over_size = *expanded_headers_len >= max_expanded_headers_len; if !*is_over_size { - let _ = from_utf8_basic(value).ok().and_then(|el| buffer.try_push_str(el).ok()); + let _ = from_utf8_basic(value).ok().and_then(|el| buffer.push_str(el).ok()); } } } diff --git a/wtx/src/http2/hpack_decoder.rs b/wtx/src/http2/hpack_decoder.rs index ff3a7711..a7edf2ce 100644 --- a/wtx/src/http2/hpack_decoder.rs +++ b/wtx/src/http2/hpack_decoder.rs @@ -40,7 +40,7 @@ impl HpackDecoder { pub(crate) fn decode( &mut self, mut data: &[u8], - mut cb: impl FnMut((HpackHeaderBasic, &[u8], &[u8])), + mut cb: impl FnMut((HpackHeaderBasic, &[u8], &[u8])) -> crate::Result<()>, ) -> crate::Result<()> { if let Some(elem) = self.max_bytes.1.take() { self.max_bytes.0 = elem; @@ -115,7 +115,7 @@ impl HpackDecoder { &mut self, data: &mut &[u8], mask: u8, - elem_cb: &mut impl FnMut((HpackHeaderBasic, &[u8], &[u8])), + elem_cb: &mut impl FnMut((HpackHeaderBasic, &[u8], &[u8])) -> crate::Result<()>, ) -> crate::Result<()> { let idx = Self::decode_integer(data, mask)?.1; let has_indexed_name = idx != 0; @@ -132,12 +132,12 @@ impl HpackDecoder { HpackHeaderBasic::StatusCode(_) => HpackHeaderBasic::StatusCode(value.try_into()?), }; let name = if static_name.is_empty() { - elem_cb((new_hhb, dyn_name, value)); + elem_cb((new_hhb, dyn_name, value))?; self.header_buffers.0.clear(); self.header_buffers.0.try_extend_from_slice(dyn_name)?; self.header_buffers.0.get_mut(..dyn_name.len()).unwrap_or_default() } else { - elem_cb((new_hhb, static_name, value)); + elem_cb((new_hhb, static_name, value))?; static_name }; (new_hhb, name, value) @@ -145,12 +145,12 @@ impl HpackDecoder { let (hhn, name) = Self::decode_string_name(&mut self.header_buffers.0, data)?; let value = Self::decode_string_value(&mut self.header_buffers.1, data)?; let hhb = HpackHeaderBasic::try_from((hhn, value))?; - elem_cb((hhb, name, value)); + elem_cb((hhb, name, value))?; (hhb, name, value) }; if STORE { self.dyn_headers.reserve(name.len().wrapping_add(value.len()), 1); - self.dyn_headers.push_front(hhb, name, value, false, |_, _| {}); + self.dyn_headers.push_front(hhb, name, value, false, |_, _| {})?; } Ok(()) } @@ -314,7 +314,7 @@ impl HpackDecoder { &mut self, byte: u8, data: &mut &[u8], - elem_cb: &mut impl FnMut((HpackHeaderBasic, &[u8], &[u8])), + elem_cb: &mut impl FnMut((HpackHeaderBasic, &[u8], &[u8])) -> crate::Result<()>, mut size_update_cb: impl FnMut() -> crate::Result<()>, ) -> crate::Result<()> { match DecodeIdx::try_from(byte)? { @@ -326,7 +326,7 @@ impl HpackDecoder { if name.0.is_empty() { name.1 } else { name.0 }, if value.0.is_empty() { value.1 } else { value.0 }, ) - })?) + })?)?; } DecodeIdx::LiteralNeverIndexed | DecodeIdx::LiteralWithoutIndexing => { self.decode_literal::(data, 0b0000_1111, elem_cb)?; @@ -411,7 +411,7 @@ mod bench { let mut hd = HpackDecoder::new(); hd.set_max_bytes(N); b.iter(|| { - hd.decode(&buffer, |_| {}).unwrap(); + hd.decode(&buffer, |_| Ok(())).unwrap(); hd.clear(); }); } diff --git a/wtx/src/http2/hpack_encoder.rs b/wtx/src/http2/hpack_encoder.rs index ac67edab..44fa0bd4 100644 --- a/wtx/src/http2/hpack_encoder.rs +++ b/wtx/src/http2/hpack_encoder.rs @@ -74,9 +74,10 @@ impl HpackEncoder { .and_then(|el| el.checked_add(user_headers.size_hint().1?)) .unwrap_or(usize::MAX), ); + buffer.reserve(pseudo_headers.size_hint().0.wrapping_add(user_headers.size_hint().0)); self.manage_size_update(buffer)?; for (hhb, value) in pseudo_headers { - let idx = self.encode_idx((&[], value, false), hhb, Self::shi_pseudo((hhb, value))); + let idx = self.encode_idx((&[], value, false), hhb, Self::shi_pseudo((hhb, value)))?; self.manage_encode(buffer, (&[], value), idx)?; } for (name, value, is_sensitive) in user_headers { @@ -84,7 +85,7 @@ impl HpackEncoder { (name, value, is_sensitive), HpackHeaderBasic::Field, Self::shi_user((name, value)), - ); + )?; self.manage_encode(buffer, (name, value), idx)?; } Ok(()) @@ -134,7 +135,11 @@ impl HpackEncoder { } #[inline] - fn dyn_idx(&mut self, header: (&[u8], &[u8], bool), should_not_index: bool) -> EncodeIdx { + fn dyn_idx( + &mut self, + header: (&[u8], &[u8], bool), + should_not_index: bool, + ) -> crate::Result { let (name, value, is_sensitive) = header; let mut name_hasher = self.rs.build_hasher(); @@ -145,7 +150,7 @@ impl HpackEncoder { let pair_hash = pair_hasher.finish(); if let (false, Some(pair_idx)) = (should_not_index, self.indcs.get(&pair_hash).copied()) { - return EncodeIdx::RefNameRefValue(self.from_indcs_to_encode_idx(pair_idx)); + return Ok(EncodeIdx::RefNameRefValue(self.from_indcs_to_encode_idx(pair_idx))); } let name_hash = name_hasher.finish(); @@ -159,26 +164,30 @@ impl HpackEncoder { pair_hash, ); } - (true, None) => return EncodeIdx::UnsavedNameUnsavedValue, + (true, None) => return Ok(EncodeIdx::UnsavedNameUnsavedValue), (true, Some(name_idx)) => { - return EncodeIdx::RefNameUnsavedValue(self.from_indcs_to_encode_idx(name_idx)) + return Ok(EncodeIdx::RefNameUnsavedValue(self.from_indcs_to_encode_idx(name_idx))) } } - self.push_dyn_headers((name, value, is_sensitive), (Some(name_hash), pair_hash)); + self.push_dyn_headers((name, value, is_sensitive), (Some(name_hash), pair_hash))?; let next_dyn_idx = self.next_dyn_idx(); self.indcs.reserve(2); let _ = self.indcs.insert(name_hash, next_dyn_idx); let _ = self.indcs.insert(pair_hash, next_dyn_idx); - EncodeIdx::SavedNameSavedValue + Ok(EncodeIdx::SavedNameSavedValue) } #[inline] - fn dyn_idx_with_static_name(&mut self, header: (&[u8], &[u8], bool), name_idx: u32) -> EncodeIdx { + fn dyn_idx_with_static_name( + &mut self, + header: (&[u8], &[u8], bool), + name_idx: u32, + ) -> crate::Result { let (name, value, is_sensitive) = header; let pair_hash = self.rs.hash_one((name, value)); if let Some(common_idx) = self.indcs.get(&pair_hash).copied() { - return EncodeIdx::RefNameRefValue(self.from_indcs_to_encode_idx(common_idx)); + return Ok(EncodeIdx::RefNameRefValue(self.from_indcs_to_encode_idx(common_idx))); } self.store_header_with_ref_name::((name, value, is_sensitive), name_idx, pair_hash) } @@ -189,18 +198,18 @@ impl HpackEncoder { header: (&[u8], &[u8], bool), hhb: HpackHeaderBasic, static_header: Option, - ) -> EncodeIdx { + ) -> crate::Result { match static_header { None => { let (name, value, is_sensitive) = header; let should_not_index = self.should_not_index((name, value, is_sensitive), hhb); self.dyn_idx((name, value, is_sensitive), should_not_index) } - Some(StaticHeader { has_value: true, idx, name: _ }) => EncodeIdx::RefNameRefValue(idx), + Some(StaticHeader { has_value: true, idx, name: _ }) => Ok(EncodeIdx::RefNameRefValue(idx)), Some(StaticHeader { has_value: false, idx, name }) => { let (_, value, is_sensitive) = header; if self.should_not_index((name, value, is_sensitive), hhb) { - EncodeIdx::RefNameUnsavedValue(idx) + Ok(EncodeIdx::RefNameUnsavedValue(idx)) } else { self.dyn_idx_with_static_name((name, value, is_sensitive), idx) } @@ -214,19 +223,19 @@ impl HpackEncoder { buffer.reserve(4); if n < u32::from(mask) { - buffer.push_within_cap(first_byte | n as u8); + buffer.push(first_byte | n as u8)?; return Ok(1); } n = n.wrapping_sub(mask.into()); - buffer.push_within_cap(first_byte | mask); + buffer.push(first_byte | mask)?; for len in 2..5 { if n <= 127 { - buffer.push_within_cap(n as u8); + buffer.push(n as u8)?; return Ok(len); } - buffer.push_within_cap(0b1000_0000 | n as u8); + buffer.push(0b1000_0000 | n as u8)?; n >>= 7; } @@ -238,12 +247,12 @@ impl HpackEncoder { #[inline] fn encode_str(buffer: &mut ByteVector, bytes: &[u8]) -> crate::Result<()> { let before_byte = buffer.len(); - buffer.push(0); + buffer.push(0)?; if bytes.is_empty() { return Ok(()); } let after_byte = buffer.len(); - huffman_encode(bytes, buffer); + huffman_encode(bytes, buffer)?; let after_huffman = buffer.len(); let len_usize = after_huffman.wrapping_sub(after_byte); let fits_in_1_byte = len_usize < 0b0111_1111; @@ -357,12 +366,12 @@ impl HpackEncoder { Self::encode_str(buffer, value)?; } EncodeIdx::SavedNameSavedValue => { - buffer.push(0b0100_0000); + buffer.push(0b0100_0000)?; Self::encode_str(buffer, name)?; Self::encode_str(buffer, value)?; } EncodeIdx::UnsavedNameUnsavedValue => { - buffer.push(0b0001_0000); + buffer.push(0b0001_0000)?; Self::encode_str(buffer, name)?; Self::encode_str(buffer, value)?; } @@ -405,7 +414,7 @@ impl HpackEncoder { &mut self, (name, value, is_sensitive): (&[u8], &[u8], bool), (name_hash, pair_hash): (Option, u64), - ) { + ) -> crate::Result<()> { self.idx = self.idx.wrapping_add(1); self.dyn_headers.reserve(name.len().wrapping_add(value.len()), 1); self.dyn_headers.push_front( @@ -416,7 +425,8 @@ impl HpackEncoder { |metadata, _| { Self::remove_outdated_indices(&mut self.indcs, metadata); }, - ); + )?; + Ok(()) } #[inline] @@ -560,19 +570,19 @@ impl HpackEncoder { (name, value, is_sensitive): (&[u8], &[u8], bool), name_idx: u32, pair_hash: u64, - ) -> EncodeIdx { + ) -> crate::Result { let before = self.dyn_headers.headers_len(); - self.push_dyn_headers((name, value, is_sensitive), (None, pair_hash)); + self.push_dyn_headers((name, value, is_sensitive), (None, pair_hash))?; let _ = self.indcs.insert(pair_hash, self.next_dyn_idx()); if !HAS_STATIC_NAME { let after = self.dyn_headers.headers_len(); let diff = before.wrapping_sub(after.wrapping_sub(1)); let name_idx_has_been_removed = diff > *Usize::from(name_idx); if name_idx_has_been_removed { - return EncodeIdx::SavedNameSavedValue; + return Ok(EncodeIdx::SavedNameSavedValue); } } - EncodeIdx::RefNameSavedValue(name_idx) + Ok(EncodeIdx::RefNameSavedValue(name_idx)) } } diff --git a/wtx/src/http2/http2_buffer.rs b/wtx/src/http2/http2_buffer.rs index 1f443339..b99bc872 100644 --- a/wtx/src/http2/http2_buffer.rs +++ b/wtx/src/http2/http2_buffer.rs @@ -10,7 +10,7 @@ use hashbrown::HashMap; // // Maximum sizes are dictated by `AcceptParams` or `ConnectParams`. #[derive(Debug)] -pub struct Http2Buffer { +pub struct Http2Buffer { pub(crate) hpack_dec: HpackDecoder, pub(crate) hpack_enc: HpackEncoder, pub(crate) hpack_enc_buffer: ByteVector, @@ -19,7 +19,7 @@ pub struct Http2Buffer { pub(crate) uri_buffer: Box, } -impl Http2Buffer { +impl Http2Buffer { /// Creates a new instance without pre-allocated resources. #[inline] pub fn new(rng: RNG) -> Self @@ -48,16 +48,16 @@ impl Http2Buffer { } } -impl Lease> for Http2Buffer { +impl Lease for Http2Buffer { #[inline] - fn lease(&self) -> &Http2Buffer { + fn lease(&self) -> &Http2Buffer { self } } -impl LeaseMut> for Http2Buffer { +impl LeaseMut for Http2Buffer { #[inline] - fn lease_mut(&mut self) -> &mut Http2Buffer { + fn lease_mut(&mut self) -> &mut Http2Buffer { self } } diff --git a/wtx/src/http2/http2_data.rs b/wtx/src/http2/http2_data.rs index 56ca970e..43c7d2d2 100644 --- a/wtx/src/http2/http2_data.rs +++ b/wtx/src/http2/http2_data.rs @@ -22,7 +22,7 @@ pub struct Http2Data { impl Http2Data where - HB: LeaseMut>, + HB: LeaseMut, S: Stream, { #[inline] @@ -39,7 +39,7 @@ where } #[inline] - pub(crate) fn hb_mut(&mut self) -> &mut Http2Buffer { + pub(crate) fn hb_mut(&mut self) -> &mut Http2Buffer { self.hb.lease_mut() } @@ -59,9 +59,7 @@ where } #[inline] - pub(crate) fn parts_mut( - &mut self, - ) -> (&mut Http2Buffer, &mut bool, &mut SendParams, &mut S) { + pub(crate) fn parts_mut(&mut self) -> (&mut Http2Buffer, &mut bool, &mut SendParams, &mut S) { (self.hb.lease_mut(), &mut self.is_conn_open, &mut self.send_params, &mut self.stream) } @@ -73,7 +71,7 @@ where rrb: &mut ReqResBuffer, stream_id: U31, stream_state: &mut StreamState, - mut headers_cb: impl FnMut(&HeadersFrame<'_, '_>) -> crate::Result, + mut headers_cb: impl FnMut(&HeadersFrame<'_>) -> crate::Result, mut read_frame_until_cb: impl FnMut( &[u8], FrameInit, @@ -192,6 +190,7 @@ where ) .await? ); + let check_opt = body_len.checked_add(fi.data_len).filter(|el| *el <= self.hp.max_body_len()); let Some(local_body_len) = check_opt else { return Err(crate::http2::ErrorCode::ProtocolError.into()); @@ -199,7 +198,8 @@ where body_len = local_body_len; if let FrameHeaderTy::Data = fi.ty { let df = DataFrame::read(data, fi)?; - rrb.data.extend_from_slice(df.data()); + rrb.data.reserve(df.data().len()); + rrb.data.extend_from_slice(df.data())?; if df.is_eos() { *stream_state = StreamState::HalfClosedRemote; return Ok(ReadFrameRslt::Resource(())); @@ -245,7 +245,6 @@ where if ContinuationFrame::read(data, fi, &mut rrb.headers, hpack_size, hpack_dec)?.is_eoh() { return Ok(ReadFrameRslt::Resource(())); } - counter = counter.wrapping_add(1); } Err(crate::Error::VeryLargeAmountOfContinuationFrames) } @@ -257,7 +256,7 @@ where rrb: &mut ReqResBuffer, stream_id: U31, stream_state: &mut StreamState, - cb: fn(&HeadersFrame<'_, '_>) -> crate::Result, + cb: fn(&HeadersFrame<'_>) -> crate::Result, ) -> crate::Result> { let mut rfi = rfr_resource_or_return!( self diff --git a/wtx/src/http2/http2_params.rs b/wtx/src/http2/http2_params.rs index d83296ba..e089c9c9 100644 --- a/wtx/src/http2/http2_params.rs +++ b/wtx/src/http2/http2_params.rs @@ -174,7 +174,7 @@ impl Http2Params { } pub(crate) fn to_settings_frame(&self) -> SettingsFrame { - let mut settings_frame = SettingsFrame::default(); + let mut settings_frame = SettingsFrame::empty(); settings_frame.set_enable_connect_protocol(Some(self.enable_connect_protocol)); settings_frame.set_header_table_size(Some(self.max_cached_headers_len.0)); settings_frame.set_initial_window_size(Some(self.initial_window_len)); diff --git a/wtx/src/http2/huffman.rs b/wtx/src/http2/huffman.rs index 6a79ce01..a5691735 100644 --- a/wtx/src/http2/huffman.rs +++ b/wtx/src/http2/huffman.rs @@ -65,16 +65,22 @@ pub(crate) fn huffman_decode( Ok(()) } -pub(crate) fn huffman_encode(from: &[u8], wb: &mut ByteVector) { +pub(crate) fn huffman_encode(from: &[u8], wb: &mut ByteVector) -> crate::Result<()> { const MASK: u64 = 0b1111_1111; - fn push_within_iter(bits: &mut u64, bits_left: &mut u64, wb: &mut ByteVector) { + #[inline] + fn push_within_iter( + bits: &mut u64, + bits_left: &mut u64, + wb: &mut ByteVector, + ) -> crate::Result<()> { let Ok(n) = u8::try_from((*bits >> 32) & MASK) else { _unreachable(); }; - wb.push_within_cap(n); + wb.push(n)?; *bits <<= 8; *bits_left = bits_left.wrapping_add(8); + Ok(()) } let mut bits: u64 = 0; @@ -90,16 +96,16 @@ pub(crate) fn huffman_encode(from: &[u8], wb: &mut ByteVector) { bits |= code << bits_offset; bits_left = bits_offset; if bits_left <= 32 { - push_within_iter(&mut bits, &mut bits_left, wb); + push_within_iter(&mut bits, &mut bits_left, wb)?; } if bits_left <= 32 { - push_within_iter(&mut bits, &mut bits_left, wb); + push_within_iter(&mut bits, &mut bits_left, wb)?; } if bits_left <= 32 { - push_within_iter(&mut bits, &mut bits_left, wb); + push_within_iter(&mut bits, &mut bits_left, wb)?; } if bits_left <= 32 { - push_within_iter(&mut bits, &mut bits_left, wb); + push_within_iter(&mut bits, &mut bits_left, wb)?; } if bits_left <= 32 { _unreachable() @@ -112,8 +118,9 @@ pub(crate) fn huffman_encode(from: &[u8], wb: &mut ByteVector) { let Ok(n) = u8::try_from((bits >> 32) & MASK) else { _unreachable(); }; - wb.push_within_cap(n); + wb.push(n)?; } + Ok(()) } #[cfg(feature = "_bench")] @@ -131,7 +138,7 @@ mod bench { const N: usize = 1024 * 1024; let data = { let mut dest = Vector::with_capacity(N); - huffman_encode(&_data(N), &mut dest); + huffman_encode(&_data(N), &mut dest).unwrap(); dest }; let mut dest = Box::new(ArrayVector::<_, N>::default()); @@ -163,7 +170,7 @@ mod proptest { #[test_strategy::proptest] fn encode_and_decode(data: Vec) { let mut encoded = Vector::with_capacity(data.len()); - huffman_encode(&data, &mut encoded); + huffman_encode(&data, &mut encoded).unwrap(); let mut decoded = _HeaderValueBuffer::default(); if huffman_decode(&encoded, &mut decoded).is_ok() { assert_eq!(&data, &*decoded); @@ -204,7 +211,7 @@ mod test { huffman_decode(encoded, decode_buffer).unwrap(); assert_eq!(&**decode_buffer, bytes); - huffman_encode(&*bytes, encode_buffer); + huffman_encode(&*bytes, encode_buffer).unwrap(); assert_eq!(&**encode_buffer, encoded); decode_buffer.clear(); diff --git a/wtx/src/http2/misc.rs b/wtx/src/http2/misc.rs index 275e2202..2cd9c861 100644 --- a/wtx/src/http2/misc.rs +++ b/wtx/src/http2/misc.rs @@ -1,24 +1,18 @@ use crate::{ - http::Headers, http2::{ - send_params::SendParams, ContinuationFrame, ErrorCode, FrameHeaderTy, FrameInit, GoAwayFrame, - HeadersFrame, HpackEncoder, HpackStaticRequestHeaders, HpackStaticResponseHeaders, Http2Buffer, - Http2Params, PingFrame, ReadFrameRslt, ResetStreamFrame, SettingsFrame, StreamState, - WindowUpdateFrame, PAD_MASK, U31, + send_params::SendParams, ErrorCode, FrameHeaderTy, FrameInit, GoAwayFrame, HpackEncoder, + Http2Buffer, Http2Params, PingFrame, ReadFrameRslt, ResetStreamFrame, SettingsFrame, + StreamState, WindowUpdateFrame, PAD_MASK, U31, }, misc::{ - BlocksQueue, ByteVector, PartitionedFilledBuffer, PollOnce, Stream, Usize, _read_until, - _unlikely_elem, + BlocksQueue, PartitionedFilledBuffer, PollOnce, Stream, Usize, _read_until, _unlikely_elem, }, }; use core::pin::pin; use hashbrown::HashMap; #[inline] -pub(crate) fn apply_initial_params( - hb: &mut Http2Buffer, - hp: &Http2Params, -) -> crate::Result<()> { +pub(crate) fn apply_initial_params(hb: &mut Http2Buffer, hp: &Http2Params) -> crate::Result<()> { hb.hpack_dec.set_max_bytes(hp.max_cached_headers_len().0); hb.hpack_enc.set_max_dyn_super_bytes(hp.max_cached_headers_len().1); hb.pfb._expand_buffer(*Usize::from(hp.read_buffer_len())); @@ -49,13 +43,14 @@ where return Ok(ReadFrameRslt::IdleConnection); }; let fi = FrameInit::from_array(array?)?; + _trace!("Received frame: {fi:?}"); if fi.data_len > hp.max_frame_len() { return Err(crate::Error::VeryLargePayload); } let frame_len = fi.data_len.wrapping_add(9); let mut is_fulfilled = false; pfb._expand_following(*Usize::from(fi.data_len)); - for _ in 0..fi.data_len { + for _ in 0..=fi.data_len { if read >= *Usize::from(frame_len) { is_fulfilled = true; break; @@ -155,21 +150,15 @@ where let mut pf = PingFrame::read(pfb._current(), fi)?; if !pf.is_ack() { pf.set_ack(); - write_to_stream([&pf.bytes()], *is_conn_open, stream).await?; + write_bytes([&pf.bytes()], *is_conn_open, stream).await?; } continue; } - FrameHeaderTy::Reset => { - reset_cb(hp)?; - let _ = ResetStreamFrame::read(pfb._current(), fi)?; - return Ok(reset_stream(stream_state, streams_num)); - } FrameHeaderTy::Settings => { let sf = SettingsFrame::read(pfb._current(), fi)?; if !sf.is_ack() { send_params.update(hpack_enc, &sf)?; - write_to_stream([SettingsFrame::ack().bytes(&mut [0; 45])], *is_conn_open, stream) - .await?; + write_bytes([SettingsFrame::ack().bytes(&mut [0; 45])], *is_conn_open, stream).await?; } continue; } @@ -180,6 +169,11 @@ where _ => return Err(ErrorCode::ProtocolError.into()), } } + if let FrameHeaderTy::Reset = fi.ty { + reset_cb(hp)?; + let _ = ResetStreamFrame::read(pfb._current(), fi)?; + return Ok(reset_stream(stream_state, streams_num)); + } if loop_cb(fi, hp, pfb._current())? { return Ok(ReadFrameRslt::Resource(fi)); } @@ -205,7 +199,7 @@ pub(crate) fn read_frame_until_cb_known_id( if stream_frames.elements_len() > hp.max_buffered_frames_num().into() { return Err(crate::Error::VeryLargeAmountOfBufferedFrames); } - stream_frames.push_front_within_cap([data], fi); + stream_frames.push_front([data], fi)?; Ok(false) } @@ -222,7 +216,7 @@ pub(crate) fn read_frame_until_cb_unknown_id( if stream_frames.elements_len() > hp.max_buffered_frames_num().into() { return Err(crate::Error::VeryLargeAmountOfBufferedFrames); } - stream_frames.push_front_within_cap([data], fi); + stream_frames.push_front([data], fi)?; Ok(false) } @@ -244,7 +238,7 @@ pub(crate) async fn send_go_away( where S: Stream, { - write_to_stream([go_away_frame.bytes().as_slice()], *is_conn_open, stream).await?; + write_bytes([go_away_frame.bytes().as_slice()], *is_conn_open, stream).await?; *is_conn_open = false; Ok(()) } @@ -259,7 +253,7 @@ where S: Stream, { *stream_state = StreamState::Closed; - write_to_stream([reset_frame.bytes().as_slice()], *is_conn_open, stream).await?; + write_bytes([reset_frame.bytes().as_slice()], *is_conn_open, stream).await?; Ok(()) } @@ -281,101 +275,32 @@ pub(crate) fn trim_frame_pad(data: &mut &[u8], flags: u8) -> crate::Result( - body: &[u8], - headers: &Headers, - hpack_enc: &mut HpackEncoder, - hpack_enc_buffer: &mut ByteVector, - (hsreqh, hsresh): (HpackStaticRequestHeaders<'_>, HpackStaticResponseHeaders), +pub(crate) async fn write_bytes( + frames: [&[u8]; N], is_conn_open: bool, - send_params: &SendParams, stream: &mut S, - stream_id: U31, ) -> crate::Result<()> where S: Stream, { - #[inline] - fn adjust_frame_init(content: &[u8], frame_init: [u8; 9], frame_init_buffer: &mut [u8; 9]) { - let [a, b, c, d, e, f, g, h, i] = frame_init_buffer; - let [_, j, k, l] = u32::try_from(content.len()).unwrap_or_default().to_be_bytes(); - let [_, _, _, m, n, o, p, q, r] = frame_init; - *a = j; - *b = k; - *c = l; - *d = m; - *e = n; - *f = o; - *g = p; - *h = q; - *i = r; - } - - #[inline] - async fn single_header_frame( - frame_init_buffer: &mut [u8; 9], - hf: &mut HeadersFrame<'_, '_>, - hf_content: &[u8], - is_conn_open: bool, - stream: &mut S, - ) -> crate::Result<()> - where - S: Stream, - { - hf.set_eoh(); - adjust_frame_init(hf_content, hf.bytes(), frame_init_buffer); - write_to_stream([frame_init_buffer, hf_content], is_conn_open, stream).await?; - return Ok(()); - } - - if IS_CLIENT { - hpack_enc.encode(hpack_enc_buffer, hsreqh.iter(), headers.iter())?; - } else { - hpack_enc.encode(hpack_enc_buffer, hsresh.iter(), headers.iter())?; - } - - let header_frame_init = &mut [0; 9]; - let hf = &mut HeadersFrame::new(headers, (hsreqh, hsresh), stream_id); - if body.is_empty() { - hf.set_eos(); - } - - if hpack_enc_buffer.is_empty() { - return single_header_frame(header_frame_init, hf, &[], is_conn_open, stream).await; - } - - let mut iter = hpack_enc_buffer.chunks_mut(*Usize::from(send_params.max_frame_len)); - - let Some(first) = iter.next() else { - return Ok(()); - }; - let Some(second) = iter.next() else { - return single_header_frame(header_frame_init, hf, first, is_conn_open, stream).await; - }; - - if iter.len() <= 2 { - adjust_frame_init(first, hf.bytes(), header_frame_init); - let mut cf = ContinuationFrame::new(stream_id); - cf.set_eoh(); - adjust_frame_init(second, cf.bytes(), header_frame_init); - write_to_stream([header_frame_init, first, second], is_conn_open, stream).await?; - } - - Ok(()) -} - -#[inline] -pub(crate) async fn write_to_stream( - bytes: [&[u8]; N], - is_conn_open: bool, - stream: &mut S, -) -> crate::Result<()> -where - S: Stream, -{ - if is_conn_open { + if !is_conn_open { return Ok(()); } - stream.write_all_vectored(bytes).await?; + _trace!("Sending frame(s): {:?}", { + let mut is_prev_init = false; + let mut rslt = [None; N]; + for (elem, frame) in rslt.iter_mut().zip(frames.iter()) { + if let ([a, b, c, d, e, f, g, h, i], false) = (frame, is_prev_init) { + if let Ok(frame_init) = FrameInit::from_array([*a, *b, *c, *d, *e, *f, *g, *h, *i]) { + is_prev_init = true; + *elem = Some(frame_init); + } + } else { + is_prev_init = false; + } + } + rslt + }); + stream.write_all_vectored(frames).await?; Ok(()) } diff --git a/wtx/src/http2/req_res_buffer.rs b/wtx/src/http2/req_res_buffer.rs index 9a2ecb8a..80eaca2c 100644 --- a/wtx/src/http2/req_res_buffer.rs +++ b/wtx/src/http2/req_res_buffer.rs @@ -1,7 +1,7 @@ use crate::{ - http::Headers, - http2::uri_buffer::MAX_URI_LEN, - misc::{ArrayString, ByteVector}, + http::{Headers, Method, RequestRef, Version}, + http2::{uri_buffer::MAX_URI_LEN, CACHED_HEADERS_LEN_LOWER_BOUND}, + misc::{ArrayString, ByteVector, UriRef, Usize}, }; use alloc::boxed::Box; @@ -20,7 +20,8 @@ pub struct ReqResBuffer { impl Default for ReqResBuffer { fn default() -> Self { - Self { data: ByteVector::new(), headers: Headers::new(0), uri: Box::new(ArrayString::new()) } + let n = *Usize::from(CACHED_HEADERS_LEN_LOWER_BOUND); + Self { data: ByteVector::new(), headers: Headers::new(n), uri: Box::new(ArrayString::new()) } } } @@ -30,4 +31,15 @@ impl ReqResBuffer { self.data.clear(); self.headers.clear(); } + + /// Shortcut to create a [RequestRef] with inner data. + pub fn as_http2_request_ref(&self, method: Method) -> RequestRef<'_, '_, '_, [u8]> { + RequestRef { + data: &self.data, + headers: &self.headers, + method, + uri: UriRef::new(self.uri.as_str()), + version: Version::Http2, + } + } } diff --git a/wtx/src/http2/server_stream.rs b/wtx/src/http2/server_stream.rs index 814374d1..f0302241 100644 --- a/wtx/src/http2/server_stream.rs +++ b/wtx/src/http2/server_stream.rs @@ -2,12 +2,12 @@ use crate::{ http::{Method, RequestMut, Response, ResponseData}, http2::{ http2_data::ReadFramesInit, - misc::{send_go_away, send_reset, write_to_stream}, - DataFrame, ErrorCode, GoAwayFrame, HeadersFrame, HpackStaticRequestHeaders, - HpackStaticResponseHeaders, Http2Buffer, Http2Data, ReadFrameRslt, ReqResBuffer, - ResetStreamFrame, StreamState, U31, + misc::{send_go_away, send_reset}, + write_stream::write_stream, + ErrorCode, GoAwayFrame, HpackStaticRequestHeaders, HpackStaticResponseHeaders, Http2Buffer, + Http2Data, ReadFrameRslt, ReqResBuffer, ResetStreamFrame, StreamState, U31, }, - misc::{ByteVector, Lease, LeaseMut, Lock, RefCounter, Stream, Uri}, + misc::{AsyncBounds, ByteVector, Lease, LeaseMut, Lock, RefCounter, Stream, Uri, _Span}, }; use core::marker::PhantomData; use tokio::sync::MutexGuard; @@ -21,19 +21,26 @@ pub struct ServerStream { is_eos: bool, method: Method, phantom: PhantomData<(HB, S)>, + span: _Span, stream_id: U31, stream_state: StreamState, } impl ServerStream { #[inline] - pub(crate) fn new(hd: HD, rfi: ReadFramesInit, stream_state: StreamState) -> Self { + pub(crate) fn new( + hd: HD, + span: _Span, + rfi: ReadFramesInit, + stream_state: StreamState, + ) -> Self { Self { hd, hpack_size: rfi.hpack_size, is_eos: rfi.is_eos, method: rfi.headers_rslt, phantom: PhantomData, + span, stream_id: rfi.stream_id, stream_state, } @@ -42,21 +49,24 @@ impl ServerStream { impl ServerStream where - HB: LeaseMut>, + HB: LeaseMut, HD: RefCounter, for<'guard> HD::Item: Lock< Guard<'guard> = MutexGuard<'guard, Http2Data>, Resource = Http2Data, > + 'guard, - S: Stream, + S: AsyncBounds + Stream, { /// High-level method that reads all remaining data to build a request. + // + // `rrb` won't be cleared because it should have been used earlier when accepting a stream. #[inline] pub async fn recv_req<'rrb>( &mut self, rrb: &'rrb mut ReqResBuffer, ) -> crate::Result>> { - rrb.clear(); + let _e = self.span._enter(); + _trace!("Receiving request"); rfr_until_resource_with_guard!(self.hd, |guard| { guard .read_frames_others( @@ -94,35 +104,26 @@ where D: ResponseData, D::Body: Lease<[u8]>, { - if !self.stream_state.can_server_send() { - return Ok(()); - } + let _e = self.span._enter(); + _trace!("Sending response"); let mut guard = self.hd.lock().await; - let (hb, is_conn_open, _, stream) = guard.parts_mut(); - let mut hf = HeadersFrame::new( + let (hb, is_conn_open, send_params, stream) = guard.parts_mut(); + write_stream::<_, false>( + res.data.body().lease(), res.data.headers(), + &mut hb.hpack_enc, + &mut hb.hpack_enc_buffer, ( HpackStaticRequestHeaders::EMPTY, HpackStaticResponseHeaders { status_code: Some(res.status_code) }, ), + *is_conn_open, + send_params, + stream, self.stream_id, - ); - let body = res.data.body().lease(); - if body.is_empty() { - hf.set_eos(); - hf.write::(&mut hb.hpack_enc, &mut hb.hpack_enc_buffer)?; - write_to_stream([&*hb.hpack_enc_buffer], *is_conn_open, stream).await?; - } else { - hf.write::(&mut hb.hpack_enc, &mut hb.hpack_enc_buffer)?; - let data_len = u32::try_from(body.len())?; - let df = DataFrame::eos(body, data_len, self.stream_id); - write_to_stream( - [&*hb.hpack_enc_buffer, df.bytes().as_slice(), df.data()], - *is_conn_open, - stream, - ) - .await?; - } + &mut self.stream_state, + ) + .await?; self.stream_state = StreamState::Closed; Ok(()) } diff --git a/wtx/src/http2/settings_frame.rs b/wtx/src/http2/settings_frame.rs index af7cb428..32e04f87 100644 --- a/wtx/src/http2/settings_frame.rs +++ b/wtx/src/http2/settings_frame.rs @@ -22,24 +22,11 @@ pub(crate) struct SettingsFrame { } impl SettingsFrame { - pub(crate) const fn default() -> Self { - Self { - enable_connect_protocol: None, - flags: 0, - header_table_size: None, - initial_window_size: None, - len: 0, - max_concurrent_streams: None, - max_frame_size: None, - max_header_list_size: None, - } - } - pub(crate) const fn ack() -> Self { SettingsFrame { flags: ACK_MASK, ..SettingsFrame::empty() } } - const fn empty() -> Self { + pub(crate) const fn empty() -> Self { Self { enable_connect_protocol: None, flags: 0, @@ -222,31 +209,47 @@ impl SettingsFrame { } pub(crate) fn set_enable_connect_protocol(&mut self, elem: Option) { + Self::update_len(&mut self.len, self.enable_connect_protocol, elem); self.enable_connect_protocol = elem; } pub(crate) fn set_header_table_size(&mut self, elem: Option) { + Self::update_len(&mut self.len, self.header_table_size, elem); self.header_table_size = elem; } pub(crate) fn set_initial_window_size(&mut self, elem: Option) { - self.initial_window_size = elem; + Self::update_len(&mut self.len, self.initial_window_size, elem); + self.initial_window_size = elem.map(|val| val.clamp(0, U31::MAX.u32())); } pub(crate) fn set_max_concurrent_streams(&mut self, elem: Option) { + Self::update_len(&mut self.len, self.max_concurrent_streams, elem); self.max_concurrent_streams = elem; } pub(crate) fn set_max_frame_size(&mut self, elem: Option) { - if let Some(elem) = elem { - assert!((FRAME_LEN_LOWER_BOUND..=FRAME_LEN_UPPER_BOUND).contains(&elem)); - } - self.max_frame_size = elem; + Self::update_len(&mut self.len, self.max_frame_size, elem); + self.max_frame_size = elem.map(|val| val.clamp(FRAME_LEN_LOWER_BOUND, FRAME_LEN_UPPER_BOUND)); } pub(crate) fn set_max_header_list_size(&mut self, elem: Option) { + Self::update_len(&mut self.len, self.max_header_list_size, elem); self.max_header_list_size = elem; } + + #[inline] + fn update_len(len: &mut u8, a: Option, b: Option) { + match (a, b) { + (None, Some(_)) => { + *len = len.wrapping_add(6); + } + (Some(_), None) => { + *len = len.wrapping_sub(6); + } + _ => {} + } + } } #[derive(Debug)] diff --git a/wtx/src/http2/stream_state.rs b/wtx/src/http2/stream_state.rs index 8a3994bd..deb96d02 100644 --- a/wtx/src/http2/stream_state.rs +++ b/wtx/src/http2/stream_state.rs @@ -13,9 +13,3 @@ pub(crate) enum StreamState { /// The system is receiving data after [StreamState::Open]. Open, } - -impl StreamState { - pub(crate) fn can_server_send(self) -> bool { - matches!(self, Self::Open | Self::HalfClosedRemote) - } -} diff --git a/wtx/src/http2/tests/connections.rs b/wtx/src/http2/tests/connections.rs index eacd5950..5a9d146b 100644 --- a/wtx/src/http2/tests/connections.rs +++ b/wtx/src/http2/tests/connections.rs @@ -60,14 +60,14 @@ async fn connections() { } trait Test { - async fn client(http2: &mut Http2Tokio, TcpStream, true>); + async fn client(http2: &mut Http2Tokio); - async fn server(http2: &mut Http2Tokio, TcpStream, false>); + async fn server(http2: &mut Http2Tokio); } struct Stub; impl Test for Stub { - async fn client(_: &mut Http2Tokio, TcpStream, true>) {} + async fn client(_: &mut Http2Tokio) {} - async fn server(_: &mut Http2Tokio, TcpStream, false>) {} + async fn server(_: &mut Http2Tokio) {} } diff --git a/wtx/src/http2/tests/hpack.rs b/wtx/src/http2/tests/hpack.rs index c84616f3..bd527d7a 100644 --- a/wtx/src/http2/tests/hpack.rs +++ b/wtx/src/http2/tests/hpack.rs @@ -256,6 +256,7 @@ fn test_story_encoding_and_decoding( } else { assert_eq!((hhb, value), pseudo_headers.remove(0)); } + Ok(()) }) .unwrap(); @@ -287,6 +288,7 @@ fn test_story_wired_decoding( let (name, value) = strs(hhb, name, value); assert_eq!(case_header.name, name); assert_eq!(case_header.value, value); + Ok(()) }) .unwrap(); assert_eq!(0, case.headers.len()); diff --git a/wtx/src/http2/write_stream.rs b/wtx/src/http2/write_stream.rs new file mode 100644 index 00000000..c3eca4e8 --- /dev/null +++ b/wtx/src/http2/write_stream.rs @@ -0,0 +1,370 @@ +macro_rules! write_data_frames { + ( + ($body:expr, $is_conn_open:expr, $send_params:expr, $stream:expr, $stream_id:expr), + || $write_none_rslt:block, + |$write_one:ident| $write_one_rslt:block, + |$write_two:ident| $write_two_rslt:block + ) => {{ + let mut iter = $body.chunks(*Usize::from($send_params.max_frame_len)); + loop { + let should_stop = write_generic_frames!( + iter, + |bytes| DataFrame::new(bytes, u32::try_from(bytes.len()).unwrap_or_default(), $stream_id), + |frame: &mut DataFrame<'_>| frame.set_eos(), + || $write_none_rslt, + |array| { + let $write_one = array; + $write_one_rslt; + }, + |array| { + let $write_two = array; + $write_two_rslt; + } + ); + if should_stop { + break; + } + } + }}; +} + +macro_rules! write_generic_frames { + ( + $iter:expr, + $frame_cb:expr, + $end_cb:expr, + || $write_none_rslt:block, + |$write_one:ident| $write_one_rslt:block, + |$write_two:ident| $write_two_rslt:block + ) => { + if let Some(first) = $iter.next() { + let mut first_frame = $frame_cb(first); + let mut first_init_buffer = [0; 9]; + if let Some(second) = $iter.next() { + adjust_frame_init(first, first_frame.bytes(), &mut first_init_buffer); + let mut second_frame = $frame_cb(second); + let mut second_init_buffer = [0; 9]; + if $iter.len() == 0 { + $end_cb(&mut second_frame); + adjust_frame_init(second, second_frame.bytes(), &mut second_init_buffer); + let $write_two = (first_init_buffer, first, second_init_buffer, second); + $write_two_rslt; + true + } else { + adjust_frame_init(second, second_frame.bytes(), &mut second_init_buffer); + let $write_two = (first_init_buffer, first, second_init_buffer, second); + $write_two_rslt; + false + } + } else { + $end_cb(&mut first_frame); + adjust_frame_init(first, first_frame.bytes(), &mut first_init_buffer); + let $write_one = (first_init_buffer, first); + $write_one_rslt; + true + } + } else { + $write_none_rslt; + true + } + }; +} + +use crate::{ + http::Headers, + http2::{ + misc::write_bytes, send_params::SendParams, ContinuationFrame, DataFrame, HeadersFrame, + HpackEncoder, HpackStaticRequestHeaders, HpackStaticResponseHeaders, StreamState, U31, + }, + misc::{AsyncBounds, ByteVector, Stream, Usize}, +}; + +#[inline] +pub(crate) async fn write_stream( + body: &[u8], + headers: &Headers, + hpack_enc: &mut HpackEncoder, + hpack_enc_buffer: &mut ByteVector, + (hsreqh, hsresh): (HpackStaticRequestHeaders<'_>, HpackStaticResponseHeaders), + is_conn_open: bool, + send_params: &SendParams, + stream: &mut S, + stream_id: U31, + stream_state: &mut StreamState, +) -> crate::Result<()> +where + S: AsyncBounds + Stream, +{ + let can_start_sending = if IS_CLIENT { + matches!(stream_state, StreamState::Idle | StreamState::HalfClosedLocal) + } else { + matches!(stream_state, StreamState::Idle | StreamState::HalfClosedRemote) + }; + if !can_start_sending { + return Ok(()); + } + + if headers.bytes_len() > *Usize::from(send_params.max_expanded_headers_len) { + return Err(crate::Error::VeryLargeHeadersLen); + } + + hpack_enc_buffer.clear(); + if IS_CLIENT { + hpack_enc.encode(hpack_enc_buffer, hsreqh.iter(), headers.iter())?; + } else { + hpack_enc.encode(hpack_enc_buffer, hsresh.iter(), headers.iter())?; + } + + let hf = &mut HeadersFrame::new((hsreqh, hsresh), stream_id); + if body.is_empty() { + hf.set_eos(); + } + + if hpack_enc_buffer.is_empty() { + write_single_header(body, hf, &[], is_conn_open, send_params, stream, stream_id).await?; + *stream_state = StreamState::Open; + return Ok(()); + } + + let mut iter = hpack_enc_buffer.chunks(*Usize::from(send_params.max_frame_len)); + + if let Some(first) = iter.next() { + write_single_header(body, hf, first, is_conn_open, send_params, stream, stream_id).await?; + *stream_state = StreamState::Open; + } else { + return Ok(()); + } + + for _ in 0.._max_continuation_frames!() { + let should_stop = write_generic_frames!( + iter, + |_| ContinuationFrame::new(stream_id), + |frame: &mut ContinuationFrame| frame.set_eoh(), + || {}, + |header_array| { + let (a, b) = header_array; + write_data_frames!( + (body, is_conn_open, send_params, stream, stream_id), + || { + write_bytes([&a, b], is_conn_open, stream).await?; + }, + |data_array| { + let (c, d) = data_array; + write_bytes([&a, b, &c, d], is_conn_open, stream).await?; + }, + |data_array| { + let (c, d, e, f) = data_array; + write_bytes([&a, b, &c, d, &e, f], is_conn_open, stream).await?; + } + ); + }, + |header_array| { + let (a, b, c, d) = header_array; + write_data_frames!( + (body, is_conn_open, send_params, stream, stream_id), + || { + write_bytes([&a, b], is_conn_open, stream).await?; + }, + |data_array| { + let (e, f) = data_array; + write_bytes([&a, b, &c, d, &e, f], is_conn_open, stream).await?; + }, + |data_array| { + let (e, f, g, h) = data_array; + write_bytes([&a, b, &c, d, &e, f, &g, h], is_conn_open, stream).await?; + } + ); + } + ); + if should_stop { + return Ok(()); + } + } + + Ok(()) +} + +#[inline] +fn adjust_frame_init(content: &[u8], frame_init: [u8; 9], frame_init_buffer: &mut [u8; 9]) { + let [a, b, c, d, e, f, g, h, i] = frame_init_buffer; + let [_, j, k, l] = u32::try_from(content.len()).unwrap_or_default().to_be_bytes(); + let [_, _, _, m, n, o, p, q, r] = frame_init; + *a = j; + *b = k; + *c = l; + *d = m; + *e = n; + *f = o; + *g = p; + *h = q; + *i = r; +} + +#[inline] +async fn write_single_header( + body: &[u8], + hf: &mut HeadersFrame<'_>, + hf_content: &[u8], + is_conn_open: bool, + send_params: &SendParams, + stream: &mut S, + stream_id: U31, +) -> crate::Result<()> +where + S: Stream, +{ + let init_buffer = &mut [0; 9]; + hf.set_eoh(); + adjust_frame_init(hf_content, hf.bytes(), init_buffer); + + write_data_frames!( + (body, is_conn_open, send_params, stream, stream_id), + || { + write_bytes([init_buffer, hf_content], is_conn_open, stream).await?; + }, + |data_array| { + let (a, b) = data_array; + write_bytes([init_buffer, hf_content, &a, b], is_conn_open, stream).await?; + }, + |data_array| { + let (a, b, c, d) = data_array; + write_bytes([init_buffer, hf_content, &a, b, &c, d], is_conn_open, stream).await?; + } + ); + return Ok(()); +} + +#[cfg(test)] +mod tests { + use crate::{ + http::{Headers, Method, Request, Response, ResponseData, StatusCode}, + http2::{ErrorCode, Http2Buffer, Http2Params, Http2Tokio, ReadFrameRslt, ReqResBuffer}, + misc::{UriString, Vector, _uri}, + rng::StaticRng, + }; + use core::time::Duration; + use tokio::net::{TcpListener, TcpStream}; + + #[tokio::test] + async fn streams_with_different_frames() { + #[cfg(feature = "_tracing-subscriber")] + let _rslt = crate::misc::tracing_subscriber_init(); + let uri = _uri(); + server(&uri).await; + client(uri).await; + } + + async fn client(uri: UriString) { + let mut client = Http2Tokio::connect( + Http2Buffer::new(StaticRng::default()), + Http2Params::default(), + TcpStream::connect(uri.host()).await.unwrap(), + ) + .await + .unwrap(); + let mut rrb = ReqResBuffer::default(); + rrb.data.reserve(3); + rrb.headers.reserve(6, 1); + + let res = stream_client(&mut client, &mut rrb).await; + _0(res.data.body(), res.data.headers()); + + rrb.clear(); + rrb.headers.push_front(b"123", b"456", false).unwrap(); + let res = stream_client(&mut client, &mut rrb).await; + _1(res.data.body(), res.data.headers()); + + rrb.clear(); + rrb.data.extend_from_slice(b"123").unwrap(); + let res = stream_client(&mut client, &mut rrb).await; + _2(res.data.body(), res.data.headers()); + + rrb.clear(); + rrb.data.extend_from_slice(b"123").unwrap(); + rrb.headers.push_front(b"123", b"456", false).unwrap(); + let res = stream_client(&mut client, &mut rrb).await; + _3(res.data.body(), res.data.headers()); + + tokio::time::sleep(Duration::from_millis(2000)).await; + } + + async fn server(uri: &UriString) { + let listener = TcpListener::bind(uri.host()).await.unwrap(); + let _server_jh = tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + let mut server = + Http2Tokio::accept(Http2Buffer::new(StaticRng::default()), Http2Params::default(), stream) + .await + .unwrap(); + let mut rrb = ReqResBuffer::default(); + + stream_server(&mut rrb, &mut server, |req| { + _0(req.data, req.headers); + }) + .await; + stream_server(&mut rrb, &mut server, |req| { + _1(req.data, req.headers); + }) + .await; + stream_server(&mut rrb, &mut server, |req| { + _2(req.data, req.headers); + }) + .await; + stream_server(&mut rrb, &mut server, |req| { + _3(req.data, req.headers); + }) + .await; + + server.send_go_away(ErrorCode::NoError).await.unwrap(); + }); + } + + async fn stream_server<'rrb>( + rrb: &'rrb mut ReqResBuffer, + server: &mut Http2Tokio, + mut cb: impl FnMut(&Request<&mut Vector, &mut Headers, &str>), + ) { + loop { + let rfr = server.stream(rrb).await.unwrap(); + let mut stream = match rfr { + ReadFrameRslt::ClosedConnection | ReadFrameRslt::ClosedStream => { + panic!(); + } + ReadFrameRslt::IdleConnection => { + continue; + } + ReadFrameRslt::Resource(elem) => elem, + }; + let req = stream.recv_req(rrb).await.unwrap().resource().unwrap(); + cb(&req); + stream.send_res(Response::http2((&req.data, &req.headers), StatusCode::Ok)).await.unwrap(); + break; + } + } + + async fn stream_client<'rrb>( + client: &mut Http2Tokio, + rrb: &'rrb mut ReqResBuffer, + ) -> Response<&'rrb mut ReqResBuffer> { + let mut stream = client.stream().await.unwrap(); + stream.send_req(rrb.as_http2_request_ref(Method::Get)).await.unwrap(); + stream.recv_res(rrb).await.unwrap().resource().unwrap() + } + + #[track_caller] + fn _0(data: &[u8], headers: &Headers) { + assert_eq!((data.len(), headers.bytes_len(), headers.elements_len()), (0, 0, 0)); + } + #[track_caller] + fn _1(data: &[u8], headers: &Headers) { + assert_eq!((data.len(), headers.bytes_len(), headers.elements_len()), (0, 6, 1)); + } + #[track_caller] + fn _2(data: &[u8], headers: &Headers) { + assert_eq!((data.len(), headers.bytes_len(), headers.elements_len()), (3, 0, 0)); + } + #[track_caller] + fn _3(data: &[u8], headers: &Headers) { + assert_eq!((data.len(), headers.bytes_len(), headers.elements_len()), (3, 6, 1)); + } +} diff --git a/wtx/src/lib.rs b/wtx/src/lib.rs index 936e9a63..cc73696f 100644 --- a/wtx/src/lib.rs +++ b/wtx/src/lib.rs @@ -1,7 +1,6 @@ #![doc = include_str!("../README.md")] #![cfg_attr(feature = "_bench", allow(soft_unstable))] #![cfg_attr(feature = "_bench", feature(test))] -#![cfg_attr(feature = "nightly", feature(hint_assert_unchecked))] #![no_std] extern crate alloc; diff --git a/wtx/src/macros.rs b/wtx/src/macros.rs index 012e07ef..5d366e59 100644 --- a/wtx/src/macros.rs +++ b/wtx/src/macros.rs @@ -198,3 +198,21 @@ macro_rules! _max_frames_mismatches { 2_147_483_647 }; } + +macro_rules! _trace { + ($($tt:tt)+) => { + #[cfg(feature = "tracing")] + tracing::trace!($($tt)+); + }; +} + +macro_rules! _trace_span { + ($($tt:tt)+) => { + crate::misc::_Span::_new( + #[cfg(feature = "tracing")] + tracing::trace_span!($($tt)+), + #[cfg(not(feature = "tracing"))] + () + ) + }; +} diff --git a/wtx/src/misc.rs b/wtx/src/misc.rs index ec96b61f..859f80a8 100644 --- a/wtx/src/misc.rs +++ b/wtx/src/misc.rs @@ -25,6 +25,7 @@ mod queue; mod queue_utils; mod ref_counter; mod single_type_storage; +mod span; mod stream; #[cfg(feature = "tokio-rustls")] mod tokio_rustls; @@ -70,6 +71,7 @@ pub(crate) use { blocks_queue::{Block, BlocksQueue}, mem_transfer::_shift_bytes, partitioned_filled_buffer::PartitionedFilledBuffer, + span::{_Entered, _Span}, }; /// Vector of bytes diff --git a/wtx/src/misc/array_string.rs b/wtx/src/misc/array_string.rs index 312e4e52..6c53b0f0 100644 --- a/wtx/src/misc/array_string.rs +++ b/wtx/src/misc/array_string.rs @@ -50,6 +50,23 @@ impl ArrayString { self.len = 0; } + /// Appends an element to the back of the collection. + #[inline] + pub fn push(&mut self, cf: char) -> crate::Result<()> { + self.push_bytes(char_slice(&mut [0; 4], cf)) + } + + /// Iterates over the slice `other`, copies each element, and then appends + /// it to this vector. The `other` slice is traversed in-order. + /// + /// # Panics + /// + /// If there is no available capacity. + #[inline] + pub fn push_str(&mut self, str: &str) -> crate::Result<()> { + self.push_bytes(str.as_bytes()) + } + /// How many elements can be added to this collection. #[inline] pub fn remaining(&self) -> u32 { @@ -66,24 +83,6 @@ impl ArrayString { slice.copy_from_slice(str.as_bytes()); Ok(()) } - - /// Appends an element to the back of the collection. - #[inline] - pub fn try_push(&mut self, cf: char) -> crate::Result<()> { - self.try_push_bytes(char_slice(&mut [0; 4], cf)) - } - - /// Iterates over the slice `other`, copies each element, and then appends - /// it to this vector. The `other` slice is traversed in-order. - /// - /// # Panics - /// - /// If there is no available capacity. - #[inline] - pub fn try_push_str(&mut self, str: &str) -> crate::Result<()> { - self.try_push_bytes(str.as_bytes()) - } - /// Shortens the vector, keeping the first `len` elements. #[inline] pub fn truncate(&mut self, len: u32) { @@ -91,7 +90,7 @@ impl ArrayString { } #[inline] - fn try_push_bytes(&mut self, other: &[u8]) -> crate::Result<()> { + fn push_bytes(&mut self, other: &[u8]) -> crate::Result<()> { let Some(len) = u32::try_from(other.len()).ok().filter(|el| self.remaining() >= *el) else { return Err(crate::Error::CapacityOverflow); }; @@ -215,7 +214,7 @@ impl TryFrom<&str> for ArrayString { #[inline] fn try_from(from: &str) -> Result { let mut this = Self::default(); - this.try_push_str(from)?; + this.push_str(from)?; Ok(this) } } @@ -223,12 +222,12 @@ impl TryFrom<&str> for ArrayString { impl Write for ArrayString { #[inline] fn write_char(&mut self, ch: char) -> fmt::Result { - self.try_push(ch).map_err(|_err| fmt::Error) + self.push(ch).map_err(|_err| fmt::Error) } #[inline] fn write_str(&mut self, str: &str) -> fmt::Result { - self.try_push_str(str).map_err(|_err| fmt::Error) + self.push_str(str).map_err(|_err| fmt::Error) } } diff --git a/wtx/src/misc/blocks_queue.rs b/wtx/src/misc/blocks_queue.rs index 8f8fa16e..8833833d 100644 --- a/wtx/src/misc/blocks_queue.rs +++ b/wtx/src/misc/blocks_queue.rs @@ -38,7 +38,6 @@ macro_rules! do_get { } use crate::misc::{ - _unreachable, queue_utils::{reserve, wrap_sub}, Lease, Queue, SingleTypeStorage, Vector, }; @@ -175,7 +174,11 @@ where } #[inline] - pub(crate) fn push_front_within_cap(&mut self, data: [&[D]; N], misc: M) { + pub(crate) fn push_front( + &mut self, + data: [&[D]; N], + misc: M, + ) -> crate::Result<()> { let mut len: usize = 0; for elem in data { len = len.wrapping_add(elem.len()); @@ -187,9 +190,9 @@ where let head = match (left_free >= len, right_free >= len) { (true, _) => self.head_lhs(len), (false, true) => self.head_rhs(len), - (false, false) => _unreachable(), + (false, false) => return Err(crate::Error::CapacityOverflow), }; - self.metadata.push_front_within_cap(BlocksQueueMetadata { begin: head, len, misc }); + self.metadata.push_front(BlocksQueueMetadata { begin: head, len, misc })?; self.head = head; self.tail = tail; // SAFETY: indices point to valid memory locations @@ -201,6 +204,7 @@ where } self.data.set_len(self.data.len().wrapping_add(len)); } + Ok(()) } #[inline(always)] @@ -359,22 +363,22 @@ mod tests { let mut q = BlocksQueue::with_capacity(4, 8); check_state(&q, 0, 0, 0, 0); - q.push_front_within_cap([&[1]], ()); + q.push_front([&[1]], ()).unwrap(); check_state(&q, 1, 1, 7, 8); - q.push_front_within_cap([&[2, 3]], ()); + q.push_front([&[2, 3]], ()).unwrap(); check_state(&q, 2, 3, 5, 8); - q.push_front_within_cap([&[4, 5], &[6]], ()); + q.push_front([&[4, 5], &[6]], ()).unwrap(); check_state(&q, 3, 6, 2, 8); - q.push_front_within_cap([&[7, 8]], ()); + q.push_front([&[7, 8]], ()).unwrap(); check_state(&q, 4, 8, 0, 8); let _ = q.pop_back(); check_state(&q, 3, 7, 0, 7); - q.push_front_within_cap([&[9]], ()); + q.push_front([&[9]], ()).unwrap(); check_state(&q, 4, 8, 7, 7); let _ = q.pop_back(); @@ -383,10 +387,10 @@ mod tests { let _ = q.pop_back(); check_state(&q, 2, 3, 7, 2); - q.push_front_within_cap([&[10], &[11, 12]], ()); + q.push_front([&[10], &[11, 12]], ()).unwrap(); check_state(&q, 3, 6, 4, 2); - q.push_front_within_cap([&[13, 14]], ()); + q.push_front([&[13, 14]], ()).unwrap(); check_state(&q, 4, 8, 2, 2); let _ = q.pop_back(); @@ -398,10 +402,10 @@ mod tests { let _ = q.pop_back(); check_state(&q, 1, 2, 2, 4); - q.push_front_within_cap([&[15]], ()); + q.push_front([&[15]], ()).unwrap(); check_state(&q, 2, 3, 1, 4); - q.push_front_within_cap([&[16]], ()); + q.push_front([&[16]], ()).unwrap(); check_state(&q, 3, 4, 0, 4); let _ = q.pop_back(); @@ -424,10 +428,10 @@ mod tests { let mut q = BlocksQueue::with_capacity(2, 8); check_state(&q, 0, 0, 0, 0); - q.push_front_within_cap([&[1, 2, 3]], ()); + q.push_front([&[1, 2, 3]], ()).unwrap(); check_state(&q, 1, 3, 5, 8); - q.push_front_within_cap([&[4, 5], &[6, 7, 8]], ()); + q.push_front([&[4, 5], &[6, 7, 8]], ()).unwrap(); check_state(&q, 2, 8, 0, 8); let _ = q.pop_front(); @@ -443,12 +447,12 @@ mod tests { fn push_reserve_and_push() { let mut q = BlocksQueue::new(); q.reserve(1, 4); - q.push_front_within_cap([&[0, 1, 2, 3]], ()); + q.push_front([&[0, 1, 2, 3]], ()).unwrap(); check_state(&q, 1, 4, 0, 4); assert_eq!(q.get(0), Some(BlockRef { data: &[0, 1, 2, 3], misc: &(), range: 0..4 })); assert_eq!(q.get(1), None); q.reserve(1, 6); - q.push_front_within_cap([&[4, 5, 6, 7, 8, 9]], ()); + q.push_front([&[4, 5, 6, 7, 8, 9]], ()).unwrap(); check_state(&q, 2, 10, 0, 10); assert_eq!(q.get(0), Some(BlockRef { data: &[4, 5, 6, 7, 8, 9], misc: &(), range: 0..6 })); assert_eq!(q.get(1), Some(BlockRef { data: &[0, 1, 2, 3], misc: &(), range: 6..10 })); @@ -500,7 +504,7 @@ mod tests { let mut q = BlocksQueue::with_capacity(6, 8); check_state(&q, 0, 0, 0, 0); for _ in 0..6 { - q.push_front_within_cap([&[0]], ()); + q.push_front([&[0]], ()).unwrap(); } check_state(&q, 6, 6, 2, 8); for idx in 0..6 { @@ -513,7 +517,7 @@ mod tests { check_state(&q, 2, 2, 2, 4); assert_eq!(q.get(0).unwrap().data, &[0]); assert_eq!(q.get(1).unwrap().data, &[0]); - q.push_front_within_cap([&[1, 2, 3]], ()); + q.push_front([&[1, 2, 3]], ()).unwrap(); check_state(&q, 3, 5, 5, 4); assert_eq!(q.get(0).unwrap().data, &[1, 2, 3]); assert_eq!(q.get(1).unwrap().data, &[0]); diff --git a/wtx/src/misc/filled_buffer_writer.rs b/wtx/src/misc/filled_buffer_writer.rs index d8f37578..6ca1546f 100644 --- a/wtx/src/misc/filled_buffer_writer.rs +++ b/wtx/src/misc/filled_buffer_writer.rs @@ -118,20 +118,3 @@ impl<'vec> FilledBufferWriter<'vec> { self._curr_idx = until_suffix; } } - -#[cfg(feature = "_bench")] -#[cfg(test)] -mod bench { - #[bench] - fn extend_from_slice(b: &mut test::Bencher) { - let array: [u8; 64] = core::array::from_fn(|idx| { - let n = idx % 255; - n.try_into().unwrap_or(u8::MAX) - }); - let mut vec = alloc::vec![0; 128]; - let mut fbw = crate::misc::FilledBufferWriter::new(32, &mut vec); - b.iter(|| { - fbw._extend_from_slice(&array); - }); - } -} diff --git a/wtx/src/misc/queue.rs b/wtx/src/misc/queue.rs index dd69c024..03c295e2 100644 --- a/wtx/src/misc/queue.rs +++ b/wtx/src/misc/queue.rs @@ -22,7 +22,6 @@ macro_rules! as_slices { } use crate::misc::{ - _unreachable, queue_utils::{reserve, wrap_add, wrap_sub}, Vector, }; @@ -158,9 +157,9 @@ where } #[inline] - pub(crate) fn push_front_within_cap(&mut self, element: D) { + pub(crate) fn push_front(&mut self, element: D) -> crate::Result<()> { if self.is_full() { - _unreachable() + return Err(crate::Error::CapacityOverflow); } let len = self.data.len(); self.head = wrap_sub(self.data.capacity(), self.head, 1); @@ -169,6 +168,7 @@ where ptr::write(self.data.as_mut_ptr().add(self.head), element); self.data.set_len(len.unchecked_add(1)); } + Ok(()) } #[inline(always)] @@ -210,7 +210,7 @@ mod _proptest { let mut vec_deque = VecDeque::with_capacity(bytes.len()); for byte in bytes.iter().copied() { - queue.push_front_within_cap(byte); + queue.push_front(byte).unwrap(); vec_deque.push_front(byte); } assert_eq!((queue.capacity(), queue.len()), (vec_deque.capacity(), vec_deque.len())); @@ -255,7 +255,7 @@ mod tests { fn clear() { let mut queue = Queue::with_capacity(1); assert_eq!(queue.len(), 0); - queue.push_front_within_cap(1); + queue.push_front(1).unwrap(); assert_eq!(queue.len(), 1); queue.clear(); assert_eq!(queue.len(), 0); @@ -266,7 +266,7 @@ mod tests { let mut queue = Queue::with_capacity(1); assert_eq!(queue.get(0), None); assert_eq!(queue.get_mut(0), None); - queue.push_front_within_cap(1); + queue.push_front(1).unwrap(); assert_eq!(queue.get(0), Some(&1i32)); assert_eq!(queue.get_mut(0), Some(&mut 1i32)); } @@ -275,7 +275,7 @@ mod tests { fn pop_back() { let mut queue = Queue::with_capacity(1); assert_eq!(queue.pop_back(), None); - queue.push_front_within_cap(1); + queue.push_front(1).unwrap(); assert_eq!(queue.pop_back(), Some(1)); assert_eq!(queue.pop_back(), None); } @@ -284,16 +284,16 @@ mod tests { fn pop_front() { let mut queue = Queue::with_capacity(1); assert_eq!(queue.pop_front(), None); - queue.push_front_within_cap(1); + queue.push_front(1).unwrap(); assert_eq!(queue.pop_front(), Some(1)); assert_eq!(queue.pop_front(), None); } #[test] - fn push_front_within_cap() { + fn push_front() { let mut queue = Queue::with_capacity(1); assert_eq!(queue.len(), 0); - queue.push_front_within_cap(1); + queue.push_front(1).unwrap(); assert_eq!(queue.len(), 1); } diff --git a/wtx/src/misc/queue_utils.rs b/wtx/src/misc/queue_utils.rs index 210b11cd..e8c7922c 100644 --- a/wtx/src/misc/queue_utils.rs +++ b/wtx/src/misc/queue_utils.rs @@ -1,5 +1,5 @@ use crate::misc::{Vector, _shift_bytes}; -use core::{iter, ptr}; +use core::{hint::unreachable_unchecked, iter, ptr}; #[inline(always)] pub(crate) fn reserve(additional: usize, data: &mut Vector, head: &mut usize) -> Option @@ -19,10 +19,8 @@ where // SAFETY: Slice is allocated but not initialized let allocated = unsafe { let rslt = &mut *ptr::slice_from_raw_parts_mut(data.as_mut_ptr(), curr_cap); - #[cfg(feature = "nightly")] - { - core::hint::assert_unchecked(curr_head <= rslt.len()); - core::hint::assert_unchecked(prev_head <= prev_cap); + if curr_head > rslt.len() || prev_head > prev_cap { + unreachable_unchecked(); } rslt }; diff --git a/wtx/src/misc/span.rs b/wtx/src/misc/span.rs new file mode 100644 index 00000000..69d897d8 --- /dev/null +++ b/wtx/src/misc/span.rs @@ -0,0 +1,33 @@ +#[derive(Debug)] +pub(crate) struct _Entered<'span> { + #[cfg(feature = "tracing")] + _elem: tracing::span::Entered<'span>, + #[cfg(not(feature = "tracing"))] + _elem: core::marker::PhantomData<&'span ()>, +} + +#[derive(Debug)] +pub(crate) struct _Span { + #[cfg(feature = "tracing")] + _elem: tracing::span::Span, + #[cfg(not(feature = "tracing"))] + _elem: (), +} + +impl _Span { + pub(crate) fn _new( + #[cfg(feature = "tracing")] _elem: tracing::span::Span, + #[cfg(not(feature = "tracing"))] _elem: (), + ) -> Self { + Self { _elem } + } + + pub(crate) fn _enter(&self) -> _Entered<'_> { + _Entered { + #[cfg(feature = "tracing")] + _elem: self._elem.enter(), + #[cfg(not(feature = "tracing"))] + _elem: core::marker::PhantomData, + } + } +} diff --git a/wtx/src/misc/stream.rs b/wtx/src/misc/stream.rs index a8b67b2d..eb24b97d 100644 --- a/wtx/src/misc/stream.rs +++ b/wtx/src/misc/stream.rs @@ -13,12 +13,12 @@ macro_rules! _local_write_all { macro_rules! _local_write_all_vectored { ($bytes:expr, |$io_slices:ident| $write:expr) => {{ let mut buffer = [std::io::IoSlice::new(&[]); N]; - let $io_slices = crate::misc::stream::convert_to_io_slices(&mut buffer, $bytes); + let mut $io_slices = crate::misc::stream::convert_to_io_slices(&mut buffer, $bytes); while !$io_slices.is_empty() { match $write { Err(e) => return Err(e.into()), Ok(0) => return Err(crate::Error::UnexpectedEOF), - Ok(n) => super::advance_slices(&mut &$bytes[..], &mut &mut *$io_slices, n), + Ok(n) => super::advance_slices(&mut &$bytes[..], &mut $io_slices, n), } } }}; @@ -607,29 +607,32 @@ mod tokio_rustls { )] #[cfg(feature = "std")] #[inline] -fn advance_slices<'bytes, 'buffer>( +fn advance_slices<'bytes>( bytes: &mut &[&'bytes [u8]], - io_slices: &'buffer mut &'buffer mut [std::io::IoSlice<'bytes>], - mut written: usize, + io_slices: &mut &mut [std::io::IoSlice<'bytes>], + written: usize, ) { - let mut idx = 0; - for (local_idx, io_slice) in io_slices.iter().enumerate() { - let Some(diff) = written.checked_sub(io_slice.len()) else { + let mut first_slice_idx = written; + let mut slices_idx: usize = 0; + for io_slice in io_slices.iter() { + let Some(diff) = first_slice_idx.checked_sub(io_slice.len()) else { break; }; - written = diff; - idx = local_idx; + first_slice_idx = diff; + slices_idx = slices_idx.wrapping_add(1); } - let locals_opt = bytes.get(idx..).and_then(|el| Some((el, io_slices.get_mut(idx..)?))); - let Some((local_bytes, local_io_slices)) = locals_opt else { + let Some((local_bytes @ [first_bytes, ..], local_io_slices)) = bytes + .get(slices_idx..) + .and_then(|el| Some((el, core::mem::take(io_slices).get_mut(slices_idx..)?))) + else { return; }; *bytes = local_bytes; *io_slices = local_io_slices; - let ([first_bytes, ..], [first_io_slices, ..]) = (bytes, io_slices) else { + let [first_io_slices, ..] = io_slices else { return; }; - *first_io_slices = std::io::IoSlice::new(first_bytes.get(written..).unwrap_or_default()); + *first_io_slices = std::io::IoSlice::new(first_bytes.get(first_slice_idx..).unwrap_or_default()); } #[cfg(feature = "std")] diff --git a/wtx/src/misc/vector.rs b/wtx/src/misc/vector.rs index 9e58f7ea..c9894a02 100644 --- a/wtx/src/misc/vector.rs +++ b/wtx/src/misc/vector.rs @@ -1,7 +1,8 @@ -use crate::misc::{Lease, _unreachable}; +use crate::misc::Lease; use alloc::vec::Vec; use core::{ fmt::{Debug, Formatter}, + hint::unreachable_unchecked, ops::{Deref, DerefMut}, ptr, }; @@ -26,9 +27,11 @@ where #[inline] pub fn with_capacity(cap: usize) -> Self { let data = Vec::with_capacity(cap); - #[cfg(feature = "nightly")] + // SAFETY: There is enough capacity unsafe { - core::hint::assert_unchecked(data.len().unchecked_add(cap) <= data.capacity()); + if data.len().unchecked_add(cap) > data.capacity() { + unreachable_unchecked(); + } } Self { data } } @@ -61,23 +64,29 @@ where /// Iterates over the slice `other`, copies each element, and then appends /// it to this vector. The `other` slice is traversed in-order. - /// - /// # Panics - /// - /// If memory reservation fails. #[inline] - pub fn extend_from_slice(&mut self, other: &[D]) { - self.reserve(other.len()); - self.extend_from_slice_within_cap(other); + pub fn extend_from_slice(&mut self, other: &[D]) -> crate::Result<()> { + let len = self.len(); + let other_len = other.len(); + // SAFETY: There is enough capacity + unsafe { + let new_len = len.unchecked_add(other_len); + if new_len > self.data.capacity() { + return Err(crate::Error::CapacityOverflow); + } + ptr::copy_nonoverlapping(other.as_ptr(), self.data.as_mut_ptr().add(len), other_len); + self.set_len(new_len); + } + Ok(()) } - /// Generalization of [Self::extend_from_slice_within_cap]. - /// - /// # Panics - /// - /// If memory reservation fails. + /// Generalization of [Self::extend_from_slice]. + #[allow( + // False-positive + clippy::missing_panics_doc + )] #[inline] - pub fn extend_from_slices_within_cap(&mut self, others: &[U; N]) + pub fn extend_from_slices(&mut self, others: &[U; N]) -> crate::Result<()> where U: Lease<[D]>, { @@ -95,44 +104,9 @@ where } self.reserve(len); for other in others { - self.extend_from_slice_within_cap(other.lease()); - } - } - - /// Iterates over the slice `other`, copies each element, and then appends - /// it to this vector. The `other` slice is traversed in-order. - /// - /// # Panics - /// - /// If there is no available capacity. - #[allow( - // Programming error that should be handled by the caller - clippy::panic - )] - #[inline] - pub fn extend_from_slice_within_cap(&mut self, other: &[D]) { - let len = self.len(); - let other_len = other.len(); - // SAFETY: There is enough capacity - unsafe { - let new_len = len.unchecked_add(other_len); - if new_len > self.data.capacity() { - panic!("Must be called with sufficient capacity"); - } - ptr::copy_nonoverlapping(other.as_ptr(), self.data.as_mut_ptr().add(len), other_len); - self.set_len(new_len); + self.extend_from_slice(other.lease())?; } - } - - /// Appends an element to the back of the collection. - /// - /// # Panics - /// - /// If memory reservation fails. - #[inline] - pub fn push(&mut self, value: D) { - self.reserve(1); - self.push_within_cap(value); + Ok(()) } /// Appends an element to the back of the collection. @@ -141,16 +115,17 @@ where /// /// If there is no available capacity. #[inline] - pub fn push_within_cap(&mut self, value: D) { + pub fn push(&mut self, value: D) -> crate::Result<()> { let len = self.data.len(); if len >= self.data.capacity() { - _unreachable(); + return Err(crate::Error::CapacityOverflow); } // SAFETY: There is enough capacity unsafe { ptr::write(self.data.as_mut_ptr().add(len), value); self.set_len(len.unchecked_add(1)); } + Ok(()) } /// Reserves capacity for at least `additional` more elements to be inserted @@ -165,11 +140,11 @@ where #[inline] pub fn reserve(&mut self, additional: usize) { self.data.reserve(additional); - #[cfg(feature = "nightly")] + // SAFETY: There is enough capacity unsafe { - core::hint::assert_unchecked( - self.data.len().unchecked_add(additional) <= self.data.capacity(), - ); + if self.data.len().unchecked_add(additional) > self.data.capacity() { + unreachable_unchecked(); + } } } @@ -239,89 +214,99 @@ mod bench { use crate::misc::vector::Vector; use alloc::vec::Vec; - macro_rules! extend_from_slice { - ( - $instance:expr, - $extend_from_slice_method:ident, - $reserve_method:ident - ) => { - $instance.$reserve_method(16 * 8); - extend_from_slice!( - @$instance, - $extend_from_slice_method, - $reserve_method, - 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 - ); - }; - ( - @$instance:expr, - $extend_from_slice_method:ident, - $reserve_method:ident, - $($n:literal),* - ) => { - $( - let _ = $n; - $instance.$extend_from_slice_method(&[0, 1, 2, 4, 5, 6, 7]); - )* + #[rustfmt::skip] + macro_rules! extend_from_slice_batch { + ($instance_cb:expr) => { + $instance_cb(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]); + $instance_cb(&[16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]); + $instance_cb(&[32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47]); + $instance_cb(&[48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63]); + $instance_cb(&[64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79]); + $instance_cb(&[80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95]); + $instance_cb(&[96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111]); + $instance_cb(&[112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127]); + $instance_cb(&[128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143]); + $instance_cb(&[144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159]); + $instance_cb(&[160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175]); + $instance_cb(&[176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191]); + $instance_cb(&[192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207]); + $instance_cb(&[208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223]); + $instance_cb(&[224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239]); + $instance_cb(&[240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255]); }; } - macro_rules! push { - ( - $instance:expr, - $push_method:ident, - $reserve_method:ident - ) => { - $instance.$reserve_method(64); - push!( - @$instance, - $push_method, - $reserve_method, - 01, 02, 03, 04, 05, 06, 07, 08, 09, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, - 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, - 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, - 61, 62, 63, 64 - ) - }; - ( - @$instance:expr, - $push_method:ident, - $reserve_method:ident, - $($n:literal),* - ) => { - $($instance.$push_method($n);)* + #[rustfmt::skip] + macro_rules! push_batch { + ($instance_cb:expr) => { + $instance_cb(0); $instance_cb(1); $instance_cb(2); $instance_cb(3); $instance_cb(4); $instance_cb(5); $instance_cb(6); $instance_cb(7); + $instance_cb(8); $instance_cb(9); $instance_cb(10); $instance_cb(11); $instance_cb(12); $instance_cb(13); $instance_cb(14); $instance_cb(15); + $instance_cb(16); $instance_cb(17); $instance_cb(18); $instance_cb(19); $instance_cb(20); $instance_cb(21); $instance_cb(22); $instance_cb(23); + $instance_cb(24); $instance_cb(25); $instance_cb(26); $instance_cb(27); $instance_cb(28); $instance_cb(29); $instance_cb(30); $instance_cb(31); + $instance_cb(32); $instance_cb(33); $instance_cb(34); $instance_cb(35); $instance_cb(36); $instance_cb(37); $instance_cb(38); $instance_cb(39); + $instance_cb(40); $instance_cb(41); $instance_cb(42); $instance_cb(43); $instance_cb(44); $instance_cb(45); $instance_cb(46); $instance_cb(47); + $instance_cb(48); $instance_cb(49); $instance_cb(50); $instance_cb(51); $instance_cb(52); $instance_cb(53); $instance_cb(54); $instance_cb(55); + $instance_cb(56); $instance_cb(57); $instance_cb(58); $instance_cb(59); $instance_cb(60); $instance_cb(61); $instance_cb(62); $instance_cb(63); }; } #[bench] - fn extend_from_slice(b: &mut test::Bencher) { - let mut vec = Vec::default(); + fn extend_from_slice_local(b: &mut test::Bencher) { + let mut vec = Vector::default(); b.iter(|| { - extend_from_slice!(vec, extend_from_slice, reserve); + vec.reserve(256 * 4); + extend_from_slice_batch!(|elem| { + vec.extend_from_slice(elem).unwrap(); + vec.extend_from_slice(elem).unwrap(); + vec.extend_from_slice(elem).unwrap(); + vec.extend_from_slice(elem).unwrap(); + }); }); } #[bench] - fn extend_from_slice_within_cap(b: &mut test::Bencher) { - let mut vec = Vector::default(); + fn extend_from_slice_std(b: &mut test::Bencher) { + let mut vec = Vec::default(); b.iter(|| { - extend_from_slice!(vec, extend_from_slice_within_cap, reserve); + vec.reserve(256 * 4); + extend_from_slice_batch!(|elem| { + vec.extend_from_slice(elem); + vec.extend_from_slice(elem); + vec.extend_from_slice(elem); + vec.extend_from_slice(elem); + }); }); } #[bench] - fn push(b: &mut test::Bencher) { - let mut vec = Vec::default(); + fn push_local(b: &mut test::Bencher) { + let mut vec = Vector::default(); b.iter(|| { - push!(vec, push, reserve); + vec.reserve(64 * 8); + push_batch!(|elem| vec.push(elem).unwrap()); + push_batch!(|elem| vec.push(elem).unwrap()); + push_batch!(|elem| vec.push(elem).unwrap()); + push_batch!(|elem| vec.push(elem).unwrap()); + push_batch!(|elem| vec.push(elem).unwrap()); + push_batch!(|elem| vec.push(elem).unwrap()); + push_batch!(|elem| vec.push(elem).unwrap()); + push_batch!(|elem| vec.push(elem).unwrap()); }); } #[bench] - fn push_within_cap(b: &mut test::Bencher) { - let mut vec = Vector::default(); + fn push_std(b: &mut test::Bencher) { + let mut vec = Vec::default(); b.iter(|| { - push!(vec, push_within_cap, reserve); + vec.reserve(64 * 8); + push_batch!(|elem| vec.push(elem)); + push_batch!(|elem| vec.push(elem)); + push_batch!(|elem| vec.push(elem)); + push_batch!(|elem| vec.push(elem)); + push_batch!(|elem| vec.push(elem)); + push_batch!(|elem| vec.push(elem)); + push_batch!(|elem| vec.push(elem)); + push_batch!(|elem| vec.push(elem)); }); } } diff --git a/wtx/src/pool/fixed_pool.rs b/wtx/src/pool/fixed_pool.rs index 66e8a922..dc90c80b 100644 --- a/wtx/src/pool/fixed_pool.rs +++ b/wtx/src/pool/fixed_pool.rs @@ -49,7 +49,7 @@ where indcs: { let mut rslt = Queue::with_capacity(len); for idx in 0..len { - rslt.push_front_within_cap(idx); + let _rslt = rslt.push_front(idx); } ::new(IC::Item::new(rslt)) }, @@ -139,7 +139,7 @@ where /// Releases the inner lock. #[inline] pub async fn release(self) -> LG { - self.indcs.lock().await.push_front_within_cap(self.idx); + let _rslt = self.indcs.lock().await.push_front(self.idx); self.lock_guard } } diff --git a/wtx/src/pool/resource_manager.rs b/wtx/src/pool/resource_manager.rs index 06e7f7b4..76adfea5 100644 --- a/wtx/src/pool/resource_manager.rs +++ b/wtx/src/pool/resource_manager.rs @@ -159,21 +159,21 @@ pub(crate) mod http2 { }; /// Manages HTTP/2 resources for clients. - pub type Http2ClientBufferRM = SimpleRM>; + pub type Http2ClientBufferRM = SimpleRM; /// Manages HTTP/2 resources for servers. - pub type Http2ServerBufferRM = SimpleRM>; + pub type Http2ServerBufferRM = SimpleRM; /// Manages resources for HTTP2 requests and responses. pub type ReqResBufferRM = SimpleRM; - type Http2RM = SimpleRM>; + type Http2RM = SimpleRM; - impl Http2RM + impl Http2RM where RNG: Clone + Rng, { /// Instance of [Http2ClientRM] or [Http2ServerRM]. pub fn http2_buffer(rng: RNG) -> Self { - fn cb(rng: &RNG) -> crate::Result> + fn cb(rng: &RNG) -> crate::Result where RNG: Clone + Rng, { diff --git a/wtx/src/web_socket.rs b/wtx/src/web_socket.rs index 5b66171a..ac9b59bc 100644 --- a/wtx/src/web_socket.rs +++ b/wtx/src/web_socket.rs @@ -663,7 +663,7 @@ where ) -> crate::Result<()> { let mut is_payload_filled = false; pb._expand_following(rfi.frame_len); - for _ in 0..rfi.frame_len { + for _ in 0..=rfi.frame_len { if *read >= rfi.frame_len { is_payload_filled = true; break;