From 4c423087281e3ac0f44d6abf744b9b1cd77b44a6 Mon Sep 17 00:00:00 2001 From: Geoffrey Mureithi Date: Fri, 23 Dec 2022 15:36:15 +0300 Subject: [PATCH 1/3] Refactor alot of panics --- README.md | 10 ++--- examples/actix-web/Cargo.toml | 1 + examples/actix-web/src/main.rs | 23 ++++++---- examples/axum/Cargo.toml | 1 + examples/axum/src/main.rs | 5 ++- examples/mysql/Cargo.toml | 1 + examples/mysql/src/main.rs | 15 ++++--- examples/postgres/Cargo.toml | 2 +- examples/postgres/src/main.rs | 20 ++++----- examples/prometheus/Cargo.toml | 1 + examples/prometheus/src/main.rs | 13 +++--- examples/redis/Cargo.toml | 1 + examples/redis/src/main.rs | 16 +++---- examples/rest-api/Cargo.toml | 1 + examples/rest-api/src/main.rs | 42 +++++++++-------- examples/sentry/Cargo.toml | 1 + examples/sentry/src/main.rs | 21 +++++---- examples/sqlite/Cargo.toml | 1 + examples/sqlite/src/main.rs | 13 +++--- examples/tracing/Cargo.toml | 1 + examples/tracing/src/main.rs | 19 ++++---- packages/apalis-core/Cargo.toml | 1 + packages/apalis-core/src/layers/sentry/mod.rs | 45 +++++++++++-------- .../apalis-core/src/layers/tracing/mod.rs | 14 +++--- packages/apalis-core/src/worker/mod.rs | 21 ++++----- packages/apalis-core/src/worker/monitor.rs | 12 ++--- packages/apalis-sql/src/sqlite.rs | 29 ------------ 27 files changed, 162 insertions(+), 168 deletions(-) diff --git a/README.md b/README.md index e5e4d73e..dda0622c 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ apalis = { version = "0.3", features = ["redis"] } use apalis::prelude::*; use apalis::redis::RedisStorage; use serde::{Deserialize, Serialize}; +use anyhow::Result; #[derive(Debug, Deserialize, Serialize)] struct Email { @@ -48,11 +49,11 @@ async fn email_service(job: Email, _ctx: JobContext) -> Result std::io::Result<()> { +async fn main() -> Result<()> { std::env::set_var("RUST_LOG", "debug"); env_logger::init(); let redis = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL"); - let storage = RedisStorage::new(redis).await.unwrap(); + let storage = RedisStorage::new(redis).await?; Monitor::new() .register_with_count(2, move || { WorkerBuilder::new(storage.clone()) @@ -68,14 +69,13 @@ Then ```rust //This can be in another part of the program or another application -async fn produce_route_jobs(storage: &RedisStorage) { +async fn produce_route_jobs(storage: &RedisStorage) -> Result<()> { let mut storage = storage.clone(); storage .push(Email { to: "test@example.com".to_string(), }) - .await - .unwrap(); + .await?; } ``` diff --git a/examples/actix-web/Cargo.toml b/examples/actix-web/Cargo.toml index 39ef3a38..51593685 100644 --- a/examples/actix-web/Cargo.toml +++ b/examples/actix-web/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" license = "MIT OR Apache-2.0" [dependencies] +anyhow = "1" apalis = { path = "../../", features = ["redis"] } serde = "1" actix-rt = "2" diff --git a/examples/actix-web/src/main.rs b/examples/actix-web/src/main.rs index 270d14ff..8c1caa78 100644 --- a/examples/actix-web/src/main.rs +++ b/examples/actix-web/src/main.rs @@ -1,4 +1,5 @@ use actix_web::{web, App, HttpResponse, HttpServer}; +use anyhow::Result; use apalis::prelude::*; use apalis::{layers::TraceLayer, redis::RedisStorage}; use futures::future; @@ -19,19 +20,23 @@ async fn push_email( } #[actix_rt::main] -async fn main() -> std::io::Result<()> { +async fn main() -> Result<()> { std::env::set_var("RUST_LOG", "debug"); env_logger::init(); - let storage = RedisStorage::connect("redis://127.0.0.1/").await.unwrap(); + let storage = RedisStorage::connect("redis://127.0.0.1/").await?; let data = web::Data::new(storage.clone()); - let http = HttpServer::new(move || { - App::new() - .app_data(data.clone()) - .service(web::scope("/emails").route("/push", web::post().to(push_email))) - }) - .bind("127.0.0.1:8000")? - .run(); + let http = async { + HttpServer::new(move || { + App::new() + .app_data(data.clone()) + .service(web::scope("/emails").route("/push", web::post().to(push_email))) + }) + .bind("127.0.0.1:8000")? + .run() + .await?; + Ok(()) + }; let worker = Monitor::new() .register_with_count(2, move |_| { WorkerBuilder::new(storage.clone()) diff --git a/examples/axum/Cargo.toml b/examples/axum/Cargo.toml index cada0296..95a322d1 100644 --- a/examples/axum/Cargo.toml +++ b/examples/axum/Cargo.toml @@ -5,6 +5,7 @@ edition = "2018" publish = false [dependencies] +anyhow = "1" axum = "0.5.6" tokio = { version = "1.0", features = ["full"] } tracing = "0.1" diff --git a/examples/axum/src/main.rs b/examples/axum/src/main.rs index b4de0e5b..1e7c5d9e 100644 --- a/examples/axum/src/main.rs +++ b/examples/axum/src/main.rs @@ -15,6 +15,7 @@ use axum::{ use serde::{de::DeserializeOwned, Serialize}; use std::{fmt::Debug, io::Error, net::SocketAddr}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use anyhow::Result; use email_service::{send_email, Email, FORM_HTML}; @@ -46,14 +47,14 @@ where } #[tokio::main] -async fn main() -> std::io::Result<()> { +async fn main() -> Result<()> { tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::new( std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()), )) .with(tracing_subscriber::fmt::layer()) .init(); - let storage: RedisStorage = RedisStorage::connect("redis://127.0.0.1/").await.unwrap(); + let storage: RedisStorage = RedisStorage::connect("redis://127.0.0.1/").await?; // build our application with some routes let app = Router::new() .route("/", get(show_form).post(add_new_job::)) diff --git a/examples/mysql/Cargo.toml b/examples/mysql/Cargo.toml index 15178239..488de62f 100644 --- a/examples/mysql/Cargo.toml +++ b/examples/mysql/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" license = "MIT OR Apache-2.0" [dependencies] +anyhow = "1" apalis = { path = "../../", features = ["mysql"] } serde = "1" tracing-subscriber = "0.3.11" diff --git a/examples/mysql/src/main.rs b/examples/mysql/src/main.rs index a61d40d3..2c9e9087 100644 --- a/examples/mysql/src/main.rs +++ b/examples/mysql/src/main.rs @@ -1,34 +1,35 @@ +use anyhow::Result; use apalis::prelude::*; use apalis::{layers::TraceLayer, mysql::MysqlStorage}; use email_service::{send_email, Email}; -async fn produce_jobs(storage: &MysqlStorage) { +async fn produce_jobs(storage: &MysqlStorage) -> Result<()> { let mut storage = storage.clone(); for i in 0..100 { storage .push(Email { to: format!("test{}@example.com", i), - text: "Test backround job from Apalis".to_string(), + text: "Test background job from Apalis".to_string(), subject: "Background email job".to_string(), }) - .await - .unwrap(); + .await?; } + Ok(()) } #[tokio::main] -async fn main() -> std::io::Result<()> { +async fn main() -> Result<()> { std::env::set_var("RUST_LOG", "debug,sqlx::query=error"); tracing_subscriber::fmt::init(); let database_url = std::env::var("DATABASE_URL").expect("Must specify path to db"); - let mysql: MysqlStorage = MysqlStorage::connect(database_url).await.unwrap(); + let mysql: MysqlStorage = MysqlStorage::connect(database_url).await?; mysql .setup() .await .expect("unable to run migrations for mysql"); - produce_jobs(&mysql).await; + produce_jobs(&mysql).await?; Monitor::new() .register_with_count(2, move |_| { WorkerBuilder::new(mysql.clone()) diff --git a/examples/postgres/Cargo.toml b/examples/postgres/Cargo.toml index 1541004a..cf605226 100644 --- a/examples/postgres/Cargo.toml +++ b/examples/postgres/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" license = "MIT OR Apache-2.0" [dependencies] +anyhow = "1" apalis = { path = "../../", features = ["postgres", "broker"] } serde = "1" tracing-subscriber = "0.3.11" @@ -13,7 +14,6 @@ chrono = { version = "0.4" } tokio = { version ="1", features=["macros"]} email-service = { path = "../email-service" } - [dependencies.tracing] default_features = false version = "0.1" \ No newline at end of file diff --git a/examples/postgres/src/main.rs b/examples/postgres/src/main.rs index 9e8365a8..2fb39ce7 100644 --- a/examples/postgres/src/main.rs +++ b/examples/postgres/src/main.rs @@ -1,21 +1,21 @@ +use anyhow::Result; use apalis::prelude::*; use apalis::{layers::TraceLayer, postgres::PostgresStorage}; - use email_service::{send_email, Email}; -async fn produce_jobs(storage: &PostgresStorage) { - // The programatic way +async fn produce_jobs(storage: &PostgresStorage) -> Result<()> { + // The programmatic way let mut storage = storage.clone(); storage .push(Email { to: "test@example.com".to_string(), - text: "Test backround job from Apalis".to_string(), + text: "Test background job from Apalis".to_string(), subject: "Background email job".to_string(), }) - .await - .expect("Unable to push job"); + .await?; // The sql way - tracing::info!("You can also add jobs via sql query, run this: \n Select apalis.push_job('apalis::Email', json_build_object('subject', 'Test Apalis', 'to', 'test1@example.com', 'text', 'Lorem Ipsum'));") + tracing::info!("You can also add jobs via sql query, run this: \n Select apalis.push_job('apalis::Email', json_build_object('subject', 'Test Apalis', 'to', 'test1@example.com', 'text', 'Lorem Ipsum'));"); + Ok(()) } struct TracingListener; @@ -26,17 +26,17 @@ impl WorkerListener for TracingListener { } #[tokio::main] -async fn main() -> std::io::Result<()> { +async fn main() -> Result<()> { std::env::set_var("RUST_LOG", "debug,sqlx::query=error"); tracing_subscriber::fmt::init(); let database_url = std::env::var("DATABASE_URL").expect("Must specify path to db"); - let pg: PostgresStorage = PostgresStorage::connect(database_url).await.unwrap(); + let pg: PostgresStorage = PostgresStorage::connect(database_url).await?; pg.setup() .await .expect("unable to run migrations for postgres"); - produce_jobs(&pg).await; + produce_jobs(&pg).await?; Monitor::new() .register_with_count(4, move |_| { diff --git a/examples/prometheus/Cargo.toml b/examples/prometheus/Cargo.toml index 6fe49d05..d3774e4c 100644 --- a/examples/prometheus/Cargo.toml +++ b/examples/prometheus/Cargo.toml @@ -5,6 +5,7 @@ edition = "2018" publish = false [dependencies] +anyhow = "1" axum = "0.5.6" tokio = { version = "1.0", features = ["full"] } tracing = "0.1" diff --git a/examples/prometheus/src/main.rs b/examples/prometheus/src/main.rs index 1a2f02c0..8d7ca6d2 100644 --- a/examples/prometheus/src/main.rs +++ b/examples/prometheus/src/main.rs @@ -3,6 +3,7 @@ //! ```not_rust //! cd examples && cargo run -p prometheus-example //! ``` +use anyhow::Result; use apalis::prelude::*; use apalis::{layers::PrometheusLayer, redis::RedisStorage}; use axum::{ @@ -15,20 +16,20 @@ use axum::{ use futures::future::ready; use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle}; use serde::{de::DeserializeOwned, Serialize}; -use std::{fmt::Debug, io::Error, net::SocketAddr}; +use std::{fmt::Debug, net::SocketAddr}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use email_service::{send_email, Email, FORM_HTML}; #[tokio::main] -async fn main() -> std::io::Result<()> { +async fn main() -> Result<()> { tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::new( std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()), )) .with(tracing_subscriber::fmt::layer()) .init(); - let storage: RedisStorage = RedisStorage::connect("redis://127.0.0.1/").await.unwrap(); + let storage: RedisStorage = RedisStorage::connect("redis://127.0.0.1/").await?; // build our application with some routes let recorder_handle = setup_metrics_recorder(); let app = Router::new() @@ -42,7 +43,7 @@ async fn main() -> std::io::Result<()> { axum::Server::bind(&addr) .serve(app.into_make_service()) .await - .map_err(|e| Error::new(std::io::ErrorKind::Interrupted, e)) + .map_err(|e| e.into()) }; let monitor = async { Monitor::new() @@ -70,9 +71,9 @@ fn setup_metrics_recorder() -> PrometheusHandle { Matcher::Full("job_requests_duration_seconds".to_string()), EXPONENTIAL_SECONDS, ) - .unwrap() + .expect("Could not setup Prometheus") .install_recorder() - .unwrap() + .expect("Could not install Prometheus recorder") } async fn show_form() -> Html<&'static str> { diff --git a/examples/redis/Cargo.toml b/examples/redis/Cargo.toml index b41fcc7f..594ad2b3 100644 --- a/examples/redis/Cargo.toml +++ b/examples/redis/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" license = "MIT OR Apache-2.0" [dependencies] +anyhow = "1" tokio = { version ="1", features=["macros"]} apalis = { path = "../../", features = ["redis", "extensions"] } serde = "1" diff --git a/examples/redis/src/main.rs b/examples/redis/src/main.rs index b79f299e..45f4f406 100644 --- a/examples/redis/src/main.rs +++ b/examples/redis/src/main.rs @@ -1,33 +1,33 @@ +use anyhow::Result; use apalis::{ layers::{Extension, TraceLayer}, prelude::*, redis::RedisStorage, }; - use email_service::{send_email, Email}; -async fn produce_jobs(mut storage: RedisStorage) { +async fn produce_jobs(mut storage: RedisStorage) -> Result<()> { for _i in 0..10 { storage .push(Email { to: "test@example.com".to_string(), - text: "Test backround job from Apalis".to_string(), + text: "Test background job from Apalis".to_string(), subject: "Background email job".to_string(), }) - .await - .unwrap(); + .await?; } + Ok(()) } #[tokio::main] -async fn main() -> std::io::Result<()> { +async fn main() -> Result<()> { std::env::set_var("RUST_LOG", "debug"); tracing_subscriber::fmt::init(); - let storage = RedisStorage::connect("redis://127.0.0.1/").await.unwrap(); + let storage = RedisStorage::connect("redis://127.0.0.1/").await?; //This can be in another part of the program - produce_jobs(storage.clone()).await; + produce_jobs(storage.clone()).await?; Monitor::new() .register( diff --git a/examples/rest-api/Cargo.toml b/examples/rest-api/Cargo.toml index f01ad414..98ced593 100644 --- a/examples/rest-api/Cargo.toml +++ b/examples/rest-api/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" license = "MIT OR Apache-2.0" [dependencies] +anyhow = "1" apalis = { path = "../../", features = ["redis", "sqlite", "sentry", "postgres", "mysql"] } serde = "1" tokio = { version = "1", features =["macros", "rt-multi-thread"] } diff --git a/examples/rest-api/src/main.rs b/examples/rest-api/src/main.rs index acd77b87..d858a526 100644 --- a/examples/rest-api/src/main.rs +++ b/examples/rest-api/src/main.rs @@ -273,24 +273,24 @@ async fn produce_mysql_jobs(mut storage: MysqlStorage) { } #[tokio::main(flavor = "multi_thread", worker_threads = 10)] -async fn main() -> std::io::Result<()> { +async fn main() -> anyhow::Result<()> { std::env::set_var("RUST_LOG", "debug,sqlx::query=error"); env_logger::init(); let database_url = std::env::var("DATABASE_URL").expect("Must specify DATABASE_URL"); - let pg: PostgresStorage = PostgresStorage::connect(database_url).await.unwrap(); + let pg: PostgresStorage = PostgresStorage::connect(database_url).await?; let _res = pg.setup().await.expect("Unable to migrate"); let database_url = std::env::var("MYSQL_URL").expect("Must specify MYSQL_URL"); - let mysql: MysqlStorage = MysqlStorage::connect(database_url).await.unwrap(); + let mysql: MysqlStorage = MysqlStorage::connect(database_url).await?; mysql .setup() .await .expect("unable to run migrations for mysql"); - let storage = RedisStorage::connect("redis://127.0.0.1/").await.unwrap(); + let storage = RedisStorage::connect("redis://127.0.0.1/").await?; - let sqlite = SqliteStorage::connect("sqlite://data.db").await.unwrap(); + let sqlite = SqliteStorage::connect("sqlite://data.db").await?; let _res = sqlite.setup().await.expect("Unable to migrate"); let worker_storage = storage.clone(); @@ -302,20 +302,24 @@ async fn main() -> std::io::Result<()> { produce_sqlite_jobs(sqlite.clone()).await; produce_postgres_jobs(pg_storage.clone()).await; produce_mysql_jobs(mysql.clone()).await; - let http = HttpServer::new(move || { - App::new().wrap(Cors::permissive()).service( - web::scope("/api").service( - StorageApiBuilder::new() - .add_storage(storage.clone()) - .add_storage(sqlite.clone()) - .add_storage(pg.clone()) - .add_storage(mysql.clone()) - .build(), - ), - ) - }) - .bind("127.0.0.1:8000")? - .run(); + let http = async { + HttpServer::new(move || { + App::new().wrap(Cors::permissive()).service( + web::scope("/api").service( + StorageApiBuilder::new() + .add_storage(storage.clone()) + .add_storage(sqlite.clone()) + .add_storage(pg.clone()) + .add_storage(mysql.clone()) + .build(), + ), + ) + }) + .bind("127.0.0.1:8000")? + .run() + .await?; + Ok(()) + }; let worker = Monitor::new() .register_with_count(1, move |_| { diff --git a/examples/sentry/Cargo.toml b/examples/sentry/Cargo.toml index 59556db6..45321c99 100644 --- a/examples/sentry/Cargo.toml +++ b/examples/sentry/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" license = "MIT OR Apache-2.0" [dependencies] +anyhow = "1" apalis = { path = "../../", features = ["redis", "sentry"] } serde = "1" env_logger = "0.7" diff --git a/examples/sentry/src/main.rs b/examples/sentry/src/main.rs index c6cd0ff6..54b59795 100644 --- a/examples/sentry/src/main.rs +++ b/examples/sentry/src/main.rs @@ -5,14 +5,14 @@ use std::time::Duration; use tracing_subscriber::prelude::*; +use anyhow::Result; use apalis::{ layers::{SentryJobLayer, TraceLayer}, prelude::*, redis::RedisStorage, }; -use tokio::time::sleep; - use email_service::Email; +use tokio::time::sleep; #[derive(Debug)] struct InvalidEmailError { @@ -95,19 +95,19 @@ async fn email_service(email: Email, _ctx: JobContext) -> Result) { +async fn produce_jobs(mut storage: RedisStorage) -> Result<()> { storage .push(Email { to: "apalis@example".to_string(), - text: "Test backround job from Apalis".to_string(), + text: "Test background job from Apalis".to_string(), subject: "Welcome Sentry Email".to_string(), }) - .await - .unwrap(); + .await?; + Ok(()) } #[tokio::main] -async fn main() -> std::io::Result<()> { +async fn main() -> Result<()> { use tracing_subscriber::EnvFilter; std::env::set_var("RUST_LOG", "debug"); let sentry_dsn = @@ -123,9 +123,8 @@ async fn main() -> std::io::Result<()> { }, )); let fmt_layer = tracing_subscriber::fmt::layer().with_target(false); - let filter_layer = EnvFilter::try_from_default_env() - .or_else(|_| EnvFilter::try_new("debug")) - .unwrap(); + let filter_layer = + EnvFilter::try_from_default_env().or_else(|_| EnvFilter::try_new("debug"))?; tracing_subscriber::registry() .with(filter_layer) .with(fmt_layer) @@ -136,7 +135,7 @@ async fn main() -> std::io::Result<()> { .await .expect("Could not connect to RedisStorage"); //This can be in another part of the program - produce_jobs(storage.clone()).await; + produce_jobs(storage.clone()).await?; Monitor::new() .register_with_count(2, move |_| { diff --git a/examples/sqlite/Cargo.toml b/examples/sqlite/Cargo.toml index f037ef96..f4971444 100644 --- a/examples/sqlite/Cargo.toml +++ b/examples/sqlite/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" license = "MIT OR Apache-2.0" [dependencies] +anyhow = "1" tokio = { version = "1", features=["macros"] } apalis = { path = "../../", features = ["sqlite", "limit"] } serde = "1" diff --git a/examples/sqlite/src/main.rs b/examples/sqlite/src/main.rs index cf684acf..c01c859a 100644 --- a/examples/sqlite/src/main.rs +++ b/examples/sqlite/src/main.rs @@ -1,9 +1,10 @@ +use anyhow::Result; use apalis::{layers::TraceLayer, prelude::*, sqlite::SqliteStorage}; use chrono::Utc; use email_service::{send_email, Email}; -async fn produce_jobs(storage: &SqliteStorage) { +async fn produce_jobs(storage: &SqliteStorage) -> Result<()> { let mut storage = storage.clone(); for i in 0..2 { storage @@ -15,17 +16,17 @@ async fn produce_jobs(storage: &SqliteStorage) { }, Utc::now() + chrono::Duration::seconds(i), ) - .await - .unwrap(); + .await?; } + Ok(()) } #[tokio::main] -async fn main() -> std::io::Result<()> { +async fn main() -> Result<()> { std::env::set_var("RUST_LOG", "debug,sqlx::query=error"); tracing_subscriber::fmt::init(); - let sqlite: SqliteStorage = SqliteStorage::connect("sqlite://data.db").await.unwrap(); + let sqlite: SqliteStorage = SqliteStorage::connect("sqlite://data.db").await?; // Do migrations: Mainly for "sqlite::memory:" sqlite .setup() @@ -33,7 +34,7 @@ async fn main() -> std::io::Result<()> { .expect("unable to run migrations for sqlite"); // This can be in another part of the program - produce_jobs(&sqlite).await; + produce_jobs(&sqlite).await?; Monitor::new() .register_with_count(5, move |_| { diff --git a/examples/tracing/Cargo.toml b/examples/tracing/Cargo.toml index 8b0a797f..231c4104 100644 --- a/examples/tracing/Cargo.toml +++ b/examples/tracing/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" license = "MIT OR Apache-2.0" [dependencies] +anyhow = "1" apalis = { path = "../../", features = ["redis"] } serde = "1" tokio = { version ="1", features = ["macros"]} diff --git a/examples/tracing/src/main.rs b/examples/tracing/src/main.rs index ffc0ba91..ad97fa3f 100644 --- a/examples/tracing/src/main.rs +++ b/examples/tracing/src/main.rs @@ -1,7 +1,7 @@ +use anyhow::Result; use std::error::Error; use std::fmt; use std::time::Duration; - use tracing_subscriber::prelude::*; use apalis::{layers::TraceLayer, prelude::*, redis::RedisStorage}; @@ -30,19 +30,19 @@ async fn email_service(_email: Email, _ctx: JobContext) -> Result) { +async fn produce_jobs(mut storage: RedisStorage) -> Result<()> { storage .push(Email { to: "test@example".to_string(), - text: "Test backround job from Apalis".to_string(), + text: "Test background job from Apalis".to_string(), subject: "Welcome Sentry Email".to_string(), }) - .await - .unwrap(); + .await?; + Ok(()) } #[tokio::main] -async fn main() -> std::io::Result<()> { +async fn main() -> Result<()> { use tracing_subscriber::EnvFilter; std::env::set_var("RUST_LOG", "debug"); @@ -50,9 +50,8 @@ async fn main() -> std::io::Result<()> { std::env::var("REDIS_URL").expect("Please set REDIS_URL environmental variable"); let fmt_layer = tracing_subscriber::fmt::layer().with_target(false); - let filter_layer = EnvFilter::try_from_default_env() - .or_else(|_| EnvFilter::try_new("debug")) - .unwrap(); + let filter_layer = + EnvFilter::try_from_default_env().or_else(|_| EnvFilter::try_new("debug"))?; tracing_subscriber::registry() .with(filter_layer) .with(fmt_layer) @@ -62,7 +61,7 @@ async fn main() -> std::io::Result<()> { .await .expect("Could not connect to RedisStorage"); //This can be in another part of the program - produce_jobs(storage.clone()).await; + produce_jobs(storage.clone()).await?; Monitor::new() .register_with_count(2, move |_| { diff --git a/packages/apalis-core/Cargo.toml b/packages/apalis-core/Cargo.toml index 87f5d96b..d75ce45d 100644 --- a/packages/apalis-core/Cargo.toml +++ b/packages/apalis-core/Cargo.toml @@ -11,6 +11,7 @@ readme = "../../README.md" [dependencies] +anyhow = "1" serde = { version = "1.0", features = ["derive"] } serde_json = "1" futures = { version = "0.3", default-features = false, features=["async-await"] } diff --git a/packages/apalis-core/src/layers/sentry/mod.rs b/packages/apalis-core/src/layers/sentry/mod.rs index e01e4399..5890d5c8 100644 --- a/packages/apalis-core/src/layers/sentry/mod.rs +++ b/packages/apalis-core/src/layers/sentry/mod.rs @@ -81,25 +81,32 @@ where let slf = self.project(); if let Some((job_details, trx_ctx)) = slf.on_first_poll.take() { sentry_core::configure_scope(|scope| { - let event_id = uuid::Uuid::parse_str(&job_details.job_id).unwrap(); - scope.add_event_processor(move |mut event| { - event.event_id = event_id; - Some(event) - }); - scope.set_tag("job_type", job_details.job_type.to_string()); - let mut details = std::collections::BTreeMap::new(); - details.insert(String::from("job_id"), job_details.job_id.into()); - details.insert( - String::from("current_attempt"), - job_details.current_attempt.into(), - ); - scope.set_context("job", sentry_core::protocol::Context::Other(details)); - - let transaction: sentry_core::TransactionOrSpan = - sentry_core::start_transaction(trx_ctx).into(); - let parent_span = scope.get_span(); - scope.set_span(Some(transaction.clone())); - *slf.transaction = Some((transaction, parent_span)); + let uuid = uuid::Uuid::parse_str(&job_details.job_id); + match uuid { + Ok(event_id) => { + scope.add_event_processor(move |mut event| { + event.event_id = event_id; + Some(event) + }); + scope.set_tag("job_type", job_details.job_type.to_string()); + let mut details = std::collections::BTreeMap::new(); + details.insert(String::from("job_id"), job_details.job_id.into()); + details.insert( + String::from("current_attempt"), + job_details.current_attempt.into(), + ); + scope.set_context("job", sentry_core::protocol::Context::Other(details)); + + let transaction: sentry_core::TransactionOrSpan = + sentry_core::start_transaction(trx_ctx).into(); + let parent_span = scope.get_span(); + scope.set_span(Some(transaction.clone())); + *slf.transaction = Some((transaction, parent_span)); + } + Err(e) => { + log::error!("Unable to read job_id: {}", e) + } + } }); } match slf.future.poll(cx) { diff --git a/packages/apalis-core/src/layers/tracing/mod.rs b/packages/apalis-core/src/layers/tracing/mod.rs index 09eff68a..328417d7 100644 --- a/packages/apalis-core/src/layers/tracing/mod.rs +++ b/packages/apalis-core/src/layers/tracing/mod.rs @@ -377,19 +377,17 @@ where let _guard = this.span.enter(); let result = futures::ready!(this.inner.poll(cx)); let done_in = this.start.elapsed(); - let mut on_failure = this.on_failure.take().unwrap(); - match result { Ok(res) => { - this.on_response - .take() - .unwrap() - .on_response(&res, done_in, this.span); - + if let Some(responder) = this.on_response.take() { + responder.on_response(&res, done_in, this.span); + } Poll::Ready(Ok(res)) } Err(err) => { - on_failure.on_failure(&err, done_in, this.span); + if let Some(mut fail) = this.on_failure.take() { + fail.on_failure(&err, done_in, this.span); + } Poll::Ready(Err(err)) } } diff --git a/packages/apalis-core/src/worker/mod.rs b/packages/apalis-core/src/worker/mod.rs index eda69113..fd61ee26 100644 --- a/packages/apalis-core/src/worker/mod.rs +++ b/packages/apalis-core/src/worker/mod.rs @@ -335,7 +335,7 @@ where } } impl Message for JobRequestWrapper { - type Result = (); + type Result = anyhow::Result<()>; } #[async_trait::async_trait] @@ -344,19 +344,16 @@ where W: Worker + 'static, T: Job + Serialize + Debug + DeserializeOwned + Send + 'static, { - type Result = (); + type Result = anyhow::Result<()>; async fn handle(&mut self, job: JobRequestWrapper) -> Self::Result { match job.0 { Ok(Some(job)) => { - self.handle_job(job).await.unwrap(); + self.handle_job(job).await?; + Ok(()) } - Ok(None) => { - // on drain - } - Err(_e) => { - todo!() - } - }; + Ok(None) => Ok(()), + Err(e) => Err(e.into()), + } } } @@ -661,9 +658,9 @@ mod tests { S: Service + Send + 'static, F: Future> + Send, { - type Result = Result<(), HandlerResult>; + type Result = anyhow::Result<()>; async fn handle(&mut self, msg: Job) -> Self::Result { - let handle = self.service.ready().await.unwrap(); + let handle = self.service.ready().await?; let res = handle.call(msg).await; res } diff --git a/packages/apalis-core/src/worker/monitor.rs b/packages/apalis-core/src/worker/monitor.rs index d88281eb..91be73b2 100644 --- a/packages/apalis-core/src/worker/monitor.rs +++ b/packages/apalis-core/src/worker/monitor.rs @@ -101,10 +101,10 @@ impl Monitor>> { /// Start monitor without listening for Ctrl + C /// TODO: add the signals feature - pub async fn run_without_signals(self) -> std::io::Result<()> { + pub async fn run_without_signals(self) -> anyhow::Result<()> { let mut workers = Vec::new(); for worker in self.workers { - workers.push(worker.await.unwrap()); + workers.push(worker.await?); } let monitor = Monitor { workers, @@ -115,15 +115,15 @@ impl Monitor>> { } /// Start monitor listening for Ctrl + C - pub async fn run(self) -> std::io::Result<()> { - let res = self.run_without_signals().await; + pub async fn run(self) -> anyhow::Result<()> { + self.run_without_signals().await?; log::debug!("Listening shut down command (ctrl + c)"); tokio::signal::ctrl_c() .await .expect("failed to listen for event"); log::debug!("Workers shutdown complete"); - res + Ok(()) } } @@ -151,7 +151,7 @@ pub enum WorkerEvent { /// Emitted when a worker encounters a problem outside a job's processing scope /// Error(WorkerError), Error(String), - /// Emitted when a job is proccessd + /// Emitted when a job is processed Job { /// The job id id: String, diff --git a/packages/apalis-sql/src/sqlite.rs b/packages/apalis-sql/src/sqlite.rs index 57516db2..082f5f2d 100644 --- a/packages/apalis-sql/src/sqlite.rs +++ b/packages/apalis-sql/src/sqlite.rs @@ -66,35 +66,6 @@ impl SqliteStorage { sqlx::migrate!("migrations/sqlite").run(&pool).await?; Ok(()) } - - async fn keep_alive_at( - &mut self, - worker_id: String, - last_seen: DateTime, - ) -> StorageResult<()> { - let pool = self.pool.clone(); - - let mut tx = pool - .acquire() - .await - .map_err(|e| StorageError::Database(Box::from(e)))?; - let worker_type = T::NAME; - let storage_name = std::any::type_name::(); - let query = "INSERT INTO Workers (id, worker_type, storage_name, layers, last_seen) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (id) DO - UPDATE SET last_seen = EXCLUDED.last_seen"; - sqlx::query(query) - .bind(worker_id.to_owned()) - .bind(worker_type) - .bind(storage_name) - .bind(std::any::type_name::()) - .bind(last_seen.timestamp()) - .execute(&mut tx) - .await - .map_err(|e| StorageError::Database(Box::from(e)))?; - Ok(()) - } } async fn fetch_next( From 701f049edb42d1331de2f6a1318618ae5ffb5a37 Mon Sep 17 00:00:00 2001 From: Geoffrey Mureithi Date: Fri, 23 Dec 2022 15:50:44 +0300 Subject: [PATCH 2/3] Some minor fixes --- examples/axum/src/main.rs | 2 +- packages/apalis-sql/src/sqlite.rs | 30 ++++++++++++++++++++++++++++++ src/lib.rs | 3 ++- 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/examples/axum/src/main.rs b/examples/axum/src/main.rs index 1e7c5d9e..5c9e1526 100644 --- a/examples/axum/src/main.rs +++ b/examples/axum/src/main.rs @@ -3,6 +3,7 @@ //! ```not_rust //! cd examples && cargo run -p axum-example //! ``` +use anyhow::Result; use apalis::prelude::*; use apalis::{layers::TraceLayer, redis::RedisStorage}; use axum::{ @@ -15,7 +16,6 @@ use axum::{ use serde::{de::DeserializeOwned, Serialize}; use std::{fmt::Debug, io::Error, net::SocketAddr}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -use anyhow::Result; use email_service::{send_email, Email, FORM_HTML}; diff --git a/packages/apalis-sql/src/sqlite.rs b/packages/apalis-sql/src/sqlite.rs index 082f5f2d..f916e85b 100644 --- a/packages/apalis-sql/src/sqlite.rs +++ b/packages/apalis-sql/src/sqlite.rs @@ -66,6 +66,36 @@ impl SqliteStorage { sqlx::migrate!("migrations/sqlite").run(&pool).await?; Ok(()) } + + /// Keeps a storage notified that the worker is still alive manually + pub async fn keep_alive_at( + &mut self, + worker_id: String, + last_seen: DateTime, + ) -> StorageResult<()> { + let pool = self.pool.clone(); + + let mut tx = pool + .acquire() + .await + .map_err(|e| StorageError::Database(Box::from(e)))?; + let worker_type = T::NAME; + let storage_name = std::any::type_name::(); + let query = "INSERT INTO Workers (id, worker_type, storage_name, layers, last_seen) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (id) DO + UPDATE SET last_seen = EXCLUDED.last_seen"; + sqlx::query(query) + .bind(worker_id.to_owned()) + .bind(worker_type) + .bind(storage_name) + .bind(std::any::type_name::()) + .bind(last_seen.timestamp()) + .execute(&mut tx) + .await + .map_err(|e| StorageError::Database(Box::from(e)))?; + Ok(()) + } } async fn fetch_next( diff --git a/src/lib.rs b/src/lib.rs index 58a5805a..613d8350 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,7 +36,7 @@ //! } //! //! #[tokio::main] -//! async fn main() -> std::io::Result<()> { +//! async fn main() { //! let redis = std::env::var("REDIS_URL").expect("Missing REDIS_URL env variable"); //! let storage = RedisStorage::connect(redis).await.expect("Storage failed"); //! Monitor::new() @@ -46,6 +46,7 @@ //! }) //! .run() //! .await +//! .unwrap(); //! } //!``` //! From bc4c633eb00cc8555e17b8080a39782225d7b0a0 Mon Sep 17 00:00:00 2001 From: Geoffrey Mureithi Date: Fri, 23 Dec 2022 16:27:51 +0300 Subject: [PATCH 3/3] Bump up version --- Cargo.toml | 12 ++++++------ packages/apalis-core/Cargo.toml | 2 +- packages/apalis-cron/Cargo.toml | 6 +++--- packages/apalis-redis/Cargo.toml | 4 ++-- packages/apalis-sql/Cargo.toml | 4 ++-- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6dbc15dd..dc9154b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "apalis" -version = "0.3.4" +version = "0.3.5" authors = ["Geoffrey Mureithi "] description = "Simple, extensible multithreaded background job processing for Rust" repository = "https://github.com/geofmureithi/apalis" @@ -54,26 +54,26 @@ docsrs = ["document-features"] [dependencies.apalis-redis] -version = "0.3.4" +version = "0.3.5" optional = true default-features = false path = "./packages/apalis-redis" [dependencies.apalis-sql] -version = "0.3.4" +version = "0.3.5" features = ["migrate"] optional = true default-features = false path = "./packages/apalis-sql" [dependencies.apalis-core] -version = "0.3.4" +version = "0.3.5" optional = true default-features = false path = "./packages/apalis-core" [dependencies.apalis-cron] -version = "0.3.4" +version = "0.3.5" optional = true default-features = false path = "./packages/apalis-cron" @@ -93,7 +93,7 @@ all-features = true criterion = { version = "0.3", features=["async_tokio"] } serde = "1" tokio = { version = "1", features =["macros"] } -apalis-redis = { version = "0.3.4", path = "./packages/apalis-redis" } +apalis-redis = { version = "0.3.5", path = "./packages/apalis-redis" } [[bench]] name = "redis_benchmark" diff --git a/packages/apalis-core/Cargo.toml b/packages/apalis-core/Cargo.toml index d75ce45d..1162717e 100644 --- a/packages/apalis-core/Cargo.toml +++ b/packages/apalis-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "apalis-core" -version = "0.3.4" +version = "0.3.5" authors = ["Geoffrey Mureithi "] edition = "2018" license = "MIT" diff --git a/packages/apalis-cron/Cargo.toml b/packages/apalis-cron/Cargo.toml index 5dbe4c0a..b1334730 100644 --- a/packages/apalis-cron/Cargo.toml +++ b/packages/apalis-cron/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "apalis-cron" -version = "0.3.4" +version = "0.3.5" edition = "2021" authors = ["Geoffrey Mureithi "] license = "MIT" @@ -9,7 +9,7 @@ description = "A simple yet extensible library for cron-like job scheduling for # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -apalis-core = { path = "../../packages/apalis-core", version = "0.3.4" } +apalis-core = { path = "../../packages/apalis-core", version = "0.3.5" } cron = "0.11.0" futures = "0.3" tower = { version = "0.4" } @@ -19,7 +19,7 @@ async-stream = "0.3.3" [dev-dependencies] tokio = { version = "1", features = ["macros"] } -apalis-core = { path = "../../packages/apalis-core", version = "0.3.4", features=["extensions", "retry"] } +apalis-core = { path = "../../packages/apalis-core", version = "0.3.5", features=["extensions", "retry"] } serde = { version = "1.0", features = ["derive"] } [package.metadata.docs.rs] diff --git a/packages/apalis-redis/Cargo.toml b/packages/apalis-redis/Cargo.toml index 50106408..a90c373b 100644 --- a/packages/apalis-redis/Cargo.toml +++ b/packages/apalis-redis/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "apalis-redis" -version = "0.3.4" +version = "0.3.5" authors = ["Geoffrey Mureithi "] edition = "2018" readme = "../../README.md" @@ -11,7 +11,7 @@ description = "Redis Storage for Apalis: simple and reliable background processi # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -apalis-core = { path = "../../packages/apalis-core", version = "0.3.4", default-features = false } +apalis-core = { path = "../../packages/apalis-core", version = "0.3.5", default-features = false } redis = { version = "0.21" , features = ["tokio-comp"] } serde = "1" log = "0.4" diff --git a/packages/apalis-sql/Cargo.toml b/packages/apalis-sql/Cargo.toml index 0f51cd15..bbe48d88 100644 --- a/packages/apalis-sql/Cargo.toml +++ b/packages/apalis-sql/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "apalis-sql" -version = "0.3.4" +version = "0.3.5" authors = ["Geoffrey Mureithi "] edition = "2018" readme = "../../README.md" @@ -23,7 +23,7 @@ features = [ "runtime-tokio-rustls", "uuid" ] [dependencies] serde = { version = "1", features = ["derive"] } serde_json = "1" -apalis-core = { path = "../../packages/apalis-core", version = "0.3.4", features= ["storage"], default-features = false} +apalis-core = { path = "../../packages/apalis-core", version = "0.3.5", features= ["storage"], default-features = false} chrono = { version = "0.4", features = ["serde"] } futures = "0.3" async-stream = "0.3"