Skip to content

Commit

Permalink
Fix some issues
Browse files Browse the repository at this point in the history
  • Loading branch information
c410-f3r committed Dec 16, 2023
1 parent a1a8299 commit 5e1cf81
Show file tree
Hide file tree
Showing 36 changed files with 419 additions and 258 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ PostgreSQL is currently the only supported database and more SQL or NoSQL varian

Activation feature is called `postgres`.

![PostgreSQL Benchmark](https://i.imgur.com/0qhYBBs.jpg)
![PostgreSQL Benchmark](https://private-user-images.githubusercontent.com/1674512/290770284-25832439-4ff2-498e-abc1-f8c19d7f7100.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTEiLCJleHAiOjE3MDI2NDcwNzQsIm5iZiI6MTcwMjY0Njc3NCwicGF0aCI6Ii8xNjc0NTEyLzI5MDc3MDI4NC0yNTgzMjQzOS00ZmYyLTQ5OGUtYWJjMS1mOGMxOWQ3ZjcxMDAucG5nP1gtQW16LUFsZ29yaXRobT1BV1M0LUhNQUMtU0hBMjU2JlgtQW16LUNyZWRlbnRpYWw9QUtJQUlXTkpZQVg0Q1NWRUg1M0ElMkYyMDIzMTIxNSUyRnVzLWVhc3QtMSUyRnMzJTJGYXdzNF9yZXF1ZXN0JlgtQW16LURhdGU9MjAyMzEyMTVUMTMyNjE0WiZYLUFtei1FeHBpcmVzPTMwMCZYLUFtei1TaWduYXR1cmU9ZTU5MjdjMzk3N2MxNTljYjNiOWYwY2Q0YTFjMDM5ZWZjNWIwY2UxZDM1ZGI1NDAwNWM0MzYwZTI5ODYxOTA1OCZYLUFtei1TaWduZWRIZWFkZXJzPWhvc3QmYWN0b3JfaWQ9MCZrZXlfaWQ9MCZyZXBvX2lkPTAifQ.BdeAsDqUBzrTtiT2PIWz2WP2is6rhlC_F31MVIR5d34)

```rust
#[cfg(feature = "postgres")]
Expand Down
4 changes: 2 additions & 2 deletions wtx-bench/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[dependencies]
diesel = { default-features = false, version = "2.0" }
diesel-async = { default-features = false, features = ["postgres"], version = "0.4" }
futures = { default-features = false, version = "0.3" }
plotters = { default-features = false, features = ["histogram", "svg_backend"], version = "0.3" }
sqlx = { default-features = false, features = ["postgres", "runtime-tokio"], version = "0.7" }
tokio = { default-features = false, features = ["macros", "rt-multi-thread"], version = "1.0" }
tokio-postgres = { default-features = false, features = ["runtime"], version = "0.7" }
diesel-async = { default-features = false, features = ["postgres"], version = "0.4.1" }
diesel = { default-features = false, version = "2.1" }
wtx = { default-features = false, features = ["atoi", "postgres", "simdutf8", "tokio", "web-socket-handshake"], path = "../wtx" }

[package]
Expand Down
12 changes: 6 additions & 6 deletions wtx-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ async fn main() {
[first, second, rest @ ..] => match first.as_str() {
"postgres" => {
let up = UriPartsRef::new(second.as_str());
let mut diesel_async = misc::Agent { name: "diesel-async".to_owned(), result: 0 };
let mut sqlx_postgres = misc::Agent { name: "sqlx-postgres-tokio".to_owned(), result: 0 };
let mut tokio_postgres = misc::Agent { name: "tokio-postgres".to_owned(), result: 0 };
let mut wtx = misc::Agent { name: "wtx-tokio".to_owned(), result: 0 };
let mut diesel_async = misc::Agent { name: "diesel_async".to_owned(), result: 0 };
postgres::bench(
&up,
[&mut sqlx_postgres, &mut tokio_postgres, &mut wtx, &mut diesel_async],
[&mut diesel_async, &mut sqlx_postgres, &mut tokio_postgres, &mut wtx],
)
.await;
misc::flush(
misc::plot(
&[sqlx_postgres, tokio_postgres, wtx, diesel_async],
&postgres::caption(),
"/tmp/wtx-postgres.png",
Expand All @@ -49,15 +49,15 @@ async fn main() {
web_socket::bench(up.authority(), &mut agent, uri).await;
agents.push(agent);
}
misc::flush(&agents, &web_socket::caption(), "/tmp/wtx-web-socket.png");
misc::plot(&agents, &web_socket::caption(), "/tmp/wtx-web-socket.png");
}
_ => {
panic!("Unknown benchmark target");
}
},
_ => {
panic!("Please provide a valid benchmark target");
panic!("Unknown benchmark target");
}
}
println!("Finished");
println!("Finished!");
}
2 changes: 1 addition & 1 deletion wtx-bench/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub(crate) struct Agent {
pub(crate) name: String,
}

pub(crate) fn flush(agents: &[Agent], caption: &str, output: &str) {
pub(crate) fn plot(agents: &[Agent], caption: &str, output: &str) {
if agents.is_empty() {
return;
}
Expand Down
144 changes: 72 additions & 72 deletions wtx-bench/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,68 @@ const QUERIES: usize = 1024;

pub(crate) async fn bench(
up: &UriPartsRef<'_>,
[sqlx_postgres, tokio_postgres, wtx, diesel_async]: [&mut Agent; 4],
[diesel_async, sqlx_postgres, tokio_postgres, wtx]: [&mut Agent; 4],
) {
populate_db(&mut StdRng::default(), up).await;
bench_diesel_async(diesel_async, up).await;
bench_sqlx_postgres(sqlx_postgres, up).await;
bench_tokio_postgres(tokio_postgres, up).await;
bench_wtx(wtx, up).await;
bench_diesel_async(diesel_async, up).await;
}

pub(crate) fn caption() -> String {
format!(
"{CONNECTIONS} connection(s) retrieving {QUERIES} sequential queries of {DATA_LEN} byte(s)"
)
}

#[allow(clippy::single_char_lifetime_names, unused_qualifications, clippy::shadow_unrelated)]
async fn bench_diesel_async(agent: &mut Agent, up: &UriPartsRef<'_>) {
use diesel::prelude::*;
use diesel_async::RunQueryDsl;

table! {
foo(bar, baz) {
bar -> Text,
baz -> Text,
}
}

let instant = Instant::now();
let mut set = JoinSet::new();
for _ in 0..CONNECTIONS {
let _handle = set.spawn({
let local_up = up.clone().into_string();
async move {
let (client, conn) = tokio_postgres::Config::new()
.dbname(local_up.path().get(1..).unwrap())
.host(local_up.hostname())
.password(local_up.password())
.port(local_up.port().parse().unwrap())
.user(local_up.user())
.connect(NoTls)
.await
.unwrap();
let _handle = tokio::spawn(async move {
if let Err(e) = conn.await {
println!("Error: {e}");
}
});
let mut pg_conn = AsyncPgConnection::try_from(client).await.unwrap();
for _ in 0..QUERIES {
let records = foo::table.load::<(String, String)>(&mut pg_conn).await.unwrap();
assert!(!records[0].0.is_empty());
assert!(!records[0].1.is_empty());
assert!(!records[1].0.is_empty());
assert!(!records[1].1.is_empty());
}
}
});
}
while let Some(rslt) = set.join_next().await {
rslt.unwrap();
}
agent.result = instant.elapsed().as_millis();
}

async fn bench_sqlx_postgres(agent: &mut Agent, up: &UriPartsRef<'_>) {
Expand All @@ -49,10 +104,10 @@ async fn bench_sqlx_postgres(agent: &mut Agent, up: &UriPartsRef<'_>) {
Either::Right(row) => rows.push(row),
}
}
assert!(!rows[0].get::<&str, _>("bar").is_empty());
assert!(!rows[0].get::<&str, _>("baz").is_empty());
assert!(!rows[1].get::<&str, _>("bar").is_empty());
assert!(!rows[1].get::<&str, _>("baz").is_empty());
assert!(!rows[0].get::<&str, _>(0).is_empty());
assert!(!rows[0].get::<&str, _>(1).is_empty());
assert!(!rows[1].get::<&str, _>(0).is_empty());
assert!(!rows[1].get::<&str, _>(1).is_empty());
}
}
});
Expand All @@ -71,11 +126,11 @@ async fn bench_tokio_postgres(agent: &mut Agent, up: &UriPartsRef<'_>) {
let local_up = up.clone().into_string();
async move {
let (client, conn) = tokio_postgres::Config::new()
.dbname(local_up.path().get(1..).unwrap())
.host(local_up.hostname())
.user(local_up.user())
.port(local_up.port().parse().unwrap())
.password(local_up.password())
.dbname(local_up.path().get(1..).unwrap())
.port(local_up.port().parse().unwrap())
.user(local_up.user())
.connect(NoTls)
.await
.unwrap();
Expand All @@ -87,10 +142,10 @@ async fn bench_tokio_postgres(agent: &mut Agent, up: &UriPartsRef<'_>) {
let p = client.prepare("SELECT * FROM foo").await.unwrap();
for _ in 0..QUERIES {
let rows = client.query(&p, &[]).await.unwrap();
assert!(!rows[0].get::<_, &str>("bar").is_empty());
assert!(!rows[0].get::<_, &str>("baz").is_empty());
assert!(!rows[1].get::<_, &str>("bar").is_empty());
assert!(!rows[1].get::<_, &str>("baz").is_empty());
assert!(!rows[0].get::<_, &str>(0).is_empty());
assert!(!rows[0].get::<_, &str>(1).is_empty());
assert!(!rows[1].get::<_, &str>(0).is_empty());
assert!(!rows[1].get::<_, &str>(1).is_empty());
}
}
});
Expand All @@ -111,10 +166,10 @@ async fn bench_wtx(agent: &mut Agent, up: &UriPartsRef<'_>) {
let mut executor = wtx_executor(&mut StdRng::default(), &local_up.as_ref()).await;
for _ in 0..QUERIES {
let records = executor.records("SELECT * FROM foo", (), |_| Ok(())).await.unwrap();
assert!(!records.record(0).unwrap().decode::<_, &str>("bar").unwrap().is_empty());
assert!(!records.record(0).unwrap().decode::<_, &str>("baz").unwrap().is_empty());
assert!(!records.record(1).unwrap().decode::<_, &str>("bar").unwrap().is_empty());
assert!(!records.record(1).unwrap().decode::<_, &str>("baz").unwrap().is_empty());
assert!(!records.record(0).unwrap().decode::<_, &str>(0).unwrap().is_empty());
assert!(!records.record(0).unwrap().decode::<_, &str>(1).unwrap().is_empty());
assert!(!records.record(1).unwrap().decode::<_, &str>(0).unwrap().is_empty());
assert!(!records.record(1).unwrap().decode::<_, &str>(1).unwrap().is_empty());
}
}
});
Expand All @@ -125,61 +180,6 @@ async fn bench_wtx(agent: &mut Agent, up: &UriPartsRef<'_>) {
agent.result = instant.elapsed().as_millis();
}

#[allow(clippy::single_char_lifetime_names, unused_qualifications, clippy::shadow_unrelated)]
async fn bench_diesel_async(agent: &mut Agent, up: &UriPartsRef<'_>) {
use diesel::prelude::*;
use diesel_async::RunQueryDsl;

table! {
foo(bar, baz) {
bar -> Text,
baz -> Text,
}
}

let instant = Instant::now();
let mut set = JoinSet::new();
for _ in 0..CONNECTIONS {
let _handle = set.spawn({
let local_up = up.clone().into_string();
async move {
let (client, conn) = tokio_postgres::Config::new()
.host(local_up.hostname())
.user(local_up.user())
.port(local_up.port().parse().unwrap())
.password(local_up.password())
.dbname(local_up.path().get(1..).unwrap())
.connect(NoTls)
.await
.unwrap();
let _handle = tokio::spawn(async move {
if let Err(e) = conn.await {
println!("Error: {e}");
}
});
let mut conn = AsyncPgConnection::try_from(client).await.unwrap();
for _ in 0..QUERIES {
let records = foo::table.load::<(String, String)>(&mut conn).await.unwrap();
assert!(!records[0].0.is_empty());
assert!(!records[0].1.is_empty());
assert!(!records[1].0.is_empty());
assert!(!records[1].1.is_empty());
}
}
});
}
while let Some(rslt) = set.join_next().await {
rslt.unwrap();
}
agent.result = instant.elapsed().as_millis();
}

pub(crate) fn caption() -> String {
format!(
"{CONNECTIONS} connection(s) retrieving {QUERIES} sequential queries of {DATA_LEN} byte(s)"
)
}

fn fill_and_split_data<'data>(
data: &'data mut String,
rng: &mut StdRng,
Expand Down
10 changes: 5 additions & 5 deletions wtx-macros/src/api_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ pub(crate) fn api_types(
let generic_pair_ident = create_ident(&mut buffer, ["Pair"]);
let generic_pair_tt = quote::quote_spanned!(api_ident.span() =>
#[allow(unused_qualifications)]
#[doc = concat!("[wtx::misc::Pair] with [", stringify!(#api_ident), "] as the API.")]
pub type #generic_pair_ident<DRSR, T> = wtx::misc::Pair<
#pkgs_aux_path<#api_ident, DRSR, <T as wtx::network::transport::Transport<DRSR>>::Params>,
#[doc = concat!("[wtx::client_api_framework::misc::Pair] with [", stringify!(#api_ident), "] as the API.")]
pub type #generic_pair_ident<DRSR, T> = wtx::client_api_framework::misc::Pair<
#pkgs_aux_path<#api_ident, DRSR, <T as wtx::client_api_framework::network::transport::Transport<DRSR>>::Params>,
T
>;
);
Expand Down Expand Up @@ -91,11 +91,11 @@ pub(crate) fn api_types(
#[doc = concat!(
"[", stringify!(#pkgs_aux_path), "] with [",
stringify!(#api_ident),
"] as the API and [wtx::network::",
"] as the API and [wtx::client_api_framework::network::",
stringify!(#local_tp_ident),
"] as the transport parameters."
)]
pub type #local_ty_ident<DRSR> = #pkgs_aux_path<#api_ident, DRSR, wtx::network::#local_tp_ident>;
pub type #local_ty_ident<DRSR> = #pkgs_aux_path<#api_ident, DRSR, wtx::client_api_framework::network::#local_tp_ident>;
));
}

Expand Down
6 changes: 3 additions & 3 deletions wtx-macros/src/pkg/data_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ impl DataFormat {
macro_rules! http_method_and_mime_type {
($method:ident, $mime_type:ident) => {
quote::quote!(
_ext_req_params.method = wtx::network::HttpMethod::$method;
_ext_req_params.mime_type = Some(wtx::network::HttpMimeType::$mime_type);
_ext_req_params.method = wtx::client_api_framework::network::HttpMethod::$method;
_ext_req_params.mime_type = Some(wtx::client_api_framework::network::HttpMimeType::$mime_type);
)
};
}
macro_rules! http_mime_type {
($mime_type:ident) => {
quote::quote!(
_ext_req_params.mime_type = Some(wtx::network::HttpMimeType::$mime_type);
_ext_req_params.mime_type = Some(wtx::client_api_framework::network::HttpMimeType::$mime_type);
)
};
}
Expand Down
2 changes: 1 addition & 1 deletion wtx-macros/src/pkg/sir/sir_final_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl<'attrs, 'module, 'others>
wtx::client_api_framework::data_format::#dfe_ext_res_ctnt_wrapper<
#res_ident
>: wtx::client_api_framework::dnsn::Deserialize<DRSR>,
DRSR: wtx::client_api_framework::misc::AsyncBounds,
DRSR: wtx::misc::AsyncBounds,
{
type Api = #api;
type Error = #error;
Expand Down
2 changes: 1 addition & 1 deletion wtx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ simdutf8 = { default-features = false, features = ["aarch64_neon"], optional = t
smallvec = { default-features = false, features = ["const_generics", "union"], optional = true, version = "1.0" }
smol = { default-features = false, optional = true, version = "1.0" }
test-strategy = { default-features = false, optional = true, version = "0.3" }
tokio = { default-features = false, features = ["io-util", "net"], optional = true, version = "1.0" }
tokio = { default-features = false, features = ["io-util", "net", "time"], optional = true, version = "1.0" }
tokio-rustls = { default-features = false, features = ["ring"], optional = true, version = "0.25" }
tracing = { default-features = false, features = ["attributes"], optional = true, version = "0.1" }
tracing-subscriber = { default-features = false, features = ["env-filter", "fmt"], optional = true, version = "0.3" }
Expand Down
2 changes: 1 addition & 1 deletion wtx/examples/web-socket-client-cli-raw-tokio-rustls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ async fn main() {
compression: (),
fb,
headers_buffer: &mut <_>::default(),
wsb: WebSocketBuffer::default(),
rng: StdRng::default(),
stream: tls_stream::_tls_stream_host(uri_parts.host(), uri_parts.hostname()).await,
uri: &uri,
wsb: WebSocketBuffer::default(),
}
.connect()
.await
Expand Down
1 change: 1 addition & 0 deletions wtx/src/client_api_framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod dnsn;
pub mod misc;
pub mod network;
pub mod pkg;
mod tests;

pub use api::Api;

Expand Down
Loading

0 comments on commit 5e1cf81

Please sign in to comment.