Skip to content

Commit

Permalink
Upgrade redis dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
svix-jplatte committed Aug 14, 2024
1 parent c286082 commit 1a09519
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 17 deletions.
4 changes: 2 additions & 2 deletions omniqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ aws-sdk-sqs = { version = "1.13.0", optional = true }
azure_storage = { version = "0.20.0", optional = true }
azure_storage_queues = { version = "0.20.0", optional = true }
bb8 = { version = "0.8", optional = true }
bb8-redis = { version = "0.15.0", optional = true }
bb8-redis = { version = "0.16.0", optional = true }
bytesize = "1.3.0"
futures-util = { version = "0.3.28", default-features = false, features = ["async-await", "std"], optional = true }
google-cloud-googleapis = { version = "0.12.0", optional = true }
google-cloud-pubsub = { version = "0.24.0", optional = true }
lapin = { version = "2", optional = true }
redis = { version = "0.25.3", features = ["tokio-comp", "tokio-native-tls-comp", "streams"], optional = true }
redis = { version = "0.26.1", features = ["tokio-comp", "tokio-native-tls-comp", "streams"], optional = true }
serde = "1.0.196"
serde_json = "1"
svix-ksuid = { version = "0.8.0", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion omniqueue/src/backends/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn check_eviction_policy<R: RedisConnection>(
let results: Vec<String> = redis::cmd("CONFIG")
.arg("GET")
.arg("maxmemory-policy")
.query_async::<R::Connection, Vec<String>>(&mut *conn)
.query_async(&mut *conn)
.await
.map_err(|_| EvictionCheckError::CheckEvictionPolicyFailed)?;

Expand Down
28 changes: 14 additions & 14 deletions omniqueue/src/backends/redis/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::time::Duration;

use bb8::ManageConnection;
use redis::{
streams::{StreamClaimReply, StreamId, StreamReadOptions, StreamReadReply},
streams::{
StreamAutoClaimOptions, StreamClaimReply, StreamId, StreamReadOptions, StreamReadReply,
},
AsyncCommands as _, FromRedisValue, RedisResult,
};
use tracing::{error, trace};
Expand All @@ -20,7 +22,7 @@ const LISTEN_STREAM_ID: &str = ">";
/// The maximum number of pending messages to reinsert into the queue after
/// becoming stale per loop
// FIXME(onelson): expose in config?
const PENDING_BATCH_SIZE: i16 = 1000;
const PENDING_BATCH_SIZE: usize = 1000;

pub(super) async fn send_raw<R: RedisConnection>(
producer: &RedisProducer<R>,
Expand Down Expand Up @@ -250,17 +252,15 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(

// Every iteration checks whether the processing queue has items that should
// be picked back up, claiming them in the process
let mut cmd = redis::cmd("XAUTOCLAIM");
cmd.arg(main_queue_name)
.arg(consumer_group)
.arg(consumer_name)
.arg(ack_deadline_ms)
.arg("-")
.arg("COUNT")
.arg(PENDING_BATCH_SIZE);

let StreamAutoclaimReply { ids } = cmd
.query_async(&mut *conn)
let StreamAutoclaimReply { ids } = conn
.xautoclaim_options(
main_queue_name,
consumer_group,
consumer_name,
ack_deadline_ms,
"-",
StreamAutoClaimOptions::default().count(PENDING_BATCH_SIZE),
)
.await
.map_err(QueueError::generic)?;

Expand All @@ -276,7 +276,7 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(
GENERATE_STREAM_ID,
&map.iter()
.filter_map(|(k, v)| {
if let redis::Value::Data(data) = v {
if let redis::Value::BulkString(data) = v {
Some((k.as_str(), data.as_slice()))
} else {
None
Expand Down

0 comments on commit 1a09519

Please sign in to comment.