Skip to content

Commit

Permalink
[HTTP/2] Tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
c410-f3r committed May 6, 2024
1 parent f080431 commit c42d4ef
Show file tree
Hide file tree
Showing 48 changed files with 1,016 additions and 724 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/bencher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@ on:

jobs:
benchmark_with_bencher:
continue-on-error: true
name: Continuous Benchmarking with Bencher
runs-on: ubuntu-latest
steps:
- if: always()
- uses: actions/checkout@v4
- uses: bencherdev/bencher@main
- name: Track Benchmarks with Bencher
run: |
bencher run \
--branch main \
--err \
--project wtx \
--testbed ubuntu-latest \
--token "${{ secrets.BENCHER_API_TOKEN }}" \
"cargo bench --all-features"
1 change: 0 additions & 1 deletion .scripts/internal-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ $rt test-with-features wtx httparse
$rt test-with-features wtx md-5
$rt test-with-features wtx memchr
$rt test-with-features wtx miniserde
$rt test-with-features wtx nightly
$rt test-with-features wtx orm
$rt test-with-features wtx parking_lot
$rt test-with-features wtx pool
Expand Down
2 changes: 0 additions & 2 deletions .scripts/podman-stop.sh

This file was deleted.

6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ A collection of different transport implementations and related tools focused pr
2. [Database Client](https://c410-f3r.github.io/wtx/database/client-connection.html)
3. [Database Object–Relational Mapping](https://c410-f3r.github.io/wtx/database/object%E2%80%93relational-mapping.html)
4. [Database Schema Manager](https://c410-f3r.github.io/wtx/database/schema-management.html)
6. [HTTP2 Client/Server](https://c410-f3r.github.io/wtx/http2/index.html)
7. [Pool Manager](https://c410-f3r.github.io/wtx/pool_manager/index.html)
8. [WebSocket Client/Server](https://c410-f3r.github.io/wtx/web-socket/index.html)
5. [HTTP2 Client/Server](https://c410-f3r.github.io/wtx/http2/index.html)
6. [Pool Manager](https://c410-f3r.github.io/wtx/pool_manager/index.html)
7. [WebSocket Client/Server](https://c410-f3r.github.io/wtx/web-socket/index.html)

Embedded devices with a working heap allocator can use this `no_std` crate.

Expand Down
26 changes: 13 additions & 13 deletions wtx-docs/src/http2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,31 @@ Provides low and high level abstractions to interact with clients and servers.
use std::net::ToSocketAddrs;
use tokio::net::TcpStream;
use wtx::{
http::{Headers, Method, Request},
http2::{Http2Buffer, Http2Params, Http2Tokio, ReqResBuffer},
misc::{from_utf8_basic, UriString},
http::Method,
http2::{ErrorCode, Http2Buffer, Http2Params, Http2Tokio, ReqResBuffer},
misc::{from_utf8_basic, UriRef},
rng::StaticRng,
};
#[tokio::main]
async fn main() {
let uri = UriString::new("127.0.0.1:9000");
let mut rrb = ReqResBuffer::default();
rrb.data.reserve(6);
rrb.data.extend_from_slice(b"Hello!").unwrap();
rrb.uri.push_str("127.0.0.1:9000").unwrap();
let uri = UriRef::new(&rrb.uri);
let mut http2 = Http2Tokio::connect(
Http2Buffer::new(StaticRng::default()),
Http2Params::default(),
TcpStream::connect(uri.host().to_socket_addrs().unwrap().next().unwrap()).await.unwrap(),
)
.await
.unwrap();
let mut rrb = ReqResBuffer::default();
let mut stream = http2.stream().await.unwrap();
let res = stream
.send_req_recv_res(
Request::http2(&"Hello!", &Headers::new(0), Method::Get, uri.to_ref()),
&mut rrb,
)
.await
.unwrap();
println!("{}", from_utf8_basic(res.resource().unwrap().body()).unwrap())
stream.send_req(rrb.as_http2_request_ref(Method::Get)).await.unwrap();
let res = stream.recv_res(&mut rrb).await.unwrap();
println!("{}", from_utf8_basic(res.resource().unwrap().body()).unwrap());
http2.send_go_away(ErrorCode::NoError).await.unwrap();
}
```
2 changes: 1 addition & 1 deletion wtx-ui/src/embed_migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub(crate) async fn embed_migrations(input: &str, output: &str) -> wtx::Result<(
buffer.write_fmt(format_args!("];({mg_name},{mg_name}_MIGRATIONS)}},"))?;
}

buffer.push_str("];");
buffer.push_str("];\n");

OpenOptions::new()
.create(true)
Expand Down
1 change: 0 additions & 1 deletion wtx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ glommio = ["futures-lite", "dep:glommio", "std"]
http1 = ["httparse"]
http2 = ["ahash", "hashbrown", "tokio"]
miniserde = ["dep:miniserde", "std"]
nightly = []
optimization = ["atoi", "memchr", "simdutf8"]
orm = ["database", "dep:smallvec"]
pool = []
Expand Down
25 changes: 12 additions & 13 deletions wtx/examples/http2-client-tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,29 @@ mod common;
use std::net::ToSocketAddrs;
use tokio::net::TcpStream;
use wtx::{
http::{Headers, Method, Request},
http2::{Http2Buffer, Http2Params, Http2Tokio, ReqResBuffer},
misc::{from_utf8_basic, UriString},
http::Method,
http2::{ErrorCode, Http2Buffer, Http2Params, Http2Tokio, ReqResBuffer},
misc::{from_utf8_basic, UriRef},
rng::StaticRng,
};

#[tokio::main]
async fn main() {
let uri = UriString::new(common::_uri_from_args());
let mut rrb = ReqResBuffer::default();
rrb.data.reserve(6);
rrb.data.extend_from_slice(b"Hello!").unwrap();
rrb.uri.push_str(&common::_uri_from_args()).unwrap();
let uri = UriRef::new(&rrb.uri);
let mut http2 = Http2Tokio::connect(
Http2Buffer::new(StaticRng::default()),
Http2Params::default(),
TcpStream::connect(uri.host().to_socket_addrs().unwrap().next().unwrap()).await.unwrap(),
)
.await
.unwrap();
let mut rrb = ReqResBuffer::default();
let mut stream = http2.stream().await.unwrap();
let res = stream
.send_req_recv_res(
Request::http2(&"Hello!", &Headers::new(0), Method::Get, uri.to_ref()),
&mut rrb,
)
.await
.unwrap();
println!("{}", from_utf8_basic(res.resource().unwrap().body()).unwrap())
stream.send_req(rrb.as_http2_request_ref(Method::Get)).await.unwrap();
let res = stream.recv_res(&mut rrb).await.unwrap();
println!("{}", from_utf8_basic(res.resource().unwrap().body()).unwrap());
http2.send_go_away(ErrorCode::NoError).await.unwrap();
}
1 change: 0 additions & 1 deletion wtx/examples/http2-server-tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ async fn handle<'buffer>(
req: RequestMut<'buffer, 'buffer, 'buffer, ByteVector>,
) -> Result<Response<(&'buffer mut ByteVector, &'buffer mut Headers)>, ()> {
req.headers.clear();
println!("{:?}", core::str::from_utf8(req.data));
Ok(match (req.uri.path(), req.method) {
("/", Method::Get) => Response::http2((req.data, req.headers), StatusCode::Ok),
_ => {
Expand Down
7 changes: 0 additions & 7 deletions wtx/src/client_api_framework/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,6 @@ macro_rules! _create_set_of_request_throttling {
};
}

macro_rules! _debug {
($($tt:tt)+) => {
#[cfg(feature = "tracing")]
tracing::debug!($($tt)+);
};
}

macro_rules! generic_data_format_doc {
($ty:literal) => {
concat!("Wrapper used in every generic ", $ty, " to manage different internal implementations.")
Expand Down
2 changes: 1 addition & 1 deletion wtx/src/database/client/postgres/executor/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ where
) -> crate::Result<()> {
let mut is_payload_filled = false;
nb._expand_following(len);
for _ in 0..len {
for _ in 0..=len {
if *read >= len {
is_payload_filled = true;
break;
Expand Down
11 changes: 6 additions & 5 deletions wtx/src/http/abstract_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,18 @@ where
value: &[u8],
is_sensitive: bool,
cb: impl FnMut(M, &mut [u8]),
) {
) -> crate::Result<()> {
let local_len = name.len().wrapping_add(value.len());
if local_len > self.max_bytes {
self.clear();
return;
return Ok(());
}
self.remove_until_max_bytes(local_len, cb);
self.bq.push_front_within_cap(
self.bq.push_front(
[name, value],
Metadata { is_active: true, is_sensitive, misc, name_len: name.len() },
);
)?;
Ok(())
}

#[inline]
Expand All @@ -98,7 +99,7 @@ where
Some(())
}

#[inline]
#[inline(always)]
pub(crate) fn reserve(&mut self, bytes: usize, headers: usize) {
self.bq.reserve(headers, bytes.min(self.max_bytes));
}
Expand Down
6 changes: 3 additions & 3 deletions wtx/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ where
ResourceManager = SimpleRM<
crate::Error,
(),
(ReqResBuffer, Http2<Http2Buffer<true>, S, SDC, true>),
(ReqResBuffer, Http2<Http2Buffer, S, SDC, true>),
>,
>,
S: Stream,
SDC: RefCounter,
SDC::Item: Lock<Http2Data<Http2Buffer<true>, S, true>>,
SDC::Item: Lock<Http2Data<Http2Buffer, S, true>>,
{
#[cfg(feature = "tokio-rustls")]
#[inline]
Expand All @@ -41,7 +41,7 @@ where
Response<
<P::Guard<'this> as LockGuard<
'this,
(ReqResBuffer, Http2<Http2Buffer<true>, S, SDC, true>),
(ReqResBuffer, Http2<Http2Buffer, S, SDC, true>),
>>::Mapped<ReqResBuffer>,
>,
> {
Expand Down
6 changes: 3 additions & 3 deletions wtx/src/http/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ impl Headers {
/// If the sum of `name` and `value` is greater than the maximum number of bytes, then the first
/// inserted entries will be deleted accordantly.
#[inline]
pub fn push_front(&mut self, name: &[u8], value: &[u8], is_sensitive: bool) {
self.ab.push_front((), name, value, is_sensitive, |_, _| {});
pub fn push_front(&mut self, name: &[u8], value: &[u8], is_sensitive: bool) -> crate::Result<()> {
self.ab.push_front((), name, value, is_sensitive, |_, _| {})
}

/// Removes all a pair referenced by `idx`.
Expand All @@ -106,7 +106,7 @@ impl Headers {
/// to the number of headers.
///
/// Bytes are capped according to the specified `max_bytes`.
#[inline]
#[inline(always)]
pub fn reserve(&mut self, bytes: usize, headers: usize) {
self.ab.reserve(bytes, headers);
}
Expand Down
12 changes: 0 additions & 12 deletions wtx/src/http/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,4 @@ where
pub fn http2(data: D, headers: H, method: Method, uri: Uri<U>) -> Self {
Self { data, headers, method, uri, version: Version::Http2 }
}

/// See [RequestRef].
#[inline]
pub fn to_ref(&self) -> RequestRef<'_, '_, '_, D> {
RequestRef {
data: &self.data,
headers: self.headers.lease(),
method: self.method,
uri: self.uri.to_ref(),
version: self.version,
}
}
}
41 changes: 22 additions & 19 deletions wtx/src/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,17 @@ mod tests;
mod u31;
mod uri_buffer;
mod window_update_frame;
mod write_stream;

use crate::{
http2::misc::{
apply_initial_params, default_stream_frames, read_frame_until_cb_unknown_id, write_to_stream,
},
http2::misc::{apply_initial_params, default_stream_frames, read_frame_until_cb_unknown_id},
misc::{ConnectionState, LeaseMut, Lock, RefCounter, Stream},
};
pub use client_stream::ClientStream;
pub(crate) use continuation_frame::ContinuationFrame;
use core::marker::PhantomData;
pub(crate) use data_frame::DataFrame;
pub(crate) use error_code::ErrorCode;
pub use error_code::ErrorCode;
pub(crate) use frame_init::{FrameHeaderTy, FrameInit};
pub(crate) use go_away_frame::GoAwayFrame;
pub(crate) use headers_frame::HeadersFrame;
Expand All @@ -78,6 +77,7 @@ pub(crate) use window_update_frame::WindowUpdateFrame;
pub(crate) const BODY_LEN_DEFAULT: u32 = 4_194_304;
pub(crate) const BUFFERED_FRAMES_NUM_DEFAULT: u8 = 16;
pub(crate) const CACHED_HEADERS_LEN_DEFAULT: u32 = 8_192;
pub(crate) const CACHED_HEADERS_LEN_LOWER_BOUND: u32 = 4_096;
pub(crate) const EXPANDED_HEADERS_LEN_DEFAULT: u32 = 4_096;
pub(crate) const FRAME_LEN_DEFAULT: u32 = 1_048_576;
pub(crate) const FRAME_LEN_LOWER_BOUND: u32 = 16_384;
Expand Down Expand Up @@ -108,7 +108,7 @@ pub struct Http2<HB, HD, S, const IS_CLIENT: bool> {

impl<HB, HD, S, const IS_CLIENT: bool> Http2<HB, HD, S, IS_CLIENT>
where
HB: LeaseMut<Http2Buffer<IS_CLIENT>>,
HB: LeaseMut<Http2Buffer>,
HD: Lock<Resource = Http2Data<HB, S, IS_CLIENT>>,
S: Stream,
{
Expand All @@ -133,7 +133,7 @@ where
// `Send` bounds that makes anyone feel happy and delightful for uncountable hours.
impl<HB, HD, S> Http2<HB, HD, S, false>
where
HB: LeaseMut<Http2Buffer<false>>,
HB: LeaseMut<Http2Buffer>,
HD: RefCounter,
for<'guard> HD::Item: Lock<
Guard<'guard> = MutexGuard<'guard, Http2Data<HB, S, false>>,
Expand All @@ -150,7 +150,7 @@ where
if &buffer != PREFACE {
return Err(crate::Error::NoPreface);
}
write_to_stream([hp.to_settings_frame().bytes(&mut [0; 45])], true, &mut stream).await?;
stream.write_all(hp.to_settings_frame().bytes(&mut [0; 45])).await?;
apply_initial_params(hb.lease_mut(), &hp)?;
Ok(Self {
phantom: PhantomData,
Expand All @@ -165,6 +165,7 @@ where
&mut self,
rrb: &mut ReqResBuffer,
) -> crate::Result<ReadFrameRslt<ServerStream<HB, HD, S>>> {
rrb.clear();
let mut guard = self.hd.lock().await;
if *guard.streams_num_mut() >= guard.hp().max_streams_num() {
return Err(crate::Error::ExceedAmountOfActiveConcurrentStreams);
Expand All @@ -186,13 +187,18 @@ where
);
let stream_frame_entry = guard.hb_mut().streams_frames.entry(rfi.stream_id);
let _ = stream_frame_entry.or_insert_with(default_stream_frames);
Ok(ReadFrameRslt::Resource(ServerStream::new(self.hd.clone(), rfi, stream_state)))
Ok(ReadFrameRslt::Resource(ServerStream::new(
self.hd.clone(),
_trace_span!("Creating server stream", stream_id = rfi.stream_id.u32()),
rfi,
stream_state,
)))
}
}

impl<HB, HD, S> Http2<HB, HD, S, true>
where
HB: LeaseMut<Http2Buffer<true>>,
HB: LeaseMut<Http2Buffer>,
HD: RefCounter,
HD::Item: Lock<Resource = Http2Data<HB, S, true>>,
S: Stream,
Expand All @@ -202,7 +208,7 @@ where
pub async fn connect(mut hb: HB, hp: Http2Params, mut stream: S) -> crate::Result<Self> {
hb.lease_mut().clear();
stream.write_all(PREFACE).await?;
write_to_stream([hp.to_settings_frame().bytes(&mut [0; 45])], true, &mut stream).await?;
stream.write_all(hp.to_settings_frame().bytes(&mut [0; 45])).await?;
apply_initial_params(hb.lease_mut(), &hp)?;
Ok(Self {
phantom: PhantomData,
Expand All @@ -220,14 +226,11 @@ where
*guard.streams_num_mut() = guard.streams_num_mut().wrapping_add(1);
let stream_id = self.stream_id;
self.stream_id = self.stream_id.wrapping_add(U31::TWO);
let _ = self
.hd
.lock()
.await
.hb_mut()
.streams_frames
.entry(stream_id)
.or_insert_with(default_stream_frames);
Ok(ClientStream::idle(self.hd.clone(), stream_id))
let _ = guard.hb_mut().streams_frames.entry(stream_id).or_insert_with(default_stream_frames);
Ok(ClientStream::idle(
self.hd.clone(),
_trace_span!("Creating client stream", stream_id = stream_id.u32()),
stream_id,
))
}
}
Loading

0 comments on commit c42d4ef

Please sign in to comment.