diff --git a/Sources/QueuesFluentDriver/FluentQueue.swift b/Sources/QueuesFluentDriver/FluentQueue.swift index fb48e4d..58b74ee 100644 --- a/Sources/QueuesFluentDriver/FluentQueue.swift +++ b/Sources/QueuesFluentDriver/FluentQueue.swift @@ -1,5 +1,6 @@ @preconcurrency import Queues @preconcurrency import SQLKit +import NIOConcurrencyHelpers /// An implementation of `Queue` which stores job data and metadata in a Fluent database. public struct FluentQueue: Queue, Sendable { @@ -7,6 +8,8 @@ public struct FluentQueue: Queue, Sendable { public let context: QueueContext let sqlDb: any SQLDatabase + + let _sqlLockingClause: NIOLockedValueBox<(any SQLExpression)?> = .init(nil) // needs a lock for the queue to be `Sendable` // See `Queue.get(_:)`. public func get(_ id: JobIdentifier) -> EventLoopFuture { @@ -56,6 +59,27 @@ public struct FluentQueue: Queue, Sendable { // See `Queue.pop()`. public func pop() -> EventLoopFuture { self.sqlDb.eventLoop.makeFutureWithTask { + // Special case: For MySQL < 8.0, we can't use `SKIP LOCKED`. This is a really hackneyed solution, + // but we need to execute a database query to get the version information, `makeQueue(with:)` + // is purely synchronous, and `SQLDatabase.version` is not implemented in MySQLKit at the time + // of this writing. + if self._sqlLockingClause.withLockedValue({ $0 }) == nil { + switch self.sqlDb.dialect.name { + case "mysql": + let version = try await self.sqlDb.select() + .column(SQLFunction("version"), as: "version") + .first(decodingColumn: "version", as: String.self) ?? "" // always returns one row + // This is a really lazy check and it knows it; we know MySQLNIO doesn't support versions older than 5.x. + if version.starts(with: "5.") || !(version.first?.isNumber ?? false) { + self._sqlLockingClause.withLockedValue { $0 = SQLLockingClause.update } + } else { + fallthrough + } + default: + self._sqlLockingClause.withLockedValue { $0 = SQLLockingClauseWithSkipLocked.updateSkippingLocked } + } + } + let select = self.sqlDb .select() .column("id") @@ -65,7 +89,7 @@ public struct FluentQueue: Queue, Sendable { .where(.dateValue(.function("coalesce", SQLColumn("delay_until"), SQLNow())), .lessThanOrEqual, .now()) .orderBy("delay_until") .limit(1) - .lockingClause(SQLLockingClauseWithSkipLocked.updateSkippingLocked) + .lockingClause(self._sqlLockingClause.withLockedValue { $0! }) // we've always set it by the time we get here if self.sqlDb.dialect.supportsReturning { return try await self.sqlDb.update(JobModel.schema)