From 7cdc1299b199de04218c364e9a9ddd466e1514c1 Mon Sep 17 00:00:00 2001 From: Gwynne Raskind Date: Sun, 28 Apr 2024 17:39:56 -0500 Subject: [PATCH 1/3] Add some really cockeyed support for MySQL 5.7 users. --- Sources/QueuesFluentDriver/FluentQueue.swift | 26 +++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/Sources/QueuesFluentDriver/FluentQueue.swift b/Sources/QueuesFluentDriver/FluentQueue.swift index fb48e4d..58b74ee 100644 --- a/Sources/QueuesFluentDriver/FluentQueue.swift +++ b/Sources/QueuesFluentDriver/FluentQueue.swift @@ -1,5 +1,6 @@ @preconcurrency import Queues @preconcurrency import SQLKit +import NIOConcurrencyHelpers /// An implementation of `Queue` which stores job data and metadata in a Fluent database. public struct FluentQueue: Queue, Sendable { @@ -7,6 +8,8 @@ public struct FluentQueue: Queue, Sendable { public let context: QueueContext let sqlDb: any SQLDatabase + + let _sqlLockingClause: NIOLockedValueBox<(any SQLExpression)?> = .init(nil) // needs a lock for the queue to be `Sendable` // See `Queue.get(_:)`. public func get(_ id: JobIdentifier) -> EventLoopFuture { @@ -56,6 +59,27 @@ public struct FluentQueue: Queue, Sendable { // See `Queue.pop()`. public func pop() -> EventLoopFuture { self.sqlDb.eventLoop.makeFutureWithTask { + // Special case: For MySQL < 8.0, we can't use `SKIP LOCKED`. This is a really hackneyed solution, + // but we need to execute a database query to get the version information, `makeQueue(with:)` + // is purely synchronous, and `SQLDatabase.version` is not implemented in MySQLKit at the time + // of this writing. + if self._sqlLockingClause.withLockedValue({ $0 }) == nil { + switch self.sqlDb.dialect.name { + case "mysql": + let version = try await self.sqlDb.select() + .column(SQLFunction("version"), as: "version") + .first(decodingColumn: "version", as: String.self) ?? "" // always returns one row + // This is a really lazy check and it knows it; we know MySQLNIO doesn't support versions older than 5.x. + if version.starts(with: "5.") || !(version.first?.isNumber ?? false) { + self._sqlLockingClause.withLockedValue { $0 = SQLLockingClause.update } + } else { + fallthrough + } + default: + self._sqlLockingClause.withLockedValue { $0 = SQLLockingClauseWithSkipLocked.updateSkippingLocked } + } + } + let select = self.sqlDb .select() .column("id") @@ -65,7 +89,7 @@ public struct FluentQueue: Queue, Sendable { .where(.dateValue(.function("coalesce", SQLColumn("delay_until"), SQLNow())), .lessThanOrEqual, .now()) .orderBy("delay_until") .limit(1) - .lockingClause(SQLLockingClauseWithSkipLocked.updateSkippingLocked) + .lockingClause(self._sqlLockingClause.withLockedValue { $0! }) // we've always set it by the time we get here if self.sqlDb.dialect.supportsReturning { return try await self.sqlDb.update(JobModel.schema) From f36fcca494c38b8a306deb00934995cd95b925f5 Mon Sep 17 00:00:00 2001 From: Gwynne Raskind Date: Thu, 2 May 2024 03:13:37 -0500 Subject: [PATCH 2/3] Remove unused method --- Sources/QueuesFluentDriver/SQLKit+Convenience.swift | 2 -- 1 file changed, 2 deletions(-) diff --git a/Sources/QueuesFluentDriver/SQLKit+Convenience.swift b/Sources/QueuesFluentDriver/SQLKit+Convenience.swift index ee61506..42ed2bc 100644 --- a/Sources/QueuesFluentDriver/SQLKit+Convenience.swift +++ b/Sources/QueuesFluentDriver/SQLKit+Convenience.swift @@ -70,8 +70,6 @@ extension SQLExpression { static func now() -> Self where Self == SQLDateValue { .now() } - static func bind(_ value: some Encodable & Sendable) -> Self where Self == SQLBind { .init(value) } - static func function(_ name: String, _ args: any SQLExpression...) -> Self where Self == SQLFunction { .init(name, args: args) } static func group(_ expr: some SQLExpression) -> Self where Self == SQLGroupExpression { .init(expr) } From 63f9ccb194f01b684440d69168fd4c751952154d Mon Sep 17 00:00:00 2001 From: Gwynne Raskind Date: Thu, 2 May 2024 03:13:52 -0500 Subject: [PATCH 3/3] Async-ify the tests, such as they are --- .../FluentQueuesDriver.swift | 2 +- .../QueuesFluentDriverTests.swift | 85 ++++++++++++++++--- 2 files changed, 74 insertions(+), 13 deletions(-) diff --git a/Sources/QueuesFluentDriver/FluentQueuesDriver.swift b/Sources/QueuesFluentDriver/FluentQueuesDriver.swift index 6279fc7..a8e8970 100644 --- a/Sources/QueuesFluentDriver/FluentQueuesDriver.swift +++ b/Sources/QueuesFluentDriver/FluentQueuesDriver.swift @@ -35,7 +35,7 @@ public struct FluentQueuesDriver: QueuesDriver { public func shutdown() {} } -private struct FailingQueue: Queue { +/*private*/ struct FailingQueue: Queue { let failure: any Error let context: QueueContext diff --git a/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift b/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift index 59d7940..b469b6b 100644 --- a/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift +++ b/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift @@ -7,7 +7,7 @@ import Logging import FluentSQLiteDriver final class QueuesFluentDriverTests: XCTestCase { - func testApplication() throws { + func testApplication() async throws { let app = Application(.testing) defer { app.shutdown() } @@ -19,7 +19,7 @@ final class QueuesFluentDriverTests: XCTestCase { app.queues.use(.fluent()) - try app.autoMigrate().wait() + try await app.autoMigrate() app.get("send-email") { req in req.queue.dispatch(Email.self, .init(to: "tanner@vapor.codes")) @@ -31,11 +31,13 @@ final class QueuesFluentDriverTests: XCTestCase { } XCTAssertEqual(email.sent, []) - try app.queues.queue.worker.run().wait() + try await app.queues.queue.worker.run().get() XCTAssertEqual(email.sent, [.init(to: "tanner@vapor.codes")]) + + try await app.autoRevert() } - func testFailedJobLoss() throws { + func testFailedJobLoss() async throws { let app = Application(.testing) defer { app.shutdown() } @@ -43,7 +45,7 @@ final class QueuesFluentDriverTests: XCTestCase { app.queues.add(FailingJob()) app.queues.use(.fluent()) app.migrations.add(JobModelMigration()) - try app.autoMigrate().wait() + try await app.autoMigrate() let jobId = JobIdentifier() app.get("test") { req in @@ -55,15 +57,17 @@ final class QueuesFluentDriverTests: XCTestCase { XCTAssertEqual(res.status, .ok) } - XCTAssertThrowsError(try app.queues.queue.worker.run().wait()) { + await XCTAssertThrowsErrorAsync(try await app.queues.queue.worker.run().get()) { XCTAssert($0 is FailingJob.Failure) } - XCTAssertNotNil(try (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase) - .select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string).first().wait()) + await XCTAssertNotNilAsync(try await (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase) + .select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string).first()) + + try await app.autoRevert() } - func testDelayedJobIsRemovedFromProcessingQueue() throws { + func testDelayedJobIsRemovedFromProcessingQueue() async throws { let app = Application(.testing) defer { app.shutdown() } @@ -74,7 +78,7 @@ final class QueuesFluentDriverTests: XCTestCase { app.queues.use(.fluent()) app.migrations.add(JobModelMigration()) - try app.autoMigrate().wait() + try await app.autoMigrate() let jobId = JobIdentifier() app.get("delay-job") { req in @@ -88,9 +92,25 @@ final class QueuesFluentDriverTests: XCTestCase { XCTAssertEqual(res.status, .ok) } - XCTAssertEqual(try (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase) + await XCTAssertEqualAsync(try await (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase) .select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string) - .first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase).wait()?.state, .pending) + .first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase)?.state, .pending) + + try await app.autoRevert() + } + + func testCoverageForFailingQueue() { + let app = Application(.testing) + defer { app.shutdown() } + let queue = FailingQueue( + failure: QueuesFluentError.unsupportedDatabase, + context: .init(queueName: .init(string: ""), configuration: .init(), application: app, logger: .init(label: ""), on: app.eventLoopGroup.any()) + ) + XCTAssertThrowsError(try queue.get(.init()).wait()) + XCTAssertThrowsError(try queue.set(.init(), to: JobData(payload: [], maxRetryCount: 0, jobName: "", delayUntil: nil, queuedAt: .init())).wait()) + XCTAssertThrowsError(try queue.clear(.init()).wait()) + XCTAssertThrowsError(try queue.push(.init()).wait()) + XCTAssertThrowsError(try queue.pop().wait()) } override func setUp() { @@ -135,6 +155,47 @@ struct FailingJob: Job { } } +func XCTAssertEqualAsync( + _ expression1: @autoclosure () async throws -> T, + _ expression2: @autoclosure () async throws -> T, + _ message: @autoclosure () -> String = "", + file: StaticString = #filePath, line: UInt = #line +) async where T: Equatable { + do { + let expr1 = try await expression1(), expr2 = try await expression2() + return XCTAssertEqual(expr1, expr2, message(), file: file, line: line) + } catch { + return XCTAssertEqual(try { () -> Bool in throw error }(), false, message(), file: file, line: line) + } +} + +func XCTAssertThrowsErrorAsync( + _ expression: @autoclosure () async throws -> T, + _ message: @autoclosure () -> String = "", + file: StaticString = #filePath, line: UInt = #line, + _ callback: (any Error) -> Void = { _ in } +) async { + do { + _ = try await expression() + XCTAssertThrowsError({}(), message(), file: file, line: line, callback) + } catch { + XCTAssertThrowsError(try { throw error }(), message(), file: file, line: line, callback) + } +} + +func XCTAssertNotNilAsync( + _ expression: @autoclosure () async throws -> Any?, + _ message: @autoclosure () -> String = "", + file: StaticString = #filePath, line: UInt = #line +) async { + do { + let result = try await expression() + XCTAssertNotNil(result, message(), file: file, line: line) + } catch { + return XCTAssertNotNil(try { throw error }(), message(), file: file, line: line) + } +} + func env(_ name: String) -> String? { return ProcessInfo.processInfo.environment[name] }