diff --git a/Cargo.lock b/Cargo.lock index 89f7a65f..3b428935 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -237,9 +237,9 @@ checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" [[package]] name = "cc" -version = "1.1.18" +version = "1.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62ac837cdb5cb22e10a256099b4fc502b1dfe560cb282963a974d7abd80e476" +checksum = "2d74707dde2ba56f86ae90effb3b43ddd369504387e718014de010cec7959800" dependencies = [ "jobserver", "libc", @@ -723,9 +723,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.19.0" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "33ea5043e58958ee56f3e15a90aee535795cd7dfd319846288d93c5b57d85cbe" [[package]] name = "opaque-debug" diff --git a/README.md b/README.md index ef60f039..863b88a7 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,10 @@ Anything marked with `#[bench]` in the repository is considered a low-level benc Take a look at to see all low-level benchmarks over different periods of time. +## Examples + +Demonstrations of different use-cases can be found in the `wtx-instances` directory as well as in the documentation. + ## Limitations Does not support systems with 16bit memory addresses and expects the infallible addition of the sizes of 8 allocated chunks of memories, otherwise the program will overflow in certain arithmetic operations involving `usize` potentially resulting in unexpected operations. diff --git a/wtx-instances/database-examples/database-client-postgres-composite-type.rs b/wtx-instances/database-examples/database-client-postgres-composite-type.rs index e181ffe1..38d712f2 100644 --- a/wtx-instances/database-examples/database-client-postgres-composite-type.rs +++ b/wtx-instances/database-examples/database-client-postgres-composite-type.rs @@ -14,7 +14,7 @@ use wtx::database::{ #[tokio::main] async fn main() -> wtx::Result<()> { - let uri = "postgres://USER:PASSWORD@localhost:5432/DATABASE"; + let uri = "postgres://USER:PASSWORD@localhost/DATABASE"; let mut executor = wtx_instances::executor(&uri).await?; let _ = executor .execute_with_stmt( diff --git a/wtx-instances/database-examples/database-client-postgres-enum.rs b/wtx-instances/database-examples/database-client-postgres-enum.rs index ff084ca7..f58bb4ed 100644 --- a/wtx-instances/database-examples/database-client-postgres-enum.rs +++ b/wtx-instances/database-examples/database-client-postgres-enum.rs @@ -14,7 +14,7 @@ use wtx::database::{ #[tokio::main] async fn main() -> wtx::Result<()> { - let uri = "postgres://USER:PASSWORD@localhost:5432/DATABASE"; + let uri = "postgres://USER:PASSWORD@localhost/DATABASE"; let mut executor = wtx_instances::executor(&uri).await?; let _ = executor .execute_with_stmt("INSERT INTO custom_enum_table VALUES ($1, $2)", (1, Enum::Bar)) diff --git a/wtx-instances/database-examples/database-client-postgres.rs b/wtx-instances/database-examples/database-client-postgres.rs index ca821ed5..94de1e2b 100644 --- a/wtx-instances/database-examples/database-client-postgres.rs +++ b/wtx-instances/database-examples/database-client-postgres.rs @@ -10,7 +10,7 @@ use wtx::database::{Executor as _, Record, Records, TransactionManager}; #[tokio::main] async fn main() -> wtx::Result<()> { - let uri = "postgres://USER:PASSWORD@localhost:5432/DATABASE"; + let uri = "postgres://USER:PASSWORD@localhost/DATABASE"; let mut executor = wtx_instances::executor(&uri).await?; let mut tm = executor.transaction().await?; tm.executor().execute("CREATE TABLE IF NOT EXISTS example(id INT, name VARCHAR)", |_| {}).await?; diff --git a/wtx-instances/generic-examples/client-api-framework.rs b/wtx-instances/generic-examples/client-api-framework.rs index 6e8d0ce1..2cd0631a 100644 --- a/wtx-instances/generic-examples/client-api-framework.rs +++ b/wtx-instances/generic-examples/client-api-framework.rs @@ -88,7 +88,7 @@ async fn http_pair( rt: RequestThrottling::from_rl(RequestLimit::new(5, Duration::from_secs(1))), }, SerdeJson, - HttpParams::from_uri("ws://generic_web_socket_uri.com:80".into()), + HttpParams::from_uri("ws://generic_web_socket_uri.com".into()), ), ClientFrameworkTokio::tokio(1).build(), ) @@ -101,14 +101,14 @@ async fn web_socket_pair() -> wtx::Result< >, > { let mut fb = FrameBufferVec::default(); - let uri: Uri<&str> = Uri::new("ws://generic_web_socket_uri.com:80"); + let uri = Uri::new("ws://generic_web_socket_uri.com"); let web_socket = WebSocketClient::connect( (), &mut fb, [], &mut HeadersBuffer::default(), NoStdRng::default(), - TcpStream::connect(uri.host()).await?, + TcpStream::connect(uri.hostname_with_implied_port()).await?, &uri, WebSocketBuffer::default(), ) diff --git a/wtx-instances/generic-examples/http-client-framework.rs b/wtx-instances/generic-examples/http-client-framework.rs index 33bf4b10..146aca82 100644 --- a/wtx-instances/generic-examples/http-client-framework.rs +++ b/wtx-instances/generic-examples/http-client-framework.rs @@ -15,7 +15,7 @@ use wtx::{ #[tokio::main] async fn main() -> wtx::Result<()> { - let uri = Uri::new("http://www.example.com:80"); + let uri = Uri::new("http://www.example.com"); let buffer = ReqResBuffer::default(); let client = ClientFramework::tokio(1).build(); let res = client.send(Method::Get, buffer, &uri.to_ref()).await?; diff --git a/wtx-instances/generic-examples/http2-client.rs b/wtx-instances/generic-examples/http2-client.rs index 408a4fe3..cc62c98d 100644 --- a/wtx-instances/generic-examples/http2-client.rs +++ b/wtx-instances/generic-examples/http2-client.rs @@ -10,30 +10,25 @@ use tokio::net::TcpStream; use wtx::{ http::{Method, ReqResBuffer, Request}, http2::{Http2Buffer, Http2ErrorCode, Http2Params, Http2Tokio}, - misc::{from_utf8_basic, Either, NoStdRng, Uri}, + misc::{from_utf8_basic, NoStdRng, Uri}, }; #[tokio::main] async fn main() -> wtx::Result<()> { - let uri = Uri::new("http://www.example.com:80"); + let uri = Uri::new("http://www.example.com"); let (frame_reader, mut http2) = Http2Tokio::connect( Http2Buffer::new(NoStdRng::default()), Http2Params::default(), - TcpStream::connect(uri.host()).await?.into_split(), + TcpStream::connect(uri.hostname_with_implied_port()).await?.into_split(), ) .await?; - let _jh = tokio::spawn(async move { - if let Err(err) = frame_reader.await { - eprintln!("{err}"); - } - }); + let _jh = tokio::spawn(frame_reader); let rrb = ReqResBuffer::default(); let mut stream = http2.stream().await?; stream.send_req(Request::http2(Method::Get, b"Hello!"), &uri.to_ref()).await?; - let Either::Right(res) = stream.recv_res(rrb).await? else { - return Err(wtx::Error::ClosedConnection); - }; - println!("{}", from_utf8_basic(&res.0.data)?); + let (res_rrb, opt) = stream.recv_res(rrb).await?; + let _status_code = opt.unwrap(); + println!("{}", from_utf8_basic(&res_rrb.data)?); http2.send_go_away(Http2ErrorCode::NoError).await; Ok(()) } diff --git a/wtx-instances/generic-examples/web-socket-client.rs b/wtx-instances/generic-examples/web-socket-client.rs index b039915a..3494ca89 100644 --- a/wtx-instances/generic-examples/web-socket-client.rs +++ b/wtx-instances/generic-examples/web-socket-client.rs @@ -20,7 +20,7 @@ use wtx::{ #[tokio::main] async fn main() -> wtx::Result<()> { - let uri = Uri::new("ws://www.example.com:80"); + let uri = Uri::new("ws://www.example.com"); let fb = &mut FrameBufferVec::default(); let (_, mut ws) = WebSocketClient::connect( (), @@ -28,7 +28,7 @@ async fn main() -> wtx::Result<()> { [], &mut HeadersBuffer::default(), StdRng::default(), - TcpStream::connect(uri.host()).await?, + TcpStream::connect(uri.hostname_with_implied_port()).await?, &uri.to_ref(), WebSocketBuffer::default(), ) diff --git a/wtx-instances/http-server-framework-examples/http-server-framework-session.rs b/wtx-instances/http-server-framework-examples/http-server-framework-session.rs index 00466518..d2b2dc03 100644 --- a/wtx-instances/http-server-framework-examples/http-server-framework-session.rs +++ b/wtx-instances/http-server-framework-examples/http-server-framework-session.rs @@ -58,7 +58,7 @@ async fn main() -> wtx::Result<()> { (SessionDecoder::new(), SessionEnforcer::new([LOGIN, LOGOUT])), (), )?; - let pool = Pool::new(4, PostgresRM::tokio("postgres://USER:PASSWORD@localhost:5432/DB_NAME")); + let pool = Pool::new(4, PostgresRM::tokio("postgres://USER:PASSWORD@localhost/DB_NAME")); let mut rng = StdRng::default(); let mut key = [0; 16]; rng.fill_slice(&mut key); diff --git a/wtx-instances/http-server-framework-examples/http-server-framework.rs b/wtx-instances/http-server-framework-examples/http-server-framework.rs index b46fe954..08225183 100644 --- a/wtx-instances/http-server-framework-examples/http-server-framework.rs +++ b/wtx-instances/http-server-framework-examples/http-server-framework.rs @@ -41,7 +41,7 @@ async fn main() -> wtx::Result<()> { )?, ), ))?; - let rm = PostgresRM::tokio("postgres://USER:PASSWORD@localhost:5432/DB_NAME"); + let rm = PostgresRM::tokio("postgres://USER:PASSWORD@localhost/DB_NAME"); let pool = Pool::new(4, rm); ServerFrameworkBuilder::new(router) .with_req_aux(move || pool.clone()) diff --git a/wtx-instances/src/lib.rs b/wtx-instances/src/lib.rs index dbfaa444..bfc68e19 100644 --- a/wtx-instances/src/lib.rs +++ b/wtx-instances/src/lib.rs @@ -40,7 +40,7 @@ pub async fn executor( &Config::from_uri(&uri)?, ExecutorBuffer::with_default_params(&mut rng)?, &mut rng, - TcpStream::connect(uri.host()).await?, + TcpStream::connect(uri.hostname_with_implied_port()).await?, ) .await } diff --git a/wtx-ui/src/schema_manager.rs b/wtx-ui/src/schema_manager.rs index ba1b0eab..2905bc94 100644 --- a/wtx-ui/src/schema_manager.rs +++ b/wtx-ui/src/schema_manager.rs @@ -19,14 +19,14 @@ pub(crate) async fn schema_manager(sm: SchemaManager) -> wtx::Result<()> { let var = std::env::var(DEFAULT_URI_VAR)?; let uri = UriRef::new(&var); - match uri.schema() { + match uri.scheme() { "postgres" | "postgresql" => { let mut rng = StdRng::default(); let executor = Executor::connect( &Config::from_uri(&uri)?, ExecutorBuffer::with_default_params(&mut rng)?, &mut rng, - TcpStream::connect(uri.host()).await.map_err(wtx::Error::from)?, + TcpStream::connect(uri.hostname_with_implied_port()).await.map_err(wtx::Error::from)?, ) .await?; handle_commands(executor, &sm).await?; diff --git a/wtx-ui/src/web_socket.rs b/wtx-ui/src/web_socket.rs index e502ca8d..226c720c 100644 --- a/wtx-ui/src/web_socket.rs +++ b/wtx-ui/src/web_socket.rs @@ -20,7 +20,7 @@ pub(crate) async fn connect(uri: &str, cb: impl Fn(&str)) -> wtx::Result<()> { [], &mut HeadersBuffer::default(), StdRng::default(), - TcpStream::connect(uri.host()).await?, + TcpStream::connect(uri.hostname_with_implied_port()).await?, &uri, wsb, ) @@ -53,7 +53,7 @@ pub(crate) async fn serve( str: fn(&str), ) -> wtx::Result<()> { let uri = UriRef::new(uri); - let listener = TcpListener::bind(uri.host()).await?; + let listener = TcpListener::bind(uri.hostname_with_implied_port()).await?; loop { let (stream, _) = listener.accept().await?; let _jh = tokio::spawn(async move { diff --git a/wtx/src/client_api_framework/network/transport.rs b/wtx/src/client_api_framework/network/transport.rs index e4379c4a..f8577eba 100644 --- a/wtx/src/client_api_framework/network/transport.rs +++ b/wtx/src/client_api_framework/network/transport.rs @@ -19,7 +19,7 @@ use crate::{ Api, }, data_transformation::dnsn::{Deserialize, Serialize}, - misc::Lease, + misc::{Lease, Vector}, }; pub use bi_transport::*; use core::{future::Future, ops::Range}; @@ -69,14 +69,10 @@ pub trait Transport { #[inline] fn send_recv_decode_batch<'pkgs, 'pkgs_aux, A, P>( &mut self, + buffer: &mut Vector>, pkgs: &'pkgs mut [P], pkgs_aux: &'pkgs_aux mut PkgsAux, - ) -> impl Future< - Output = Result< - impl Iterator>>, - A::Error, - >, - > + ) -> impl Future> where A: Api, P: Package, @@ -85,10 +81,12 @@ pub trait Transport { async { let range = self.send_recv(&mut BatchPkg::new(pkgs), pkgs_aux).await?; log_res(pkgs_aux.byte_buffer.lease()); - Ok(P::ExternalResponseContent::seq_from_bytes( + P::ExternalResponseContent::seq_from_bytes( + buffer, pkgs_aux.byte_buffer.get(range).unwrap_or_default(), &mut pkgs_aux.drsr, - )) + )?; + Ok(()) } } @@ -217,8 +215,8 @@ mod tests { } #[inline] - fn seq_from_bytes(_: &'de [u8], _: &mut DRSR) -> impl Iterator> { - [].into_iter() + fn seq_from_bytes(_: &mut Vector, _: &'de [u8], _: &mut DRSR) -> crate::Result<()> { + Ok(()) } } } diff --git a/wtx/src/client_api_framework/network/transport/std.rs b/wtx/src/client_api_framework/network/transport/std.rs index 779dd00d..3c37bdeb 100644 --- a/wtx/src/client_api_framework/network/transport/std.rs +++ b/wtx/src/client_api_framework/network/transport/std.rs @@ -169,7 +169,7 @@ mod tests { let uri_client = _uri(); let uri_server = uri_client.to_string(); let _server = tokio::spawn(async move { - let tcp_listener = TcpListener::bind(uri_server.host()).unwrap(); + let tcp_listener = TcpListener::bind(uri_server.hostname_with_implied_port()).unwrap(); let mut buffer = [0; 8]; let (mut stream, _) = tcp_listener.accept().unwrap(); let idx = stream.read(&mut buffer).unwrap(); @@ -177,7 +177,7 @@ mod tests { }); sleep(Duration::from_millis(100)).await.unwrap(); let mut pa = PkgsAux::from_minimum((), (), TcpParams::from_uri(uri_client.as_str())); - let mut trans = TcpStream::connect(uri_client.host()).unwrap(); + let mut trans = TcpStream::connect(uri_client.hostname_with_implied_port()).unwrap(); let res = trans.send_recv_decode_contained(&mut _PingPong(_Ping, ()), &mut pa).await.unwrap(); assert_eq!(res, _Pong("pong")); } diff --git a/wtx/src/client_api_framework/network/transport/transport_params.rs b/wtx/src/client_api_framework/network/transport/transport_params.rs index afcbf9f4..f877ee26 100644 --- a/wtx/src/client_api_framework/network/transport/transport_params.rs +++ b/wtx/src/client_api_framework/network/transport/transport_params.rs @@ -1,11 +1,9 @@ -use core::fmt::Debug; - /// Additional information or metadata received or transmitted by a transport. pub trait TransportParams { /// For example, HTTP has request headers. - type ExternalRequestParams: Debug; + type ExternalRequestParams; /// For example, HTTP has response headers. - type ExternalResponseParams: Debug; + type ExternalResponseParams; /// External Request Parameters. fn ext_req_params(&self) -> &Self::ExternalRequestParams; diff --git a/wtx/src/data_transformation/dnsn/deserialize.rs b/wtx/src/data_transformation/dnsn/deserialize.rs index 1326d1c3..4b3a8d7e 100644 --- a/wtx/src/data_transformation/dnsn/deserialize.rs +++ b/wtx/src/data_transformation/dnsn/deserialize.rs @@ -1,3 +1,5 @@ +use crate::misc::Vector; + /// Marker trait that has different bounds according to the given set of enabled deserializers. pub trait Deserialize<'de, DRSR> where @@ -7,8 +9,11 @@ where fn from_bytes(bytes: &'de [u8], drsr: &mut DRSR) -> crate::Result; /// Similar to [`Self::from_bytes`] but deals with sequences instead of a single element. - fn seq_from_bytes(bytes: &'de [u8], drsr: &mut DRSR) - -> impl Iterator>; + fn seq_from_bytes( + buffer: &mut Vector, + bytes: &'de [u8], + drsr: &mut DRSR, + ) -> crate::Result<()>; } impl<'de, DRSR> Deserialize<'de, DRSR> for () { @@ -18,7 +23,7 @@ impl<'de, DRSR> Deserialize<'de, DRSR> for () { } #[inline] - fn seq_from_bytes(_: &'de [u8], _: &mut DRSR) -> impl Iterator> { - [].into_iter() + fn seq_from_bytes(_: &mut Vector, _: &'de [u8], _: &mut DRSR) -> crate::Result<()> { + Ok(()) } } diff --git a/wtx/src/data_transformation/format.rs b/wtx/src/data_transformation/format.rs index 032290bb..c013b096 100644 --- a/wtx/src/data_transformation/format.rs +++ b/wtx/src/data_transformation/format.rs @@ -8,6 +8,7 @@ mod graph_ql; mod json_rpc; +mod misc; mod verbatim; pub use graph_ql::*; diff --git a/wtx/src/data_transformation/format/graph_ql/graph_ql_response.rs b/wtx/src/data_transformation/format/graph_ql/graph_ql_response.rs index b6d563fc..0a2f9264 100644 --- a/wtx/src/data_transformation/format/graph_ql/graph_ql_response.rs +++ b/wtx/src/data_transformation/format/graph_ql/graph_ql_response.rs @@ -118,16 +118,19 @@ mod serde { #[cfg(feature = "serde_json")] mod serde_json { use crate::{ - data_transformation::{dnsn::SerdeJson, format::GraphQlResponse}, + data_transformation::{ + dnsn::SerdeJson, + format::{misc::collect_using_serde_json, GraphQlResponse}, + }, misc::Vector, }; - use serde_json::{de::SliceRead, StreamDeserializer}; + use serde::{Deserialize, Serialize}; impl<'de, D, E> crate::data_transformation::dnsn::Deserialize<'de, SerdeJson> for GraphQlResponse where - D: serde::Deserialize<'de>, - E: serde::Deserialize<'de>, + D: Deserialize<'de>, + E: Deserialize<'de>, { #[inline] fn from_bytes(bytes: &'de [u8], _: &mut SerdeJson) -> crate::Result { @@ -136,17 +139,18 @@ mod serde_json { #[inline] fn seq_from_bytes( + buffer: &mut Vector, bytes: &'de [u8], _: &mut SerdeJson, - ) -> impl Iterator> { - StreamDeserializer::new(SliceRead::new(bytes)).map(|el| el.map_err(From::from)) + ) -> crate::Result<()> { + collect_using_serde_json(buffer, bytes) } } impl crate::data_transformation::dnsn::Serialize for GraphQlResponse where - D: serde::Serialize, - E: serde::Serialize, + D: Serialize, + E: Serialize, { #[inline] fn to_bytes(&mut self, bytes: &mut Vector, _: &mut SerdeJson) -> crate::Result<()> { diff --git a/wtx/src/data_transformation/format/json_rpc/json_rpc_response.rs b/wtx/src/data_transformation/format/json_rpc/json_rpc_response.rs index 9eae0097..8ba77843 100644 --- a/wtx/src/data_transformation/format/json_rpc/json_rpc_response.rs +++ b/wtx/src/data_transformation/format/json_rpc/json_rpc_response.rs @@ -35,8 +35,8 @@ where } #[inline] - fn seq_from_bytes(_: &'de [u8], _: &mut ()) -> impl Iterator> { - [].into_iter() + fn seq_from_bytes(_: &mut Vector, _: &'de [u8], _: &mut ()) -> crate::Result<()> { + Ok(()) } } @@ -245,16 +245,17 @@ mod serde { #[cfg(feature = "serde_json")] mod serde_json { - use serde_json::{de::SliceRead, StreamDeserializer}; - use crate::{ - data_transformation::{dnsn::SerdeJson, format::JsonRpcResponse}, + data_transformation::{ + dnsn::SerdeJson, + format::{misc::collect_using_serde_json, JsonRpcResponse}, + }, misc::Vector, }; impl<'de, R> crate::data_transformation::dnsn::Deserialize<'de, SerdeJson> for JsonRpcResponse where - R: for<'serde_de> serde::Deserialize<'serde_de>, + R: serde::Deserialize<'de>, { #[inline] fn from_bytes(bytes: &'de [u8], _: &mut SerdeJson) -> crate::Result { @@ -263,10 +264,11 @@ mod serde_json { #[inline] fn seq_from_bytes( + buffer: &mut Vector, bytes: &'de [u8], _: &mut SerdeJson, - ) -> impl Iterator> { - StreamDeserializer::new(SliceRead::new(bytes)).map(|el| el.map_err(crate::Error::from)) + ) -> crate::Result<()> { + collect_using_serde_json(buffer, bytes) } } diff --git a/wtx/src/data_transformation/format/misc.rs b/wtx/src/data_transformation/format/misc.rs new file mode 100644 index 00000000..0a3300cf --- /dev/null +++ b/wtx/src/data_transformation/format/misc.rs @@ -0,0 +1,72 @@ +#[cfg(feature = "serde_json")] +pub(crate) use serde_json::collect_using_serde_json; + +#[cfg(feature = "serde_json")] +mod serde_json { + use crate::misc::Vector; + use core::{any::type_name, fmt::Formatter}; + use serde::{ + de::{Deserializer, Error, SeqAccess, Visitor}, + Deserialize, + }; + + pub(crate) fn collect_using_serde_json<'de, T>( + buffer: &mut Vector, + bytes: &'de [u8], + ) -> crate::Result<()> + where + T: Deserialize<'de>, + { + struct Buffer<'any, T>(&'any mut Vector); + + impl<'any, 'de, T> Visitor<'de> for Buffer<'any, T> + where + T: Deserialize<'de>, + { + type Value = (); + + #[inline] + fn expecting(&self, formatter: &mut Formatter<'_>) -> core::fmt::Result { + formatter.write_fmt(format_args!("a sequence of `{}`", type_name::())) + } + + #[inline] + fn visit_seq(self, mut seq: A) -> Result + where + A: SeqAccess<'de>, + { + if let Some(elem) = seq.size_hint() { + self.0.reserve(elem).map_err(|err| A::Error::custom(err))?; + } + while let Some(elem) = seq.next_element()? { + self.0.push(elem).map_err(|err| A::Error::custom(err))?; + } + Ok(()) + } + } + + serde_json::Deserializer::from_slice(bytes).deserialize_seq(Buffer(buffer))?; + Ok(()) + } + + #[cfg(test)] + mod tests { + use crate::{ + data_transformation::format::misc::serde_json::collect_using_serde_json, misc::Vector, + }; + + #[derive(Debug, PartialEq, serde::Deserialize)] + struct Foo { + a: u8, + b: u64, + } + + #[test] + fn array_is_deserialized() { + let json = r#"[{"a":1,"b":90},{"a":7,"b":567}]"#; + let mut vector = Vector::::new(); + collect_using_serde_json(&mut vector, json.as_bytes()).unwrap(); + assert_eq!(vector.as_slice(), &[Foo { a: 1, b: 90 }, Foo { a: 7, b: 567 }]); + } + } +} diff --git a/wtx/src/data_transformation/format/verbatim/verbatim_request.rs b/wtx/src/data_transformation/format/verbatim/verbatim_request.rs index 4286e50c..213020ee 100644 --- a/wtx/src/data_transformation/format/verbatim/verbatim_request.rs +++ b/wtx/src/data_transformation/format/verbatim/verbatim_request.rs @@ -32,8 +32,8 @@ mod borsh { } #[inline] - fn seq_from_bytes(_: &'de [u8], _: &mut Borsh) -> impl Iterator> { - [].into_iter() + fn seq_from_bytes(_: &mut Vector, _: &'de [u8], _: &mut Borsh) -> crate::Result<()> { + Ok(()) } } @@ -55,6 +55,7 @@ mod quick_protobuf { data_transformation::{ dnsn::{Deserialize, QuickProtobuf, Serialize}, format::VerbatimRequest, + DataTransformationError, }, misc::Vector, }; @@ -71,10 +72,11 @@ mod quick_protobuf { #[inline] fn seq_from_bytes( + _: &mut Vector, _: &'de [u8], _: &mut QuickProtobuf, - ) -> impl Iterator> { - [].into_iter() + ) -> crate::Result<()> { + Err(DataTransformationError::UnsupportedOperation.into()) } } diff --git a/wtx/src/data_transformation/format/verbatim/verbatim_response.rs b/wtx/src/data_transformation/format/verbatim/verbatim_response.rs index 9c43284d..41afdd7b 100644 --- a/wtx/src/data_transformation/format/verbatim/verbatim_response.rs +++ b/wtx/src/data_transformation/format/verbatim/verbatim_response.rs @@ -5,6 +5,8 @@ use crate::{ #[derive(Debug, Eq, Ord, PartialEq, PartialOrd)] #[doc = generic_data_format_doc!("verbatim response")] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] +#[cfg_attr(feature = "serde", serde(transparent))] pub struct VerbatimResponse { /// Actual data pub data: D, @@ -20,8 +22,8 @@ where } #[inline] - fn seq_from_bytes(_: &'de [u8], _: &mut ()) -> impl Iterator> { - [].into_iter() + fn seq_from_bytes(_: &mut Vector, _: &'de [u8], _: &mut ()) -> crate::Result<()> { + Ok(()) } } @@ -34,7 +36,10 @@ impl Serialize<()> for VerbatimResponse { #[cfg(feature = "borsh")] mod borsh { - use crate::data_transformation::{dnsn::Borsh, format::VerbatimResponse}; + use crate::{ + data_transformation::{dnsn::Borsh, format::VerbatimResponse, DataTransformationError}, + misc::Vector, + }; use borsh::BorshDeserialize; impl<'de, D> crate::data_transformation::dnsn::Deserialize<'de, Borsh> for VerbatimResponse @@ -47,8 +52,8 @@ mod borsh { } #[inline] - fn seq_from_bytes(_: &'de [u8], _: &mut Borsh) -> impl Iterator> { - [].into_iter() + fn seq_from_bytes(_: &mut Vector, _: &'de [u8], _: &mut Borsh) -> crate::Result<()> { + Err(DataTransformationError::UnsupportedOperation.into()) } } } @@ -59,6 +64,7 @@ mod quick_protobuf { data_transformation::{ dnsn::{Deserialize, QuickProtobuf, Serialize}, format::VerbatimResponse, + DataTransformationError, }, misc::Vector, }; @@ -75,10 +81,11 @@ mod quick_protobuf { #[inline] fn seq_from_bytes( + _: &mut Vector, _: &'de [u8], _: &mut QuickProtobuf, - ) -> impl Iterator> { - [].into_iter() + ) -> crate::Result<()> { + Err(DataTransformationError::UnsupportedOperation.into()) } } @@ -97,10 +104,12 @@ mod quick_protobuf { #[cfg(feature = "serde_json")] mod serde_json { use crate::{ - data_transformation::{dnsn::SerdeJson, format::VerbatimResponse}, + data_transformation::{ + dnsn::SerdeJson, + format::{misc::collect_using_serde_json, VerbatimResponse}, + }, misc::Vector, }; - use serde_json::{de::SliceRead, StreamDeserializer}; impl<'de, D> crate::data_transformation::dnsn::Deserialize<'de, SerdeJson> for VerbatimResponse where @@ -113,11 +122,11 @@ mod serde_json { #[inline] fn seq_from_bytes( + buffer: &mut Vector, bytes: &'de [u8], _: &mut SerdeJson, - ) -> impl Iterator> { - StreamDeserializer::new(SliceRead::new(bytes)) - .map(|el| el.map(|data| VerbatimResponse { data }).map_err(From::from)) + ) -> crate::Result<()> { + collect_using_serde_json(buffer, bytes) } } diff --git a/wtx/src/database/client/postgres/config.rs b/wtx/src/database/client/postgres/config.rs index 61730d64..2130dec4 100644 --- a/wtx/src/database/client/postgres/config.rs +++ b/wtx/src/database/client/postgres/config.rs @@ -1,6 +1,6 @@ use crate::{ database::client::postgres::PostgresError, - misc::{str_split1, FromRadix10, UriRef}, + misc::{into_rslt, str_split1, FromRadix10, UriRef}, }; use core::time::Duration; @@ -29,17 +29,17 @@ impl<'data> Config<'data> { app_name: "", channel_binding: ChannelBinding::Prefer, connect_timeout: Duration::ZERO, - db: uri.href().get(1..).unwrap_or_default(), + db: uri.relative_reference().get(1..).unwrap_or_default(), host: uri.host(), keepalives: true, load_balance_hosts: LoadBalanceHosts::Disable, password: uri.password(), - port: u16::from_radix_10(uri.port().as_bytes())?, + port: into_rslt(uri.port())?, target_session_attrs: TargetSessionAttrs::Any, tcp_user_timeout: Duration::ZERO, user: uri.user(), }; - for key_value in str_split1(uri.query(), b'&') { + for key_value in str_split1(uri.query_and_fragment(), b'&') { let mut iter = str_split1(key_value, b':'); if let [Some(key), Some(value)] = [iter.next(), iter.next()] { this.set_param(key, value)?; diff --git a/wtx/src/database/client/postgres/db_error.rs b/wtx/src/database/client/postgres/db_error.rs index 83c18031..3bda8810 100644 --- a/wtx/src/database/client/postgres/db_error.rs +++ b/wtx/src/database/client/postgres/db_error.rs @@ -60,7 +60,7 @@ pub struct DbError { message: Range, position: Option, routine: Option>, - schema: Option>, + scheme: Option>, severity_localized: Range, severity_nonlocalized: Option, table: Option>, @@ -154,9 +154,9 @@ impl DbError { /// If the error was associated with a specific database object, the name of the schema /// containing that object, if any. #[inline] - pub fn schema(&self) -> Option<&str> { + pub fn scheme(&self) -> Option<&str> { self - .schema + .scheme .as_ref() .and_then(|range| self.buffer.get(_usize_range_from_u32_range(range.clone()))) } @@ -211,7 +211,7 @@ impl Debug for DbError { .field("message", &self.message()) .field("position", &self.position()) .field("routine", &self.routine()) - .field("schema", &self.schema()) + .field("schema", &self.scheme()) .field("severity_localized", &self.severity_localized()) .field("severity_nonlocalized", &self.severity_nonlocalized()) .field("table", &self.table()) @@ -323,7 +323,7 @@ impl TryFrom<&str> for DbError { Some(position) => Some(ErrorPosition::Original(position)), }, routine, - schema, + scheme: schema, table, r#where, }) diff --git a/wtx/src/database/client/postgres/integration_tests.rs b/wtx/src/database/client/postgres/integration_tests.rs index d7ac681d..8d6baa2a 100644 --- a/wtx/src/database/client/postgres/integration_tests.rs +++ b/wtx/src/database/client/postgres/integration_tests.rs @@ -11,7 +11,7 @@ use crate::{ use alloc::string::String; use tokio::net::TcpStream; -const SCRAM: &str = "postgres://wtx_scram:wtx@localhost:5432/wtx"; +const SCRAM: &str = "postgres://wtx_scram:wtx@localhost/wtx"; #[cfg(feature = "webpki-roots")] #[tokio::test] @@ -21,7 +21,7 @@ async fn conn_scram_tls() { let _executor = Executor::::connect_encrypted( &Config::from_uri(&uri).unwrap(), ExecutorBuffer::with_default_params(&mut rng).unwrap(), - TcpStream::connect(uri.host()).await.unwrap(), + TcpStream::connect(uri.hostname_with_implied_port()).await.unwrap(), &mut rng, |stream| async { Ok( @@ -438,7 +438,7 @@ async fn executor() -> Executor { &Config::from_uri(&uri).unwrap(), ExecutorBuffer::with_default_params(&mut rng).unwrap(), &mut rng, - TcpStream::connect(uri.host()).await.unwrap(), + TcpStream::connect(uri.hostname_with_implied_port()).await.unwrap(), ) .await .unwrap() diff --git a/wtx/src/database/schema_manager/integration_tests.rs b/wtx/src/database/schema_manager/integration_tests.rs index 96003036..4851a88d 100644 --- a/wtx/src/database/schema_manager/integration_tests.rs +++ b/wtx/src/database/schema_manager/integration_tests.rs @@ -43,7 +43,7 @@ macro_rules! create_integration_tests { let uri = std::env::var(DEFAULT_URI_VAR).unwrap(); let uri = crate::misc::UriRef::new(&uri); let config = crate::database::client::postgres::Config::from_uri(&uri).unwrap(); - let stream = TcpStream::connect(uri.host()).await.unwrap(); + let stream = TcpStream::connect(uri.hostname_with_implied_port()).await.unwrap(); let mut rng = NoStdRng::default(); crate::database::client::postgres::Executor::connect( &config, diff --git a/wtx/src/http/client_framework.rs b/wtx/src/http/client_framework.rs index 669ed558..cde7af6d 100644 --- a/wtx/src/http/client_framework.rs +++ b/wtx/src/http/client_framework.rs @@ -13,7 +13,7 @@ mod req_builder; use crate::{ http::{ConnParams, Method, ReqResBuffer, ReqResData, ReqUri, Request, Response}, http2::{Http2, Http2Buffer, Http2Data, Http2ErrorCode}, - misc::{Either, LeaseMut, Lock, RefCounter, StreamWriter}, + misc::{LeaseMut, Lock, RefCounter, StreamWriter}, pool::{Pool, ResourceManager, SimplePool, SimplePoolResource}, }; use core::marker::PhantomData; @@ -82,9 +82,10 @@ where if stream.send_req(Request::http2(method, rrb.lease()), actual_req_uri).await?.is_none() { return Err(crate::Error::ClosedConnection); } - let (res_rrb, status_code) = match stream.recv_res(rrb).await? { - Either::Left(_) => return Err(crate::Error::ClosedConnection), - Either::Right(elem) => elem, + let (res_rrb, opt) = stream.recv_res(rrb).await?; + let status_code = match opt { + None => return Err(crate::Error::ClosedConnection), + Some(elem) => elem, }; Ok(Response::http2(res_rrb, status_code)) } @@ -135,7 +136,7 @@ mod tokio { let (frame_reader, http2) = Http2Tokio::connect( Http2Buffer::default(), self._cp.to_hp(), - TcpStream::connect(uri.host()).await?.into_split(), + TcpStream::connect(uri.hostname_with_implied_port()).await?.into_split(), ) .await?; let _jh = tokio::spawn(frame_reader); @@ -159,7 +160,7 @@ mod tokio { let (frame_reader, http2) = Http2Tokio::connect( buffer, self._cp.to_hp(), - TcpStream::connect(uri.host()).await?.into_split(), + TcpStream::connect(uri.hostname_with_implied_port()).await?.into_split(), ) .await?; let _jh = tokio::spawn(frame_reader); @@ -216,7 +217,10 @@ mod tokio_rustls { tokio::io::split( TokioRustlsConnector::from_auto()? .http2() - .connect_without_client_auth(uri.hostname(), TcpStream::connect(uri.host()).await?) + .connect_without_client_auth( + uri.hostname(), + TcpStream::connect(uri.hostname_with_implied_port()).await?, + ) .await?, ), ) @@ -245,7 +249,10 @@ mod tokio_rustls { tokio::io::split( TokioRustlsConnector::from_auto()? .http2() - .connect_without_client_auth(uri.hostname(), TcpStream::connect(uri.host()).await?) + .connect_without_client_auth( + uri.hostname(), + TcpStream::connect(uri.hostname_with_implied_port()).await?, + ) .await?, ), ) diff --git a/wtx/src/http/client_framework/integration_tests.rs b/wtx/src/http/client_framework/integration_tests.rs index 2887d496..74b41efd 100644 --- a/wtx/src/http/client_framework/integration_tests.rs +++ b/wtx/src/http/client_framework/integration_tests.rs @@ -6,15 +6,15 @@ use crate::{ #[tokio::test] async fn popular_sites() { let _res = ReqBuilder::get() - .send(&ClientFramework::tokio_rustls(1).build(), &Uri::new("https://github.com:443")) + .send(&ClientFramework::tokio_rustls(1).build(), &Uri::new("https://github.com")) .await .unwrap(); let _res = ReqBuilder::get() - .send(&ClientFramework::tokio_rustls(1).build(), &Uri::new("https://duckduckgo.com:443")) + .send(&ClientFramework::tokio_rustls(1).build(), &Uri::new("https://duckduckgo.com")) .await .unwrap(); let _res = ReqBuilder::get() - .send(&ClientFramework::tokio_rustls(1).build(), &Uri::new("https://www.google.com:443")) + .send(&ClientFramework::tokio_rustls(1).build(), &Uri::new("https://www.google.com")) .await .unwrap(); } diff --git a/wtx/src/http/low_level_server/tokio_http2.rs b/wtx/src/http/low_level_server/tokio_http2.rs index 4b24ca07..155eedd3 100644 --- a/wtx/src/http/low_level_server/tokio_http2.rs +++ b/wtx/src/http/low_level_server/tokio_http2.rs @@ -91,20 +91,12 @@ where for<'handle> &'handle F: Send, { let (ca, http2_buffer, http2_params) = conn_cb()?; - let (frame_reader, mut http2) = - Http2Tokio::accept(http2_buffer, http2_params, stream_cb(local_acceptor, tcp_stream).await?) - .await?; - { - let local_err_cb = err_cb.clone(); - let _jh = tokio::spawn(async move { - if let Err(err) = frame_reader.await { - local_err_cb(err.into()); - } - }); - } + let tuple = stream_cb(local_acceptor, tcp_stream).await?; + let (frame_reader, mut http2) = Http2Tokio::accept(http2_buffer, http2_params, tuple).await?; + let _jh = tokio::spawn(frame_reader); loop { let (ra, rrb) = req_cb()?; - let mut http2_stream = match http2.stream(rrb).await { + let mut http2_stream = match http2.stream(rrb).await? { Either::Left(_) => return Ok(()), Either::Right(elem) => elem, }; @@ -113,9 +105,10 @@ where let local_err_cb = err_cb.clone(); let _stream_jh = tokio::spawn(async move { let fun = || async { - let (local_rrb, method) = match http2_stream.recv_req().await? { - Either::Left(_) => return Ok(()), - Either::Right(elem) => elem, + let (local_rrb, opt) = http2_stream.recv_req().await?; + let method = match opt { + None => return Ok(()), + Some(elem) => elem, }; let req = local_rrb.into_http2_request(method); let res = local_handle_cb.call((local_ca, ra, req)).await?; diff --git a/wtx/src/http/server_framework.rs b/wtx/src/http/server_framework.rs index d581d386..a9108497 100644 --- a/wtx/src/http/server_framework.rs +++ b/wtx/src/http/server_framework.rs @@ -25,7 +25,6 @@ use crate::{ }; use alloc::sync::Arc; pub use conn_aux::ConnAux; -use core::fmt::Debug; pub use cors_middleware::CorsMiddleware; pub use endpoint::Endpoint; pub use middleware::{ReqMiddleware, ResMiddleware}; diff --git a/wtx/src/http/server_framework/state.rs b/wtx/src/http/server_framework/state.rs index eb6fe303..327d3d24 100644 --- a/wtx/src/http/server_framework/state.rs +++ b/wtx/src/http/server_framework/state.rs @@ -19,7 +19,7 @@ pub struct State<'any, CA, RA, RRD> { impl<'any, CA, RA, RRD> State<'any, CA, RA, RRD> { #[inline] - pub(crate) const fn new(ca: &'any mut CA, ra: &'any mut RA, req: &'any mut Request) -> Self { + pub(crate) fn new(ca: &'any mut CA, ra: &'any mut RA, req: &'any mut Request) -> Self { Self { ca, ra, req } } } diff --git a/wtx/src/http2.rs b/wtx/src/http2.rs index d91d42c4..98630480 100644 --- a/wtx/src/http2.rs +++ b/wtx/src/http2.rs @@ -47,7 +47,8 @@ mod window_update_frame; use crate::{ http::ReqResBuffer, http2::misc::{ - manage_initial_stream_receiving, process_higher_operation_err, protocol_err, write_array, + frame_reader_rslt, manage_initial_stream_receiving, process_higher_operation_err, protocol_err, + write_array, }, misc::{ AtomicWaker, ConnectionState, Either, LeaseMut, Lock, PartitionedFilledBuffer, RefCounter, @@ -203,7 +204,7 @@ where mut hb: HB, hp: Http2Params, (mut stream_reader, mut stream_writer): (SR, SW), - ) -> crate::Result<(impl Future>, Self)> + ) -> crate::Result<(impl Future, Self)> where SR: StreamReader, { @@ -238,7 +239,7 @@ where /// Returns [`Either::Left`] if the network connection has been closed, either locally /// or externally. #[inline] - pub async fn stream(&mut self, rrb: RRB) -> Either, ServerStream> { + pub async fn stream(&mut self, rrb: RRB) -> crate::Result, ServerStream>> { let Self { hd, is_conn_open } = self; let rrb_opt = &mut Some(rrb); let mut lock_pin = pin!(hd.lock()); @@ -247,15 +248,17 @@ where let hdpm = lock.parts_mut(); if let Some(mut elem) = rrb_opt.take() { if !manage_initial_stream_receiving(is_conn_open, &mut elem) { - return Poll::Ready(Either::Left(Some(elem))); + let rslt = frame_reader_rslt(hdpm.frame_reader_error); + return Poll::Ready(Either::Left((Some(elem), rslt))); } hdpm.hb.initial_server_header_buffers.push_back((elem, cx.waker().clone())); Poll::Pending } else { if !is_conn_open.load(Ordering::Relaxed) { - return Poll::Ready(Either::Left( + return Poll::Ready(Either::Left(( hdpm.hb.initial_server_header_buffers.pop_front().map(|el| el.0), - )); + frame_reader_rslt(hdpm.frame_reader_error), + ))); } let Some((method, stream_id)) = hdpm.hb.initial_server_header_params.pop_front() else { return Poll::Pending; @@ -265,14 +268,17 @@ where }) .await { - Either::Left(elem) => Either::Left(elem), - Either::Right((method, stream_id)) => Either::Right(ServerStream::new( + Either::Left(elem) => { + elem.1?; + Ok(Either::Left(elem.0)) + } + Either::Right((method, stream_id)) => Ok(Either::Right(ServerStream::new( hd.clone(), Arc::clone(is_conn_open), method, _trace_span!("New server stream", stream_id = %stream_id), stream_id, - )), + ))), } } } @@ -291,7 +297,7 @@ where mut hb: HB, hp: Http2Params, (stream_reader, mut stream_writer): (SR, SW), - ) -> crate::Result<(impl Future>, Self)> + ) -> crate::Result<(impl Future, Self)> where SR: StreamReader, { diff --git a/wtx/src/http2/client_stream.rs b/wtx/src/http2/client_stream.rs index e8a867bd..5f8222b2 100644 --- a/wtx/src/http2/client_stream.rs +++ b/wtx/src/http2/client_stream.rs @@ -2,14 +2,14 @@ use crate::{ http::{ReqResBuffer, ReqResData, ReqUri, Request, StatusCode}, http2::{ misc::{ - manage_initial_stream_receiving, manage_recurrent_stream_receiving, + frame_reader_rslt, manage_initial_stream_receiving, manage_recurrent_stream_receiving, process_higher_operation_err, send_go_away, send_reset_stream, }, send_msg::send_msg, HpackStaticRequestHeaders, HpackStaticResponseHeaders, Http2Buffer, Http2Data, Http2ErrorCode, StreamOverallRecvParams, StreamState, Windows, U31, }, - misc::{Either, Lease, LeaseMut, Lock, RefCounter, StreamWriter, _Span}, + misc::{Lease, LeaseMut, Lock, RefCounter, StreamWriter, _Span}, }; use alloc::sync::Arc; use core::{ @@ -55,12 +55,12 @@ where /// Higher operation that awaits for the data necessary to build a response and then closes the /// stream. /// - /// Returns [`Either::Left`] if the network/stream connection has been closed, either locally + /// Returns [`Option::None`] if the network/stream connection has been closed, either locally /// or externally. /// /// Should be called after [`Self::send_req`] is successfully executed. #[inline] - pub async fn recv_res(&mut self, rrb: RRB) -> crate::Result> { + pub async fn recv_res(&mut self, rrb: RRB) -> crate::Result<(RRB, Option)> { let rrb_opt = &mut Some(rrb); let Self { hd, is_conn_open, span, stream_id, windows } = self; let _e = span._enter(); @@ -71,7 +71,8 @@ where let hdpm = lock.parts_mut(); if let Some(mut elem) = rrb_opt.take() { if !manage_initial_stream_receiving(is_conn_open, &mut elem) { - return Poll::Ready(Ok(Either::Left(elem))); + frame_reader_rslt(hdpm.frame_reader_error)?; + return Poll::Ready(Ok((elem, None))); } drop(hdpm.hb.sorp.insert( *stream_id, @@ -140,9 +141,9 @@ where HpackStaticRequestHeaders { authority: uri.authority().as_bytes(), method: Some(req.method), - path: uri.href_slash().as_bytes(), + path: uri.relative_reference_slash().as_bytes(), protocol: None, - scheme: uri.schema().as_bytes(), + scheme: uri.scheme().as_bytes(), }, HpackStaticResponseHeaders::EMPTY, ), diff --git a/wtx/src/http2/frame_reader.rs b/wtx/src/http2/frame_reader.rs index 07078db6..a73c7afe 100644 --- a/wtx/src/http2/frame_reader.rs +++ b/wtx/src/http2/frame_reader.rs @@ -47,8 +47,7 @@ pub(crate) async fn frame_reader( mut pfb: PartitionedFilledBuffer, read_frame_waker: Arc, mut stream_reader: SR, -) -> crate::Result<()> -where +) where HB: LeaseMut>, HD: RefCounter, HD::Item: Lock>, @@ -70,33 +69,38 @@ where { Err(err) => { process_higher_operation_err(&err, &hd).await; - finish(&hd, &mut pfb).await; - return Err(err); + finish(Some(err), &hd, &mut pfb).await; + return; } Ok(None) => { - finish(&hd, &mut pfb).await; - return Ok(()); + finish(None, &hd, &mut pfb).await; + return; } Ok(Some(fi)) => fi, }; if let Err(err) = manage_fi(fi, &hd, &is_conn_open, &mut pfb, &mut stream_reader).await { process_higher_operation_err(&err, &hd).await; - finish(&hd, &mut pfb).await; - return Err(err); + finish(Some(err), &hd, &mut pfb).await; } } } #[inline] -async fn finish(hd: &HD, pfb: &mut PartitionedFilledBuffer) -where +async fn finish( + err: Option, + hd: &HD, + pfb: &mut PartitionedFilledBuffer, +) where HB: LeaseMut>, HD: RefCounter, HD::Item: Lock>, RRB: LeaseMut, SW: StreamWriter, { - mem::swap(pfb, &mut hd.lock().await.parts_mut().hb.pfb); + let mut lock = hd.lock().await; + let hdpm = lock.parts_mut(); + *hdpm.frame_reader_error = err; + mem::swap(pfb, &mut hdpm.hb.pfb); _trace!("Finishing the reading of frames"); } diff --git a/wtx/src/http2/http2_data.rs b/wtx/src/http2/http2_data.rs index a51e1d51..8a0b12d2 100644 --- a/wtx/src/http2/http2_data.rs +++ b/wtx/src/http2/http2_data.rs @@ -8,6 +8,7 @@ use core::marker::PhantomData; /// Internal resource used in every new instance of `Http2`. #[derive(Debug)] pub struct Http2Data { + frame_reader_error: Option, hb: HB, hp: Http2Params, hps: Http2ParamsSend, @@ -29,6 +30,7 @@ where let hps = Http2ParamsSend::default(); let windows = Windows::initial(&hp, &hps); Self { + frame_reader_error: None, hb, hp, hps, @@ -43,6 +45,7 @@ where #[inline] pub(crate) fn parts_mut(&mut self) -> Http2DataPartsMut<'_, RRB, SW> { Http2DataPartsMut { + frame_reader_error: &mut self.frame_reader_error, hb: self.hb.lease_mut(), hp: &mut self.hp, hps: &mut self.hps, @@ -73,6 +76,7 @@ impl LeaseMut { + pub(crate) frame_reader_error: &'instance mut Option, pub(crate) hb: &'instance mut Http2Buffer, pub(crate) hp: &'instance mut Http2Params, pub(crate) hps: &'instance mut Http2ParamsSend, diff --git a/wtx/src/http2/misc.rs b/wtx/src/http2/misc.rs index b4e101a4..137f2852 100644 --- a/wtx/src/http2/misc.rs +++ b/wtx/src/http2/misc.rs @@ -6,8 +6,8 @@ use crate::{ ResetStreamFrame, Scrp, Sorp, StreamOverallRecvParams, StreamState, UriBuffer, U31, }, misc::{ - AtomicWaker, Either, LeaseMut, Lock, PartitionedFilledBuffer, RefCounter, StreamReader, - StreamWriter, Usize, _read_until, + AtomicWaker, LeaseMut, Lock, PartitionedFilledBuffer, RefCounter, StreamReader, StreamWriter, + Usize, _read_until, }, }; use core::{ @@ -31,6 +31,14 @@ where Ok(()) } +#[inline] +pub(crate) fn frame_reader_rslt(err: &mut Option) -> crate::Result<()> { + match err.take() { + Some(elem) => Err(elem), + None => Ok(()), + } +} + #[inline] pub(crate) fn manage_initial_stream_receiving(is_conn_open: &AtomicBool, rrb: &mut RRB) -> bool where @@ -54,7 +62,7 @@ pub(crate) fn manage_recurrent_stream_receiving( &mut Http2DataPartsMut<'_, RRB, SW>, &StreamOverallRecvParams, ) -> T, -) -> Poll>> +) -> Poll)>> where RRB: LeaseMut, { @@ -90,7 +98,8 @@ where } }; if let Some(elem) = rrb_opt { - return Poll::Ready(Ok(Either::Left(elem))); + frame_reader_rslt(hdpm.frame_reader_error)?; + return Poll::Ready(Ok((elem, None))); } return Poll::Ready(Err(protocol_err(Http2Error::UnknownStreamReceiver))); } @@ -100,7 +109,7 @@ where check_content_length(idx, &elem)?; } let rslt = cb(cx, &mut hdpm, &elem); - return Poll::Ready(Ok(Either::Right((elem.rrb, rslt)))); + return Poll::Ready(Ok((elem.rrb, Some(rslt)))); } } else { sorp.waker.clone_from(cx.waker()); diff --git a/wtx/src/http2/send_msg.rs b/wtx/src/http2/send_msg.rs index ddc754ef..565480ac 100644 --- a/wtx/src/http2/send_msg.rs +++ b/wtx/src/http2/send_msg.rs @@ -68,11 +68,12 @@ where return Poll::Ready(Ok(None)); } let mut lock = lock_pin!(cx, hd, lock_pin); + let hdpm = lock.parts_mut(); let fut = do_send_msg::<_, _, IS_CLIENT>( &mut data_bytes, (&mut has_headers, &mut has_data), headers, - lock.parts_mut(), + hdpm, (hsreqh, hsresh), stream_id, cx.waker(), diff --git a/wtx/src/http2/server_stream.rs b/wtx/src/http2/server_stream.rs index fbc0a8b7..1a59077d 100644 --- a/wtx/src/http2/server_stream.rs +++ b/wtx/src/http2/server_stream.rs @@ -9,7 +9,7 @@ use crate::{ HpackStaticRequestHeaders, HpackStaticResponseHeaders, Http2Buffer, Http2Data, Http2ErrorCode, StreamControlRecvParams, U31, }, - misc::{Either, Lease, LeaseMut, Lock, RefCounter, StreamWriter, _Span, sleep}, + misc::{Lease, LeaseMut, Lock, RefCounter, StreamWriter, _Span, sleep}, }; use alloc::sync::Arc; use core::{ @@ -54,12 +54,12 @@ where /// /// Higher operation that awaits for the data necessary to build a request. /// - /// Returns [`Either::Left`] if the network/stream connection has been closed, either locally + /// Returns [`Option::None`] if the network/stream connection has been closed, either locally /// or externally. /// /// Shouldn't be called more than once. #[inline] - pub async fn recv_req(&mut self) -> crate::Result> { + pub async fn recv_req(&mut self) -> crate::Result<(RRB, Option)> { let Self { hd, is_conn_open, method, span, stream_id } = self; let _e = span._enter(); _trace!("Receiving request"); diff --git a/wtx/src/http2/tests/connections.rs b/wtx/src/http2/tests/connections.rs index 4955933f..ee584c9b 100644 --- a/wtx/src/http2/tests/connections.rs +++ b/wtx/src/http2/tests/connections.rs @@ -13,24 +13,20 @@ async fn connections() { let _rslt = crate::misc::tracing_tree_init(None); let uri = _uri(); server(&uri).await; - client(uri).await; + client(&uri).await; } -async fn client(uri: UriString) { +async fn client(uri: &UriString) { let mut rrb = ReqResBuffer::default(); rrb.headers.reserve(6, 1).unwrap(); let (frame_header, mut http2) = Http2Tokio::connect( Http2Buffer::new(NoStdRng::default()), Http2Params::default(), - TcpStream::connect(uri.host()).await.unwrap().into_split(), + TcpStream::connect(uri.hostname_with_implied_port()).await.unwrap().into_split(), ) .await .unwrap(); - let _jh = tokio::spawn(async { - if let Err(err) = frame_header.await { - panic!("{:?}", err); - } - }); + let _jh = tokio::spawn(frame_header); let uri_ref = uri.to_ref(); @@ -59,7 +55,7 @@ async fn client(uri: UriString) { } async fn server(uri: &UriString) { - let listener = TcpListener::bind(uri.host()).await.unwrap(); + let listener = TcpListener::bind(uri.hostname_with_implied_port()).await.unwrap(); let _server_jh = tokio::spawn(async move { let (stream, _) = listener.accept().await.unwrap(); let mut rrb = ReqResBuffer::default(); @@ -97,10 +93,10 @@ async fn stream_server( mut cb: impl FnMut(Request<&mut ReqResBuffer>), ) -> ReqResBuffer { loop { - let Either::Right(mut stream) = server.stream(rrb).await else { + let Either::Right(mut stream) = server.stream(rrb).await.unwrap() else { panic!(); }; - let Ok(Either::Right((mut req_rrb, method))) = stream.recv_req().await else { + let (mut req_rrb, Some(method)) = stream.recv_req().await.unwrap() else { panic!(); }; cb(req_rrb.as_http2_request_mut(method)); @@ -116,10 +112,7 @@ async fn stream_client( ) -> ReqResBuffer { let mut stream = client.stream().await.unwrap(); stream.send_req(rrb.as_http2_request(Method::Get), uri).await.unwrap().unwrap(); - match stream.recv_res(rrb).await.unwrap() { - Either::Left(_) => panic!(), - Either::Right(elem) => elem.0, - } + stream.recv_res(rrb).await.unwrap().0 } #[track_caller] diff --git a/wtx/src/lib.rs b/wtx/src/lib.rs index 5441848a..b5fb7c21 100644 --- a/wtx/src/lib.rs +++ b/wtx/src/lib.rs @@ -2,14 +2,7 @@ #![cfg_attr(feature = "_bench", allow(soft_unstable))] #![cfg_attr(feature = "_bench", feature(test))] #![doc = include_str!("../README.md")] -#![feature( - const_mut_refs, - const_ptr_write, - const_refs_to_cell, - macro_metavar_expr, - noop_waker, - return_type_notation -)] +#![feature(macro_metavar_expr, noop_waker, return_type_notation, strict_provenance)] #![no_std] extern crate alloc; diff --git a/wtx/src/misc.rs b/wtx/src/misc.rs index c933b842..593be6ad 100644 --- a/wtx/src/misc.rs +++ b/wtx/src/misc.rs @@ -94,19 +94,8 @@ where T: serde::Serialize, { use serde::ser::SerializeSeq; - - fn iterator_len_hint(iter: &I) -> Option - where - I: Iterator, - { - match iter.size_hint() { - (lo, Some(hi)) if lo == hi => Some(lo), - _ => None, - } - } - let iter = into_iter.into_iter(); - let mut sq = ser.serialize_seq(iterator_len_hint(&iter))?; + let mut sq = ser.serialize_seq(_conservative_size_hint_len(iter.size_hint()))?; for elem in iter { sq.serialize_element(&elem?)?; } @@ -219,6 +208,14 @@ pub(crate) fn char_slice(buffer: &mut [u8; 4], ch: char) -> &[u8] { } } +#[inline] +pub(crate) fn _conservative_size_hint_len(size_hint: (usize, Option)) -> Option { + match size_hint { + (lo, Some(hi)) if lo == hi => Some(lo), + _ => None, + } +} + #[inline] pub(crate) fn _interspace( write: &mut W, diff --git a/wtx/src/misc/array_vector.rs b/wtx/src/misc/array_vector.rs index 3969e64d..75119c15 100644 --- a/wtx/src/misc/array_vector.rs +++ b/wtx/src/misc/array_vector.rs @@ -89,7 +89,7 @@ impl ArrayVector { /// Clears the vector, removing all values. #[inline] - pub const fn clear(&mut self) { + pub fn clear(&mut self) { self.len = 0; } @@ -108,7 +108,7 @@ impl ArrayVector { /// Return the inner fixed size array, if the capacity is full. #[inline] - pub const fn into_inner(self) -> Result<[D; N], ArrayVectorError> { + pub fn into_inner(self) -> Result<[D; N], ArrayVectorError> { if Usize::from_u32(self.len).into_usize() >= N { // SAFETY: All elements are initialized Ok(unsafe { ptr::read(self.data.as_ptr().cast()) }) @@ -119,7 +119,7 @@ impl ArrayVector { /// Shortens the vector, removing the last element. #[inline] - pub const fn pop(&mut self) -> bool { + pub fn pop(&mut self) -> bool { if let Some(elem) = self.len.checked_sub(1) { self.len = elem; true diff --git a/wtx/src/misc/optimization.rs b/wtx/src/misc/optimization.rs index 9dd8172e..02e4c11a 100644 --- a/wtx/src/misc/optimization.rs +++ b/wtx/src/misc/optimization.rs @@ -14,6 +14,18 @@ where return bytes.lease().iter().position(|byte| *byte == elem); } +/// Internally uses `memchr` if the feature is active. +#[inline] +pub fn bytes_rpos1(bytes: B, elem: u8) -> Option +where + B: Lease<[u8]>, +{ + #[cfg(feature = "memchr")] + return memchr::memrchr(elem, bytes.lease()); + #[cfg(not(feature = "memchr"))] + return bytes.lease().iter().rposition(|byte| *byte == elem); +} + /// Internally uses `memchr` if the feature is active. #[inline] pub fn bytes_rsplit1(bytes: &[u8], elem: u8) -> impl Iterator { @@ -104,15 +116,6 @@ pub fn str_pos1(str: &str, elem: u8) -> Option { return str.as_bytes().iter().position(|byte| *byte == elem); } -/// Internally uses `memchr` if the feature is active. -#[inline] -pub fn str_rpos1(str: &str, elem: u8) -> Option { - #[cfg(feature = "memchr")] - return memchr::memrchr(elem, str.as_bytes()); - #[cfg(not(feature = "memchr"))] - return str.as_bytes().iter().rev().position(|byte| *byte == elem); -} - /// Internally uses `memchr` if the feature is active. #[inline] pub fn str_rsplit_once1(str: &str, elem: u8) -> Option<(&str, &str)> { diff --git a/wtx/src/misc/rng.rs b/wtx/src/misc/rng.rs index bb561939..94dfbf4b 100644 --- a/wtx/src/misc/rng.rs +++ b/wtx/src/misc/rng.rs @@ -2,22 +2,20 @@ #[cfg(feature = "fastrand")] mod fastrand; +mod no_std_rng; #[cfg(feature = "rand")] mod rand; #[cfg(feature = "std")] -mod std; - -use crate::misc::{FromRadix10, Usize}; +mod std_rng; #[cfg(feature = "std")] -pub use self::std::{StdRng, StdRngSync}; -use alloc::boxed::Box; +pub use self::std_rng::{StdRng, StdRngSync}; +use crate::misc::Usize; use core::{ cell::Cell, ops::{Bound, RangeBounds}, - panic::Location, - ptr, }; +pub use no_std_rng::NoStdRng; /// Allows the creation of random instances. pub trait FromRng @@ -210,60 +208,6 @@ where } } -/// Uses a combination of weak strategies that will likely result in poor results. -/// -/// 1. The pointer of a heap allocation. -/// 2. The number provided by the static `WTX_NO_STD_RNG_SEED` environment variable (if available). -/// 3. The line and column of the caller location. -#[derive(Clone, Copy, Debug)] -pub struct NoStdRng(u64); - -impl Rng for NoStdRng { - #[inline] - fn u8(&mut self) -> u8 { - xor_u8(&mut self.0) - } - - #[inline] - fn u8_4(&mut self) -> [u8; 4] { - xor_u8_4(&mut self.0) - } - - #[inline] - fn u8_8(&mut self) -> [u8; 8] { - xor_u8_8(&mut self.0) - } - - #[inline] - fn u8_16(&mut self) -> [u8; 16] { - xor_u8_16(&mut self.0) - } -} - -impl Default for NoStdRng { - #[inline] - #[track_caller] - fn default() -> Self { - struct Foo { - _bar: usize, - _baz: usize, - } - let elem = Box::new(Foo { _bar: 1, _baz: 2 }); - let ref_ptr = ptr::addr_of!(elem).cast(); - // SAFETY: Memory validation is not relevant - let mut n = Usize::from_usize(unsafe { *ref_ptr }).into_u64(); - n = n.wrapping_add(11_400_714_819_323_198_485); - if let Some(env) = - option_env!("WTX_NO_STD_RNG_SEED").and_then(|el| u64::from_radix_10(el.as_bytes()).ok()) - { - n = n.wrapping_add(env); - } - let location = Location::caller(); - n ^= n << (u64::from(location.column().wrapping_add(location.line())) % 17); - Self(n) - } -} - #[inline] fn u8(n: u64) -> u8 { let [a, ..] = n.to_be_bytes(); diff --git a/wtx/src/misc/rng/no_std_rng.rs b/wtx/src/misc/rng/no_std_rng.rs new file mode 100644 index 00000000..c7d34445 --- /dev/null +++ b/wtx/src/misc/rng/no_std_rng.rs @@ -0,0 +1,63 @@ +use crate::misc::{ + rng::{xor_u8, xor_u8_16, xor_u8_4, xor_u8_8}, + Rng, Usize, +}; +use alloc::boxed::Box; +use core::{ + panic::Location, + ptr, + sync::atomic::{AtomicU64, Ordering}, +}; + +static COUNTER: AtomicU64 = AtomicU64::new(0); + +/// Uses a combination of weak strategies that will likely result in poor results. +/// +/// 1. The address of a heap allocation. +/// 2. A large fixed number. +/// 3. The value of an ever increasing static counter. +/// 4. The line and column of the caller location. +#[derive(Clone, Copy, Debug)] +pub struct NoStdRng(u64); + +impl Rng for NoStdRng { + #[inline] + fn u8(&mut self) -> u8 { + xor_u8(&mut self.0) + } + + #[inline] + fn u8_4(&mut self) -> [u8; 4] { + xor_u8_4(&mut self.0) + } + + #[inline] + fn u8_8(&mut self) -> [u8; 8] { + xor_u8_8(&mut self.0) + } + + #[inline] + fn u8_16(&mut self) -> [u8; 16] { + xor_u8_16(&mut self.0) + } +} + +impl Default for NoStdRng { + #[inline] + #[track_caller] + fn default() -> Self { + let elem = Box::new(Foo { _bar: 1, _baz: 2 }); + let ref_ptr = ptr::addr_of!(elem); + let mut n = Usize::from_usize(ref_ptr.addr()).into_u64(); + n = n.wrapping_add(11_400_714_819_323_198_485); + n = n.wrapping_add(COUNTER.fetch_add(3, Ordering::Release)); + let location = Location::caller(); + n ^= n << (u64::from(location.column().wrapping_add(location.line())) % 17); + Self(n) + } +} + +struct Foo { + _bar: usize, + _baz: usize, +} diff --git a/wtx/src/misc/rng/std.rs b/wtx/src/misc/rng/std_rng.rs similarity index 100% rename from wtx/src/misc/rng/std.rs rename to wtx/src/misc/rng/std_rng.rs diff --git a/wtx/src/misc/uri.rs b/wtx/src/misc/uri.rs index 68a269ef..200a10d1 100644 --- a/wtx/src/misc/uri.rs +++ b/wtx/src/misc/uri.rs @@ -1,5 +1,6 @@ use crate::misc::{ - QueryWriter, _unlikely_dflt, str_rsplit_once1, str_split_once1, ArrayString, Lease, + QueryWriter, _unlikely_dflt, bytes_pos1, bytes_rpos1, str_split_once1, ArrayString, FromRadix10, + Lease, }; use alloc::string::String; use core::fmt::{Arguments, Debug, Display, Formatter, Write}; @@ -21,9 +22,11 @@ pub struct Uri where S: ?Sized, { - authority_start_idx: u8, - href_start_idx: u16, + authority_start: u8, + href_start: u8, initial_len: u16, + port: Option, + query_start: u16, uri: S, } @@ -33,21 +36,16 @@ where { #[inline] pub(crate) const fn _empty(uri: S) -> Self { - Self { authority_start_idx: 0, href_start_idx: 0, initial_len: 0, uri } - } - - /// Creates a new instance based on the provided indexes. - #[inline] - pub fn from_parts(uri: S, authority_start_idx: u8, href_start_idx: u16) -> Self { - let initial_len = uri.lease().len().try_into().unwrap_or(u16::MAX); - Self { authority_start_idx, href_start_idx, initial_len, uri } + Self { authority_start: 0, href_start: 0, initial_len: 0, port: None, query_start: 0, uri } } /// Analyzes the provided `uri` to create a new instance. #[inline] pub fn new(uri: S) -> Self { - let (authority_start_idx, href_start_idx, initial_len) = Self::parts(uri.lease()); - Self { authority_start_idx, href_start_idx, initial_len, uri } + let (initial_len, authority_start, href_start, query_start) = Self::parts(uri.lease()); + let mut this = Self { authority_start, href_start, initial_len, port: None, query_start, uri }; + this.process_port(); + this } /// Full URI string @@ -56,15 +54,8 @@ where self.uri.lease() } - /// ```rust - /// let uri = wtx::misc::Uri::new("foo://user:password@hostname:80/path?query=value#hash"); - /// assert_eq!(uri.authority_start_idx(), 6); - /// ``` - #[inline] - pub fn authority_start_idx(&self) -> u8 { - self.authority_start_idx - } - + /// + /// /// ```rust /// let uri = wtx::misc::Uri::new("foo://user:password@hostname:80/path?query=value#hash"); /// assert_eq!(uri.authority(), "user:password@hostname:80"); @@ -74,25 +65,12 @@ where self .uri .lease() - .get(self.authority_start_idx.into()..self.href_start_idx.into()) + .get(self.authority_start.into()..self.href_start.into()) .unwrap_or_else(_unlikely_dflt) } - /// ```rust - /// let uri = wtx::misc::Uri::new("foo://user:password@hostname:80/path?query=value#hash"); - /// assert_eq!(uri.fragment(), "hash"); - /// ``` - #[inline] - pub fn fragment(&self) -> &str { - let href = self.href(); - let maybe_rslt = str_rsplit_once1(href, b'?').map_or(href, |el| el.1); - if let Some((_, rslt)) = str_rsplit_once1(maybe_rslt, b'#') { - rslt - } else { - maybe_rslt - } - } - + /// + /// /// ```rust /// let uri = wtx::misc::Uri::new("foo://user:password@hostname:80/path?query=value#hash"); /// assert_eq!(uri.host(), "hostname:80"); @@ -107,6 +85,8 @@ where } } + /// + /// /// ```rust /// let uri = wtx::misc::Uri::new("foo://user:password@hostname:80/path?query=value#hash"); /// assert_eq!(uri.hostname(), "hostname"); @@ -117,95 +97,111 @@ where str_split_once1(host, b':').map_or(host, |el| el.0) } + /// + /// + /// Returns the hostname with a zeroed port if [`Self::port`] is [`Option::None`]. + /// /// ```rust /// let uri = wtx::misc::Uri::new("foo://user:password@hostname:80/path?query=value#hash"); - /// assert_eq!(uri.href(), "/path?query=value#hash"); + /// assert_eq!(uri.hostname_with_implied_port(), ("hostname", 80)); /// ``` #[inline] - pub fn href(&self) -> &str { - if let Some(elem) = self.uri.lease().get(self.href_start_idx.into()..) { - return elem; - } - "" + pub fn hostname_with_implied_port(&self) -> (&str, u16) { + (self.hostname(), self.port().unwrap_or_default()) } - /// If empty, returns a slash. + /// /// /// ```rust /// let uri = wtx::misc::Uri::new("foo://user:password@hostname:80/path?query=value#hash"); - /// assert_eq!(uri.href(), "/path?query=value#hash"); + /// assert_eq!(uri.password(), "password"); /// ``` #[inline] - pub fn href_slash(&self) -> &str { - let href = self.href(); - if href.is_empty() { - "/" + pub fn password(&self) -> &str { + if let Some(elem) = str_split_once1(self.userinfo(), b':') { + elem.1 } else { - href + "" } } + /// + /// /// ```rust /// let uri = wtx::misc::Uri::new("foo://user:password@hostname:80/path?query=value#hash"); - /// assert_eq!(uri.href_start_idx(), 31); + /// assert_eq!(uri.path(), "/path"); /// ``` #[inline] - pub fn href_start_idx(&self) -> u16 { - self.href_start_idx + pub fn path(&self) -> &str { + self.uri.lease().get(self.href_start.into()..self.query_start.into()).unwrap_or_default() } + /// + /// + /// Returns [`Option::None`] if the port couldn't be evaluated based on the schema or based on + /// an explicit `... :SOME_NUMBER ...` declaration. + /// /// ```rust /// let uri = wtx::misc::Uri::new("foo://user:password@hostname:80/path?query=value#hash"); - /// assert_eq!(uri.password(), "password"); + /// assert_eq!(uri.port(), Some(80)); /// ``` #[inline] - pub fn password(&self) -> &str { - if let Some(elem) = str_split_once1(self.userinfo(), b':') { - elem.1 - } else { - "" - } + pub fn port(&self) -> Option { + self.port } + /// + /// + /// /// ```rust /// let uri = wtx::misc::Uri::new("foo://user:password@hostname:80/path?query=value#hash"); - /// assert_eq!(uri.path(), "/path"); + /// assert_eq!(uri.query_and_fragment(), "?query=value#hash"); /// ``` #[inline] - pub fn path(&self) -> &str { - let href = self.href(); - str_rsplit_once1(href, b'?').map_or(href, |el| el.0) + pub fn query_and_fragment(&self) -> &str { + self.uri.lease().get(self.query_start.into()..).unwrap_or_default() } + /// + /// /// ```rust /// let uri = wtx::misc::Uri::new("foo://user:password@hostname:80/path?query=value#hash"); - /// assert_eq!(uri.port(), "80"); + /// assert_eq!(uri.relative_reference(), "/path?query=value#hash"); /// ``` #[inline] - pub fn port(&self) -> &str { - let host = self.host(); - str_split_once1(host, b':').map_or(host, |el| el.1) + pub fn relative_reference(&self) -> &str { + if let Some(elem) = self.uri.lease().get(self.href_start.into()..) { + return elem; + } + "" } + /// Like [`Self::relative_reference`] with the additional feature of returning `/` if empty. + /// /// ```rust - /// let uri = wtx::misc::Uri::new("foo://user:password@hostname:80/path?query=value#hash"); - /// assert_eq!(uri.query(), "query=value"); + /// let uri = wtx::misc::Uri::new(""); + /// assert_eq!(uri.relative_reference_slash(), "/"); /// ``` #[inline] - pub fn query(&self) -> &str { - let href = self.href(); - let before_hash = if let Some((elem, _)) = str_rsplit_once1(href, b'#') { elem } else { href }; - str_rsplit_once1(before_hash, b'?').map(|el| el.1).unwrap_or_default() + pub fn relative_reference_slash(&self) -> &str { + let relative_reference = self.relative_reference(); + if relative_reference.is_empty() { + "/" + } else { + relative_reference + } } + /// + /// /// ```rust /// let uri = wtx::misc::Uri::new("foo://user:password@hostname:80/path?query=value#hash"); - /// assert_eq!(uri.schema(), "foo"); + /// assert_eq!(uri.scheme(), "foo"); /// ``` #[inline] - pub fn schema(&self) -> &str { + pub fn scheme(&self) -> &str { self - .authority_start_idx + .authority_start .checked_sub(3) .and_then(|index| self.uri.lease().get(..index.into())) .unwrap_or_default() @@ -215,9 +211,11 @@ where #[inline] pub fn to_ref(&self) -> UriRef<'_> { UriRef { - authority_start_idx: self.authority_start_idx, - href_start_idx: self.href_start_idx, + authority_start: self.authority_start, + href_start: self.href_start, initial_len: self.initial_len, + port: self.port, + query_start: self.query_start, uri: self.uri.lease(), } } @@ -226,9 +224,11 @@ where #[inline] pub fn to_string(&self) -> UriString { UriString { - authority_start_idx: self.authority_start_idx, - href_start_idx: self.href_start_idx, + authority_start: self.authority_start, + href_start: self.href_start, initial_len: self.initial_len, + port: self.port, + query_start: self.query_start, uri: self.uri.lease().into(), } } @@ -259,23 +259,44 @@ where } } - fn parts(uri: &str) -> (u8, u16, u16) { + fn parts(uri: &str) -> (u16, u8, u8, u16) { let initial_len = uri.len().try_into().unwrap_or(u16::MAX); - let valid_uri = uri.get(..initial_len.into()).unwrap_or_else(_unlikely_dflt); - let authority_start_idx: u8 = valid_uri + let valid_uri = uri.get(..initial_len.into()).unwrap_or_default(); + let authority_start: u8 = valid_uri .match_indices("://") .next() .and_then(|(element, _)| element.wrapping_add(3).try_into().ok()) .unwrap_or(0); - let href_start_idx = valid_uri - .as_bytes() - .iter() - .copied() - .enumerate() - .skip(authority_start_idx.into()) - .find_map(|(idx, el)| (el == b'/').then_some(idx).and_then(|_usize| _usize.try_into().ok())) + let after_authority = valid_uri.as_bytes().get(authority_start.into()..).unwrap_or_default(); + let href_start = bytes_pos1(after_authority, b'/') + .and_then(|idx| usize::from(authority_start).wrapping_add(idx).try_into().ok()) + .unwrap_or_else(|| initial_len.try_into().unwrap_or_default()); + let query_start = bytes_rpos1(valid_uri, b'?') + .and_then(|element| element.try_into().ok()) .unwrap_or(initial_len); - (authority_start_idx, href_start_idx, initial_len) + (initial_len, authority_start, href_start, query_start) + } + + #[inline] + fn process_port(&mut self) { + let uri = self.uri.lease().as_bytes(); + 'explicit_port: { + self.port = match uri.get(..self.href_start.into()) { + Some([.., b':', a, b]) => u16::from_radix_10(&[*a, *b]).ok(), + Some([.., b':', a, b, c]) => u16::from_radix_10(&[*a, *b, *c]).ok(), + Some([.., b':', a, b, c, d]) => u16::from_radix_10(&[*a, *b, *c, *d]).ok(), + Some([.., b':', a, b, c, d, e]) => u16::from_radix_10(&[*a, *b, *c, *d, *e]).ok(), + _ => break 'explicit_port, + }; + return; + } + self.port = match uri { + [b'h', b't', b't', b'p', b's', ..] | [b'w', b's', b's', ..] => Some(443), + [b'h', b't', b't', b'p', ..] | [b'w', b's', ..] => Some(80), + [b'p', b'o', b's', b't', b'g', b'r', b'e', b's', b'q', b'l', ..] + | [b'p', b'o', b's', b't', b'g', b'r', b'e', b's', ..] => Some(5432), + _ => return, + }; } } @@ -283,27 +304,32 @@ impl UriString { /// Removes all content. #[inline] pub fn clear(&mut self) { - let Self { authority_start_idx, href_start_idx, initial_len, uri } = self; - *authority_start_idx = 0; - *href_start_idx = 0; + let Self { authority_start, href_start, initial_len, port, query_start, uri } = self; + *authority_start = 0; + *href_start = 0; *initial_len = 0; + *port = None; + *query_start = 0; uri.clear(); } /// Pushes an additional path only if there is no query. #[inline] pub fn push_path(&mut self, args: Arguments<'_>) -> crate::Result<()> { - if !self.query().is_empty() { + if !self.query_and_fragment().is_empty() { return Err(crate::Error::UriCanNotBeOverwritten); } + let prev = self.uri.len(); self.uri.write_fmt(args)?; + let diff = self.uri.len().wrapping_sub(prev); + self.query_start = self.query_start.wrapping_add(diff.try_into().unwrap_or(u16::MAX)); Ok(()) } /// See [`QueryWriter`]. #[inline] pub fn query_writer(&mut self) -> crate::Result> { - if !self.query().is_empty() { + if !self.query_and_fragment().is_empty() { return Err(crate::Error::UriCanNotBeOverwritten); } Ok(QueryWriter::new(&mut self.uri)) @@ -314,10 +340,11 @@ impl UriString { pub fn reset(&mut self, uri: Arguments<'_>) -> crate::Result<()> { self.uri.clear(); self.uri.write_fmt(uri)?; - let (authority_start_idx, href_start_idx, initial_len) = Self::parts(&self.uri); - self.authority_start_idx = authority_start_idx; - self.href_start_idx = href_start_idx; + let (initial_len, authority_start, href_start, query_start) = Self::parts(&self.uri); + self.authority_start = authority_start; + self.href_start = href_start; self.initial_len = initial_len; + self.query_start = query_start; Ok(()) } @@ -325,6 +352,7 @@ impl UriString { #[inline] pub fn truncate_with_initial_len(&mut self) { self.uri.truncate(self.initial_len.into()); + self.query_start = self.query_start.min(self.initial_len); } } @@ -363,19 +391,19 @@ mod tests { use crate::misc::UriString; #[test] - fn mutable_methods_have_correct_behavior() { + fn dynamic_methods_have_correct_behavior() { let mut uri = UriString::new("http://dasdas.com/rewqd".into()); uri.push_path(format_args!("/tretre")).unwrap(); assert_eq!(uri.path(), "/rewqd/tretre"); - assert_eq!(uri.query(), ""); + assert_eq!(uri.query_and_fragment(), ""); assert_eq!(uri.as_str(), "http://dasdas.com/rewqd/tretre"); uri.truncate_with_initial_len(); assert_eq!(uri.path(), "/rewqd"); - assert_eq!(uri.query(), ""); + assert_eq!(uri.query_and_fragment(), ""); assert_eq!(uri.as_str(), "http://dasdas.com/rewqd"); uri.clear(); assert_eq!(uri.path(), ""); - assert_eq!(uri.query(), ""); + assert_eq!(uri.query_and_fragment(), ""); assert_eq!(uri.as_str(), ""); } } diff --git a/wtx/src/misc/vector.rs b/wtx/src/misc/vector.rs index 146ff1cf..fa5c0d92 100644 --- a/wtx/src/misc/vector.rs +++ b/wtx/src/misc/vector.rs @@ -32,6 +32,20 @@ impl Display for VectorError { } } +impl From for u8 { + #[inline] + fn from(from: VectorError) -> Self { + match from { + VectorError::ExtendFromSliceOverflow => 0, + VectorError::ExtendFromSlicesOverflow => 1, + VectorError::OutOfBoundsInsertIdx => 2, + VectorError::PushOverflow => 3, + VectorError::ReserveOverflow => 4, + VectorError::WithCapacityOverflow => 5, + } + } +} + impl core::error::Error for VectorError {} /// A wrapper around the std's vector. diff --git a/wtx/src/pool/resource_manager.rs b/wtx/src/pool/resource_manager.rs index 27b6c85d..4f454f93 100644 --- a/wtx/src/pool/resource_manager.rs +++ b/wtx/src/pool/resource_manager.rs @@ -155,8 +155,12 @@ pub(crate) mod database { async fn create(&self, _: &Self::CreateAux) -> Result { executor!(self.uri, |config, uri| { let eb = ExecutorBuffer::with_default_params(&mut &self.rng)?; - let stream = TcpStream::connect(uri.host()).await.map_err(Into::into)?; - Executor::connect(&config, eb, &mut &self.rng, stream) + Executor::connect( + &config, + eb, + &mut &self.rng, + TcpStream::connect(uri.hostname_with_implied_port()).await.map_err(Into::into)?, + ) }) } @@ -174,7 +178,8 @@ pub(crate) mod database { let mut buffer = ExecutorBuffer::_empty(); mem::swap(&mut buffer, &mut resource.eb); *resource = executor!(self.uri, |config, uri| { - let stream = TcpStream::connect(uri.host()).await.map_err(Into::into)?; + let stream = + TcpStream::connect(uri.hostname_with_implied_port()).await.map_err(Into::into)?; Executor::connect(&config, buffer, &mut &self.rng, stream) })?; Ok(()) @@ -225,7 +230,7 @@ pub(crate) mod database { Executor::connect_encrypted( &config, ExecutorBuffer::with_default_params(&mut &self.rng)?, - TcpStream::connect(uri.host()).await.map_err(Into::into)?, + TcpStream::connect(uri.hostname_with_implied_port()).await.map_err(Into::into)?, &mut &self.rng, |stream| async { let mut rslt = TokioRustlsConnector::from_auto()?; @@ -255,7 +260,7 @@ pub(crate) mod database { Executor::connect_encrypted( &config, ExecutorBuffer::with_default_params(&mut &self.rng)?, - TcpStream::connect(uri.host()).await.map_err(Into::into)?, + TcpStream::connect(uri.hostname_with_implied_port()).await.map_err(Into::into)?, &mut &self.rng, |stream| async { let mut rslt = TokioRustlsConnector::from_auto()?; diff --git a/wtx/src/web_socket/handshake.rs b/wtx/src/web_socket/handshake.rs index 084b6b47..f453c9e0 100644 --- a/wtx/src/web_socket/handshake.rs +++ b/wtx/src/web_socket/handshake.rs @@ -191,13 +191,17 @@ where C: Compression, { let key = gen_key(key_buffer, rng); - fbw._extend_from_slices_group_rn(&[b"GET ", uri.href_slash().as_bytes(), b" HTTP/1.1"])?; + fbw._extend_from_slices_group_rn(&[ + b"GET ", + uri.relative_reference_slash().as_bytes(), + b" HTTP/1.1", + ])?; for (name, value) in headers { fbw._extend_from_slices_group_rn(&[name, b": ", value])?; } fbw._extend_from_slice_rn(b"Connection: Upgrade")?; - match (uri.schema(), uri.port()) { - ("http" | "ws", "80") | ("https" | "wss", "443") => { + match uri.port() { + Some(80 | 443) => { fbw._extend_from_slices_group_rn(&[b"Host: ", uri.hostname().as_bytes()])?; } _ => fbw._extend_from_slices_group_rn(&[b"Host: ", uri.host().as_bytes()])?, diff --git a/wtx/src/web_socket/handshake/tests.rs b/wtx/src/web_socket/handshake/tests.rs index c875de89..651e2043 100644 --- a/wtx/src/web_socket/handshake/tests.rs +++ b/wtx/src/web_socket/handshake/tests.rs @@ -55,7 +55,7 @@ where { let uri = _uri(); - let listener = TcpListener::bind(uri.host()).await.unwrap(); + let listener = TcpListener::bind(uri.hostname_with_implied_port()).await.unwrap(); let _server_jh = tokio::spawn(async move { let (stream, _) = listener.accept().await.unwrap(); let mut fb = FrameBufferVec::with_capacity(0); @@ -89,7 +89,7 @@ where [], &mut HeadersBuffer::default(), NoStdRng::default(), - TcpStream::connect(uri.host()).await.unwrap(), + TcpStream::connect(uri.hostname_with_implied_port()).await.unwrap(), &uri.to_ref(), WebSocketBuffer::with_capacity(0, 0), )