Skip to content

Commit

Permalink
Add some really cockeyed support for MySQL 5.7 users.
Browse files Browse the repository at this point in the history
  • Loading branch information
gwynne committed Apr 28, 2024
1 parent 199bd0e commit 7cdc129
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
@preconcurrency import Queues
@preconcurrency import SQLKit
import NIOConcurrencyHelpers

/// An implementation of `Queue` which stores job data and metadata in a Fluent database.
public struct FluentQueue: Queue, Sendable {
// See `Queue.context`.
public let context: QueueContext

let sqlDb: any SQLDatabase

let _sqlLockingClause: NIOLockedValueBox<(any SQLExpression)?> = .init(nil) // needs a lock for the queue to be `Sendable`

// See `Queue.get(_:)`.
public func get(_ id: JobIdentifier) -> EventLoopFuture<JobData> {
Expand Down Expand Up @@ -56,6 +59,27 @@ public struct FluentQueue: Queue, Sendable {
// See `Queue.pop()`.
public func pop() -> EventLoopFuture<JobIdentifier?> {
self.sqlDb.eventLoop.makeFutureWithTask {
// Special case: For MySQL < 8.0, we can't use `SKIP LOCKED`. This is a really hackneyed solution,
// but we need to execute a database query to get the version information, `makeQueue(with:)`
// is purely synchronous, and `SQLDatabase.version` is not implemented in MySQLKit at the time
// of this writing.
if self._sqlLockingClause.withLockedValue({ $0 }) == nil {
switch self.sqlDb.dialect.name {
case "mysql":
let version = try await self.sqlDb.select()
.column(SQLFunction("version"), as: "version")
.first(decodingColumn: "version", as: String.self) ?? "" // always returns one row
// This is a really lazy check and it knows it; we know MySQLNIO doesn't support versions older than 5.x.
if version.starts(with: "5.") || !(version.first?.isNumber ?? false) {
self._sqlLockingClause.withLockedValue { $0 = SQLLockingClause.update }
} else {
fallthrough
}
default:
self._sqlLockingClause.withLockedValue { $0 = SQLLockingClauseWithSkipLocked.updateSkippingLocked }
}
}

let select = self.sqlDb
.select()
.column("id")
Expand All @@ -65,7 +89,7 @@ public struct FluentQueue: Queue, Sendable {
.where(.dateValue(.function("coalesce", SQLColumn("delay_until"), SQLNow())), .lessThanOrEqual, .now())
.orderBy("delay_until")
.limit(1)
.lockingClause(SQLLockingClauseWithSkipLocked.updateSkippingLocked)
.lockingClause(self._sqlLockingClause.withLockedValue { $0! }) // we've always set it by the time we get here

if self.sqlDb.dialect.supportsReturning {
return try await self.sqlDb.update(JobModel.schema)
Expand Down

0 comments on commit 7cdc129

Please sign in to comment.