Skip to content

Commit

Permalink
Merge pull request #3 from geofmureithi/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
geofmureithi authored Aug 13, 2022
2 parents 98641e1 + 303018b commit 4673a25
Show file tree
Hide file tree
Showing 29 changed files with 383 additions and 93 deletions.
22 changes: 16 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[package]
name = "apalis"
version = "0.3.1"
version = "0.3.3"
authors = ["Geoffrey Mureithi <[email protected]>"]
description = "Simple, extensible multithreaded background job processing for Rust"
repository = "https://github.com/geofmureithi/apalis"
documentation = "https://docs.rs/apalis"
readme = "README.md"
license = "MIT OR Apache-2.0"
keywords = ["jobs", "background", "task", "actors", "workers"]
keywords = ["job", "task", "scheduler", "worker", "cron" ]
edition = "2018"

[features]
Expand All @@ -23,6 +23,8 @@ postgres = ["apalis-sql/postgres"]
sqlite = [ "apalis-sql/sqlite"]
## Include MySql storage
mysql = [ "apalis-sql/mysql"]
## Include Cron functionality
cron = ["apalis-cron"]
## Include Storage utils, and build your own
storage = ["apalis-core/storage"]

Expand All @@ -47,28 +49,35 @@ redis-pubsub = ["apalis-redis/pubsub", "broker"]
## Support event collection, communication between workers.
broker = ["apalis-core/broker"]


docsrs = ["document-features"]


[dependencies.apalis-redis]
version = "0.3.1"
version = "0.3.3"
optional = true
default-features = false
path = "./packages/apalis-redis"

[dependencies.apalis-sql]
version = "0.3.1"
version = "0.3.3"
features = ["migrate"]
optional = true
default-features = false
path = "./packages/apalis-sql"

[dependencies.apalis-core]
version = "0.3.1"
version = "0.3.3"
optional = true
default-features = false
path = "./packages/apalis-core"

[dependencies.apalis-cron]
version = "0.3.3"
optional = true
default-features = false
path = "./packages/apalis-cron"

[dependencies.document-features]
version = "0.2"
optional = true
Expand All @@ -84,7 +93,7 @@ all-features = true
criterion = { version = "0.3", features=["async_tokio"] }
serde = "1"
tokio = { version = "1", features =["macros"] }
apalis-redis = { version = "0.3.1", path = "./packages/apalis-redis" }
apalis-redis = { version = "0.3.3", path = "./packages/apalis-redis" }

[[bench]]
name = "redis_benchmark"
Expand All @@ -95,6 +104,7 @@ members = [
"packages/apalis-core",
"packages/apalis-redis",
"packages/apalis-sql",
"packages/apalis-cron",
# Examples
"examples/email-service",
"examples/redis",
Expand Down
32 changes: 19 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Apalis [![Build Status](https://travis-ci.org/geofmureithi/apalis.svg?branch=master)](https://travis-ci.org/geofmureithi/apalis)
# Apalis [![Build Status](https://github.com/geofmureithi/apalis/actions/workflows/ci.yaml/badge.svg)](https://github.com/geofmureithi/apalis/actions)

Apalis is a simple, extensible multithreaded background job processing library for Rust.

Expand All @@ -8,33 +8,34 @@ Apalis is a simple, extensible multithreaded background job processing library f
- Jobs handlers with a macro free API.
- Take full advantage of the [`tower`] ecosystem of
middleware, services, and utilities.
- Workers take full of the actor model.
- Fully Tokio compatible.
- Optional Web interface to help you manage your jobs.

Apalis job processing is powered by [`tower::Service`] which means you have access to the [`tower`] and [`tower-http`] middleware.
Apalis job processing is powered by [`tower::Service`] which means you have access to the [`tower`] middleware.

Apalis has support for

- Redis
- SQlite
- PostgresSQL
- MySQL
- Bring Your Own Job Source eg Cron or Twitter streams
- Cron Jobs
- Bring Your Own Job Source eg Twitter streams

## Getting Started

To get started, just add to Cargo.toml

```toml
[dependencies]
apalis = { version = "0.3.1", features = ["redis"] }
apalis = { version = "0.3", features = ["redis"] }
```

## Usage

```rust
use apalis::{redis::RedisStorage, JobError, JobRequest, JobResult, WorkerBuilder, Storage, Monitor, JobContext};
use apalis::prelude::*;
use apalis::redis::RedisStorage;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -92,6 +93,7 @@ If you are running [Apalis Board](https://github.com/geofmureithi/apalis-board),
- _postgres_ — Include Postgres storage
- _sqlite_ — Include SQlite storage
- _mysql_ — Include MySql storage
- _cron_ — Include cron job processing
- _sentry_ — Support for Sentry exception and performance monitoring
- _prometheus_ — Support Prometheus metrics
- _retry_ — Support direct retrying jobs
Expand All @@ -104,12 +106,12 @@ If you are running [Apalis Board](https://github.com/geofmureithi/apalis-board),

Since we provide a few storage solutions, here is a table comparing them:

| Feature | Redis | Sqlite | Postgres | Sled | Mysql | Mongo |
| :-------------- | :---: | :----: | :------: | :--: | ----- | ----- |
| Scheduled jobs |||| x | | x |
| Retryable jobs |||| x | | x |
| Persistence |||| x | | x |
| Rerun Dead jobs |||| x | \* | x |
| Feature | Redis | Sqlite | Postgres | Sled | Mysql | Mongo | Cron |
| :-------------- | :---: | :----: | :------: | :--: | :---: | :---: | :--: |
| Scheduled jobs |||| x | | x | |
| Retryable jobs |||| x | | x | |
| Persistence |||| x | | x | BYO |
| Rerun Dead jobs |||| x | \* | x | x |

## Thanks to

Expand All @@ -124,7 +126,7 @@ v 0.4
- [ ] Improve monitoring
- [ ] Improve Apalis Board
- [ ] Add job progress
- [ ] Add more sources
- [ ] Add more sources \*

v 0.3

Expand All @@ -142,6 +144,10 @@ v 0.2
- [x] Redis Example
- [x] Actix Web Example

## Resources

- [Background job processing with rust using actix and redis](https://mureithi.me/blog/background-job-processing-with-rust-actix-redis)

## Contributing

Please read [CONTRIBUTING.md](CONTRIBUTING.md) for details on our code of conduct, and the process for submitting pull requests to us.
Expand Down
5 changes: 2 additions & 3 deletions examples/actix-web/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use actix_web::{web, App, HttpResponse, HttpServer};
use apalis::{
layers::TraceLayer, redis::RedisStorage, Monitor, Storage, WorkerBuilder, WorkerFactoryFn,
};
use apalis::prelude::*;
use apalis::{layers::TraceLayer, redis::RedisStorage};
use futures::future;

use email_service::{send_email, Email};
Expand Down
6 changes: 2 additions & 4 deletions examples/axum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
//! ```not_rust
//! cd examples && cargo run -p axum-example
//! ```
use apalis::{
layers::TraceLayer, redis::RedisStorage, Job, Monitor, Storage, WorkerBuilder, WorkerFactoryFn,
};
use apalis::prelude::*;
use apalis::{layers::TraceLayer, redis::RedisStorage};
use axum::{
extract::Form,
http::StatusCode,
Expand Down
5 changes: 2 additions & 3 deletions examples/email-service/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use apalis::{Job, JobContext, JobError, JobResult};
use apalis::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
Expand All @@ -12,9 +12,8 @@ impl Job for Email {
const NAME: &'static str = "apalis::Email";
}

pub async fn send_email(job: Email, _ctx: JobContext) -> Result<JobResult, JobError> {
pub async fn send_email(job: Email, _ctx: JobContext) -> impl IntoJobResponse {
log::info!("Attempting to send email to {}", job.to);
Ok(JobResult::Success)
}

pub const FORM_HTML: &str = r#"
Expand Down
5 changes: 2 additions & 3 deletions examples/mysql/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use apalis::{
layers::TraceLayer, mysql::MysqlStorage, Monitor, Storage, WorkerBuilder, WorkerFactoryFn,
};
use apalis::prelude::*;
use apalis::{layers::TraceLayer, mysql::MysqlStorage};
use email_service::{send_email, Email};

async fn produce_jobs(storage: &MysqlStorage<Email>) {
Expand Down
6 changes: 2 additions & 4 deletions examples/postgres/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use apalis::{
layers::TraceLayer, postgres::PostgresStorage, Monitor, Storage, WorkerBuilder, WorkerEvent,
WorkerFactoryFn, WorkerListener,
};
use apalis::prelude::*;
use apalis::{layers::TraceLayer, postgres::PostgresStorage};

use email_service::{send_email, Email};

Expand Down
7 changes: 2 additions & 5 deletions examples/prometheus/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
//! ```not_rust
//! cd examples && cargo run -p prometheus-example
//! ```
use apalis::{
layers::PrometheusLayer, redis::RedisStorage, Job, Monitor, Storage, WorkerBuilder,
WorkerFactoryFn,
};
use apalis::prelude::*;
use apalis::{layers::PrometheusLayer, redis::RedisStorage};
use axum::{
extract::Form,
http::StatusCode,
Expand Down
2 changes: 1 addition & 1 deletion examples/redis/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use apalis::{
layers::{Extension, TraceLayer},
prelude::*,
redis::RedisStorage,
Monitor, Storage, WorkerBuilder, WorkerFactoryFn,
};

use email_service::{send_email, Email};
Expand Down
3 changes: 1 addition & 2 deletions examples/rest-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ use apalis::{
layers::{SentryJobLayer, TraceLayer},
mysql::MysqlStorage,
postgres::PostgresStorage,
prelude::*,
redis::RedisStorage,
sqlite::SqliteStorage,
Counts, Job, JobContext, JobError, JobRequest, JobResult, JobState, JobStreamExt, Monitor,
Storage, WorkerBuilder, WorkerFactoryFn,
};
use futures::future;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion examples/sentry/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use tracing_subscriber::prelude::*;

use apalis::{
layers::{SentryJobLayer, TraceLayer},
prelude::*,
redis::RedisStorage,
JobContext, JobError, JobResult, Monitor, Storage, WorkerBuilder, WorkerFactoryFn,
};
use tokio::time::sleep;

Expand Down
4 changes: 1 addition & 3 deletions examples/sqlite/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use apalis::{
layers::TraceLayer, sqlite::SqliteStorage, Monitor, Storage, WorkerBuilder, WorkerFactoryFn,
};
use apalis::{layers::TraceLayer, prelude::*, sqlite::SqliteStorage};
use chrono::Utc;

use email_service::{send_email, Email};
Expand Down
5 changes: 1 addition & 4 deletions examples/tracing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use std::time::Duration;

use tracing_subscriber::prelude::*;

use apalis::{
layers::TraceLayer, redis::RedisStorage, JobContext, JobError, JobResult, Monitor, Storage,
WorkerBuilder, WorkerFactoryFn,
};
use apalis::{layers::TraceLayer, prelude::*, redis::RedisStorage};

use tokio::time::sleep;

Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "apalis-core"
version = "0.3.1"
version = "0.3.3"
authors = ["Geoffrey Mureithi <[email protected]>"]
edition = "2018"
license = "MIT"
Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
///
/// ```rust,ignore
///
/// use apalis::WorkerBuilder;
/// use apalis::prelude::*;
/// use apalis::sqlite::SqliteStorage;
///
/// let sqlite = SqliteStorage::new("sqlite::memory:").await.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions packages/apalis-core/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::error::Error as StdError;
use thiserror::Error;

#[cfg(feature = "storage")]
#[cfg_attr(docsrs, doc(cfg(feature = "storage")))]
use crate::storage::StorageError;

/// Convenience type alias for usage within Apalis.
Expand Down
1 change: 0 additions & 1 deletion packages/apalis-core/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ pub struct Counts {

/// JobStream extension usually useful for management via cli, web etc
#[async_trait::async_trait]

pub trait JobStreamExt<Job>: JobStream<Job = Job>
where
Self: Sized,
Expand Down
22 changes: 14 additions & 8 deletions packages/apalis-core/src/job_fn.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use futures::future::BoxFuture;
use futures::FutureExt;
use std::fmt::{self, Debug};
use std::future::Future;
use std::pin::Pin;
Expand All @@ -8,7 +10,7 @@ use crate::context::JobContext;
use crate::error::JobError;
use crate::job::Job;
use crate::request::JobRequest;
use crate::response::JobResult;
use crate::response::{IntoJobResponse, JobResult};

/// Returns a new [`JobFn`] with the given closure.
///
Expand Down Expand Up @@ -69,9 +71,9 @@ pin_project_lite::pin_project! {
}
}

impl<F> Future for JobFnHttpFuture<F>
impl<F, Res> Future for JobFnHttpFuture<F>
where
F: Future<Output = Result<JobResult, JobError>> + 'static,
F: Future<Output = Res> + 'static,
{
type Output = F::Output;

Expand All @@ -81,24 +83,28 @@ where
}
}

impl<T, F, Request> Service<JobRequest<Request>> for JobFn<T>
impl<T, F, Res, Request> Service<JobRequest<Request>> for JobFn<T>
where
Request: Debug + 'static,
T: Fn(Request, JobContext) -> F,
F: Future<Output = Result<JobResult, JobError>> + 'static,
Res: IntoJobResponse,
F: Future<Output = Res> + 'static + Send,
Request: Job,
{
type Response = JobResult;
type Error = JobError;
type Future = JobFnHttpFuture<F>;
// TODO: Improve this to remove the send
type Future = JobFnHttpFuture<BoxFuture<'static, Result<JobResult, JobError>>>;

fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, job: JobRequest<Request>) -> Self::Future {
let fut = (self.f)(job.job, job.context);
let fut = (self.f)(job.job, job.context).map(|res| res.into_response());

JobFnHttpFuture { future: fut }
JobFnHttpFuture {
future: Box::pin(fut),
}
}
}
Loading

0 comments on commit 4673a25

Please sign in to comment.