Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix MySQL and PostgreSQL (again) #4

Merged
merged 2 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ let package = Package(
.package(url: "https://github.com/vapor/sql-kit.git", from: "3.29.2"),
.package(url: "https://github.com/vapor/queues.git", from: "1.13.0"),
.package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.6.0"),
.package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.0"),
.package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.0.0"),
],
targets: [
.target(
Expand All @@ -35,6 +37,8 @@ let package = Package(
dependencies: [
.product(name: "XCTVapor", package: "vapor"),
.product(name: "FluentSQLiteDriver", package: "fluent-sqlite-driver"),
.product(name: "FluentPostgresDriver", package: "fluent-postgres-driver"),
.product(name: "FluentMySQLDriver", package: "fluent-mysql-driver"),
.target(name: "QueuesFluentDriver"),
],
swiftSettings: swiftSettings
Expand Down
4 changes: 4 additions & 0 deletions [email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ let package = Package(
.package(url: "https://github.com/vapor/sql-kit.git", from: "3.29.2"),
.package(url: "https://github.com/vapor/queues.git", from: "1.13.0"),
.package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.6.0"),
.package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.0"),
.package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.0.0"),
],
targets: [
.target(
Expand All @@ -38,6 +40,8 @@ let package = Package(
dependencies: [
.product(name: "XCTVapor", package: "vapor"),
.product(name: "FluentSQLiteDriver", package: "fluent-sqlite-driver"),
.product(name: "FluentPostgresDriver", package: "fluent-postgres-driver"),
.product(name: "FluentMySQLDriver", package: "fluent-mysql-driver"),
.target(name: "QueuesFluentDriver"),
],
swiftSettings: swiftSettings
Expand Down
37 changes: 25 additions & 12 deletions Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
@preconcurrency import Queues
@preconcurrency import SQLKit
import NIOConcurrencyHelpers
import struct Foundation.Data

/// An implementation of `Queue` which stores job data and metadata in a Fluent database.
public struct FluentQueue: Queue, Sendable {
Expand All @@ -14,24 +15,36 @@ public struct FluentQueue: Queue, Sendable {
// See `Queue.get(_:)`.
public func get(_ id: JobIdentifier) -> EventLoopFuture<JobData> {
self.sqlDb.select()
.columns("payload", "max_retry_count", "job_name", "delay_until", "queued_at", "attempts")
.columns("payload", "max_retry_count", "queue_name", "state", "job_name", "delay_until", "queued_at", "attempts", "updated_at")
.from(JobModel.schema)
.where("id", .equal, id.string)
.first()
.unwrap(or: QueuesFluentError.missingJob(id))
.flatMapThrowing {
try $0.decode(model: JobData.self, keyDecodingStrategy: .convertFromSnakeCase)
try $0.decode(model: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase)
}.map {
.init(payload: .init($0.payload), maxRetryCount: $0.maxRetryCount, jobName: $0.jobName, delayUntil: $0.delayUntil, queuedAt: $0.queuedAt)
}
}

// See `Queue.set(_:to:)`.
public func set(_ id: JobIdentifier, to jobStorage: JobData) -> EventLoopFuture<Void> {
self.sqlDb.eventLoop.makeFutureWithTask {
try await self.sqlDb.insert(into: JobModel.schema)
.model(JobModel(id: id, queue: self.queueName, jobData: jobStorage), keyEncodingStrategy: .convertToSnakeCase)
.onConflict { try $0
.set(excludedContentOf: JobModel(id: id, queue: self.queueName, jobData: jobStorage), keyEncodingStrategy: .convertToSnakeCase)
}
.columns("id", "queue_name", "job_name", "queued_at", "delay_until", "state", "max_retry_count", "attempts", "payload", "updated_at")
.values(
SQLBind(id.string),
SQLBind(self.queueName.string),
SQLBind(jobStorage.jobName),
SQLBind(jobStorage.queuedAt),
SQLBind(jobStorage.delayUntil),
SQLLiteral.string(StoredJobState.pending.rawValue),
SQLBind(jobStorage.maxRetryCount),
SQLBind(jobStorage.attempts),
SQLBind(Data(jobStorage.payload)),
.now()
)
// .model(JobModel(id: id, queue: self.queueName, jobData: jobStorage), keyEncodingStrategy: .convertToSnakeCase) // because enums!
.run()
}
}
Expand All @@ -41,7 +54,7 @@ public struct FluentQueue: Queue, Sendable {
self.get(id).flatMap { _ in
self.sqlDb.delete(from: JobModel.schema)
.where("id", .equal, id.string)
.where("state", .notEqual, StoredJobState.completed)
.where("state", .notEqual, SQLLiteral.string(StoredJobState.completed.rawValue))
.run()
}
}
Expand All @@ -50,7 +63,7 @@ public struct FluentQueue: Queue, Sendable {
public func push(_ id: JobIdentifier) -> EventLoopFuture<Void> {
self.sqlDb
.update(JobModel.schema)
.set("state", to: StoredJobState.pending)
.set("state", to: SQLLiteral.string(StoredJobState.pending.rawValue))
.set("updated_at", to: .now())
.where("id", .equal, id.string)
.run()
Expand Down Expand Up @@ -84,7 +97,7 @@ public struct FluentQueue: Queue, Sendable {
.select()
.column("id")
.from(JobModel.schema)
.where("state", .equal, StoredJobState.pending)
.where("state", .equal, SQLLiteral.string(StoredJobState.pending.rawValue))
.where("queue_name", .equal, self.queueName.string)
.where(.dateValue(.function("coalesce", SQLColumn("delay_until"), SQLNow())), .lessThanOrEqual, .now())
.orderBy("delay_until")
Expand All @@ -93,7 +106,7 @@ public struct FluentQueue: Queue, Sendable {

if self.sqlDb.dialect.supportsReturning {
return try await self.sqlDb.update(JobModel.schema)
.set("state", to: StoredJobState.processing)
.set("state", to: SQLLiteral.string(StoredJobState.processing.rawValue))
.set("updated_at", to: .now())
.where("id", .equal, .group(select.query))
.returning("id")
Expand All @@ -109,10 +122,10 @@ public struct FluentQueue: Queue, Sendable {

try await transaction
.update(JobModel.schema)
.set("state", to: StoredJobState.processing)
.set("state", to: SQLLiteral.string(StoredJobState.processing.rawValue))
.set("updated_at", to: .now())
.where("id", .equal, id)
.where("state", .equal, StoredJobState.pending)
.where("state", .equal, SQLLiteral.string(StoredJobState.pending.rawValue))
.run()

return JobIdentifier(string: id)
Expand Down
5 changes: 3 additions & 2 deletions Sources/QueuesFluentDriver/JobModel.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import struct Foundation.Date
import struct Foundation.Data
import struct Queues.JobData
import struct Queues.JobIdentifier
import struct Queues.QueueName
Expand Down Expand Up @@ -45,7 +46,7 @@ struct JobModel: Codable, Sendable {
let attempts: Int

/// The job's payload.
let payload: [UInt8]
let payload: Data

/// The standard automatic update tracking timestamp.
let updatedAt: Date
Expand All @@ -59,7 +60,7 @@ struct JobModel: Codable, Sendable {
self.state = .pending
self.maxRetryCount = jobData.maxRetryCount
self.attempts = jobData.attempts ?? 0
self.payload = jobData.payload
self.payload = Data(jobData.payload)
self.updatedAt = .init()
}
}
2 changes: 1 addition & 1 deletion Sources/QueuesFluentDriver/JobModelMigrate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public struct JobModelMigration: AsyncSQLMigration {
.value("completed")
.run()
case .inline:
stateEnumType = "enum(\(StoredJobState.allCases.map(\.rawValue).joined(separator: ",")))"
stateEnumType = "enum('\(StoredJobState.allCases.map(\.rawValue).joined(separator: "','"))')"
default:
stateEnumType = "varchar(16)"
}
Expand Down
2 changes: 2 additions & 0 deletions Sources/QueuesFluentDriver/SQLKit+Convenience.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ struct SQLNow: SQLExpression {
switch serializer.dialect.name {
case "sqlite": // For SQLite, write out the literal string 'now' (see below)
SQLLiteral.string("now").serialize(to: &serializer)
case "postgresql": // For Postgres, "current_timestamp" is a keyword, not a function, so use "now()" instead.
SQLFunction("now").serialize(to: &serializer)
default: // Everywhere else, just call the SQL standard function.
SQLFunction("current_timestamp").serialize(to: &serializer)
}
Expand Down
49 changes: 38 additions & 11 deletions Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,46 @@ import Logging
@testable import QueuesFluentDriver
@preconcurrency import Queues
import FluentSQLiteDriver
import FluentPostgresDriver
import FluentMySQLDriver
import NIOSSL

final class QueuesFluentDriverTests: XCTestCase {
var dbid: DatabaseID { .sqlite }

private func useDbs(_ app: Application) throws {
app.databases.use(.sqlite(.memory), as: .sqlite)
app.databases.use(DatabaseConfigurationFactory.postgres(configuration: .init(
hostname: Environment.get("DATABASE_HOST") ?? "localhost",
port: Environment.get("DATABASE_PORT").flatMap(Int.init(_:)) ?? SQLPostgresConfiguration.ianaPortNumber,
username: Environment.get("DATABASE_USERNAME") ?? "test_username",
password: Environment.get("DATABASE_PASSWORD") ?? "test_password",
database: Environment.get("DATABASE_NAME") ?? "test_database",
tls: .prefer(try .init(configuration: .clientDefault)))
), as: .psql)
var config = TLSConfiguration.clientDefault
config.certificateVerification = .none
app.databases.use(DatabaseConfigurationFactory.mysql(configuration: .init(
hostname: Environment.get("DATABASE_HOST") ?? "localhost",
port: Environment.get("DATABASE_PORT").flatMap(Int.init(_:)) ?? MySQLConfiguration.ianaPortNumber,
username: Environment.get("DATABASE_USERNAME") ?? "test_username",
password: Environment.get("DATABASE_PASSWORD") ?? "test_password",
database: Environment.get("DATABASE_NAME") ?? "test_database",
tlsConfiguration: config
)), as: .mysql)
}

func testApplication() async throws {
let app = Application(.testing)
defer { app.shutdown() }

app.databases.use(.sqlite(.memory), as: .sqlite)
app.migrations.add(JobModelMigration())
try self.useDbs(app)
app.migrations.add(JobModelMigration(), to: self.dbid)

let email = Email()
app.queues.add(email)

app.queues.use(.fluent())
app.queues.use(.fluent(self.dbid))

try await app.autoMigrate()

Expand All @@ -41,10 +68,10 @@ final class QueuesFluentDriverTests: XCTestCase {
let app = Application(.testing)
defer { app.shutdown() }

app.databases.use(.sqlite(.memory), as: .sqlite)
try self.useDbs(app)
app.queues.add(FailingJob())
app.queues.use(.fluent())
app.migrations.add(JobModelMigration())
app.queues.use(.fluent(self.dbid))
app.migrations.add(JobModelMigration(), to: self.dbid)
try await app.autoMigrate()

let jobId = JobIdentifier()
Expand All @@ -61,7 +88,7 @@ final class QueuesFluentDriverTests: XCTestCase {
XCTAssert($0 is FailingJob.Failure)
}

await XCTAssertNotNilAsync(try await (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
await XCTAssertNotNilAsync(try await (app.databases.database(self.dbid, logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
.select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string).first())

try await app.autoRevert()
Expand All @@ -71,13 +98,13 @@ final class QueuesFluentDriverTests: XCTestCase {
let app = Application(.testing)
defer { app.shutdown() }

app.databases.use(.sqlite(.memory), as: .sqlite)
try self.useDbs(app)

app.queues.add(DelayedJob())

app.queues.use(.fluent())
app.queues.use(.fluent(self.dbid))

app.migrations.add(JobModelMigration())
app.migrations.add(JobModelMigration(), to: self.dbid)
try await app.autoMigrate()

let jobId = JobIdentifier()
Expand All @@ -92,7 +119,7 @@ final class QueuesFluentDriverTests: XCTestCase {
XCTAssertEqual(res.status, .ok)
}

await XCTAssertEqualAsync(try await (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
await XCTAssertEqualAsync(try await (app.databases.database(self.dbid, logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
.select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string)
.first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase)?.state, .pending)

Expand Down
Loading