Skip to content

Commit

Permalink
Merge pull request #225 from c410-f3r/misc
Browse files Browse the repository at this point in the history
Infer port from URI
  • Loading branch information
c410-f3r authored Sep 16, 2024
2 parents a68add0 + 7738f3d commit a159fb7
Show file tree
Hide file tree
Showing 53 changed files with 544 additions and 386 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ Anything marked with `#[bench]` in the repository is considered a low-level benc

Take a look at <https://bencher.dev/perf/wtx> to see all low-level benchmarks over different periods of time.

## Examples

Demonstrations of different use-cases can be found in the `wtx-instances` directory as well as in the documentation.

## Limitations

Does not support systems with 16bit memory addresses and expects the infallible addition of the sizes of 8 allocated chunks of memories, otherwise the program will overflow in certain arithmetic operations involving `usize` potentially resulting in unexpected operations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use wtx::database::{

#[tokio::main]
async fn main() -> wtx::Result<()> {
let uri = "postgres://USER:PASSWORD@localhost:5432/DATABASE";
let uri = "postgres://USER:PASSWORD@localhost/DATABASE";
let mut executor = wtx_instances::executor(&uri).await?;
let _ = executor
.execute_with_stmt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use wtx::database::{

#[tokio::main]
async fn main() -> wtx::Result<()> {
let uri = "postgres://USER:PASSWORD@localhost:5432/DATABASE";
let uri = "postgres://USER:PASSWORD@localhost/DATABASE";
let mut executor = wtx_instances::executor(&uri).await?;
let _ = executor
.execute_with_stmt("INSERT INTO custom_enum_table VALUES ($1, $2)", (1, Enum::Bar))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use wtx::database::{Executor as _, Record, Records, TransactionManager};

#[tokio::main]
async fn main() -> wtx::Result<()> {
let uri = "postgres://USER:PASSWORD@localhost:5432/DATABASE";
let uri = "postgres://USER:PASSWORD@localhost/DATABASE";
let mut executor = wtx_instances::executor(&uri).await?;
let mut tm = executor.transaction().await?;
tm.executor().execute("CREATE TABLE IF NOT EXISTS example(id INT, name VARCHAR)", |_| {}).await?;
Expand Down
6 changes: 3 additions & 3 deletions wtx-instances/generic-examples/client-api-framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn http_pair(
rt: RequestThrottling::from_rl(RequestLimit::new(5, Duration::from_secs(1))),
},
SerdeJson,
HttpParams::from_uri("ws://generic_web_socket_uri.com:80".into()),
HttpParams::from_uri("ws://generic_web_socket_uri.com".into()),
),
ClientFrameworkTokio::tokio(1).build(),
)
Expand All @@ -101,14 +101,14 @@ async fn web_socket_pair() -> wtx::Result<
>,
> {
let mut fb = FrameBufferVec::default();
let uri: Uri<&str> = Uri::new("ws://generic_web_socket_uri.com:80");
let uri = Uri::new("ws://generic_web_socket_uri.com");
let web_socket = WebSocketClient::connect(
(),
&mut fb,
[],
&mut HeadersBuffer::default(),
NoStdRng::default(),
TcpStream::connect(uri.host()).await?,
TcpStream::connect(uri.hostname_with_implied_port()).await?,
&uri,
WebSocketBuffer::default(),
)
Expand Down
2 changes: 1 addition & 1 deletion wtx-instances/generic-examples/http-client-framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use wtx::{

#[tokio::main]
async fn main() -> wtx::Result<()> {
let uri = Uri::new("http://www.example.com:80");
let uri = Uri::new("http://www.example.com");
let buffer = ReqResBuffer::default();
let client = ClientFramework::tokio(1).build();
let res = client.send(Method::Get, buffer, &uri.to_ref()).await?;
Expand Down
19 changes: 7 additions & 12 deletions wtx-instances/generic-examples/http2-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,25 @@ use tokio::net::TcpStream;
use wtx::{
http::{Method, ReqResBuffer, Request},
http2::{Http2Buffer, Http2ErrorCode, Http2Params, Http2Tokio},
misc::{from_utf8_basic, Either, NoStdRng, Uri},
misc::{from_utf8_basic, NoStdRng, Uri},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
let uri = Uri::new("http://www.example.com:80");
let uri = Uri::new("http://www.example.com");
let (frame_reader, mut http2) = Http2Tokio::connect(
Http2Buffer::new(NoStdRng::default()),
Http2Params::default(),
TcpStream::connect(uri.host()).await?.into_split(),
TcpStream::connect(uri.hostname_with_implied_port()).await?.into_split(),
)
.await?;
let _jh = tokio::spawn(async move {
if let Err(err) = frame_reader.await {
eprintln!("{err}");
}
});
let _jh = tokio::spawn(frame_reader);
let rrb = ReqResBuffer::default();
let mut stream = http2.stream().await?;
stream.send_req(Request::http2(Method::Get, b"Hello!"), &uri.to_ref()).await?;
let Either::Right(res) = stream.recv_res(rrb).await? else {
return Err(wtx::Error::ClosedConnection);
};
println!("{}", from_utf8_basic(&res.0.data)?);
let (res_rrb, opt) = stream.recv_res(rrb).await?;
let _status_code = opt.unwrap();
println!("{}", from_utf8_basic(&res_rrb.data)?);
http2.send_go_away(Http2ErrorCode::NoError).await;
Ok(())
}
4 changes: 2 additions & 2 deletions wtx-instances/generic-examples/web-socket-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ use wtx::{

#[tokio::main]
async fn main() -> wtx::Result<()> {
let uri = Uri::new("ws://www.example.com:80");
let uri = Uri::new("ws://www.example.com");
let fb = &mut FrameBufferVec::default();
let (_, mut ws) = WebSocketClient::connect(
(),
fb,
[],
&mut HeadersBuffer::default(),
StdRng::default(),
TcpStream::connect(uri.host()).await?,
TcpStream::connect(uri.hostname_with_implied_port()).await?,
&uri.to_ref(),
WebSocketBuffer::default(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn main() -> wtx::Result<()> {
(SessionDecoder::new(), SessionEnforcer::new([LOGIN, LOGOUT])),
(),
)?;
let pool = Pool::new(4, PostgresRM::tokio("postgres://USER:PASSWORD@localhost:5432/DB_NAME"));
let pool = Pool::new(4, PostgresRM::tokio("postgres://USER:PASSWORD@localhost/DB_NAME"));
let mut rng = StdRng::default();
let mut key = [0; 16];
rng.fill_slice(&mut key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn main() -> wtx::Result<()> {
)?,
),
))?;
let rm = PostgresRM::tokio("postgres://USER:PASSWORD@localhost:5432/DB_NAME");
let rm = PostgresRM::tokio("postgres://USER:PASSWORD@localhost/DB_NAME");
let pool = Pool::new(4, rm);
ServerFrameworkBuilder::new(router)
.with_req_aux(move || pool.clone())
Expand Down
2 changes: 1 addition & 1 deletion wtx-instances/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn executor(
&Config::from_uri(&uri)?,
ExecutorBuffer::with_default_params(&mut rng)?,
&mut rng,
TcpStream::connect(uri.host()).await?,
TcpStream::connect(uri.hostname_with_implied_port()).await?,
)
.await
}
Expand Down
4 changes: 2 additions & 2 deletions wtx-ui/src/schema_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ pub(crate) async fn schema_manager(sm: SchemaManager) -> wtx::Result<()> {

let var = std::env::var(DEFAULT_URI_VAR)?;
let uri = UriRef::new(&var);
match uri.schema() {
match uri.scheme() {
"postgres" | "postgresql" => {
let mut rng = StdRng::default();
let executor = Executor::connect(
&Config::from_uri(&uri)?,
ExecutorBuffer::with_default_params(&mut rng)?,
&mut rng,
TcpStream::connect(uri.host()).await.map_err(wtx::Error::from)?,
TcpStream::connect(uri.hostname_with_implied_port()).await.map_err(wtx::Error::from)?,
)
.await?;
handle_commands(executor, &sm).await?;
Expand Down
4 changes: 2 additions & 2 deletions wtx-ui/src/web_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub(crate) async fn connect(uri: &str, cb: impl Fn(&str)) -> wtx::Result<()> {
[],
&mut HeadersBuffer::default(),
StdRng::default(),
TcpStream::connect(uri.host()).await?,
TcpStream::connect(uri.hostname_with_implied_port()).await?,
&uri,
wsb,
)
Expand Down Expand Up @@ -53,7 +53,7 @@ pub(crate) async fn serve(
str: fn(&str),
) -> wtx::Result<()> {
let uri = UriRef::new(uri);
let listener = TcpListener::bind(uri.host()).await?;
let listener = TcpListener::bind(uri.hostname_with_implied_port()).await?;
loop {
let (stream, _) = listener.accept().await?;
let _jh = tokio::spawn(async move {
Expand Down
20 changes: 9 additions & 11 deletions wtx/src/client_api_framework/network/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
Api,
},
data_transformation::dnsn::{Deserialize, Serialize},
misc::Lease,
misc::{Lease, Vector},
};
pub use bi_transport::*;
use core::{future::Future, ops::Range};
Expand Down Expand Up @@ -69,14 +69,10 @@ pub trait Transport<DRSR> {
#[inline]
fn send_recv_decode_batch<'pkgs, 'pkgs_aux, A, P>(
&mut self,
buffer: &mut Vector<P::ExternalResponseContent<'pkgs_aux>>,
pkgs: &'pkgs mut [P],
pkgs_aux: &'pkgs_aux mut PkgsAux<A, DRSR, Self::Params>,
) -> impl Future<
Output = Result<
impl Iterator<Item = crate::Result<P::ExternalResponseContent<'pkgs_aux>>>,
A::Error,
>,
>
) -> impl Future<Output = Result<(), A::Error>>
where
A: Api,
P: Package<A, DRSR, Self::Params>,
Expand All @@ -85,10 +81,12 @@ pub trait Transport<DRSR> {
async {
let range = self.send_recv(&mut BatchPkg::new(pkgs), pkgs_aux).await?;
log_res(pkgs_aux.byte_buffer.lease());
Ok(P::ExternalResponseContent::seq_from_bytes(
P::ExternalResponseContent::seq_from_bytes(
buffer,
pkgs_aux.byte_buffer.get(range).unwrap_or_default(),
&mut pkgs_aux.drsr,
))
)?;
Ok(())
}
}

Expand Down Expand Up @@ -217,8 +215,8 @@ mod tests {
}

#[inline]
fn seq_from_bytes(_: &'de [u8], _: &mut DRSR) -> impl Iterator<Item = crate::Result<Self>> {
[].into_iter()
fn seq_from_bytes(_: &mut Vector<Self>, _: &'de [u8], _: &mut DRSR) -> crate::Result<()> {
Ok(())
}
}
}
4 changes: 2 additions & 2 deletions wtx/src/client_api_framework/network/transport/std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,15 @@ mod tests {
let uri_client = _uri();
let uri_server = uri_client.to_string();
let _server = tokio::spawn(async move {
let tcp_listener = TcpListener::bind(uri_server.host()).unwrap();
let tcp_listener = TcpListener::bind(uri_server.hostname_with_implied_port()).unwrap();
let mut buffer = [0; 8];
let (mut stream, _) = tcp_listener.accept().unwrap();
let idx = stream.read(&mut buffer).unwrap();
stream.write_all(&buffer[..idx]).unwrap();
});
sleep(Duration::from_millis(100)).await.unwrap();
let mut pa = PkgsAux::from_minimum((), (), TcpParams::from_uri(uri_client.as_str()));
let mut trans = TcpStream::connect(uri_client.host()).unwrap();
let mut trans = TcpStream::connect(uri_client.hostname_with_implied_port()).unwrap();
let res = trans.send_recv_decode_contained(&mut _PingPong(_Ping, ()), &mut pa).await.unwrap();
assert_eq!(res, _Pong("pong"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use core::fmt::Debug;

/// Additional information or metadata received or transmitted by a transport.
pub trait TransportParams {
/// For example, HTTP has request headers.
type ExternalRequestParams: Debug;
type ExternalRequestParams;
/// For example, HTTP has response headers.
type ExternalResponseParams: Debug;
type ExternalResponseParams;

/// External Request Parameters.
fn ext_req_params(&self) -> &Self::ExternalRequestParams;
Expand Down
13 changes: 9 additions & 4 deletions wtx/src/data_transformation/dnsn/deserialize.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::misc::Vector;

/// Marker trait that has different bounds according to the given set of enabled deserializers.
pub trait Deserialize<'de, DRSR>
where
Expand All @@ -7,8 +9,11 @@ where
fn from_bytes(bytes: &'de [u8], drsr: &mut DRSR) -> crate::Result<Self>;

/// Similar to [`Self::from_bytes`] but deals with sequences instead of a single element.
fn seq_from_bytes(bytes: &'de [u8], drsr: &mut DRSR)
-> impl Iterator<Item = crate::Result<Self>>;
fn seq_from_bytes(
buffer: &mut Vector<Self>,
bytes: &'de [u8],
drsr: &mut DRSR,
) -> crate::Result<()>;
}

impl<'de, DRSR> Deserialize<'de, DRSR> for () {
Expand All @@ -18,7 +23,7 @@ impl<'de, DRSR> Deserialize<'de, DRSR> for () {
}

#[inline]
fn seq_from_bytes(_: &'de [u8], _: &mut DRSR) -> impl Iterator<Item = crate::Result<Self>> {
[].into_iter()
fn seq_from_bytes(_: &mut Vector<Self>, _: &'de [u8], _: &mut DRSR) -> crate::Result<()> {
Ok(())
}
}
1 change: 1 addition & 0 deletions wtx/src/data_transformation/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
mod graph_ql;
mod json_rpc;
mod misc;
mod verbatim;

pub use graph_ql::*;
Expand Down
20 changes: 12 additions & 8 deletions wtx/src/data_transformation/format/graph_ql/graph_ql_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,19 @@ mod serde {
#[cfg(feature = "serde_json")]
mod serde_json {
use crate::{
data_transformation::{dnsn::SerdeJson, format::GraphQlResponse},
data_transformation::{
dnsn::SerdeJson,
format::{misc::collect_using_serde_json, GraphQlResponse},
},
misc::Vector,
};
use serde_json::{de::SliceRead, StreamDeserializer};
use serde::{Deserialize, Serialize};

impl<'de, D, E> crate::data_transformation::dnsn::Deserialize<'de, SerdeJson>
for GraphQlResponse<D, E>
where
D: serde::Deserialize<'de>,
E: serde::Deserialize<'de>,
D: Deserialize<'de>,
E: Deserialize<'de>,
{
#[inline]
fn from_bytes(bytes: &'de [u8], _: &mut SerdeJson) -> crate::Result<Self> {
Expand All @@ -136,17 +139,18 @@ mod serde_json {

#[inline]
fn seq_from_bytes(
buffer: &mut Vector<Self>,
bytes: &'de [u8],
_: &mut SerdeJson,
) -> impl Iterator<Item = crate::Result<Self>> {
StreamDeserializer::new(SliceRead::new(bytes)).map(|el| el.map_err(From::from))
) -> crate::Result<()> {
collect_using_serde_json(buffer, bytes)
}
}

impl<D, E> crate::data_transformation::dnsn::Serialize<SerdeJson> for GraphQlResponse<D, E>
where
D: serde::Serialize,
E: serde::Serialize,
D: Serialize,
E: Serialize,
{
#[inline]
fn to_bytes(&mut self, bytes: &mut Vector<u8>, _: &mut SerdeJson) -> crate::Result<()> {
Expand Down
Loading

0 comments on commit a159fb7

Please sign in to comment.