diff --git a/wtx-fuzz/Cargo.toml b/wtx-fuzz/Cargo.toml index d06459fa..be1219e2 100644 --- a/wtx-fuzz/Cargo.toml +++ b/wtx-fuzz/Cargo.toml @@ -1,5 +1,3 @@ -cargo-features = ["edition2024"] - [[bin]] name = "web-socket" path = "web_socket.rs" @@ -11,7 +9,7 @@ tokio = { default-features = false, features = ["rt"], version = "1.0" } wtx = { default-features = false, features = ["arbitrary", "web-socket"], path = "../wtx" } [package] -edition = "2024" +edition = "2021" name = "wtx-fuzz" publish = false version = "0.0.0" diff --git a/wtx-instances/Cargo.toml b/wtx-instances/Cargo.toml index f281c676..f84c836e 100644 --- a/wtx-instances/Cargo.toml +++ b/wtx-instances/Cargo.toml @@ -1,5 +1,3 @@ -cargo-features = ["edition2024"] - # Examples [[example]] @@ -90,7 +88,7 @@ wtx-macros = { default-features = false, path = "../wtx-macros" } _tracing-tree = ["wtx/_tracing-tree"] [package] -edition = "2024" +edition = "2021" name = "wtx-instances" publish = false version = "0.0.0" \ No newline at end of file diff --git a/wtx-instances/examples/grpc-server-tokio-rustls.rs b/wtx-instances/examples/grpc-server-tokio-rustls.rs index fe9180ef..36d8e7a1 100644 --- a/wtx-instances/examples/grpc-server-tokio-rustls.rs +++ b/wtx-instances/examples/grpc-server-tokio-rustls.rs @@ -8,7 +8,7 @@ extern crate wtx_instances; use std::borrow::Cow; use wtx::{ data_transformation::dnsn::QuickProtobuf, - grpc::{Server, ServerData}, + grpc::{GrpcStatusCode, Server, ServerData}, http::{ server_framework::{post, Router}, ReqResBuffer, Request, Response, StatusCode, @@ -33,7 +33,7 @@ async fn main() -> wtx::Result<()> { async fn wtx_generic_service_generic_method( (mut sd, mut req): (ServerData, Request), -) -> wtx::Result> { +) -> wtx::Result<(GrpcStatusCode, Response)> { let _generic_request: GenericRequest = sd.des_from_req_bytes(&req.rrd.data)?; req.rrd.clear(); sd.ser_to_res_bytes( @@ -43,5 +43,5 @@ async fn wtx_generic_service_generic_method( generic_response_field1: 321, }, )?; - Ok(req.into_response(StatusCode::Ok)) + Ok((GrpcStatusCode::Ok, req.into_response(StatusCode::Ok))) } diff --git a/wtx-macros/Cargo.toml b/wtx-macros/Cargo.toml index eaed2f5f..11998cff 100644 --- a/wtx-macros/Cargo.toml +++ b/wtx-macros/Cargo.toml @@ -1,5 +1,3 @@ -cargo-features = ["edition2024"] - [dependencies] proc-macro2 = { default-features = false, version = "1.0" } quote = { default-features = false, features = ["proc-macro"], version = "1.0" } @@ -18,7 +16,7 @@ proc-macro = true [package] categories = ["asynchronous", "data-structures", "network-programming"] description = "Procedural macros for wtx" -edition = "2024" +edition = "2021" exclude = ["tests"] keywords = ["api", "client", "io", "network"] license = "MIT" diff --git a/wtx-ui/Cargo.toml b/wtx-ui/Cargo.toml index 73c6bd41..37a21a7b 100644 --- a/wtx-ui/Cargo.toml +++ b/wtx-ui/Cargo.toml @@ -1,5 +1,3 @@ -cargo-features = ["edition2024"] - [dependencies] clap = { default-features = false, features = ["derive", "help", "std", "usage"], optional = true, version = "4.0" } dotenv = { default-features = false, optional = true, version = "0.15" } @@ -21,7 +19,7 @@ authors = ["Caio Fernandes "] categories = ["asynchronous", "command-line-interface", "gui"] description = "Different user interfaces for WTX" documentation = "https://docs.rs/wtx-ui" -edition = "2024" +edition = "2021" keywords = ["io", "network", "websocket"] license = "Apache-2.0" name = "wtx-ui" diff --git a/wtx/Cargo.toml b/wtx/Cargo.toml index ca29f4ec..24af98ab 100644 --- a/wtx/Cargo.toml +++ b/wtx/Cargo.toml @@ -1,5 +1,3 @@ -cargo-features = ["edition2024"] - [dependencies] ahash = { default-features = false, features = ["no-rng"], optional = true, version = "0.8" } arbitrary = { default-features = false, features = ["derive_arbitrary"], optional = true, version = "1.0" } @@ -50,7 +48,7 @@ base64 = ["dep:base64"] borsh = ["dep:borsh", "std"] chrono = ["dep:chrono"] cl-aux = ["dep:cl-aux"] -client-api-framework = ["cl-aux", "data-transformation"] +client-api-framework = ["data-transformation"] crypto-common = ["dep:crypto-common"] data-transformation = [] database = [] @@ -105,7 +103,7 @@ authors = ["Caio Fernandes "] categories = ["asynchronous", "database", "network-programming", "no-std", "web-programming"] description = "A collection of different transport implementations and related tools focused primarily on web technologies." documentation = "https://docs.rs/wtx" -edition = "2024" +edition = "2021" exclude = ["tests"] homepage = "https://c410-f3r.github.io/wtx" keywords = ["api", "database", "http", "network", "websocket"] diff --git a/wtx/src/client_api_framework/api.rs b/wtx/src/client_api_framework/api.rs index 5dd75f45..8fce31dd 100644 --- a/wtx/src/client_api_framework/api.rs +++ b/wtx/src/client_api_framework/api.rs @@ -1,4 +1,4 @@ -use core::fmt::Display; +use core::{fmt::Display, future::Future}; /// Api definitions group different packages into a common namespace and define custom additional /// logical through hooks. diff --git a/wtx/src/client_api_framework/network/transport.rs b/wtx/src/client_api_framework/network/transport.rs index f4d88d40..0df781f5 100644 --- a/wtx/src/client_api_framework/network/transport.rs +++ b/wtx/src/client_api_framework/network/transport.rs @@ -18,15 +18,11 @@ use crate::{ pkg::{BatchElems, BatchPkg, Package, PkgsAux}, Api, }, - data_transformation::{ - dnsn::{Deserialize, Serialize}, - Id, - }, + data_transformation::dnsn::{Deserialize, Serialize}, misc::Lease, }; pub use bi_transport::*; -use cl_aux::DynContigColl; -use core::ops::Range; +use core::{future::Future, ops::Range}; pub use mock::*; pub use transport_params::*; @@ -71,31 +67,28 @@ pub trait Transport { /// /// All the expected data must be available in a single response. #[inline] - fn send_recv_decode_batch( + fn send_recv_decode_batch<'pkgs, 'pkgs_aux, A, P>( &mut self, - pkgs: &mut [P], - pkgs_aux: &mut PkgsAux, - ress: &mut RESS, - ) -> impl Future> + pkgs: &'pkgs mut [P], + pkgs_aux: &'pkgs_aux mut PkgsAux, + ) -> impl Future< + Output = Result< + impl Iterator>>, + A::Error, + >, + > where A: Api, - A::Error: From, P: Package, - P::ExternalRequestContent: Lease + Ord, - for<'de> P::ExternalResponseContent<'de>: Lease + Ord, - RESS: for<'de> DynContigColl>, - for<'any> BatchElems<'any, A, DRSR, P, Self::Params>: Serialize, + BatchElems<'pkgs, A, DRSR, P, Self::Params>: Serialize, { async { - let batch_package = &mut BatchPkg::new(pkgs); - let range = self.send_recv(batch_package, pkgs_aux).await?; + let range = self.send_recv(&mut BatchPkg::new(pkgs), pkgs_aux).await?; log_res(pkgs_aux.byte_buffer.lease()); - batch_package.decode_and_push_from_bytes( - ress, + Ok(P::ExternalResponseContent::seq_from_bytes( pkgs_aux.byte_buffer.get(range).unwrap_or_default(), &mut pkgs_aux.drsr, - )?; - Ok(()) + )) } } @@ -181,18 +174,22 @@ mod tests { type ExternalResponseContent<'de> = _Pong; type PackageParams = (); + #[inline] fn ext_req_content(&self) -> &Self::ExternalRequestContent { &self.0 } + #[inline] fn ext_req_content_mut(&mut self) -> &mut Self::ExternalRequestContent { &mut self.0 } + #[inline] fn pkg_params(&self) -> &Self::PackageParams { &self.1 } + #[inline] fn pkg_params_mut(&mut self) -> &mut Self::PackageParams { &mut self.1 } @@ -202,6 +199,7 @@ mod tests { pub(crate) struct _Ping; impl Serialize for _Ping { + #[inline] fn to_bytes(&mut self, bytes: &mut Vector, _: &mut DRSR) -> crate::Result<()> { bytes.extend_from_slice(b"ping")?; Ok(()) @@ -211,21 +209,16 @@ mod tests { #[derive(Debug, Eq, PartialEq)] pub(crate) struct _Pong(pub(crate) &'static str); - impl Deserialize<'_, DRSR> for _Pong { + impl<'de, DRSR> Deserialize<'de, DRSR> for _Pong { + #[inline] fn from_bytes(bytes: &[u8], _: &mut DRSR) -> crate::Result { assert_eq!(bytes, b"ping"); Ok(Self("pong")) } - fn seq_from_bytes( - _: &[u8], - _: &mut DRSR, - _: impl FnMut(Self) -> Result<(), E>, - ) -> Result<(), E> - where - E: From, - { - Ok(()) + #[inline] + fn seq_from_bytes(_: &'de [u8], _: &mut DRSR) -> impl Iterator> { + [].into_iter() } } } diff --git a/wtx/src/client_api_framework/network/transport/bi_transport.rs b/wtx/src/client_api_framework/network/transport/bi_transport.rs index 658554a4..20039318 100644 --- a/wtx/src/client_api_framework/network/transport/bi_transport.rs +++ b/wtx/src/client_api_framework/network/transport/bi_transport.rs @@ -8,7 +8,7 @@ use crate::{ data_transformation::dnsn::Deserialize, misc::Lease, }; -use core::ops::Range; +use core::{future::Future, ops::Range}; /// Bidirectional Transport /// diff --git a/wtx/src/client_api_framework/pkg.rs b/wtx/src/client_api_framework/pkg.rs index bc50b2b2..1daab318 100644 --- a/wtx/src/client_api_framework/pkg.rs +++ b/wtx/src/client_api_framework/pkg.rs @@ -11,6 +11,7 @@ use crate::{ data_transformation::dnsn::{Deserialize, Serialize}, }; pub use batch_pkg::{BatchElems, BatchPkg}; +use core::future::Future; pub use pkg_with_helper::*; pub use pkgs_aux::*; diff --git a/wtx/src/client_api_framework/pkg/batch_pkg.rs b/wtx/src/client_api_framework/pkg/batch_pkg.rs index dce7b758..ab645b13 100644 --- a/wtx/src/client_api_framework/pkg/batch_pkg.rs +++ b/wtx/src/client_api_framework/pkg/batch_pkg.rs @@ -1,14 +1,7 @@ use crate::{ - client_api_framework::{ - network::transport::TransportParams, pkg::Package, Api, ClientApiFrameworkError, - }, - data_transformation::{ - dnsn::{Deserialize, Serialize}, - Id, - }, - misc::Lease, + client_api_framework::{network::transport::TransportParams, pkg::Package, Api}, + data_transformation::dnsn::Serialize, }; -use cl_aux::DynContigColl; use core::marker::PhantomData; /// Used to perform batch requests with multiple packages. @@ -23,81 +16,6 @@ impl<'slice, A, DRSR, P, TP> BatchPkg<'slice, A, DRSR, P, TP> { } } -impl BatchPkg<'_, A, DRSR, P, TP> -where - A: Api, - P: Package, - P::ExternalRequestContent: Lease + Ord, - for<'de> P::ExternalResponseContent<'de>: Lease + Ord, - TP: TransportParams, -{ - /// Deserializes a sequence of bytes and then pushes them to the provided buffer. - #[inline] - pub fn decode_and_push_from_bytes( - &mut self, - buffer: &mut B, - bytes: &[u8], - drsr: &mut DRSR, - ) -> Result<(), A::Error> - where - A::Error: From, - B: for<'de> DynContigColl>, - { - if self.0 .0.is_empty() { - return Ok(()); - } - Self::is_sorted(self.0 .0.iter().map(|elem| elem.ext_req_content().lease()))?; - let mut pkgs_idx = 0; - let mut responses_are_not_sorted = false; - P::ExternalResponseContent::seq_from_bytes(bytes, drsr, |eresc| { - let eresc_id = *eresc.lease(); - let found_pkgs_idx = Self::search_slice(pkgs_idx, eresc_id, self.0 .0)?; - if pkgs_idx != found_pkgs_idx { - responses_are_not_sorted = true; - } - buffer.push(eresc).map_err(Into::into)?; - pkgs_idx = pkgs_idx.wrapping_add(1); - Ok::<_, A::Error>(()) - })?; - if responses_are_not_sorted { - buffer.sort_unstable(); - } - Ok(()) - } - - fn is_sorted(mut iter: impl Iterator) -> crate::Result<()> - where - T: PartialOrd, - { - let mut is_sorted = true; - let Some(mut previous) = iter.next() else { - return Ok(()); - }; - for curr in iter { - if previous > curr { - is_sorted = false; - break; - } - previous = curr; - } - if is_sorted { - Ok(()) - } else { - Err(ClientApiFrameworkError::BatchPackagesAreNotSorted.into()) - } - } - - // First try indexing and then falls back to binary search - fn search_slice(idx: usize, eresc_id: Id, pkgs: &[P]) -> crate::Result { - if pkgs.get(idx).map(|pkg| *pkg.ext_req_content().lease() == eresc_id).unwrap_or_default() { - return Ok(idx); - } - pkgs.binary_search_by(|req| req.ext_req_content().lease().cmp(&&eresc_id)).ok().ok_or( - ClientApiFrameworkError::ResponseIdIsNotPresentInTheOfSentBatchPackages(eresc_id).into(), - ) - } -} - impl<'slice, A, DRSR, P, TP> Package for BatchPkg<'slice, A, DRSR, P, TP> where A: Api, diff --git a/wtx/src/data_transformation.rs b/wtx/src/data_transformation.rs index 8c69a027..d64600c7 100644 --- a/wtx/src/data_transformation.rs +++ b/wtx/src/data_transformation.rs @@ -6,7 +6,6 @@ mod macros; mod data_transformation_error; pub mod dnsn; pub mod format; -mod seq_visitor; pub use data_transformation_error::DataTransformationError; diff --git a/wtx/src/data_transformation/dnsn/deserialize.rs b/wtx/src/data_transformation/dnsn/deserialize.rs index bdc4bc51..1326d1c3 100644 --- a/wtx/src/data_transformation/dnsn/deserialize.rs +++ b/wtx/src/data_transformation/dnsn/deserialize.rs @@ -1,5 +1,3 @@ -use core::fmt::Display; - /// Marker trait that has different bounds according to the given set of enabled deserializers. pub trait Deserialize<'de, DRSR> where @@ -9,30 +7,18 @@ where fn from_bytes(bytes: &'de [u8], drsr: &mut DRSR) -> crate::Result; /// Similar to [`Self::from_bytes`] but deals with sequences instead of a single element. - fn seq_from_bytes( - bytes: &'de [u8], - drsr: &mut DRSR, - cb: impl FnMut(Self) -> Result<(), E>, - ) -> Result<(), E> - where - E: Display + From; + fn seq_from_bytes(bytes: &'de [u8], drsr: &mut DRSR) + -> impl Iterator>; } impl<'de, DRSR> Deserialize<'de, DRSR> for () { #[inline] - fn from_bytes(_: &[u8], _: &mut DRSR) -> crate::Result { + fn from_bytes(_: &'de [u8], _: &mut DRSR) -> crate::Result { Ok(()) } #[inline] - fn seq_from_bytes( - _: &[u8], - _: &mut DRSR, - _: impl FnMut(Self) -> Result<(), E>, - ) -> Result<(), E> - where - E: From, - { - Ok(()) + fn seq_from_bytes(_: &'de [u8], _: &mut DRSR) -> impl Iterator> { + [].into_iter() } } diff --git a/wtx/src/data_transformation/format/borsh/borsh_response.rs b/wtx/src/data_transformation/format/borsh/borsh_response.rs index 7d0b2aa7..8837be20 100644 --- a/wtx/src/data_transformation/format/borsh/borsh_response.rs +++ b/wtx/src/data_transformation/format/borsh/borsh_response.rs @@ -20,11 +20,8 @@ where } #[inline] - fn seq_from_bytes(_: &[u8], _: &mut (), _: impl FnMut(Self) -> Result<(), E>) -> Result<(), E> - where - E: From, - { - Ok(()) + fn seq_from_bytes(_: &'de [u8], _: &mut ()) -> impl Iterator> { + [].into_iter() } } @@ -37,9 +34,8 @@ impl Serialize<()> for BorshResponse { #[cfg(feature = "borsh")] mod borsh { - use crate::data_transformation::{dnsn::Borsh, format::BorshResponse, DataTransformationError}; + use crate::data_transformation::{dnsn::Borsh, format::BorshResponse}; use borsh::BorshDeserialize; - use core::fmt::Display; impl<'de, D> crate::data_transformation::dnsn::Deserialize<'de, Borsh> for BorshResponse where @@ -49,15 +45,8 @@ mod borsh { Ok(Self { data: D::deserialize(&mut bytes)? }) } - fn seq_from_bytes( - _: &[u8], - _: &mut Borsh, - _: impl FnMut(Self) -> Result<(), E>, - ) -> Result<(), E> - where - E: Display + From, - { - Err(E::from(DataTransformationError::UnsupportedOperation.into())) + fn seq_from_bytes(_: &'de [u8], _: &mut Borsh) -> impl Iterator> { + [].into_iter() } } } diff --git a/wtx/src/data_transformation/format/graph_ql/graph_ql_response.rs b/wtx/src/data_transformation/format/graph_ql/graph_ql_response.rs index 083b3181..27e9fd52 100644 --- a/wtx/src/data_transformation/format/graph_ql/graph_ql_response.rs +++ b/wtx/src/data_transformation/format/graph_ql/graph_ql_response.rs @@ -116,11 +116,10 @@ mod serde { #[cfg(feature = "serde_json")] mod serde_json { use crate::{ - data_transformation::{dnsn::SerdeJson, format::GraphQlResponse, seq_visitor::_SeqVisitor}, + data_transformation::{dnsn::SerdeJson, format::GraphQlResponse}, misc::Vector, }; - use core::fmt::Display; - use serde::de::Deserializer; + use serde_json::{de::SliceRead, StreamDeserializer}; impl<'de, D, E> crate::data_transformation::dnsn::Deserialize<'de, SerdeJson> for GraphQlResponse @@ -134,17 +133,11 @@ mod serde_json { } #[inline] - fn seq_from_bytes( + fn seq_from_bytes( bytes: &'de [u8], _: &mut SerdeJson, - cb: impl FnMut(Self) -> Result<(), ERR>, - ) -> Result<(), ERR> - where - ERR: Display + From, - { - let mut de = serde_json::Deserializer::from_slice(bytes); - de.deserialize_seq(_SeqVisitor::_new(cb)).map_err(Into::into)?; - Ok(()) + ) -> impl Iterator> { + StreamDeserializer::new(SliceRead::new(bytes)).map(|el| el.map_err(From::from)) } } diff --git a/wtx/src/data_transformation/format/json/json_response.rs b/wtx/src/data_transformation/format/json/json_response.rs index a5ab2f5f..e9c49219 100644 --- a/wtx/src/data_transformation/format/json/json_response.rs +++ b/wtx/src/data_transformation/format/json/json_response.rs @@ -20,15 +20,8 @@ where } #[inline] - fn seq_from_bytes( - _: &'de [u8], - _: &mut (), - _: impl FnMut(Self) -> Result<(), E>, - ) -> Result<(), E> - where - E: From, - { - Ok(()) + fn seq_from_bytes(_: &'de [u8], _: &mut ()) -> impl Iterator> { + [].into_iter() } } @@ -42,11 +35,10 @@ impl Serialize<()> for JsonResponse { #[cfg(feature = "serde_json")] mod serde_json { use crate::{ - data_transformation::{dnsn::SerdeJson, format::JsonResponse, seq_visitor::_SeqVisitor}, + data_transformation::{dnsn::SerdeJson, format::JsonResponse}, misc::Vector, }; - use core::fmt::Display; - use serde::de::Deserializer; + use serde_json::{de::SliceRead, StreamDeserializer}; impl<'de, D> crate::data_transformation::dnsn::Deserialize<'de, SerdeJson> for JsonResponse where @@ -58,17 +50,12 @@ mod serde_json { } #[inline] - fn seq_from_bytes( + fn seq_from_bytes( bytes: &'de [u8], _: &mut SerdeJson, - mut cb: impl FnMut(Self) -> Result<(), E>, - ) -> Result<(), E> - where - E: Display + From, - { - let mut de = serde_json::Deserializer::from_slice(bytes); - de.deserialize_seq(_SeqVisitor::_new(|data| cb(Self { data }))).map_err(Into::into)?; - Ok(()) + ) -> impl Iterator> { + StreamDeserializer::new(SliceRead::new(bytes)) + .map(|el| el.map(|data| JsonResponse { data }).map_err(From::from)) } } diff --git a/wtx/src/data_transformation/format/json_rpc/json_rpc_response.rs b/wtx/src/data_transformation/format/json_rpc/json_rpc_response.rs index ba6db473..9eae0097 100644 --- a/wtx/src/data_transformation/format/json_rpc/json_rpc_response.rs +++ b/wtx/src/data_transformation/format/json_rpc/json_rpc_response.rs @@ -35,15 +35,8 @@ where } #[inline] - fn seq_from_bytes( - _: &'de [u8], - _: &mut (), - _: impl FnMut(Self) -> Result<(), E>, - ) -> Result<(), E> - where - E: From, - { - Ok(()) + fn seq_from_bytes(_: &'de [u8], _: &mut ()) -> impl Iterator> { + [].into_iter() } } @@ -252,11 +245,12 @@ mod serde { #[cfg(feature = "serde_json")] mod serde_json { + use serde_json::{de::SliceRead, StreamDeserializer}; + use crate::{ - data_transformation::{dnsn::SerdeJson, format::JsonRpcResponse, seq_visitor::_SeqVisitor}, + data_transformation::{dnsn::SerdeJson, format::JsonRpcResponse}, misc::Vector, }; - use core::fmt::Display; impl<'de, R> crate::data_transformation::dnsn::Deserialize<'de, SerdeJson> for JsonRpcResponse where @@ -268,18 +262,11 @@ mod serde_json { } #[inline] - fn seq_from_bytes( + fn seq_from_bytes( bytes: &'de [u8], _: &mut SerdeJson, - cb: impl FnMut(Self) -> Result<(), E>, - ) -> Result<(), E> - where - E: Display + From, - { - use ::serde::de::Deserializer; - let mut de = serde_json::Deserializer::from_slice(bytes); - de.deserialize_seq(_SeqVisitor::_new(cb)).map_err(Into::into)?; - Ok(()) + ) -> impl Iterator> { + StreamDeserializer::new(SliceRead::new(bytes)).map(|el| el.map_err(crate::Error::from)) } } diff --git a/wtx/src/data_transformation/format/protobuf/protobuf_request.rs b/wtx/src/data_transformation/format/protobuf/protobuf_request.rs index 227a1c2e..6900d97d 100644 --- a/wtx/src/data_transformation/format/protobuf/protobuf_request.rs +++ b/wtx/src/data_transformation/format/protobuf/protobuf_request.rs @@ -20,11 +20,9 @@ mod quick_protobuf { data_transformation::{ dnsn::{Deserialize, QuickProtobuf, Serialize}, format::ProtobufRequest, - DataTransformationError, }, misc::Vector, }; - use core::fmt::Display; use quick_protobuf::{BytesReader, MessageRead, MessageWrite, Writer}; impl<'de, D> Deserialize<'de, QuickProtobuf> for ProtobufRequest @@ -37,15 +35,11 @@ mod quick_protobuf { } #[inline] - fn seq_from_bytes( - _: &[u8], + fn seq_from_bytes( + _: &'de [u8], _: &mut QuickProtobuf, - _: impl FnMut(Self) -> Result<(), E>, - ) -> Result<(), E> - where - E: Display + From, - { - Err(E::from(DataTransformationError::UnsupportedOperation.into())) + ) -> impl Iterator> { + [].into_iter() } } diff --git a/wtx/src/data_transformation/format/protobuf/protobuf_response.rs b/wtx/src/data_transformation/format/protobuf/protobuf_response.rs index 4a8e4e09..e6718b17 100644 --- a/wtx/src/data_transformation/format/protobuf/protobuf_response.rs +++ b/wtx/src/data_transformation/format/protobuf/protobuf_response.rs @@ -20,11 +20,8 @@ where } #[inline] - fn seq_from_bytes(_: &[u8], _: &mut (), _: impl FnMut(Self) -> Result<(), E>) -> Result<(), E> - where - E: From, - { - Ok(()) + fn seq_from_bytes(_: &'de [u8], _: &mut ()) -> impl Iterator> { + [].into_iter() } } @@ -41,11 +38,9 @@ mod quick_protobuf { data_transformation::{ dnsn::{Deserialize, QuickProtobuf, Serialize}, format::ProtobufResponse, - DataTransformationError, }, misc::Vector, }; - use core::fmt::Display; use quick_protobuf::{BytesReader, MessageRead, MessageWrite, Writer}; impl<'de, D> Deserialize<'de, QuickProtobuf> for ProtobufResponse @@ -58,15 +53,11 @@ mod quick_protobuf { } #[inline] - fn seq_from_bytes( - _: &[u8], + fn seq_from_bytes( + _: &'de [u8], _: &mut QuickProtobuf, - _: impl FnMut(Self) -> Result<(), E>, - ) -> Result<(), E> - where - E: Display + From, - { - Err(E::from(DataTransformationError::UnsupportedOperation.into())) + ) -> impl Iterator> { + [].into_iter() } } diff --git a/wtx/src/data_transformation/format/verbatim/verbatim_response.rs b/wtx/src/data_transformation/format/verbatim/verbatim_response.rs index 16bd593f..abe3d611 100644 --- a/wtx/src/data_transformation/format/verbatim/verbatim_response.rs +++ b/wtx/src/data_transformation/format/verbatim/verbatim_response.rs @@ -20,11 +20,8 @@ where } #[inline] - fn seq_from_bytes(_: &[u8], _: &mut (), _: impl FnMut(Self) -> Result<(), E>) -> Result<(), E> - where - E: From, - { - Ok(()) + fn seq_from_bytes(_: &'de [u8], _: &mut ()) -> impl Iterator> { + [].into_iter() } } @@ -37,8 +34,7 @@ impl Serialize<()> for VerbatimResponse { #[cfg(feature = "rkyv")] mod rkyv { - use crate::data_transformation::{dnsn::Rkyv, format::VerbatimResponse, DataTransformationError}; - use core::fmt::Display; + use crate::data_transformation::{dnsn::Rkyv, format::VerbatimResponse}; use rkyv::{ bytecheck::CheckBytes, de::deserializers::SharedDeserializeMap, validation::validators::DefaultValidator, Archive, @@ -50,6 +46,7 @@ mod rkyv { for<'any> D::Archived: CheckBytes> + rkyv::Deserialize, { + #[inline] fn from_bytes(bytes: &[u8], _: &mut Rkyv) -> crate::Result { Ok(Self { data: rkyv::from_bytes(bytes) @@ -57,15 +54,9 @@ mod rkyv { }) } - fn seq_from_bytes( - _: &[u8], - _: &mut Rkyv, - _: impl FnMut(Self) -> Result<(), E>, - ) -> Result<(), E> - where - E: Display + From, - { - Err(E::from(DataTransformationError::UnsupportedOperation.into())) + #[inline] + fn seq_from_bytes(_: &'de [u8], _: &mut Rkyv) -> impl Iterator> { + [].into_iter() } } } diff --git a/wtx/src/data_transformation/seq_visitor.rs b/wtx/src/data_transformation/seq_visitor.rs index a157f596..e8df5df1 100644 --- a/wtx/src/data_transformation/seq_visitor.rs +++ b/wtx/src/data_transformation/seq_visitor.rs @@ -1,10 +1,10 @@ use core::marker::PhantomData; -pub(crate) struct _SeqVisitor(F, PhantomData<(E, T)>); +pub(crate) struct _SeqVisitor(F, R, PhantomData<(A, E, T)>); -impl _SeqVisitor { - pub(crate) fn _new(cb: F) -> Self { - Self(cb, PhantomData) +impl _SeqVisitor { + pub(crate) fn _new(first: F, rest: R) -> Self { + Self(first, rest, PhantomData) } } @@ -13,33 +13,55 @@ mod serde { use crate::data_transformation::seq_visitor::_SeqVisitor; use core::{ any::type_name, - fmt::{Display, Formatter}, + fmt::Formatter, marker::PhantomData, }; use serde::{ de::{Error as _, SeqAccess, Visitor}, Deserialize, }; - impl<'de, E, F, T> Visitor<'de> for _SeqVisitor + impl<'de, A, E, F, R, T> Visitor<'de> for _SeqVisitor where - E: Display, - F: FnMut(T) -> Result<(), E>, + E: From, + F: FnOnce(T) -> Result, + R: FnMut(A, T) -> Result, T: Deserialize<'de>, { - type Value = (); + type Value = impl Iterator>; + #[inline] fn expecting(&self, formatter: &mut Formatter<'_>) -> core::fmt::Result { formatter.write_fmt(format_args!("generic sequence of {}", type_name::())) } - fn visit_seq(mut self, mut seq: A) -> Result + #[inline] + fn visit_seq(mut self, mut seq: SA) -> Result where - A: SeqAccess<'de>, + SA: SeqAccess<'de>, { - while let Some(elem) = seq.next_element::()? { - (self.0)(elem).map_err(A::Error::custom)?; + struct Iter<'de, E, SA, T> { + phantom: PhantomData<(&'de (), E, T)>, + sa: SA } - Ok(()) + + impl<'de, E, SA, T> Iterator for Iter<'de, E, SA, T> + where + E: From, + SA: SeqAccess<'de>, + T: Deserialize<'de>, + { + type Item = Result; + + #[inline] + fn next(&mut self) -> Option { + self.sa.next_element::().map_err(E::from).transpose() + } + } + + Ok(Iter { + phantom: PhantomData, + sa: seq + }) } } } diff --git a/wtx/src/database/client/postgres/executor.rs b/wtx/src/database/client/postgres/executor.rs index 9a972150..1f9a40b2 100644 --- a/wtx/src/database/client/postgres/executor.rs +++ b/wtx/src/database/client/postgres/executor.rs @@ -17,7 +17,7 @@ use crate::{ }, misc::{ConnectionState, FilledBufferWriter, Lease, LeaseMut, Rng, Stream, StreamWithTls}, }; -use core::marker::PhantomData; +use core::{future::Future, marker::PhantomData}; /// Executor #[derive(Debug)] diff --git a/wtx/src/database/schema_manager.rs b/wtx/src/database/schema_manager.rs index 562cc2df..cf41fae4 100644 --- a/wtx/src/database/schema_manager.rs +++ b/wtx/src/database/schema_manager.rs @@ -23,6 +23,7 @@ use crate::{ }; use alloc::{string::String, vec::Vec}; pub use commands::*; +use core::future::Future; pub use migration::*; pub use repeatability::Repeatability; pub use schema_manager_error::SchemaManagerError; diff --git a/wtx/src/database/transaction_manager.rs b/wtx/src/database/transaction_manager.rs index 37449821..2809fea9 100644 --- a/wtx/src/database/transaction_manager.rs +++ b/wtx/src/database/transaction_manager.rs @@ -1,3 +1,5 @@ +use core::future::Future; + /// Manages a set of atomic operations. pub trait TransactionManager: Sized { /// Executor diff --git a/wtx/src/error.rs b/wtx/src/error.rs index 3d117a2f..e955c379 100644 --- a/wtx/src/error.rs +++ b/wtx/src/error.rs @@ -105,7 +105,9 @@ pub enum Error { /// A buffer was partially read or write but should in fact be fully processed. UnexpectedBufferState, /// Unexpected end of file when reading from a stream. - UnexpectedStreamEOF, + UnexpectedStreamReadEOF, + /// Unexpected end of file when writing to a stream. + UnexpectedStreamWriteEOF, /// Unexpected String UnexpectedString { length: usize, diff --git a/wtx/src/grpc.rs b/wtx/src/grpc.rs index 95a7d2ec..2ab68959 100644 --- a/wtx/src/grpc.rs +++ b/wtx/src/grpc.rs @@ -2,11 +2,13 @@ //! framework. mod client; +mod grpc_status_code; mod server; mod server_data; use crate::{data_transformation::dnsn::Serialize, misc::Vector}; pub use client::Client; +pub use grpc_status_code::GrpcStatusCode; pub use server::Server; pub use server_data::ServerData; diff --git a/wtx/src/grpc/grpc_status_code.rs b/wtx/src/grpc/grpc_status_code.rs new file mode 100644 index 00000000..555d629b --- /dev/null +++ b/wtx/src/grpc/grpc_status_code.rs @@ -0,0 +1,66 @@ +/// gRPC status codes. +#[derive(Clone, Copy, Debug)] +pub enum GrpcStatusCode { + /// Not an error; returned on success. + Ok = 0, + /// The operation was cancelled, typically by the caller. + Cancelled = 1, + /// Unknown error. + Unknown = 2, + /// The client specified an invalid argument. + InvalidArgument = 3, + /// The deadline expired before the operation could complete. + DeadlineExceeded = 4, + /// Some requested entity (e.g., file or directory) was not found. + NotFound = 5, + /// The entity that a client attempted to create (e.g., file or directory) already exists. + AlreadyExists = 6, + /// The caller does not have permission to execute the specified operation. + PermissionDenied = 7, + /// Some resource has been exhausted, perhaps a per-user quota, or perhaps the entire file + /// system is out of space. + ResourceExhausted = 8, + /// The operation was rejected because the system is not in a state required for the + /// operation's execution. + FailedPrecondition = 9, + /// The operation was aborted, typically due to a concurrency issue such as a sequencer check + /// failure or transaction abort. + Aborted = 10, + /// The operation was attempted past the valid range. + OutOfRange = 11, + /// The operation is not implemented or is not supported/enabled in this service. + Unimplemented = 12, + /// Internal error. + Internal = 13, + /// The service is currently unavailable. + Unavailable = 14, + /// Unrecoverable data loss or corruption. + DataLoss = 15, + /// The request does not have valid authentication credentials for the operation. + Unauthenticated = 16, +} + +impl GrpcStatusCode { + #[inline] + pub(crate) fn number_as_str(self) -> &'static str { + match self { + GrpcStatusCode::Ok => "0", + GrpcStatusCode::Cancelled => "1", + GrpcStatusCode::Unknown => "2", + GrpcStatusCode::InvalidArgument => "3", + GrpcStatusCode::DeadlineExceeded => "4", + GrpcStatusCode::NotFound => "5", + GrpcStatusCode::AlreadyExists => "6", + GrpcStatusCode::PermissionDenied => "7", + GrpcStatusCode::ResourceExhausted => "8", + GrpcStatusCode::FailedPrecondition => "9", + GrpcStatusCode::Aborted => "10", + GrpcStatusCode::OutOfRange => "11", + GrpcStatusCode::Unimplemented => "12", + GrpcStatusCode::Internal => "13", + GrpcStatusCode::Unavailable => "14", + GrpcStatusCode::DataLoss => "15", + GrpcStatusCode::Unauthenticated => "16", + } + } +} diff --git a/wtx/src/http/low_level_server/tokio_http2.rs b/wtx/src/http/low_level_server/tokio_http2.rs index 2540cc43..fd86a7c9 100644 --- a/wtx/src/http/low_level_server/tokio_http2.rs +++ b/wtx/src/http/low_level_server/tokio_http2.rs @@ -3,6 +3,7 @@ use crate::{ http2::{Http2ErrorCode, Http2Params, Http2Tokio}, misc::{Either, FnFut, StreamReader, StreamWriter}, }; +use core::future::Future; use tokio::net::{TcpListener, TcpStream}; type Http2Buffer = crate::http2::Http2Buffer; @@ -28,10 +29,7 @@ impl LowLevelServer { ACPT: Send + 'static, AUX: Clone + Send + 'static, E: From + Send + 'static, - SR: Send - + StreamReader - + Unpin - + 'static, + SR: Send + StreamReader + Unpin + 'static, SW: Send + StreamWriter + Unpin + 'static, SF: Send + Future>, F: Copy @@ -82,10 +80,7 @@ async fn manage_conn( where AUX: Clone + Send + 'static, E: From + Send + 'static, - SR: Send - + StreamReader - + Unpin - + 'static, + SR: Send + StreamReader + Unpin + 'static, SW: Send + StreamWriter + Unpin + 'static, SF: Send + Future>, F: Copy + FnFut<(AUX, Request), Result, E>> + Send + 'static, diff --git a/wtx/src/http/low_level_server/tokio_web_socket.rs b/wtx/src/http/low_level_server/tokio_web_socket.rs index 1d49d3bd..377986b2 100644 --- a/wtx/src/http/low_level_server/tokio_web_socket.rs +++ b/wtx/src/http/low_level_server/tokio_web_socket.rs @@ -4,7 +4,7 @@ use crate::{ pool::{Pool, SimplePoolGetElem, SimplePoolResource, SimplePoolTokio, WebSocketRM}, web_socket::{Compression, FrameBuffer, FrameBufferVec, WebSocketBuffer, WebSocketServer}, }; -use core::fmt::Debug; +use core::{fmt::Debug, future::Future}; use std::sync::OnceLock; use tokio::{ net::{TcpListener, TcpStream}, diff --git a/wtx/src/http/req_res_buffer.rs b/wtx/src/http/req_res_buffer.rs index 76349289..6def0f6c 100644 --- a/wtx/src/http/req_res_buffer.rs +++ b/wtx/src/http/req_res_buffer.rs @@ -1,6 +1,6 @@ use crate::{ - http::{Headers, Method, Request, Response, StatusCode, Version}, - misc::{Lease, LeaseMut, UriString, Vector}, + http::{Headers, Method, ReqResData, ReqResDataMut, Request, Response, StatusCode, Version}, + misc::{Lease, LeaseMut, UriRef, UriString, Vector}, }; use alloc::string::String; @@ -74,6 +74,32 @@ impl ReqResBuffer { } } +impl ReqResData for ReqResBuffer { + type Body = [u8]; + + #[inline] + fn body(&self) -> &Self::Body { + &self.data + } + + #[inline] + fn headers(&self) -> &Headers { + &self.headers + } + + #[inline] + fn uri(&self) -> UriRef<'_> { + self.uri.to_ref() + } +} + +impl ReqResDataMut for ReqResBuffer { + #[inline] + fn headers_mut(&mut self) -> &mut Headers { + &mut self.headers + } +} + impl Default for ReqResBuffer { #[inline] fn default() -> Self { diff --git a/wtx/src/http/req_res_data.rs b/wtx/src/http/req_res_data.rs index eaae3b15..d9f9a9cb 100644 --- a/wtx/src/http/req_res_data.rs +++ b/wtx/src/http/req_res_data.rs @@ -1,5 +1,5 @@ use crate::{ - http::{Headers, ReqResBuffer}, + http::Headers, misc::{Lease, Uri, UriRef}, }; use alloc::boxed::Box; @@ -164,25 +164,6 @@ where } } -impl ReqResData for ReqResBuffer { - type Body = [u8]; - - #[inline] - fn body(&self) -> &Self::Body { - &self.data - } - - #[inline] - fn headers(&self) -> &Headers { - &self.headers - } - - #[inline] - fn uri(&self) -> UriRef<'_> { - self.uri.to_ref() - } -} - impl ReqResData for Uri where S: Lease, diff --git a/wtx/src/http/server_framework/middlewares.rs b/wtx/src/http/server_framework/middlewares.rs index a4dbc0fd..e5f642f0 100644 --- a/wtx/src/http/server_framework/middlewares.rs +++ b/wtx/src/http/server_framework/middlewares.rs @@ -2,6 +2,7 @@ use crate::{ http::{Request, Response}, misc::FnFut, }; +use core::future::Future; /// Requests middlewares pub trait ReqMiddlewares diff --git a/wtx/src/http/server_framework/path_fun.rs b/wtx/src/http/server_framework/path_fun.rs index d8efc165..7c42025f 100644 --- a/wtx/src/http/server_framework/path_fun.rs +++ b/wtx/src/http/server_framework/path_fun.rs @@ -2,6 +2,7 @@ use crate::{ http::{HttpError, ReqResData, Request, Response}, misc::{atoi, bytes_pos1, bytes_split1}, }; +use core::future::Future; /// Path function pub trait PathFun { @@ -78,8 +79,8 @@ impl PathFun for fn((crate::grpc::ServerData, R where DRSR: Default, E: From, - FUT: Future, E>>, - RRD: ReqResData, + FUT: Future), E>>, + RRD: crate::http::ReqResDataMut, { #[inline] async fn call( @@ -88,6 +89,26 @@ where req: Request, _: [usize; 2], ) -> Result, E> { - (self)((crate::grpc::ServerData::new(DRSR::default()), req)).await + let mut tuple = (self)((crate::grpc::ServerData::new(DRSR::default()), req)).await?; + tuple.1.rrd.headers_mut().clear(); + tuple.1.rrd.headers_mut().push_front( + crate::http::Header { + is_sensitive: false, + is_trailer: false, + name: crate::http::KnownHeaderName::ContentType.into(), + value: b"application/grpc", + }, + &[], + )?; + tuple.1.rrd.headers_mut().push_front( + crate::http::Header { + is_sensitive: false, + is_trailer: true, + name: b"grpc-status", + value: tuple.0.number_as_str().as_bytes(), + }, + &[], + )?; + Ok(tuple.1) } } diff --git a/wtx/src/http/server_framework/path_management.rs b/wtx/src/http/server_framework/path_management.rs index 3db5bf08..d1142ec6 100644 --- a/wtx/src/http/server_framework/path_management.rs +++ b/wtx/src/http/server_framework/path_management.rs @@ -1,4 +1,5 @@ use crate::http::{server_framework::Path, HttpError, ReqResData, Request, Response}; +use core::future::Future; /// Used by all structures that somehow interact with incoming requests. pub trait PathManagement diff --git a/wtx/src/http2.rs b/wtx/src/http2.rs index a2b079e7..4f3c8685 100644 --- a/wtx/src/http2.rs +++ b/wtx/src/http2.rs @@ -58,7 +58,7 @@ pub use client_stream::ClientStream; pub(crate) use common_flags::CommonFlags; pub(crate) use continuation_frame::ContinuationFrame; use core::{ - future::poll_fn, + future::{poll_fn, Future}, mem, pin::pin, sync::atomic::{AtomicBool, Ordering}, @@ -203,7 +203,7 @@ where { hb.lease_mut().clear(); let mut buffer = [0; 24]; - stream_reader.read_exact(&mut buffer).await?; + let _ = stream_reader.read(&mut buffer).await?; if &buffer != PREFACE { let _rslt = stream_writer .write_all(&GoAwayFrame::new(Http2ErrorCode::ProtocolError, U31::ZERO).bytes()) diff --git a/wtx/src/http2/client_stream.rs b/wtx/src/http2/client_stream.rs index b2d84637..00c82119 100644 --- a/wtx/src/http2/client_stream.rs +++ b/wtx/src/http2/client_stream.rs @@ -11,7 +11,11 @@ use crate::{ }, misc::{Either, Lease, LeaseMut, Lock, RefCounter, StreamWriter, _Span}, }; -use core::{future::poll_fn, pin::pin, task::Poll}; +use core::{ + future::{poll_fn, Future}, + pin::pin, + task::Poll, +}; /// Groups the methods used by clients that connect to servers. #[derive(Debug)] diff --git a/wtx/src/http2/frame_reader.rs b/wtx/src/http2/frame_reader.rs index 1e4b8bce..d5297c33 100644 --- a/wtx/src/http2/frame_reader.rs +++ b/wtx/src/http2/frame_reader.rs @@ -27,7 +27,7 @@ use crate::{ misc::{LeaseMut, Lock, PartitionedFilledBuffer, RefCounter, StreamReader, StreamWriter}, }; use core::{ - future::poll_fn, + future::{poll_fn, Future}, marker::PhantomData, mem, pin::pin, @@ -79,7 +79,10 @@ where SR: StreamReader, SW: StreamWriter, { - let fi = read_frame_until(hd, is_conn_open, max_frame_len, pfb, stream_reader).await?; + let Some(fi) = read_frame_until(hd, is_conn_open, max_frame_len, pfb, stream_reader).await? + else { + return Ok(()); + }; match fi.ty { FrameInitTy::Data => { let mut lock = hd.lock().await; diff --git a/wtx/src/http2/misc.rs b/wtx/src/http2/misc.rs index 95227e4c..647fe985 100644 --- a/wtx/src/http2/misc.rs +++ b/wtx/src/http2/misc.rs @@ -221,7 +221,7 @@ pub(crate) async fn read_frame_until max_frame_len: u32, pfb: &mut PartitionedFilledBuffer, stream_reader: &mut SR, -) -> crate::Result +) -> crate::Result> where HB: LeaseMut>, HD: RefCounter, @@ -235,7 +235,8 @@ where match fi.ty { FrameInitTy::GoAway => { let gaf = GoAwayFrame::read(pfb._current(), fi)?; - return Err(crate::Error::Http2ErrorGoAway(gaf.error_code(), None)); + send_go_away(gaf.error_code(), &mut hd.lock().await.parts_mut()).await; + return Ok(None); } FrameInitTy::Ping => { let mut pf = PingFrame::read(pfb._current(), fi)?; @@ -273,7 +274,7 @@ where } } } - return Ok(fi); + return Ok(Some(fi)); } Err(protocol_err(Http2Error::VeryLargeAmountOfFrameMismatches)) } diff --git a/wtx/src/http2/send_msg.rs b/wtx/src/http2/send_msg.rs index 25fe7747..7f5ca31a 100644 --- a/wtx/src/http2/send_msg.rs +++ b/wtx/src/http2/send_msg.rs @@ -37,7 +37,7 @@ use crate::{ misc::{LeaseMut, Lock, RefCounter, StreamWriter, Usize, Vector}, }; use core::{ - future::poll_fn, + future::{poll_fn, Future}, pin::pin, sync::atomic::{AtomicBool, Ordering}, task::{Poll, Waker}, @@ -255,7 +255,6 @@ fn encode_trailers( headers: &Headers, (hpack_enc, hpack_enc_buffer): (&mut HpackEncoder, &mut Vector), ) -> crate::Result<()> { - hpack_enc_buffer.clear(); hpack_enc.encode(hpack_enc_buffer, [].into_iter(), headers.iter().filter(|el| el.is_trailer))?; Ok(()) } diff --git a/wtx/src/http2/server_stream.rs b/wtx/src/http2/server_stream.rs index c9e32927..5ff76068 100644 --- a/wtx/src/http2/server_stream.rs +++ b/wtx/src/http2/server_stream.rs @@ -11,7 +11,10 @@ use crate::{ }, misc::{Either, Lease, LeaseMut, Lock, RefCounter, StreamWriter, _Span}, }; -use core::{future::poll_fn, pin::pin}; +use core::{ + future::{poll_fn, Future}, + pin::pin, +}; /// Created when a server receives an initial stream. #[derive(Debug)] diff --git a/wtx/src/lib.rs b/wtx/src/lib.rs index 5d06166b..d92b9198 100644 --- a/wtx/src/lib.rs +++ b/wtx/src/lib.rs @@ -2,7 +2,7 @@ #![cfg_attr(feature = "_bench", allow(soft_unstable))] #![cfg_attr(feature = "_bench", feature(test))] #![doc = include_str!("../README.md")] -#![feature(macro_metavar_expr, noop_waker, return_type_notation)] +#![feature(impl_trait_in_assoc_type, macro_metavar_expr, noop_waker, return_type_notation)] #![no_std] extern crate alloc; diff --git a/wtx/src/misc.rs b/wtx/src/misc.rs index f0648475..0165c823 100644 --- a/wtx/src/misc.rs +++ b/wtx/src/misc.rs @@ -228,7 +228,7 @@ where let actual_buffer = buffer.get_mut(*read..).unwrap_or_default(); let local_read = stream_reader.read(actual_buffer).await?; if local_read == 0 { - return Err(crate::Error::UnexpectedStreamEOF); + return Err(crate::Error::UnexpectedStreamReadEOF); } *read = read.wrapping_add(local_read); } diff --git a/wtx/src/misc/fn_fut.rs b/wtx/src/misc/fn_fut.rs index b936b280..b7474656 100644 --- a/wtx/src/misc/fn_fut.rs +++ b/wtx/src/misc/fn_fut.rs @@ -1,3 +1,5 @@ +use core::future::Future; + /// Simulates `impl for<'any> Fn(&'any ..) -> impl Future + 'any` due to the lack of compiler /// support. /// diff --git a/wtx/src/misc/lock.rs b/wtx/src/misc/lock.rs index d50febde..f1d519ab 100644 --- a/wtx/src/misc/lock.rs +++ b/wtx/src/misc/lock.rs @@ -1,5 +1,5 @@ use alloc::{rc::Rc, sync::Arc}; -use core::ops::DerefMut; +use core::{future::Future, ops::DerefMut}; /// An asynchronous mutual exclusion primitive useful for protecting shared data. pub trait Lock { diff --git a/wtx/src/misc/stream.rs b/wtx/src/misc/stream.rs index 55d3cdbc..d9b898e3 100644 --- a/wtx/src/misc/stream.rs +++ b/wtx/src/misc/stream.rs @@ -8,7 +8,7 @@ macro_rules! _local_write_all_vectored { while !$io_slices.is_empty() { match $write_many { Err(e) => return Err(e.into()), - Ok(0) => return Err(crate::Error::UnexpectedStreamEOF), + Ok(0) => return Err(crate::Error::UnexpectedStreamWriteEOF), Ok(n) => crate::misc::stream::advance_slices(&mut &$bytes[..], &mut $io_slices, n), } } diff --git a/wtx/src/misc/stream/stream_reader.rs b/wtx/src/misc/stream/stream_reader.rs index d76a4886..ad0fd2bd 100644 --- a/wtx/src/misc/stream/stream_reader.rs +++ b/wtx/src/misc/stream/stream_reader.rs @@ -1,28 +1,11 @@ +use core::future::Future; + /// A stream of values sent asynchronously. pub trait StreamReader { /// Pulls some bytes from this source into the specified buffer, returning how many bytes /// were read. fn read(&mut self, bytes: &mut [u8]) -> impl Future>; - /// Reads the exact number of bytes required to fill `bytes`. - #[inline] - fn read_exact(&mut self, bytes: &mut [u8]) -> impl Future> { - async move { - let mut idx = 0; - for _ in 0..bytes.len() { - if idx >= bytes.len() { - break; - } - let read = self.read(bytes.get_mut(idx..).unwrap_or_default()).await?; - if read == 0 { - return Err(crate::Error::UnexpectedStreamEOF); - } - idx = idx.wrapping_add(read); - } - Ok(()) - } - } - /// Reads and at the same time discards exactly `len` bytes. #[inline] fn read_skip(&mut self, len: usize) -> impl Future> { @@ -36,7 +19,7 @@ pub trait StreamReader { let slice = if let Some(el) = buffer.get_mut(..counter) { el } else { &mut buffer[..] }; let read = self.read(slice).await?; if read == 0 { - return Err(crate::Error::UnexpectedStreamEOF); + return Err(crate::Error::UnexpectedStreamReadEOF); } counter = counter.wrapping_sub(read); } diff --git a/wtx/src/misc/stream/stream_writer.rs b/wtx/src/misc/stream/stream_writer.rs index e69524e4..6e34fbc4 100644 --- a/wtx/src/misc/stream/stream_writer.rs +++ b/wtx/src/misc/stream/stream_writer.rs @@ -1,3 +1,5 @@ +use core::future::Future; + /// A stream of values written asynchronously. pub trait StreamWriter { /// Attempts to write ***all*** `bytes`. diff --git a/wtx/src/misc/vector.rs b/wtx/src/misc/vector.rs index 092261f8..503fa181 100644 --- a/wtx/src/misc/vector.rs +++ b/wtx/src/misc/vector.rs @@ -27,7 +27,7 @@ pub enum VectorError { /// A wrapper around the std's vector. #[cfg_attr(feature = "test-strategy", derive(test_strategy::Arbitrary))] #[cfg_attr(feature = "test-strategy", arbitrary(bound(D: proptest::arbitrary::Arbitrary + 'static)))] -#[derive(Eq, PartialEq)] +#[derive(Clone, Eq, PartialEq)] pub struct Vector { data: Vec, } diff --git a/wtx/src/pool.rs b/wtx/src/pool.rs index 7e229e96..bc6fd81a 100644 --- a/wtx/src/pool.rs +++ b/wtx/src/pool.rs @@ -4,6 +4,7 @@ mod resource_manager; #[cfg(feature = "std")] mod simple_pool; +use core::future::Future; #[cfg(feature = "postgres")] pub use resource_manager::database::PostgresRM; pub use resource_manager::{ResourceManager, SimpleRM}; diff --git a/wtx/src/pool/resource_manager.rs b/wtx/src/pool/resource_manager.rs index 38ae58f0..2c9b8b60 100644 --- a/wtx/src/pool/resource_manager.rs +++ b/wtx/src/pool/resource_manager.rs @@ -1,3 +1,5 @@ +use core::future::Future; + /// Manager of a specific pool resource. pub trait ResourceManager { /// Auxiliary data used by the [`Self::get`] method. diff --git a/wtx/src/pool/simple_pool.rs b/wtx/src/pool/simple_pool.rs index d453f367..bc223021 100644 --- a/wtx/src/pool/simple_pool.rs +++ b/wtx/src/pool/simple_pool.rs @@ -4,7 +4,7 @@ use crate::{ }; use alloc::{sync::Arc, vec::Vec}; use core::{ - future::poll_fn, + future::{poll_fn, Future}, ops::{Deref, DerefMut}, task::{Poll, Waker}, }; diff --git a/wtx/src/web_socket/handshake.rs b/wtx/src/web_socket/handshake.rs index 1ea7b26a..084b6b47 100644 --- a/wtx/src/web_socket/handshake.rs +++ b/wtx/src/web_socket/handshake.rs @@ -55,7 +55,7 @@ where let read_buffer = nb._following_mut().get_mut(read..).unwrap_or_default(); let local_read = stream.read(read_buffer).await?; if local_read == 0 { - return Err(crate::Error::UnexpectedStreamEOF); + return Err(crate::Error::UnexpectedStreamReadEOF); } read = read.wrapping_add(local_read); let mut req_buffer = [EMPTY_HEADER; MAX_READ_HEADER_LEN]; @@ -138,7 +138,7 @@ where let read_buffer = fb.payload_mut().get_mut(read..).unwrap_or_default(); let local_read = stream.read(read_buffer).await?; if local_read == 0 { - return Err(crate::Error::UnexpectedStreamEOF); + return Err(crate::Error::UnexpectedStreamReadEOF); } read = read.wrapping_add(local_read); match Response::new(&mut local_header).parse(fb.payload())? {