Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support MySQL 5.7 #3

Merged
merged 3 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
@preconcurrency import Queues
@preconcurrency import SQLKit
import NIOConcurrencyHelpers

/// An implementation of `Queue` which stores job data and metadata in a Fluent database.
public struct FluentQueue: Queue, Sendable {
// See `Queue.context`.
public let context: QueueContext

let sqlDb: any SQLDatabase

let _sqlLockingClause: NIOLockedValueBox<(any SQLExpression)?> = .init(nil) // needs a lock for the queue to be `Sendable`

// See `Queue.get(_:)`.
public func get(_ id: JobIdentifier) -> EventLoopFuture<JobData> {
Expand Down Expand Up @@ -56,6 +59,27 @@ public struct FluentQueue: Queue, Sendable {
// See `Queue.pop()`.
public func pop() -> EventLoopFuture<JobIdentifier?> {
self.sqlDb.eventLoop.makeFutureWithTask {
// Special case: For MySQL < 8.0, we can't use `SKIP LOCKED`. This is a really hackneyed solution,
// but we need to execute a database query to get the version information, `makeQueue(with:)`
// is purely synchronous, and `SQLDatabase.version` is not implemented in MySQLKit at the time
// of this writing.
if self._sqlLockingClause.withLockedValue({ $0 }) == nil {
switch self.sqlDb.dialect.name {
case "mysql":
let version = try await self.sqlDb.select()
.column(SQLFunction("version"), as: "version")
.first(decodingColumn: "version", as: String.self) ?? "" // always returns one row
// This is a really lazy check and it knows it; we know MySQLNIO doesn't support versions older than 5.x.
if version.starts(with: "5.") || !(version.first?.isNumber ?? false) {
self._sqlLockingClause.withLockedValue { $0 = SQLLockingClause.update }
} else {
fallthrough
}
default:
self._sqlLockingClause.withLockedValue { $0 = SQLLockingClauseWithSkipLocked.updateSkippingLocked }
}
}

let select = self.sqlDb
.select()
.column("id")
Expand All @@ -65,7 +89,7 @@ public struct FluentQueue: Queue, Sendable {
.where(.dateValue(.function("coalesce", SQLColumn("delay_until"), SQLNow())), .lessThanOrEqual, .now())
.orderBy("delay_until")
.limit(1)
.lockingClause(SQLLockingClauseWithSkipLocked.updateSkippingLocked)
.lockingClause(self._sqlLockingClause.withLockedValue { $0! }) // we've always set it by the time we get here

if self.sqlDb.dialect.supportsReturning {
return try await self.sqlDb.update(JobModel.schema)
Expand Down
2 changes: 1 addition & 1 deletion Sources/QueuesFluentDriver/FluentQueuesDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public struct FluentQueuesDriver: QueuesDriver {
public func shutdown() {}
}

private struct FailingQueue: Queue {
/*private*/ struct FailingQueue: Queue {
Copy link
Member

@ptoffy ptoffy May 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/*private*/ struct FailingQueue: Queue {
struct FailingQueue: Queue {

let failure: any Error
let context: QueueContext

Expand Down
2 changes: 0 additions & 2 deletions Sources/QueuesFluentDriver/SQLKit+Convenience.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ extension SQLExpression {

static func now() -> Self where Self == SQLDateValue<SQLNow> { .now() }

static func bind(_ value: some Encodable & Sendable) -> Self where Self == SQLBind { .init(value) }

static func function(_ name: String, _ args: any SQLExpression...) -> Self where Self == SQLFunction { .init(name, args: args) }

static func group(_ expr: some SQLExpression) -> Self where Self == SQLGroupExpression { .init(expr) }
Expand Down
85 changes: 73 additions & 12 deletions Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import Logging
import FluentSQLiteDriver

final class QueuesFluentDriverTests: XCTestCase {
func testApplication() throws {
func testApplication() async throws {
let app = Application(.testing)
defer { app.shutdown() }

Expand All @@ -19,7 +19,7 @@ final class QueuesFluentDriverTests: XCTestCase {

app.queues.use(.fluent())

try app.autoMigrate().wait()
try await app.autoMigrate()

app.get("send-email") { req in
req.queue.dispatch(Email.self, .init(to: "[email protected]"))
Expand All @@ -31,19 +31,21 @@ final class QueuesFluentDriverTests: XCTestCase {
}

XCTAssertEqual(email.sent, [])
try app.queues.queue.worker.run().wait()
try await app.queues.queue.worker.run().get()
XCTAssertEqual(email.sent, [.init(to: "[email protected]")])

try await app.autoRevert()
}

func testFailedJobLoss() throws {
func testFailedJobLoss() async throws {
let app = Application(.testing)
defer { app.shutdown() }

app.databases.use(.sqlite(.memory), as: .sqlite)
app.queues.add(FailingJob())
app.queues.use(.fluent())
app.migrations.add(JobModelMigration())
try app.autoMigrate().wait()
try await app.autoMigrate()

let jobId = JobIdentifier()
app.get("test") { req in
Expand All @@ -55,15 +57,17 @@ final class QueuesFluentDriverTests: XCTestCase {
XCTAssertEqual(res.status, .ok)
}

XCTAssertThrowsError(try app.queues.queue.worker.run().wait()) {
await XCTAssertThrowsErrorAsync(try await app.queues.queue.worker.run().get()) {
XCTAssert($0 is FailingJob.Failure)
}

XCTAssertNotNil(try (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
.select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string).first().wait())
await XCTAssertNotNilAsync(try await (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
.select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string).first())

try await app.autoRevert()
}

func testDelayedJobIsRemovedFromProcessingQueue() throws {
func testDelayedJobIsRemovedFromProcessingQueue() async throws {
let app = Application(.testing)
defer { app.shutdown() }

Expand All @@ -74,7 +78,7 @@ final class QueuesFluentDriverTests: XCTestCase {
app.queues.use(.fluent())

app.migrations.add(JobModelMigration())
try app.autoMigrate().wait()
try await app.autoMigrate()

let jobId = JobIdentifier()
app.get("delay-job") { req in
Expand All @@ -88,9 +92,25 @@ final class QueuesFluentDriverTests: XCTestCase {
XCTAssertEqual(res.status, .ok)
}

XCTAssertEqual(try (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
await XCTAssertEqualAsync(try await (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
.select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string)
.first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase).wait()?.state, .pending)
.first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase)?.state, .pending)

try await app.autoRevert()
}

func testCoverageForFailingQueue() {
let app = Application(.testing)
defer { app.shutdown() }
let queue = FailingQueue(
failure: QueuesFluentError.unsupportedDatabase,
context: .init(queueName: .init(string: ""), configuration: .init(), application: app, logger: .init(label: ""), on: app.eventLoopGroup.any())
)
XCTAssertThrowsError(try queue.get(.init()).wait())
XCTAssertThrowsError(try queue.set(.init(), to: JobData(payload: [], maxRetryCount: 0, jobName: "", delayUntil: nil, queuedAt: .init())).wait())
XCTAssertThrowsError(try queue.clear(.init()).wait())
XCTAssertThrowsError(try queue.push(.init()).wait())
XCTAssertThrowsError(try queue.pop().wait())
}

override func setUp() {
Expand Down Expand Up @@ -135,6 +155,47 @@ struct FailingJob: Job {
}
}

func XCTAssertEqualAsync<T>(
_ expression1: @autoclosure () async throws -> T,
_ expression2: @autoclosure () async throws -> T,
_ message: @autoclosure () -> String = "",
file: StaticString = #filePath, line: UInt = #line
) async where T: Equatable {
do {
let expr1 = try await expression1(), expr2 = try await expression2()
return XCTAssertEqual(expr1, expr2, message(), file: file, line: line)
} catch {
return XCTAssertEqual(try { () -> Bool in throw error }(), false, message(), file: file, line: line)
}
}

func XCTAssertThrowsErrorAsync<T>(
_ expression: @autoclosure () async throws -> T,
_ message: @autoclosure () -> String = "",
file: StaticString = #filePath, line: UInt = #line,
_ callback: (any Error) -> Void = { _ in }
) async {
do {
_ = try await expression()
XCTAssertThrowsError({}(), message(), file: file, line: line, callback)
} catch {
XCTAssertThrowsError(try { throw error }(), message(), file: file, line: line, callback)
}
}

func XCTAssertNotNilAsync(
_ expression: @autoclosure () async throws -> Any?,
_ message: @autoclosure () -> String = "",
file: StaticString = #filePath, line: UInt = #line
) async {
do {
let result = try await expression()
XCTAssertNotNil(result, message(), file: file, line: line)
} catch {
return XCTAssertNotNil(try { throw error }(), message(), file: file, line: line)
}
}

func env(_ name: String) -> String? {
return ProcessInfo.processInfo.environment[name]
}
Expand Down