diff --git a/.github/workflows/bencher.yaml b/.github/workflows/bencher.yaml index 12c577bb..b54e296d 100644 --- a/.github/workflows/bencher.yaml +++ b/.github/workflows/bencher.yaml @@ -3,6 +3,7 @@ on: push: branches: - main + workflow_dispatch: jobs: benchmark_with_bencher: @@ -11,12 +12,22 @@ jobs: - uses: actions/checkout@v4 - uses: bencherdev/bencher@main - name: Track Benchmarks with Bencher - continue-on-error: true run: | - bencher run \ - --branch main \ - --err \ - --project wtx \ - --testbed ubuntu-latest \ - --token "${{ secrets.BENCHER_API_TOKEN }}" \ - "cargo bench --all-features" \ No newline at end of file + set +e + set +o pipefail + + function fun() { + bencher run \ + --branch main \ + --err \ + --project wtx \ + --testbed ubuntu-latest \ + --token "${{ secrets.BENCHER_API_TOKEN }}" \ + "cargo bench --all-features" + return $? + } + + fun + echo $? + exit 0 + \ No newline at end of file diff --git a/.scripts/internal-tests.sh b/.scripts/internal-tests.sh index 7a5becde..e0e2ca89 100755 --- a/.scripts/internal-tests.sh +++ b/.scripts/internal-tests.sh @@ -5,7 +5,7 @@ $rt rustfmt $rt clippy -cargo miri test -p wtx +MIRIFLAGS="-Zmiri-tree-borrows" cargo miri test -p wtx # WTX @@ -98,5 +98,4 @@ cargo check --bin autobahn-server --features "async-send,flate2,optimization,poo cargo check --example database-client-postgres-tokio-rustls --features "_tokio-rustls-client,postgres" cargo check --example web-socket-client-raw-tokio-rustls --features "_tokio-rustls-client,web-socket-handshake" -cargo check --example web-socket-server-raw-tokio --features "async-send,pool,tokio,web-socket-handshake" cargo check --example web-socket-server-raw-tokio-rustls --features "_tokio-rustls-server,web-socket-handshake" \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index cde6fadb..113bf0a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -511,7 +511,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", "syn_derive", ] @@ -568,9 +568,9 @@ checksum = "981520c98f422fcc584dc1a95c334e6953900b9106bc47a9839b81790009eb21" [[package]] name = "cc" -version = "1.0.96" +version = "1.0.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "065a29261d53ba54260972629f9ca6bffa69bac13cd1fed61420f7fa68b9f8bd" +checksum = "099a5357d84c4c61eb35fc8eafa9a79a902c2f76911e5747ced4e032edd8d9b4" dependencies = [ "jobserver", "libc", @@ -650,7 +650,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", ] [[package]] @@ -827,7 +827,7 @@ checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", ] [[package]] @@ -1046,9 +1046,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" dependencies = [ "libc", "windows-sys 0.52.0", @@ -1238,7 +1238,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", ] [[package]] @@ -1298,9 +1298,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", "js-sys", @@ -1823,7 +1823,7 @@ checksum = "f28c501b91dabb672c4b303fb6ea259022ab21ed8d06a457ac21b70e479fb367" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", ] [[package]] @@ -2021,7 +2021,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", ] [[package]] @@ -2139,9 +2139,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.81" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba" +checksum = "8ad3d49ab951a01fbaafe34f2ec74122942fe18a3f9814c3268f1bb72042131b" dependencies = [ "unicode-ident", ] @@ -2382,9 +2382,9 @@ dependencies = [ [[package]] name = "rustc-demangle" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustc_version" @@ -2447,9 +2447,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.5.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "beb461507cee2c2ff151784c52762cf4d9ff6a61f3e80968600ed24fa837fa54" +checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" [[package]] name = "rustls-webpki" @@ -2464,9 +2464,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "scoped-tls" @@ -2501,15 +2501,15 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.22" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.200" +version = "1.0.201" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddc6f9cc94d67c0e21aaf7eda3a010fd3af78ebf6e096aa6e2e13c79749cce4f" +checksum = "780f1cebed1629e4753a1a38a3c72d30b97ec044f0aef68cb26650a3c5cf363c" dependencies = [ "serde_derive", ] @@ -2528,20 +2528,20 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.200" +version = "1.0.201" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "856f046b9400cee3c8c94ed572ecdb752444c24528c035cd35882aad6f492bcb" +checksum = "c5e405930b9796f1c00bee880d03fc7e0bb4b9a11afc776885ffe84320da2865" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", ] [[package]] name = "serde_json" -version = "1.0.116" +version = "1.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813" +checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" dependencies = [ "itoa", "ryu", @@ -2761,7 +2761,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta-derive", - "syn 2.0.60", + "syn 2.0.61", ] [[package]] @@ -2772,7 +2772,7 @@ checksum = "a60bcaff7397072dca0017d1db428e30d5002e00b6847703e2e42005c95fbe00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", ] [[package]] @@ -2794,9 +2794,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.60" +version = "2.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3" +checksum = "c993ed8ccba56ae856363b1845da7266a7cb78e1d146c8a32d54b45a8b831fc9" dependencies = [ "proc-macro2", "quote", @@ -2812,7 +2812,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", ] [[package]] @@ -2839,27 +2839,27 @@ dependencies = [ "proc-macro2", "quote", "structmeta", - "syn 2.0.60", + "syn 2.0.61", ] [[package]] name = "thiserror" -version = "1.0.59" +version = "1.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0126ad08bff79f29fc3ae6a55cc72352056dfff61e3ff8bb7129476d44b23aa" +checksum = "579e9083ca58dd9dcf91a9923bb9054071b9ebbd800b342194c9feb0ee89fc18" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.59" +version = "1.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1cd413b5d558b4c5bf3680e324a6fa5014e7b7c067a51e69dbdf47eb7148b66" +checksum = "e2470041c06ec3ac1ab38d0356a6119054dedaea53e12fbefc0de730a1c08524" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", ] [[package]] @@ -2912,7 +2912,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", ] [[package]] @@ -2968,7 +2968,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.7", + "winnow 0.6.8", ] [[package]] @@ -2990,7 +2990,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", ] [[package]] @@ -3031,12 +3031,11 @@ dependencies = [ [[package]] name = "trybuild" -version = "1.0.92" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35b69ff3ed900f74eb1e0d9bdd3df38da829dc4a26531674f6f019ca7c093c8d" +checksum = "bfe21c256d6fba8499cf9d9b1c24971bec43a369d81c52e024adc7670cf112df" dependencies = [ "glob", - "once_cell", "serde", "serde_derive", "serde_json", @@ -3153,7 +3152,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", "wasm-bindgen-shared", ] @@ -3187,7 +3186,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3407,9 +3406,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.6.7" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14b9415ee827af173ebb3f15f9083df5a122eb93572ec28741fb153356ea2578" +checksum = "c3c52e9c97a68071b23e836c9380edae937f17b9c4667bd021973efc689f618d" dependencies = [ "memchr", ] @@ -3540,22 +3539,22 @@ checksum = "791978798f0597cfc70478424c2b4fdc2b7a8024aaff78497ef00f24ef674193" [[package]] name = "zerocopy" -version = "0.7.32" +version = "0.7.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +checksum = "ae87e3fcd617500e5d106f0380cf7b77f3c6092aae37191433159dda23cfb087" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.32" +version = "0.7.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", ] [[package]] @@ -3575,5 +3574,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.61", ] diff --git a/wtx-docs/src/http2/README.md b/wtx-docs/src/http2/README.md index 859211cc..eaba33d4 100644 --- a/wtx-docs/src/http2/README.md +++ b/wtx-docs/src/http2/README.md @@ -6,7 +6,7 @@ Provides low and high level abstractions to interact with clients and servers. use std::net::ToSocketAddrs; use tokio::net::TcpStream; use wtx::{ - http::Method, + http::{Method, RequestStr}, http2::{ErrorCode, Http2Buffer, Http2Params, Http2Tokio, ReqResBuffer}, misc::{from_utf8_basic, UriRef}, rng::StaticRng, @@ -14,11 +14,7 @@ use wtx::{ #[tokio::main] async fn main() { - let mut rrb = ReqResBuffer::default(); - rrb.data.reserve(6); - rrb.data.extend_from_slice(b"Hello!").unwrap(); - rrb.uri.push_str("127.0.0.1:9000").unwrap(); - let uri = UriRef::new(&rrb.uri); + let uri = UriRef::new("127.0.0.1:9000"); let mut http2 = Http2Tokio::connect( Http2Buffer::new(StaticRng::default()), Http2Params::default(), @@ -27,9 +23,16 @@ async fn main() { .await .unwrap(); let mut stream = http2.stream().await.unwrap(); - stream.send_req(rrb.as_http2_request_ref(Method::Get)).await.unwrap(); - let res = stream.recv_res(&mut rrb).await.unwrap(); - println!("{}", from_utf8_basic(res.resource().unwrap().body()).unwrap()); + stream + .send_req(RequestStr::http2(b"Hello!", Method::Get, uri)) + .await + .unwrap() + .resource() + .unwrap(); + let mut rrb = ReqResBuffer::default(); + let res = stream.recv_res(&mut rrb).await.unwrap().resource().unwrap(); + println!("{}", from_utf8_basic(res.body()).unwrap()); http2.send_go_away(ErrorCode::NoError).await.unwrap(); } + ``` \ No newline at end of file diff --git a/wtx/Cargo.toml b/wtx/Cargo.toml index 1d0a22b5..a9c389a9 100644 --- a/wtx/Cargo.toml +++ b/wtx/Cargo.toml @@ -28,11 +28,6 @@ name = "web-socket-client-raw-tokio-rustls" path = "examples/web-socket-client-raw-tokio-rustls.rs" required-features = ["_tokio-rustls-client", "tokio/io-std", "web-socket-handshake"] -[[example]] -name = "web-socket-server-raw-tokio" -path = "examples/web-socket-server-raw-tokio.rs" -required-features = ["async-send", "pool", "tokio", "web-socket-handshake"] - [[example]] name = "web-socket-server-raw-tokio-rustls" path = "examples/web-socket-server-raw-tokio-rustls.rs" diff --git a/wtx/examples/common/mod.rs b/wtx/examples/common/mod.rs index 966d1627..cbfdcd5a 100644 --- a/wtx/examples/common/mod.rs +++ b/wtx/examples/common/mod.rs @@ -1,11 +1,3 @@ -#![allow(unused_imports)] - -#[cfg(feature = "web-socket-handshake")] -mod web_socket; - -#[cfg(feature = "web-socket-handshake")] -pub(crate) use web_socket::{_accept_conn_and_echo_frames, _handle_frames}; - pub(crate) fn _host_from_args() -> String { std::env::args().nth(1).unwrap_or_else(|| "127.0.0.1:9000".to_owned()) } diff --git a/wtx/examples/common/web_socket.rs b/wtx/examples/common/web_socket.rs deleted file mode 100644 index 2fc4cc70..00000000 --- a/wtx/examples/common/web_socket.rs +++ /dev/null @@ -1,47 +0,0 @@ -use wtx::{ - misc::Stream, - rng::StaticRng, - web_socket::{ - compression::NegotiatedCompression, - handshake::{WebSocketAccept, WebSocketAcceptRaw}, - Compression, FrameBufferVec, OpCode, WebSocketBuffer, WebSocketServerMut, - }, -}; - -pub(crate) async fn _accept_conn_and_echo_frames( - compression: C, - fb: &mut FrameBufferVec, - stream: S, - wsb: &mut WebSocketBuffer, -) -> wtx::Result<()> -where - C: Compression, - S: Stream, -{ - let mut ws = WebSocketAcceptRaw { compression, rng: StaticRng::default(), stream, wsb } - .accept(|_| true) - .await?; - _handle_frames(fb, &mut ws).await?; - Ok(()) -} - -pub(crate) async fn _handle_frames( - fb: &mut FrameBufferVec, - ws: &mut WebSocketServerMut<'_, NC, StaticRng, S>, -) -> wtx::Result<()> -where - NC: NegotiatedCompression, - S: Stream, -{ - loop { - let mut frame = ws.read_frame(fb).await?; - match frame.op_code() { - OpCode::Binary | OpCode::Text => { - ws.write_frame(&mut frame).await?; - } - OpCode::Close => break, - _ => {} - } - } - Ok(()) -} diff --git a/wtx/examples/http2-client-tokio.rs b/wtx/examples/http2-client-tokio.rs index d20f2bf1..ad2b314c 100644 --- a/wtx/examples/http2-client-tokio.rs +++ b/wtx/examples/http2-client-tokio.rs @@ -6,19 +6,15 @@ mod common; use std::net::ToSocketAddrs; use tokio::net::TcpStream; use wtx::{ - http::Method, + http::{Method, RequestStr}, http2::{ErrorCode, Http2Buffer, Http2Params, Http2Tokio, ReqResBuffer}, - misc::{from_utf8_basic, UriRef}, + misc::{from_utf8_basic, UriString}, rng::StaticRng, }; #[tokio::main] async fn main() { - let mut rrb = ReqResBuffer::default(); - rrb.data.reserve(6); - rrb.data.extend_from_slice(b"Hello!").unwrap(); - rrb.uri.push_str(&common::_uri_from_args()).unwrap(); - let uri = UriRef::new(&rrb.uri); + let uri = UriString::new(common::_uri_from_args()); let mut http2 = Http2Tokio::connect( Http2Buffer::new(StaticRng::default()), Http2Params::default(), @@ -27,8 +23,14 @@ async fn main() { .await .unwrap(); let mut stream = http2.stream().await.unwrap(); - stream.send_req(rrb.as_http2_request_ref(Method::Get)).await.unwrap(); - let res = stream.recv_res(&mut rrb).await.unwrap(); - println!("{}", from_utf8_basic(res.resource().unwrap().body()).unwrap()); + stream + .send_req(RequestStr::http2(b"Hello!", Method::Get, uri.to_ref())) + .await + .unwrap() + .resource() + .unwrap(); + let mut rrb = ReqResBuffer::default(); + let res = stream.recv_res(&mut rrb).await.unwrap().resource().unwrap(); + println!("{}", from_utf8_basic(res.body()).unwrap()); http2.send_go_away(ErrorCode::NoError).await.unwrap(); } diff --git a/wtx/examples/http2-server-tokio.rs b/wtx/examples/http2-server-tokio.rs index 9f24f00e..ca8e977b 100644 --- a/wtx/examples/http2-server-tokio.rs +++ b/wtx/examples/http2-server-tokio.rs @@ -4,7 +4,7 @@ mod common; use wtx::{ - http::{server::TokioHttp2, Headers, Method, RequestMut, Response, StatusCode}, + http::{server::TokioHttp2, Headers, Method, RequestStr, Response, StatusCode}, misc::{from_utf8_basic, ByteVector}, }; @@ -22,15 +22,15 @@ async fn main() { } async fn handle<'buffer>( - req: RequestMut<'buffer, 'buffer, 'buffer, ByteVector>, + req: RequestStr<'buffer, (&'buffer mut ByteVector, &'buffer mut Headers)>, ) -> Result, ()> { - req.headers.clear(); - println!("{}", from_utf8_basic(req.data).unwrap()); + req.data.1.clear(); + println!("{}", from_utf8_basic(req.body()).unwrap()); Ok(match (req.uri.path(), req.method) { - ("/", Method::Get) => Response::http2((req.data, req.headers), StatusCode::Ok), + ("/", Method::Get) => Response::http2(req.data, StatusCode::Ok), _ => { - req.data.clear(); - Response::http2((req.data, req.headers), StatusCode::NotFound) + req.data.1.clear(); + Response::http2(req.data, StatusCode::NotFound) } }) } diff --git a/wtx/examples/web-socket-server-raw-tokio-rustls.rs b/wtx/examples/web-socket-server-raw-tokio-rustls.rs index 59484a74..caa671e0 100644 --- a/wtx/examples/web-socket-server-raw-tokio-rustls.rs +++ b/wtx/examples/web-socket-server-raw-tokio-rustls.rs @@ -6,7 +6,11 @@ mod common; use tokio::net::TcpListener; use wtx::{ misc::TokioRustlsAcceptor, - web_socket::{FrameBufferVec, WebSocketBuffer}, + rng::StdRng, + web_socket::{ + handshake::{WebSocketAccept, WebSocketAcceptRaw}, + FrameBufferVec, OpCode, WebSocketBuffer, + }, }; static CERT: &[u8] = include_bytes!("../../.certs/cert.pem"); @@ -21,14 +25,31 @@ async fn main() { let local_acceptor = acceptor.clone(); let _jh = tokio::spawn(async move { let tls_stream = local_acceptor.accept(stream).await.unwrap(); - common::_accept_conn_and_echo_frames( - (), - &mut FrameBufferVec::default(), - tls_stream, - &mut WebSocketBuffer::default(), - ) - .await - .unwrap(); + let fun = || async move { + let mut ws = WebSocketAcceptRaw { + compression: (), + rng: StdRng::default(), + stream: tls_stream, + wsb: WebSocketBuffer::default(), + } + .accept(|_| true) + .await?; + let mut fb = FrameBufferVec::default(); + loop { + let mut frame = ws.read_frame(&mut fb).await?; + match frame.op_code() { + OpCode::Binary | OpCode::Text => { + ws.write_frame(&mut frame).await?; + } + OpCode::Close => break, + _ => {} + } + } + wtx::Result::Ok(()) + }; + if let Err(err) = fun().await { + eprintln!("{err:?}"); + } }); } } diff --git a/wtx/examples/web-socket-server-raw-tokio.rs b/wtx/examples/web-socket-server-raw-tokio.rs deleted file mode 100644 index a709ea79..00000000 --- a/wtx/examples/web-socket-server-raw-tokio.rs +++ /dev/null @@ -1,40 +0,0 @@ -//! WebSocket echo server. - -#[path = "./common/mod.rs"] -mod common; - -use tokio::net::TcpStream; -use wtx::{ - http::server::TokioWebSocket, - rng::StdRng, - web_socket::{FrameBufferVec, OpCode, WebSocketBuffer, WebSocketServer}, -}; - -#[tokio::main] -async fn main() { - TokioWebSocket::tokio_web_socket( - common::_host_from_args().parse().unwrap(), - None, - || (), - |err| eprintln!("Connection error: {err:?}"), - handle, - ) - .await - .unwrap() -} - -async fn handle( - (fb, mut ws): (&mut FrameBufferVec, WebSocketServer<(), StdRng, TcpStream, &mut WebSocketBuffer>), -) -> wtx::Result<()> { - loop { - let mut frame = ws.read_frame(fb).await?; - match frame.op_code() { - OpCode::Binary | OpCode::Text => { - ws.write_frame(&mut frame).await?; - } - OpCode::Close => break, - _ => {} - } - } - Ok(()) -} diff --git a/wtx/src/error.rs b/wtx/src/error.rs index abf57aca..2467f825 100644 --- a/wtx/src/error.rs +++ b/wtx/src/error.rs @@ -82,8 +82,10 @@ pub enum Error { SmoltcpTcpSendError(smoltcp::socket::tcp::SendError), #[cfg(feature = "embedded-tls")] TlsError(embedded_tls::TlsError), + #[cfg(feature = "tokio")] + TokioJoinError(Box), #[cfg(feature = "tokio-rustls")] - TokioRustLsError(Box), + TokioRustlsError(Box), #[cfg(feature = "_tracing-subscriber")] TryInitError(tracing_subscriber::util::TryInitError), #[cfg(feature = "std")] @@ -324,6 +326,8 @@ pub enum Error { ExceedAmountOfRapidResets, ExceedAmountOfActiveConcurrentStreams, VeryLargeHeadersLen, + WindowSizeCanNotBeReduced, + InvalidStreamState, // ***** Internal - WebSocket ***** // @@ -632,11 +636,19 @@ impl From for Error { } } +#[cfg(feature = "tokio")] +impl From for Error { + #[inline] + fn from(from: tokio::task::JoinError) -> Self { + Self::TokioJoinError(from.into()) + } +} + #[cfg(feature = "tokio-rustls")] impl From for Error { #[inline] fn from(from: tokio_rustls::rustls::Error) -> Self { - Self::TokioRustLsError(from.into()) + Self::TokioRustlsError(from.into()) } } diff --git a/wtx/src/http.rs b/wtx/src/http.rs index ec13e75e..6c1b7b17 100644 --- a/wtx/src/http.rs +++ b/wtx/src/http.rs @@ -10,9 +10,9 @@ mod headers; mod method; mod mime; mod protocol; +mod req_res_data; mod request; mod response; -mod response_data; pub mod server; mod status_code; mod version; @@ -27,9 +27,9 @@ pub use headers::Headers; pub use method::Method; pub use mime::Mime; pub use protocol::Protocol; -pub use request::{Request, RequestMut, RequestRef}; -pub use response::{Response, ResponseMut, ResponseRef}; -pub use response_data::ResponseData; +pub use req_res_data::ReqResData; +pub use request::{Request, RequestStr}; +pub use response::Response; pub use status_code::StatusCode; pub use version::Version; diff --git a/wtx/src/http/abstract_headers.rs b/wtx/src/http/abstract_headers.rs index 3ac25a8c..11aa573a 100644 --- a/wtx/src/http/abstract_headers.rs +++ b/wtx/src/http/abstract_headers.rs @@ -17,7 +17,7 @@ where #[inline] pub(crate) fn with_capacity(bytes: usize, headers: usize, max_bytes: usize) -> Self { - Self { bq: BlocksQueue::with_capacity(bytes.min(max_bytes), headers), max_bytes } + Self { bq: BlocksQueue::with_capacity(headers, bytes.min(max_bytes)), max_bytes } } #[inline] diff --git a/wtx/src/http/response_data.rs b/wtx/src/http/req_res_data.rs similarity index 57% rename from wtx/src/http/response_data.rs rename to wtx/src/http/req_res_data.rs index 68f86e36..8d859cd6 100644 --- a/wtx/src/http/response_data.rs +++ b/wtx/src/http/req_res_data.rs @@ -1,8 +1,8 @@ use crate::{http::Headers, misc::Lease}; -/// Groups the body and the headers of a response into a single structure. -pub trait ResponseData { - /// See [Self::body] +/// Groups the body and the headers of an HTTP request/response. +pub trait ReqResData { + /// See [Self::body]. type Body: ?Sized; /// Can be a sequence of bytes, a string, a deserialized element or any other desired type. @@ -12,9 +12,9 @@ pub trait ResponseData { fn headers(&self) -> &Headers; } -impl ResponseData for &T +impl ReqResData for &T where - T: ResponseData, + T: ReqResData, { type Body = T::Body; @@ -29,9 +29,9 @@ where } } -impl ResponseData for &mut T +impl ReqResData for &mut T where - T: ResponseData, + T: ReqResData, { type Body = T::Body; @@ -46,7 +46,35 @@ where } } -impl ResponseData for (B, H) +impl ReqResData for &[u8] { + type Body = [u8]; + + #[inline] + fn body(&self) -> &Self::Body { + self + } + + #[inline] + fn headers(&self) -> &Headers { + const { &Headers::new(0) } + } +} + +impl ReqResData for [u8; N] { + type Body = [u8; N]; + + #[inline] + fn body(&self) -> &Self::Body { + self + } + + #[inline] + fn headers(&self) -> &Headers { + const { &Headers::new(0) } + } +} + +impl ReqResData for (B, H) where H: Lease, { @@ -64,12 +92,12 @@ where } #[cfg(feature = "http2")] -impl ResponseData for crate::http2::ReqResBuffer { +impl ReqResData for crate::http2::ReqResBuffer { type Body = [u8]; #[inline] fn body(&self) -> &Self::Body { - &self.data + &self.body } #[inline] diff --git a/wtx/src/http/request.rs b/wtx/src/http/request.rs index d4b77f19..8d5caf71 100644 --- a/wtx/src/http/request.rs +++ b/wtx/src/http/request.rs @@ -1,21 +1,16 @@ use crate::{ - http::{Headers, Method, Version}, + http::{Headers, Method, ReqResData, Version}, misc::{Lease, Uri}, }; -/// Shortcut for mutable referenced elements. -pub type RequestMut<'data, 'headers, 'uri, D> = - Request<&'data mut D, &'headers mut Headers, &'headers str>; -/// Shortcut for referenced elements. -pub type RequestRef<'data, 'headers, 'uri, D> = Request<&'data D, &'headers Headers, &'headers str>; +/// A [Request] with a URI composed by a string reference +pub type RequestStr<'uri, D> = Request; /// An HTTP request received by a server or to be sent by a client. #[derive(Debug)] -pub struct Request { - /// The payload of the request, which can be nothing. +pub struct Request { + /// See [ReqResData]. pub data: D, - /// See [Headers]. - pub headers: H, /// See [Method]. pub method: Method, /// See [Uri]. @@ -24,15 +19,26 @@ pub struct Request { pub version: Version, } -impl Request +impl Request where - D: Lease<[u8]>, - H: Lease, + D: ReqResData, U: Lease, { /// Constructor that defaults to an HTTP/2 version. #[inline] - pub fn http2(data: D, headers: H, method: Method, uri: Uri) -> Self { - Self { data, headers, method, uri, version: Version::Http2 } + pub fn http2(data: D, method: Method, uri: Uri) -> Self { + Self { data, method, uri, version: Version::Http2 } + } + + /// Shortcut to access the body of `data`. + #[inline] + pub fn body(&self) -> &D::Body { + self.data.body() + } + + /// Shortcut to access the headers of `data`. + #[inline] + pub fn headers(&self) -> &Headers { + self.data.headers() } } diff --git a/wtx/src/http/response.rs b/wtx/src/http/response.rs index 0654fe22..8b97b59e 100644 --- a/wtx/src/http/response.rs +++ b/wtx/src/http/response.rs @@ -1,14 +1,9 @@ -use crate::http::{Headers, ResponseData, StatusCode, Version}; - -/// Shortcut for mutable referenced data. -pub type ResponseMut<'data, D> = Response<&'data mut D>; -/// Shortcut for referenced data. -pub type ResponseRef<'data, D> = Response<&'data D>; +use crate::http::{Headers, ReqResData, StatusCode, Version}; /// Represents the response from an HTTP request. #[derive(Debug)] pub struct Response { - /// See [ResponseData]. + /// See [ReqResData]. pub data: D, /// See [StatusCode]. pub status_code: StatusCode, @@ -18,7 +13,7 @@ pub struct Response { impl Response where - D: ResponseData, + D: ReqResData, { /// Constructor that defaults to an HTTP/2 version. #[inline] @@ -37,10 +32,4 @@ where pub fn headers(&self) -> &Headers { self.data.headers() } - - /// See [RequestRef]. - #[inline] - pub fn to_ref(&self) -> ResponseRef<'_, D> { - ResponseRef { data: &self.data, status_code: self.status_code, version: self.version } - } } diff --git a/wtx/src/http/server/tokio_http2.rs b/wtx/src/http/server/tokio_http2.rs index 66dfb50d..bdd43d17 100644 --- a/wtx/src/http/server/tokio_http2.rs +++ b/wtx/src/http/server/tokio_http2.rs @@ -1,9 +1,9 @@ use crate::{ http::{ server::{TokioHttp2, _buffers_len}, - Headers, RequestMut, Response, + Headers, RequestStr, Response, }, - http2::{Http2Params, Http2Tokio, ReadFrameRslt}, + http2::{Http2Params, Http2Rslt, Http2Tokio}, misc::{ByteVector, FnFut}, pool::{ FixedPoolGetRsltTokio, FixedPoolTokio, Http2ServerBufferRM, Pool, ReqResBufferRM, @@ -35,7 +35,7 @@ impl TokioHttp2 { E: Debug + From + Send + 'static, F: Copy + for<'any> FnFut< - RequestMut<'any, 'any, 'any, ByteVector>, + RequestStr<'any, (&'any mut ByteVector, &'any mut Headers)>, Result, E>, > + Send + 'static, @@ -77,7 +77,7 @@ where E: Debug + From + Send + 'static, F: Copy + for<'any> FnFut< - RequestMut<'any, 'any, 'any, ByteVector>, + RequestStr<'any, (&'any mut ByteVector, &'any mut Headers)>, Result, E>, > + Send + 'static, @@ -92,23 +92,25 @@ where drop(req_buffer_guard.release().await); return Err(err); } - Ok(ReadFrameRslt::ClosedConnection) => { + Ok(Http2Rslt::ClosedConnection) => { drop(req_buffer_guard.release().await); return Ok(()); } - Ok(ReadFrameRslt::ClosedStream | ReadFrameRslt::IdleConnection) => { + Ok(Http2Rslt::ClosedStream) => { drop(req_buffer_guard.release().await); continue; } - Ok(ReadFrameRslt::Resource(elem)) => elem, + Ok(Http2Rslt::Resource(elem)) => elem, }; let _stream_jh = tokio::spawn(async move { let rrb = &mut **req_buffer_guard; let fun = || async move { - let ReadFrameRslt::Resource(req) = stream.recv_req(rrb).await? else { + let Http2Rslt::Resource(req) = stream.recv_req(rrb).await? else { return Ok(()); }; - stream.send_res(handle(req).await?).await?; + if stream.send_res(handle(req).await?).await?.resource().is_none() { + return Ok(()); + } Ok::<_, E>(()) }; let rslt = fun().await; diff --git a/wtx/src/http2.rs b/wtx/src/http2.rs index 1260c442..6dbf878c 100644 --- a/wtx/src/http2.rs +++ b/wtx/src/http2.rs @@ -24,14 +24,14 @@ mod hpack_static_headers; mod http2_buffer; mod http2_data; mod http2_params; +mod http2_params_send; +mod http2_rslt; mod huffman; mod huffman_tables; mod misc; mod ping_frame; -mod read_frame_rslt; mod req_res_buffer; mod reset_stream_frame; -mod send_params; mod server_stream; mod settings_frame; mod stream_state; @@ -39,12 +39,13 @@ mod stream_state; mod tests; mod u31; mod uri_buffer; +mod window; mod window_update_frame; mod write_stream; use crate::{ http2::misc::{apply_initial_params, default_stream_frames, read_frame_until_cb_unknown_id}, - misc::{ConnectionState, LeaseMut, Lock, RefCounter, Stream}, + misc::{ConnectionState, LeaseMut, Lock, RefCounter, Stream, Usize}, }; pub use client_stream::ClientStream; pub(crate) use continuation_frame::ContinuationFrame; @@ -61,9 +62,10 @@ pub(crate) use hpack_static_headers::{HpackStaticRequestHeaders, HpackStaticResp pub use http2_buffer::Http2Buffer; pub(crate) use http2_data::Http2Data; pub use http2_params::Http2Params; +pub use http2_rslt::Http2Rslt; +pub(crate) use http2_rslt::Http2RsltExt; pub(crate) use huffman::{huffman_decode, huffman_encode}; pub(crate) use ping_frame::PingFrame; -pub use read_frame_rslt::ReadFrameRslt; pub use req_res_buffer::ReqResBuffer; pub(crate) use reset_stream_frame::ResetStreamFrame; pub use server_stream::ServerStream; @@ -72,20 +74,19 @@ pub(crate) use stream_state::StreamState; use tokio::sync::MutexGuard; pub(crate) use u31::U31; pub(crate) use uri_buffer::UriBuffer; +pub(crate) use window::Windows; pub(crate) use window_update_frame::WindowUpdateFrame; -pub(crate) const BODY_LEN_DEFAULT: u32 = 4_194_304; -pub(crate) const BUFFERED_FRAMES_NUM_DEFAULT: u8 = 16; -pub(crate) const CACHED_HEADERS_LEN_DEFAULT: u32 = 8_192; -pub(crate) const CACHED_HEADERS_LEN_LOWER_BOUND: u32 = 4_096; -pub(crate) const EXPANDED_HEADERS_LEN_DEFAULT: u32 = 4_096; -pub(crate) const FRAME_LEN_DEFAULT: u32 = 1_048_576; -pub(crate) const FRAME_LEN_LOWER_BOUND: u32 = 16_384; -pub(crate) const FRAME_LEN_UPPER_BOUND: u32 = 16_777_215; -pub(crate) const INITIAL_WINDOW_LEN_DEFAULT: u32 = 524_280; -pub(crate) const RAPID_RESETS_NUM_DEFAULT: u8 = 16; -pub(crate) const READ_BUFFER_LEN_DEFAULT: u32 = 4_194_304; -pub(crate) const STREAMS_NUM_DEFAULT: u32 = 1_073_741_824; +pub(crate) const MAX_BODY_LEN: u32 = max_body_len!(); +pub(crate) const MAX_BUFFERED_FRAMES_NUM: u8 = max_buffered_frames_num!(); +pub(crate) const MAX_CACHED_HEADERS_LEN: u32 = max_cached_headers_len!(); +pub(crate) const MAX_EXPANDED_HEADERS_LEN: u32 = max_expanded_headers_len!(); +pub(crate) const MAX_FRAME_LEN: u32 = max_frame_len!(); +pub(crate) const MAX_FRAME_LEN_LOWER_BOUND: u32 = max_frame_len_lower_bound!(); +pub(crate) const MAX_FRAME_LEN_UPPER_BOUND: u32 = max_frame_len_upper_bound!(); +pub(crate) const MAX_RAPID_RESETS_NUM: u8 = max_rapid_resets_num!(); +pub(crate) const MAX_STREAMS_NUM: u32 = max_streams_num!(); +pub(crate) const READ_BUFFER_LEN: u32 = read_buffer_len!(); const ACK_MASK: u8 = 0b0000_0001; const EOH_MASK: u8 = 0b0000_0100; @@ -164,34 +165,43 @@ where pub async fn stream( &mut self, rrb: &mut ReqResBuffer, - ) -> crate::Result>> { + ) -> crate::Result>> { rrb.clear(); - let mut guard = self.hd.lock().await; - if *guard.streams_num_mut() >= guard.hp().max_streams_num() { - return Err(crate::Error::ExceedAmountOfActiveConcurrentStreams); - } - *guard.streams_num_mut() = guard.streams_num_mut().wrapping_add(1); let mut stream_state = StreamState::Open; - let rfi = rfr_resource_or_return!( - guard - .read_frames_init( - rrb, - U31::ZERO, - &mut stream_state, - |hf| { hf.hsreqh().method.ok_or(crate::Error::MissingRequestMethod) }, - |data, fi, hp, streams_frames| { - read_frame_until_cb_unknown_id(data, fi, hp, streams_frames) - } - ) - .await? + let mut windows; + let rfi = hre_to_hr!( + self.hd, + |guard| { + if *guard.streams_num_mut() >= guard.hp().max_streams_num() { + return Err(crate::Error::ExceedAmountOfActiveConcurrentStreams); + } + rrb.headers.set_max_bytes(*Usize::from(guard.hp().max_cached_headers_len().0)); + windows = Windows::stream(guard.hp(), guard.hps()); + guard + .read_frames_init( + rrb, + U31::ZERO, + &mut stream_state, + &mut windows, + |hf| hf.hsreqh().method.ok_or(crate::Error::MissingRequestMethod), + |data, fi, hp, streams_frames| { + read_frame_until_cb_unknown_id(data, fi, hp, streams_frames) + }, + ) + .await? + }, + |guard, rfi| { + *guard.streams_num_mut() = guard.streams_num_mut().wrapping_add(1); + let stream_frame_entry = guard.hb_mut().streams_frames.entry(rfi.stream_id); + let _ = stream_frame_entry.or_insert_with(default_stream_frames); + } ); - let stream_frame_entry = guard.hb_mut().streams_frames.entry(rfi.stream_id); - let _ = stream_frame_entry.or_insert_with(default_stream_frames); - Ok(ReadFrameRslt::Resource(ServerStream::new( + Ok(Http2Rslt::Resource(ServerStream::new( self.hd.clone(), _trace_span!("Creating server stream", stream_id = rfi.stream_id.u32()), rfi, stream_state, + windows, ))) } } @@ -223,6 +233,7 @@ where if *guard.streams_num_mut() >= guard.send_params_mut().max_streams_num { return Err(crate::Error::ExceedAmountOfActiveConcurrentStreams); } + let windows = Windows::stream(guard.hp(), guard.hps()); *guard.streams_num_mut() = guard.streams_num_mut().wrapping_add(1); let stream_id = self.stream_id; self.stream_id = self.stream_id.wrapping_add(U31::TWO); @@ -231,6 +242,7 @@ where self.hd.clone(), _trace_span!("Creating client stream", stream_id = stream_id.u32()), stream_id, + windows, )) } } diff --git a/wtx/src/http2/client_stream.rs b/wtx/src/http2/client_stream.rs index 8f8eea23..14eeec27 100644 --- a/wtx/src/http2/client_stream.rs +++ b/wtx/src/http2/client_stream.rs @@ -1,12 +1,14 @@ use crate::{ - http::{RequestRef, ResponseMut}, + http::{ReqResData, RequestStr, Response}, http2::{ - misc::{send_go_away, send_reset}, + http2_rslt::Http2RsltExt, + misc::{send_go_away, send_reset, verify_before_send}, + window::WindowsPair, write_stream::write_stream, ErrorCode, GoAwayFrame, HpackStaticRequestHeaders, HpackStaticResponseHeaders, Http2Buffer, - Http2Data, ReadFrameRslt, ReqResBuffer, ResetStreamFrame, StreamState, U31, + Http2Data, Http2Rslt, ReqResBuffer, ResetStreamFrame, StreamState, Windows, U31, }, - misc::{AsyncBounds, Lease, LeaseMut, Lock, RefCounter, Stream, _Span}, + misc::{AsyncBounds, Lease, LeaseMut, Lock, RefCounter, Stream, Usize, _Span}, }; use core::marker::PhantomData; @@ -18,11 +20,12 @@ pub struct ClientStream { span: _Span, stream_id: U31, stream_state: StreamState, + windows: Windows, } impl ClientStream { - pub(crate) fn idle(hd: HD, span: _Span, stream_id: U31) -> Self { - Self { phantom: PhantomData, hd, span, stream_id, stream_state: StreamState::Idle } + pub(crate) const fn idle(hd: HD, span: _Span, stream_id: U31, windows: Windows) -> Self { + Self { phantom: PhantomData, hd, span, stream_id, stream_state: StreamState::Idle, windows } } } @@ -42,19 +45,24 @@ where pub async fn recv_res<'rrb>( &mut self, rrb: &'rrb mut ReqResBuffer, - ) -> crate::Result>> { + ) -> crate::Result>> { let _e = self.span._enter(); _trace!("Receiving response"); rrb.clear(); - let status_code = rfr_until_resource_with_guard!(self.hd, |guard| { + let status_code = hre_to_hr!(self.hd, |guard| { + rrb.headers.set_max_bytes(*Usize::from(guard.hp().max_cached_headers_len().0)); guard - .read_frames_stream(&mut *rrb, self.stream_id, &mut self.stream_state, |hf| { - hf.hsresh().status_code.ok_or(crate::Error::MissingResponseStatusCode) - }) + .read_frames_stream( + &mut *rrb, + self.stream_id, + &mut self.stream_state, + &mut self.windows, + |hf| hf.hsresh().status_code.ok_or(crate::Error::MissingResponseStatusCode), + ) .await? }); self.stream_state = StreamState::Closed; - Ok(ReadFrameRslt::Resource(ResponseMut::http2(rrb, status_code))) + Ok(Http2Rslt::Resource(Response::http2(rrb, status_code))) } /// Sends a GOAWAY frame to the peer, which cancels the connection and consequently all ongoing @@ -73,38 +81,52 @@ where /// /// Shouldn't be called more than one time. #[inline] - pub async fn send_req(&mut self, req: RequestRef<'_, '_, '_, D>) -> crate::Result<()> + pub async fn send_req(&mut self, req: RequestStr<'_, D>) -> crate::Result> where - D: Lease<[u8]> + ?Sized, + D: ReqResData, + D::Body: Lease<[u8]>, { let _e = self.span._enter(); _trace!("Sending request"); - let mut guard = self.hd.lock().await; - let (hb, is_conn_open, send_params, stream) = guard.parts_mut(); - write_stream::<_, true>( - req.data.lease(), - req.headers, - &mut hb.hpack_enc, - &mut hb.hpack_enc_buffer, - ( - HpackStaticRequestHeaders { - authority: req.uri.authority().as_bytes(), - method: Some(req.method), - path: req.uri.href().as_bytes(), - protocol: None, - scheme: req.uri.schema().as_bytes(), - }, - HpackStaticResponseHeaders::EMPTY, - ), - *is_conn_open, - send_params, - stream, - self.stream_id, - &mut self.stream_state, - ) - .await?; + let mut body = req.data.body().lease(); + let mut hf = { + let mut guard = self.hd.lock().await; + let (hb, _, _, hps, ..) = guard.parts_mut(); + verify_before_send::( + req.data.headers(), + &mut hb.hpack_enc, + &mut hb.hpack_enc_buffer, + hps, + ( + HpackStaticRequestHeaders { + authority: req.uri.authority().as_bytes(), + method: Some(req.method), + path: req.uri.href().as_bytes(), + protocol: None, + scheme: req.uri.schema().as_bytes(), + }, + HpackStaticResponseHeaders::EMPTY, + ), + self.stream_id, + )? + }; + hre_to_hr!(self.hd, |guard| { + let (hb, is_conn_open, _, hps, stream, streams_num, windows) = guard.parts_mut(); + write_stream::<_, true>( + &mut body, + &mut hf, + &mut hb.hpack_enc_buffer, + hps, + *is_conn_open, + stream, + &mut self.stream_state, + streams_num, + &mut WindowsPair::new(windows, &mut self.windows), + ) + .await? + }); self.stream_state = StreamState::HalfClosedLocal; - Ok(()) + Ok(Http2Rslt::Resource(())) } /// Sends a RST_STREAM frame to the peer, which cancels this stream. diff --git a/wtx/src/http2/data_frame.rs b/wtx/src/http2/data_frame.rs index 195c6d21..5ed03cbd 100644 --- a/wtx/src/http2/data_frame.rs +++ b/wtx/src/http2/data_frame.rs @@ -7,19 +7,23 @@ const FLAG_MASK: u8 = EOS_MASK | PAD_MASK; #[derive(Debug, Eq, PartialEq)] pub(crate) struct DataFrame { - data_len: u32, + data_len: U31, flag: u8, pad_len: Option, stream_id: U31, } impl DataFrame { - pub(crate) fn new(data_len: u32, stream_id: U31) -> Self { + pub(crate) fn new(data_len: U31, stream_id: U31) -> Self { Self { data_len, flag: 0, pad_len: None, stream_id } } pub(crate) fn bytes(&self) -> [u8; 9] { - FrameInit::new(self.data_len, self.flag, self.stream_id, FrameHeaderTy::Data).bytes() + FrameInit::new(self.data_len.u32(), self.flag, self.stream_id, FrameHeaderTy::Data).bytes() + } + + pub(crate) fn data_len(&self) -> U31 { + self.data_len } pub(crate) fn is_eos(&self) -> bool { @@ -32,7 +36,7 @@ impl DataFrame { } let flag = fi.flags & FLAG_MASK; let pad_len = trim_frame_pad(&mut data, flag)?; - let Ok(data_len) = u32::try_from(data.len()) else { + let Ok(data_len) = u32::try_from(data.len()).map(U31::from_u32) else { return _unlikely_elem(Err(crate::http2::ErrorCode::ProtocolError.into())); }; Ok(Self { data_len, flag, pad_len, stream_id: fi.stream_id }) diff --git a/wtx/src/http2/error_code.rs b/wtx/src/http2/error_code.rs index 4f2b27c0..0d70615a 100644 --- a/wtx/src/http2/error_code.rs +++ b/wtx/src/http2/error_code.rs @@ -1,7 +1,5 @@ create_enum! { /// HTTP/2 error codes. - /// - /// http://httpwg.org/specs/rfc7540.html#ErrorCodes #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum ErrorCode { /// The associated condition is not a result of an error. diff --git a/wtx/src/http2/frame_init.rs b/wtx/src/http2/frame_init.rs index 4e29dab3..847e598f 100644 --- a/wtx/src/http2/frame_init.rs +++ b/wtx/src/http2/frame_init.rs @@ -32,7 +32,7 @@ impl FrameInit { Ok(Self { data_len: u32::from_be_bytes([0, a, b, c]), flags: e, - stream_id: U31::new(u32::from_be_bytes([f, g, h, i])), + stream_id: U31::from_u32(u32::from_be_bytes([f, g, h, i])), ty: d.try_into()?, }) } diff --git a/wtx/src/http2/go_away_frame.rs b/wtx/src/http2/go_away_frame.rs index f0c6c52a..b0e57a4a 100644 --- a/wtx/src/http2/go_away_frame.rs +++ b/wtx/src/http2/go_away_frame.rs @@ -28,7 +28,7 @@ impl GoAwayFrame { }; Ok(Self { error_code: u32::from_be_bytes([*e, *f, *g, *h]).try_into()?, - last_stream_id: U31::new(u32::from_be_bytes([*a, *b, *c, *d])), + last_stream_id: U31::from_u32(u32::from_be_bytes([*a, *b, *c, *d])), }) } } diff --git a/wtx/src/http2/headers_frame.rs b/wtx/src/http2/headers_frame.rs index 18e0adc3..8a2f5068 100644 --- a/wtx/src/http2/headers_frame.rs +++ b/wtx/src/http2/headers_frame.rs @@ -18,6 +18,7 @@ pub(crate) struct HeadersFrame<'data> { } impl<'data> HeadersFrame<'data> { + #[inline] pub(crate) fn new( (hsreqh, hsresh): (HpackStaticRequestHeaders<'data>, HpackStaticResponseHeaders), stream_id: U31, @@ -25,30 +26,37 @@ impl<'data> HeadersFrame<'data> { Self { flag: 0, hsreqh, hsresh, is_over_size: false, stream_id } } + #[inline] pub(crate) fn bytes(&self) -> [u8; 9] { FrameInit::new(0, self.flag, self.stream_id, FrameHeaderTy::Headers).bytes() } + #[inline] pub(crate) fn hsreqh(&self) -> &HpackStaticRequestHeaders<'data> { &self.hsreqh } + #[inline] pub(crate) fn hsresh(&self) -> HpackStaticResponseHeaders { self.hsresh } + #[inline] pub(crate) fn is_eoh(&self) -> bool { self.flag & EOH_MASK == EOH_MASK } + #[inline] pub(crate) fn is_eos(&self) -> bool { self.flag & EOS_MASK == EOS_MASK } + #[inline] pub(crate) fn is_over_size(&self) -> bool { self.is_over_size } + #[inline] pub(crate) fn read( mut data: &[u8], fi: FrameInit, @@ -205,13 +213,20 @@ impl<'data> HeadersFrame<'data> { )) } + #[inline] pub(crate) fn set_eoh(&mut self) { self.flag |= EOH_MASK; } + #[inline] pub(crate) fn set_eos(&mut self) { self.flag |= EOS_MASK; } + + #[inline] + pub(crate) fn stream_id(&self) -> U31 { + self.stream_id + } } #[inline] diff --git a/wtx/src/http2/http2_data.rs b/wtx/src/http2/http2_data.rs index f4c1abda..8e7ecc0c 100644 --- a/wtx/src/http2/http2_data.rs +++ b/wtx/src/http2/http2_data.rs @@ -1,9 +1,10 @@ use crate::{ http2::{ + http2_params_send::Http2ParamsSend, misc::{read_frame_others, read_frame_until, read_frame_until_cb_known_id, reset_stream}, - send_params::SendParams, + window::WindowsPair, ContinuationFrame, DataFrame, FrameHeaderTy, FrameInit, HeadersFrame, Http2Buffer, Http2Params, - ReadFrameRslt, ReqResBuffer, StreamState, WindowUpdateFrame, U31, + Http2RsltExt, ReqResBuffer, StreamState, WindowUpdateFrame, Windows, U31, }, misc::{BlocksQueue, Lease, LeaseMut, Stream}, }; @@ -13,11 +14,12 @@ use hashbrown::HashMap; pub struct Http2Data { hb: HB, hp: Http2Params, + hps: Http2ParamsSend, is_conn_open: bool, rapid_resets_num: u8, - send_params: SendParams, stream: S, streams_num: u32, + windows: Windows, } impl Http2Data @@ -27,14 +29,16 @@ where { #[inline] pub(crate) fn new(hb: HB, hp: Http2Params, stream: S) -> Self { + let windows = Windows::conn(&hp); Self { hb, hp, is_conn_open: true, rapid_resets_num: 0, - send_params: SendParams::default(), + hps: Http2ParamsSend::default(), stream, streams_num: 0, + windows, } } @@ -48,6 +52,11 @@ where &self.hp } + #[inline] + pub(crate) fn hps(&self) -> &Http2ParamsSend { + &self.hps + } + #[inline] pub(crate) fn is_conn_open(&self) -> bool { self.is_conn_open @@ -59,8 +68,26 @@ where } #[inline] - pub(crate) fn parts_mut(&mut self) -> (&mut Http2Buffer, &mut bool, &mut SendParams, &mut S) { - (self.hb.lease_mut(), &mut self.is_conn_open, &mut self.send_params, &mut self.stream) + pub(crate) fn parts_mut( + &mut self, + ) -> ( + &mut Http2Buffer, + &mut bool, + &mut Http2Params, + &mut Http2ParamsSend, + &mut S, + &mut u32, + &mut Windows, + ) { + ( + self.hb.lease_mut(), + &mut self.is_conn_open, + &mut self.hp, + &mut self.hps, + &mut self.stream, + &mut self.streams_num, + &mut self.windows, + ) } /// Reads a frame that is expected to be the initial header of a message along with its @@ -71,6 +98,7 @@ where rrb: &mut ReqResBuffer, stream_id: U31, stream_state: &mut StreamState, + stream_windows: &mut Windows, mut headers_cb: impl FnMut(&HeadersFrame<'_>) -> crate::Result, mut read_frame_until_cb: impl FnMut( &[u8], @@ -78,21 +106,22 @@ where &Http2Params, &mut HashMap>, ) -> crate::Result, - ) -> crate::Result>> { + ) -> crate::Result>> { for _ in 0.._max_frames_mismatches!() { let Http2Buffer { hpack_dec, hpack_enc, pfb, streams_frames, uri_buffer, .. } = self.hb.lease_mut(); - let fi = rfr_resource_or_return!( + let fi = hre_resource_or_return!( read_frame_until( &mut self.hp, hpack_enc, + &mut self.hps, &mut self.is_conn_open, pfb, - &mut self.send_params, &mut self.stream, stream_id, stream_state, &mut self.streams_num, + &mut WindowsPair::new(&mut self.windows, &mut *stream_windows), |fi, hp, data| { read_frame_until_cb(data, fi, hp, streams_frames) }, |hp| { if self.rapid_resets_num >= hp.max_rapid_resets_num() { @@ -127,7 +156,7 @@ where *stream_state = StreamState::HalfClosedRemote; } let headers_rslt = headers_cb(&hf)?; - rfr_until_resource!( + hre_until_resource!( self .read_frames_continuation( &mut hpack_size, @@ -135,10 +164,11 @@ where rrb, fi.stream_id, stream_state, + stream_windows ) .await? ); - return Ok(ReadFrameRslt::Resource(ReadFramesInit { + return Ok(Http2RsltExt::Resource(ReadFramesInit { headers_rslt, hpack_size, is_eos, @@ -146,7 +176,8 @@ where })); } FrameHeaderTy::WindowUpdate => { - let _wuf = WindowUpdateFrame::read(pfb._current(), fi)?; + let wuf = WindowUpdateFrame::read(pfb._current(), fi)?; + stream_windows.send.deposit(wuf.size_increment().i32()); } _ => return Err(crate::http2::ErrorCode::ProtocolError.into()), } @@ -163,30 +194,33 @@ where rrb: &mut ReqResBuffer, stream_id: U31, stream_state: &mut StreamState, - ) -> crate::Result> { + stream_windows: &mut Windows, + ) -> crate::Result> { if is_eos { - return Ok(ReadFrameRslt::Resource(())); + return Ok(Http2RsltExt::Resource(())); } let Http2Buffer { hpack_dec, hpack_enc, pfb, streams_frames, uri_buffer, .. } = self.hb.lease_mut(); let mut body_len: u32 = 0; let mut counter: u32 = 0; + let mut wp = WindowsPair::new(&mut self.windows, &mut *stream_windows); loop { if counter >= _max_frames_mismatches!() { return Err(crate::Error::VeryLargeAmountOfFrameMismatches); } - let (fi, data) = rfr_resource_or_return!( + let (fi, data) = hre_resource_or_return!( read_frame_others( &mut self.hp, hpack_enc, + &mut self.hps, &mut self.is_conn_open, pfb, - &mut self.send_params, &mut self.stream, stream_id, stream_state, streams_frames, - &mut self.streams_num + &mut self.streams_num, + &mut wp ) .await? ); @@ -198,11 +232,12 @@ where body_len = local_body_len; if let FrameHeaderTy::Data = fi.ty { let df = DataFrame::read(data, fi)?; - rrb.data.reserve(data.len()); - rrb.data.extend_from_slice(data)?; + rrb.body.reserve(data.len()); + rrb.body.extend_from_slice(data)?; + wp.manage_recv(self.is_conn_open, &mut self.stream, stream_id, df.data_len()).await?; if df.is_eos() { *stream_state = StreamState::HalfClosedRemote; - return Ok(ReadFrameRslt::Resource(())); + return Ok(Http2RsltExt::Resource(())); } } else { let (hf, local_hpack_size) = HeadersFrame::read::( @@ -219,7 +254,7 @@ where } *hpack_size = hpack_size.saturating_add(local_hpack_size); if hf.is_eoh() { - return Ok(ReadFrameRslt::Resource(())); + return Ok(Http2RsltExt::Resource(())); } break; } @@ -227,23 +262,24 @@ where } for _ in 0.._max_continuation_frames!() { - let (fi, data) = rfr_resource_or_return!( + let (fi, data) = hre_resource_or_return!( read_frame_others( &mut self.hp, hpack_enc, + &mut self.hps, &mut self.is_conn_open, pfb, - &mut self.send_params, &mut self.stream, stream_id, stream_state, streams_frames, - &mut self.streams_num + &mut self.streams_num, + &mut wp ) .await? ); if ContinuationFrame::read(data, fi, &mut rrb.headers, hpack_size, hpack_dec)?.is_eoh() { - return Ok(ReadFrameRslt::Resource(())); + return Ok(Http2RsltExt::Resource(())); } } Err(crate::Error::VeryLargeAmountOfContinuationFrames) @@ -256,21 +292,36 @@ where rrb: &mut ReqResBuffer, stream_id: U31, stream_state: &mut StreamState, + stream_windows: &mut Windows, cb: fn(&HeadersFrame<'_>) -> crate::Result, - ) -> crate::Result> { - let mut rfi = rfr_resource_or_return!( + ) -> crate::Result> { + let mut rfi = hre_resource_or_return!( self - .read_frames_init(rrb, stream_id, stream_state, cb, |data, fi, hp, streams_frames| { - read_frame_until_cb_known_id(data, fi, hp, stream_id, streams_frames) - }) + .read_frames_init( + rrb, + stream_id, + stream_state, + stream_windows, + cb, + |data, fi, hp, streams_frames| { + read_frame_until_cb_known_id(data, fi, hp, stream_id, streams_frames) + } + ) .await? ); - rfr_resource_or_return!( + hre_resource_or_return!( self - .read_frames_others(&mut rfi.hpack_size, rfi.is_eos, rrb, stream_id, stream_state) + .read_frames_others( + &mut rfi.hpack_size, + rfi.is_eos, + rrb, + stream_id, + stream_state, + stream_windows + ) .await? ); - Ok(ReadFrameRslt::Resource(rfi.headers_rslt)) + Ok(Http2RsltExt::Resource(rfi.headers_rslt)) } /// Reads all continuation frames. @@ -282,39 +333,41 @@ where rrb: &mut ReqResBuffer, stream_id: U31, stream_state: &mut StreamState, - ) -> crate::Result> { + stream_windows: &mut Windows, + ) -> crate::Result> { if is_eoh || is_eos { - return Ok(ReadFrameRslt::Resource(())); + return Ok(Http2RsltExt::Resource(())); } let Http2Buffer { hpack_dec, hpack_enc, pfb, streams_frames, .. } = self.hb.lease_mut(); for _ in 0.._max_continuation_frames!() { - let (fi, data) = rfr_until_resource!( + let (fi, data) = hre_until_resource!( read_frame_others( &mut self.hp, hpack_enc, + &mut self.hps, &mut self.is_conn_open, pfb, - &mut self.send_params, &mut self.stream, stream_id, stream_state, streams_frames, - &mut self.streams_num + &mut self.streams_num, + &mut WindowsPair::new(&mut self.windows, &mut *stream_windows) ) .await? ); let ci = ContinuationFrame::read(data, fi, &mut rrb.headers, hpack_size, hpack_dec)?; is_eoh = ci.is_eoh(); if is_eoh { - return Ok(ReadFrameRslt::Resource(())); + return Ok(Http2RsltExt::Resource(())); } } Err(crate::Error::VeryLargeAmountOfContinuationFrames) } #[inline] - pub(crate) fn send_params_mut(&mut self) -> &mut SendParams { - &mut self.send_params + pub(crate) fn send_params_mut(&mut self) -> &mut Http2ParamsSend { + &mut self.hps } #[inline] diff --git a/wtx/src/http2/http2_params.rs b/wtx/src/http2/http2_params.rs index e089c9c9..b47d2933 100644 --- a/wtx/src/http2/http2_params.rs +++ b/wtx/src/http2/http2_params.rs @@ -1,15 +1,16 @@ use crate::http2::{ - SettingsFrame, BODY_LEN_DEFAULT, BUFFERED_FRAMES_NUM_DEFAULT, CACHED_HEADERS_LEN_DEFAULT, - EXPANDED_HEADERS_LEN_DEFAULT, FRAME_LEN_DEFAULT, FRAME_LEN_LOWER_BOUND, FRAME_LEN_UPPER_BOUND, - INITIAL_WINDOW_LEN_DEFAULT, RAPID_RESETS_NUM_DEFAULT, READ_BUFFER_LEN_DEFAULT, - STREAMS_NUM_DEFAULT, U31, + SettingsFrame, MAX_BODY_LEN, MAX_BUFFERED_FRAMES_NUM, MAX_CACHED_HEADERS_LEN, + MAX_EXPANDED_HEADERS_LEN, MAX_FRAME_LEN, MAX_FRAME_LEN_LOWER_BOUND, MAX_FRAME_LEN_UPPER_BOUND, + MAX_RAPID_RESETS_NUM, MAX_STREAMS_NUM, READ_BUFFER_LEN, U31, }; -/// Tells how connections and streams should behave. +/// Indicates to a remote peer the receiving parameters of a connection as well as its streams. +/// +/// Also states some configurations for local structures. #[derive(Debug)] pub struct Http2Params { enable_connect_protocol: bool, - initial_window_len: u32, + initial_window_len: U31, max_body_len: u32, max_buffered_frames_num: u8, max_cached_headers_len: (u32, u32), @@ -26,42 +27,47 @@ impl Http2Params { /// Allows the execution of other protocols like WebSockets within HTTP/2 connections. At the /// current time this parameter has no effect. /// - /// Defaults to `false`. + /// Corresponds to `SETTINGS_ENABLE_CONNECT_PROTOCOL`. Defaults to `false`. pub fn enable_connect_protocol(&self) -> bool { self.enable_connect_protocol } - /// Initial window length. + /// Initial window length /// /// The initial amount of "credit" a counterpart can have for sending data. /// - /// Corresponds to `SETTINGS_INITIAL_WINDOW_SIZE`. Defaults to 512 KiB. Capped within - /// 0 ~ (2^31 - 1) bytes - pub fn initial_window_len(&self) -> u32 { + /// Corresponds to `SETTINGS_INITIAL_WINDOW_SIZE`. Capped within 0 ~ (2^31 - 1) bytes. Defaults + /// to + #[doc = concat!(initial_window_len!())] + /// bytes. + pub fn initial_window_len(&self) -> U31 { self.initial_window_len } - /// Maximum request/response body length. + /// Maximum request/response body length /// - /// Or the maximum size allowed for the sum of the length of all data frames. Also servers as an - /// upper bound for the window size of a stream. + /// Or the maximum size allowed for the sum of the length of all data frames. /// - /// Defaults to 4 MiB. + /// Defaults to + #[doc = concat!(max_body_len!())] + /// bytes. pub fn max_body_len(&self) -> u32 { self.max_body_len } - /// Maximum number of buffered frames per stream. + /// Maximum number of buffered frames per stream /// /// An implementation detail. Due to the concurrent nature of the HTTP/2 specification, it is /// necessary to temporally store received frames into an intermediary structure. /// - /// Defaults to 16 frames. + /// Defaults to + #[doc = concat!(max_buffered_frames_num!())] + /// bytes. pub fn max_buffered_frames_num(&self) -> u8 { self.max_buffered_frames_num } - /// Maximum cached headers length. + /// Maximum cached headers length /// /// Related to HPACK, indicates the maximum length of the structure that holds cached decoded /// headers received from a counterpart. @@ -71,41 +77,57 @@ impl Http2Params { /// - The second parameter indicates the maximum local HPACK ***encoder*** length. In other words, /// it doesn't allow external actors to dictate very large lengths. /// - /// Corresponds to `SETTINGS_HEADER_TABLE_SIZE`. Defaults to 8 KiB. + /// Corresponds to `SETTINGS_HEADER_TABLE_SIZE`. Defaults to + #[doc = concat!(max_cached_headers_len!())] + /// bytes. pub fn max_cached_headers_len(&self) -> (u32, u32) { self.max_cached_headers_len } - /// Maximum expanded headers length. + /// Maximum expanded headers length /// /// Or the maximum length of the final Request/Response header. Contents may or may not originate /// from the HPACK structure that holds cached decoded headers. /// - /// Corresponds to `SETTINGS_MAX_HEADER_LIST_SIZE`. Defaults to 4 KiB. + /// Corresponds to `SETTINGS_MAX_HEADER_LIST_SIZE`. Defaults to + #[doc = concat!(max_expanded_headers_len!())] + /// bytes. pub fn max_expanded_headers_len(&self) -> u32 { self.max_expanded_headers_len } - /// Maximum frame length. + /// Maximum frame length /// /// Avoids the reading of very large frames sent by external actors. /// - /// Corresponds to `SETTINGS_MAX_FRAME_SIZE`. Defaults to 1 MiB. Capped within 16 KiB ~ 16 MiB + /// Corresponds to `SETTINGS_MAX_FRAME_SIZE`. Capped within + #[doc = concat!(max_frame_len_lower_bound!())] + /// ~ + #[doc = concat!(max_frame_len_upper_bound!())] + /// bytes. Defaults to + #[doc = concat!(max_frame_len!())] + /// bytes. pub fn max_frame_len(&self) -> u32 { self.max_frame_len } - /// Maximum number of rapid resets. + /// Maximum number of rapid resets /// /// A rapid reset happens when a peer sends an initial header followed by a RST_STREAM frame. This /// parameter is used to avoid CVE-2023-44487. + /// + /// Defaults to + #[doc = concat!(max_rapid_resets_num!())] + /// rapid resets. pub fn max_rapid_resets_num(&self) -> u8 { self.max_rapid_resets_num } - /// Maximum number of active concurrent streams. + /// Maximum number of active concurrent streams /// - /// Corresponds to `SETTINGS_MAX_CONCURRENT_STREAMS`. Defaults to 1073741824 streams. + /// Corresponds to `SETTINGS_MAX_CONCURRENT_STREAMS`. Defaults to + #[doc = concat!(max_streams_num!())] + /// streams pub fn max_streams_num(&self) -> u32 { self.max_streams_num } @@ -114,14 +136,16 @@ impl Http2Params { /// /// Allocated space intended to read bytes sent by external actors. /// - /// Defaults to 4 MiB. + /// Defaults to + #[doc = concat!(read_buffer_len!())] + /// streams pub fn read_buffer_len(&self) -> u32 { self.read_buffer_len } /// Mutable version of [Self::initial_window_len]. - pub fn set_initial_window_len(&mut self, value: u32) -> &mut Self { - self.initial_window_len = value.clamp(0, U31::MAX.u32()); + pub fn set_initial_window_len(&mut self, value: U31) -> &mut Self { + self.initial_window_len = value; self } @@ -151,7 +175,7 @@ impl Http2Params { /// Mutable version of [Self::max_frame_len]. pub fn set_max_frame_len(&mut self, value: u32) -> &mut Self { - self.max_frame_len = value.clamp(FRAME_LEN_LOWER_BOUND, FRAME_LEN_UPPER_BOUND); + self.max_frame_len = value.clamp(MAX_FRAME_LEN_LOWER_BOUND, MAX_FRAME_LEN_UPPER_BOUND); self } @@ -189,15 +213,15 @@ impl Default for Http2Params { fn default() -> Self { Self { enable_connect_protocol: false, - initial_window_len: INITIAL_WINDOW_LEN_DEFAULT, - max_body_len: BODY_LEN_DEFAULT, - max_buffered_frames_num: BUFFERED_FRAMES_NUM_DEFAULT, - max_cached_headers_len: (CACHED_HEADERS_LEN_DEFAULT, CACHED_HEADERS_LEN_DEFAULT), - max_expanded_headers_len: EXPANDED_HEADERS_LEN_DEFAULT, - max_frame_len: FRAME_LEN_DEFAULT, - max_rapid_resets_num: RAPID_RESETS_NUM_DEFAULT, - max_streams_num: STREAMS_NUM_DEFAULT, - read_buffer_len: READ_BUFFER_LEN_DEFAULT, + initial_window_len: U31::from_u32(initial_window_len!()), + max_body_len: MAX_BODY_LEN, + max_buffered_frames_num: MAX_BUFFERED_FRAMES_NUM, + max_cached_headers_len: (MAX_CACHED_HEADERS_LEN, MAX_CACHED_HEADERS_LEN), + max_expanded_headers_len: MAX_EXPANDED_HEADERS_LEN, + max_frame_len: MAX_FRAME_LEN, + max_rapid_resets_num: MAX_RAPID_RESETS_NUM, + max_streams_num: MAX_STREAMS_NUM, + read_buffer_len: READ_BUFFER_LEN, } } } diff --git a/wtx/src/http2/send_params.rs b/wtx/src/http2/http2_params_send.rs similarity index 68% rename from wtx/src/http2/send_params.rs rename to wtx/src/http2/http2_params_send.rs index 5900a654..5e543fd4 100644 --- a/wtx/src/http2/send_params.rs +++ b/wtx/src/http2/http2_params_send.rs @@ -1,29 +1,32 @@ use crate::http2::{ - HpackEncoder, SettingsFrame, FRAME_LEN_LOWER_BOUND, FRAME_LEN_UPPER_BOUND, U31, + HpackEncoder, SettingsFrame, Windows, MAX_CACHED_HEADERS_LEN, MAX_FRAME_LEN, + MAX_FRAME_LEN_LOWER_BOUND, MAX_FRAME_LEN_UPPER_BOUND, U31, }; /// Parameters used when sending data. #[derive(Debug)] -pub(crate) struct SendParams { +pub(crate) struct Http2ParamsSend { pub(crate) enable_connect_protocol: u32, - pub(crate) initial_window_len: u32, + pub(crate) initial_window_len: U31, pub(crate) max_cached_headers_len: u32, pub(crate) max_expanded_headers_len: u32, pub(crate) max_frame_len: u32, pub(crate) max_streams_num: u32, } -impl SendParams { +impl Http2ParamsSend { pub(crate) fn update( &mut self, hpack_enc: &mut HpackEncoder, sf: &SettingsFrame, + windows: &mut Windows, ) -> crate::Result<()> { if let Some(elem) = sf.enable_connect_protocol() { self.enable_connect_protocol = u32::from(elem); } if let Some(elem) = sf.initial_window_size() { - self.initial_window_len = elem.clamp(0, U31::MAX.u32()); + windows.send.set(elem.i32())?; + self.initial_window_len = elem; } if let Some(elem) = sf.header_table_size() { self.max_cached_headers_len = elem; @@ -33,7 +36,7 @@ impl SendParams { self.max_expanded_headers_len = elem; } if let Some(elem) = sf.max_frame_size() { - self.max_frame_len = elem.clamp(FRAME_LEN_LOWER_BOUND, FRAME_LEN_UPPER_BOUND); + self.max_frame_len = elem.clamp(MAX_FRAME_LEN_LOWER_BOUND, MAX_FRAME_LEN_UPPER_BOUND); } if let Some(elem) = sf.max_concurrent_streams() { self.max_streams_num = elem; @@ -45,14 +48,14 @@ impl SendParams { /// It is not possible to use the same default values of `Http2Params` because, for sending /// purposes, the default values provided by the RFC must be used until a settings frame /// is received. -impl Default for SendParams { +impl Default for Http2ParamsSend { fn default() -> Self { Self { enable_connect_protocol: 0, - initial_window_len: 65_535, - max_cached_headers_len: 4_096, + initial_window_len: U31::from_u32(initial_window_len!()), + max_cached_headers_len: MAX_CACHED_HEADERS_LEN, max_expanded_headers_len: u32::MAX, - max_frame_len: 16_384, + max_frame_len: MAX_FRAME_LEN, max_streams_num: u32::MAX, } } diff --git a/wtx/src/http2/read_frame_rslt.rs b/wtx/src/http2/http2_rslt.rs similarity index 56% rename from wtx/src/http2/read_frame_rslt.rs rename to wtx/src/http2/http2_rslt.rs index 71c53528..b61b4bf8 100644 --- a/wtx/src/http2/read_frame_rslt.rs +++ b/wtx/src/http2/http2_rslt.rs @@ -1,17 +1,15 @@ -/// Read Frame Result +/// HTTP/2 Result #[derive(Debug)] -pub enum ReadFrameRslt { +pub enum Http2Rslt { /// When a GOAWAY frame is sent or received. ClosedConnection, /// When a RST_STREAM frame is sent or received. ClosedStream, - /// Remote part didn't send any frames - IdleConnection, /// Resource was successfully fetched. Resource(T), } -impl ReadFrameRslt { +impl Http2Rslt { /// Extracts a successful resource, if any. #[inline] pub fn resource(self) -> Option { @@ -22,3 +20,14 @@ impl ReadFrameRslt { } } } + +/// HTTP/2 Result - Extended +#[derive(Debug)] +pub(crate) enum Http2RsltExt { + ClosedConnection, + ClosedStream, + Resource(T), + // Read: Stream is not returning new data + // Write: Frame is awaiting an WINDOW_UPDATE frame + Idle, +} diff --git a/wtx/src/http2/macros.rs b/wtx/src/http2/macros.rs index 56268e15..04765ad0 100644 --- a/wtx/src/http2/macros.rs +++ b/wtx/src/http2/macros.rs @@ -1,49 +1,46 @@ -macro_rules! rfr_resource_or_return { +macro_rules! hre_resource_or_return { ($rfr:expr) => { match $rfr { - ReadFrameRslt::ClosedConnection => return Ok(ReadFrameRslt::ClosedConnection), - ReadFrameRslt::ClosedStream => return Ok(ReadFrameRslt::ClosedStream), - ReadFrameRslt::IdleConnection => return Ok(ReadFrameRslt::IdleConnection), - ReadFrameRslt::Resource(elem) => elem, + Http2RsltExt::ClosedConnection => return Ok(Http2RsltExt::ClosedConnection), + Http2RsltExt::ClosedStream => return Ok(Http2RsltExt::ClosedStream), + Http2RsltExt::Idle => return Ok(Http2RsltExt::Idle), + Http2RsltExt::Resource(elem) => elem, } }; } -macro_rules! rfr_until_resource { - ($rfr:expr) => {{ - let rfr_resource = 'rfr_resource: { - for _ in 0.._max_frames_mismatches!() { - match $rfr { - ReadFrameRslt::ClosedConnection => return Ok(ReadFrameRslt::ClosedConnection), - ReadFrameRslt::ClosedStream => return Ok(ReadFrameRslt::ClosedStream), - ReadFrameRslt::IdleConnection => continue, - ReadFrameRslt::Resource(elem) => break 'rfr_resource elem, - } +macro_rules! hre_to_hr { + ($lock:expr, |$guard:ident| $cb:expr $(, |$another_guard:ident, $rslt:ident| $rest:expr)?) => {{ + let rslt = loop { + let mut $guard = $lock.lock().await; + match $cb { + Http2RsltExt::ClosedConnection => return Ok(Http2Rslt::ClosedConnection), + Http2RsltExt::ClosedStream => return Ok(Http2Rslt::ClosedStream), + Http2RsltExt::Resource(elem) => { + $( + let mut $another_guard = $guard; + let $rslt = elem; + $rest; + let elem = $rslt; + )? + break elem; + }, + Http2RsltExt::Idle => continue, } - return Err(crate::Error::VeryLargeAmountOfFrameMismatches); }; - rfr_resource + rslt }}; } -macro_rules! rfr_until_resource_with_guard { - ($lock:expr, |$guard:ident| $cb:expr $(, |$another_guard:ident, $rslt:ident| $rest:expr)?) => {{ +macro_rules! hre_until_resource { + ($rfr:expr) => {{ let rfr_resource = 'rfr_resource: { for _ in 0.._max_frames_mismatches!() { - let mut $guard = $lock.lock().await; - match $cb { - ReadFrameRslt::ClosedConnection => return Ok(ReadFrameRslt::ClosedConnection), - ReadFrameRslt::ClosedStream => return Ok(ReadFrameRslt::ClosedStream), - ReadFrameRslt::IdleConnection => continue, - ReadFrameRslt::Resource(elem) => { - $( - let mut $another_guard = $guard; - let $rslt = elem; - $rest; - let elem = $rslt; - )? - break 'rfr_resource elem; - } + match $rfr { + Http2RsltExt::ClosedConnection => return Ok(Http2RsltExt::ClosedConnection), + Http2RsltExt::ClosedStream => return Ok(Http2RsltExt::ClosedStream), + Http2RsltExt::Idle => continue, + Http2RsltExt::Resource(elem) => break 'rfr_resource elem, } } return Err(crate::Error::VeryLargeAmountOfFrameMismatches); @@ -51,3 +48,59 @@ macro_rules! rfr_until_resource_with_guard { rfr_resource }}; } + +macro_rules! initial_window_len { + () => { + 65_535 + }; +} +macro_rules! max_body_len { + () => { + 131_070 + }; +} +macro_rules! max_buffered_frames_num { + () => { + 16 + }; +} +macro_rules! max_cached_headers_len { + () => { + 4_096 + }; +} +macro_rules! max_expanded_headers_len { + () => { + 4_096 + }; +} +macro_rules! max_frame_len { + () => { + 16_384 + }; +} +macro_rules! max_frame_len_lower_bound { + () => { + 16_384 + }; +} +macro_rules! max_frame_len_upper_bound { + () => { + 16_777_215 + }; +} +macro_rules! max_rapid_resets_num { + () => { + 16 + }; +} +macro_rules! max_streams_num { + () => { + 16 + }; +} +macro_rules! read_buffer_len { + () => { + 131_070 + }; +} diff --git a/wtx/src/http2/misc.rs b/wtx/src/http2/misc.rs index 2cd9c861..7acd6178 100644 --- a/wtx/src/http2/misc.rs +++ b/wtx/src/http2/misc.rs @@ -1,11 +1,14 @@ use crate::{ + http::Headers, http2::{ - send_params::SendParams, ErrorCode, FrameHeaderTy, FrameInit, GoAwayFrame, HpackEncoder, - Http2Buffer, Http2Params, PingFrame, ReadFrameRslt, ResetStreamFrame, SettingsFrame, + http2_params_send::Http2ParamsSend, window::WindowsPair, ErrorCode, FrameHeaderTy, FrameInit, + GoAwayFrame, HeadersFrame, HpackEncoder, HpackStaticRequestHeaders, HpackStaticResponseHeaders, + Http2Buffer, Http2Params, Http2RsltExt, PingFrame, ResetStreamFrame, SettingsFrame, StreamState, WindowUpdateFrame, PAD_MASK, U31, }, misc::{ - BlocksQueue, PartitionedFilledBuffer, PollOnce, Stream, Usize, _read_until, _unlikely_elem, + BlocksQueue, ByteVector, PartitionedFilledBuffer, PollOnce, Stream, Usize, _read_until, + _unlikely_elem, }, }; use core::pin::pin; @@ -30,17 +33,17 @@ pub(crate) async fn read_frame( is_conn_open: bool, pfb: &mut PartitionedFilledBuffer, stream: &mut S, -) -> crate::Result> +) -> crate::Result> where S: Stream, { if !is_conn_open { - return Ok(ReadFrameRslt::ClosedConnection); + return Ok(Http2RsltExt::ClosedConnection); } let mut read = pfb._following_len(); let buffer = pfb._following_trail_mut(); let Some(array) = PollOnce(pin!(_read_until::<9, _>(buffer, &mut read, 0, stream))).await else { - return Ok(ReadFrameRslt::IdleConnection); + return Ok(Http2RsltExt::Idle); }; let fi = FrameInit::from_array(array?)?; _trace!("Received frame: {fi:?}"); @@ -67,7 +70,7 @@ where *Usize::from(fi.data_len), read.wrapping_sub(*Usize::from(frame_len)), )?; - Ok(ReadFrameRslt::Resource(fi)) + Ok(Http2RsltExt::Resource(fi)) } /// Reads a non-initial frame that corresponds to the desired `stream_id` which is locally stored @@ -76,15 +79,16 @@ where pub(crate) async fn read_frame_others<'rslt, S>( hp: &mut Http2Params, hpack_enc: &mut HpackEncoder, + hps: &mut Http2ParamsSend, is_conn_open: &mut bool, pfb: &'rslt mut PartitionedFilledBuffer, - send_params: &mut SendParams, stream: &mut S, stream_id: U31, stream_state: &mut StreamState, streams_frames: &'rslt mut HashMap>, streams_num: &mut u32, -) -> crate::Result> + wp: &mut WindowsPair<'_>, +) -> crate::Result> where S: Stream, { @@ -94,19 +98,20 @@ where clippy::unwrap_used )] let (fi, data) = streams_frames.get_mut(&stream_id).unwrap().pop_back().unwrap(); - return Ok(ReadFrameRslt::Resource((fi, data))); + return Ok(Http2RsltExt::Resource((fi, data))); } - let fi = rfr_resource_or_return!( + let fi = hre_resource_or_return!( read_frame_until( hp, hpack_enc, + hps, is_conn_open, pfb, - send_params, stream, stream_id, stream_state, streams_num, + wp, |fi, local_hp, data| { read_frame_until_cb_known_id(data, fi, local_hp, stream_id, streams_frames) }, @@ -115,7 +120,7 @@ where .await? ); let rslt = (fi, pfb._current()); - Ok(ReadFrameRslt::Resource(rslt)) + Ok(Http2RsltExt::Resource(rslt)) } /// Fetches a frame until `cb` yields a positive boolean. @@ -123,47 +128,49 @@ where pub(crate) async fn read_frame_until( hp: &mut Http2Params, hpack_enc: &mut HpackEncoder, + hps: &mut Http2ParamsSend, is_conn_open: &mut bool, pfb: &mut PartitionedFilledBuffer, - send_params: &mut SendParams, stream: &mut S, stream_id: U31, stream_state: &mut StreamState, streams_num: &mut u32, + wp: &mut WindowsPair<'_>, mut loop_cb: impl FnMut(FrameInit, &Http2Params, &[u8]) -> crate::Result, mut reset_cb: impl FnMut(&Http2Params) -> crate::Result<()>, -) -> crate::Result> +) -> crate::Result> where S: Stream, { for _ in 0.._max_frames_mismatches!() { - let fi = rfr_resource_or_return!(read_frame(hp, *is_conn_open, pfb, stream).await?); + let fi = hre_resource_or_return!(read_frame(hp, *is_conn_open, pfb, stream).await?); if fi.stream_id == U31::ZERO { match fi.ty { FrameHeaderTy::GoAway => { let _ = GoAwayFrame::read(pfb._current(), fi)?; let go_away_frame = GoAwayFrame::new(ErrorCode::Cancel, stream_id); send_go_away(go_away_frame, (is_conn_open, stream)).await?; - return _unlikely_elem(Ok(ReadFrameRslt::ClosedConnection)); + return _unlikely_elem(Ok(Http2RsltExt::ClosedConnection)); } FrameHeaderTy::Ping => { let mut pf = PingFrame::read(pfb._current(), fi)?; if !pf.is_ack() { pf.set_ack(); - write_bytes([&pf.bytes()], *is_conn_open, stream).await?; + write_array([&pf.bytes()], *is_conn_open, stream).await?; } continue; } FrameHeaderTy::Settings => { let sf = SettingsFrame::read(pfb._current(), fi)?; if !sf.is_ack() { - send_params.update(hpack_enc, &sf)?; - write_bytes([SettingsFrame::ack().bytes(&mut [0; 45])], *is_conn_open, stream).await?; + hps.update(hpack_enc, &sf, wp.conn)?; + write_array([SettingsFrame::ack().bytes(&mut [0; 45])], *is_conn_open, stream).await?; } continue; } FrameHeaderTy::WindowUpdate => { - let _wuf = WindowUpdateFrame::read(pfb._current(), fi)?; + let wuf = WindowUpdateFrame::read(pfb._current(), fi)?; + wp.conn.send.deposit(wuf.size_increment().i32()); continue; } _ => return Err(ErrorCode::ProtocolError.into()), @@ -175,7 +182,7 @@ where return Ok(reset_stream(stream_state, streams_num)); } if loop_cb(fi, hp, pfb._current())? { - return Ok(ReadFrameRslt::Resource(fi)); + return Ok(Http2RsltExt::Resource(fi)); } pfb._clear_if_following_is_empty(); } @@ -224,10 +231,10 @@ pub(crate) fn read_frame_until_cb_unknown_id( pub(crate) fn reset_stream( stream_state: &mut StreamState, streams_num: &mut u32, -) -> ReadFrameRslt { +) -> Http2RsltExt { *stream_state = StreamState::Closed; *streams_num = streams_num.wrapping_sub(1); - return ReadFrameRslt::ClosedStream; + return Http2RsltExt::ClosedStream; } #[inline] @@ -238,7 +245,7 @@ pub(crate) async fn send_go_away( where S: Stream, { - write_bytes([go_away_frame.bytes().as_slice()], *is_conn_open, stream).await?; + write_array([go_away_frame.bytes().as_slice()], *is_conn_open, stream).await?; *is_conn_open = false; Ok(()) } @@ -253,7 +260,7 @@ where S: Stream, { *stream_state = StreamState::Closed; - write_bytes([reset_frame.bytes().as_slice()], *is_conn_open, stream).await?; + write_array([reset_frame.bytes().as_slice()], *is_conn_open, stream).await?; Ok(()) } @@ -275,8 +282,30 @@ pub(crate) fn trim_frame_pad(data: &mut &[u8], flags: u8) -> crate::Result( - frames: [&[u8]; N], +pub(crate) fn verify_before_send<'hsreqh, const IS_CLIENT: bool>( + headers: &Headers, + hpack_enc: &mut HpackEncoder, + hpack_enc_buffer: &mut ByteVector, + hps: &Http2ParamsSend, + (hsreqh, hsresh): (HpackStaticRequestHeaders<'hsreqh>, HpackStaticResponseHeaders), + stream_id: U31, +) -> crate::Result> { + hpack_enc_buffer.clear(); + if headers.bytes_len() > *Usize::from(hps.max_expanded_headers_len) { + return Err(crate::Error::VeryLargeHeadersLen); + } + hpack_enc_buffer.clear(); + if IS_CLIENT { + hpack_enc.encode(hpack_enc_buffer, hsreqh.iter(), headers.iter())?; + } else { + hpack_enc.encode(hpack_enc_buffer, hsresh.iter(), headers.iter())?; + } + Ok(HeadersFrame::new((hsreqh, hsresh), stream_id)) +} + +#[inline] +pub(crate) async fn write_array( + array: [&[u8]; N], is_conn_open: bool, stream: &mut S, ) -> crate::Result<()> @@ -289,7 +318,7 @@ where _trace!("Sending frame(s): {:?}", { let mut is_prev_init = false; let mut rslt = [None; N]; - for (elem, frame) in rslt.iter_mut().zip(frames.iter()) { + for (elem, frame) in rslt.iter_mut().zip(array.iter()) { if let ([a, b, c, d, e, f, g, h, i], false) = (frame, is_prev_init) { if let Ok(frame_init) = FrameInit::from_array([*a, *b, *c, *d, *e, *f, *g, *h, *i]) { is_prev_init = true; @@ -301,6 +330,6 @@ where } rslt }); - stream.write_all_vectored(frames).await?; + stream.write_all_vectored(array).await?; Ok(()) } diff --git a/wtx/src/http2/req_res_buffer.rs b/wtx/src/http2/req_res_buffer.rs index 80eaca2c..e6b93bfa 100644 --- a/wtx/src/http2/req_res_buffer.rs +++ b/wtx/src/http2/req_res_buffer.rs @@ -1,7 +1,7 @@ use crate::{ - http::{Headers, Method, RequestRef, Version}, - http2::{uri_buffer::MAX_URI_LEN, CACHED_HEADERS_LEN_LOWER_BOUND}, - misc::{ArrayString, ByteVector, UriRef, Usize}, + http::{Headers, Method, Request, Version}, + http2::uri_buffer::MAX_URI_LEN, + misc::{ArrayString, ByteVector, UriRef}, }; use alloc::boxed::Box; @@ -11,32 +11,47 @@ use alloc::boxed::Box; #[derive(Debug)] pub struct ReqResBuffer { /// See [ByteVector]. - pub data: ByteVector, + pub body: ByteVector, /// See [Headers]. pub headers: Headers, /// Scheme, authority and path. pub uri: Box>, } +// For servers, the default headers length must be used until a settings frame is received. impl Default for ReqResBuffer { fn default() -> Self { - let n = *Usize::from(CACHED_HEADERS_LEN_LOWER_BOUND); - Self { data: ByteVector::new(), headers: Headers::new(n), uri: Box::new(ArrayString::new()) } + Self { body: ByteVector::new(), headers: Headers::new(0), uri: Box::new(ArrayString::new()) } } } impl ReqResBuffer { + /// Shortcut that avoids having to call `with_capacity` on each field. + /// + /// Should be used if you are willing to manually push data. + pub fn with_capacity( + body: usize, + headers_bytes: usize, + headers_headers: usize, + headers_max_bytes: usize, + ) -> Self { + Self { + body: ByteVector::with_capacity(body), + headers: Headers::with_capacity(headers_bytes, headers_headers, headers_max_bytes), + uri: Box::new(ArrayString::new()), + } + } + #[inline] pub(crate) fn clear(&mut self) { - self.data.clear(); + self.body.clear(); self.headers.clear(); } - /// Shortcut to create a [RequestRef] with inner data. - pub fn as_http2_request_ref(&self, method: Method) -> RequestRef<'_, '_, '_, [u8]> { - RequestRef { - data: &self.data, - headers: &self.headers, + /// Shortcut to create a [RequestRef] with inner body. + pub fn as_http2_request(&self, method: Method) -> Request<(&ByteVector, &Headers), &str> { + Request { + data: (&self.body, &self.headers), method, uri: UriRef::new(self.uri.as_str()), version: Version::Http2, diff --git a/wtx/src/http2/server_stream.rs b/wtx/src/http2/server_stream.rs index f0302241..b75e84a9 100644 --- a/wtx/src/http2/server_stream.rs +++ b/wtx/src/http2/server_stream.rs @@ -1,11 +1,12 @@ use crate::{ - http::{Method, RequestMut, Response, ResponseData}, + http::{Headers, Method, ReqResData, Request, Response}, http2::{ http2_data::ReadFramesInit, - misc::{send_go_away, send_reset}, + misc::{send_go_away, send_reset, verify_before_send}, + window::WindowsPair, write_stream::write_stream, ErrorCode, GoAwayFrame, HpackStaticRequestHeaders, HpackStaticResponseHeaders, Http2Buffer, - Http2Data, ReadFrameRslt, ReqResBuffer, ResetStreamFrame, StreamState, U31, + Http2Data, Http2Rslt, Http2RsltExt, ReqResBuffer, ResetStreamFrame, StreamState, Windows, U31, }, misc::{AsyncBounds, ByteVector, Lease, LeaseMut, Lock, RefCounter, Stream, Uri, _Span}, }; @@ -24,15 +25,17 @@ pub struct ServerStream { span: _Span, stream_id: U31, stream_state: StreamState, + windows: Windows, } impl ServerStream { #[inline] - pub(crate) fn new( + pub(crate) const fn new( hd: HD, span: _Span, rfi: ReadFramesInit, stream_state: StreamState, + windows: Windows, ) -> Self { Self { hd, @@ -43,6 +46,7 @@ impl ServerStream { span, stream_id: rfi.stream_id, stream_state, + windows, } } } @@ -64,10 +68,10 @@ where pub async fn recv_req<'rrb>( &mut self, rrb: &'rrb mut ReqResBuffer, - ) -> crate::Result>> { + ) -> crate::Result>> { let _e = self.span._enter(); _trace!("Receiving request"); - rfr_until_resource_with_guard!(self.hd, |guard| { + hre_to_hr!(self.hd, |guard| { guard .read_frames_others( &mut self.hpack_size, @@ -75,13 +79,13 @@ where rrb, self.stream_id, &mut self.stream_state, + &mut self.windows, ) .await? }); self.stream_state = StreamState::HalfClosedRemote; - Ok(ReadFrameRslt::Resource(RequestMut::http2( - &mut rrb.data, - &mut rrb.headers, + Ok(Http2Rslt::Resource(Request::http2( + (&mut rrb.body, &mut rrb.headers), self.method, Uri::new(&*rrb.uri), ))) @@ -99,33 +103,46 @@ where /// Auxiliary high-level method that sends a response. #[inline] - pub async fn send_res(&mut self, res: Response) -> crate::Result<()> + pub async fn send_res(&mut self, res: Response) -> crate::Result> where - D: ResponseData, + D: ReqResData, D::Body: Lease<[u8]>, { let _e = self.span._enter(); _trace!("Sending response"); - let mut guard = self.hd.lock().await; - let (hb, is_conn_open, send_params, stream) = guard.parts_mut(); - write_stream::<_, false>( - res.data.body().lease(), - res.data.headers(), - &mut hb.hpack_enc, - &mut hb.hpack_enc_buffer, - ( - HpackStaticRequestHeaders::EMPTY, - HpackStaticResponseHeaders { status_code: Some(res.status_code) }, - ), - *is_conn_open, - send_params, - stream, - self.stream_id, - &mut self.stream_state, - ) - .await?; + let mut body = res.data.body().lease(); + let mut hf = { + let mut guard = self.hd.lock().await; + let (hb, _, _, hps, ..) = guard.parts_mut(); + verify_before_send::( + res.data.headers(), + &mut hb.hpack_enc, + &mut hb.hpack_enc_buffer, + hps, + ( + HpackStaticRequestHeaders::EMPTY, + HpackStaticResponseHeaders { status_code: Some(res.status_code) }, + ), + self.stream_id, + )? + }; + hre_to_hr!(self.hd, |guard| { + let (hb, is_conn_open, _, hps, stream, streams_num, windows) = guard.parts_mut(); + write_stream::<_, false>( + &mut body, + &mut hf, + &mut hb.hpack_enc_buffer, + hps, + *is_conn_open, + stream, + &mut self.stream_state, + streams_num, + &mut WindowsPair::new(windows, &mut self.windows), + ) + .await? + }); self.stream_state = StreamState::Closed; - Ok(()) + Ok(Http2Rslt::Resource(())) } /// Sends a stream reset to the peer, which cancels this stream. diff --git a/wtx/src/http2/settings_frame.rs b/wtx/src/http2/settings_frame.rs index 32e04f87..76e62a83 100644 --- a/wtx/src/http2/settings_frame.rs +++ b/wtx/src/http2/settings_frame.rs @@ -1,23 +1,19 @@ use crate::{ - http2::{FrameHeaderTy, FrameInit, ACK_MASK, FRAME_LEN_LOWER_BOUND, FRAME_LEN_UPPER_BOUND, U31}, + http2::{ + FrameHeaderTy, FrameInit, ACK_MASK, MAX_FRAME_LEN_LOWER_BOUND, MAX_FRAME_LEN_UPPER_BOUND, U31, + }, misc::{ArrayChunks, _unlikely_elem}, }; #[derive(Debug, Eq, PartialEq)] pub(crate) struct SettingsFrame { - // SETTINGS_ENABLE_CONNECT_PROTOCOL enable_connect_protocol: Option, flags: u8, - // SETTINGS_HEADER_TABLE_SIZE header_table_size: Option, - // SETTINGS_INITIAL_WINDOW_SIZE - initial_window_size: Option, + initial_window_size: Option, len: u8, - // SETTINGS_MAX_CONCURRENT_STREAMS max_concurrent_streams: Option, - // SETTINGS_MAX_FRAME_SIZE max_frame_size: Option, - // SETTINGS_MAX_HEADER_LIST_SIZE max_header_list_size: Option, } @@ -91,7 +87,7 @@ impl SettingsFrame { let mut idx: usize = 9; copy_bytes!(buffer, header_table_size.map(|el| bytes(1, el)), idx); copy_bytes!(buffer, max_concurrent_streams.map(|el| bytes(3, el)), idx); - copy_bytes!(buffer, initial_window_size.map(|el| bytes(4, el)), idx); + copy_bytes!(buffer, initial_window_size.map(|el| bytes(4, el.u32())), idx); copy_bytes!(buffer, max_frame_size.map(|el| bytes(5, el)), idx); copy_bytes!(buffer, max_header_list_size.map(|el| bytes(6, el)), idx); copy_bytes!(buffer, enable_connect_protocol.map(|el| bytes(8, u32::from(el))), idx); @@ -106,7 +102,7 @@ impl SettingsFrame { self.header_table_size } - pub(crate) fn initial_window_size(&self) -> Option { + pub(crate) fn initial_window_size(&self) -> Option { self.initial_window_size } @@ -177,7 +173,7 @@ impl SettingsFrame { *max_concurrent_streams = Some(elem); } Setting::MaxFrameSize(elem) => { - if (FRAME_LEN_LOWER_BOUND..=FRAME_LEN_UPPER_BOUND).contains(&elem) { + if (MAX_FRAME_LEN_LOWER_BOUND..=MAX_FRAME_LEN_UPPER_BOUND).contains(&elem) { *max_frame_size = Some(elem); } else { return Err(crate::http2::ErrorCode::ProtocolError.into()); @@ -192,16 +188,19 @@ impl SettingsFrame { if enable_connect_protocol.is_some() { *len = len.wrapping_add(6); } - for _ in [ - header_table_size, - initial_window_size, - max_concurrent_streams, - max_frame_size, - max_header_list_size, - ] - .into_iter() - .flatten() - { + if header_table_size.is_some() { + *len = len.wrapping_add(6); + } + if initial_window_size.is_some() { + *len = len.wrapping_add(6); + } + if max_concurrent_streams.is_some() { + *len = len.wrapping_add(6); + } + if max_frame_size.is_some() { + *len = len.wrapping_add(6); + } + if max_header_list_size.is_some() { *len = len.wrapping_add(6); } @@ -218,9 +217,9 @@ impl SettingsFrame { self.header_table_size = elem; } - pub(crate) fn set_initial_window_size(&mut self, elem: Option) { + pub(crate) fn set_initial_window_size(&mut self, elem: Option) { Self::update_len(&mut self.len, self.initial_window_size, elem); - self.initial_window_size = elem.map(|val| val.clamp(0, U31::MAX.u32())); + self.initial_window_size = elem.map(|val| val); } pub(crate) fn set_max_concurrent_streams(&mut self, elem: Option) { @@ -230,7 +229,8 @@ impl SettingsFrame { pub(crate) fn set_max_frame_size(&mut self, elem: Option) { Self::update_len(&mut self.len, self.max_frame_size, elem); - self.max_frame_size = elem.map(|val| val.clamp(FRAME_LEN_LOWER_BOUND, FRAME_LEN_UPPER_BOUND)); + self.max_frame_size = + elem.map(|val| val.clamp(MAX_FRAME_LEN_LOWER_BOUND, MAX_FRAME_LEN_UPPER_BOUND)); } pub(crate) fn set_max_header_list_size(&mut self, elem: Option) { @@ -256,7 +256,7 @@ impl SettingsFrame { enum Setting { EnableConnectProtocol(bool), HeaderTableSize(u32), - InitialWindowSize(u32), + InitialWindowSize(U31), MaxConcurrentStreams(u32), MaxFrameSize(u32), MaxHeaderListSize(u32), @@ -272,7 +272,7 @@ impl Setting { Ok(match id { 1 => Self::HeaderTableSize(value), 3 => Self::MaxConcurrentStreams(value), - 4 => Self::InitialWindowSize(value), + 4 => Self::InitialWindowSize(U31::from_u32(value)), 5 => Self::MaxFrameSize(value), 6 => Self::MaxHeaderListSize(value), 8 => Self::EnableConnectProtocol(value != 0), diff --git a/wtx/src/http2/tests/hpack.rs b/wtx/src/http2/tests/hpack.rs index aa26465b..fe1b2f4d 100644 --- a/wtx/src/http2/tests/hpack.rs +++ b/wtx/src/http2/tests/hpack.rs @@ -1,6 +1,6 @@ use crate::{ http::StatusCode, - http2::{HpackDecoder, HpackEncoder, HpackHeaderBasic, CACHED_HEADERS_LEN_DEFAULT}, + http2::{HpackDecoder, HpackEncoder, HpackHeaderBasic, MAX_CACHED_HEADERS_LEN}, misc::{from_utf8_basic, ByteVector, Vector}, rng::StaticRng, }; @@ -188,8 +188,8 @@ fn test_story_encoding_and_decoding( decoder.set_max_bytes(size); encoder.set_max_dyn_sub_bytes(size).unwrap(); } else { - decoder.set_max_bytes(CACHED_HEADERS_LEN_DEFAULT); - encoder.set_max_dyn_sub_bytes(CACHED_HEADERS_LEN_DEFAULT).unwrap(); + decoder.set_max_bytes(MAX_CACHED_HEADERS_LEN); + encoder.set_max_dyn_sub_bytes(MAX_CACHED_HEADERS_LEN).unwrap(); } let mut pseudo_headers = case diff --git a/wtx/src/http2/u31.rs b/wtx/src/http2/u31.rs index 8a35e24b..de11dea3 100644 --- a/wtx/src/http2/u31.rs +++ b/wtx/src/http2/u31.rs @@ -12,7 +12,11 @@ impl U31 { pub(crate) const TWO: Self = Self(2); pub(crate) const MAX: Self = Self(2_147_483_647); - pub(crate) const fn new(value: u32) -> Self { + pub(crate) const fn from_i32(value: i32) -> Self { + Self(value.unsigned_abs()) + } + + pub(crate) const fn from_u32(value: u32) -> Self { Self(value & MASK) } @@ -28,6 +32,10 @@ impl U31 { self.0.to_be_bytes() } + pub(crate) const fn i32(self) -> i32 { + self.0 as i32 + } + pub(crate) const fn u32(self) -> u32 { self.0 } diff --git a/wtx/src/http2/window.rs b/wtx/src/http2/window.rs new file mode 100644 index 00000000..506987de --- /dev/null +++ b/wtx/src/http2/window.rs @@ -0,0 +1,162 @@ +use crate::{ + http2::{ + http2_params_send::Http2ParamsSend, misc::write_array, Http2Params, WindowUpdateFrame, U31, + }, + misc::Stream, +}; + +/// A "credit" system used to restrain the exchange of data. +#[derive(Debug)] +pub(crate) struct Window { + applied: i32, + total: i32, +} + +impl Window { + #[inline] + pub(crate) fn new(total: i32) -> Self { + Self { applied: total, total } + } + + #[inline] + pub(crate) fn deposit(&mut self, value: i32) { + self.applied = self.applied.wrapping_add(value); + } + + #[inline] + pub(crate) fn diff(&self) -> i32 { + self.total.wrapping_sub(self.applied) + } + + #[inline] + pub(crate) fn is_invalid(&self) -> bool { + self.applied <= 0 + } + + pub(crate) fn set(&mut self, value: i32) -> crate::Result<()> { + if value < self.total { + return Err(crate::Error::WindowSizeCanNotBeReduced); + }; + self.total = value; + Ok(()) + } + + #[inline] + fn withdrawn(&mut self, value: i32) { + self.applied = self.applied.wrapping_sub(value); + } +} + +#[derive(Debug)] +pub(crate) struct Windows { + /// Parameters used to received data. It is defined locally. + pub(crate) recv: Window, + /// Parameters used to send data. It is initially defined locally with default parameters + /// and then defined by a remote peer. + pub(crate) send: Window, +} + +impl Windows { + /// Used in initial connections. Sending parameters are only known when a settings frame is received. + #[inline] + pub(crate) fn conn(hp: &Http2Params) -> Self { + Self { + recv: Window::new(hp.initial_window_len().i32()), + send: Window::new(initial_window_len!()), + } + } + + /// Used in initial streams. + #[inline] + pub(crate) fn stream(hp: &Http2Params, hps: &Http2ParamsSend) -> Self { + Self { + recv: Window::new(hp.initial_window_len().i32()), + send: Window::new(hps.initial_window_len.i32()), + } + } +} + +#[derive(Debug)] +pub(crate) struct WindowsPair<'any> { + pub(crate) conn: &'any mut Windows, + pub(crate) stream: &'any mut Windows, +} + +impl<'any> WindowsPair<'any> { + pub(crate) fn new(conn: &'any mut Windows, stream: &'any mut Windows) -> Self { + Self { conn, stream } + } + + #[inline] + pub(crate) async fn manage_recv( + &mut self, + is_conn_open: bool, + stream: &mut S, + stream_id: U31, + value: U31, + ) -> crate::Result<()> + where + S: Stream, + { + self.conn.recv.withdrawn(value.i32()); + self.stream.recv.withdrawn(value.i32()); + match (self.conn.recv.is_invalid(), self.stream.recv.is_invalid()) { + (false, false) => {} + (false, true) => { + let conn_diff = self.conn.recv.diff(); + self.conn.recv.deposit(conn_diff); + write_array( + [&WindowUpdateFrame::new(U31::from_i32(conn_diff), U31::ZERO).bytes()], + is_conn_open, + stream, + ) + .await?; + } + (true, false) => { + let stream_diff = self.stream.recv.diff(); + self.stream.recv.deposit(stream_diff); + write_array( + [&WindowUpdateFrame::new(U31::from_i32(stream_diff), stream_id).bytes()], + is_conn_open, + stream, + ) + .await?; + } + (true, true) => { + let conn_diff = self.conn.recv.diff(); + let stream_diff = self.stream.recv.diff(); + self.conn.recv.deposit(conn_diff); + self.stream.recv.deposit(stream_diff); + write_array( + [ + &WindowUpdateFrame::new(U31::from_i32(conn_diff), U31::ZERO).bytes(), + &WindowUpdateFrame::new(U31::from_i32(stream_diff), stream_id).bytes(), + ], + is_conn_open, + stream, + ) + .await?; + } + } + Ok(()) + } + + #[inline] + pub(crate) fn manage_send(&mut self, value: U31) -> bool { + self.conn.send.withdrawn(value.i32()); + self.stream.send.withdrawn(value.i32()); + if !self.conn.send.is_invalid() && !self.stream.send.is_invalid() { + true + } else { + self.conn.send.deposit(value.i32()); + self.stream.send.deposit(value.i32()); + false + } + } + + // Sending data based on received parameters + #[inline] + pub(crate) fn is_invalid_send(&self) -> bool { + self.conn.send.is_invalid() || self.stream.send.is_invalid() + } +} diff --git a/wtx/src/http2/window_update_frame.rs b/wtx/src/http2/window_update_frame.rs index 3da26d21..298113e9 100644 --- a/wtx/src/http2/window_update_frame.rs +++ b/wtx/src/http2/window_update_frame.rs @@ -7,21 +7,33 @@ pub(crate) struct WindowUpdateFrame { } impl WindowUpdateFrame { + #[inline] + pub(crate) fn new(size_increment: U31, stream_id: U31) -> Self { + Self { size_increment, stream_id } + } + + #[inline] pub(crate) fn read(bytes: &[u8], fi: FrameInit) -> crate::Result { let [a, b, c, d] = bytes else { return Err(crate::http2::ErrorCode::FrameSizeError.into()); }; - let size_increment = U31::new(u32::from_be_bytes([*a, *b, *c, *d])); + let size_increment = U31::from_u32(u32::from_be_bytes([*a, *b, *c, *d])); if size_increment.is_zero() { return Err(crate::http2::ErrorCode::ProtocolError.into()); } Ok(Self { size_increment, stream_id: fi.stream_id }) } - pub(crate) fn _bytes(&self) -> [u8; 13] { + #[inline] + pub(crate) fn bytes(&self) -> [u8; 13] { let [a, b, c, d, e, f, g, h, i] = FrameInit::new(4, 0, self.stream_id, FrameHeaderTy::WindowUpdate).bytes(); let [j, k, l, m] = self.size_increment.to_be_bytes(); [a, b, c, d, e, f, g, h, i, j, k, l, m] } + + #[inline] + pub(crate) fn size_increment(&self) -> U31 { + self.size_increment + } } diff --git a/wtx/src/http2/write_stream.rs b/wtx/src/http2/write_stream.rs index 6fb1ec3c..ec3918b6 100644 --- a/wtx/src/http2/write_stream.rs +++ b/wtx/src/http2/write_stream.rs @@ -1,23 +1,36 @@ +macro_rules! manage_send { + ($before_body:expr, $body:expr, $buffer:expr, $wp:expr) => { + if !$wp.manage_send(U31::from_u32(u32::from_be_bytes([0, $buffer[1], $buffer[2], $buffer[3]]))) + { + *$body = $before_body; + return Ok(Http2RsltExt::Idle); + } + }; +} + macro_rules! write_data_frames { ( - ($body:expr, $is_conn_open:expr, $send_params:expr, $stream:expr, $stream_id:expr), + ($body:expr, $hps:expr, $is_conn_open:expr, $stream:expr, $stream_id:expr, $wp:expr), || $write_none_rslt:block, |$write_one:ident| $write_one_rslt:block, |$write_two:ident| $write_two_rslt:block ) => {{ - let mut iter = $body.chunks(*Usize::from($send_params.max_frame_len)); + let mut iter = $body.chunks(*Usize::from($hps.max_frame_len)); loop { let should_stop = write_generic_frames!( - iter, - |bytes: &[u8]| DataFrame::new(u32::try_from(bytes.len()).unwrap_or_default(), $stream_id), + ($body, iter, $wp), + |bytes: &[u8]| DataFrame::new( + U31::from_u32(u32::try_from(bytes.len()).unwrap_or_default()), + $stream_id, + ), |frame: &mut DataFrame| frame.set_eos(), || $write_none_rslt, - |array| { - let $write_one = array; + |tuple| { + let $write_one = tuple; $write_one_rslt; }, - |array| { - let $write_two = array; + |tuple| { + let $write_two = tuple; $write_two_rslt; } ); @@ -30,28 +43,33 @@ macro_rules! write_data_frames { macro_rules! write_generic_frames { ( - $iter:expr, + ($body:expr, $iter:expr, $wp:expr), $frame_cb:expr, $end_cb:expr, || $write_none_rslt:block, |$write_one:ident| $write_one_rslt:block, |$write_two:ident| $write_two_rslt:block - ) => { + ) => {{ + let before_first = *$body; if let Some(first) = $iter.next() { let mut first_frame = $frame_cb(first); let mut first_init_buffer = [0; 9]; + let before_second = *$body; if let Some(second) = $iter.next() { adjust_frame_init(first, first_frame.bytes(), &mut first_init_buffer); + manage_send!(before_first, $body, first_init_buffer, $wp); let mut second_frame = $frame_cb(second); let mut second_init_buffer = [0; 9]; if $iter.len() == 0 { $end_cb(&mut second_frame); adjust_frame_init(second, second_frame.bytes(), &mut second_init_buffer); + manage_send!(before_second, $body, second_init_buffer, $wp); let $write_two = (first_init_buffer, first, second_init_buffer, second); $write_two_rslt; true } else { adjust_frame_init(second, second_frame.bytes(), &mut second_init_buffer); + manage_send!(before_second, $body, second_init_buffer, $wp); let $write_two = (first_init_buffer, first, second_init_buffer, second); $write_two_rslt; false @@ -59,6 +77,7 @@ macro_rules! write_generic_frames { } else { $end_cb(&mut first_frame); adjust_frame_init(first, first_frame.bytes(), &mut first_init_buffer); + manage_send!(before_first, $body, first_init_buffer, $wp); let $write_one = (first_init_buffer, first); $write_one_rslt; true @@ -67,121 +86,123 @@ macro_rules! write_generic_frames { $write_none_rslt; true } - }; + }}; } use crate::{ - http::Headers, http2::{ - misc::write_bytes, send_params::SendParams, ContinuationFrame, DataFrame, HeadersFrame, - HpackEncoder, HpackStaticRequestHeaders, HpackStaticResponseHeaders, StreamState, U31, + http2_params_send::Http2ParamsSend, + http2_rslt::Http2RsltExt, + misc::{reset_stream, write_array}, + window::WindowsPair, + ContinuationFrame, DataFrame, HeadersFrame, StreamState, U31, }, misc::{AsyncBounds, ByteVector, Stream, Usize}, }; #[inline] pub(crate) async fn write_stream( - body: &[u8], - headers: &Headers, - hpack_enc: &mut HpackEncoder, + body: &mut &[u8], + hf: &mut HeadersFrame<'_>, hpack_enc_buffer: &mut ByteVector, - (hsreqh, hsresh): (HpackStaticRequestHeaders<'_>, HpackStaticResponseHeaders), + hps: &Http2ParamsSend, is_conn_open: bool, - send_params: &SendParams, stream: &mut S, - stream_id: U31, stream_state: &mut StreamState, -) -> crate::Result<()> + streams_num: &mut u32, + wp: &mut WindowsPair<'_>, +) -> crate::Result> where S: AsyncBounds + Stream, { + if wp.is_invalid_send() { + return Ok(Http2RsltExt::Idle); + } let can_start_sending = if IS_CLIENT { matches!(stream_state, StreamState::Idle | StreamState::HalfClosedLocal) } else { matches!(stream_state, StreamState::Idle | StreamState::HalfClosedRemote) }; if !can_start_sending { - return Ok(()); + return Err(crate::Error::InvalidStreamState); } - if headers.bytes_len() > *Usize::from(send_params.max_expanded_headers_len) { - return Err(crate::Error::VeryLargeHeadersLen); - } + let stream_id = hf.stream_id(); - hpack_enc_buffer.clear(); - if IS_CLIENT { - hpack_enc.encode(hpack_enc_buffer, hsreqh.iter(), headers.iter())?; - } else { - hpack_enc.encode(hpack_enc_buffer, hsresh.iter(), headers.iter())?; - } - - let hf = &mut HeadersFrame::new((hsreqh, hsresh), stream_id); if body.is_empty() { hf.set_eos(); } if hpack_enc_buffer.is_empty() { - write_single_header(body, hf, &[], is_conn_open, send_params, stream, stream_id).await?; + hf.set_eoh(); + hre_resource_or_return!( + write_init_header(body, hf, &[], hps, is_conn_open, stream, stream_id, wp).await? + ); *stream_state = StreamState::Open; - return Ok(()); + return Ok(Http2RsltExt::Resource(())); } - let mut iter = hpack_enc_buffer.chunks(*Usize::from(send_params.max_frame_len)); + let mut iter = hpack_enc_buffer.chunks(*Usize::from(hps.max_frame_len)); if let Some(first) = iter.next() { - write_single_header(body, hf, first, is_conn_open, send_params, stream, stream_id).await?; + if iter.len() == 0 { + hf.set_eoh(); + } + hre_resource_or_return!( + write_init_header(body, hf, first, hps, is_conn_open, stream, stream_id, wp).await? + ); *stream_state = StreamState::Open; } else { - return Ok(()); + return Ok(Http2RsltExt::Resource(())); } for _ in 0.._max_continuation_frames!() { let should_stop = write_generic_frames!( - iter, + (body, iter, wp), |_| ContinuationFrame::new(stream_id), |frame: &mut ContinuationFrame| frame.set_eoh(), || {}, |header_array| { let (a, b) = header_array; write_data_frames!( - (body, is_conn_open, send_params, stream, stream_id), + (body, hps, is_conn_open, stream, stream_id, wp), || { - write_bytes([&a, b], is_conn_open, stream).await?; + write_array([&a, b], is_conn_open, stream).await?; }, |data_array| { let (c, d) = data_array; - write_bytes([&a, b, &c, d], is_conn_open, stream).await?; + write_array([&a, b, &c, d], is_conn_open, stream).await?; }, |data_array| { let (c, d, e, f) = data_array; - write_bytes([&a, b, &c, d, &e, f], is_conn_open, stream).await?; + write_array([&a, b, &c, d, &e, f], is_conn_open, stream).await?; } ); }, |header_array| { let (a, b, c, d) = header_array; write_data_frames!( - (body, is_conn_open, send_params, stream, stream_id), + (body, hps, is_conn_open, stream, stream_id, wp), || { - write_bytes([&a, b], is_conn_open, stream).await?; + write_array([&a, b], is_conn_open, stream).await?; }, |data_array| { let (e, f) = data_array; - write_bytes([&a, b, &c, d, &e, f], is_conn_open, stream).await?; + write_array([&a, b, &c, d, &e, f], is_conn_open, stream).await?; }, |data_array| { let (e, f, g, h) = data_array; - write_bytes([&a, b, &c, d, &e, f, &g, h], is_conn_open, stream).await?; + write_array([&a, b, &c, d, &e, f, &g, h], is_conn_open, stream).await?; } ); } ); if should_stop { - return Ok(()); + return Ok(Http2RsltExt::Resource(())); } } - Ok(()) + Ok(reset_stream(stream_state, streams_num)) } #[inline] @@ -201,44 +222,44 @@ fn adjust_frame_init(content: &[u8], frame_init: [u8; 9], frame_init_buffer: &mu } #[inline] -async fn write_single_header( - body: &[u8], +async fn write_init_header( + body: &mut &[u8], hf: &mut HeadersFrame<'_>, hf_content: &[u8], + hps: &Http2ParamsSend, is_conn_open: bool, - send_params: &SendParams, stream: &mut S, stream_id: U31, -) -> crate::Result<()> + wp: &mut WindowsPair<'_>, +) -> crate::Result> where S: Stream, { let init_buffer = &mut [0; 9]; - hf.set_eoh(); adjust_frame_init(hf_content, hf.bytes(), init_buffer); write_data_frames!( - (body, is_conn_open, send_params, stream, stream_id), + (body, hps, is_conn_open, stream, stream_id, wp), || { - write_bytes([init_buffer, hf_content], is_conn_open, stream).await?; + write_array([init_buffer, hf_content], is_conn_open, stream).await?; }, |data_array| { let (a, b) = data_array; - write_bytes([init_buffer, hf_content, &a, b], is_conn_open, stream).await?; + write_array([init_buffer, hf_content, &a, b], is_conn_open, stream).await?; }, |data_array| { let (a, b, c, d) = data_array; - write_bytes([init_buffer, hf_content, &a, b, &c, d], is_conn_open, stream).await?; + write_array([init_buffer, hf_content, &a, b, &c, d], is_conn_open, stream).await?; } ); - return Ok(()); + return Ok(Http2RsltExt::Resource(())); } #[cfg(test)] mod tests { use crate::{ - http::{Headers, Method, Request, Response, ResponseData, StatusCode}, - http2::{ErrorCode, Http2Buffer, Http2Params, Http2Tokio, ReadFrameRslt, ReqResBuffer}, + http::{Headers, Method, ReqResData, RequestStr, Response, StatusCode}, + http2::{ErrorCode, Http2Buffer, Http2Params, Http2Tokio, ReqResBuffer}, misc::{UriString, Vector, _uri}, rng::StaticRng, }; @@ -262,9 +283,7 @@ mod tests { ) .await .unwrap(); - let mut rrb = ReqResBuffer::default(); - rrb.data.reserve(3); - rrb.headers.reserve(6, 1); + let mut rrb = ReqResBuffer::with_capacity(3, 6, 1, 6); let res = stream_client(&mut client, &mut rrb).await; _0(res.data.body(), res.data.headers()); @@ -275,12 +294,12 @@ mod tests { _1(res.data.body(), res.data.headers()); rrb.clear(); - rrb.data.extend_from_slice(b"123").unwrap(); + rrb.body.extend_from_slice(b"123").unwrap(); let res = stream_client(&mut client, &mut rrb).await; _2(res.data.body(), res.data.headers()); rrb.clear(); - rrb.data.extend_from_slice(b"123").unwrap(); + rrb.body.extend_from_slice(b"123").unwrap(); rrb.headers.push_front(b"123", b"456", false).unwrap(); let res = stream_client(&mut client, &mut rrb).await; _3(res.data.body(), res.data.headers()); @@ -299,19 +318,19 @@ mod tests { let mut rrb = ReqResBuffer::default(); stream_server(&mut rrb, &mut server, |req| { - _0(req.data, req.headers); + _0(req.data.body(), req.data.headers()); }) .await; stream_server(&mut rrb, &mut server, |req| { - _1(req.data, req.headers); + _1(req.data.body(), req.data.headers()); }) .await; stream_server(&mut rrb, &mut server, |req| { - _2(req.data, req.headers); + _2(req.data.body(), req.data.headers()); }) .await; stream_server(&mut rrb, &mut server, |req| { - _3(req.data, req.headers); + _3(req.data.body(), req.data.headers()); }) .await; @@ -322,22 +341,18 @@ mod tests { async fn stream_server<'rrb>( rrb: &'rrb mut ReqResBuffer, server: &mut Http2Tokio, - mut cb: impl FnMut(&Request<&mut Vector, &mut Headers, &str>), + mut cb: impl FnMut(&RequestStr<'_, (&mut Vector, &mut Headers)>), ) { loop { - let rfr = server.stream(rrb).await.unwrap(); - let mut stream = match rfr { - ReadFrameRslt::ClosedConnection | ReadFrameRslt::ClosedStream => { - panic!(); - } - ReadFrameRslt::IdleConnection => { - continue; - } - ReadFrameRslt::Resource(elem) => elem, - }; + let mut stream = server.stream(rrb).await.unwrap().resource().unwrap(); let req = stream.recv_req(rrb).await.unwrap().resource().unwrap(); cb(&req); - stream.send_res(Response::http2((&req.data, &req.headers), StatusCode::Ok)).await.unwrap(); + stream + .send_res(Response::http2(&req.data, StatusCode::Ok)) + .await + .unwrap() + .resource() + .unwrap(); break; } } @@ -347,7 +362,7 @@ mod tests { rrb: &'rrb mut ReqResBuffer, ) -> Response<&'rrb mut ReqResBuffer> { let mut stream = client.stream().await.unwrap(); - stream.send_req(rrb.as_http2_request_ref(Method::Get)).await.unwrap(); + stream.send_req(rrb.as_http2_request(Method::Get)).await.unwrap().resource().unwrap(); stream.recv_res(rrb).await.unwrap().resource().unwrap() } diff --git a/wtx/src/misc/array_chunks.rs b/wtx/src/misc/array_chunks.rs index 4bb19323..2a466614 100644 --- a/wtx/src/misc/array_chunks.rs +++ b/wtx/src/misc/array_chunks.rs @@ -1,5 +1,6 @@ use core::{ iter::FusedIterator, + mem::size_of, slice::{self, Iter, IterMut}, }; @@ -33,7 +34,9 @@ macro_rules! create_and_impl { /// the slice. pub fn new(slice: $slice) -> Self { const { - assert!(N != 0); + if N == 0 || size_of::() == 0 { + panic!(); + } } let len = slice.len() / N; let (multiple_of_n, remainder) = slice.$split_method(len * N); @@ -42,11 +45,30 @@ macro_rules! create_and_impl { Self { iter: arrays.$iter_method(), remainder } } - /// Returns the remainder of the original slice that is not going to be returned by the iterator. + /// Owned version of [Self::remainder] that can return mutable or immutable slices. #[inline] pub fn into_remainder(self) -> $slice { self.remainder } + + /// Returns the remainder of the original slice that is not going to be returned by the iterator. + #[inline] + pub fn remainder(&self) -> &[T] { + &self.remainder + } + + /// Views the underlying data as a subslice of the original data. + #[inline] + pub fn slice(&self) -> &[T] { + let slice = self.iter.as_slice(); + // SAFETY: `T` is not a ZST and the slice is already in the same address space + unsafe { + slice::from_raw_parts( + slice.as_ptr().cast(), + slice.len().unchecked_mul(N).unchecked_add(self.remainder.len()) + ) + } + } } impl<'slice, T, const N: usize> DoubleEndedIterator for $name<'slice, T, N> { @@ -121,3 +143,27 @@ create_and_impl!( split_at_mut, &'slice mut [T] ); + +#[cfg(test)] +mod tests { + use crate::misc::ArrayChunks; + + #[test] + fn basic_usage() { + let mut iter = ArrayChunks::new(&[1, 2, 3, 4, 5]); + assert_eq!(iter.slice(), &[1, 2, 3, 4, 5]); + assert_eq!(iter.remainder(), &[5]); + + assert_eq!(iter.next(), Some(&[1, 2])); + assert_eq!(iter.slice(), &[3, 4, 5]); + assert_eq!(iter.remainder(), &[5]); + + assert_eq!(iter.next(), Some(&[3, 4])); + assert_eq!(iter.slice(), &[5]); + assert_eq!(iter.remainder(), &[5]); + + assert_eq!(iter.next(), None); + assert_eq!(iter.slice(), &[5]); + assert_eq!(iter.remainder(), &[5]); + } +} diff --git a/wtx/src/misc/fx_hasher.rs b/wtx/src/misc/fx_hasher.rs index c8b576c3..41649233 100644 --- a/wtx/src/misc/fx_hasher.rs +++ b/wtx/src/misc/fx_hasher.rs @@ -2,7 +2,7 @@ use core::{hash::Hasher, ops::BitXor}; const K: u64 = 0x517c_c1b7_2722_0a95; -/// https://nnethercote.github.io/2021/12/08/a-brutally-effective-hash-function-in-rust.html +/// /// /// Has a fixed output standard, as such, it can be used in algorithms where a hash needs to be /// sent over the network, or persisted. diff --git a/wtx/src/misc/uri.rs b/wtx/src/misc/uri.rs index dacd69cc..4666eeb9 100644 --- a/wtx/src/misc/uri.rs +++ b/wtx/src/misc/uri.rs @@ -16,7 +16,7 @@ pub type UriString = Uri; /// ```txt /// foo://user:password@hostname:80/path?query=value#hash /// ``` -#[derive(Eq, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)] pub struct Uri { authority_start_idx: u16, href_start_idx: u16, @@ -158,7 +158,7 @@ where .unwrap_or_default() } - /// See [UriPartsRef]. + /// See [UriRef]. #[inline] pub fn to_ref(&self) -> UriRef<'_> { UriRef { @@ -169,7 +169,7 @@ where } } - /// See [UriPartsString]. + /// See [UriString]. #[inline] pub fn to_string(&self) -> UriString { UriString {