diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..bfbc683 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1 @@ +* @gwynne diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2ef9b86..3dbf46d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,26 +1,13 @@ name: test +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true on: - pull_request: - push: - branches: - - master + pull_request: { types: [opened, reopened, synchronize, ready_for_review] } + push: { branches: [ main ] } + jobs: - linux: - runs-on: ubuntu-latest - container: swift:5.2-bionic - steps: - - name: Check out code - uses: actions/checkout@v2 - - name: Run tests with Thread Sanitizer - run: swift test --enable-test-discovery --sanitize=thread - macOS: - runs-on: macos-latest - steps: - - name: Select latest available Xcode - uses: maxim-lobanov/setup-xcode@1.0 - with: - xcode-version: latest - - name: Check out code - uses: actions/checkout@v2 - - name: Run tests with Thread Sanitizer - run: swift test --enable-test-discovery --sanitize=thread \ No newline at end of file + unit-tests: + uses: vapor/ci/.github/workflows/run-unit-tests.yml@main + secrets: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/Package.swift b/Package.swift index 78d8d09..49c4223 100644 --- a/Package.swift +++ b/Package.swift @@ -1,23 +1,21 @@ -// swift-tools-version:5.4 -// The swift-tools-version declares the minimum version of Swift required to build this package. - +// swift-tools-version:5.8 import PackageDescription let package = Package( name: "QueuesFluentDriver", platforms: [ - .macOS(.v10_15) + .macOS(.v10_15), ], products: [ - .library( - name: "QueuesFluentDriver", - targets: ["QueuesFluentDriver"]), + .library(name: "QueuesFluentDriver", targets: ["QueuesFluentDriver"]), ], dependencies: [ - .package(url: "https://github.com/vapor/vapor.git", from: "4.0.0"), - .package(url: "https://github.com/vapor/fluent.git", from: "4.0.0"), - .package(url: "https://github.com/vapor/sql-kit.git", from: "3.6.0"), - .package(url: "https://github.com/vapor/queues.git", from: "1.11.1"), + .package(url: "https://github.com/vapor/vapor.git", from: "4.92.1"), + .package(url: "https://github.com/vapor/fluent.git", from: "4.9.0"), + .package(url: "https://github.com/vapor/fluent-kit.git", from: "1.45.1"), + .package(url: "https://github.com/vapor/sql-kit.git", from: "3.29.2"), + .package(url: "https://github.com/vapor/queues.git", from: "1.13.0"), + .package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.6.0"), ], targets: [ .target( @@ -25,14 +23,29 @@ let package = Package( dependencies: [ .product(name: "Vapor", package: "vapor"), .product(name: "Fluent", package: "fluent"), + .product(name: "FluentKit", package: "fluent-kit"), + .product(name: "FluentSQL", package: "fluent-kit"), .product(name: "SQLKit", package: "sql-kit"), .product(name: "Queues", package: "queues") ], - path: "Sources" + swiftSettings: swiftSettings ), .testTarget( name: "QueuesFluentDriverTests", - dependencies: ["QueuesFluentDriver"] + dependencies: [ + .product(name: "XCTVapor", package: "vapor"), + .product(name: "FluentSQLiteDriver", package: "fluent-sqlite-driver"), + .target(name: "QueuesFluentDriver"), + ], + swiftSettings: swiftSettings ), ] ) + +var swiftSettings: [SwiftSetting] { [ + .enableUpcomingFeature("ForwardTrailingClosures"), + .enableUpcomingFeature("ConciseMagicFile"), + .enableUpcomingFeature("DisableOutwardActorInference"), + .enableUpcomingFeature("StrictConcurrency"), + .enableExperimentalFeature("StrictConcurrency=complete"), +] } diff --git a/Package@swift-5.9.swift b/Package@swift-5.9.swift new file mode 100644 index 0000000..f2d7c78 --- /dev/null +++ b/Package@swift-5.9.swift @@ -0,0 +1,55 @@ +// swift-tools-version:5.9 +import PackageDescription + +let package = Package( + name: "QueuesFluentDriver", + platforms: [ + .macOS(.v10_15), + .iOS(.v13), + .watchOS(.v6), + .tvOS(.v13), + ], + products: [ + .library(name: "QueuesFluentDriver", targets: ["QueuesFluentDriver"]), + ], + dependencies: [ + .package(url: "https://github.com/vapor/vapor.git", from: "4.92.1"), + .package(url: "https://github.com/vapor/fluent.git", from: "4.9.0"), + .package(url: "https://github.com/vapor/fluent-kit.git", from: "1.45.1"), + .package(url: "https://github.com/vapor/sql-kit.git", from: "3.29.2"), + .package(url: "https://github.com/vapor/queues.git", from: "1.13.0"), + .package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.6.0"), + ], + targets: [ + .target( + name: "QueuesFluentDriver", + dependencies: [ + .product(name: "Vapor", package: "vapor"), + .product(name: "Fluent", package: "fluent"), + .product(name: "FluentKit", package: "fluent-kit"), + .product(name: "FluentSQL", package: "fluent-kit"), + .product(name: "SQLKit", package: "sql-kit"), + .product(name: "Queues", package: "queues") + ], + swiftSettings: swiftSettings + ), + .testTarget( + name: "QueuesFluentDriverTests", + dependencies: [ + .product(name: "XCTVapor", package: "vapor"), + .product(name: "FluentSQLiteDriver", package: "fluent-sqlite-driver"), + .target(name: "QueuesFluentDriver"), + ], + swiftSettings: swiftSettings + ), + ] +) + +var swiftSettings: [SwiftSetting] { [ + .enableUpcomingFeature("ForwardTrailingClosures"), + .enableUpcomingFeature("ExistentialAny"), + .enableUpcomingFeature("ConciseMagicFile"), + .enableUpcomingFeature("DisableOutwardActorInference"), + .enableUpcomingFeature("StrictConcurrency"), + .enableExperimentalFeature("StrictConcurrency=complete"), +] } diff --git a/README.md b/README.md index c3106d1..397b388 100644 --- a/README.md +++ b/README.md @@ -1,74 +1,63 @@ # QueuesFluentDriver -This Vapor Queues driver stores the Queues jobs metadata into a relational database. It is an alternative to the default Redis driver. +A driver for [Queues]. Uses [Fluent] to store job metadata in an SQL database. +[Queues]: https://github.com/vapor/queues +[Fluent]: https://github.com/vapor/fluent ## Compatibility -This package makes use of some relatively recent, non standard SQL extensions added to some major database engines to support this exact use case: queuing systems, where there must be a guarantee that a task or job won't be picked by multiple workers. +This package makes use of the `SKIP LOCKED` feature supported by some of the major database engines (most notably [PostgresSQL][postgres-skip-locked] and [MySQL][mysql-skip-locked]) when available to make a best-effort guarantee that a task or job won't be picked by multiple workers. This package should be compatible with: -- Postgres >= 11 -- Mysql >= 8.0.1 -- MariaDB >= 10.3 +- PostgreSQL 11.0+ +- MySQL 8.0+ +- MariaDB 10.5+ -> Sqlite will only work if you have a custom, very low number of Queues workers (1-2), which makes it useless except for testing purposes +> [!NOTE] +> Although SQLite can be used with this package, SQLite has no support for advanced locking. It is not likely to function correctly with more than one or two queue workers. -  +[postgres-skip-locked]: https://www.postgresql.org/docs/current/sql-select.html#SQL-FOR-UPDATE-SHARE +[mysql-skip-locked]: https://dev.mysql.com/doc/refman/8.3/en/select.html#:~:text=SKIP%20LOCKED%20causes%20a -## Usage +## Getting started +#### Adding the dependency - -Add it to the `Package.swift` of your Vapor4 project: +Add `QueuesFluentDriver` as dependency to your `Package.swift`: ```swift - -// swift-tools-version:5.4 -import PackageDescription - -let package = Package( - name: "app", - platforms: [ - .macOS(.v10_15) - ], + dependencies: [ + .package(url: "https://github.com/vapor-community/vapor-queues-fluent-driver.git", from: "3.0.0-beta.2"), ... - dependencies: [ - ... - .package(url: "https://github.com/m-barthelemy/vapor-queues-fluent-driver.git", from: "3.0.0-beta1"), - ... - ], - targets: [ - .target(name: "App", dependencies: [ - ... - .product(name: "QueuesFluentDriver", package: "vapor-queues-fluent-driver"), - ... - ]), - ... - ] -) + ] +``` +Add `QueuesFluentDriver` to the target you want to use it in: +```swift + targets: [ + .target(name: "MyFancyTarget", dependencies: [ + .product(name: "QueuesFluentDriver", package: "vapor-queues-fluent-driver"), + ]) + ] ``` -  +#### Configuration -This package needs a table, named `_jobs_meta` by default, to store the Vapor Queues jobs. Make sure to add this to your migrations: -```swift -// Ensure the table for storing jobs is created -app.migrations.add(JobMetadataMigrate()) -``` +This package includes a migration to create the database table which holds job metadata; add it to your Fluent configuration as you would any other migration: -  +```swift +app.migrations.add(JobModelMigration()) +``` Finally, load the `QueuesFluentDriver` driver: ```swift app.queues.use(.fluent()) ``` -⚠️ Make sure you call `app.databases.use(...)` **before** calling `app.queues.use(.fluent())`! - -  +> [!WARNING] +> Always call `app.databases.use(...)` **before** calling `app.queues.use(.fluent())`! ## Options @@ -88,31 +77,20 @@ You can customize the name of the table used by this driver during the migration app.migrations.add(JobMetadataMigrate(schema: "my_jobs")) ``` -### Soft Deletes -By default, completed jobs are deleted from the two database tables used by this driver. -If you want to keep them, you can use Fluent's "soft delete" feature, which just sets the `deleted_at` field to a non-null value and excludes rows from queries by default: - -```swift -app.queues.use(.fluent(useSoftDeletes: true)) -``` - -When enabling this option, it is probably a good idea to cleanup the completed jobs from time to time. - -  - - ## Caveats - ### Polling interval and number of workers -By default, the Vapor Queues package creates 2 workers per CPU core, and each worker would poll the database for jobs to be run every second. -On a 4 cores system, this means 8 workers querying the database every second by default. -You can change the jobs polling interval by calling: +By default, the Vapor Queues system starts 2 workers per available CPU core, with each worker would polling the database once per second. On a 4-core system, this would results in 8 workers querying the database every second. Most configurations do not need this many workers. + +The polling interval can be changed using the `refreshInterval` configuration setting: ```swift -app.queues.configuration.refreshInterval = .seconds(custom_value) +app.queues.configuration.refreshInterval = .seconds(5) ``` -With Queues >=1.4.0, you can also configure the number of workers that will be started by setting `app.queues.configuration.workerCount` +Likewise, the number of workers to start can be changed via the `workerCount` setting: +```swift +app.queues.configuration.workerCount = 1 +``` diff --git a/Sources/QueuesFluentDriver/Documentation.docc/Documentation.md b/Sources/QueuesFluentDriver/Documentation.docc/Documentation.md new file mode 100644 index 0000000..8590d54 --- /dev/null +++ b/Sources/QueuesFluentDriver/Documentation.docc/Documentation.md @@ -0,0 +1,71 @@ +# ``QueuesFluentDriver`` + +@Metadata { + @TitleHeading("Package") +} + +A driver for [Queues]. Uses [Fluent] to store job metadata in an SQL database. + +[Queues]: https://github.com/vapor/queues +[Fluent]: https://github.com/vapor/fluent + +## Overview + +## Getting started + +#### Adding the dependency + +Add `QueuesFluentDriver` as dependency to your `Package.swift`: + +```swift +dependencies: [ + .package(url: "https://github.com/vapor-community/vapor-queues-fluent-driver.git", from: "3.0.0-beta.2"), + ... +] +``` + +Add `QueuesFluentDriver` to the target you want to use it in: +```swift +targets: [ + .target( + name: "MyFancyTarget", + dependencies: [ + .product(name: "QueuesFluentDriver", package: "vapor-queues-fluent-driver"), + ... + ] + ), +] +``` + +#### Configuration + +This package includes a migration to create the database table which holds job metadata; add it to your Fluent configuration as you would any other migration: + +```swift +app.migrations.add(JobModelMigration()) +``` + +Finally, load the `QueuesFluentDriver` driver: +```swift +app.queues.use(.fluent()) +``` + +> Warning: Always call `app.databases.use(...)` **before** calling `app.queues.use(.fluent())`! + +## Caveats + +### Polling interval and number of workers + +By default, the Vapor Queues system starts 2 workers per available CPU core, with each worker would polling the database once per second. On a 4-core system, this would results in 8 workers querying the database every second. Most configurations do not need this many workers. + +The polling interval can be changed using the `refreshInterval` configuration setting: + +```swift +app.queues.configuration.refreshInterval = .seconds(5) +``` + +Likewise, the number of workers to start can be changed via the `workerCount` setting: + +```swift +app.queues.configuration.workerCount = 1 +``` diff --git a/Sources/QueuesFluentDriver/FluentQueue.swift b/Sources/QueuesFluentDriver/FluentQueue.swift index 9c1432a..fb48e4d 100644 --- a/Sources/QueuesFluentDriver/FluentQueue.swift +++ b/Sources/QueuesFluentDriver/FluentQueue.swift @@ -1,103 +1,99 @@ -import Foundation -import Queues -import Fluent -import SQLKit +@preconcurrency import Queues +@preconcurrency import SQLKit -public struct FluentQueue { +/// An implementation of `Queue` which stores job data and metadata in a Fluent database. +public struct FluentQueue: Queue, Sendable { + // See `Queue.context`. public let context: QueueContext - let db: Database - let dbType: QueuesFluentDbType - let useSoftDeletes: Bool -} -extension FluentQueue: Queue { + let sqlDb: any SQLDatabase + + // See `Queue.get(_:)`. public func get(_ id: JobIdentifier) -> EventLoopFuture { - return db.query(JobModel.self) - .filter(\.$id == id.string) + self.sqlDb.select() + .columns("payload", "max_retry_count", "job_name", "delay_until", "queued_at", "attempts") + .from(JobModel.schema) + .where("id", .equal, id.string) .first() .unwrap(or: QueuesFluentError.missingJob(id)) - .flatMapThrowing { job in - return JobData( - payload: Array(job.data.payload), - maxRetryCount: job.data.maxRetryCount, - jobName: job.data.jobName, - delayUntil: job.data.delayUntil, - queuedAt: job.data.queuedAt, - attempts: job.data.attempts ?? 0 - ) + .flatMapThrowing { + try $0.decode(model: JobData.self, keyDecodingStrategy: .convertFromSnakeCase) } } + // See `Queue.set(_:to:)`. public func set(_ id: JobIdentifier, to jobStorage: JobData) -> EventLoopFuture { - let jobModel = JobModel(id: id, queue: queueName.string, jobData: JobDataModel(jobData: jobStorage)) - // If the job must run at a later time, ensure it won't be picked earlier since - // we sort pending jobs by date when querying - jobModel.runAtOrAfter = jobStorage.delayUntil ?? Date() - - return jobModel.save(on: db).map { metadata in - return + self.sqlDb.eventLoop.makeFutureWithTask { + try await self.sqlDb.insert(into: JobModel.schema) + .model(JobModel(id: id, queue: self.queueName, jobData: jobStorage), keyEncodingStrategy: .convertToSnakeCase) + .onConflict { try $0 + .set(excludedContentOf: JobModel(id: id, queue: self.queueName, jobData: jobStorage), keyEncodingStrategy: .convertToSnakeCase) + } + .run() } } + // See `Queue.clear(_:)`. public func clear(_ id: JobIdentifier) -> EventLoopFuture { - // This does the equivalent of a Fluent Softdelete but sets the `state` to `completed` - return db.query(JobModel.self) - .filter(\.$id == id.string) - .filter(\.$state != .completed) - .first() - .unwrap(or: QueuesFluentError.missingJob(id)) - .flatMap { job in - if self.useSoftDeletes { - job.state = .completed - job.deletedAt = Date() - return job.update(on: self.db) - } else { - return job.delete(force: true, on: self.db) - } + self.get(id).flatMap { _ in + self.sqlDb.delete(from: JobModel.schema) + .where("id", .equal, id.string) + .where("state", .notEqual, StoredJobState.completed) + .run() } } + // See `Queue.push(_:)`. public func push(_ id: JobIdentifier) -> EventLoopFuture { - guard let sqlDb = db as? SQLDatabase else { - return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound) - } - return sqlDb + self.sqlDb .update(JobModel.schema) - .set(SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.pending)) - .where(SQLColumn("\(FieldKey.id)"), .equal, SQLBind(id.string)) + .set("state", to: StoredJobState.pending) + .set("updated_at", to: .now()) + .where("id", .equal, id.string) .run() } - /// Currently selects the oldest job pending execution + // See `Queue.pop()`. public func pop() -> EventLoopFuture { - guard let sqlDb = db as? SQLDatabase else { - return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound) - } - - var selectQuery = sqlDb - .select() - .column("\(FieldKey.id)") - .from(JobModel.schema) - .where(SQLColumn("\(FieldKey.state)"), .equal, SQLBind(QueuesFluentJobState.pending)) - .where(SQLColumn("\(FieldKey.queue)"), .equal, SQLBind(self.queueName.string)) - .where(SQLColumn("\(FieldKey.runAt)"), .lessThanOrEqual, SQLBind(Date())) - .orderBy("\(FieldKey.runAt)") - .limit(1) - if self.dbType != .sqlite { - selectQuery = selectQuery.lockingClause(SQLSkipLocked.forUpdateSkipLocked) - } - - var popProvider: PopQueryProtocol! - switch (self.dbType) { - case .postgresql: - popProvider = PostgresPop() - case .mysql: - popProvider = MySQLPop() - case .sqlite: - popProvider = SqlitePop() - } - return popProvider.pop(db: db, select: selectQuery.query).optionalMap { id in - return JobIdentifier(string: id) + self.sqlDb.eventLoop.makeFutureWithTask { + let select = self.sqlDb + .select() + .column("id") + .from(JobModel.schema) + .where("state", .equal, StoredJobState.pending) + .where("queue_name", .equal, self.queueName.string) + .where(.dateValue(.function("coalesce", SQLColumn("delay_until"), SQLNow())), .lessThanOrEqual, .now()) + .orderBy("delay_until") + .limit(1) + .lockingClause(SQLLockingClauseWithSkipLocked.updateSkippingLocked) + + if self.sqlDb.dialect.supportsReturning { + return try await self.sqlDb.update(JobModel.schema) + .set("state", to: StoredJobState.processing) + .set("updated_at", to: .now()) + .where("id", .equal, .group(select.query)) + .returning("id") + .first(decodingColumn: "id", as: String.self) + .map(JobIdentifier.init(string:)) + } else { + return try await self.sqlDb.transaction { transaction in + guard let id = try await transaction.raw("\(select.query)") // using raw() to make sure we run on the transaction connection + .first(decodingColumn: "id", as: String.self) + else { + return nil + } + + try await transaction + .update(JobModel.schema) + .set("state", to: StoredJobState.processing) + .set("updated_at", to: .now()) + .where("id", .equal, id) + .where("state", .equal, StoredJobState.pending) + .run() + + return JobIdentifier(string: id) + } + } } } } diff --git a/Sources/QueuesFluentDriver/FluentQueuesDriver.swift b/Sources/QueuesFluentDriver/FluentQueuesDriver.swift index b99212d..6279fc7 100644 --- a/Sources/QueuesFluentDriver/FluentQueuesDriver.swift +++ b/Sources/QueuesFluentDriver/FluentQueuesDriver.swift @@ -1,53 +1,47 @@ -import Fluent -import SQLKit -import Queues +import class NIOCore.EventLoopFuture +import struct Fluent.DatabaseID +import protocol SQLKit.SQLDatabase +import protocol Queues.QueuesDriver +import protocol Queues.Queue +import struct Queues.QueueContext +import struct Queues.JobIdentifier +import struct Queues.JobData -public enum QueuesFluentDbType: String { - case postgresql - case mysql - case sqlite -} - -public struct FluentQueuesDriver { +public struct FluentQueuesDriver: QueuesDriver { let databaseId: DatabaseID? - let useSoftDeletes: Bool - let eventLoopGroup: EventLoopGroup - - init(on databaseId: DatabaseID? = nil, useSoftDeletes: Bool, on eventLoopGroup: EventLoopGroup) { + + init(on databaseId: DatabaseID? = nil) { self.databaseId = databaseId - self.useSoftDeletes = useSoftDeletes - self.eventLoopGroup = eventLoopGroup } -} -extension FluentQueuesDriver: QueuesDriver { - public func makeQueue(with context: QueueContext) -> Queue { - let db = context + public func makeQueue(with context: QueueContext) -> any Queue { + /// `QueuesDriver` methods cannot throw, so we report errors by returning a fake queue which + /// always throws errors when used. + /// + /// `Fluent.Databases.database(_:logger:on:)` never returns nil; its optionality is an API mistake. + /// If a nonexistent `DatabaseID` is requested, it triggers a `fatalError()`. + let baseDb = context .application .databases - .database(databaseId, logger: context.logger, on: context.eventLoop) - - // How do we report that something goes wrong here? Since makeQueue cannot throw. - let dialect = (db as? SQLDatabase)?.dialect.name - if db == nil || dialect == nil { - context.logger.error( - "\(Self.self): Database misconfigured or unsupported." - ) - } + .database(self.databaseId, logger: context.logger, on: context.eventLoop)! - let dbType = QueuesFluentDbType(rawValue: dialect!) - if dbType == nil { - context.logger.error("\(Self.self): Unsupported Database type '\(dialect!)'") + guard let sqlDb = baseDb as? any SQLDatabase else { + return FailingQueue(failure: QueuesFluentError.unsupportedDatabase, context: context) } - - return FluentQueue( - context: context, - db: db!, - dbType: dbType!, - useSoftDeletes: self.useSoftDeletes - ) + + return FluentQueue(context: context, sqlDb: sqlDb) } - public func shutdown() { - } + public func shutdown() {} +} + +private struct FailingQueue: Queue { + let failure: any Error + let context: QueueContext + + func get(_: JobIdentifier) -> EventLoopFuture { self.eventLoop.future(error: self.failure) } + func set(_: JobIdentifier, to: JobData) -> EventLoopFuture { self.eventLoop.future(error: self.failure) } + func clear(_: JobIdentifier) -> EventLoopFuture { self.eventLoop.future(error: self.failure) } + func push(_: JobIdentifier) -> EventLoopFuture { self.eventLoop.future(error: self.failure) } + func pop() -> EventLoopFuture { self.eventLoop.future(error: self.failure) } } diff --git a/Sources/QueuesFluentDriver/JobDataModel.swift b/Sources/QueuesFluentDriver/JobDataModel.swift deleted file mode 100644 index aefee99..0000000 --- a/Sources/QueuesFluentDriver/JobDataModel.swift +++ /dev/null @@ -1,50 +0,0 @@ -import Foundation -import Fluent -import Queues - -extension FieldKey { - static var payload: Self { "payload" } - static var maxRetryCount: Self { "max_retries" } - static var attempts: Self { "attempt" } - static var delayUntil: Self { "delay_until" } - static var queuedAt: Self { "queued_at" } - static var jobName: Self { "job_name" } -} - -/// Handles storage of a `JobData` into the database -final class JobDataModel: Fields { - required init() {} - - /// The job data to be encoded. - @Field(key: .payload) - var payload: [UInt8] - - /// The maxRetryCount for the `Job`. - @Field(key: .maxRetryCount) - var maxRetryCount: Int - - /// The number of attempts made to run the `Job`. - @Field(key: .attempts) - var attempts: Int? - - /// A date to execute this job after - @OptionalField(key: .delayUntil) - var delayUntil: Date? - - /// The date this job was queued - @Field(key: .queuedAt) - var queuedAt: Date - - /// The name of the `Job` - @Field(key: .jobName) - var jobName: String - - init(jobData: JobData) { - self.payload = jobData.payload - self.maxRetryCount = jobData.maxRetryCount - self.attempts = jobData.attempts - self.delayUntil = jobData.delayUntil - self.jobName = jobData.jobName - self.queuedAt = jobData.queuedAt - } -} diff --git a/Sources/QueuesFluentDriver/JobModel.swift b/Sources/QueuesFluentDriver/JobModel.swift index 7e761ef..e01566f 100644 --- a/Sources/QueuesFluentDriver/JobModel.swift +++ b/Sources/QueuesFluentDriver/JobModel.swift @@ -1,59 +1,65 @@ -import Foundation -import Fluent -import Queues +import struct Foundation.Date +import struct Queues.JobData +import struct Queues.JobIdentifier +import struct Queues.QueueName -public enum QueuesFluentJobState: String, Codable, CaseIterable { - /// Ready to be picked up for execution +/// The various states of a job currently stored in the database. +enum StoredJobState: String, Codable, CaseIterable { + /// Job is ready to be picked up for execution. case pending + + /// Job is in progress. case processing - /// Executed, regardless if it was successful or not + + /// Job has finished, whether successfully or not. case completed } -extension FieldKey { - static var queue: Self { "queue" } - static var data: Self { "data" } - static var state: Self { "state" } - - static var runAt: Self { "run_at" } - static var updatedAt: Self { "updated_at" } - static var deletedAt: Self { "deleted_at" } -} - -class JobModel: Model { - public required init() {} +/// Encapsulates a job's metadata and `JobData`. +struct JobModel: Codable, Sendable { + /// The name of the model's table. + static let schema = "_jobs_meta" + + /// The job identifier. Corresponds directly to a `JobIdentifier`. + let id: String? + + /// The queue to which the job was dispatched. Corresponds directly to a `QueueName`. + let queueName: String - public static var schema = "_jobs_meta" + /// The name of the job. + let jobName: String - /// The unique Job ID - @ID(custom: .id, generatedBy: .user) - var id: String? + /// The date this job was queued. + let queuedAt: Date - /// The Job key - @Field(key: .queue) - var queue: String + /// An optional `Date` before which the job shall not run. + let delayUntil: Date? /// The current state of the Job - @Field(key: .state) - var state: QueuesFluentJobState + let state: StoredJobState - /// Earliest date the job can run - @OptionalField(key: .runAt) - var runAtOrAfter: Date? + /// The maximum retry count for the job. + let maxRetryCount: Int - @Timestamp(key: .updatedAt, on: .update) - var updatedAt: Date? + /// The number of attempts made to run the job so far. + let attempts: Int - @Timestamp(key: .deletedAt, on: .delete) - var deletedAt: Date? + /// The job's payload. + let payload: [UInt8] - @Group(key: .data) - var data: JobDataModel + /// The standard automatic update tracking timestamp. + let updatedAt: Date - init(id: JobIdentifier, queue: String, jobData: JobDataModel) { + init(id: JobIdentifier, queue: QueueName, jobData: JobData) { self.id = id.string - self.queue = queue + self.queueName = queue.string + self.jobName = jobData.jobName + self.queuedAt = jobData.queuedAt + self.delayUntil = jobData.delayUntil self.state = .pending - self.data = jobData + self.maxRetryCount = jobData.maxRetryCount + self.attempts = jobData.attempts ?? 0 + self.payload = jobData.payload + self.updatedAt = .init() } } diff --git a/Sources/QueuesFluentDriver/JobModelMigrate.swift b/Sources/QueuesFluentDriver/JobModelMigrate.swift index 143ef12..2681a36 100644 --- a/Sources/QueuesFluentDriver/JobModelMigrate.swift +++ b/Sources/QueuesFluentDriver/JobModelMigrate.swift @@ -1,49 +1,53 @@ -import Foundation -import Fluent -import SQLKit +import protocol SQLKit.SQLDatabase +import struct SQLKit.SQLRaw -public struct JobMetadataMigrate: Migration { +public struct JobModelMigration: AsyncSQLMigration { + /// Public initializer. public init() {} - public init(schema: String) { - JobModel.schema = schema - } - - public func prepare(on database: Database) -> EventLoopFuture { - return database.schema(JobModel.schema) - .field(.id, .string, .identifier(auto: false)) - .field(FieldKey.queue, .string, .required) - .field(FieldKey.state, .string, .required) - .field(FieldKey.runAt, .datetime) - .field(FieldKey.updatedAt, .datetime) - .field(FieldKey.deletedAt, .datetime) - // "group"/nested model JobDataModel - .field(.init(stringLiteral: "\(FieldKey.data)_\(FieldKey.payload)"), .array(of: .uint8), .required) - .field(.init(stringLiteral: "\(FieldKey.data)_\(FieldKey.maxRetryCount)"), .int, .required) - .field(.init(stringLiteral: "\(FieldKey.data)_\(FieldKey.attempts)"), .int) - .field(.init(stringLiteral: "\(FieldKey.data)_\(FieldKey.delayUntil)"), .datetime) - .field(.init(stringLiteral: "\(FieldKey.data)_\(FieldKey.queuedAt)"), .datetime, .required) - .field(.init(stringLiteral: "\(FieldKey.data)_\(FieldKey.jobName)"), .string, .required) - .create() - .flatMap { - // Mysql could lock the entire table if there's no index on the fields of the WHERE clause used in `FluentQueue.pop()`. - // Order of the fields in the composite index and order of the fields in the WHERE clauses should match. - // Or I got totally confused reading their doc, which is also a possibility. - // Postgres seems to not be so sensitive and should be happy with the following indices. - let sqlDb = database as! SQLDatabase - let stateIndex = sqlDb.create(index: "i_\(JobModel.schema)_\(FieldKey.state)_\(FieldKey.queue)") - .on(JobModel.schema) - .column("\(FieldKey.state)") - .column("\(FieldKey.queue)") - .column("\(FieldKey.runAt)") - .run() - return stateIndex.map { index in - return - } - } + // See `AsyncSQLMigration.prepare(on:)`. + public func prepare(on database: any SQLDatabase) async throws { + let stateEnumType = "\(JobModel.schema)_StoredJobStatus" + + switch database.dialect.enumSyntax { + case .typeName: + try await database.create(enum: stateEnumType) + .value("pending") + .value("processing") + .value("completed") + .run() + default: + break + } + + try await database.create(table: JobModel.schema) + .column("id", type: .text, .primaryKey(autoIncrement: false)) + .column("queue_name", type: .text, .notNull) + .column("job_name", type: .text, .notNull) + .column("queued_at", type: .timestamp, .notNull) + .column("delay_until", type: .timestamp) + .column("state", type: .custom(SQLRaw(stateEnumType)), .notNull) + .column("max_retry_count", type: .int, .notNull) + .column("attempts", type: .int, .notNull) + .column("payload", type: .blob, .notNull) + .column("updated_at", type: .timestamp) + .run() + try await database.create(index: "i_\(JobModel.schema)_state_queue_delayUntil") + .on(JobModel.schema) + .column("state") + .column("queue_name") + .column("delay_until") + .run() } - public func revert(on database: Database) -> EventLoopFuture { - return database.schema(JobModel.schema).delete() + // See `AsyncSQLMigration.revert(on:)`. + public func revert(on database: any SQLDatabase) async throws { + try await database.drop(table: JobModel.schema).run() + switch database.dialect.enumSyntax { + case .typeName: + try await database.drop(enum: "\(JobModel.schema)_StoredJobStatus").run() + default: + break + } } } diff --git a/Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift b/Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift deleted file mode 100644 index 127bcf7..0000000 --- a/Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift +++ /dev/null @@ -1,34 +0,0 @@ -import Foundation -import SQLKit -import Fluent - -final class MySQLPop: PopQueryProtocol { - // MySQL is a bit challenging since it doesn't support updating a table that is - // used in a subquery. - // So we first select, then update, with the whole process wrapped in a transaction. - func pop(db: Database, select: SQLExpression) -> EventLoopFuture { - db.transaction { transaction in - let database = transaction as! SQLDatabase - var id: String? - - return database.execute(sql: select) { (row) -> Void in - id = try? row.decode(column: "\(FieldKey.id)", as: String.self) - } - .flatMap { - guard let id = id else { - return database.eventLoop.makeSucceededFuture(nil) - } - let updateQuery = database - .update(JobModel.schema) - .set(SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.processing)) - .set(SQLColumn("\(FieldKey.updatedAt)"), to: SQLBind(Date())) - .where(SQLColumn("\(FieldKey.id)"), .equal, SQLBind(id)) - .where(SQLColumn("\(FieldKey.state)"), .equal, SQLBind(QueuesFluentJobState.pending)) - .query - return database.execute(sql: updateQuery) { (row) in } - .map { id } - } - - } - } -} diff --git a/Sources/QueuesFluentDriver/PopQueries/PopQueryProtocol.swift b/Sources/QueuesFluentDriver/PopQueries/PopQueryProtocol.swift deleted file mode 100644 index c81a798..0000000 --- a/Sources/QueuesFluentDriver/PopQueries/PopQueryProtocol.swift +++ /dev/null @@ -1,6 +0,0 @@ -import SQLKit -import Fluent - -protocol PopQueryProtocol { - func pop(db: Database, select: SQLExpression) -> EventLoopFuture -} diff --git a/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift b/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift deleted file mode 100644 index a3538bb..0000000 --- a/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift +++ /dev/null @@ -1,27 +0,0 @@ -import Foundation -import Foundation -import SQLKit -import Fluent - -final class PostgresPop: PopQueryProtocol { - func pop(db: Database, select: SQLExpression) -> EventLoopFuture { - let db = db as! SQLDatabase - let subQueryGroup = SQLGroupExpression.init(select) - let query = db - .update (JobModel.schema) - .set (SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.processing)) - .set (SQLColumn("\(FieldKey.updatedAt)"), to: SQLBind(Date())) - .where ( - SQLBinaryExpression(left: SQLColumn("\(FieldKey.id)"), op: SQLBinaryOperator.equal , right: subQueryGroup) - ) - .returning(SQLColumn("\(FieldKey.id)")) - .query - - var id: String? - return db.execute(sql: query) { (row) -> Void in - id = try? row.decode(column: "\(FieldKey.id)", as: String.self) - }.map { - return id - } - } -} diff --git a/Sources/QueuesFluentDriver/PopQueries/SqlitePopQuery.swift b/Sources/QueuesFluentDriver/PopQueries/SqlitePopQuery.swift deleted file mode 100644 index ce72571..0000000 --- a/Sources/QueuesFluentDriver/PopQueries/SqlitePopQuery.swift +++ /dev/null @@ -1,31 +0,0 @@ -import Foundation -import SQLKit -import Fluent - -final class SqlitePop: PopQueryProtocol { - func pop(db: Database, select: SQLExpression) -> EventLoopFuture { - db.transaction { transaction in - let database = transaction as! SQLDatabase - var id: String? - - return database.execute(sql: select) { (row) -> Void in - id = try? row.decode(column: "\(FieldKey.id)", as: String.self) - } - .flatMap { - guard let id = id else { - return database.eventLoop.makeSucceededFuture(nil) - } - let updateQuery = database - .update(JobModel.schema) - .set(SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.processing)) - .set(SQLColumn("\(FieldKey.updatedAt)"), to: SQLBind(Date())) - .where(SQLColumn("\(FieldKey.id)"), .equal, SQLBind(id)) - .where(SQLColumn("\(FieldKey.state)"), .equal, SQLBind(QueuesFluentJobState.pending)) - .query - return database.execute(sql: updateQuery) { (row) in } - .map { id } - } - - } - } -} diff --git a/Sources/QueuesFluentDriver/Queues.Provider+Fluent.swift b/Sources/QueuesFluentDriver/Queues.Provider+Fluent.swift new file mode 100644 index 0000000..08e0297 --- /dev/null +++ b/Sources/QueuesFluentDriver/Queues.Provider+Fluent.swift @@ -0,0 +1,25 @@ +import class Vapor.Application +import struct FluentKit.DatabaseID +import Queues + +extension Application.Queues.Provider { + /// Retrieve a queues provider which specifies use of the Fluent driver with a given database. + /// + /// Example usage: + /// + /// ```swift + /// func configure(_ app: Application) async throws { + /// // ... + /// app.databases.use(.sqlite(.memory), as: .sqlite) + /// app.queues.use(.fluent(.sqlite)) + /// } + /// ``` + /// + /// - Parameters: + /// - databaseId: A Fluent `DatabaseID` configured for a compatible database, or `nil` to use the + /// default database. + /// - Returns: An appropriately configured provider for `Application.Queues.use(_:)`. + public static func fluent(_ databaseId: DatabaseID? = nil) -> Self { + .init { $0.queues.use(custom: FluentQueuesDriver(on: databaseId)) } + } +} diff --git a/Sources/QueuesFluentDriver/Queues.Provider+fluent.swift b/Sources/QueuesFluentDriver/Queues.Provider+fluent.swift deleted file mode 100644 index 7a96204..0000000 --- a/Sources/QueuesFluentDriver/Queues.Provider+fluent.swift +++ /dev/null @@ -1,16 +0,0 @@ -import Foundation -import Vapor -import Fluent -import Queues - -extension Application.Queues.Provider { - /// `databaseId`: a Fluent `DatabaseID` configured in your application. - /// `useSoftDeletes`: if set to `true`, flag completed jobs using Fluent's default SoftDelete feature instead of actually deleting them. - public static func fluent(_ databaseId: DatabaseID? = nil, useSoftDeletes: Bool = false) -> Self { - .init { - $0.queues.use(custom: - FluentQueuesDriver(on: databaseId, useSoftDeletes: useSoftDeletes, on: $0.eventLoopGroup) - ) - } - } -} diff --git a/Sources/QueuesFluentDriver/QueuesFluentError.swift b/Sources/QueuesFluentDriver/QueuesFluentError.swift index 1565b1f..36147ba 100644 --- a/Sources/QueuesFluentDriver/QueuesFluentError.swift +++ b/Sources/QueuesFluentDriver/QueuesFluentError.swift @@ -1,13 +1,11 @@ -import Queues +@preconcurrency import struct Queues.JobIdentifier enum QueuesFluentError: Error { - /// Couldn't find a job with this Id - case missingJob(_ id: JobIdentifier) - /// The JobIdentifier is not a valid UUID - case invalidIdentifier - /// Error encoding the json Payload to JSON - case jobDataEncodingError(_ message: String? = nil) - case jobDataDecodingError(_ message: String? = nil) - /// The given DatabaseID doesn't match any existing database configured in the Vapor app. - case databaseNotFound + /// The queues system attempted to act on a job identifier which could not be found. + case missingJob(JobIdentifier) + + /// The provided database is unsupported. + /// + /// This error is thrown if a non-SQL database (such as MongoDB) is specified. + case unsupportedDatabase } diff --git a/Sources/QueuesFluentDriver/SQLExpressionExtensions.swift b/Sources/QueuesFluentDriver/SQLExpressionExtensions.swift deleted file mode 100644 index a61decb..0000000 --- a/Sources/QueuesFluentDriver/SQLExpressionExtensions.swift +++ /dev/null @@ -1,18 +0,0 @@ -import Fluent -import SQLKit - -enum SQLSkipLocked: SQLExpression { - case forUpdateSkipLocked - case forShareSkipLocked - - public func serialize(to serializer: inout SQLSerializer) { - switch self { - case .forUpdateSkipLocked: - serializer.write("FOR UPDATE SKIP LOCKED") - case .forShareSkipLocked: - // This is the "lightest" locking that is supported by both Postgres and Mysql - serializer.write("FOR SHARE SKIP LOCKED") - } - } -} - diff --git a/Sources/QueuesFluentDriver/SQLKit+Convenience.swift b/Sources/QueuesFluentDriver/SQLKit+Convenience.swift new file mode 100644 index 0000000..ee61506 --- /dev/null +++ b/Sources/QueuesFluentDriver/SQLKit+Convenience.swift @@ -0,0 +1,125 @@ +import class NIOCore.EventLoopFuture +import SQLKit +import protocol FluentKit.Database +import protocol FluentKit.AsyncMigration + +/// Provides a database-independent way to express a date value representing "now". +struct SQLNow: SQLExpression { + func serialize(to serializer: inout SQLSerializer) { + switch serializer.dialect.name { + case "sqlite": // For SQLite, write out the literal string 'now' (see below) + SQLLiteral.string("now").serialize(to: &serializer) + default: // Everywhere else, just call the SQL standard function. + SQLFunction("current_timestamp").serialize(to: &serializer) + } + } +} + +/// Provides a wrapper which enables safely referring to expressions which should be interpreted as datetime +/// values when using the occasional database (**cough**SQLite**cough**) which doesn't do the right thing when +/// given such expressions as other databases do. +struct SQLDateValue: SQLExpression { + static func now() -> Self where E == SQLNow { .init(.init()) } + + let value: E + init(_ value: E) { self.value = value } + + func serialize(to serializer: inout SQLSerializer) { + switch serializer.dialect.name { + case "sqlite": // For SQLite, explicitly convert the inputs to UNIX timestamps + SQLFunction("unixepoch", args: self.value).serialize(to: &serializer) + default: // Everywhere else, this is a no-op passthrough waste of time. + self.value.serialize(to: &serializer) + } + } +} + +/// An alternative of `SQLLockingClause` which specifies the `SKIP LOCKED` modifier when the underlying database +/// supports it. As MySQL's and PostgreSQL's manuals both note, this should not be used except in very specific +/// scenarios, such as that of this package. +enum SQLLockingClauseWithSkipLocked: SQLExpression { + /// Request an exclusive "writer" lock, skipping rows that are already locked. + case updateSkippingLocked + + /// Request a shared "reader" lock, skipping rows that are already locked. + /// + /// > Note: This is the "lightest" locking that is supported by both Postgres and MySQL. + case shareSkippingLocked + + // See `SQLExpression.serialize(to:)`. + func serialize(to serializer: inout SQLSerializer) { + serializer.statement { + switch self { + case .updateSkippingLocked: + guard $0.dialect.exclusiveSelectLockExpression != nil else { return } + $0.append(SQLLockingClause.update) + case .shareSkippingLocked: + guard $0.dialect.sharedSelectLockExpression != nil else { return } + $0.append(SQLLockingClause.share) + } + $0.append("SKIP LOCKED") + } + } +} + +/// These overloads allow specifying various commonly-used `SQLExpression`s using more concise syntax. For example, +/// `.bind("hello")` rather than `SQLBind("hello")`, `.group(expr)` rather than `SQLGroupExpression(expr)`, etc. + +extension SQLExpression { + static func dateValue(_ value: E) -> Self where Self == SQLDateValue { .init(value) } + + static func now() -> Self where Self == SQLDateValue { .now() } + + static func bind(_ value: some Encodable & Sendable) -> Self where Self == SQLBind { .init(value) } + + static func function(_ name: String, _ args: any SQLExpression...) -> Self where Self == SQLFunction { .init(name, args: args) } + + static func group(_ expr: some SQLExpression) -> Self where Self == SQLGroupExpression { .init(expr) } +} + +/// The following extension allows using `Database's` `transaction(_:)` wrapper with an `SQLDatabase`. +extension SQLDatabase { + func transaction(_ closure: @escaping @Sendable (any SQLDatabase) -> EventLoopFuture) -> EventLoopFuture { + guard let fluentSelf = self as? any Database else { fatalError("Cannot use `SQLDatabase.transaction(_:)` on a non-Fluent database.") } + + return fluentSelf.transaction { fluentTransaction in closure(fluentTransaction as! any SQLDatabase) } + } + + func transaction(_ closure: @escaping @Sendable (any SQLDatabase) async throws -> T) async throws -> T { + guard let fluentSelf = self as? any Database else { fatalError("Cannot use `SQLDatabase.transaction(_:)` on a non-Fluent database.") } + + return try await fluentSelf.transaction { fluentTransaction in try await closure(fluentTransaction as! any SQLDatabase) } + } +} + +/// A variant of `AsyncMigration` designed to simplify using SQLKit to write migrations. +/// +/// > Warning: Use of ``AsyncSQLMigration`` will cause runtime errors if the migration is added to a Fluent +/// > database which is not compatible with SQLKit (such as MongoDB). +public protocol AsyncSQLMigration: AsyncMigration { + /// Perform the desired migration. + /// + /// - Parameter database: The database to migrate. + func prepare(on database: any SQLDatabase) async throws + + /// Reverse, if possible, the migration performed by ``prepare(on:)-7nlxz``. + /// + /// It is not uncommon for a given migration to be lossy if run in reverse, or to be irreversible in the + /// entire. While it is recommended that such a migration throw an error (thus stopping any further progression + /// of the revert operation), there is no requirement that it do so. In practice, most irreversible migrations + /// choose to simply do nothing at all in this method. Implementors should carefully consider the consequences + /// of progressively older migrations attempting to revert themselves afterwards before leaving this method blank. + /// + /// - Parameter database: The database to revert. + func revert(on database: any SQLDatabase) async throws +} + +extension AsyncSQLMigration { + public func prepare(on database: any Database) async throws { + try await self.prepare(on: database as! any SQLDatabase) + } + + public func revert(on database: any Database) async throws { + try await self.revert(on: database as! any SQLDatabase) + } +} diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift deleted file mode 100644 index 6493e32..0000000 --- a/Tests/LinuxMain.swift +++ /dev/null @@ -1 +0,0 @@ -fatalError("Please use swift test --enable-test-discovery to run the tests instead") diff --git a/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift b/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift index 2e9ca3c..59d7940 100644 --- a/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift +++ b/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift @@ -1,11 +1,149 @@ import XCTest +import XCTVapor +import FluentKit +import Logging @testable import QueuesFluentDriver +@preconcurrency import Queues +import FluentSQLiteDriver final class QueuesFluentDriverTests: XCTestCase { - func testExample() { - // This is an example of a functional test case. - // Use XCTAssert and related functions to verify your tests produce the correct - // results. - //XCTAssertEqual(QueuesFluentDriver().text, "Hello, World!") + func testApplication() throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.databases.use(.sqlite(.memory), as: .sqlite) + app.migrations.add(JobModelMigration()) + + let email = Email() + app.queues.add(email) + + app.queues.use(.fluent()) + + try app.autoMigrate().wait() + + app.get("send-email") { req in + req.queue.dispatch(Email.self, .init(to: "tanner@vapor.codes")) + .map { HTTPStatus.ok } + } + + try app.testable().test(.GET, "send-email") { res in + XCTAssertEqual(res.status, .ok) + } + + XCTAssertEqual(email.sent, []) + try app.queues.queue.worker.run().wait() + XCTAssertEqual(email.sent, [.init(to: "tanner@vapor.codes")]) + } + + func testFailedJobLoss() throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.databases.use(.sqlite(.memory), as: .sqlite) + app.queues.add(FailingJob()) + app.queues.use(.fluent()) + app.migrations.add(JobModelMigration()) + try app.autoMigrate().wait() + + let jobId = JobIdentifier() + app.get("test") { req in + req.queue.dispatch(FailingJob.self, ["foo": "bar"], id: jobId) + .map { HTTPStatus.ok } + } + + try app.testable().test(.GET, "test") { res in + XCTAssertEqual(res.status, .ok) + } + + XCTAssertThrowsError(try app.queues.queue.worker.run().wait()) { + XCTAssert($0 is FailingJob.Failure) + } + + XCTAssertNotNil(try (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase) + .select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string).first().wait()) + } + + func testDelayedJobIsRemovedFromProcessingQueue() throws { + let app = Application(.testing) + defer { app.shutdown() } + + app.databases.use(.sqlite(.memory), as: .sqlite) + + app.queues.add(DelayedJob()) + + app.queues.use(.fluent()) + + app.migrations.add(JobModelMigration()) + try app.autoMigrate().wait() + + let jobId = JobIdentifier() + app.get("delay-job") { req in + req.queue.dispatch(DelayedJob.self, .init(name: "vapor"), + delayUntil: Date().addingTimeInterval(3600), + id: jobId) + .map { HTTPStatus.ok } + } + + try app.testable().test(.GET, "delay-job") { res in + XCTAssertEqual(res.status, .ok) + } + + XCTAssertEqual(try (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase) + .select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string) + .first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase).wait()?.state, .pending) + } + + override func setUp() { + XCTAssert(isLoggingConfigured) } } + +final class Email: Job { + struct Message: Codable, Equatable { + let to: String + } + + var sent: [Message] = [] + + func dequeue(_ context: QueueContext, _ message: Message) -> EventLoopFuture { + self.sent.append(message) + context.logger.info("sending email \(message)") + return context.eventLoop.makeSucceededFuture(()) + } +} + +final class DelayedJob: Job { + struct Message: Codable, Equatable { + let name: String + } + + func dequeue(_ context: QueueContext, _ message: Message) -> EventLoopFuture { + context.logger.info("Hello \(message.name)") + return context.eventLoop.makeSucceededFuture(()) + } +} + +struct FailingJob: Job { + struct Failure: Error {} + + func dequeue(_ context: QueueContext, _ message: [String: String]) -> EventLoopFuture { + context.eventLoop.makeFailedFuture(Failure()) + } + + func error(_ context: QueueContext, _ error: any Error, _ payload: [String: String]) -> EventLoopFuture { + context.eventLoop.makeFailedFuture(Failure()) + } +} + +func env(_ name: String) -> String? { + return ProcessInfo.processInfo.environment[name] +} + +let isLoggingConfigured: Bool = { + LoggingSystem.bootstrap { label in + var handler = StreamLogHandler.standardOutput(label: label) + handler.logLevel = env("LOG_LEVEL").flatMap { .init(rawValue: $0) } ?? .info + return handler + } + return true +}()