Skip to content

Commit

Permalink
Implement preservable jobs and configurable jobs table name (#11)
Browse files Browse the repository at this point in the history
Preserved jobs, if enabled, are marked with a `completed` state in the
database. Turning preservation off if it was previously on does _not_
clear old jobs from the table; it just prevents new ones from being
preserved.

Configuring the jobs table name requires passing the same name (and
optionally, space) to both the queue driver _and_ the migration. This is
necessary because there's no way for the migration to see the driver's
configuration.

Closes #9.
Closes #10.
  • Loading branch information
gwynne authored Oct 1, 2024
1 parent bc47078 commit 4e31dfa
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 147 deletions.
9 changes: 2 additions & 7 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,10 @@ jobs:
fail-fast: false
matrix:
swift-image:
- swift:5.8-jammy
- swift:5.9-jammy
- swift:5.10-noble
- swiftlang/swift:nightly-6.0-jammy
- swift:6.0-noble
- swiftlang/swift:nightly-main-jammy
include:
- sanitize: '--sanitize=thread'
- swift-image: swift:5.8-jammy
sanitize: ''
runs-on: ubuntu-latest
container: ${{ matrix.swift-image }}
services:
Expand All @@ -50,7 +45,7 @@ jobs:
SANITIZE: ${{ matrix.sanitize }}
POSTGRES_HOST: psql
MYSQL_HOST: mysql
run: SWIFT_DETERMINISTIC_HASHING=1 swift test ${SANITIZE} --enable-code-coverage
run: SWIFT_DETERMINISTIC_HASHING=1 swift test --sanitize=thread --enable-code-coverage
- name: Upload coverage data
uses: vapor/[email protected]
with:
Expand Down
7 changes: 6 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
// swift-tools-version:5.8
// swift-tools-version:5.9
import PackageDescription
import class Foundation.ProcessInfo

let package = Package(
name: "QueuesFluentDriver",
platforms: [
.macOS(.v10_15),
.iOS(.v13),
.watchOS(.v6),
.tvOS(.v13),
],
products: [
.library(name: "QueuesFluentDriver", targets: ["QueuesFluentDriver"]),
Expand Down Expand Up @@ -53,6 +56,8 @@ let package = Package(

var swiftSettings: [SwiftSetting] { [
.enableUpcomingFeature("ForwardTrailingClosures"),
.enableUpcomingFeature("ExistentialAny"),
.enableUpcomingFeature("ConciseMagicFile"),
.enableUpcomingFeature("DisableOutwardActorInference"),
.enableExperimentalFeature("StrictConcurrency=complete"),
] }
63 changes: 0 additions & 63 deletions [email protected]

This file was deleted.

41 changes: 25 additions & 16 deletions Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ public struct FluentQueue: AsyncQueue, Sendable {
// See `Queue.context`.
public let context: QueueContext

let sqlDb: any SQLDatabase

let sqlDB: any SQLDatabase
let preservesCompletedJobs: Bool
let jobsTable: SQLQualifiedTable

let _sqlLockingClause: NIOLockedValueBox<(any SQLExpression)?> = .init(nil) // needs a lock for the queue to be `Sendable`

// See `Queue.get(_:)`.
public func get(_ id: JobIdentifier) async throws -> JobData {
guard let job = try await self.sqlDb.select()
guard let job = try await self.sqlDB.select()
.columns("payload", "max_retry_count", "queue_name", "state", "job_name", "delay_until", "queued_at", "attempts", "updated_at")
.from(JobModel.schema)
.from(self.jobsTable)
.where("id", .equal, id)
.first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase)
else {
Expand All @@ -28,7 +30,7 @@ public struct FluentQueue: AsyncQueue, Sendable {

// See `Queue.set(_:to:)`.
public func set(_ id: JobIdentifier, to jobStorage: JobData) async throws {
try await self.sqlDb.insert(into: JobModel.schema)
try await self.sqlDB.insert(into: self.jobsTable)
.columns("id", "queue_name", "job_name", "queued_at", "delay_until", "state", "max_retry_count", "attempts", "payload", "updated_at")
.values(
.bind(id),
Expand All @@ -48,14 +50,21 @@ public struct FluentQueue: AsyncQueue, Sendable {

// See `Queue.clear(_:)`.
public func clear(_ id: JobIdentifier) async throws {
try await self.sqlDb.delete(from: JobModel.schema)
.where("id", .equal, id)
.run()
if self.preservesCompletedJobs {
try await self.sqlDB.update(self.jobsTable)
.set("state", to: .literal(StoredJobState.completed))
.where("id", .equal, id)
.run()
} else {
try await self.sqlDB.delete(from: self.jobsTable)
.where("id", .equal, id)
.run()
}
}

// See `Queue.push(_:)`.
public func push(_ id: JobIdentifier) async throws {
try await self.sqlDb.update(JobModel.schema)
try await self.sqlDB.update(self.jobsTable)
.set("state", to: .literal(StoredJobState.pending))
.set("updated_at", to: .now())
.where("id", .equal, id)
Expand All @@ -69,9 +78,9 @@ public struct FluentQueue: AsyncQueue, Sendable {
// is purely synchronous, and `SQLDatabase.version` is not implemented in MySQLKit at the time
// of this writing.
if self._sqlLockingClause.withLockedValue({ $0 }) == nil {
switch self.sqlDb.dialect.name {
switch self.sqlDB.dialect.name {
case "mysql":
let version = try await self.sqlDb.select()
let version = try await self.sqlDB.select()
.column(.function("version"), as: "version")
.first(decodingColumn: "version", as: String.self)! // always returns one row
// This is a really lazy check and it knows it; we know MySQLNIO doesn't support versions older than 5.x.
Expand All @@ -87,7 +96,7 @@ public struct FluentQueue: AsyncQueue, Sendable {

let select = SQLSubquery.select { $0
.column("id")
.from(JobModel.schema)
.from(self.jobsTable)
.where("state", .equal, .literal(StoredJobState.pending))
.where("queue_name", .equal, self.queueName)
.where(.dateValue(.function("coalesce", .column("delay_until"), SQLNow())), .lessThanOrEqual, .now())
Expand All @@ -97,24 +106,24 @@ public struct FluentQueue: AsyncQueue, Sendable {
.lockingClause(self._sqlLockingClause.withLockedValue { $0! }) // we've always set it by the time we get here
}

if self.sqlDb.dialect.supportsReturning {
return try await self.sqlDb.update(JobModel.schema)
if self.sqlDB.dialect.supportsReturning {
return try await self.sqlDB.update(self.jobsTable)
.set("state", to: .literal(StoredJobState.processing))
.set("updated_at", to: .now())
.where("id", .equal, select)
.returning("id")
.first(decodingColumn: "id", as: String.self)
.map(JobIdentifier.init(string:))
} else {
return try await self.sqlDb.transaction { transaction in
return try await self.sqlDB.transaction { transaction in
guard let id = try await transaction.raw("\(select)") // using raw() to make sure we run on the transaction connection
.first(decodingColumn: "id", as: String.self)
else {
return nil
}

try await transaction
.update(JobModel.schema)
.update(self.jobsTable)
.set("state", to: .literal(StoredJobState.processing))
.set("updated_at", to: .now())
.where("id", .equal, id)
Expand Down
32 changes: 24 additions & 8 deletions Sources/QueuesFluentDriver/FluentQueuesDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,21 @@ import struct Queues.JobIdentifier
import struct Queues.JobData

public struct FluentQueuesDriver: QueuesDriver {
let databaseId: DatabaseID?
let databaseID: DatabaseID?
let preservesCompletedJobs: Bool
let jobsTableName: String
let jobsTableSpace: String?

init(on databaseId: DatabaseID? = nil) {
self.databaseId = databaseId
init(
on databaseID: DatabaseID? = nil,
preserveCompletedJobs: Bool = false,
jobsTableName: String = "_jobs_meta",
jobsTableSpace: String? = nil
) {
self.databaseID = databaseID
self.preservesCompletedJobs = preserveCompletedJobs
self.jobsTableName = jobsTableName
self.jobsTableSpace = jobsTableSpace
}

public func makeQueue(with context: QueueContext) -> any Queue {
Expand All @@ -21,16 +32,21 @@ public struct FluentQueuesDriver: QueuesDriver {
///
/// `Fluent.Databases.database(_:logger:on:)` never returns nil; its optionality is an API mistake.
/// If a nonexistent `DatabaseID` is requested, it triggers a `fatalError()`.
let baseDb = context
let baseDB = context
.application
.databases
.database(self.databaseId, logger: context.logger, on: context.eventLoop)!
guard let sqlDb = baseDb as? any SQLDatabase else {
.database(self.databaseID, logger: context.logger, on: context.eventLoop)!

guard let sqlDB = baseDB as? any SQLDatabase else {
return FailingQueue(failure: QueuesFluentError.unsupportedDatabase, context: context)
}

return FluentQueue(context: context, sqlDb: sqlDb)
return FluentQueue(
context: context,
sqlDB: sqlDB,
preservesCompletedJobs: self.preservesCompletedJobs,
jobsTable: .init(self.jobsTableName, space: self.jobsTableSpace)
)
}

public func shutdown() {}
Expand Down
8 changes: 5 additions & 3 deletions Sources/QueuesFluentDriver/JobModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ enum StoredJobState: String, Codable, CaseIterable {

/// Job is in progress.
case processing

/// Job is completed.
///
/// > Note: This state is only used if the driver is configured to preserve completed jobs.
case completed
}

/// Encapsulates a job's metadata and `JobData`.
struct JobModel: Codable, Sendable {
/// The name of the model's table.
static let schema = "_jobs_meta"

/// The job identifier. Corresponds directly to a `JobIdentifier`.
let id: String?

Expand Down
68 changes: 37 additions & 31 deletions Sources/QueuesFluentDriver/JobModelMigrate.swift
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
import protocol SQLKit.SQLDatabase
import enum SQLKit.SQLColumnConstraintAlgorithm
import enum SQLKit.SQLDataType
import enum SQLKit.SQLLiteral
import struct SQLKit.SQLRaw
import SQLKit

public struct JobModelMigration: AsyncSQLMigration {
private let jobsTableString: String
private let jobsTable: SQLQualifiedTable

/// Public initializer.
public init() {}

public init(
jobsTableName: String = "_jobs_meta",
jobsTableSpace: String? = nil
) {
self.jobsTableString = "\(jobsTableSpace.map { "\($0)_" } ?? "")\(jobsTableName)"
self.jobsTable = .init(jobsTableName, space: jobsTableSpace)
}

// See `AsyncSQLMigration.prepare(on:)`.
public func prepare(on database: any SQLDatabase) async throws {
let stateEnumType: String
let stateEnumType: any SQLExpression

switch database.dialect.enumSyntax {
case .typeName:
stateEnumType = "\(JobModel.schema)_storedjobstatus"
try await database.create(enum: stateEnumType)
.value("pending")
.value("processing")
.run()
stateEnumType = .identifier("\(self.jobsTableString)_storedjobstatus")
var builder = database.create(enum: stateEnumType)
builder = StoredJobState.allCases.reduce(builder, { $0.value($1.rawValue) })
try await builder.run()
case .inline:
stateEnumType = "enum('\(StoredJobState.allCases.map(\.rawValue).joined(separator: "','"))')"
// This is technically a misuse of SQLFunction, but it produces the correct syntax
stateEnumType = .function("enum", StoredJobState.allCases.map { .literal($0.rawValue) })
default:
stateEnumType = "varchar(16)"
// This is technically a misuse of SQLFunction, but it produces the correct syntax
stateEnumType = .function("varchar", .literal(16))
}

/// This whole pile of nonsense is only here because of
Expand All @@ -39,20 +45,20 @@ public struct JobModelMigration: AsyncSQLMigration {
autoTimestampConstraints = []
}

try await database.create(table: JobModel.schema)
.column("id", type: .text, .primaryKey(autoIncrement: false))
.column("queue_name", type: .text, .notNull)
.column("job_name", type: .text, .notNull)
.column("queued_at", type: manualTimestampType, .notNull)
.column("delay_until", type: manualTimestampType, .default(SQLLiteral.null))
.column("state", type: .custom(SQLRaw(stateEnumType)), .notNull)
.column("max_retry_count", type: .int, .notNull)
.column("attempts", type: .int, .notNull)
.column("payload", type: .blob, .notNull)
.column("updated_at", type: .timestamp, autoTimestampConstraints)
try await database.create(table: self.jobsTable)
.column("id", type: .text, .primaryKey(autoIncrement: false))
.column("queue_name", type: .text, .notNull)
.column("job_name", type: .text, .notNull)
.column("queued_at", type: manualTimestampType, .notNull)
.column("delay_until", type: manualTimestampType, .default(SQLLiteral.null))
.column("state", type: .custom(stateEnumType), .notNull)
.column("max_retry_count", type: .int, .notNull)
.column("attempts", type: .int, .notNull)
.column("payload", type: .blob, .notNull)
.column("updated_at", type: .timestamp, autoTimestampConstraints)
.run()
try await database.create(index: "i_\(JobModel.schema)_state_queue_delayUntil")
.on(JobModel.schema)
try await database.create(index: "i_\(self.jobsTableString)_state_queue_delayUntil")
.on(self.jobsTable)
.column("state")
.column("queue_name")
.column("delay_until")
Expand All @@ -61,10 +67,10 @@ public struct JobModelMigration: AsyncSQLMigration {

// See `AsyncSQLMigration.revert(on:)`.
public func revert(on database: any SQLDatabase) async throws {
try await database.drop(table: JobModel.schema).run()
try await database.drop(table: self.jobsTable).run()
switch database.dialect.enumSyntax {
case .typeName:
try await database.drop(enum: "\(JobModel.schema)_storedjobstatus").run()
try await database.drop(enum: "\(self.jobsTableString)_storedjobstatus").run()
default:
break
}
Expand Down
Loading

0 comments on commit 4e31dfa

Please sign in to comment.