Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow backends to emit errors #454

Merged
merged 3 commits into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/redis-mq-example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where

type Layer = AckLayer<Self, Req, RedisMqContext, Res>;

fn poll<Svc>(mut self, _worker_id: WorkerId) -> Poller<Self::Stream, Self::Layer> {
fn poll<Svc>(mut self, _worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
let (mut tx, rx) = mpsc::channel(self.config.get_buffer_size());
let stream: RequestStream<Request<Req, RedisMqContext>> = Box::pin(rx);
let layer = AckLayer::new(self.clone());
Expand Down
9 changes: 5 additions & 4 deletions packages/apalis-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use futures::Stream;
use poller::Poller;
use serde::{Deserialize, Serialize};
use tower::Service;
use worker::WorkerId;
use worker::{Context, Worker};

/// Represent utilities for creating worker instances.
pub mod builder;
Expand Down Expand Up @@ -81,7 +81,7 @@ pub trait Backend<Req, Res> {
/// Returns a poller that is ready for streaming
fn poll<Svc: Service<Req, Response = Res>>(
self,
worker: WorkerId,
worker: &Worker<Context>,
) -> Poller<Self::Stream, Self::Layer>;
}
/// A codec allows backends to encode and decode data
Expand Down Expand Up @@ -165,7 +165,7 @@ pub mod test_utils {
use crate::error::BoxDynError;
use crate::request::Request;
use crate::task::task_id::TaskId;
use crate::worker::WorkerId;
use crate::worker::{Worker, WorkerId};
use crate::Backend;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::future::BoxFuture;
Expand Down Expand Up @@ -264,8 +264,9 @@ pub mod test_utils {
>>::Future: Send + 'static,
{
let worker_id = WorkerId::new("test-worker");
let worker = Worker::new(worker_id, crate::worker::Context::default());
let b = backend.clone();
let mut poller = b.poll::<S>(worker_id);
let mut poller = b.poll::<S>(&worker);
let (stop_tx, mut stop_rx) = channel::<()>(1);

let (mut res_tx, res_rx) = channel(10);
Expand Down
4 changes: 2 additions & 2 deletions packages/apalis-core/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
mq::MessageQueue,
poller::{controller::Controller, stream::BackendStream},
request::{Request, RequestStream},
worker::WorkerId,
worker::{self, Worker},
Backend, Poller,
};
use futures::{
Expand Down Expand Up @@ -101,7 +101,7 @@ impl<T: Send + 'static + Sync, Res> Backend<Request<T, ()>, Res> for MemoryStora

type Layer = Identity;

fn poll<Svc>(self, _worker: WorkerId) -> Poller<Self::Stream> {
fn poll<Svc>(self, _worker: &Worker<worker::Context>) -> Poller<Self::Stream> {
let stream = self.inner.map(|r| Ok(Some(r))).boxed();
Poller {
stream: BackendStream::new(stream, self.controller),
Expand Down
6 changes: 3 additions & 3 deletions packages/apalis-core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
error::Error,
poller::Poller,
task::{attempt::Attempt, namespace::Namespace, task_id::TaskId},
worker::WorkerId,
worker::{Context, Worker},
Backend,
};

Expand Down Expand Up @@ -111,10 +111,10 @@ impl<T, Res, Ctx> Backend<Request<T, Ctx>, Res> for RequestStream<Request<T, Ctx

type Layer = Identity;

fn poll<Svc>(self, _worker: WorkerId) -> Poller<Self::Stream> {
fn poll<Svc>(self, _worker: &Worker<Context>) -> Poller<Self::Stream> {
Poller {
stream: self,
heartbeat: Box::pin(async {}),
heartbeat: Box::pin(futures::future::pending()),
layer: Identity::new(),
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/apalis-core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ impl<S, P> Worker<Ready<S, P>> {
};
let backend = self.state.backend;
let service = self.state.service;
let poller = backend.poll::<S>(worker_id.clone());
let poller = backend.poll::<S>(&worker);
let stream = poller.stream;
let heartbeat = poller.heartbeat.boxed();
let layer = poller.layer;
Expand Down Expand Up @@ -387,7 +387,7 @@ impl Future for Runnable {
}

/// Stores the Workers context
#[derive(Clone)]
#[derive(Clone, Default)]
pub struct Context {
task_count: Arc<AtomicUsize>,
wakers: Arc<Mutex<Vec<Waker>>>,
Expand Down
6 changes: 3 additions & 3 deletions packages/apalis-cron/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use apalis_core::layers::Identity;
use apalis_core::poller::Poller;
use apalis_core::request::RequestStream;
use apalis_core::task::namespace::Namespace;
use apalis_core::worker::WorkerId;
use apalis_core::worker::{Context, Worker};
use apalis_core::Backend;
use apalis_core::{error::Error, request::Request};
use chrono::{DateTime, TimeZone, Utc};
Expand Down Expand Up @@ -145,8 +145,8 @@ where

type Layer = Identity;

fn poll<Svc>(self, _worker: WorkerId) -> Poller<Self::Stream, Self::Layer> {
fn poll<Svc>(self, _worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
let stream = self.into_stream();
Poller::new(stream, async {})
Poller::new(stream, futures::future::pending())
}
}
1 change: 1 addition & 0 deletions packages/apalis-redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ tokio = { version = "1", features = ["rt", "net"], optional = true }
async-std = { version = "1.13.0", optional = true }
async-trait = "0.1.80"
tower = "0.4"
thiserror = "1"


[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions packages/apalis-redis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ mod storage;
pub use storage::connect;
pub use storage::Config;
pub use storage::RedisContext;
pub use storage::RedisPollError;
pub use storage::RedisQueueInfo;
pub use storage::RedisStorage;
55 changes: 42 additions & 13 deletions packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
use apalis_core::storage::Storage;
use apalis_core::task::namespace::Namespace;
use apalis_core::task::task_id::TaskId;
use apalis_core::worker::WorkerId;
use apalis_core::worker::{Event, Worker, WorkerId};
use apalis_core::{Backend, Codec};
use chrono::{DateTime, Utc};
use futures::channel::mpsc::{self, Sender};
use futures::channel::mpsc::{self, SendError, Sender};
use futures::{select, FutureExt, SinkExt, StreamExt, TryFutureExt};
use log::*;
use redis::aio::ConnectionLike;
Expand Down Expand Up @@ -106,6 +106,34 @@
}
}

/// Errors that can occur while polling a Redis backend.
#[derive(thiserror::Error, Debug)]
pub enum RedisPollError {
/// Error during a keep-alive heartbeat.
#[error("KeepAlive heartbeat encountered an error: `{0}`")]
KeepAliveError(RedisError),

/// Error during enqueueing scheduled tasks.
#[error("EnqueueScheduled heartbeat encountered an error: `{0}`")]
EnqueueScheduledError(RedisError),

/// Error during polling for the next task or message.
#[error("PollNext heartbeat encountered an error: `{0}`")]
PollNextError(RedisError),

/// Error during enqueueing tasks for worker consumption.
#[error("Enqueue for worker consumption encountered an error: `{0}`")]
EnqueueError(SendError),

/// Error during acknowledgment of tasks.
#[error("Ack heartbeat encountered an error: `{0}`")]
AckError(RedisError),

/// Error during re-enqueuing orphaned tasks.
#[error("ReenqueueOrphaned heartbeat encountered an error: `{0}`")]
ReenqueueOrphanedError(RedisError),
}

/// Config for a [RedisStorage]
#[derive(Clone, Debug)]
pub struct Config {
Expand Down Expand Up @@ -412,14 +440,15 @@

fn poll<Svc: Service<Request<T, RedisContext>>>(
mut self,
worker: WorkerId,
worker: &Worker<apalis_core::worker::Context>,
) -> Poller<Self::Stream, Self::Layer> {
let (mut tx, rx) = mpsc::channel(self.config.buffer_size);
let (ack, ack_rx) = mpsc::channel(self.config.buffer_size);
let layer = AckLayer::new(ack);
let controller = self.controller.clone();
let config = self.config.clone();
let stream: RequestStream<Request<T, RedisContext>> = Box::pin(rx);
let worker = worker.clone();
let heartbeat = async move {
let mut reenqueue_orphaned_stm =
apalis_core::interval::interval(config.poll_interval).fuse();
Expand All @@ -433,32 +462,32 @@

let mut ack_stream = ack_rx.fuse();

if let Err(e) = self.keep_alive(&worker).await {
error!("RegistrationError: {}", e);
if let Err(e) = self.keep_alive(worker.id()).await {
worker.emit(Event::Error(Box::new(RedisPollError::KeepAliveError(e))));
}

loop {
select! {
_ = keep_alive_stm.next() => {
if let Err(e) = self.keep_alive(&worker).await {
error!("KeepAliveError: {}", e);
if let Err(e) = self.keep_alive(worker.id()).await {
worker.emit(Event::Error(Box::new(RedisPollError::KeepAliveError(e))));
}
}
_ = enqueue_scheduled_stm.next() => {
if let Err(e) = self.enqueue_scheduled(config.buffer_size).await {
error!("EnqueueScheduledError: {}", e);
worker.emit(Event::Error(Box::new(RedisPollError::EnqueueScheduledError(e))));
}
}
_ = poll_next_stm.next() => {
let res = self.fetch_next(&worker).await;
let res = self.fetch_next(worker.id()).await;
match res {
Err(e) => {
error!("PollNextError: {}", e);
worker.emit(Event::Error(Box::new(RedisPollError::PollNextError(e))));
}
Ok(res) => {
for job in res {
if let Err(e) = tx.send(Ok(Some(job))).await {
error!("EnqueueError: {}", e);
worker.emit(Event::Error(Box::new(RedisPollError::EnqueueError(e))));
}
}
}
Expand All @@ -468,15 +497,15 @@
id_to_ack = ack_stream.next() => {
if let Some((ctx, res)) = id_to_ack {
if let Err(e) = self.ack(&ctx, &res).await {
error!("AckError: {}", e);
worker.emit(Event::Error(Box::new(RedisPollError::AckError(e))));
}
}
}
_ = reenqueue_orphaned_stm.next() => {
let dead_since = Utc::now()
- chrono::Duration::from_std(config.reenqueue_orphaned_after).unwrap();
if let Err(e) = self.reenqueue_orphaned((config.buffer_size * 10) as i32, dead_since).await {
error!("ReenqueueOrphanedError: {}", e);
worker.emit(Event::Error(Box::new(RedisPollError::ReenqueueOrphanedError(e))));
}
}
};
Expand Down Expand Up @@ -652,7 +681,7 @@
type Error = RedisError;
type Context = RedisContext;

async fn push_request(

Check warning on line 684 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`
&mut self,
req: Request<T, RedisContext>,
) -> Result<Parts<Self::Context>, RedisError> {
Expand All @@ -675,7 +704,7 @@
Ok(req.parts)
}

async fn schedule_request(

Check warning on line 707 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`
&mut self,
req: Request<Self::Job, RedisContext>,
on: i64,
Expand Down Expand Up @@ -738,7 +767,7 @@
Ok(())
}

async fn reschedule(

Check warning on line 770 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`
&mut self,
job: Request<T, RedisContext>,
wait: Duration,
Expand Down Expand Up @@ -797,7 +826,7 @@
C: Codec<Compact = Vec<u8>> + Send + 'static,
{
/// Attempt to retry a job
pub async fn retry(&mut self, worker_id: &WorkerId, task_id: &TaskId) -> Result<i32, RedisError>

Check warning on line 829 in packages/apalis-redis/src/storage.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`
where
T: Send + DeserializeOwned + Serialize + Unpin + Sync + 'static,
{
Expand Down
1 change: 1 addition & 0 deletions packages/apalis-sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tokio = { version = "1", features = ["rt", "net"], optional = true }
futures-lite = "2.3.0"
async-std = { version = "1.13.0", optional = true }
chrono = { version = "0.4", features = ["serde"] }
thiserror = "1"


[dev-dependencies]
Expand Down
Loading
Loading