diff --git a/.github/workflows/cd.yaml b/.github/workflows/cd.yaml new file mode 100644 index 0000000..910cb11 --- /dev/null +++ b/.github/workflows/cd.yaml @@ -0,0 +1,80 @@ +on: + push: + tags: ['v*'] + +name: Continuous delivery + +jobs: + test: + uses: ./.github/workflows/ci.yaml + publish: + name: Publish to crates.io + runs-on: ubuntu-latest + needs: [test] + env: + CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + - name: install cargo-get + uses: actions-rs/cargo@v1 + with: + command: install + args: cargo-get + - name: set variables + run: | + # vX.Y.Z is release version + # vX.Y.Z-foo is pre-release version + VERSION=${GITHUB_REF#refs/tags/v} + VERSION_NUMBER=${VERSION%-*} + PUBLISH_OPTS="--dry-run" + if [[ $VERSION == $VERSION_NUMBER ]]; then + PUBLISH_OPTS="" + fi + echo VERSION=${VERSION} >> $GITHUB_ENV + echo PUBLISH_OPTS=${PUBLISH_OPTS} >> $GITHUB_ENV + echo VERSION_NUMBER=${VERSION_NUMBER} >> $GITHUB_ENV + - name: check version integrity + run: | + ERROR='' + echo VERSION: ${VERSION}, VERSION_NUMBER: ${VERSION_NUMBER} + for dir in "." packages/apalis-{core,cron,redis,sql}; do + PACKAGE=$(cargo get --root $dir -n) + ACTUAL=$(cargo get --root $dir version) + if [[ $VERSION_NUMBER != $ACTUAL ]]; then + echo ${PACKAGE}: expected version ${VERSION_NUMBER} but found ${ACTUAL} + ERROR=1 + fi + done + if [[ $ERROR ]]; then + exit 1 + fi + - name: publish apalis-core + uses: actions-rs/cargo@v1 + with: + command: publish + args: ${{ env.PUBLISH_OPTS }} -p apalis-core + - name: publish apalis-cron + uses: actions-rs/cargo@v1 + with: + command: publish + args: ${{ env.PUBLISH_OPTS }} -p apalis-cron + - name: publish apalis-redis + uses: actions-rs/cargo@v1 + with: + command: publish + args: ${{ env.PUBLISH_OPTS }} -p apalis-redis + - name: publish apalis-sql + uses: actions-rs/cargo@v1 + with: + command: publish + args: ${{ env.PUBLISH_OPTS }} -p apalis-sql + - name: publish apalis + uses: actions-rs/cargo@v1 + with: + command: publish + args: ${{ env.PUBLISH_OPTS }} -p apalis diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index c59b30d..8338f49 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -1,4 +1,8 @@ -on: [push, pull_request] +on: + push: + branches: [master, develop] + pull_request: + workflow_call: name: Continuous integration @@ -30,7 +34,27 @@ jobs: - uses: actions-rs/cargo@v1 with: command: test - + + test-redis: + name: Test Suite Redis + runs-on: ubuntu-latest + services: + mysql: + image: redis + ports: + - 6379:6379 + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + - run: cargo test -- --test-threads=1 + working-directory: packages/apalis-redis + env: + REDIS_URL: redis://127.0.0.1/ + test-mysql: name: Test Suite with MySQL runs-on: ubuntu-latest @@ -51,11 +75,45 @@ jobs: profile: minimal toolchain: stable override: true - - run: cargo test --no-default-features --features mysql,migrate + - run: cargo test --no-default-features --features mysql,migrate -- --test-threads=1 working-directory: packages/apalis-sql env: DATABASE_URL: mysql://test:test@localhost/test + test-postgres: + name: Test Suite with Postgres + runs-on: ubuntu-latest + services: + postgres: + image: postgres:14 + env: + POSTGRES_PASSWORD: postgres + ports: + - 5432:5432 + env: + DATABASE_URL: postgres://postgres:postgres@localhost/postgres + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + - run: cargo test --no-default-features --features postgres,migrate -- --test-threads=1 + working-directory: packages/apalis-sql + + test-sqlite: + name: Test Suite with Sqlite + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + - run: cargo test --no-default-features --features sqlite,migrate -- --test-threads=1 + working-directory: packages/apalis-sql fmt: name: Rustfmt diff --git a/.gitignore b/.gitignore index ee9cadc..6f82536 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ Cargo.lock examples/**/*.env examples/sqlite/data.* +.DS_Store \ No newline at end of file diff --git a/packages/apalis-redis/Cargo.toml b/packages/apalis-redis/Cargo.toml index ccab1b0..5010640 100644 --- a/packages/apalis-redis/Cargo.toml +++ b/packages/apalis-redis/Cargo.toml @@ -23,6 +23,11 @@ futures = "0.3" tokio = "1" async-trait = "0.1.53" + +[dev-dependencies] +tokio = { version = "1", features = ["macros"] } +email-service = { path = "../../examples/email-service" } + [features] default = ["storage"] storage = ["apalis-core/storage"] diff --git a/packages/apalis-redis/src/storage.rs b/packages/apalis-redis/src/storage.rs index bb82c58..3fff19a 100644 --- a/packages/apalis-redis/src/storage.rs +++ b/packages/apalis-redis/src/storage.rs @@ -703,3 +703,209 @@ where .collect()) } } + +#[cfg(test)] +mod tests { + use std::ops::Sub; + + use super::*; + use email_service::Email; + use futures::StreamExt; + use uuid::Uuid; + + /// migrate DB and return a storage instance. + async fn setup() -> RedisStorage { + let redis_url = &std::env::var("REDIS_URL").expect("No REDIS_URL is specified"); + // Because connections cannot be shared across async runtime + // (different runtimes are created for each test), + // we don't share the storage and tests must be run sequentially. + let storage = RedisStorage::connect(redis_url.as_str()) + .await + .expect("failed to connect DB server"); + storage + } + + /// rollback DB changes made by tests. + /// + /// You should execute this function in the end of a test + async fn cleanup(mut storage: RedisStorage, _worker_id: String) { + let _resp: String = redis::cmd("FLUSHDB") + .query_async(&mut storage.conn) + .await + .expect("failed to Flushdb"); + } + + struct DummyService {} + + fn example_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example@postgres".to_string(), + text: "Some Text".to_string(), + } + } + + async fn consume_one(storage: &mut S, worker_id: String) -> JobRequest + where + S: Storage, + { + let mut stream = storage.consume(worker_id, std::time::Duration::from_secs(10)); + stream + .next() + .await + .expect("stream is empty") + .expect("failed to poll job") + .expect("no job is pending") + } + + async fn register_worker_at( + storage: &mut RedisStorage, + _last_seen: DateTime, + ) -> String { + let worker_id = Uuid::new_v4().to_string(); + + storage + .keep_alive::(worker_id.clone()) + .await + .expect("failed to register worker"); + worker_id + } + + async fn register_worker(storage: &mut RedisStorage) -> String { + register_worker_at(storage, Utc::now()).await + } + + async fn push_email(storage: &mut S, email: Email) + where + S: Storage, + { + storage.push(email).await.expect("failed to push a job"); + } + + async fn get_job(storage: &mut S, job_id: String) -> JobRequest + where + S: Storage, + { + storage + .fetch_by_id(job_id) + .await + .expect("failed to fetch job by id") + .expect("no job found by id") + } + + #[tokio::test] + async fn test_consume_last_pushed_job() { + let mut storage = setup().await; + push_email(&mut storage, example_email()).await; + + let worker_id = register_worker(&mut storage).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + + // No worker yet + // Redis doesn't update jobs like in sql + assert_eq!(*job.context().status(), JobState::Pending); + assert_eq!(*job.context().lock_by(), None); + assert!(job.context().lock_at().is_none()); + + cleanup(storage, worker_id).await; + } + + #[tokio::test] + async fn test_acknowledge_job() { + let mut storage = setup().await; + push_email(&mut storage, example_email()).await; + + let worker_id = register_worker(&mut storage).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + let job_id = job.context().id(); + + storage + .ack(worker_id.clone(), job_id.clone()) + .await + .expect("failed to acknowledge the job"); + + let job = get_job(&mut storage, job_id.clone()).await; + assert_eq!(*job.context().status(), JobState::Pending); // Redis storage uses hset etc to manage status + assert!(job.context().done_at().is_none()); + + cleanup(storage, worker_id).await; + } + + #[tokio::test] + async fn test_kill_job() { + let mut storage = setup().await; + + push_email(&mut storage, example_email()).await; + + let worker_id = register_worker(&mut storage).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + let job_id = job.context().id(); + + storage + .kill(worker_id.clone(), job_id.clone()) + .await + .expect("failed to kill job"); + + let job = get_job(&mut storage, job_id.clone()).await; + assert_eq!(*job.context().status(), JobState::Pending); + assert!(job.context().done_at().is_none()); + + cleanup(storage, worker_id).await; + } + + #[tokio::test] + async fn test_heartbeat_renqueueorphaned_pulse_last_seen_6min() { + let mut storage = setup().await; + + push_email(&mut storage, example_email()).await; + + let worker_id = + register_worker_at(&mut storage, Utc::now().sub(chrono::Duration::minutes(6))).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + let result = storage + .heartbeat(StorageWorkerPulse::RenqueueOrpharned { count: 5 }) + .await + .expect("failed to heartbeat"); + assert_eq!(result, true); + + let job_id = job.context().id(); + let job = get_job(&mut storage, job_id.clone()).await; + + assert_eq!(*job.context().status(), JobState::Pending); + assert!(job.context().done_at().is_none()); + assert!(job.context().lock_by().is_none()); + assert!(job.context().lock_at().is_none()); + assert_eq!(*job.context().last_error(), None); + + cleanup(storage, worker_id).await; + } + + #[tokio::test] + async fn test_heartbeat_renqueueorphaned_pulse_last_seen_4min() { + let mut storage = setup().await; + + push_email(&mut storage, example_email()).await; + + let worker_id = + register_worker_at(&mut storage, Utc::now().sub(chrono::Duration::minutes(4))).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + let result = storage + .heartbeat(StorageWorkerPulse::RenqueueOrpharned { count: 5 }) + .await + .expect("failed to heartbeat"); + assert_eq!(result, true); + + let job_id = job.context().id(); + let job = get_job(&mut storage, job_id.clone()).await; + + assert_eq!(*job.context().status(), JobState::Pending); + assert_eq!(*job.context().lock_by(), None); + + cleanup(storage, worker_id).await; + } +} diff --git a/packages/apalis-sql/src/mysql.rs b/packages/apalis-sql/src/mysql.rs index a6d384a..c981b22 100644 --- a/packages/apalis-sql/src/mysql.rs +++ b/packages/apalis-sql/src/mysql.rs @@ -92,7 +92,7 @@ impl MysqlStorage { Some(job) => { let job_id = job.id(); - let update_query = "UPDATE jobs SET status = 'Running', lock_by = ? WHERE id = ? AND status = 'Pending' AND lock_by IS NULL;"; + let update_query = "UPDATE jobs SET status = 'Running', lock_by = ?, lock_at = NOW() WHERE id = ? AND status = 'Pending' AND lock_by IS NULL;"; sqlx::query(update_query) .bind(worker_id.clone()) .bind(job_id.clone()) @@ -258,7 +258,8 @@ where .acquire() .await .map_err(|e| StorageError::Connection(Box::from(e)))?; - let query = "UPDATE jobs SET status = 'Kill', done_at = NOW() WHERE id = ? AND lock_by = ?"; + let query = + "UPDATE jobs SET status = 'Killed', done_at = NOW() WHERE id = ? AND lock_by = ?"; sqlx::query(query) .bind(job_id.to_owned()) .bind(worker_id.to_owned()) @@ -473,28 +474,42 @@ impl JobStreamExt for MysqlS #[cfg(test)] mod tests { - use std::ops::DerefMut; - - use once_cell::sync::OnceCell; - use tokio::sync::Mutex; - use tokio::sync::MutexGuard; - use super::*; use email_service::Email; use futures::StreamExt; - async fn setup<'a>() -> MutexGuard<'a, MysqlStorage> { - static INSTANCE: OnceCell>> = OnceCell::new(); - let mutex = INSTANCE.get_or_init(|| { - let db_url = &std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified"); - let pool: MySqlPool = MySqlPool::connect_lazy(db_url).expect("DATABASE_URL is wrong"); - Mutex::new(MysqlStorage::new(pool)) - }); - let storage = mutex.lock().await; - storage.setup().await.expect("failed to run migrations"); + /// migrate DB and return a storage instance. + async fn setup() -> MysqlStorage { + let db_url = &std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified"); + // Because connections cannot be shared across async runtime + // (different runtimes are created for each test), + // we don't share the storage and tests must be run sequentially. + let storage = MysqlStorage::connect(db_url) + .await + .expect("DATABASE_URL is wrong"); + storage.setup().await.expect("failed to migrate DB"); storage } + /// rollback DB changes made by tests. + /// Delete the following rows: + /// - jobs whose state is `Pending` or locked by `worker_id` + /// - worker identified by `worker_id` + /// + /// You should execute this function in the end of a test + async fn cleanup(storage: MysqlStorage, worker_id: String) { + sqlx::query("DELETE FROM jobs WHERE lock_by = ? OR status = 'Pending'") + .bind(worker_id.clone()) + .execute(&storage.pool) + .await + .expect("failed to delete jobs"); + sqlx::query("DELETE FROM workers WHERE id = ?") + .bind(worker_id.clone()) + .execute(&storage.pool) + .await + .expect("failed to delete worker"); + } + async fn consume_one(storage: &mut S, worker_id: &String) -> JobRequest where S: Storage, @@ -516,9 +531,106 @@ mod tests { } } + struct DummyService {} + + async fn register_worker_at( + storage: &mut MysqlStorage, + last_seen: DateTime, + ) -> String { + let worker_id = Uuid::new_v4().to_string(); + + storage + .keep_alive_at::(worker_id.clone(), last_seen) + .await + .expect("failed to register worker"); + worker_id + } + + async fn register_worker(storage: &mut MysqlStorage) -> String { + register_worker_at(storage, Utc::now()).await + } + + async fn push_email(storage: &mut S, email: Email) + where + S: Storage, + { + storage.push(email).await.expect("failed to push a job"); + } + + async fn get_job(storage: &mut S, job_id: String) -> JobRequest + where + S: Storage, + { + storage + .fetch_by_id(job_id) + .await + .expect("failed to fetch job by id") + .expect("no job found by id") + } + + #[tokio::test] + async fn test_consume_last_pushed_job() { + let mut storage = setup().await; + push_email(&mut storage, example_email()).await; + + let worker_id = register_worker(&mut storage).await; + + let job = consume_one(&mut storage, &worker_id).await; + + assert_eq!(*job.context().status(), JobState::Running); + assert_eq!(*job.context().lock_by(), Some(worker_id.clone())); + assert!(job.context().lock_at().is_some()); + + cleanup(storage, worker_id).await; + } + + #[tokio::test] + async fn test_acknowledge_job() { + let mut storage = setup().await; + push_email(&mut storage, example_email()).await; + + let worker_id = register_worker(&mut storage).await; + + let job = consume_one(&mut storage, &worker_id).await; + let job_id = job.context().id(); + + storage + .ack(worker_id.clone(), job_id.clone()) + .await + .expect("failed to acknowledge the job"); + + let job = get_job(&mut storage, job_id.clone()).await; + assert_eq!(*job.context().status(), JobState::Done); + assert!(job.context().done_at().is_some()); + + cleanup(storage, worker_id).await; + } + + #[tokio::test] + async fn test_kill_job() { + let mut storage = setup().await; + + push_email(&mut storage, example_email()).await; + + let worker_id = register_worker(&mut storage).await; + + let job = consume_one(&mut storage, &worker_id).await; + let job_id = job.context().id(); + + storage + .kill(worker_id.clone(), job_id.clone()) + .await + .expect("failed to kill job"); + + let job = get_job(&mut storage, job_id.clone()).await; + assert_eq!(*job.context().status(), JobState::Killed); + assert!(job.context().done_at().is_some()); + + cleanup(storage, worker_id).await; + } + #[tokio::test] async fn test_storage_heartbeat_reenqueuorphaned_pulse_last_seen_6min() { - // acquire a lock for storage let mut storage = setup().await; // push an Email job @@ -536,7 +648,7 @@ mod tests { .unwrap(); // fetch job - let job = consume_one(storage.deref_mut(), &worker_id).await; + let job = consume_one(&mut storage, &worker_id).await; assert_eq!(*job.context().status(), JobState::Running); // heartbeat with ReenqueueOrpharned pulse @@ -553,11 +665,12 @@ mod tests { assert!(context.lock_at().is_none()); assert!(context.done_at().is_none()); assert_eq!(*context.last_error(), Some("Job was abandoned".to_string())); + + cleanup(storage, worker_id).await; } #[tokio::test] async fn test_storage_heartbeat_reenqueuorphaned_pulse_last_seen_4min() { - // acquire a lock for storage let mut storage = setup().await; // push an Email job @@ -575,7 +688,7 @@ mod tests { .unwrap(); // fetch job - let job = consume_one(storage.deref_mut(), &worker_id).await; + let job = consume_one(&mut storage, &worker_id).await; assert_eq!(*job.context().status(), JobState::Running); // heartbeat with ReenqueueOrpharned pulse @@ -589,5 +702,7 @@ mod tests { let context = job.context(); assert_eq!(*context.status(), JobState::Running); assert_eq!(*context.lock_by(), Some(worker_id.clone())); + + cleanup(storage, worker_id).await; } } diff --git a/packages/apalis-sql/src/postgres.rs b/packages/apalis-sql/src/postgres.rs index ec6eef0..e4cf57e 100644 --- a/packages/apalis-sql/src/postgres.rs +++ b/packages/apalis-sql/src/postgres.rs @@ -100,6 +100,35 @@ impl PostgresStorage { } } } + + async fn keep_alive_at( + &mut self, + worker_id: String, + last_seen: DateTime, + ) -> StorageResult<()> { + let pool = self.pool.clone(); + + let mut tx = pool + .acquire() + .await + .map_err(|e| StorageError::Database(Box::from(e)))?; + let worker_type = T::NAME; + let storage_name = std::any::type_name::(); + let query = "INSERT INTO apalis.workers (id, worker_type, storage_name, layers, last_seen) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (id) DO + UPDATE SET last_seen = EXCLUDED.last_seen"; + sqlx::query(query) + .bind(worker_id.to_owned()) + .bind(worker_type) + .bind(storage_name) + .bind(std::any::type_name::()) + .bind(last_seen) + .execute(&mut tx) + .await + .map_err(|e| StorageError::Database(Box::from(e)))?; + Ok(()) + } } #[async_trait::async_trait] @@ -219,7 +248,7 @@ where .await .map_err(|e| StorageError::Database(Box::from(e)))?; let query = - "UPDATE apalis.jobs SET status = 'Kill', done_at = now() WHERE id = $1 AND lock_by = $2"; + "UPDATE apalis.jobs SET status = 'Killed', done_at = now() WHERE id = $1 AND lock_by = $2"; sqlx::query(query) .bind(job_id.to_owned()) .bind(worker_id.to_owned()) @@ -344,28 +373,7 @@ where } async fn keep_alive(&mut self, worker_id: String) -> StorageResult<()> { - let pool = self.pool.clone(); - - let mut tx = pool - .acquire() - .await - .map_err(|e| StorageError::Database(Box::from(e)))?; - let worker_type = T::NAME; - let storage_name = std::any::type_name::(); - let query = - "INSERT INTO apalis.workers (id, worker_type, storage_name, layers, last_seen) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (id) - DO - UPDATE SET last_seen = NOW()"; - sqlx::query(query) - .bind(worker_id.to_owned()) - .bind(worker_type) - .bind(storage_name) - .bind(std::any::type_name::()) - .bind(Utc::now()) - .execute(&mut tx) - .await - .map_err(|e| StorageError::Database(Box::from(e)))?; - Ok(()) + self.keep_alive_at::(worker_id, Utc::now()).await } } @@ -452,3 +460,224 @@ impl JobStreamExt for Postgr .collect()) } } + +#[cfg(test)] +mod tests { + use std::ops::Sub; + + use super::*; + use email_service::Email; + use futures::StreamExt; + + /// migrate DB and return a storage instance. + async fn setup() -> PostgresStorage { + let db_url = &std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified"); + // Because connections cannot be shared across async runtime + // (different runtimes are created for each test), + // we don't share the storage and tests must be run sequentially. + let storage = PostgresStorage::connect(db_url) + .await + .expect("failed to connect DB server"); + storage.setup().await.expect("failed to migrate DB"); + storage + } + + /// rollback DB changes made by tests. + /// Delete the following rows: + /// - jobs whose state is `Pending` or locked by `worker_id` + /// - worker identified by `worker_id` + /// + /// You should execute this function in the end of a test + async fn cleanup(storage: PostgresStorage, worker_id: String) { + let mut tx = storage + .pool + .acquire() + .await + .expect("failed to get connection"); + sqlx::query("Delete from apalis.jobs where lock_by = $1 or status = 'Pending'") + .bind(worker_id.clone()) + .execute(&mut tx) + .await + .expect("failed to delete jobs"); + sqlx::query("Delete from apalis.workers where id = $1") + .bind(worker_id.clone()) + .execute(&mut tx) + .await + .expect("failed to delete worker"); + } + + struct DummyService {} + + fn example_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example@postgres".to_string(), + text: "Some Text".to_string(), + } + } + + async fn consume_one(storage: &mut S, worker_id: String) -> JobRequest + where + S: Storage, + { + let mut stream = storage.consume(worker_id, std::time::Duration::from_secs(10)); + stream + .next() + .await + .expect("stream is empty") + .expect("failed to poll job") + .expect("no job is pending") + } + + async fn register_worker_at( + storage: &mut PostgresStorage, + last_seen: DateTime, + ) -> String { + let worker_id = Uuid::new_v4().to_string(); + + storage + .keep_alive_at::(worker_id.clone(), last_seen) + .await + .expect("failed to register worker"); + worker_id + } + + async fn register_worker(storage: &mut PostgresStorage) -> String { + register_worker_at(storage, Utc::now()).await + } + + async fn push_email(storage: &mut S, email: Email) + where + S: Storage, + { + storage.push(email).await.expect("failed to push a job"); + } + + async fn get_job(storage: &mut S, job_id: String) -> JobRequest + where + S: Storage, + { + storage + .fetch_by_id(job_id) + .await + .expect("failed to fetch job by id") + .expect("no job found by id") + } + + #[tokio::test] + async fn test_consume_last_pushed_job() { + let mut storage = setup().await; + push_email(&mut storage, example_email()).await; + + let worker_id = register_worker(&mut storage).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + + assert_eq!(*job.context().status(), JobState::Running); + assert_eq!(*job.context().lock_by(), Some(worker_id.clone())); + assert!(job.context().lock_at().is_some()); + + cleanup(storage, worker_id).await; + } + + #[tokio::test] + async fn test_acknowledge_job() { + let mut storage = setup().await; + push_email(&mut storage, example_email()).await; + + let worker_id = register_worker(&mut storage).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + let job_id = job.context().id(); + + storage + .ack(worker_id.clone(), job_id.clone()) + .await + .expect("failed to acknowledge the job"); + + let job = get_job(&mut storage, job_id.clone()).await; + assert_eq!(*job.context().status(), JobState::Done); + assert!(job.context().done_at().is_some()); + + cleanup(storage, worker_id).await; + } + + #[tokio::test] + async fn test_kill_job() { + let mut storage = setup().await; + + push_email(&mut storage, example_email()).await; + + let worker_id = register_worker(&mut storage).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + let job_id = job.context().id(); + + storage + .kill(worker_id.clone(), job_id.clone()) + .await + .expect("failed to kill job"); + + let job = get_job(&mut storage, job_id.clone()).await; + assert_eq!(*job.context().status(), JobState::Killed); + assert!(job.context().done_at().is_some()); + + cleanup(storage, worker_id).await; + } + + #[tokio::test] + async fn test_heartbeat_renqueueorphaned_pulse_last_seen_6min() { + let mut storage = setup().await; + + push_email(&mut storage, example_email()).await; + + let worker_id = + register_worker_at(&mut storage, Utc::now().sub(chrono::Duration::minutes(6))).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + let result = storage + .heartbeat(StorageWorkerPulse::RenqueueOrpharned { count: 5 }) + .await + .expect("failed to heartbeat"); + assert_eq!(result, true); + + let job_id = job.context().id(); + let job = get_job(&mut storage, job_id.clone()).await; + + assert_eq!(*job.context().status(), JobState::Pending); + assert!(job.context().done_at().is_none()); + assert!(job.context().lock_by().is_none()); + assert!(job.context().lock_at().is_none()); + assert_eq!( + *job.context().last_error(), + Some("Job was abandoned".to_string()) + ); + + cleanup(storage, worker_id).await; + } + + #[tokio::test] + async fn test_heartbeat_renqueueorphaned_pulse_last_seen_4min() { + let mut storage = setup().await; + + push_email(&mut storage, example_email()).await; + + let worker_id = + register_worker_at(&mut storage, Utc::now().sub(chrono::Duration::minutes(4))).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + let result = storage + .heartbeat(StorageWorkerPulse::RenqueueOrpharned { count: 5 }) + .await + .expect("failed to heartbeat"); + assert_eq!(result, true); + + let job_id = job.context().id(); + let job = get_job(&mut storage, job_id.clone()).await; + + assert_eq!(*job.context().status(), JobState::Running); + assert_eq!(*job.context().lock_by(), Some(worker_id.clone())); + + cleanup(storage, worker_id).await; + } +} diff --git a/packages/apalis-sql/src/sqlite.rs b/packages/apalis-sql/src/sqlite.rs index 0c78215..57516db 100644 --- a/packages/apalis-sql/src/sqlite.rs +++ b/packages/apalis-sql/src/sqlite.rs @@ -35,7 +35,7 @@ impl Clone for SqliteStorage { } } -impl SqliteStorage { +impl SqliteStorage { /// Construct a new Storage from a pool pub fn new(pool: SqlitePool) -> Self { Self { @@ -66,6 +66,35 @@ impl SqliteStorage { sqlx::migrate!("migrations/sqlite").run(&pool).await?; Ok(()) } + + async fn keep_alive_at( + &mut self, + worker_id: String, + last_seen: DateTime, + ) -> StorageResult<()> { + let pool = self.pool.clone(); + + let mut tx = pool + .acquire() + .await + .map_err(|e| StorageError::Database(Box::from(e)))?; + let worker_type = T::NAME; + let storage_name = std::any::type_name::(); + let query = "INSERT INTO Workers (id, worker_type, storage_name, layers, last_seen) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (id) DO + UPDATE SET last_seen = EXCLUDED.last_seen"; + sqlx::query(query) + .bind(worker_id.to_owned()) + .bind(worker_type) + .bind(storage_name) + .bind(std::any::type_name::()) + .bind(last_seen.timestamp()) + .execute(&mut tx) + .await + .map_err(|e| StorageError::Database(Box::from(e)))?; + Ok(()) + } } async fn fetch_next( @@ -84,10 +113,11 @@ where .await .map_err(|e| JobStreamError::BrokenPipe(Box::from(e)))?; let job_id = job.id(); - let update_query = "UPDATE Jobs SET status = 'Running', lock_by = ?2 WHERE id = ?1 AND status = 'Pending' AND lock_by IS NULL; Select * from Jobs where id = ?1 AND lock_by = ?2"; + let update_query = "UPDATE Jobs SET status = 'Running', lock_by = ?2, lock_at = ?3 WHERE id = ?1 AND status = 'Pending' AND lock_by IS NULL; Select * from Jobs where id = ?1 AND lock_by = ?2"; let job: Option> = sqlx::query_as(update_query) .bind(job_id.clone()) .bind(worker_id) + .bind(Utc::now().timestamp()) .fetch_optional(&mut tx) .await .map_err(|e| JobStreamError::BrokenPipe(Box::from(e)))?; @@ -257,7 +287,7 @@ where .await .map_err(|e| StorageError::Database(Box::from(e)))?; let query = - "UPDATE Jobs SET status = 'Kill', done_at = strftime('%s','now') WHERE id = ?1 AND lock_by = ?2"; + "UPDATE Jobs SET status = 'Killed', done_at = strftime('%s','now') WHERE id = ?1 AND lock_by = ?2"; sqlx::query(query) .bind(job_id.to_owned()) .bind(worker_id.to_owned()) @@ -493,6 +523,20 @@ mod tests { use super::*; use email_service::Email; + use futures::StreamExt; + use std::ops::Sub; + + /// migrate DB and return a storage instance. + async fn setup() -> SqliteStorage { + // Because connections cannot be shared across async runtime + // (different runtimes are created for each test), + // we don't share the storage and tests must be run sequentially. + let storage = SqliteStorage::::connect("sqlite::memory:") + .await + .expect("failed to connect DB server"); + storage.setup().await.expect("failed to migrate DB"); + storage + } #[tokio::test] async fn test_inmemory_sqlite_worker() { @@ -510,6 +554,171 @@ mod tests { .expect("Unable to push job"); let len = sqlite.len().await.expect("Could not fetch the jobs count"); assert_eq!(len, 1); - assert!(sqlite.is_empty().await.is_err()) + // assert!(sqlite.is_empty().await.is_err()) + } + + struct DummyService {} + + fn example_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example@postgres".to_string(), + text: "Some Text".to_string(), + } + } + + async fn consume_one(storage: &mut S, worker_id: String) -> JobRequest + where + S: Storage, + { + let mut stream = storage.consume(worker_id, std::time::Duration::from_secs(10)); + stream + .next() + .await + .expect("stream is empty") + .expect("failed to poll job") + .expect("no job is pending") + } + + async fn register_worker_at( + storage: &mut SqliteStorage, + last_seen: DateTime, + ) -> String { + let worker_id = Uuid::new_v4().to_string(); + + storage + .keep_alive_at::(worker_id.clone(), last_seen) + .await + .expect("failed to register worker"); + worker_id + } + + async fn register_worker(storage: &mut SqliteStorage) -> String { + register_worker_at(storage, Utc::now()).await + } + + async fn push_email(storage: &mut S, email: Email) + where + S: Storage, + { + storage.push(email).await.expect("failed to push a job"); + } + + async fn get_job(storage: &mut S, job_id: String) -> JobRequest + where + S: Storage, + { + storage + .fetch_by_id(job_id) + .await + .expect("failed to fetch job by id") + .expect("no job found by id") + } + + #[tokio::test] + async fn test_consume_last_pushed_job() { + let mut storage = setup().await; + push_email(&mut storage, example_email()).await; + + let worker_id = register_worker(&mut storage).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + + assert_eq!(*job.context().status(), JobState::Running); + assert_eq!(*job.context().lock_by(), Some(worker_id.clone())); + assert!(job.context().lock_at().is_some()); + } + + #[tokio::test] + async fn test_acknowledge_job() { + let mut storage = setup().await; + push_email(&mut storage, example_email()).await; + + let worker_id = register_worker(&mut storage).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + let job_id = job.context().id(); + + storage + .ack(worker_id.clone(), job_id.clone()) + .await + .expect("failed to acknowledge the job"); + + let job = get_job(&mut storage, job_id.clone()).await; + assert_eq!(*job.context().status(), JobState::Done); + assert!(job.context().done_at().is_some()); + } + + #[tokio::test] + async fn test_kill_job() { + let mut storage = setup().await; + + push_email(&mut storage, example_email()).await; + + let worker_id = register_worker(&mut storage).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + let job_id = job.context().id(); + + storage + .kill(worker_id.clone(), job_id.clone()) + .await + .expect("failed to kill job"); + + let job = get_job(&mut storage, job_id.clone()).await; + assert_eq!(*job.context().status(), JobState::Killed); + assert!(job.context().done_at().is_some()); + } + + #[tokio::test] + async fn test_heartbeat_renqueueorphaned_pulse_last_seen_6min() { + let mut storage = setup().await; + + push_email(&mut storage, example_email()).await; + + let worker_id = + register_worker_at(&mut storage, Utc::now().sub(chrono::Duration::minutes(6))).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + let result = storage + .heartbeat(StorageWorkerPulse::RenqueueOrpharned { count: 5 }) + .await + .expect("failed to heartbeat"); + assert_eq!(result, true); + + let job_id = job.context().id(); + let job = get_job(&mut storage, job_id.clone()).await; + + assert_eq!(*job.context().status(), JobState::Pending); + assert!(job.context().done_at().is_none()); + assert!(job.context().lock_by().is_none()); + assert!(job.context().lock_at().is_none()); + assert_eq!( + *job.context().last_error(), + Some("Job was abandoned".to_string()) + ); + } + + #[tokio::test] + async fn test_heartbeat_renqueueorphaned_pulse_last_seen_4min() { + let mut storage = setup().await; + + push_email(&mut storage, example_email()).await; + + let worker_id = + register_worker_at(&mut storage, Utc::now().sub(chrono::Duration::minutes(4))).await; + + let job = consume_one(&mut storage, worker_id.clone()).await; + let result = storage + .heartbeat(StorageWorkerPulse::RenqueueOrpharned { count: 5 }) + .await + .expect("failed to heartbeat"); + assert_eq!(result, true); + + let job_id = job.context().id(); + let job = get_job(&mut storage, job_id.clone()).await; + + assert_eq!(*job.context().status(), JobState::Running); + assert_eq!(*job.context().lock_by(), Some(worker_id.clone())); } }