Merge pull request #24 from geofmureithi/develop
[Draft] v0.3.5
geofmureithi authored Dec 16, 2022
2 parents 3cfc8df + 3fa3148 commit 1c0b94e
Showing 8 changed files with 954 additions and 51 deletions.
80 changes: 80 additions & 0 deletions .github/workflows/cd.yaml
@@ -0,0 +1,80 @@
on:
  push:
    tags: ['v*']

name: Continuous delivery

jobs:
  test:
    uses: ./.github/workflows/ci.yaml
  publish:
    name: Publish to crates.io
    runs-on: ubuntu-latest
    needs: [test]
    env:
      CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
    steps:
      - uses: actions/checkout@v2
      - uses: actions-rs/toolchain@v1
        with:
          profile: minimal
          toolchain: stable
          override: true
      - name: install cargo-get
        uses: actions-rs/cargo@v1
        with:
          command: install
          args: cargo-get
      - name: set variables
        run: |
          # vX.Y.Z is release version
          # vX.Y.Z-foo is pre-release version
          VERSION=${GITHUB_REF#refs/tags/v}
          VERSION_NUMBER=${VERSION%-*}
          PUBLISH_OPTS="--dry-run"
          if [[ $VERSION == $VERSION_NUMBER ]]; then
            PUBLISH_OPTS=""
          fi
          echo VERSION=${VERSION} >> $GITHUB_ENV
          echo PUBLISH_OPTS=${PUBLISH_OPTS} >> $GITHUB_ENV
          echo VERSION_NUMBER=${VERSION_NUMBER} >> $GITHUB_ENV
      - name: check version integrity
        run: |
          ERROR=''
          echo VERSION: ${VERSION}, VERSION_NUMBER: ${VERSION_NUMBER}
          for dir in "." packages/apalis-{core,cron,redis,sql}; do
            PACKAGE=$(cargo get --root $dir -n)
            ACTUAL=$(cargo get --root $dir version)
            if [[ $VERSION_NUMBER != $ACTUAL ]]; then
              echo ${PACKAGE}: expected version ${VERSION_NUMBER} but found ${ACTUAL}
              ERROR=1
            fi
          done
          if [[ $ERROR ]]; then
            exit 1
          fi
      - name: publish apalis-core
        uses: actions-rs/cargo@v1
        with:
          command: publish
          args: ${{ env.PUBLISH_OPTS }} -p apalis-core
      - name: publish apalis-cron
        uses: actions-rs/cargo@v1
        with:
          command: publish
          args: ${{ env.PUBLISH_OPTS }} -p apalis-cron
      - name: publish apalis-redis
        uses: actions-rs/cargo@v1
        with:
          command: publish
          args: ${{ env.PUBLISH_OPTS }} -p apalis-redis
      - name: publish apalis-sql
        uses: actions-rs/cargo@v1
        with:
          command: publish
          args: ${{ env.PUBLISH_OPTS }} -p apalis-sql
      - name: publish apalis
        uses: actions-rs/cargo@v1
        with:
          command: publish
          args: ${{ env.PUBLISH_OPTS }} -p apalis
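
For reference, the "set variables" step above tells releases and pre-releases apart with plain bash parameter expansion: ${GITHUB_REF#refs/tags/v} strips the tag prefix and ${VERSION%-*} drops any -suffix, so only plain vX.Y.Z tags publish for real. A minimal local sketch of that logic (the sample tag names are hypothetical, not taken from this commit):

#!/usr/bin/env bash
# Sketch: reproduce the release/pre-release detection used in cd.yaml.
for GITHUB_REF in refs/tags/v0.3.5 refs/tags/v0.3.5-beta.1; do
  VERSION=${GITHUB_REF#refs/tags/v}   # 0.3.5 or 0.3.5-beta.1
  VERSION_NUMBER=${VERSION%-*}        # 0.3.5 in both cases
  PUBLISH_OPTS="--dry-run"
  if [[ $VERSION == $VERSION_NUMBER ]]; then
    PUBLISH_OPTS=""                   # a plain release tag publishes for real
  fi
  echo "$GITHUB_REF -> VERSION=$VERSION PUBLISH_OPTS='$PUBLISH_OPTS'"
done
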
64 changes: 61 additions & 3 deletions .github/workflows/ci.yaml
@@ -1,4 +1,8 @@
on: [push, pull_request]
on:
  push:
    branches: [master, develop]
  pull_request:
  workflow_call:

name: Continuous integration

@@ -30,7 +34,27 @@ jobs:
      - uses: actions-rs/cargo@v1
        with:
          command: test

  test-redis:
    name: Test Suite Redis
    runs-on: ubuntu-latest
    services:
      mysql:
        image: redis
        ports:
          - 6379:6379
    steps:
      - uses: actions/checkout@v2
      - uses: actions-rs/toolchain@v1
        with:
          profile: minimal
          toolchain: stable
          override: true
      - run: cargo test -- --test-threads=1
        working-directory: packages/apalis-redis
        env:
          REDIS_URL: redis://127.0.0.1/

  test-mysql:
    name: Test Suite with MySQL
    runs-on: ubuntu-latest
@@ -51,11 +75,45 @@ jobs:
          profile: minimal
          toolchain: stable
          override: true
      - run: cargo test --no-default-features --features mysql,migrate
      - run: cargo test --no-default-features --features mysql,migrate -- --test-threads=1
        working-directory: packages/apalis-sql
        env:
          DATABASE_URL: mysql://test:test@localhost/test

  test-postgres:
    name: Test Suite with Postgres
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:14
        env:
          POSTGRES_PASSWORD: postgres
        ports:
          - 5432:5432
    env:
      DATABASE_URL: postgres://postgres:postgres@localhost/postgres
    steps:
      - uses: actions/checkout@v2
      - uses: actions-rs/toolchain@v1
        with:
          profile: minimal
          toolchain: stable
          override: true
      - run: cargo test --no-default-features --features postgres,migrate -- --test-threads=1
        working-directory: packages/apalis-sql

  test-sqlite:
    name: Test Suite with Sqlite
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions-rs/toolchain@v1
        with:
          profile: minimal
          toolchain: stable
          override: true
      - run: cargo test --no-default-features --features sqlite,migrate -- --test-threads=1
        working-directory: packages/apalis-sql

  fmt:
    name: Rustfmt
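
The new database-backed jobs above pin each suite to a single test thread and a dedicated service container. For local runs, a rough equivalent of the Postgres and SQLite jobs (assuming Docker is available; the container name is illustrative, and the MySQL job's service block is collapsed in this diff, so it is omitted here) is:

# Sketch: approximate the Postgres and SQLite CI jobs locally (assumes Docker).
cd packages/apalis-sql

# Postgres — mirrors the postgres:14 service and credentials declared above.
docker run -d --name apalis-postgres -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres:14
DATABASE_URL=postgres://postgres:postgres@localhost/postgres \
  cargo test --no-default-features --features postgres,migrate -- --test-threads=1

# SQLite needs no service container.
cargo test --no-default-features --features sqlite,migrate -- --test-threads=1
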
1 change: 1 addition & 0 deletions .gitignore
@@ -3,3 +3,4 @@
Cargo.lock
examples/**/*.env
examples/sqlite/data.*
.DS_Store
5 changes: 5 additions & 0 deletions packages/apalis-redis/Cargo.toml
@@ -23,6 +23,11 @@ futures = "0.3"
tokio = "1"
async-trait = "0.1.53"


[dev-dependencies]
tokio = { version = "1", features = ["macros"] }
email-service = { path = "../../examples/email-service" }

[features]
default = ["storage"]
storage = ["apalis-core/storage"]
206 changes: 206 additions & 0 deletions packages/apalis-redis/src/storage.rs
@@ -703,3 +703,209 @@
            .collect())
    }
}

#[cfg(test)]
mod tests {
    use std::ops::Sub;

    use super::*;
    use email_service::Email;
    use futures::StreamExt;
    use uuid::Uuid;

    /// migrate DB and return a storage instance.
    async fn setup() -> RedisStorage<Email> {
        let redis_url = &std::env::var("REDIS_URL").expect("No REDIS_URL is specified");
        // Because connections cannot be shared across async runtime
        // (different runtimes are created for each test),
        // we don't share the storage and tests must be run sequentially.
        let storage = RedisStorage::connect(redis_url.as_str())
            .await
            .expect("failed to connect DB server");
        storage
    }

    /// rollback DB changes made by tests.
    ///
    /// You should execute this function in the end of a test
    async fn cleanup(mut storage: RedisStorage<Email>, _worker_id: String) {
        let _resp: String = redis::cmd("FLUSHDB")
            .query_async(&mut storage.conn)
            .await
            .expect("failed to Flushdb");
    }

    struct DummyService {}

    fn example_email() -> Email {
        Email {
            subject: "Test Subject".to_string(),
            to: "example@postgres".to_string(),
            text: "Some Text".to_string(),
        }
    }

    async fn consume_one<S, T>(storage: &mut S, worker_id: String) -> JobRequest<T>
    where
        S: Storage<Output = T>,
    {
        let mut stream = storage.consume(worker_id, std::time::Duration::from_secs(10));
        stream
            .next()
            .await
            .expect("stream is empty")
            .expect("failed to poll job")
            .expect("no job is pending")
    }

    async fn register_worker_at(
        storage: &mut RedisStorage<Email>,
        _last_seen: DateTime<Utc>,
    ) -> String {
        let worker_id = Uuid::new_v4().to_string();

        storage
            .keep_alive::<DummyService>(worker_id.clone())
            .await
            .expect("failed to register worker");
        worker_id
    }

    async fn register_worker(storage: &mut RedisStorage<Email>) -> String {
        register_worker_at(storage, Utc::now()).await
    }

    async fn push_email<S>(storage: &mut S, email: Email)
    where
        S: Storage<Output = Email>,
    {
        storage.push(email).await.expect("failed to push a job");
    }

    async fn get_job<S>(storage: &mut S, job_id: String) -> JobRequest<Email>
    where
        S: Storage<Output = Email>,
    {
        storage
            .fetch_by_id(job_id)
            .await
            .expect("failed to fetch job by id")
            .expect("no job found by id")
    }

    #[tokio::test]
    async fn test_consume_last_pushed_job() {
        let mut storage = setup().await;
        push_email(&mut storage, example_email()).await;

        let worker_id = register_worker(&mut storage).await;

        let job = consume_one(&mut storage, worker_id.clone()).await;

        // No worker yet
        // Redis doesn't update jobs like in sql
        assert_eq!(*job.context().status(), JobState::Pending);
        assert_eq!(*job.context().lock_by(), None);
        assert!(job.context().lock_at().is_none());

        cleanup(storage, worker_id).await;
    }

    #[tokio::test]
    async fn test_acknowledge_job() {
        let mut storage = setup().await;
        push_email(&mut storage, example_email()).await;

        let worker_id = register_worker(&mut storage).await;

        let job = consume_one(&mut storage, worker_id.clone()).await;
        let job_id = job.context().id();

        storage
            .ack(worker_id.clone(), job_id.clone())
            .await
            .expect("failed to acknowledge the job");

        let job = get_job(&mut storage, job_id.clone()).await;
        assert_eq!(*job.context().status(), JobState::Pending); // Redis storage uses hset etc to manage status
        assert!(job.context().done_at().is_none());

        cleanup(storage, worker_id).await;
    }

    #[tokio::test]
    async fn test_kill_job() {
        let mut storage = setup().await;

        push_email(&mut storage, example_email()).await;

        let worker_id = register_worker(&mut storage).await;

        let job = consume_one(&mut storage, worker_id.clone()).await;
        let job_id = job.context().id();

        storage
            .kill(worker_id.clone(), job_id.clone())
            .await
            .expect("failed to kill job");

        let job = get_job(&mut storage, job_id.clone()).await;
        assert_eq!(*job.context().status(), JobState::Pending);
        assert!(job.context().done_at().is_none());

        cleanup(storage, worker_id).await;
    }

    #[tokio::test]
    async fn test_heartbeat_renqueueorphaned_pulse_last_seen_6min() {
        let mut storage = setup().await;

        push_email(&mut storage, example_email()).await;

        let worker_id =
            register_worker_at(&mut storage, Utc::now().sub(chrono::Duration::minutes(6))).await;

        let job = consume_one(&mut storage, worker_id.clone()).await;
        let result = storage
            .heartbeat(StorageWorkerPulse::RenqueueOrpharned { count: 5 })
            .await
            .expect("failed to heartbeat");
        assert_eq!(result, true);

        let job_id = job.context().id();
        let job = get_job(&mut storage, job_id.clone()).await;

        assert_eq!(*job.context().status(), JobState::Pending);
        assert!(job.context().done_at().is_none());
        assert!(job.context().lock_by().is_none());
        assert!(job.context().lock_at().is_none());
        assert_eq!(*job.context().last_error(), None);

        cleanup(storage, worker_id).await;
    }

    #[tokio::test]
    async fn test_heartbeat_renqueueorphaned_pulse_last_seen_4min() {
        let mut storage = setup().await;

        push_email(&mut storage, example_email()).await;

        let worker_id =
            register_worker_at(&mut storage, Utc::now().sub(chrono::Duration::minutes(4))).await;

        let job = consume_one(&mut storage, worker_id.clone()).await;
        let result = storage
            .heartbeat(StorageWorkerPulse::RenqueueOrpharned { count: 5 })
            .await
            .expect("failed to heartbeat");
        assert_eq!(result, true);

        let job_id = job.context().id();
        let job = get_job(&mut storage, job_id.clone()).await;

        assert_eq!(*job.context().status(), JobState::Pending);
        assert_eq!(*job.context().lock_by(), None);

        cleanup(storage, worker_id).await;
    }
}
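
These tests share one Redis connection per test and cleanup() issues FLUSHDB, which is why the test-redis CI job runs them with --test-threads=1. A local invocation mirroring that job (assuming Docker is available; the container name is illustrative) might look like:

# Sketch: run the new apalis-redis tests against a disposable Redis instance.
docker run -d --name apalis-redis-test -p 6379:6379 redis
cd packages/apalis-redis
# FLUSHDB wipes the target database, so never point this at real data.
REDIS_URL=redis://127.0.0.1/ cargo test -- --test-threads=1
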