Skip to content

Commit

Permalink
Use structs instead of type aliases
Browse files Browse the repository at this point in the history
Also address other review comments
  • Loading branch information
jaymell committed Sep 3, 2024
1 parent a49ab47 commit d6f3741
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 44 deletions.
41 changes: 20 additions & 21 deletions omniqueue/src/backends/redis/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use time::OffsetDateTime;
use tracing::{error, trace};

use super::{
internal_from_list, internal_to_list_payload, InternalPayloadOwned, RawPayload,
RedisConnection, RedisConsumer, RedisProducer,
internal_from_list, internal_to_list_payload, InternalPayload, InternalPayloadOwned,
RawPayload, RedisConnection, RedisConsumer, RedisProducer,
};
use crate::{queue::Acker, Delivery, QueueError, Result};

Expand All @@ -24,7 +24,10 @@ pub(super) async fn send_raw<R: RedisConnection>(
.get()
.await
.map_err(QueueError::generic)?
.lpush(&producer.queue_key, internal_to_list_payload((payload, 0)))
.lpush(
&producer.queue_key,
internal_to_list_payload(InternalPayload(payload, 0)),
)
.await
.map_err(QueueError::generic)
}
Expand Down Expand Up @@ -65,25 +68,21 @@ async fn receive_with_timeout<R: RedisConnection>(
.map_err(QueueError::generic)?;

match payload {
Some(old_payload) => {
let (payload, num_receives) = internal_from_list(&old_payload)?;
Some(internal_to_delivery(
(payload.to_vec(), num_receives),
consumer,
old_payload,
))
.transpose()
}
Some(old_payload) => Some(internal_to_delivery(
internal_from_list(&old_payload)?.into(),
consumer,
old_payload,
))
.transpose(),
None => Ok(None),
}
}

fn internal_to_delivery<R: RedisConnection>(
internal: InternalPayloadOwned,
InternalPayloadOwned(payload, num_receives): InternalPayloadOwned,
consumer: &RedisConsumer<R>,
old_payload: Vec<u8>,
) -> Result<Delivery> {
let (payload, num_receives) = internal;
Ok(Delivery::new(
payload,
RedisFallbackAcker {
Expand All @@ -97,19 +96,19 @@ fn internal_to_delivery<R: RedisConnection>(
))
}

pub(super) struct RedisFallbackAcker<M: ManageConnection> {
pub(super) redis: bb8::Pool<M>,
pub(super) processing_queue_key: String,
struct RedisFallbackAcker<M: ManageConnection> {
redis: bb8::Pool<M>,
processing_queue_key: String,
// We delete based on the payload -- and since the
// `num_receives` changes after receiving it's the
// `old_payload`, since `num_receives` is part of the
// payload. Make sense?
pub(super) old_payload: RawPayload,
old_payload: RawPayload,

pub(super) already_acked_or_nacked: bool,
already_acked_or_nacked: bool,

pub(super) max_receives: usize,
pub(super) num_receives: usize,
max_receives: usize,
num_receives: usize,
}

impl<R: RedisConnection> Acker for RedisFallbackAcker<R> {
Expand Down
20 changes: 14 additions & 6 deletions omniqueue/src/backends/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,17 @@ impl RedisConnection for RedisClusterConnectionManager {
// First element is the raw payload slice, second
// is `num_receives`, the number of the times
// the message has previously been received.
type InternalPayload<'a> = (&'a [u8], usize);
struct InternalPayload<'a>(&'a [u8], usize);

// The same as `InternalPayload` but with an
// owned payload.
type InternalPayloadOwned = (Vec<u8>, usize);
struct InternalPayloadOwned(Vec<u8>, usize);

impl From<InternalPayload<'_>> for InternalPayloadOwned {
fn from(InternalPayload(payload, num_receives): InternalPayload) -> Self {
Self(payload.to_vec(), num_receives)
}
}

fn internal_from_list(payload: &[u8]) -> Result<InternalPayload<'_>> {
// All information is stored in the key in which the ID and the [optional]
Expand Down Expand Up @@ -125,12 +131,14 @@ fn internal_from_list(payload: &[u8]) -> Result<InternalPayload<'_>> {
1
};

Ok((&payload[payload_sep_pos + 1..], num_receives))
Ok(InternalPayload(
&payload[payload_sep_pos + 1..],
num_receives,
))
}

fn internal_to_list_payload(internal: InternalPayload) -> Vec<u8> {
fn internal_to_list_payload(InternalPayload(payload, num_receives): InternalPayload) -> Vec<u8> {
let id = delayed_key_id();
let (payload, num_receives) = internal;
let num_receives = num_receives.to_string();
let mut result =
Vec::with_capacity(id.len() + num_receives.as_bytes().len() + payload.len() + 3);
Expand Down Expand Up @@ -620,7 +628,7 @@ impl<R: RedisConnection> RedisProducer<R> {
.map_err(QueueError::generic)?
.zadd(
&self.delayed_queue_key,
internal_to_list_payload((payload, 0)),
internal_to_list_payload(InternalPayload(payload, 0)),
timestamp,
)
.await
Expand Down
35 changes: 18 additions & 17 deletions omniqueue/src/backends/redis/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use redis::{
use tracing::{error, trace};

use super::{
internal_from_list, InternalPayloadOwned, RedisConnection, RedisConsumer, RedisProducer,
internal_from_list, InternalPayload, InternalPayloadOwned, RedisConnection, RedisConsumer,
RedisProducer,
};
use crate::{queue::Acker, Delivery, QueueError, Result};

Expand All @@ -27,10 +28,10 @@ const LISTEN_STREAM_ID: &str = ">";
const PENDING_BATCH_SIZE: usize = 1000;

macro_rules! internal_to_stream_payload {
(($payload:expr, $num_receives:expr), $payload_key:expr) => {
($internal_payload:expr, $payload_key:expr) => {
&[
($payload_key, $payload),
(NUM_RECEIVES, $num_receives.to_string().as_bytes()),
($payload_key, $internal_payload.0),
(NUM_RECEIVES, $internal_payload.1.to_string().as_bytes()),
]
};
}
Expand Down Expand Up @@ -136,15 +137,14 @@ fn internal_from_stream(stream_id: &StreamId, payload_key: &str) -> Result<Inter
.ok_or(QueueError::NoData)
.and_then(|x| redis::from_redis_value(x).map_err(QueueError::generic))?;

Ok((payload, num_receives))
Ok(InternalPayloadOwned(payload, num_receives))
}

fn internal_to_delivery<R: RedisConnection>(
internal: InternalPayloadOwned,
InternalPayloadOwned(payload, num_receives): InternalPayloadOwned,
consumer: &RedisConsumer<R>,
entry_id: String,
) -> Delivery {
let (payload, num_receives) = internal;
Delivery::new(
payload,
RedisStreamsAcker {
Expand All @@ -159,15 +159,15 @@ fn internal_to_delivery<R: RedisConnection>(
)
}

pub(super) struct RedisStreamsAcker<M: ManageConnection> {
pub(super) redis: bb8::Pool<M>,
pub(super) queue_key: String,
pub(super) consumer_group: String,
pub(super) entry_id: String,
struct RedisStreamsAcker<M: ManageConnection> {
redis: bb8::Pool<M>,
queue_key: String,
consumer_group: String,
entry_id: String,

pub(super) already_acked_or_nacked: bool,
pub(super) max_receives: usize,
pub(super) num_receives: usize,
already_acked_or_nacked: bool,
max_receives: usize,
num_receives: usize,
}

impl<R: RedisConnection> Acker for RedisStreamsAcker<R> {
Expand Down Expand Up @@ -223,7 +223,7 @@ pub(super) async fn add_to_main_queue(
for key in keys {
// We don't care about `num_receives` here since we're
// re-queuing from delayed queue:
let (payload, _) = internal_from_list(key)?;
let InternalPayload(payload, _) = internal_from_list(key)?;
let _ = pipe.xadd(
main_queue_name,
GENERATE_STREAM_ID,
Expand Down Expand Up @@ -319,7 +319,8 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(

// And reinsert the map of KV pairs into the MAIN queue with a new stream ID
for stream_id in &ids {
let (payload, num_receives) = internal_from_stream(stream_id, payload_key)?;
let InternalPayloadOwned(payload, num_receives) =
internal_from_stream(stream_id, payload_key)?;
if num_receives >= max_receives {
trace!(
entry_id = stream_id.id,
Expand Down

0 comments on commit d6f3741

Please sign in to comment.