diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs index 87206bb..2163c86 100644 --- a/omniqueue/src/backends/redis/fallback.rs +++ b/omniqueue/src/backends/redis/fallback.rs @@ -10,8 +10,8 @@ use time::OffsetDateTime; use tracing::{error, trace}; use super::{ - internal_from_list, internal_to_list_payload, InternalPayloadOwned, RawPayload, - RedisConnection, RedisConsumer, RedisProducer, + internal_from_list, internal_to_list_payload, InternalPayload, InternalPayloadOwned, + RawPayload, RedisConnection, RedisConsumer, RedisProducer, }; use crate::{queue::Acker, Delivery, QueueError, Result}; @@ -24,7 +24,10 @@ pub(super) async fn send_raw( .get() .await .map_err(QueueError::generic)? - .lpush(&producer.queue_key, internal_to_list_payload((payload, 0))) + .lpush( + &producer.queue_key, + internal_to_list_payload(InternalPayload(payload, 0)), + ) .await .map_err(QueueError::generic) } @@ -65,25 +68,21 @@ async fn receive_with_timeout( .map_err(QueueError::generic)?; match payload { - Some(old_payload) => { - let (payload, num_receives) = internal_from_list(&old_payload)?; - Some(internal_to_delivery( - (payload.to_vec(), num_receives), - consumer, - old_payload, - )) - .transpose() - } + Some(old_payload) => Some(internal_to_delivery( + internal_from_list(&old_payload)?.into(), + consumer, + old_payload, + )) + .transpose(), None => Ok(None), } } fn internal_to_delivery( - internal: InternalPayloadOwned, + InternalPayloadOwned(payload, num_receives): InternalPayloadOwned, consumer: &RedisConsumer, old_payload: Vec, ) -> Result { - let (payload, num_receives) = internal; Ok(Delivery::new( payload, RedisFallbackAcker { @@ -97,19 +96,19 @@ fn internal_to_delivery( )) } -pub(super) struct RedisFallbackAcker { - pub(super) redis: bb8::Pool, - pub(super) processing_queue_key: String, +struct RedisFallbackAcker { + redis: bb8::Pool, + processing_queue_key: String, // We delete based on the payload -- and since the // `num_receives` changes after receiving it's the // `old_payload`, since `num_receives` is part of the // payload. Make sense? - pub(super) old_payload: RawPayload, + old_payload: RawPayload, - pub(super) already_acked_or_nacked: bool, + already_acked_or_nacked: bool, - pub(super) max_receives: usize, - pub(super) num_receives: usize, + max_receives: usize, + num_receives: usize, } impl Acker for RedisFallbackAcker { diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs index ca6a255..5a04a07 100644 --- a/omniqueue/src/backends/redis/mod.rs +++ b/omniqueue/src/backends/redis/mod.rs @@ -88,11 +88,17 @@ impl RedisConnection for RedisClusterConnectionManager { // First element is the raw payload slice, second // is `num_receives`, the number of the times // the message has previously been received. -type InternalPayload<'a> = (&'a [u8], usize); +struct InternalPayload<'a>(&'a [u8], usize); // The same as `InternalPayload` but with an // owned payload. -type InternalPayloadOwned = (Vec, usize); +struct InternalPayloadOwned(Vec, usize); + +impl From> for InternalPayloadOwned { + fn from(InternalPayload(payload, num_receives): InternalPayload) -> Self { + Self(payload.to_vec(), num_receives) + } +} fn internal_from_list(payload: &[u8]) -> Result> { // All information is stored in the key in which the ID and the [optional] @@ -125,12 +131,14 @@ fn internal_from_list(payload: &[u8]) -> Result> { 1 }; - Ok((&payload[payload_sep_pos + 1..], num_receives)) + Ok(InternalPayload( + &payload[payload_sep_pos + 1..], + num_receives, + )) } -fn internal_to_list_payload(internal: InternalPayload) -> Vec { +fn internal_to_list_payload(InternalPayload(payload, num_receives): InternalPayload) -> Vec { let id = delayed_key_id(); - let (payload, num_receives) = internal; let num_receives = num_receives.to_string(); let mut result = Vec::with_capacity(id.len() + num_receives.as_bytes().len() + payload.len() + 3); @@ -620,7 +628,7 @@ impl RedisProducer { .map_err(QueueError::generic)? .zadd( &self.delayed_queue_key, - internal_to_list_payload((payload, 0)), + internal_to_list_payload(InternalPayload(payload, 0)), timestamp, ) .await diff --git a/omniqueue/src/backends/redis/streams.rs b/omniqueue/src/backends/redis/streams.rs index e4f73dd..af542b2 100644 --- a/omniqueue/src/backends/redis/streams.rs +++ b/omniqueue/src/backends/redis/streams.rs @@ -12,7 +12,8 @@ use redis::{ use tracing::{error, trace}; use super::{ - internal_from_list, InternalPayloadOwned, RedisConnection, RedisConsumer, RedisProducer, + internal_from_list, InternalPayload, InternalPayloadOwned, RedisConnection, RedisConsumer, + RedisProducer, }; use crate::{queue::Acker, Delivery, QueueError, Result}; @@ -27,10 +28,10 @@ const LISTEN_STREAM_ID: &str = ">"; const PENDING_BATCH_SIZE: usize = 1000; macro_rules! internal_to_stream_payload { - (($payload:expr, $num_receives:expr), $payload_key:expr) => { + ($internal_payload:expr, $payload_key:expr) => { &[ - ($payload_key, $payload), - (NUM_RECEIVES, $num_receives.to_string().as_bytes()), + ($payload_key, $internal_payload.0), + (NUM_RECEIVES, $internal_payload.1.to_string().as_bytes()), ] }; } @@ -136,15 +137,14 @@ fn internal_from_stream(stream_id: &StreamId, payload_key: &str) -> Result( - internal: InternalPayloadOwned, + InternalPayloadOwned(payload, num_receives): InternalPayloadOwned, consumer: &RedisConsumer, entry_id: String, ) -> Delivery { - let (payload, num_receives) = internal; Delivery::new( payload, RedisStreamsAcker { @@ -159,15 +159,15 @@ fn internal_to_delivery( ) } -pub(super) struct RedisStreamsAcker { - pub(super) redis: bb8::Pool, - pub(super) queue_key: String, - pub(super) consumer_group: String, - pub(super) entry_id: String, +struct RedisStreamsAcker { + redis: bb8::Pool, + queue_key: String, + consumer_group: String, + entry_id: String, - pub(super) already_acked_or_nacked: bool, - pub(super) max_receives: usize, - pub(super) num_receives: usize, + already_acked_or_nacked: bool, + max_receives: usize, + num_receives: usize, } impl Acker for RedisStreamsAcker { @@ -223,7 +223,7 @@ pub(super) async fn add_to_main_queue( for key in keys { // We don't care about `num_receives` here since we're // re-queuing from delayed queue: - let (payload, _) = internal_from_list(key)?; + let InternalPayload(payload, _) = internal_from_list(key)?; let _ = pipe.xadd( main_queue_name, GENERATE_STREAM_ID, @@ -319,7 +319,8 @@ async fn reenqueue_timed_out_messages( // And reinsert the map of KV pairs into the MAIN queue with a new stream ID for stream_id in &ids { - let (payload, num_receives) = internal_from_stream(stream_id, payload_key)?; + let InternalPayloadOwned(payload, num_receives) = + internal_from_stream(stream_id, payload_key)?; if num_receives >= max_receives { trace!( entry_id = stream_id.id,