Skip to content

Commit

Permalink
Merge pull request #32 from geofmureithi/develop
Browse files Browse the repository at this point in the history
Minor fixes
  • Loading branch information
geofmureithi authored Dec 23, 2022
2 parents 1c0b94e + bc4c633 commit 195ca5b
Show file tree
Hide file tree
Showing 32 changed files with 180 additions and 155 deletions.
12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "apalis"
version = "0.3.4"
version = "0.3.5"
authors = ["Geoffrey Mureithi <[email protected]>"]
description = "Simple, extensible multithreaded background job processing for Rust"
repository = "https://github.com/geofmureithi/apalis"
Expand Down Expand Up @@ -54,26 +54,26 @@ docsrs = ["document-features"]


[dependencies.apalis-redis]
version = "0.3.4"
version = "0.3.5"
optional = true
default-features = false
path = "./packages/apalis-redis"

[dependencies.apalis-sql]
version = "0.3.4"
version = "0.3.5"
features = ["migrate"]
optional = true
default-features = false
path = "./packages/apalis-sql"

[dependencies.apalis-core]
version = "0.3.4"
version = "0.3.5"
optional = true
default-features = false
path = "./packages/apalis-core"

[dependencies.apalis-cron]
version = "0.3.4"
version = "0.3.5"
optional = true
default-features = false
path = "./packages/apalis-cron"
Expand All @@ -93,7 +93,7 @@ all-features = true
criterion = { version = "0.3", features=["async_tokio"] }
serde = "1"
tokio = { version = "1", features =["macros"] }
apalis-redis = { version = "0.3.4", path = "./packages/apalis-redis" }
apalis-redis = { version = "0.3.5", path = "./packages/apalis-redis" }

[[bench]]
name = "redis_benchmark"
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ apalis = { version = "0.3", features = ["redis"] }
use apalis::prelude::*;
use apalis::redis::RedisStorage;
use serde::{Deserialize, Serialize};
use anyhow::Result;

#[derive(Debug, Deserialize, Serialize)]
struct Email {
Expand All @@ -48,11 +49,11 @@ async fn email_service(job: Email, _ctx: JobContext) -> Result<JobResult, JobErr
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "debug");
env_logger::init();
let redis = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL");
let storage = RedisStorage::new(redis).await.unwrap();
let storage = RedisStorage::new(redis).await?;
Monitor::new()
.register_with_count(2, move || {
WorkerBuilder::new(storage.clone())
Expand All @@ -68,14 +69,13 @@ Then

```rust
//This can be in another part of the program or another application
async fn produce_route_jobs(storage: &RedisStorage<Email>) {
async fn produce_route_jobs(storage: &RedisStorage<Email>) -> Result<()> {
let mut storage = storage.clone();
storage
.push(Email {
to: "[email protected]".to_string(),
})
.await
.unwrap();
.await?;
}

```
Expand Down
1 change: 1 addition & 0 deletions examples/actix-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2018"
license = "MIT OR Apache-2.0"

[dependencies]
anyhow = "1"
apalis = { path = "../../", features = ["redis"] }
serde = "1"
actix-rt = "2"
Expand Down
23 changes: 14 additions & 9 deletions examples/actix-web/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use actix_web::{web, App, HttpResponse, HttpServer};
use anyhow::Result;
use apalis::prelude::*;
use apalis::{layers::TraceLayer, redis::RedisStorage};
use futures::future;
Expand All @@ -19,19 +20,23 @@ async fn push_email(
}

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "debug");
env_logger::init();

let storage = RedisStorage::connect("redis://127.0.0.1/").await.unwrap();
let storage = RedisStorage::connect("redis://127.0.0.1/").await?;
let data = web::Data::new(storage.clone());
let http = HttpServer::new(move || {
App::new()
.app_data(data.clone())
.service(web::scope("/emails").route("/push", web::post().to(push_email)))
})
.bind("127.0.0.1:8000")?
.run();
let http = async {
HttpServer::new(move || {
App::new()
.app_data(data.clone())
.service(web::scope("/emails").route("/push", web::post().to(push_email)))
})
.bind("127.0.0.1:8000")?
.run()
.await?;
Ok(())
};
let worker = Monitor::new()
.register_with_count(2, move |_| {
WorkerBuilder::new(storage.clone())
Expand Down
1 change: 1 addition & 0 deletions examples/axum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition = "2018"
publish = false

[dependencies]
anyhow = "1"
axum = "0.5.6"
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1"
Expand Down
5 changes: 3 additions & 2 deletions examples/axum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! ```not_rust
//! cd examples && cargo run -p axum-example
//! ```
use anyhow::Result;
use apalis::prelude::*;
use apalis::{layers::TraceLayer, redis::RedisStorage};
use axum::{
Expand Down Expand Up @@ -46,14 +47,14 @@ where
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
async fn main() -> Result<()> {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()),
))
.with(tracing_subscriber::fmt::layer())
.init();
let storage: RedisStorage<Email> = RedisStorage::connect("redis://127.0.0.1/").await.unwrap();
let storage: RedisStorage<Email> = RedisStorage::connect("redis://127.0.0.1/").await?;
// build our application with some routes
let app = Router::new()
.route("/", get(show_form).post(add_new_job::<Email>))
Expand Down
1 change: 1 addition & 0 deletions examples/mysql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2018"
license = "MIT OR Apache-2.0"

[dependencies]
anyhow = "1"
apalis = { path = "../../", features = ["mysql"] }
serde = "1"
tracing-subscriber = "0.3.11"
Expand Down
15 changes: 8 additions & 7 deletions examples/mysql/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,35 @@
use anyhow::Result;
use apalis::prelude::*;
use apalis::{layers::TraceLayer, mysql::MysqlStorage};
use email_service::{send_email, Email};

async fn produce_jobs(storage: &MysqlStorage<Email>) {
async fn produce_jobs(storage: &MysqlStorage<Email>) -> Result<()> {
let mut storage = storage.clone();
for i in 0..100 {
storage
.push(Email {
to: format!("test{}@example.com", i),
text: "Test backround job from Apalis".to_string(),
text: "Test background job from Apalis".to_string(),
subject: "Background email job".to_string(),
})
.await
.unwrap();
.await?;
}
Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "debug,sqlx::query=error");
tracing_subscriber::fmt::init();
let database_url = std::env::var("DATABASE_URL").expect("Must specify path to db");

let mysql: MysqlStorage<Email> = MysqlStorage::connect(database_url).await.unwrap();
let mysql: MysqlStorage<Email> = MysqlStorage::connect(database_url).await?;
mysql
.setup()
.await
.expect("unable to run migrations for mysql");

produce_jobs(&mysql).await;
produce_jobs(&mysql).await?;
Monitor::new()
.register_with_count(2, move |_| {
WorkerBuilder::new(mysql.clone())
Expand Down
2 changes: 1 addition & 1 deletion examples/postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ edition = "2018"
license = "MIT OR Apache-2.0"

[dependencies]
anyhow = "1"
apalis = { path = "../../", features = ["postgres", "broker"] }
serde = "1"
tracing-subscriber = "0.3.11"
chrono = { version = "0.4" }
tokio = { version ="1", features=["macros"]}
email-service = { path = "../email-service" }


[dependencies.tracing]
default_features = false
version = "0.1"
20 changes: 10 additions & 10 deletions examples/postgres/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use anyhow::Result;
use apalis::prelude::*;
use apalis::{layers::TraceLayer, postgres::PostgresStorage};

use email_service::{send_email, Email};

async fn produce_jobs(storage: &PostgresStorage<Email>) {
// The programatic way
async fn produce_jobs(storage: &PostgresStorage<Email>) -> Result<()> {
// The programmatic way
let mut storage = storage.clone();
storage
.push(Email {
to: "[email protected]".to_string(),
text: "Test backround job from Apalis".to_string(),
text: "Test background job from Apalis".to_string(),
subject: "Background email job".to_string(),
})
.await
.expect("Unable to push job");
.await?;
// The sql way
tracing::info!("You can also add jobs via sql query, run this: \n Select apalis.push_job('apalis::Email', json_build_object('subject', 'Test Apalis', 'to', '[email protected]', 'text', 'Lorem Ipsum'));")
tracing::info!("You can also add jobs via sql query, run this: \n Select apalis.push_job('apalis::Email', json_build_object('subject', 'Test Apalis', 'to', '[email protected]', 'text', 'Lorem Ipsum'));");
Ok(())
}

struct TracingListener;
Expand All @@ -26,17 +26,17 @@ impl WorkerListener for TracingListener {
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "debug,sqlx::query=error");
tracing_subscriber::fmt::init();
let database_url = std::env::var("DATABASE_URL").expect("Must specify path to db");

let pg: PostgresStorage<Email> = PostgresStorage::connect(database_url).await.unwrap();
let pg: PostgresStorage<Email> = PostgresStorage::connect(database_url).await?;
pg.setup()
.await
.expect("unable to run migrations for postgres");

produce_jobs(&pg).await;
produce_jobs(&pg).await?;

Monitor::new()
.register_with_count(4, move |_| {
Expand Down
1 change: 1 addition & 0 deletions examples/prometheus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition = "2018"
publish = false

[dependencies]
anyhow = "1"
axum = "0.5.6"
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1"
Expand Down
13 changes: 7 additions & 6 deletions examples/prometheus/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! ```not_rust
//! cd examples && cargo run -p prometheus-example
//! ```
use anyhow::Result;
use apalis::prelude::*;
use apalis::{layers::PrometheusLayer, redis::RedisStorage};
use axum::{
Expand All @@ -15,20 +16,20 @@ use axum::{
use futures::future::ready;
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
use serde::{de::DeserializeOwned, Serialize};
use std::{fmt::Debug, io::Error, net::SocketAddr};
use std::{fmt::Debug, net::SocketAddr};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

use email_service::{send_email, Email, FORM_HTML};

#[tokio::main]
async fn main() -> std::io::Result<()> {
async fn main() -> Result<()> {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()),
))
.with(tracing_subscriber::fmt::layer())
.init();
let storage: RedisStorage<Email> = RedisStorage::connect("redis://127.0.0.1/").await.unwrap();
let storage: RedisStorage<Email> = RedisStorage::connect("redis://127.0.0.1/").await?;
// build our application with some routes
let recorder_handle = setup_metrics_recorder();
let app = Router::new()
Expand All @@ -42,7 +43,7 @@ async fn main() -> std::io::Result<()> {
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.map_err(|e| Error::new(std::io::ErrorKind::Interrupted, e))
.map_err(|e| e.into())
};
let monitor = async {
Monitor::new()
Expand Down Expand Up @@ -70,9 +71,9 @@ fn setup_metrics_recorder() -> PrometheusHandle {
Matcher::Full("job_requests_duration_seconds".to_string()),
EXPONENTIAL_SECONDS,
)
.unwrap()
.expect("Could not setup Prometheus")
.install_recorder()
.unwrap()
.expect("Could not install Prometheus recorder")
}

async fn show_form() -> Html<&'static str> {
Expand Down
1 change: 1 addition & 0 deletions examples/redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2018"
license = "MIT OR Apache-2.0"

[dependencies]
anyhow = "1"
tokio = { version ="1", features=["macros"]}
apalis = { path = "../../", features = ["redis", "extensions"] }
serde = "1"
Expand Down
16 changes: 8 additions & 8 deletions examples/redis/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
use anyhow::Result;
use apalis::{
layers::{Extension, TraceLayer},
prelude::*,
redis::RedisStorage,
};

use email_service::{send_email, Email};

async fn produce_jobs(mut storage: RedisStorage<Email>) {
async fn produce_jobs(mut storage: RedisStorage<Email>) -> Result<()> {
for _i in 0..10 {
storage
.push(Email {
to: "[email protected]".to_string(),
text: "Test backround job from Apalis".to_string(),
text: "Test background job from Apalis".to_string(),
subject: "Background email job".to_string(),
})
.await
.unwrap();
.await?;
}
Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "debug");

tracing_subscriber::fmt::init();

let storage = RedisStorage::connect("redis://127.0.0.1/").await.unwrap();
let storage = RedisStorage::connect("redis://127.0.0.1/").await?;
//This can be in another part of the program
produce_jobs(storage.clone()).await;
produce_jobs(storage.clone()).await?;

Monitor::new()
.register(
Expand Down
Loading

0 comments on commit 195ca5b

Please sign in to comment.