Skip to content

Commit

Permalink
Merge upstream v0.31.2
Browse files Browse the repository at this point in the history
Signed-off-by: Tricster <[email protected]>
  • Loading branch information
MediosZ committed Feb 2, 2023
2 parents 05ca025 + fa7a5a7 commit 11e5fb2
Show file tree
Hide file tree
Showing 23 changed files with 1,103 additions and 174 deletions.
60 changes: 57 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license = "MIT/Apache-2.0"
name = "mysql_async_wasi"
readme = "README.md"
repository = "https://github.com/WasmEdge/mysql_async_wasi"
version = "0.30.1"
version = "0.31.2"
exclude = ["test/*"]
edition = "2018"
categories = ["asynchronous", "database"]
Expand All @@ -20,12 +20,13 @@ futures-core = "0.3"
futures-util = "0.3"
futures-sink = "0.3"
lazy_static = "1"
lru = "0.7.0"
mysql_common = { version = "0.29.0", default-features = false }
lru = "0.8.1"
mysql_common = { version = "0.29.2", default-features = false }
once_cell = "1.7.2"
pem = "1.0.1"
percent-encoding = "2.1.0"
pin-project = "1.0.2"
priority-queue = "1"
serde = "1"
serde_json = "1"
thiserror = "1.0.4"
Expand All @@ -52,6 +53,40 @@ wasmedge_wasi_socket = "0.4.2"
# rand = "0.8.0"

[target.'cfg(target_os="wasi")'.dev-dependencies]
tokio = { version = "1.0", features = ["io-util", "fs", "net", "time", "rt"] }
tokio-util = { version = "0.7.2", features = ["codec", "io"] }


[dependencies.tokio-rustls]
version = "0.23.4"
optional = true

[dependencies.tokio-native-tls]
version = "0.3.0"
optional = true

[dependencies.native-tls]
version = "0.2"
optional = true

[dependencies.rustls]
version = "0.20.0"
features = ["dangerous_configuration"]
optional = true

[dependencies.rustls-pemfile]
version = "1.0.1"
optional = true

[dependencies.webpki]
version = "0.22.0"
optional = true

[dependencies.webpki-roots]
version = "0.22.1"
optional = true

[dev-dependencies]
tempfile = "3.1.0"
tokio_wasi = { version = "1", features = [ "io-util", "fs", "net", "time", "rt", "macros"] }
rand = "0.8.0"
Expand All @@ -63,6 +98,25 @@ default = [
"mysql_common/time03",
"mysql_common/uuid",
"mysql_common/frunk",
# "native-tls-tls",
]
default-rustls = [
"flate2/zlib",
"mysql_common/bigdecimal03",
"mysql_common/rust_decimal",
"mysql_common/time03",
"mysql_common/uuid",
"mysql_common/frunk",
"rustls-tls",
]
minimal = ["flate2/zlib"]
native-tls-tls = ["native-tls", "tokio-native-tls"]
rustls-tls = [
"rustls",
"tokio-rustls",
"webpki",
"webpki-roots",
"rustls-pemfile",
]
nightly = []
zlib = ["flate2/zlib"]
Expand Down
79 changes: 79 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,85 @@ Note: We do not yet support SSL / TLS connections to databases in this WebAssemb
mysql_async_wasi = "<desired version>"
```

## Crate Features

Default feature set is wide – it includes all default [`mysql_common` features][myslqcommonfeatures]
as well as `native-tls`-based TLS support.

### List Of Features

* `minimal` – enables only necessary features (at the moment the only necessary feature
is `flate2` backend). Enables:

- `flate2/zlib"

**Example:**

```toml
[dependencies]
mysql_async = { version = "*", default-features = false, features = ["minimal"]}
```

**Note:* it is possible to use another `flate2` backend by directly choosing it:

```toml
[dependencies]
mysql_async = { version = "*", default-features = false }
flate2 = { version = "*", default-features = false, features = ["rust_backend"] }
```

* `default` – enables the following set of crate's and dependencies' features:

- `native-tls-tls`
- `flate2/zlib"
- `mysql_common/bigdecimal03`
- `mysql_common/rust_decimal`
- `mysql_common/time03`
- `mysql_common/uuid`
- `mysql_common/frunk`

* `default-rustls` – same as default but with `rustls-tls` instead of `native-tls-tls`.

**Example:**

```toml
[dependencies]
mysql_async = { version = "*", default-features = false, features = ["default-rustls"] }
```

* `native-tls-tls` – enables `native-tls`-based TLS support _(conflicts with `rustls-tls`)_

**Example:**

```toml
[dependencies]
mysql_async = { version = "*", default-features = false, features = ["native-tls-tls"] }

* `rustls-tls` – enables `native-tls`-based TLS support _(conflicts with `native-tls-tls`)_

**Example:**

```toml
[dependencies]
mysql_async = { version = "*", default-features = false, features = ["rustls-tls"] }

[myslqcommonfeatures]: https://github.com/blackbeam/rust_mysql_common#crate-features

## TLS/SSL Support

SSL support comes in two flavors:

1. Based on native-tls – this is the default option, that usually works without pitfalls
(see the `native-tls-tls` crate feature).

2. Based on rustls – TLS backend written in Rust (see the `rustls-tls` crate feature).

Please also note a few things about rustls:
- it will fail if you'll try to connect to the server by its IP address,
hostname is required;
- it, most likely, won't work on windows, at least with default server certs,
generated by the MySql installer.

## Example

```rust
Expand Down
55 changes: 40 additions & 15 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use futures_util::FutureExt;
pub use mysql_common::named_params;

use mysql_common::{
constants::{DEFAULT_MAX_ALLOWED_PACKET, UTF8_GENERAL_CI},
constants::DEFAULT_MAX_ALLOWED_PACKET,
crypto,
io::ParseBuf,
packets::{
binlog_request::BinlogRequest, AuthPlugin, AuthSwitchRequest, CommonOkPacket, ErrPacket,
HandshakePacket, HandshakeResponse, OkPacket, OkPacketDeserializer, OldAuthSwitchRequest,
ResultSetTerminator, SslRequest,
OldEofPacket, ResultSetTerminator,
},
proto::MySerialize,
};
Expand Down Expand Up @@ -415,12 +415,14 @@ impl Conn {

/// Returns true if io stream is encrypted.
fn is_secure(&self) -> bool {
#[cfg(not(target_os = "wasi"))]
#[cfg(any(feature = "native-tls-tls", feature = "rustls-tls"))]
if let Some(ref stream) = self.inner.stream {
stream.is_secure()
} else {
false
}

#[cfg(not(any(feature = "native-tls-tls", feature = "rustls-tls")))]
false
}

Expand Down Expand Up @@ -492,10 +494,24 @@ impl Conn {
.get_capabilities()
.contains(CapabilityFlags::CLIENT_SSL)
{
if !self
.inner
.capabilities
.contains(CapabilityFlags::CLIENT_SSL)
{
return Err(DriverError::NoClientSslFlagFromServer.into());
}

let collation = if self.inner.version >= (5, 5, 3) {
UTF8MB4_GENERAL_CI
} else {
UTF8_GENERAL_CI
};

let ssl_request = SslRequest::new(
self.inner.capabilities,
DEFAULT_MAX_ALLOWED_PACKET as u32,
UTF8_GENERAL_CI as u8,
collation as u8,
);
self.write_struct(&ssl_request).await?;
let conn = self;
Expand Down Expand Up @@ -681,9 +697,18 @@ impl Conn {
/// Returns `true` for ProgressReport packet.
fn handle_packet(&mut self, packet: &PooledBuf) -> Result<bool> {
let ok_packet = if self.has_pending_result() {
ParseBuf(&*packet)
.parse::<OkPacketDeserializer<ResultSetTerminator>>(self.capabilities())
.map(|x| x.into_inner())
if self
.capabilities()
.contains(CapabilityFlags::CLIENT_DEPRECATE_EOF)
{
ParseBuf(&*packet)
.parse::<OkPacketDeserializer<ResultSetTerminator>>(self.capabilities())
.map(|x| x.into_inner())
} else {
ParseBuf(&*packet)
.parse::<OkPacketDeserializer<OldEofPacket>>(self.capabilities())
.map(|x| x.into_inner())
}
} else {
ParseBuf(&*packet)
.parse::<OkPacketDeserializer<CommonOkPacket>>(self.capabilities())
Expand Down Expand Up @@ -1046,7 +1071,7 @@ impl Conn {
mod test {
use bytes::Bytes;
use futures_util::stream::{self, StreamExt};
use mysql_common::binlog::events::EventData;
use mysql_common::{binlog::events::EventData, constants::MAX_PAYLOAD_LEN};
use tokio::time::timeout;

use std::time::Duration;
Expand Down Expand Up @@ -1435,15 +1460,15 @@ mod test {

#[tokio::test]
async fn should_perform_queries() -> super::Result<()> {
let long_string = ::std::iter::repeat('A')
.take(18 * 1024 * 1024)
.collect::<String>();
let mut conn = Conn::new(get_opts()).await?;
let result: Vec<(String, u8)> = conn
.query(format!(r"SELECT '{}', 231", long_string))
.await?;
for x in (MAX_PAYLOAD_LEN - 2)..=(MAX_PAYLOAD_LEN + 2) {
let long_string = ::std::iter::repeat('A').take(x).collect::<String>();
let result: Vec<(String, u8)> = conn
.query(format!(r"SELECT '{}', 231", long_string))
.await?;
assert_eq!((long_string, 231_u8), result[0]);
}
conn.disconnect().await?;
assert_eq!((long_string, 231_u8), result[0]);
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions src/conn/pool/futures/disconnect_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use futures_core::ready;
use tokio::sync::mpsc::UnboundedSender;

use crate::{
conn::pool::{Inner, Pool},
conn::pool::{Inner, Pool, QUEUE_END_ID},
error::Error,
Conn,
};
Expand Down Expand Up @@ -50,7 +50,7 @@ impl Future for DisconnectPool {
self.pool_inner.close.store(true, atomic::Ordering::Release);
let mut exchange = self.pool_inner.exchange.lock().unwrap();
exchange.spawn_futures_if_needed(&self.pool_inner);
exchange.waiting.push_back(cx.waker().clone());
exchange.waiting.push(cx.waker().clone(), QUEUE_END_ID);
drop(exchange);

if self.pool_inner.closed.load(atomic::Ordering::Acquire) {
Expand Down
47 changes: 30 additions & 17 deletions src/conn/pool/futures/get_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use std::{
use futures_core::ready;

use crate::{
conn::{pool::Pool, Conn},
conn::{
pool::{Pool, QueueId},
Conn,
},
error::*,
};

Expand Down Expand Up @@ -58,13 +61,15 @@ impl GetConnInner {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct GetConn {
pub(crate) queue_id: Option<QueueId>,
pub(crate) pool: Option<Pool>,
pub(crate) inner: GetConnInner,
}

impl GetConn {
pub(crate) fn new(pool: &Pool) -> GetConn {
GetConn {
queue_id: None,
pool: Some(pool.clone()),
inner: GetConnInner::New,
}
Expand All @@ -91,23 +96,26 @@ impl Future for GetConn {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match self.inner {
GetConnInner::New => match ready!(Pin::new(self.pool_mut()).poll_new_conn(cx))?
.inner
.take()
{
GetConnInner::Connecting(conn_fut) => {
self.inner = GetConnInner::Connecting(conn_fut);
}
GetConnInner::Checking(conn_fut) => {
self.inner = GetConnInner::Checking(conn_fut);
}
GetConnInner::Done => unreachable!(
"Pool::poll_new_conn never gives out already-consumed GetConns"
),
GetConnInner::New => {
unreachable!("Pool::poll_new_conn never gives out GetConnInner::New")
GetConnInner::New => {
let queued = self.queue_id.is_some();
let queue_id = *self.queue_id.get_or_insert_with(QueueId::next);
let next =
ready!(Pin::new(self.pool_mut()).poll_new_conn(cx, queued, queue_id))?;
match next {
GetConnInner::Connecting(conn_fut) => {
self.inner = GetConnInner::Connecting(conn_fut);
}
GetConnInner::Checking(conn_fut) => {
self.inner = GetConnInner::Checking(conn_fut);
}
GetConnInner::Done => unreachable!(
"Pool::poll_new_conn never gives out already-consumed GetConns"
),
GetConnInner::New => {
unreachable!("Pool::poll_new_conn never gives out GetConnInner::New")
}
}
},
}
GetConnInner::Done => {
unreachable!("GetConn::poll polled after returning Async::Ready");
}
Expand Down Expand Up @@ -158,6 +166,11 @@ impl Drop for GetConn {
// We drop a connection before it can be resolved, a.k.a. cancelling it.
// Make sure we maintain the necessary invariants towards the pool.
if let Some(pool) = self.pool.take() {
// Remove the waker from the pool's waitlist in case this task was
// woken by another waker, like from tokio::time::timeout.
if let Some(queue_id) = self.queue_id {
pool.unqueue(queue_id);
}
if let GetConnInner::Connecting(..) = self.inner.take() {
pool.cancel_connection();
}
Expand Down
Loading

0 comments on commit 11e5fb2

Please sign in to comment.