From 2e45c845a79f7dbf88377e9d31ca501684d793c6 Mon Sep 17 00:00:00 2001 From: Caio Date: Sat, 21 Dec 2024 18:33:35 -0300 Subject: [PATCH] Fix #288 --- .gitignore | 1 + Cargo.lock | 109 ++++++++-------- .../generic-examples/client-api-framework.rs | 8 +- wtx-instances/src/bin/autobahn-client.rs | 8 +- wtx-instances/src/bin/autobahn-server.rs | 4 +- .../web-socket-examples/web-socket-client.rs | 4 +- .../web-socket-concurrent-client.rs | 12 +- .../web-socket-examples/web-socket-server.rs | 4 +- wtx-macros/tests/ui/pass/custom_transport.rs | 34 ++--- wtx-ui/src/web_socket.rs | 6 +- wtx/Cargo.toml | 3 +- .../client_api_framework/network/transport.rs | 122 ++---------------- .../network/transport/mock.rs | 53 +++++--- ...bi_transport.rs => recieving_transport.rs} | 24 ++-- .../transport/sending_recieving_transport.rs | 94 ++++++++++++++ .../network/transport/sending_transport.rs | 41 ++++++ .../network/transport/std.rs | 99 +++----------- .../network/transport/unit.rs | 58 +++++---- .../network/transport/wtx_http.rs | 96 ++++++++++---- .../network/transport/wtx_ws.rs | 119 +++-------------- .../network/transport/wtx_ws/web_socket.rs | 78 +++++++++++ .../wtx_ws/web_socket_parts_owned.rs | 64 +++++++++ .../wtx_ws/web_socket_reader_part_owned.rs | 45 +++++++ .../wtx_ws/web_socket_writer_part_owned.rs | 58 +++++++++ wtx/src/data_transformation/dnsn/tests.rs | 2 +- wtx/src/error.rs | 19 --- .../http/optioned_server/web_socket_tokio.rs | 8 +- wtx/src/lib.rs | 2 - wtx/src/macros.rs | 14 +- wtx/src/misc/facades/arc.rs | 13 +- wtx/src/misc/vector.rs | 40 ++++-- wtx/src/tls.rs | 29 ----- wtx/src/tls/acceptor.rs | 49 ------- wtx/src/tls/acceptor/acceptor_backend.rs | 35 ----- wtx/src/tls/acceptor/rustls.rs | 39 ------ wtx/src/tls/connector.rs | 94 -------------- wtx/src/tls/connector/connector_backend.rs | 74 ----------- wtx/src/tls/connector/rustls.rs | 88 ------------- wtx/src/tls/handshake.rs | 1 - wtx/src/tls/item.rs | 36 ------ wtx/src/tls/state.rs | 7 - wtx/src/tls/tls_error.rs | 6 - wtx/src/tls/tls_stream.rs | 13 -- wtx/src/tls/trust_anchor.rs | 6 - wtx/src/web_socket.rs | 32 ++--- wtx/src/web_socket/handshake/tests.rs | 44 +++---- wtx/src/web_socket/macros.rs | 2 +- .../web_socket_parts/web_socket_part.rs | 4 +- .../web_socket_parts/web_socket_part_mut.rs | 12 ++ .../web_socket_parts/web_socket_part_owned.rs | 21 +++ 50 files changed, 794 insertions(+), 1040 deletions(-) rename wtx/src/client_api_framework/network/transport/{bi_transport.rs => recieving_transport.rs} (70%) create mode 100644 wtx/src/client_api_framework/network/transport/sending_recieving_transport.rs create mode 100644 wtx/src/client_api_framework/network/transport/sending_transport.rs create mode 100644 wtx/src/client_api_framework/network/transport/wtx_ws/web_socket.rs create mode 100644 wtx/src/client_api_framework/network/transport/wtx_ws/web_socket_parts_owned.rs create mode 100644 wtx/src/client_api_framework/network/transport/wtx_ws/web_socket_reader_part_owned.rs create mode 100644 wtx/src/client_api_framework/network/transport/wtx_ws/web_socket_writer_part_owned.rs delete mode 100644 wtx/src/tls.rs delete mode 100644 wtx/src/tls/acceptor.rs delete mode 100644 wtx/src/tls/acceptor/acceptor_backend.rs delete mode 100644 wtx/src/tls/acceptor/rustls.rs delete mode 100644 wtx/src/tls/connector.rs delete mode 100644 wtx/src/tls/connector/connector_backend.rs delete mode 100644 wtx/src/tls/connector/rustls.rs delete mode 100644 wtx/src/tls/handshake.rs delete mode 100644 wtx/src/tls/item.rs delete mode 100644 wtx/src/tls/state.rs delete mode 100644 wtx/src/tls/tls_error.rs delete mode 100644 wtx/src/tls/tls_stream.rs delete mode 100644 wtx/src/tls/trust_anchor.rs diff --git a/.gitignore b/.gitignore index b347bd3e..410e1ef2 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ **/profile.json **/rustc-ice-*.txt **/target +boringssl mdbook-target wtx-docs/book wtx-fuzz/artifacts diff --git a/Cargo.lock b/Cargo.lock index cd218eee..6cf50cc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -183,9 +183,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-lc-rs" -version = "1.11.1" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f47bb8cc16b669d267eeccf585aea077d0882f4777b1c1f740217885d6e6e5a3" +checksum = "f409eb70b561706bf8abba8ca9c112729c481595893fd06a2dd9af8ed8441148" dependencies = [ "aws-lc-sys", "paste", @@ -194,9 +194,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.23.1" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2101df3813227bbaaaa0b04cd61c534c7954b22bd68d399b440be937dc63ff7" +checksum = "8478a5c29ead3f3be14aff8a202ad965cf7da6856860041bfca271becf8ba48b" dependencies = [ "bindgen", "cc", @@ -389,9 +389,9 @@ checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" [[package]] name = "cc" -version = "1.2.2" +version = "1.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc" +checksum = "c31a0499c1dc64f458ad13872de75c0eb7e3fdb0e67964610c914b034fc5956e" dependencies = [ "jobserver", "libc", @@ -421,9 +421,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.38" +version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" dependencies = [ "android-tzdata", "iana-time-zone", @@ -854,9 +854,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "ff" @@ -881,9 +881,9 @@ dependencies = [ [[package]] name = "foldhash" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" +checksum = "a0d2fde1f7b3d48b8395d5f2de76c18a528bd6a9cdde438df747bfcba3e05d6f" [[package]] name = "fs_extra" @@ -1100,11 +1100,11 @@ dependencies = [ [[package]] name = "home" -version = "0.5.9" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1115,9 +1115,9 @@ checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946" [[package]] name = "hybrid-array" -version = "0.2.1" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45a9a965bb102c1c891fb017c09a05c965186b1265a207640f323ddd009f9deb" +checksum = "f2d35805454dc9f8662a98d6d61886ffe26bd465f5960e0e55345c70d5c0d2a9" dependencies = [ "typenum", ] @@ -1200,9 +1200,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.74" +version = "0.3.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a865e038f7f6ed956f788f0d7d60c541fff74c7bd74272c5d4cf15c63743e705" +checksum = "6717b6b5b077764fb5966237269cb3c64edddde4b14ce42647430a78ced9e7b7" dependencies = [ "once_cell", "wasm-bindgen", @@ -1222,9 +1222,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.167" +version = "0.2.169" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" [[package]] name = "libfuzzer-sys" @@ -1248,9 +1248,9 @@ dependencies = [ [[package]] name = "libz-rs-sys" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39cc71ac688c22a9f5730a38171ac94795c071ac81d1a0ab5537f6ef164fff30" +checksum = "a90e19106f1b2c93f1fa6cdeec2e56facbf2e403559c1e1c0ddcc6d46e979cdf" dependencies = [ "zlib-rs", ] @@ -1308,9 +1308,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.8.0" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +checksum = "4ffbe83022cedc1d264172192511ae958937694cd57ce297164951b8b3568394" dependencies = [ "adler2", ] @@ -1371,9 +1371,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.5" +version = "0.36.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" dependencies = [ "memchr", ] @@ -1735,22 +1735,22 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustix" -version = "0.38.41" +version = "0.38.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6" +checksum = "f93dc38ecbab2eb790ff964bb77fa94faf256fd3e73285fd7ba0903b76bedb85" dependencies = [ "bitflags 2.6.0", "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] name = "rustls" -version = "0.23.19" +version = "0.23.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "934b404430bb06b3fae2cba809eb45a1ab1aecd64491213d7c3301b88393f8d1" +checksum = "5065c3f250cbd332cd894be57c40fa52387247659b14a2d6041d121547903b1b" dependencies = [ "aws-lc-rs", "once_cell", @@ -1772,9 +1772,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.10.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" +checksum = "d2bf47e6ff922db3825eb750c4e2ff784c6ff8fb9e13046ef6a1d1c5401b0b37" [[package]] name = "rustls-webpki" @@ -1815,18 +1815,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.215" +version = "1.0.216" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" +checksum = "0b9781016e935a97e8beecf0c933758c97a5520d32930e460142b4cd80c6338e" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.215" +version = "1.0.216" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" +checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" dependencies = [ "proc-macro2", "quote", @@ -1835,9 +1835,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.133" +version = "1.0.134" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" +checksum = "d00f4175c42ee48b15416f6193a959ba3a0d67fc699a0db9ad12df9f83991c7d" dependencies = [ "itoa", "memchr", @@ -2038,9 +2038,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "445e881f4f6d382d5f27c034e25eb92edd7c784ceab92a0937db7f2e9471b938" +checksum = "022db8904dfa342efe721985167e9fcd16c29b226db4397ed752a761cfce81e8" dependencies = [ "tinyvec_macros", ] @@ -2265,9 +2265,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.97" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d15e63b4482863c109d70a7b8706c1e364eb6ea449b201a76c5b89cedcec2d5c" +checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396" dependencies = [ "cfg-if", "once_cell", @@ -2276,13 +2276,12 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.97" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d36ef12e3aaca16ddd3f67922bc63e48e953f126de60bd33ccc0101ef9998cd" +checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79" dependencies = [ "bumpalo", "log", - "once_cell", "proc-macro2", "quote", "syn 2.0.90", @@ -2291,9 +2290,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.97" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "705440e08b42d3e4b36de7d66c944be628d579796b8090bfa3471478a2260051" +checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2301,9 +2300,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.97" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98c9ae5a76e46f4deecd0f0255cc223cfa18dc9b261213b8aa0c7b36f61b3f1d" +checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2" dependencies = [ "proc-macro2", "quote", @@ -2314,9 +2313,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.97" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ee99da9c5ba11bd675621338ef6fa52296b76b83305e9b6e5c77d4c286d6d49" +checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6" [[package]] name = "webpki-roots" @@ -2616,6 +2615,6 @@ dependencies = [ [[package]] name = "zlib-rs" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ca4a9dc6566c9224cc161dedc5577bd81f4a9ee0f9fbe80592756d096b07ee5" +checksum = "aada01553a9312bad4b9569035a1f12b05e5ec9770a1a4b323757356928944f8" diff --git a/wtx-instances/generic-examples/client-api-framework.rs b/wtx-instances/generic-examples/client-api-framework.rs index c23fb9c1..9db4a6a7 100644 --- a/wtx-instances/generic-examples/client-api-framework.rs +++ b/wtx-instances/generic-examples/client-api-framework.rs @@ -16,13 +16,13 @@ use tokio::net::TcpStream; use wtx::{ client_api_framework::{ misc::{Pair, RequestLimit, RequestThrottling}, - network::{transport::Transport, HttpParams, WsParams}, + network::{transport::SendingRecievingTransport, HttpParams, WsParams}, Api, }, data_transformation::dnsn::SerdeJson, http::client_framework::ClientFrameworkTokio, misc::{simple_seed, Uri, Xorshift64}, - web_socket::{WebSocketBuffer, WebSocketClient}, + web_socket::{WebSocket, WebSocketBuffer}, }; wtx::create_packages_aux_wrapper!(); @@ -97,11 +97,11 @@ async fn http_pair( async fn web_socket_pair() -> wtx::Result< Pair< PkgsAux, - WebSocketClient<(), TcpStream, WebSocketBuffer>, + WebSocket<(), TcpStream, WebSocketBuffer, true>, >, > { let uri = Uri::new("ws://generic_web_socket_uri.com"); - let web_socket = WebSocketClient::connect( + let web_socket = WebSocket::connect( (), [], false, diff --git a/wtx-instances/src/bin/autobahn-client.rs b/wtx-instances/src/bin/autobahn-client.rs index c5587ec9..689a2590 100644 --- a/wtx-instances/src/bin/autobahn-client.rs +++ b/wtx-instances/src/bin/autobahn-client.rs @@ -3,7 +3,7 @@ use tokio::net::TcpStream; use wtx::{ misc::{simple_seed, UriRef, Xorshift64}, - web_socket::{compression::Flate2, Frame, OpCode, WebSocketBuffer, WebSocketClient}, + web_socket::{compression::Flate2, Frame, OpCode, WebSocket, WebSocketBuffer}, }; #[tokio::main] @@ -11,7 +11,7 @@ async fn main() -> wtx::Result<()> { let host = "127.0.0.1:9080"; let mut wsb = WebSocketBuffer::default(); for case in 1..=get_case_count(host, &mut wsb).await? { - let mut ws = WebSocketClient::connect( + let mut ws = WebSocket::connect( Flate2::default(), [], false, @@ -38,7 +38,7 @@ async fn main() -> wtx::Result<()> { } } } - WebSocketClient::connect( + WebSocket::connect( (), [], false, @@ -54,7 +54,7 @@ async fn main() -> wtx::Result<()> { } async fn get_case_count(host: &str, wsb: &mut WebSocketBuffer) -> wtx::Result { - let mut ws = WebSocketClient::connect( + let mut ws = WebSocket::connect( (), [], false, diff --git a/wtx-instances/src/bin/autobahn-server.rs b/wtx-instances/src/bin/autobahn-server.rs index e628bd2d..ae0c6695 100644 --- a/wtx-instances/src/bin/autobahn-server.rs +++ b/wtx-instances/src/bin/autobahn-server.rs @@ -7,7 +7,7 @@ use wtx::{ http::OptionedServer, web_socket::{ compression::{Flate2, NegotiatedFlate2}, - OpCode, WebSocketBuffer, WebSocketServer, + OpCode, WebSocket, WebSocketBuffer, }, }; @@ -25,7 +25,7 @@ async fn main() -> wtx::Result<()> { } async fn handle( - mut ws: WebSocketServer, TcpStream, &mut WebSocketBuffer>, + mut ws: WebSocket, TcpStream, &mut WebSocketBuffer, false>, ) -> wtx::Result<()> { let (mut common, mut reader, mut writer) = ws.parts_mut(); loop { diff --git a/wtx-instances/web-socket-examples/web-socket-client.rs b/wtx-instances/web-socket-examples/web-socket-client.rs index 31a55376..f425211b 100644 --- a/wtx-instances/web-socket-examples/web-socket-client.rs +++ b/wtx-instances/web-socket-examples/web-socket-client.rs @@ -13,13 +13,13 @@ use tokio::{ }; use wtx::{ misc::{simple_seed, Uri, Xorshift64}, - web_socket::{Frame, OpCode, WebSocketBuffer, WebSocketClient}, + web_socket::{Frame, OpCode, WebSocket, WebSocketBuffer}, }; #[tokio::main] async fn main() -> wtx::Result<()> { let uri = Uri::new("ws://www.example.com"); - let mut ws = WebSocketClient::connect( + let mut ws = WebSocket::connect( (), [], false, diff --git a/wtx-instances/web-socket-examples/web-socket-concurrent-client.rs b/wtx-instances/web-socket-examples/web-socket-concurrent-client.rs index 50e715ef..404da590 100644 --- a/wtx-instances/web-socket-examples/web-socket-concurrent-client.rs +++ b/wtx-instances/web-socket-examples/web-socket-concurrent-client.rs @@ -7,7 +7,7 @@ extern crate wtx_instances; use tokio::{net::TcpStream, sync::Mutex}; use wtx::{ misc::{simple_seed, Arc, TokioRustlsConnector, Uri, Xorshift64}, - web_socket::{Frame, OpCode, WebSocketBuffer, WebSocketClient}, + web_socket::{Frame, OpCode, WebSocket, WebSocketBuffer}, }; #[tokio::main] @@ -15,7 +15,7 @@ async fn main() -> wtx::Result<()> { let uri = Uri::new("ws://www.example.com"); let connector = TokioRustlsConnector::from_auto()?.push_certs(wtx_instances::ROOT_CA)?; let stream = TcpStream::connect(uri.hostname_with_implied_port()).await?; - let ws = WebSocketClient::connect( + let ws = WebSocket::connect( (), [], false, @@ -26,10 +26,10 @@ async fn main() -> wtx::Result<()> { |_| wtx::Result::Ok(()), ) .await?; - let (mut reader, mut writer) = ws.into_parts::>, _, _>(|el| tokio::io::split(el)); + let mut parts = ws.into_parts::>, _, _>(|el| tokio::io::split(el)); let reader_jh = tokio::spawn(async move { loop { - let frame = reader.read_frame().await?; + let frame = parts.reader.read_frame().await?; match (frame.op_code(), frame.text_payload()) { (_, Some(elem)) => println!("{elem}"), (OpCode::Close, _) => break, @@ -39,8 +39,8 @@ async fn main() -> wtx::Result<()> { wtx::Result::Ok(()) }); let writer_jh = tokio::spawn(async move { - writer.write_frame(&mut Frame::new_fin(OpCode::Text, *b"Hi and Bye")).await?; - writer.write_frame(&mut Frame::new_fin(OpCode::Close, [])).await?; + parts.writer.write_frame(&mut Frame::new_fin(OpCode::Text, *b"Hi and Bye")).await?; + parts.writer.write_frame(&mut Frame::new_fin(OpCode::Close, [])).await?; wtx::Result::Ok(()) }); let (reader_rslt, writer_rslt) = tokio::join!(reader_jh, writer_jh); diff --git a/wtx-instances/web-socket-examples/web-socket-server.rs b/wtx-instances/web-socket-examples/web-socket-server.rs index 096c0ec3..8e745d70 100644 --- a/wtx-instances/web-socket-examples/web-socket-server.rs +++ b/wtx-instances/web-socket-examples/web-socket-server.rs @@ -10,7 +10,7 @@ use tokio_rustls::server::TlsStream; use wtx::{ http::OptionedServer, misc::TokioRustlsAcceptor, - web_socket::{OpCode, WebSocketBuffer, WebSocketServer}, + web_socket::{OpCode, WebSocket, WebSocketBuffer}, }; #[tokio::main] @@ -34,7 +34,7 @@ async fn main() -> wtx::Result<()> { } async fn handle( - mut ws: WebSocketServer<(), TlsStream, &mut WebSocketBuffer>, + mut ws: WebSocket<(), TlsStream, &mut WebSocketBuffer, false>, ) -> wtx::Result<()> { let (mut common, mut reader, mut writer) = ws.parts_mut(); loop { diff --git a/wtx-macros/tests/ui/pass/custom_transport.rs b/wtx-macros/tests/ui/pass/custom_transport.rs index 395edd87..e8d4bbda 100644 --- a/wtx-macros/tests/ui/pass/custom_transport.rs +++ b/wtx-macros/tests/ui/pass/custom_transport.rs @@ -4,41 +4,43 @@ use wtx::client_api_framework::pkg::PkgsAux; use wtx::client_api_framework::pkg::Package; use wtx::client_api_framework::network::TransportGroup; use wtx::client_api_framework::network::transport::Transport; +use wtx::client_api_framework::network::transport::RecievingTransport; +use wtx::client_api_framework::network::transport::SendingTransport; use wtx::client_api_framework::network::transport::TransportParams; -use core::ops::Range; use wtx::client_api_framework::Api; - +use core::ops::Range; struct CustomTransport; -impl Transport for CustomTransport { - const GROUP: TransportGroup = TransportGroup::Custom("Custom"); - type Params = CustomTransportParams; - - async fn send( - &mut self, - _: &mut P, - _: &mut PkgsAux, - ) -> Result<(), A::Error> +impl RecievingTransport for CustomTransport { + #[inline] + async fn recv(&mut self, _: &mut PkgsAux) -> Result, A::Error> where A: Api, - P: Package, { - Ok(()) + Ok(0..0) } +} - async fn send_recv( +impl SendingTransport for CustomTransport { + #[inline] + async fn send( &mut self, _: &mut P, _: &mut PkgsAux, - ) -> Result, A::Error> + ) -> Result<(), A::Error> where A: Api, P: Package, { - Ok(0..0) + Ok(()) } } +impl Transport for CustomTransport { + const GROUP: TransportGroup = TransportGroup::Custom("Custom"); + type Params = CustomTransportParams; +} + struct CustomTransportParams(()); impl TransportParams for CustomTransportParams { diff --git a/wtx-ui/src/web_socket.rs b/wtx-ui/src/web_socket.rs index efb70791..cc5955b0 100644 --- a/wtx-ui/src/web_socket.rs +++ b/wtx-ui/src/web_socket.rs @@ -4,13 +4,13 @@ use tokio::{ }; use wtx::{ misc::{simple_seed, UriRef, Xorshift64}, - web_socket::{Frame, OpCode, WebSocketBuffer, WebSocketClient, WebSocketServer}, + web_socket::{Frame, OpCode, WebSocket, WebSocketBuffer}, }; pub(crate) async fn connect(uri: &str, cb: impl Fn(&str)) -> wtx::Result<()> { let uri = UriRef::new(uri); let wsb = &mut WebSocketBuffer::default(); - let mut ws = WebSocketClient::connect( + let mut ws = WebSocket::connect( (), [], false, @@ -54,7 +54,7 @@ pub(crate) async fn serve( let (stream, _) = listener.accept().await?; let _jh = tokio::spawn(async move { let fun = async move { - let mut ws = WebSocketServer::accept( + let mut ws = WebSocket::accept( (), false, Xorshift64::from(simple_seed()), diff --git a/wtx/Cargo.toml b/wtx/Cargo.toml index 42365bc5..d615b553 100644 --- a/wtx/Cargo.toml +++ b/wtx/Cargo.toml @@ -28,7 +28,7 @@ rand_chacha = { default-features = false, optional = true, version = "0.3" } rand_core = { default-features = false, optional = true, version = "0.6" } ring = { default-features = false, optional = true, version = "0.17" } rust_decimal = { default-features = false, features = ["maths"], optional = true, version = "1.0" } -rustls = { default-features = false, optional = true, version = "0.23" } +rustls = { default-features = false, features = ["tls12"], optional = true, version = "0.23" } rustls-pemfile = { default-features = false, optional = true, version = "2.0" } rustls-pki-types = { default-features = false, optional = true, version = "1.0" } serde = { default-features = false, features = ["alloc", "derive"], optional = true, version = "1.0" } @@ -111,7 +111,6 @@ std = [ "tracing?/std", "tracing-subscriber?/std" ] -tls = [] tokio = ["std", "dep:tokio"] tokio-rustls = ["ring", "rustls", "dep:rustls-pemfile", "rustls-pki-types", "tokio", "dep:tokio-rustls"] web-socket = ["http"] diff --git a/wtx/src/client_api_framework/network/transport.rs b/wtx/src/client_api_framework/network/transport.rs index de4014a7..d75fe37e 100644 --- a/wtx/src/client_api_framework/network/transport.rs +++ b/wtx/src/client_api_framework/network/transport.rs @@ -1,7 +1,9 @@ //! Implementations of the [Transport] trait. -mod bi_transport; mod mock; +mod recieving_transport; +mod sending_recieving_transport; +mod sending_transport; #[cfg(feature = "std")] mod std; mod transport_params; @@ -11,20 +13,12 @@ mod wtx_http; #[cfg(feature = "web-socket")] mod wtx_ws; -use crate::{ - client_api_framework::{ - misc::log_res, - network::TransportGroup, - pkg::{BatchElems, BatchPkg, Package, PkgsAux}, - Api, - }, - data_transformation::dnsn::{Deserialize, Serialize}, - misc::{Lease, Vector}, -}; -pub use bi_transport::*; -use core::{future::Future, ops::Range}; -pub use mock::*; -pub use transport_params::*; +use crate::client_api_framework::network::TransportGroup; +pub use mock::{Mock, MockBytes, MockStr}; +pub use recieving_transport::RecievingTransport; +pub use sending_recieving_transport::SendingRecievingTransport; +pub use sending_transport::SendingTransport; +pub use transport_params::TransportParams; /// Any means of transferring data between two parties. /// @@ -40,78 +34,6 @@ pub trait Transport { /// Every transport has request and response parameters. type Params: TransportParams; - /// Sends a request without trying to retrieve any counterpart data. - fn send( - &mut self, - pkg: &mut P, - pkgs_aux: &mut PkgsAux, - ) -> impl Future> - where - A: Api, - P: Package; - - /// Sends a request and then awaits its counterpart data response. - /// - /// The returned bytes are stored in `pkgs_aux` and its length is returned by this method. - fn send_recv( - &mut self, - pkg: &mut P, - pkgs_aux: &mut PkgsAux, - ) -> impl Future, A::Error>> - where - A: Api, - P: Package; - - /// Convenient method similar to [`Self::send_recv_decode_contained`] but used for batch - /// requests. - /// - /// All the expected data must be available in a single response. - #[inline] - fn send_recv_decode_batch<'pkgs, 'pkgs_aux, A, P>( - &mut self, - buffer: &mut Vector>, - pkgs: &'pkgs mut [P], - pkgs_aux: &'pkgs_aux mut PkgsAux, - ) -> impl Future> - where - A: Api, - P: Package, - BatchElems<'pkgs, A, DRSR, P, Self::Params>: Serialize, - { - async { - let range = self.send_recv(&mut BatchPkg::new(pkgs), pkgs_aux).await?; - log_res(pkgs_aux.byte_buffer.lease()); - P::ExternalResponseContent::seq_from_bytes( - buffer, - pkgs_aux.byte_buffer.get(range).unwrap_or_default(), - &mut pkgs_aux.drsr, - )?; - Ok(()) - } - } - - /// Internally calls [`Self::send_recv`] and then tries to decode the defined response specified - /// in [`Package::ExternalResponseContent`]. - #[inline] - fn send_recv_decode_contained<'de, A, P>( - &mut self, - pkg: &mut P, - pkgs_aux: &'de mut PkgsAux, - ) -> impl Future, A::Error>> - where - A: Api, - P: Package, - { - async { - let range = self.send_recv(pkg, pkgs_aux).await?; - log_res(pkgs_aux.byte_buffer.lease()); - Ok(P::ExternalResponseContent::from_bytes( - pkgs_aux.byte_buffer.get(range).unwrap_or_default(), - &mut pkgs_aux.drsr, - )?) - } - } - /// Instance counterpart of [`Self::GROUP`]. #[inline] fn ty(&self) -> TransportGroup { @@ -125,32 +47,6 @@ where { const GROUP: TransportGroup = T::GROUP; type Params = T::Params; - - #[inline] - async fn send( - &mut self, - pkg: &mut P, - pkgs_aux: &mut PkgsAux, - ) -> Result<(), A::Error> - where - A: Api, - P: Package, - { - (**self).send(pkg, pkgs_aux).await - } - - #[inline] - async fn send_recv( - &mut self, - pkg: &mut P, - pkgs_aux: &mut PkgsAux, - ) -> Result, A::Error> - where - A: Api, - P: Package, - { - (**self).send_recv(pkg, pkgs_aux).await - } } #[cfg(test)] diff --git a/wtx/src/client_api_framework/network/transport/mock.rs b/wtx/src/client_api_framework/network/transport/mock.rs index 17d3c28d..8e3406d1 100644 --- a/wtx/src/client_api_framework/network/transport/mock.rs +++ b/wtx/src/client_api_framework/network/transport/mock.rs @@ -4,7 +4,7 @@ use crate::{ client_api_framework::{ misc::{manage_after_sending_related, manage_before_sending_related, FromBytes}, network::{ - transport::{Transport, TransportParams}, + transport::{RecievingTransport, SendingTransport, Transport, TransportParams}, TransportGroup, }, pkg::{Package, PkgsAux}, @@ -27,7 +27,7 @@ pub type MockStr = Mock; /// ```rust,no_run /// # async fn fun() -> wtx::Result<()> { /// use wtx::client_api_framework::{ -/// network::transport::{MockStr, Transport}, +/// network::transport::{MockStr, SendingRecievingTransport}, /// pkg::PkgsAux, /// }; /// let _ = MockStr::default() @@ -86,15 +86,33 @@ where } } -impl Transport for Mock +impl RecievingTransport for Mock where T: Debug + Lease<[u8]> + PartialEq + ToOwned + 'static + ?Sized, TP: TransportParams, ::Owned: Debug + FromBytes, { - const GROUP: TransportGroup = TransportGroup::Stub; - type Params = TP; + #[inline] + async fn recv( + &mut self, + pkgs_aux: &mut PkgsAux, + ) -> Result, A::Error> + where + A: Api, + { + let response = self.pop_response()?; + pkgs_aux.byte_buffer.clear(); + pkgs_aux.byte_buffer.extend_from_copyable_slice(response.lease()).map_err(Into::into)?; + Ok(0..pkgs_aux.byte_buffer.len()) + } +} +impl SendingTransport for Mock +where + T: Debug + Lease<[u8]> + PartialEq + ToOwned + 'static + ?Sized, + TP: TransportParams, + ::Owned: Debug + FromBytes, +{ #[inline] async fn send( &mut self, @@ -114,23 +132,16 @@ where manage_after_sending_related(pkg, pkgs_aux).await?; Ok(()) } +} - #[inline] - async fn send_recv( - &mut self, - pkg: &mut P, - pkgs_aux: &mut PkgsAux, - ) -> Result, A::Error> - where - A: Api, - P: Package, - { - >::send(self, pkg, pkgs_aux).await?; - let response = self.pop_response()?; - pkgs_aux.byte_buffer.clear(); - pkgs_aux.byte_buffer.extend_from_copyable_slice(response.lease()).map_err(Into::into)?; - Ok(0..pkgs_aux.byte_buffer.len()) - } +impl Transport for Mock +where + T: Debug + Lease<[u8]> + PartialEq + ToOwned + 'static + ?Sized, + TP: TransportParams, + ::Owned: Debug + FromBytes, +{ + const GROUP: TransportGroup = TransportGroup::Stub; + type Params = TP; } impl Default for Mock diff --git a/wtx/src/client_api_framework/network/transport/bi_transport.rs b/wtx/src/client_api_framework/network/transport/recieving_transport.rs similarity index 70% rename from wtx/src/client_api_framework/network/transport/bi_transport.rs rename to wtx/src/client_api_framework/network/transport/recieving_transport.rs index 20039318..bd663fac 100644 --- a/wtx/src/client_api_framework/network/transport/bi_transport.rs +++ b/wtx/src/client_api_framework/network/transport/recieving_transport.rs @@ -10,27 +10,25 @@ use crate::{ }; use core::{future::Future, ops::Range}; -/// Bidirectional Transport -/// -/// Similar to [Transport] but expects an connection where clients call poll data from the server. +/// Transport that receives package data. /// /// # Types /// /// * `DRSR`: `D`eserialize`R`/`S`erialize`R` -pub trait BiTransport: Transport { +pub trait RecievingTransport: Transport { /// Retrieves data from the server filling the internal buffer and returning the amount of /// bytes written. - fn retrieve( + fn recv( &mut self, pkgs_aux: &mut PkgsAux, - ) -> impl Future>> + ) -> impl Future, A::Error>> where A: Api; /// Internally calls [`Self::retrieve`] and then tries to decode the defined response specified /// in [`Package::ExternalResponseContent`]. #[inline] - fn retrieve_and_decode_contained<'de, A, P>( + fn recv_decode_contained<'de, A, P>( &mut self, pkgs_aux: &'de mut PkgsAux, ) -> impl Future, A::Error>> @@ -39,7 +37,7 @@ pub trait BiTransport: Transport { P: Package, { async { - let range = self.retrieve(pkgs_aux).await?; + let range = self.recv(pkgs_aux).await?; log_res(pkgs_aux.byte_buffer.lease()); Ok(P::ExternalResponseContent::from_bytes( pkgs_aux.byte_buffer.get(range).unwrap_or_default(), @@ -49,18 +47,18 @@ pub trait BiTransport: Transport { } } -impl BiTransport for &mut T +impl RecievingTransport for &mut T where - T: BiTransport, + T: RecievingTransport, { #[inline] - async fn retrieve( + async fn recv( &mut self, pkgs_aux: &mut PkgsAux, - ) -> crate::Result> + ) -> Result, A::Error> where A: Api, { - (**self).retrieve(pkgs_aux).await + (**self).recv(pkgs_aux).await } } diff --git a/wtx/src/client_api_framework/network/transport/sending_recieving_transport.rs b/wtx/src/client_api_framework/network/transport/sending_recieving_transport.rs new file mode 100644 index 00000000..8253543a --- /dev/null +++ b/wtx/src/client_api_framework/network/transport/sending_recieving_transport.rs @@ -0,0 +1,94 @@ +use crate::{ + client_api_framework::{ + misc::log_res, + network::transport::{RecievingTransport, SendingTransport}, + pkg::{BatchElems, BatchPkg, Package, PkgsAux}, + Api, + }, + data_transformation::dnsn::{Deserialize, Serialize}, + misc::{Lease, Vector}, +}; +use core::{future::Future, ops::Range}; + +/// Transport that sends and receives package data +/// +/// # Types +/// +/// * `DRSR`: `D`eserialize`R`/`S`erialize`R` +pub trait SendingRecievingTransport: + RecievingTransport + SendingTransport +{ + /// Sends a request and then awaits its counterpart data response. + /// + /// The returned bytes are stored in `pkgs_aux` and its length is returned by this method. + #[inline] + fn send_recv( + &mut self, + pkg: &mut P, + pkgs_aux: &mut PkgsAux, + ) -> impl Future, A::Error>> + where + A: Api, + P: Package, + { + async { + self.send(pkg, pkgs_aux).await?; + Ok(self.recv(pkgs_aux).await?) + } + } + + /// Convenient method similar to [`Self::send_recv_decode_contained`] but used for batch + /// requests. + /// + /// All the expected data must be available in a single response. + #[inline] + fn send_recv_decode_batch<'pkgs, 'pkgs_aux, A, P>( + &mut self, + buffer: &mut Vector>, + pkgs: &'pkgs mut [P], + pkgs_aux: &'pkgs_aux mut PkgsAux, + ) -> impl Future> + where + A: Api, + P: Package, + BatchElems<'pkgs, A, DRSR, P, Self::Params>: Serialize, + { + async { + let range = self.send_recv(&mut BatchPkg::new(pkgs), pkgs_aux).await?; + log_res(pkgs_aux.byte_buffer.lease()); + P::ExternalResponseContent::seq_from_bytes( + buffer, + pkgs_aux.byte_buffer.get(range).unwrap_or_default(), + &mut pkgs_aux.drsr, + )?; + Ok(()) + } + } + + /// Internally calls [`Self::send_recv`] and then tries to decode the defined response specified + /// in [`Package::ExternalResponseContent`]. + #[inline] + fn send_recv_decode_contained<'de, A, P>( + &mut self, + pkg: &mut P, + pkgs_aux: &'de mut PkgsAux, + ) -> impl Future, A::Error>> + where + A: Api, + P: Package, + { + async { + let range = self.send_recv(pkg, pkgs_aux).await?; + log_res(pkgs_aux.byte_buffer.lease()); + Ok(P::ExternalResponseContent::from_bytes( + pkgs_aux.byte_buffer.get(range).unwrap_or_default(), + &mut pkgs_aux.drsr, + )?) + } + } +} + +impl SendingRecievingTransport for T where + T: RecievingTransport + SendingTransport +{ +} diff --git a/wtx/src/client_api_framework/network/transport/sending_transport.rs b/wtx/src/client_api_framework/network/transport/sending_transport.rs new file mode 100644 index 00000000..cf004727 --- /dev/null +++ b/wtx/src/client_api_framework/network/transport/sending_transport.rs @@ -0,0 +1,41 @@ +use crate::client_api_framework::{ + network::transport::Transport, + pkg::{Package, PkgsAux}, + Api, +}; +use core::future::Future; + +/// Transport that sends package data. +/// +/// # Types +/// +/// * `DRSR`: `D`eserialize`R`/`S`erialize`R` +pub trait SendingTransport: Transport { + /// Sends a request without trying to retrieve any counterpart data. + fn send( + &mut self, + pkg: &mut P, + pkgs_aux: &mut PkgsAux, + ) -> impl Future> + where + A: Api, + P: Package; +} + +impl SendingTransport for &mut T +where + T: SendingTransport, +{ + #[inline] + async fn send( + &mut self, + pkg: &mut P, + pkgs_aux: &mut PkgsAux, + ) -> Result<(), A::Error> + where + A: Api, + P: Package, + { + (**self).send(pkg, pkgs_aux).await + } +} diff --git a/wtx/src/client_api_framework/network/transport/std.rs b/wtx/src/client_api_framework/network/transport/std.rs index 59453f77..0be24138 100644 --- a/wtx/src/client_api_framework/network/transport/std.rs +++ b/wtx/src/client_api_framework/network/transport/std.rs @@ -2,8 +2,8 @@ use crate::{ client_api_framework::{ misc::{manage_after_sending_related, manage_before_sending_related}, network::{ - transport::{Transport, TransportParams}, - TcpParams, TransportGroup, UdpParams, + transport::{RecievingTransport, SendingTransport, Transport, TransportParams}, + TcpParams, TransportGroup, }, pkg::{Package, PkgsAux}, Api, ClientApiFrameworkError, @@ -13,72 +13,41 @@ use crate::{ use core::ops::Range; use std::{ io::{Read, Write}, - net::{TcpStream, UdpSocket}, + net::TcpStream, }; -impl Transport for TcpStream { - const GROUP: TransportGroup = TransportGroup::TCP; - type Params = TcpParams; - +impl RecievingTransport for TcpStream { #[inline] - async fn send( + async fn recv( &mut self, - pkg: &mut P, - pkgs_aux: &mut PkgsAux, - ) -> Result<(), A::Error> - where - A: Api, - P: Package, - { - send(pkg, pkgs_aux, self, |bytes, _, trans| Ok(trans.write(bytes)?)).await - } - - #[inline] - async fn send_recv( - &mut self, - pkg: &mut P, - pkgs_aux: &mut PkgsAux, + pkgs_aux: &mut PkgsAux, ) -> Result, A::Error> where A: Api, - P: Package, { - send_recv(pkg, pkgs_aux, self, |bytes, _, trans| Ok(trans.read(bytes)?)).await + let len = self.read(pkgs_aux.byte_buffer.as_mut()).map_err(Into::into)?; + Ok(0..len) } } -impl Transport for UdpSocket { - const GROUP: TransportGroup = TransportGroup::UDP; - type Params = UdpParams; - +impl SendingTransport for TcpStream { #[inline] async fn send( &mut self, pkg: &mut P, - pkgs_aux: &mut PkgsAux, + pkgs_aux: &mut PkgsAux, ) -> Result<(), A::Error> where A: Api, - P: Package, + P: Package, { - send(pkg, pkgs_aux, self, |bytes, ext_req_params, trans| { - Ok(trans.send_to(bytes, ext_req_params.url.as_str())?) - }) - .await + send(pkg, pkgs_aux, self, |bytes, _, trans| Ok(trans.write(bytes)?)).await } +} - #[inline] - async fn send_recv( - &mut self, - pkg: &mut P, - pkgs_aux: &mut PkgsAux, - ) -> Result, A::Error> - where - A: Api, - P: Package, - { - send_recv(pkg, pkgs_aux, self, |bytes, _, trans| Ok(trans.recv(bytes)?)).await - } +impl Transport for TcpStream { + const GROUP: TransportGroup = TransportGroup::TCP; + type Params = TcpParams; } async fn send( @@ -121,27 +90,6 @@ where } } -async fn send_recv( - pkg: &mut P, - pkgs_aux: &mut PkgsAux, - trans: &mut T, - cb: impl Fn( - &mut [u8], - &::ExternalRequestParams, - &mut T, - ) -> crate::Result, -) -> Result, A::Error> -where - A: Api, - P: Package, - T: Transport, -{ - trans.send(pkg, pkgs_aux).await?; - let slice = pkgs_aux.byte_buffer.as_mut(); - let len = cb(slice, pkgs_aux.tp.ext_req_params(), trans)?; - Ok(0..len) -} - #[cfg(all(feature = "_async-tests", test))] mod tests { use crate::{ @@ -149,9 +97,9 @@ mod tests { network::{ transport::{ tests::{_Ping, _PingPong, _Pong}, - Transport, + SendingRecievingTransport, }, - TcpParams, UdpParams, + TcpParams, }, pkg::PkgsAux, }, @@ -161,7 +109,7 @@ mod tests { use core::time::Duration; use std::{ io::{Read, Write}, - net::{TcpListener, TcpStream, UdpSocket}, + net::{TcpListener, TcpStream}, }; #[tokio::test(flavor = "multi_thread")] @@ -181,13 +129,4 @@ mod tests { let res = trans.send_recv_decode_contained(&mut _PingPong(_Ping, ()), &mut pa).await.unwrap(); assert_eq!(res, _Pong("pong")); } - - #[tokio::test] - async fn udp() { - let addr = "127.0.0.1:12346"; - let mut pa = PkgsAux::from_minimum((), (), UdpParams::from_uri(addr)); - let mut trans = UdpSocket::bind(addr).unwrap(); - let res = trans.send_recv_decode_contained(&mut _PingPong(_Ping, ()), &mut pa).await.unwrap(); - assert_eq!(res, _Pong("pong")); - } } diff --git a/wtx/src/client_api_framework/network/transport/unit.rs b/wtx/src/client_api_framework/network/transport/unit.rs index 52978bd6..24354bb5 100644 --- a/wtx/src/client_api_framework/network/transport/unit.rs +++ b/wtx/src/client_api_framework/network/transport/unit.rs @@ -1,57 +1,61 @@ use crate::client_api_framework::{ misc::{manage_after_sending_related, manage_before_sending_related}, - network::{transport::Transport, TransportGroup}, + network::{ + transport::{RecievingTransport, SendingTransport, Transport}, + TransportGroup, + }, pkg::{Package, PkgsAux}, Api, }; use core::ops::Range; -/// Does absolutely nothing. Good for demonstration purposes. -/// -/// ```rust,no_run -/// # async fn fun() -> wtx::Result<()> { -/// use wtx::client_api_framework::{network::transport::Transport, pkg::PkgsAux}; -/// let _ = -/// ().send_recv_decode_contained(&mut (), &mut PkgsAux::from_minimum((), (), ())).await?; -/// # Ok(()) } -/// ``` -impl Transport for () { - const GROUP: TransportGroup = TransportGroup::Stub; - type Params = (); - +impl RecievingTransport for () { #[inline] - async fn send( + async fn recv( &mut self, - pkg: &mut P, - pkgs_aux: &mut PkgsAux, - ) -> Result<(), A::Error> + _: &mut PkgsAux, + ) -> Result, A::Error> where A: Api, - P: Package, { - manage_before_sending_related(pkg, pkgs_aux, self).await?; - manage_after_sending_related(pkg, pkgs_aux).await?; - Ok(()) + Ok(0..0) } +} +impl SendingTransport for () { #[inline] - async fn send_recv( + async fn send( &mut self, pkg: &mut P, pkgs_aux: &mut PkgsAux, - ) -> Result, A::Error> + ) -> Result<(), A::Error> where A: Api, P: Package, { - self.send(pkg, pkgs_aux).await?; - Ok(0..0) + manage_before_sending_related(pkg, pkgs_aux, self).await?; + manage_after_sending_related(pkg, pkgs_aux).await?; + Ok(()) } } +/// Does absolutely nothing. Good for demonstration purposes. +/// +/// ```rust,no_run +/// # async fn fun() -> wtx::Result<()> { +/// use wtx::client_api_framework::{network::transport::SendingRecievingTransport, pkg::PkgsAux}; +/// let _ = +/// ().send_recv_decode_contained(&mut (), &mut PkgsAux::from_minimum((), (), ())).await?; +/// # Ok(()) } +/// ``` +impl Transport for () { + const GROUP: TransportGroup = TransportGroup::Stub; + type Params = (); +} + #[cfg(all(feature = "_async-tests", test))] mod tests { - use crate::client_api_framework::{network::transport::Transport, pkg::PkgsAux}; + use crate::client_api_framework::{network::transport::SendingRecievingTransport, pkg::PkgsAux}; #[tokio::test] async fn unit() { diff --git a/wtx/src/client_api_framework/network/transport/wtx_http.rs b/wtx/src/client_api_framework/network/transport/wtx_http.rs index a3029c6e..0752d32a 100644 --- a/wtx/src/client_api_framework/network/transport/wtx_http.rs +++ b/wtx/src/client_api_framework/network/transport/wtx_http.rs @@ -2,7 +2,7 @@ use crate::{ client_api_framework::{ misc::{manage_after_sending_related, manage_before_sending_related}, network::{ - transport::{Transport, TransportParams}, + transport::{RecievingTransport, SendingTransport, Transport, TransportParams}, HttpParams, HttpReqParams, TransportGroup, }, pkg::{Package, PkgsAux}, @@ -15,7 +15,7 @@ use crate::{ }; use core::{mem, ops::Range}; -impl Transport for ClientFramework +impl RecievingTransport for ClientFramework where HD: RefCounter + 'static, HD::Item: Lock>, @@ -30,9 +30,33 @@ where for<'any> RL: 'any, for<'any> RM: 'any, { - const GROUP: TransportGroup = TransportGroup::HTTP; - type Params = HttpParams; + #[inline] + async fn recv( + &mut self, + pkgs_aux: &mut PkgsAux, + ) -> Result, A::Error> + where + A: Api, + { + Ok(0..pkgs_aux.byte_buffer.len()) + } +} +impl SendingTransport for ClientFramework +where + HD: RefCounter + 'static, + HD::Item: Lock>, + RL: Lock>, + RM: ResourceManager< + CreateAux = str, + Error = crate::Error, + RecycleAux = str, + Resource = Http2, + >, + SW: StreamWriter, + for<'any> RL: 'any, + for<'any> RM: 'any, +{ #[inline] async fn send( &mut self, @@ -46,23 +70,41 @@ where response(self, pkg, pkgs_aux).await?; Ok(()) } +} +impl Transport for ClientFramework { + const GROUP: TransportGroup = TransportGroup::HTTP; + type Params = HttpParams; +} + +impl RecievingTransport for &ClientFramework +where + HD: RefCounter + 'static, + HD::Item: Lock>, + RL: Lock>, + RM: ResourceManager< + CreateAux = str, + Error = crate::Error, + RecycleAux = str, + Resource = Http2, + >, + SW: StreamWriter, + for<'any> RL: 'any, + for<'any> RM: 'any, +{ #[inline] - async fn send_recv( + async fn recv( &mut self, - pkg: &mut P, - pkgs_aux: &mut PkgsAux, + pkgs_aux: &mut PkgsAux, ) -> Result, A::Error> where A: Api, - P: Package, { - response(self, pkg, pkgs_aux).await?; Ok(0..pkgs_aux.byte_buffer.len()) } } -impl Transport for &ClientFramework +impl SendingTransport for &ClientFramework where HD: RefCounter + 'static, HD::Item: Lock>, @@ -77,9 +119,6 @@ where for<'any> RL: 'any, for<'any> RM: 'any, { - const GROUP: TransportGroup = TransportGroup::HTTP; - type Params = HttpParams; - #[inline] async fn send( &mut self, @@ -93,20 +132,25 @@ where response(self, pkg, pkgs_aux).await?; Ok(()) } +} - #[inline] - async fn send_recv( - &mut self, - pkg: &mut P, - pkgs_aux: &mut PkgsAux, - ) -> Result, A::Error> - where - A: Api, - P: Package, - { - response(self, pkg, pkgs_aux).await?; - Ok(0..pkgs_aux.byte_buffer.len()) - } +impl Transport for &ClientFramework +where + HD: RefCounter + 'static, + HD::Item: Lock>, + RL: Lock>, + RM: ResourceManager< + CreateAux = str, + Error = crate::Error, + RecycleAux = str, + Resource = Http2, + >, + SW: StreamWriter, + for<'any> RL: 'any, + for<'any> RM: 'any, +{ + const GROUP: TransportGroup = TransportGroup::HTTP; + type Params = HttpParams; } async fn response( diff --git a/wtx/src/client_api_framework/network/transport/wtx_ws.rs b/wtx/src/client_api_framework/network/transport/wtx_ws.rs index 4922da6e..32225e8b 100644 --- a/wtx/src/client_api_framework/network/transport/wtx_ws.rs +++ b/wtx/src/client_api_framework/network/transport/wtx_ws.rs @@ -1,85 +1,28 @@ +mod web_socket; +mod web_socket_parts_owned; +mod web_socket_reader_part_owned; +mod web_socket_writer_part_owned; + use crate::{ client_api_framework::{ misc::{manage_after_sending_related, manage_before_sending_related}, network::{ - transport::{BiTransport, Transport, TransportParams}, - TransportGroup, WsParams, WsReqParamsTy, + transport::{Transport, TransportParams}, + WsParams, WsReqParamsTy, }, pkg::{Package, PkgsAux}, Api, ClientApiFrameworkError, }, - misc::{LeaseMut, Stream}, - web_socket::{ - compression::NegotiatedCompression, Frame, OpCode, WebSocketBuffer, WebSocketClient, - }, + misc::{FnMutFut, Vector}, + web_socket::{Frame, OpCode}, }; use core::ops::Range; -impl Transport for WebSocketClient -where - NC: NegotiatedCompression, - S: Stream, - WSB: LeaseMut, -{ - const GROUP: TransportGroup = TransportGroup::WebSocket; - type Params = WsParams; - - #[inline] - async fn send( - &mut self, - pkg: &mut P, - pkgs_aux: &mut PkgsAux, - ) -> Result<(), A::Error> - where - A: Api, - P: Package, - { - send(pkg, pkgs_aux, self).await - } - - #[inline] - async fn send_recv( - &mut self, - pkg: &mut P, - pkgs_aux: &mut PkgsAux, - ) -> Result, A::Error> - where - A: Api, - P: Package, - { - send_recv(pkg, pkgs_aux, self).await - } -} - -impl BiTransport for WebSocketClient -where - NC: NegotiatedCompression, - S: Stream, - WSB: LeaseMut, -{ - #[inline] - async fn retrieve( - &mut self, - pkgs_aux: &mut PkgsAux, - ) -> crate::Result> - where - A: Api, - { - retrieve(pkgs_aux, self).await - } -} - -async fn retrieve( +async fn recv( + frame: Frame<&mut [u8], true>, pkgs_aux: &mut PkgsAux, - ws: &mut WebSocketClient, -) -> crate::Result> -where - NC: NegotiatedCompression, - S: Stream, - WSB: LeaseMut, -{ +) -> crate::Result> { pkgs_aux.byte_buffer.clear(); - let frame = ws.read_frame().await?; if let OpCode::Close = frame.op_code() { return Err(ClientApiFrameworkError::ClosedWsConnection.into()); } @@ -87,49 +30,29 @@ where Ok(0..pkgs_aux.byte_buffer.len()) } -async fn send( +async fn send( pkg: &mut P, pkgs_aux: &mut PkgsAux, - ws: &mut WebSocketClient, + trans: &mut T, + mut cb: impl for<'any> FnMutFut< + (Frame<&'any mut Vector, true>, &'any mut T), + Result = crate::Result<()>, + >, ) -> Result<(), A::Error> where A: Api, - NC: NegotiatedCompression, P: Package, - S: Stream, - WSB: LeaseMut, + T: Transport, { pkgs_aux.byte_buffer.clear(); - manage_before_sending_related(pkg, pkgs_aux, &mut *ws).await?; + manage_before_sending_related(pkg, pkgs_aux, &mut *trans).await?; let op_code = match pkgs_aux.tp.ext_req_params_mut().ty { WsReqParamsTy::Bytes => OpCode::Binary, WsReqParamsTy::String => OpCode::Text, }; - let mut frame = Frame::new_fin(op_code, &mut pkgs_aux.byte_buffer); - ws.write_frame(&mut frame).await.map_err(Into::into)?; + cb.call((Frame::new_fin(op_code, &mut pkgs_aux.byte_buffer), trans)).await?; pkgs_aux.byte_buffer.clear(); manage_after_sending_related(pkg, pkgs_aux).await?; pkgs_aux.tp.reset(); Ok(()) } - -async fn send_recv( - pkg: &mut P, - pkgs_aux: &mut PkgsAux, - ws: &mut WebSocketClient, -) -> Result, A::Error> -where - A: Api, - NC: NegotiatedCompression, - P: Package, - S: Stream, - WSB: LeaseMut, -{ - send(pkg, pkgs_aux, ws).await?; - let frame = ws.read_frame().await.map_err(Into::into)?; - if let OpCode::Close = frame.op_code() { - return Err(A::Error::from(ClientApiFrameworkError::ClosedWsConnection.into())); - } - pkgs_aux.byte_buffer.extend_from_copyable_slice(frame.payload()).map_err(Into::into)?; - Ok(0..pkgs_aux.byte_buffer.len()) -} diff --git a/wtx/src/client_api_framework/network/transport/wtx_ws/web_socket.rs b/wtx/src/client_api_framework/network/transport/wtx_ws/web_socket.rs new file mode 100644 index 00000000..e2155646 --- /dev/null +++ b/wtx/src/client_api_framework/network/transport/wtx_ws/web_socket.rs @@ -0,0 +1,78 @@ +use crate::{ + client_api_framework::{ + network::{ + transport::{ + wtx_ws::{recv, send}, + RecievingTransport, SendingTransport, Transport, + }, + TransportGroup, WsParams, + }, + pkg::{Package, PkgsAux}, + Api, + }, + misc::{LeaseMut, Stream, Vector}, + web_socket::{compression::NegotiatedCompression, Frame, WebSocket, WebSocketBuffer}, +}; +use core::ops::Range; + +impl RecievingTransport for WebSocket +where + NC: NegotiatedCompression, + S: Stream, + WSB: LeaseMut, +{ + #[inline] + async fn recv( + &mut self, + pkgs_aux: &mut PkgsAux, + ) -> Result, A::Error> + where + A: Api, + { + Ok(recv(self.read_frame().await?, pkgs_aux).await?) + } +} + +impl SendingTransport for WebSocket +where + NC: NegotiatedCompression, + S: Stream, + WSB: LeaseMut, +{ + #[inline] + async fn send( + &mut self, + pkg: &mut P, + pkgs_aux: &mut PkgsAux, + ) -> Result<(), A::Error> + where + A: Api, + P: Package, + { + send(pkg, pkgs_aux, self, cb).await?; + Ok(()) + } +} + +impl Transport for WebSocket +where + NC: NegotiatedCompression, + S: Stream, + WSB: LeaseMut, +{ + const GROUP: TransportGroup = TransportGroup::TCP; + type Params = WsParams; +} + +async fn cb( + mut frame: Frame<&mut Vector, true>, + trans: &mut WebSocket, +) -> crate::Result<()> +where + NC: NegotiatedCompression, + S: Stream, + WSB: LeaseMut, +{ + trans.write_frame(&mut frame).await?; + Ok(()) +} diff --git a/wtx/src/client_api_framework/network/transport/wtx_ws/web_socket_parts_owned.rs b/wtx/src/client_api_framework/network/transport/wtx_ws/web_socket_parts_owned.rs new file mode 100644 index 00000000..390c7361 --- /dev/null +++ b/wtx/src/client_api_framework/network/transport/wtx_ws/web_socket_parts_owned.rs @@ -0,0 +1,64 @@ +use crate::{ + client_api_framework::{ + network::{ + transport::{RecievingTransport, SendingTransport, Transport}, + TransportGroup, WsParams, + }, + pkg::{Package, PkgsAux}, + Api, + }, + misc::{Lock, StreamReader, StreamWriter}, + web_socket::{compression::NegotiatedCompression, WebSocketCommonPartOwned, WebSocketPartsOwned}, +}; +use core::ops::Range; + +impl RecievingTransport for WebSocketPartsOwned +where + C: Lock>, + NC: NegotiatedCompression, + SR: StreamReader, + SW: StreamWriter, +{ + #[inline] + async fn recv( + &mut self, + pkgs_aux: &mut PkgsAux, + ) -> Result, A::Error> + where + A: Api, + { + self.reader.recv(pkgs_aux).await + } +} + +impl SendingTransport for WebSocketPartsOwned +where + C: Lock>, + NC: NegotiatedCompression, + SR: StreamReader, + SW: StreamWriter, +{ + #[inline] + async fn send( + &mut self, + pkg: &mut P, + pkgs_aux: &mut PkgsAux, + ) -> Result<(), A::Error> + where + A: Api, + P: Package, + { + self.writer.send(pkg, pkgs_aux).await + } +} + +impl Transport for WebSocketPartsOwned +where + C: Lock>, + NC: NegotiatedCompression, + SR: StreamReader, + SW: StreamWriter, +{ + const GROUP: TransportGroup = TransportGroup::TCP; + type Params = WsParams; +} diff --git a/wtx/src/client_api_framework/network/transport/wtx_ws/web_socket_reader_part_owned.rs b/wtx/src/client_api_framework/network/transport/wtx_ws/web_socket_reader_part_owned.rs new file mode 100644 index 00000000..5f909df6 --- /dev/null +++ b/wtx/src/client_api_framework/network/transport/wtx_ws/web_socket_reader_part_owned.rs @@ -0,0 +1,45 @@ +use crate::{ + client_api_framework::{ + network::{ + transport::{wtx_ws::recv, RecievingTransport, Transport}, + TransportGroup, WsParams, + }, + pkg::PkgsAux, + Api, + }, + misc::{Lock, StreamReader, StreamWriter}, + web_socket::{ + compression::NegotiatedCompression, WebSocketCommonPartOwned, WebSocketReaderPartOwned, + }, +}; +use core::ops::Range; + +impl RecievingTransport for WebSocketReaderPartOwned +where + C: Lock>, + NC: NegotiatedCompression, + SR: StreamReader, + SW: StreamWriter, +{ + #[inline] + async fn recv( + &mut self, + pkgs_aux: &mut PkgsAux, + ) -> Result, A::Error> + where + A: Api, + { + recv(self.read_frame().await?, pkgs_aux).await.map_err(Into::into) + } +} + +impl Transport for WebSocketReaderPartOwned +where + C: Lock>, + NC: NegotiatedCompression, + SR: StreamReader, + SW: StreamWriter, +{ + const GROUP: TransportGroup = TransportGroup::TCP; + type Params = WsParams; +} diff --git a/wtx/src/client_api_framework/network/transport/wtx_ws/web_socket_writer_part_owned.rs b/wtx/src/client_api_framework/network/transport/wtx_ws/web_socket_writer_part_owned.rs new file mode 100644 index 00000000..123d18f2 --- /dev/null +++ b/wtx/src/client_api_framework/network/transport/wtx_ws/web_socket_writer_part_owned.rs @@ -0,0 +1,58 @@ +use crate::{ + client_api_framework::{ + network::{ + transport::{wtx_ws::send, SendingTransport, Transport}, + TransportGroup, WsParams, + }, + pkg::{Package, PkgsAux}, + Api, + }, + misc::{Lock, StreamWriter, Vector}, + web_socket::{ + compression::NegotiatedCompression, Frame, WebSocketCommonPartOwned, WebSocketWriterPartOwned, + }, +}; + +impl SendingTransport for WebSocketWriterPartOwned +where + C: Lock>, + NC: NegotiatedCompression, + SW: StreamWriter, +{ + #[inline] + async fn send( + &mut self, + pkg: &mut P, + pkgs_aux: &mut PkgsAux, + ) -> Result<(), A::Error> + where + A: Api, + P: Package, + { + send(pkg, pkgs_aux, self, cb).await?; + Ok(()) + } +} + +impl Transport for WebSocketWriterPartOwned +where + C: Lock>, + NC: NegotiatedCompression, + SW: StreamWriter, +{ + const GROUP: TransportGroup = TransportGroup::TCP; + type Params = WsParams; +} + +async fn cb( + mut frame: Frame<&mut Vector, true>, + trans: &mut WebSocketWriterPartOwned, +) -> crate::Result<()> +where + C: Lock>, + NC: NegotiatedCompression, + SW: StreamWriter, +{ + trans.write_frame(&mut frame).await?; + Ok(()) +} diff --git a/wtx/src/data_transformation/dnsn/tests.rs b/wtx/src/data_transformation/dnsn/tests.rs index 6b452092..756d91bf 100644 --- a/wtx/src/data_transformation/dnsn/tests.rs +++ b/wtx/src/data_transformation/dnsn/tests.rs @@ -67,7 +67,7 @@ macro_rules! _create_dnsn_test { mod $name { use crate::{ client_api_framework::{ - network::transport::{Mock, Transport}, + network::transport::{Mock, SendingRecievingTransport}, pkg::PkgsAux, }, data_transformation::{ diff --git a/wtx/src/error.rs b/wtx/src/error.rs index e2597c82..b0aebe63 100644 --- a/wtx/src/error.rs +++ b/wtx/src/error.rs @@ -54,8 +54,6 @@ pub enum Error { MatchitInsertError(Box), #[cfg(feature = "digest")] MacError(digest::MacError), - #[cfg(feature = "rustls-pki-types")] - PemError(Box), #[cfg(feature = "postgres")] PostgresDbError(Box), #[cfg(feature = "quick-protobuf")] @@ -158,8 +156,6 @@ pub enum Error { SchemaManagerError(crate::database::schema_manager::SchemaManagerError), #[cfg(feature = "http-server-framework")] ServerFrameworkError(crate::http::server_framework::ServerFrameworkError), - #[cfg(feature = "tls")] - TlsError(crate::tls::TlsError), VectorError(VectorError), #[cfg(feature = "web-socket")] WebSocketError(crate::web_socket::WebSocketError), @@ -362,13 +358,6 @@ impl From for Error { } } -#[cfg(feature = "rustls-pki-types")] -impl From for Error { - #[inline] - fn from(from: rustls_pki_types::pem::Error) -> Self { - Self::PemError(from.into()) - } -} #[cfg(feature = "postgres")] impl From for Error { #[inline] @@ -553,14 +542,6 @@ impl From for Error { } } -#[cfg(feature = "tls")] -impl From for Error { - #[inline] - fn from(from: crate::tls::TlsError) -> Self { - Self::TlsError(from) - } -} - impl From for Error { #[inline] fn from(from: VectorError) -> Self { diff --git a/wtx/src/http/optioned_server/web_socket_tokio.rs b/wtx/src/http/optioned_server/web_socket_tokio.rs index 7718331e..f38598e7 100644 --- a/wtx/src/http/optioned_server/web_socket_tokio.rs +++ b/wtx/src/http/optioned_server/web_socket_tokio.rs @@ -2,7 +2,7 @@ use crate::{ http::OptionedServer, misc::{FnFut, Stream, Xorshift64, _number_or_available_parallelism, simple_seed}, pool::{SimplePoolTokio, WebSocketRM}, - web_socket::{Compression, WebSocketBuffer, WebSocketServer}, + web_socket::{Compression, WebSocket, WebSocketBuffer}, }; use core::{fmt::Debug, future::Future}; use std::sync::OnceLock; @@ -32,14 +32,14 @@ impl OptionedServer { E: Debug + From + Send + 'static, for<'wsb> H: Clone + FnFut< - (WebSocketServer,), + (WebSocket,), Result = Result<(), E>, > + Send + 'static, N: Send + Future>, S: Stream + Send, for<'wsb> , + WebSocket, )>>::Future: Send, for<'handle> &'handle H: Send, { @@ -64,7 +64,7 @@ impl OptionedServer { let fun = async move { let net = conn_net_cb(conn_acceptor, tcp_stream).await?; conn_handle_cb - .call((WebSocketServer::accept( + .call((WebSocket::accept( conn_compression_cb(), true, Xorshift64::from(simple_seed()), diff --git a/wtx/src/lib.rs b/wtx/src/lib.rs index 64c65a77..895f0f7a 100644 --- a/wtx/src/lib.rs +++ b/wtx/src/lib.rs @@ -35,8 +35,6 @@ pub mod misc; pub mod pool; #[cfg(test)] mod tests; -#[cfg(feature = "tls")] -pub mod tls; #[cfg(feature = "web-socket")] pub mod web_socket; diff --git a/wtx/src/macros.rs b/wtx/src/macros.rs index 6696e0c1..55a0a93a 100644 --- a/wtx/src/macros.rs +++ b/wtx/src/macros.rs @@ -1,3 +1,11 @@ +/// Creates a vector containing the arguments. +#[macro_export] +macro_rules! vector { + ($($tt:tt)+) => { + $crate::misc::Vector::from_vec(alloc::vec![$($tt)+]) + }; +} + macro_rules! _conn_params_methods { () => { /// The initial amount of "credit" a counterpart can have for sending data. @@ -405,9 +413,3 @@ macro_rules! _trace_span { ) }; } - -macro_rules! _vector { - ($($tt:tt)+) => { - crate::misc::Vector::from_vec(alloc::vec![$($tt)+]) - }; -} diff --git a/wtx/src/misc/facades/arc.rs b/wtx/src/misc/facades/arc.rs index d9be5e61..11434b21 100644 --- a/wtx/src/misc/facades/arc.rs +++ b/wtx/src/misc/facades/arc.rs @@ -8,8 +8,9 @@ pub struct Arc( ); impl Arc { + /// Constructs a new instance. #[inline] - pub(crate) fn new(data: T) -> Self { + pub fn new(data: T) -> Self { Self( #[cfg(feature = "portable-atomic-util")] portable_atomic_util::Arc::new(data), @@ -17,6 +18,16 @@ impl Arc { alloc::sync::Arc::new(data), ) } + + /// Returns a mutable reference into the given `Arc`, if there are + /// no other `Arc` or [`Weak`] pointers to the same allocation. + #[inline] + pub fn get_mut(this: &mut Self) -> Option<&mut T> { + #[cfg(feature = "portable-atomic-util")] + return portable_atomic_util::Arc::get_mut(&mut this.0); + #[cfg(not(feature = "portable-atomic-util"))] + return alloc::sync::Arc::get_mut(&mut this.0); + } } impl Clone for Arc { diff --git a/wtx/src/misc/vector.rs b/wtx/src/misc/vector.rs index 71818ede..c7a08000 100644 --- a/wtx/src/misc/vector.rs +++ b/wtx/src/misc/vector.rs @@ -47,7 +47,7 @@ impl core::error::Error for VectorError {} /// A wrapper around the std's vector. //#[cfg_attr(kani, derive(kani::Arbitrary))] -#[derive(Clone, Eq, PartialEq)] +#[derive(Clone, Eq, Ord, PartialEq, PartialOrd)] #[repr(transparent)] pub struct Vector { data: Vec, @@ -346,6 +346,24 @@ impl Vector { Ok(()) } + /// Retains only the elements specified by the predicate. + /// + /// In other words, remove all elements `e` for which `f(&e)` returns `false`. + /// This method operates in place, visiting each element exactly once in the + /// original order, and preserves the order of the retained elements. + /// + /// # Examples + /// + /// ``` + /// let mut vec = wtx::misc::Vector::from_iter(1u8..5).unwrap(); + /// vec.retain(|&x| x % 2 == 0); + /// assert_eq!(vec.as_slice(), [2, 4]); + /// ``` + #[inline(always)] + pub fn retain(&mut self, f: impl FnMut(&T) -> bool) { + self.data.retain(f); + } + /// Shortens the vector, keeping the first len elements and dropping the rest. /// /// ``` @@ -603,16 +621,6 @@ impl DerefMut for Vector { } } -impl IntoIterator for Vector { - type Item = T; - type IntoIter = IntoIter; - - #[inline] - fn into_iter(self) -> Self::IntoIter { - self.data.into_iter() - } -} - impl From> for Vector { #[inline] fn from(from: Vec) -> Self { @@ -627,6 +635,16 @@ impl From> for Vec { } } +impl IntoIterator for Vector { + type Item = T; + type IntoIter = IntoIter; + + #[inline] + fn into_iter(self) -> Self::IntoIter { + self.data.into_iter() + } +} + impl core::fmt::Write for Vector { #[inline] fn write_str(&mut self, s: &str) -> core::fmt::Result { diff --git a/wtx/src/tls.rs b/wtx/src/tls.rs deleted file mode 100644 index 10a8e74b..00000000 --- a/wtx/src/tls.rs +++ /dev/null @@ -1,29 +0,0 @@ -#![allow(dead_code, missing_docs, reason = "development")] - -mod acceptor; -mod connector; -mod handshake; -mod item; -mod state; -mod tls_error; -mod tls_stream; -mod trust_anchor; - -pub use acceptor::{acceptor_backend::AcceptorBackend, Acceptor}; -pub use connector::{connector_backend::ConnectorBackend, Connector}; -pub use tls_error::TlsError; -pub use tls_stream::TlsStream; -pub use trust_anchor::TrustAnchor; - -/// Marker used to indicate the operations with `rustls` -#[cfg(feature = "rustls")] -#[derive(Debug)] -pub struct Rustls; - -#[inline] -fn _invalid_input_err(err: E) -> std::io::Error -where - E: Into>, -{ - std::io::Error::new(std::io::ErrorKind::InvalidInput, err) -} diff --git a/wtx/src/tls/acceptor.rs b/wtx/src/tls/acceptor.rs deleted file mode 100644 index efd845b0..00000000 --- a/wtx/src/tls/acceptor.rs +++ /dev/null @@ -1,49 +0,0 @@ -pub(crate) mod acceptor_backend; -#[cfg(feature = "rustls")] -mod rustls; - -/// TLS implementation responsable for accepting connections -#[derive(Debug)] -pub struct Acceptor -where - B: acceptor_backend::AcceptorBackend, -{ - backend: B, - backend_wca: B::WithoutClientAuth, - is_http2: bool, -} - -impl Acceptor -where - B: acceptor_backend::AcceptorBackend, -{ - #[inline] - pub fn without_client_auth(mut backend: B) -> Self { - let backend_wca = backend.without_client_auth(); - Self { backend, backend_wca, is_http2: false } - } - - /// Creates a [`tokio_rustls::TlsAcceptor`] with a single certificate chain and matching private - /// key. - #[inline] - pub fn build_with_cert_chain_and_priv_key( - self, - cert_chain: &[u8], - priv_key: &[u8], - ) -> crate::Result { - self.backend.build_with_cert_chain_and_priv_key( - self.backend_wca, - cert_chain, - self.is_http2, - priv_key, - ) - } - - /// Erases the set of ALPN protocols when building and then pushes the expected ALPN value for an - /// HTTP2 connection. - #[inline] - pub fn http2(mut self) -> Self { - self.is_http2 = true; - self - } -} diff --git a/wtx/src/tls/acceptor/acceptor_backend.rs b/wtx/src/tls/acceptor/acceptor_backend.rs deleted file mode 100644 index c4680f52..00000000 --- a/wtx/src/tls/acceptor/acceptor_backend.rs +++ /dev/null @@ -1,35 +0,0 @@ -pub trait AcceptorBackend { - type Acceptor; - type WithoutClientAuth; - - fn without_client_auth(&mut self) -> Self::WithoutClientAuth; - - fn build_with_cert_chain_and_priv_key( - self, - wca: Self::WithoutClientAuth, - cert_chain: &[u8], - is_http2: bool, - priv_key: &[u8], - ) -> crate::Result; -} - -impl AcceptorBackend for () { - type Acceptor = (); - type WithoutClientAuth = (); - - #[inline] - fn without_client_auth(&mut self) -> Self { - () - } - - #[inline] - fn build_with_cert_chain_and_priv_key( - self, - _: Self::WithoutClientAuth, - _: &[u8], - _: bool, - _: &[u8], - ) -> crate::Result { - Ok(()) - } -} diff --git a/wtx/src/tls/acceptor/rustls.rs b/wtx/src/tls/acceptor/rustls.rs deleted file mode 100644 index d9da1c45..00000000 --- a/wtx/src/tls/acceptor/rustls.rs +++ /dev/null @@ -1,39 +0,0 @@ -use crate::tls::{Acceptor, AcceptorBackend, Rustls, _invalid_input_err}; -use alloc::sync::Arc; -use rustls::{server::WantsServerCert, ConfigBuilder, ServerConfig}; -use tokio_rustls::TlsAcceptor; - -impl Acceptor { - pub fn without_client_auth_rustls() -> Self { - Self::without_client_auth(Rustls) - } -} - -impl AcceptorBackend for Rustls { - type Acceptor = TlsAcceptor; - type WithoutClientAuth = ConfigBuilder; - - #[inline] - fn without_client_auth(&mut self) -> Self::WithoutClientAuth { - ServerConfig::builder().with_no_client_auth() - } - - #[inline] - fn build_with_cert_chain_and_priv_key( - self, - wca: Self::WithoutClientAuth, - cert_chain: &[u8], - is_http2: bool, - priv_key: &[u8], - ) -> crate::Result { - let mut config = wca.with_single_cert( - rustls_pemfile::certs(&mut &*cert_chain).collect::>()?, - rustls_pemfile::private_key(&mut &*priv_key)?.ok_or_else(|| _invalid_input_err("No key"))?, - )?; - if is_http2 { - config.alpn_protocols.clear(); - config.alpn_protocols.push("h2".into()); - } - Ok(TlsAcceptor::from(Arc::new(config))) - } -} diff --git a/wtx/src/tls/connector.rs b/wtx/src/tls/connector.rs deleted file mode 100644 index f96690c6..00000000 --- a/wtx/src/tls/connector.rs +++ /dev/null @@ -1,94 +0,0 @@ -pub(crate) mod connector_backend; -#[cfg(feature = "rustls")] -mod rustls; - -use crate::{ - misc::Stream, - tls::{TlsStream, TrustAnchor}, -}; -use alloc::vec::Vec; - -/// TLS client using `tokio-rustls` and associated crates. -#[derive(Debug)] -pub struct Connector -where - B: connector_backend::ConnectorBackend, -{ - alpn_protocols: Vec>, - backend: B, - backend_config: B::Config, -} - -impl Connector -where - B: connector_backend::ConnectorBackend, -{ - #[inline] - pub fn new(mut backend: B) -> Self { - let backend_config = backend.config(); - Self { alpn_protocols: Vec::new(), backend, backend_config } - } - - /// From the automatic selection of dependencies. - /// - /// An error will be returned if no dependency that provides CA certificates is selected. - #[inline] - pub fn from_auto(backend: B) -> crate::Result { - #[cfg(feature = "webpki-roots")] - { - let mut this = Self::new(backend); - this.backend.extend_from_trust_anchors( - &mut this.backend_config, - webpki_roots::TLS_SERVER_ROOTS.iter().map(|el| TrustAnchor { - name_constraints: el.name_constraints.as_ref().map(|der| der.as_ref()), - subject: el.subject.as_ref(), - subject_public_key_info: el.subject_public_key_info.as_ref(), - }), - )?; - Ok(this) - } - #[cfg(not(feature = "webpki-roots"))] - return Err(crate::Error::MissingCaProviders); - } - - /// Connects using a generic stream without client authentication. - #[inline] - pub async fn connect_without_client_auth( - self, - hostname: &str, - stream: S, - ) -> crate::Result> - where - S: Stream, - { - self.backend.connect_without_client_auth(self.backend_config, hostname, stream) - } - - /// Erases the current set of ALPN protocols and then pushes the expected ALPN value for a HTTP2 - /// connection. - #[inline] - pub fn http2(mut self) -> Self { - self.alpn_protocols.clear(); - self.alpn_protocols.push("h2".into()); - self - } - - /// Avoids additional round trips by specifying in advance which protocols should be used. - #[inline] - pub fn push_alpn_protocol(mut self, protocol: &[u8]) -> Self { - self.alpn_protocols.push(protocol.into()); - self - } - - #[inline] - pub fn push_der_cert(mut self, der_cert: &[u8]) -> crate::Result { - self.backend.push_der_cert(&mut self.backend_config, der_cert)?; - Ok(self) - } - - #[inline] - pub fn push_der_certs_from_pem(mut self, pem: &[u8]) -> crate::Result { - self.backend.push_der_cert(&mut self.backend_config, pem)?; - Ok(self) - } -} diff --git a/wtx/src/tls/connector/connector_backend.rs b/wtx/src/tls/connector/connector_backend.rs deleted file mode 100644 index 21b5b52c..00000000 --- a/wtx/src/tls/connector/connector_backend.rs +++ /dev/null @@ -1,74 +0,0 @@ -use crate::{ - misc::Stream, - tls::{connector::TrustAnchor, TlsStream}, -}; - -/// TLS implementation responsable for initiating connections -pub trait ConnectorBackend { - type Config; - type StreamAux; - - fn config(&mut self) -> Self::Config; - - fn connect_without_client_auth( - self, - config: Self::Config, - hostname: &str, - stream: S, - ) -> crate::Result> - where - S: Stream; - - fn extend_from_trust_anchors( - &mut self, - config: &mut Self::Config, - trust_anchors: impl Iterator>, - ) -> crate::Result<()>; - - fn push_der_cert(&mut self, config: &mut Self::Config, der_cert: &[u8]) -> crate::Result<()>; - - fn push_der_certs_from_pem(&mut self, config: &mut Self::Config, pem: &[u8]) - -> crate::Result<()>; -} - -impl ConnectorBackend for () { - type Config = (); - type StreamAux = (); - - #[inline] - fn config(&mut self) -> Self::Config { - () - } - - #[inline] - fn connect_without_client_auth( - self, - _: Self::Config, - _: &str, - stream: S, - ) -> crate::Result> - where - S: Stream, - { - Ok(TlsStream::new((), stream)) - } - - #[inline] - fn extend_from_trust_anchors( - &mut self, - _: &mut Self::Config, - _: impl Iterator>, - ) -> crate::Result<()> { - Ok(()) - } - - #[inline] - fn push_der_cert(&mut self, _: &mut Self::Config, _: &[u8]) -> crate::Result<()> { - Ok(()) - } - - #[inline] - fn push_der_certs_from_pem(&mut self, _: &mut Self::Config, _: &[u8]) -> crate::Result<()> { - Ok(()) - } -} diff --git a/wtx/src/tls/connector/rustls.rs b/wtx/src/tls/connector/rustls.rs deleted file mode 100644 index 49f7d8b0..00000000 --- a/wtx/src/tls/connector/rustls.rs +++ /dev/null @@ -1,88 +0,0 @@ -use crate::{ - misc::{GenericTimeProvider, Stream}, - tls::{ - connector::TrustAnchor, item::Item, Connector, ConnectorBackend, Rustls, TlsStream, - _invalid_input_err, - }, -}; -use alloc::{string::String, sync::Arc}; -use rustls::{client::UnbufferedClientConnection, version::TLS13, ClientConfig, RootCertStore}; -use rustls_pki_types::{CertificateDer, Der, ServerName}; - -impl Connector { - pub fn rustls() -> Self { - Connector::new(Rustls) - } -} - -impl ConnectorBackend for Rustls { - type Config = RootCertStore; - type StreamAux = UnbufferedClientConnection; - - #[inline] - fn config(&mut self) -> Self::Config { - RootCertStore::empty() - } - - #[inline] - fn connect_without_client_auth( - self, - config: Self::Config, - hostname: &str, - stream: S, - ) -> crate::Result> - where - S: Stream, - { - #[cfg(feature = "aws-lc-rs")] - let provider = rustls::crypto::aws_lc_rs::default_provider(); - #[cfg(all(feature = "ring", not(any(feature = "aws-lc-rs"))))] - let provider = rustls::crypto::ring::default_provider(); - #[cfg(not(any(feature = "aws-lc-rs", feature = "ring")))] - return Err(crate::tls::TlsError::MissingProvider.into()); - - let time_provider = Arc::new(GenericTimeProvider); - let client_config = ClientConfig::builder_with_details(Arc::new(provider), time_provider) - .with_protocol_versions(&[&TLS13])? - .with_root_certificates(config) - .with_no_client_auth(); - let ucc = UnbufferedClientConnection::new( - Arc::new(client_config), - ServerName::try_from(String::from(hostname)).map_err(_invalid_input_err)?, - )?; - Ok(TlsStream::new(ucc, stream)) - } - - #[inline] - fn extend_from_trust_anchors( - &mut self, - config: &mut Self::Config, - trust_anchors: impl Iterator>, - ) -> crate::Result<()> { - config.extend(trust_anchors.map(|ta| rustls_pki_types::TrustAnchor { - subject: Der::from_slice(ta.subject), - subject_public_key_info: Der::from_slice(ta.subject_public_key_info), - name_constraints: ta.name_constraints.map(|el| Der::from_slice(el)), - })); - Ok(()) - } - - #[inline] - fn push_der_cert(&mut self, config: &mut Self::Config, der_cert: &[u8]) -> crate::Result<()> { - config.add(CertificateDer::from_slice(der_cert))?; - Ok(()) - } - - #[inline] - fn push_der_certs_from_pem( - &mut self, - config: &mut Self::Config, - pem: &[u8], - ) -> crate::Result<()> { - for rslt in Item::rustls_pki_types(pem) { - let Item::X509DerCertificate(der_cert) = rslt?; - config.add(der_cert.into())?; - } - Ok(()) - } -} diff --git a/wtx/src/tls/handshake.rs b/wtx/src/tls/handshake.rs deleted file mode 100644 index eb349b02..00000000 --- a/wtx/src/tls/handshake.rs +++ /dev/null @@ -1 +0,0 @@ -pub(crate) struct Handshake {} diff --git a/wtx/src/tls/item.rs b/wtx/src/tls/item.rs deleted file mode 100644 index 54526e9f..00000000 --- a/wtx/src/tls/item.rs +++ /dev/null @@ -1,36 +0,0 @@ -use alloc::vec::Vec; - -#[derive(Debug, PartialEq)] -pub(crate) enum Item { - X509DerCertificate(Vec), -} - -#[cfg(feature = "rustls-pki-types")] -mod rustls { - use crate::tls::item::Item; - use rustls_pki_types::pem::{PemObject, SectionKind}; - use std::vec::Vec; - - impl Item { - #[inline] - pub(crate) fn rustls_pki_types( - pem: &[u8], - ) -> impl Iterator> + use<'_> { - PemObject::pem_slice_iter(pem).filter_map(|rslt| { - let (sk, bytes) = match rslt { - Err(err) => return Some(Err(err)), - Ok(elem) => elem, - }; - Ok(Self::from_rustls_kind(bytes, sk)).transpose() - }) - } - - #[inline] - fn from_rustls_kind(bytes: Vec, sk: SectionKind) -> Option { - match sk { - SectionKind::Certificate => Some(Self::X509DerCertificate(bytes.into())), - _ => None, - } - } - } -} diff --git a/wtx/src/tls/state.rs b/wtx/src/tls/state.rs deleted file mode 100644 index 6e000f6f..00000000 --- a/wtx/src/tls/state.rs +++ /dev/null @@ -1,7 +0,0 @@ -#[derive(Debug, Copy, Clone)] -pub(crate) enum State { - FullyShutdown, - ReadShutdown, - Stream, - WriteShutdown, -} diff --git a/wtx/src/tls/tls_error.rs b/wtx/src/tls/tls_error.rs deleted file mode 100644 index 806c010a..00000000 --- a/wtx/src/tls/tls_error.rs +++ /dev/null @@ -1,6 +0,0 @@ -/// TLS errror -#[derive(Debug)] -pub enum TlsError { - /// It is necessary to provide a crypto provide using Cargo features - MissingProvider, -} diff --git a/wtx/src/tls/tls_stream.rs b/wtx/src/tls/tls_stream.rs deleted file mode 100644 index 1843d8e5..00000000 --- a/wtx/src/tls/tls_stream.rs +++ /dev/null @@ -1,13 +0,0 @@ -#[derive(Debug)] -pub struct TlsStream { - aux: A, - stream: S, -} - -impl TlsStream { - /// Creates a new instance with a stream that supposedly already performed a handshake. - #[inline] - pub fn new(aux: A, stream: S) -> Self { - Self { aux, stream } - } -} diff --git a/wtx/src/tls/trust_anchor.rs b/wtx/src/tls/trust_anchor.rs deleted file mode 100644 index ed013a1a..00000000 --- a/wtx/src/tls/trust_anchor.rs +++ /dev/null @@ -1,6 +0,0 @@ -#[derive(Debug)] -pub struct TrustAnchor<'bytes> { - pub(crate) name_constraints: Option<&'bytes [u8]>, - pub(crate) subject: &'bytes [u8], - pub(crate) subject_public_key_info: &'bytes [u8], -} diff --git a/wtx/src/web_socket.rs b/wtx/src/web_socket.rs index 24b98d70..b72531cd 100644 --- a/wtx/src/web_socket.rs +++ b/wtx/src/web_socket.rs @@ -46,7 +46,8 @@ pub use web_socket_error::WebSocketError; pub use web_socket_parts::{ web_socket_part_mut::{WebSocketCommonPartMut, WebSocketReaderPartMut, WebSocketWriterPartMut}, web_socket_part_owned::{ - WebSocketCommonPartOwned, WebSocketReaderPartOwned, WebSocketWriterPartOwned, + WebSocketCommonPartOwned, WebSocketPartsOwned, WebSocketReaderPartOwned, + WebSocketWriterPartOwned, }, }; @@ -60,18 +61,12 @@ const RSV1_MASK: u8 = 0b0100_0000; const RSV2_MASK: u8 = 0b0010_0000; const RSV3_MASK: u8 = 0b0001_0000; -/// [`WebSocket`] instance for clients. -pub type WebSocketClient = WebSocket; /// [`WebSocketClient`] with a mutable reference of [`WebSocketBuffer`]. -pub type WebSocketClientMut<'wsb, NC, S> = WebSocket; +pub type WebSocketMut<'wsb, NC, S, const IS_CLIENT: bool> = + WebSocket; /// [`WebSocketClient`] with an owned [`WebSocketBuffer`]. -pub type WebSocketClientOwned = WebSocket; -/// [`WebSocket`] instance for servers -pub type WebSocketServer = WebSocket; -/// [`WebSocketServer`] with a mutable reference of [`WebSocketBuffer`]. -pub type WebSocketServerMut<'wsb, NC, S> = WebSocket; -/// [`WebSocketServer`] with an owned [`WebSocketBuffer`]. -pub type WebSocketServerOwned = WebSocket; +pub type WebSocketOwned = + WebSocket; /// Full-duplex communication over an asynchronous stream. /// @@ -206,7 +201,7 @@ where writer_buffer: _, } = wsb.lease_mut(); let nc_rsv1 = nc.rsv1(); - let (frame, payload_ty) = read_frame_from_stream!( + let (frame, payload_ty) = read_frame!( *max_payload_len, (NC::IS_NOOP, nc_rsv1), network_buffer, @@ -259,10 +254,7 @@ where pub fn into_parts( self, split: impl FnOnce(S) -> (SR, SW), - ) -> ( - WebSocketReaderPartOwned, - WebSocketWriterPartOwned, - ) + ) -> WebSocketPartsOwned where C: Clone + Lock>, { @@ -287,8 +279,8 @@ where let common = C::new(WebSocketCommonPartOwned { wsc: WebSocketCommonPart { connection_state, nc, rng, stream: stream_writer }, }); - ( - WebSocketReaderPartOwned { + WebSocketPartsOwned { + reader: WebSocketReaderPartOwned { common: common.clone(), phantom: PhantomData, stream_reader, @@ -302,11 +294,11 @@ where reader_buffer_second, }, }, - WebSocketWriterPartOwned { + writer: WebSocketWriterPartOwned { common, phantom: PhantomData, wswp: WebSocketWriterPart { no_masking, writer_buffer }, }, - ) + } } } diff --git a/wtx/src/web_socket/handshake/tests.rs b/wtx/src/web_socket/handshake/tests.rs index 9ffcb03d..9be9bde1 100644 --- a/wtx/src/web_socket/handshake/tests.rs +++ b/wtx/src/web_socket/handshake/tests.rs @@ -11,8 +11,8 @@ use crate::{ misc::{simple_seed, Xorshift64}, tests::_uri, web_socket::{ - compression::NegotiatedCompression, Compression, Frame, OpCode, WebSocketBuffer, - WebSocketClient, WebSocketClientOwned, WebSocketServer, WebSocketServerOwned, + compression::NegotiatedCompression, Compression, Frame, OpCode, WebSocket, WebSocketBuffer, + WebSocketOwned, }, }; use core::{ @@ -65,7 +65,7 @@ async fn do_test_client_and_server_frames( let listener = TcpListener::bind(uri.hostname_with_implied_port()).await.unwrap(); let _server_jh = tokio::spawn(async move { let (stream, _) = listener.accept().await.unwrap(); - let mut ws = WebSocketServer::accept( + let mut ws = WebSocket::accept( server_compression, server_no_masking, Xorshift64::from(simple_seed()), @@ -89,7 +89,7 @@ async fn do_test_client_and_server_frames( HAS_SERVER_FINISHED.store(true, Ordering::Relaxed); }); - let mut ws = WebSocketClient::connect( + let mut ws = WebSocket::connect( client_compression, [], client_no_masking, @@ -128,9 +128,9 @@ async fn do_test_client_and_server_frames( } trait Test { - async fn client(ws: &mut WebSocketClientOwned); + async fn client(ws: &mut WebSocketOwned); - async fn server(ws: &mut WebSocketServerOwned); + async fn server(ws: &mut WebSocketOwned); } struct FragmentedText; @@ -138,12 +138,12 @@ impl Test for FragmentedText where NC: NegotiatedCompression, { - async fn client(ws: &mut WebSocketClientOwned) { + async fn client(ws: &mut WebSocketOwned) { ws.write_frame(&mut Frame::new_unfin(OpCode::Text, &mut [b'1'])).await.unwrap(); ws.write_frame(&mut Frame::new_fin(OpCode::Continuation, &mut [b'2', b'3'])).await.unwrap(); } - async fn server(ws: &mut WebSocketServerOwned) { + async fn server(ws: &mut WebSocketOwned) { let text = ws.read_frame().await.unwrap(); assert_eq!(OpCode::Text, text.op_code()); assert_eq!(&[b'1', b'2', b'3'], text.payload()); @@ -155,7 +155,7 @@ impl Test for HelloAndGoodbye where NC: NegotiatedCompression, { - async fn client(ws: &mut WebSocketClientOwned) { + async fn client(ws: &mut WebSocketOwned) { let hello = ws.read_frame().await.unwrap(); assert_eq!(OpCode::Text, hello.op_code()); assert_eq!(b"Hello!", hello.payload()); @@ -163,7 +163,7 @@ where assert_eq!(OpCode::Close, ws.read_frame().await.unwrap().op_code()); } - async fn server(ws: &mut WebSocketServerOwned) { + async fn server(ws: &mut WebSocketOwned) { ws.write_frame(&mut Frame::new_fin(OpCode::Text, *b"Hello!")).await.unwrap(); assert_eq!(ws.read_frame().await.unwrap().payload(), b"Goodbye!"); ws.write_frame(&mut Frame::new_fin(OpCode::Close, &mut [])).await.unwrap(); @@ -175,8 +175,8 @@ impl Test for LargeFragmentedText where NC: NegotiatedCompression, { - async fn client(ws: &mut WebSocketClientOwned) { - let bytes = || _vector![b'1'; 256 * 1024]; + async fn client(ws: &mut WebSocketOwned) { + let bytes = || vector![b'1'; 256 * 1024]; ws.write_frame(&mut Frame::new_unfin(OpCode::Text, &mut bytes())).await.unwrap(); ws.write_frame(&mut Frame::new_unfin(OpCode::Continuation, &mut bytes())).await.unwrap(); ws.write_frame(&mut Frame::new_unfin(OpCode::Continuation, &mut bytes())).await.unwrap(); @@ -189,10 +189,10 @@ where ws.write_frame(&mut Frame::new_fin(OpCode::Continuation, &mut bytes())).await.unwrap(); } - async fn server(ws: &mut WebSocketServerOwned) { + async fn server(ws: &mut WebSocketOwned) { let text = ws.read_frame().await.unwrap(); assert_eq!(OpCode::Text, text.op_code()); - assert_eq!(_vector![b'1'; 10 * 256 * 1024].as_slice(), *text.payload()); + assert_eq!(vector![b'1'; 10 * 256 * 1024].as_slice(), *text.payload()); } } @@ -201,13 +201,13 @@ impl Test for PingAndText where NC: NegotiatedCompression, { - async fn client(ws: &mut WebSocketClientOwned) { + async fn client(ws: &mut WebSocketOwned) { ws.write_frame(&mut Frame::new_fin(OpCode::Ping, &mut [1, 2, 3])).await.unwrap(); ws.write_frame(&mut Frame::new_fin(OpCode::Text, *b"ipat")).await.unwrap(); assert_eq!(OpCode::Pong, ws.read_frame().await.unwrap().op_code()); } - async fn server(ws: &mut WebSocketServerOwned) { + async fn server(ws: &mut WebSocketOwned) { assert_eq!(b"ipat", ws.read_frame().await.unwrap().payload()); } } @@ -217,14 +217,14 @@ impl Test for PingBetweenFragmentedText where NC: NegotiatedCompression, { - async fn client(ws: &mut WebSocketClientOwned) { + async fn client(ws: &mut WebSocketOwned) { ws.write_frame(&mut Frame::new_unfin(OpCode::Text, &mut [b'1'])).await.unwrap(); ws.write_frame(&mut Frame::new_fin(OpCode::Ping, &mut [b'9'])).await.unwrap(); ws.write_frame(&mut Frame::new_fin(OpCode::Continuation, &mut [b'2', b'3'])).await.unwrap(); assert_eq!(OpCode::Pong, ws.read_frame().await.unwrap().op_code()); } - async fn server(ws: &mut WebSocketServerOwned) { + async fn server(ws: &mut WebSocketOwned) { let text = ws.read_frame().await.unwrap(); assert_eq!(OpCode::Text, text.op_code()); assert_eq!(&[b'1', b'2', b'3'], text.payload()); @@ -236,7 +236,7 @@ impl Test for SeveralBytes where NC: NegotiatedCompression, { - async fn client(ws: &mut WebSocketClientOwned) { + async fn client(ws: &mut WebSocketOwned) { ws.write_frame(&mut Frame::new_unfin(OpCode::Text, &mut [206])).await.unwrap(); ws.write_frame(&mut Frame::new_unfin(OpCode::Continuation, &mut [186])).await.unwrap(); ws.write_frame(&mut Frame::new_unfin(OpCode::Continuation, &mut [225])).await.unwrap(); @@ -251,7 +251,7 @@ where ws.write_frame(&mut Frame::new_fin(OpCode::Continuation, &mut [])).await.unwrap(); } - async fn server(ws: &mut WebSocketServerOwned) { + async fn server(ws: &mut WebSocketOwned) { let text = ws.read_frame().await.unwrap(); assert_eq!(OpCode::Text, text.op_code()); assert_eq!("κόσμε".as_bytes(), *text.payload()); @@ -263,7 +263,7 @@ impl Test for TwoPings where NC: NegotiatedCompression, { - async fn client(ws: &mut WebSocketClientOwned) { + async fn client(ws: &mut WebSocketOwned) { ws.write_frame(&mut Frame::new_fin(OpCode::Ping, &mut [b'0'])).await.unwrap(); ws.write_frame(&mut Frame::new_fin(OpCode::Ping, &mut [b'1'])).await.unwrap(); let _0 = ws.read_frame().await.unwrap(); @@ -275,7 +275,7 @@ where ws.write_frame(&mut Frame::new_fin(OpCode::Text, &mut [])).await.unwrap(); } - async fn server(ws: &mut WebSocketServerOwned) { + async fn server(ws: &mut WebSocketOwned) { let _0 = ws.read_frame().await.unwrap(); assert_eq!(OpCode::Text, _0.op_code()); assert_eq!(b"", _0.payload()); diff --git a/wtx/src/web_socket/macros.rs b/wtx/src/web_socket/macros.rs index e54b5004..4a462bcd 100644 --- a/wtx/src/web_socket/macros.rs +++ b/wtx/src/web_socket/macros.rs @@ -75,7 +75,7 @@ macro_rules! read_continuation_frames { }; } -macro_rules! read_frame_from_stream { +macro_rules! read_frame { ( $max_payload_len:expr, ($nc_is_noop:expr, $nc_rsv1:expr), diff --git a/wtx/src/web_socket/web_socket_parts/web_socket_part.rs b/wtx/src/web_socket/web_socket_parts/web_socket_part.rs index 0bea5c46..11bdbf8f 100644 --- a/wtx/src/web_socket/web_socket_parts/web_socket_part.rs +++ b/wtx/src/web_socket/web_socket_parts/web_socket_part.rs @@ -56,7 +56,7 @@ where reader_buffer_first, reader_buffer_second, } = self; - let (frame, payload_ty) = read_frame_from_stream!( + let (frame, payload_ty) = read_frame!( *max_payload_len, (NC::IS_NOOP, *nc_rsv1), network_buffer.lease_mut(), @@ -100,7 +100,7 @@ where reader_buffer_second, } = self; let parts = &mut (stream_reader, common); - let (frame, payload_ty) = read_frame_from_stream!( + let (frame, payload_ty) = read_frame!( *max_payload_len, (NC::IS_NOOP, *nc_rsv1), network_buffer.lease_mut(), diff --git a/wtx/src/web_socket/web_socket_parts/web_socket_part_mut.rs b/wtx/src/web_socket/web_socket_parts/web_socket_part_mut.rs index 2fddac6b..43f663df 100644 --- a/wtx/src/web_socket/web_socket_parts/web_socket_part_mut.rs +++ b/wtx/src/web_socket/web_socket_parts/web_socket_part_mut.rs @@ -44,6 +44,18 @@ where NC: NegotiatedCompression, S: Stream, { + /// The current frame payload that is set when [`Self::read_frame`] is called, otherwise, + /// returns an empty slice. + #[inline] + pub fn curr_payload(&mut self) -> &mut [u8] { + match self.wsrp.curr_payload { + PayloadTy::Network => self.wsrp.network_buffer._current_mut(), + PayloadTy::None => &mut [], + PayloadTy::FirstReader => self.wsrp.reader_buffer_first.as_slice_mut(), + PayloadTy::SecondReader => self.wsrp.reader_buffer_second.as_slice_mut(), + } + } + /// Reads a frame from the stream. /// /// If a frame is made up of other sub-frames or continuations, then everything is collected diff --git a/wtx/src/web_socket/web_socket_parts/web_socket_part_owned.rs b/wtx/src/web_socket/web_socket_parts/web_socket_part_owned.rs index 4672d469..1d5b1291 100644 --- a/wtx/src/web_socket/web_socket_parts/web_socket_part_owned.rs +++ b/wtx/src/web_socket/web_socket_parts/web_socket_part_owned.rs @@ -14,6 +14,15 @@ use crate::{ }; use core::marker::PhantomData; +/// Owned reader and writer pair +#[derive(Debug)] +pub struct WebSocketPartsOwned { + /// Reader + pub reader: WebSocketReaderPartOwned, + /// Writer + pub writer: WebSocketWriterPartOwned, +} + /// Auxiliary structure used by [`WebSocketReaderPartOwned`] and [`WebSocketWriterPartOwned`] #[derive(Debug)] pub struct WebSocketCommonPartOwned { @@ -36,6 +45,18 @@ where SR: StreamReader, SW: StreamWriter, { + /// The current frame payload that is set when [`Self::read_frame`] is called, otherwise, + /// returns an empty slice. + #[inline] + pub fn curr_payload(&mut self) -> &mut [u8] { + match self.wsrp.curr_payload { + PayloadTy::Network => self.wsrp.network_buffer._current_mut(), + PayloadTy::None => &mut [], + PayloadTy::FirstReader => self.wsrp.reader_buffer_first.as_slice_mut(), + PayloadTy::SecondReader => self.wsrp.reader_buffer_second.as_slice_mut(), + } + } + /// Reads a frame from the stream. /// /// If a frame is made up of other sub-frames or continuations, then everything is collected