7.1.0 (#187)
* fix: don't panic when jitter is 0
* fix: support percent encoding in urls
* feat: add from tuple for RedisValue / MultipleKeys
* feat: make CLIENT ID checks optional

---------

Co-authored-by: 3Ti <[email protected]>
Co-authored-by: dupu <[email protected]>
3 people committed Dec 13, 2023
1 parent 9bbb922 commit 31f8f56
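
For illustration, a minimal sketch of two of the changes above. `RedisConfig::from_url` is fred's existing URL parser; the decoded-password assertion assumes the percent-encoding fix behaves as described, and the tuple-to-`RedisValue` conversion is a hypothetical shape of the new `From` impls, not confirmed by this diff.

use fred::prelude::*;

fn main() -> Result<(), RedisError> {
  // percent-encoded credentials should now decode correctly (e.g. "p%40ss" -> "p@ss")
  let config = RedisConfig::from_url("redis://user:p%40ss@127.0.0.1:6379/0")?;
  assert_eq!(config.password.as_deref(), Some("p@ss"));

  // hypothetical shape of the new tuple conversions: a tuple of values becomes an array
  let _value: RedisValue = ("foo", 42_i64).into();
  Ok(())
}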
Showing 23 changed files with 469 additions and 114 deletions.
24 changes: 15 additions & 9 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "fred"
version = "7.0.0"
version = "7.1.0"
authors = ["Alec Embke <[email protected]>"]
edition = "2021"
description = "An async Redis client built on Tokio."
@@ -38,23 +38,24 @@ features = [
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
arc-swap = "1.5"
tokio = { version = "1.19.0", features = ["net", "sync", "rt", "rt-multi-thread", "macros"] }
tokio-util = { version = "0.7.1", features = ["codec"] }
bytes = "1.1"
arc-swap = "1.6"
tokio = { version = "1.34", features = ["net", "sync", "rt", "rt-multi-thread", "macros"] }
tokio-util = { version = "0.7", features = ["codec"] }
bytes = "1.5"
bytes-utils = "0.1"
futures = "0.3"
futures = { version = "0.3", features = ["std"] }
parking_lot = "0.12"
lazy_static = "1.4"
redis-protocol = { version = "4.1", features = ["decode-mut"] }
log = "0.4"
float-cmp = "0.9"
url = "2.3"
tokio-stream = "0.1.1"
url = "2.4"
tokio-stream = "0.1"
sha-1 = { version = "0.10", optional = true }
rand = "0.8"
semver = "1.0"
socket2 = "0.5"
+urlencoding = "2.1"
async-trait = { version = "0.1" }
rustls = { version = "0.21", optional = true }
native-tls = { version = "0.2", optional = true }
@@ -72,8 +73,9 @@ trust-dns-resolver = { version = "0.23", optional = true }
base64 = "0.21"
subprocess = "0.2"
pretty_env_logger = "0.5"
bollard = "0.14"
bollard = "0.15"
serde = "1.0"
+tokio-stream = { version = "0.1", features = ["sync"] }

[lib]
doc = true
@@ -108,6 +110,10 @@ required-features = ["client-tracking"]
name = "lua"
required-features = ["sha-1"]

+[[example]]
+name = "events"
+required-features = ["tokio-stream/sync"]

[features]
default = ["ignore-auth-error", "pool-prefer-active"]
serde-json = ["serde_json"]
Expand Down
2 changes: 2 additions & 0 deletions examples/README.md
@@ -18,6 +18,8 @@ Examples
* [Custom](./custom.rs) - Send custom commands or operate on RESP frames.
* [DNS](./dns.rs) - Customize the DNS resolution logic.
* [Client Tracking](./client_tracking.rs) - Implement [client side caching](https://redis.io/docs/manual/client-side-caching/).
+* [Events](./events.rs) - Respond to connection events with the `EventInterface`.
+* [Keyspace Notifications](./keyspace.rs) - Use the [keyspace notifications](https://redis.io/docs/manual/keyspace-notifications/) interface.
* [Misc](./misc.rs) - Miscellaneous or advanced features.

Or see the [tests](../tests/integration) for more examples.
106 changes: 106 additions & 0 deletions examples/events.rs
@@ -0,0 +1,106 @@
#![allow(clippy::disallowed_names)]
#![allow(clippy::let_underscore_future)]

use fred::prelude::*;
use futures::StreamExt;

// requires tokio_stream 0.1.3 or later
use tokio_stream::wrappers::BroadcastStream;

/// The `EventInterface` provides two ways to interact with connection events.
///
/// * The `on_*` functions are generally easier to use but require spawning a new tokio task. They also currently only
/// support synchronous functions.
/// * The `*_rx` functions are somewhat more complicated to use but allow the caller to control the underlying channel
/// receiver directly. Additionally, these functions do not require spawning a new tokio task.
///
/// See the source for `on_any` for an example of how one might handle multiple receivers in one task.
///
/// The best approach depends on how many tasks the caller is willing to create. The `setup_pool` function shows
/// how one might combine multiple receiver streams in a `RedisPool` to minimize the overhead of new tokio tasks for
/// each underlying client.
#[tokio::main]
async fn main() -> Result<(), RedisError> {
let client = Builder::default_centralized().build()?;

// use the on_* functions
let reconnect_task = client.on_reconnect(|server| {
println!("Reconnected to {}", server);
Ok(())
});
let error_task = client.on_error(|error| {
println!("Connection error: {:?}", error);
Ok(())
});

// use the *_rx functions to do the same thing shown above. although not shown here, callers have more freedom to
// reduce the number of spawned tokio tasks with this interface.
let mut reconnect_rx = client.reconnect_rx();
let reconnect_task_2 = tokio::spawn(async move {
while let Ok(server) = reconnect_rx.recv().await {
println!("Reconnected to {}", server);
}
});

let mut error_rx = client.error_rx();
let error_task_2 = tokio::spawn(async move {
while let Ok(error) = error_rx.recv().await {
println!("Connection error: {:?}", error);
}
});

client.connect();
client.wait_for_connect().await?;

// ...

client.quit().await?;
reconnect_task.await??;
error_task.await??;
reconnect_task_2.await?;
error_task_2.await?;
Ok(())
}

/// Shows how to combine multiple event streams from multiple clients into one tokio task.
#[allow(dead_code)]
async fn setup_pool() -> Result<(), RedisError> {
let pool = Builder::default_centralized().build_pool(5)?;

// `select_all` does most of the work here but requires that the channel receivers implement `Stream`. unfortunately
// `tokio::sync::broadcast::Receiver` does not do this, so we use `tokio_stream::wrappers::BroadcastStream`.
let error_rxs: Vec<_> = pool
.clients()
.iter()
.map(|client| BroadcastStream::new(client.error_rx()))
.collect();
let reconnect_rxs: Vec<_> = pool
.clients()
.iter()
.map(|client| BroadcastStream::new(client.reconnect_rx()))
.collect();
let mut error_rx = futures::stream::select_all(error_rxs);
let mut reconnect_rx = futures::stream::select_all(reconnect_rxs);

let all_events_task = tokio::spawn(async move {
loop {
tokio::select! {
Some(Ok(error)) = error_rx.next() => {
println!("Error: {:?}", error);
}
Some(Ok(server)) = reconnect_rx.next() => {
println!("Reconnected to {}", server);
}
}
}
});

pool.connect();
pool.wait_for_connect().await?;

// ...

pool.quit().await?;
all_events_task.abort();
Ok(())
}
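
One detail worth noting about the `BroadcastStream` wrappers used in `setup_pool` above: a receiver that falls behind its channel yields `Err(BroadcastStreamRecvError::Lagged(n))` rather than silently dropping items, and the `Some(Ok(..))` select arms above skip those errors. A minimal sketch of draining the combined stream while surfacing lagged receivers (the `drain_errors` helper name is illustrative):

use fred::error::RedisError;
use futures::stream::{SelectAll, StreamExt};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};

// drain a combined error stream, logging lagged receivers instead of skipping them
async fn drain_errors(mut error_rx: SelectAll<BroadcastStream<RedisError>>) {
  while let Some(item) = error_rx.next().await {
    match item {
      Ok(error) => println!("Error: {:?}", error),
      // the receiver missed `count` events because it fell behind the sender
      Err(BroadcastStreamRecvError::Lagged(count)) => println!("Missed {} events", count),
    }
  }
}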
136 changes: 136 additions & 0 deletions examples/keyspace.rs
@@ -0,0 +1,136 @@
#![allow(clippy::disallowed_names)]
#![allow(clippy::let_underscore_future)]

use fred::prelude::*;
use std::time::Duration;
use tokio::time::sleep;

async fn fake_traffic(client: &RedisClient, amount: usize) -> Result<(), RedisError> {
// use a new client since the provided client is subscribed to keyspace events
let client = client.clone_new();
client.connect();
client.wait_for_connect().await?;

for idx in 0 .. amount {
let key: RedisKey = format!("foo-{}", idx).into();

client.set(&key, 1, None, None, false).await?;
client.incr(&key).await?;
client.del(&key).await?;
}

client.quit().await?;
Ok(())
}

/// Examples showing how to set up keyspace notifications with clustered or centralized/sentinel deployments.
///
/// The most complicated part of this process involves safely handling reconnections. Keyspace events rely on the
/// pubsub interface, and clients are required to subscribe or resubscribe whenever a new connection is created. These
/// examples show how to manually handle reconnections, but the caller can also use the `SubscriberClient` interface
/// to remove some of the boilerplate (see the sketch after this example).
///
/// If callers do not need the keyspace subscriptions to survive reconnects, the process is more
/// straightforward.
///
/// Both examples assume that the server has been configured to emit keyspace events (via `notify-keyspace-events`).
#[tokio::main]
async fn main() -> Result<(), RedisError> {
clustered_keyspace_events().await?;
centralized_keyspace_events().await?;
Ok(())
}

async fn centralized_keyspace_events() -> Result<(), RedisError> {
let subscriber = Builder::default_centralized().build()?;

let reconnect_subscriber = subscriber.clone();
// resubscribe to the foo- prefix whenever we reconnect to a server
let reconnect_task = tokio::spawn(async move {
let mut reconnect_rx = reconnect_subscriber.reconnect_rx();

while let Ok(server) = reconnect_rx.recv().await {
println!("Reconnected to {}. Subscribing to keyspace events...", server);
reconnect_subscriber.psubscribe("__key*__:foo*").await?;
}

Ok::<_, RedisError>(())
});

// connect after setting up the reconnection logic
subscriber.connect();
subscriber.wait_for_connect().await?;

let mut keyspace_rx = subscriber.on_keyspace_event();
// set up a task that listens for keyspace events
let keyspace_task = tokio::spawn(async move {
while let Ok(event) = keyspace_rx.recv().await {
println!(
"Recv: {} on {} in db {}",
event.operation,
event.key.as_str_lossy(),
event.db
);
}

Ok::<_, RedisError>(())
});

// generate fake traffic and wait a second
fake_traffic(&subscriber, 1_000).await?;
sleep(Duration::from_secs(1)).await;
subscriber.quit().await?;
keyspace_task.await??;
reconnect_task.await??;

Ok(())
}

async fn clustered_keyspace_events() -> Result<(), RedisError> {
let subscriber = Builder::default_clustered().build()?;

let reconnect_subscriber = subscriber.clone();
// resubscribe to the foo- prefix whenever we reconnect to a server
let reconnect_task = tokio::spawn(async move {
let mut reconnect_rx = reconnect_subscriber.reconnect_rx();

// in 7.x reconnect events include the associated `Server` struct, which makes this easier.
while let Ok(server) = reconnect_rx.recv().await {
println!("Reconnected to {}. Subscribing to keyspace events...", server);
reconnect_subscriber
.with_cluster_node(server)
.psubscribe("__key__*:foo*")
.await?;
}

Ok::<_, RedisError>(())
});

// connect after setting up the reconnection logic
subscriber.connect();
subscriber.wait_for_connect().await?;

let mut keyspace_rx = subscriber.on_keyspace_event();
// set up a task that listens for keyspace events
let keyspace_task = tokio::spawn(async move {
while let Ok(event) = keyspace_rx.recv().await {
println!(
"Recv: {} on {} in db {}",
event.operation,
event.key.as_str_lossy(),
event.db
);
}

Ok::<_, RedisError>(())
});

// generate fake traffic and wait a second
fake_traffic(&subscriber, 1_000).await?;
sleep(Duration::from_secs(1)).await;
subscriber.quit().await?;
keyspace_task.await??;
reconnect_task.await??;

Ok(())
}
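
The doc comment above mentions the `SubscriberClient` interface as a way to remove this boilerplate. A minimal sketch of that approach, assuming the `subscriber-client` feature is enabled; the `manage_subscriptions` task re-subscribes to tracked channels and patterns after reconnects, replacing the manual `reconnect_rx` loops shown in these examples.

use fred::prelude::*;

async fn subscriber_client_example() -> Result<(), RedisError> {
  // requires the `subscriber-client` feature; the client records its subscriptions
  let subscriber = Builder::default_centralized().build_subscriber_client()?;
  subscriber.connect();
  subscriber.wait_for_connect().await?;

  // spawn a task that re-subscribes after reconnects instead of listening on `reconnect_rx`
  let _manage_task = subscriber.manage_subscriptions();
  subscriber.psubscribe("__key*__:foo*").await?;

  // ...

  subscriber.quit().await?;
  Ok(())
}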
20 changes: 7 additions & 13 deletions examples/pool.rs
@@ -6,9 +6,14 @@ use fred::prelude::*;
#[tokio::main]
async fn main() -> Result<(), RedisError> {
let pool = Builder::default_centralized().build_pool(5)?;
-let _ = pool.connect();
+pool.connect();
pool.wait_for_connect().await?;

+// use the pool like other clients
+pool.get("foo").await?;
+pool.set("foo", "bar", None, None, false).await?;
+pool.get("foo").await?;

// interact with specific clients via next(), last(), or clients()
let pipeline = pool.next().pipeline();
pipeline.incr("foo").await?;
@@ -17,19 +22,8 @@ async fn main() -> Result<(), RedisError> {

for client in pool.clients() {
println!("{} connected to {:?}", client.id(), client.active_connections().await?);

-// set up event listeners on each client
-client.on_error(|error| {
-println!("Connection error: {:?}", error);
-Ok(())
-});
}

-// or use the pool like any other RedisClient
-pool.get("foo").await?;
-pool.set("foo", "bar", None, None, false).await?;
-pool.get("foo").await?;

-let _ = pool.quit().await;
+pool.quit().await?;
Ok(())
}
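
As a companion to the `next()`/`last()`/`clients()` comment above, a minimal sketch that spreads short pipelines across pool members via round-robin `next()` calls (the `fan_out` helper and key names are illustrative):

use fred::prelude::*;

// issue one pipeline per pool member by rotating through clients with `next()`
async fn fan_out(pool: &RedisPool) -> Result<(), RedisError> {
  for idx in 0 .. pool.clients().len() {
    let pipeline = pool.next().pipeline();
    pipeline.incr(format!("counter-{}", idx)).await?;
    pipeline.incr(format!("counter-{}", idx)).await?;
    // send the buffered commands, returning only the last response
    let count: i64 = pipeline.last().await?;
    println!("counter-{}: {}", idx, count);
  }
  Ok(())
}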
