Skip to content

Commit

Permalink
Merge pull request #25 from m-barthelemy/feature/3.0.0-beta1
Browse files Browse the repository at this point in the history
Feature/3.0.0 beta1
  • Loading branch information
m-barthelemy authored Sep 4, 2022
2 parents 4d48e65 + fb947fc commit e2ce677
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 137 deletions.
4 changes: 2 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.2
// swift-tools-version:5.4
// The swift-tools-version declares the minimum version of Swift required to build this package.

import PackageDescription
Expand All @@ -17,7 +17,7 @@ let package = Package(
.package(url: "https://github.com/vapor/vapor.git", from: "4.0.0"),
.package(url: "https://github.com/vapor/fluent.git", from: "4.0.0"),
.package(url: "https://github.com/vapor/sql-kit.git", from: "3.6.0"),
.package(url: "https://github.com/vapor/queues.git", from: "1.5.1"),
.package(url: "https://github.com/vapor/queues.git", from: "1.11.1"),
],
targets: [
.target(
Expand Down
56 changes: 16 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# QueuesFluentDriver

This Vapor Queues driver is an alternative to the (default) Redis driver, allowing you to use Fluent to store the Queues jobs into your relational database.
This Vapor Queues driver stores the Queues jobs metadata into a relational database. It is an alternative to the default Redis driver.


## Compatibility
Expand All @@ -15,8 +15,6 @@ This package should be compatible with:

> Sqlite will only work if you have a custom, very low number of Queues workers (1-2), which makes it useless except for testing purposes
> Postgres: This package relies on some recently added features in `sql-kit` and `postgres-kit` >= 2.1.0. **Make sure you use a release of postgres-kit that is at least 2.1.0**
 

## Usage
Expand All @@ -27,7 +25,7 @@ Add it to the `Package.swift` of your Vapor4 project:

```swift

// swift-tools-version:5.2
// swift-tools-version:5.4
import PackageDescription

let package = Package(
Expand All @@ -38,7 +36,7 @@ let package = Package(
...
dependencies: [
...
.package(name: "QueuesFluentDriver", url: "https://github.com/m-barthelemy/vapor-queues-fluent-driver.git", from: "1.2.0"),
.package(name: "QueuesFluentDriver", url: "https://github.com/m-barthelemy/vapor-queues-fluent-driver.git", .branch("3.0.0-beta1")),
...
],
targets: [
Expand All @@ -55,10 +53,10 @@ let package = Package(

 

This package needs a table, named `_jobs` by default, to store the Vapor Queues jobs. Add `JobModelMigrate` to your migrations:
This package needs a table, named `_jobs_meta` by default, to store the Vapor Queues jobs. Make sure to add this to your migrations:
```swift
// Ensure the table for storing jobs is created
app.migrations.add(JobModelMigrate())
app.migrations.add(JobMetadataMigrate())
```

 
Expand All @@ -67,7 +65,8 @@ Finally, load the `QueuesFluentDriver` driver:
```swift
app.queues.use(.fluent())
```
Make sure you call `app.databases.use(...)` **before** calling `app.queues.use(.fluent())`!

⚠️ Make sure you call `app.databases.use(...)` **before** calling `app.queues.use(.fluent())`!

 

Expand All @@ -84,33 +83,20 @@ app.queues.use(.fluent(queuesDb))
```

### Customizing the jobs table name
By default the `JobModelMigrate` migration will create a table named `_jobs`. You can customize the name during the migration :
You can customize the name of the table used by this driver during the migration :
```swift
app.migrations.add(JobModelMigrate(schema: "vapor_queues"))
app.migrations.add(JobMetadataMigrate(schema: "my_jobs"))
```

### Listing jobs
If needed, you can list the jobs stored into the database:
### Soft Deletes
By default, completed jobs are deleted from the two database tables used by this driver.
If you want to keep them, you can use Fluent's "soft delete" feature, which just sets the `deleted_at` field to a non-null value and excludes rows from queries by default:

```swift
import QueuesFluentDriver

let queue = req.queue as! FluentQueue

// Get the pending jobs
queue.list()

// Get the ones currently running
queue.list(state: .processing)

// Get the completed ones (only if you didn't set `useSoftDeletes` to `false`)
queue.list(state: .completed)

// For a custom Queue
queue.list(queue: "myCustomQueue")
app.queues.use(.fluent(useSoftDeletes: true))
```


When enabling this option, it is probably a good idea to cleanup the completed jobs from time to time.

 

Expand All @@ -119,8 +105,8 @@ queue.list(queue: "myCustomQueue")


### Polling interval and number of workers
By default, the Vapor Queues package creates 2 workers per CPU core, and each worker would periodically poll the database for jobs to be run.
On a recent 4 cores MacBook, this means 8 workers querying the database every second by default.
By default, the Vapor Queues package creates 2 workers per CPU core, and each worker would poll the database for jobs to be run every second.
On a 4 cores system, this means 8 workers querying the database every second by default.

You can change the jobs polling interval by calling:

Expand All @@ -130,13 +116,3 @@ app.queues.configuration.refreshInterval = .seconds(custom_value)

With Queues >=1.4.0, you can also configure the number of workers that will be started by setting `app.queues.configuration.workerCount`


### Soft Deletes
By default, this driver uses Fluent's "soft delete" feature, meaning that completed jobs stay in the database, but with a non-null `deleted_at` value.
If you want to delete the entry as soon as job is completed, you can set the `useSoftDeletes` option to `false`:

```swift
app.queues.use(.fluent(useSoftDeletes: false))
```

When using the default soft deletes option, it is probably a good idea to cleanup the completed jobs from time to time.
75 changes: 21 additions & 54 deletions Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,36 @@ public struct FluentQueue {
extension FluentQueue: Queue {
public func get(_ id: JobIdentifier) -> EventLoopFuture<JobData> {
return db.query(JobModel.self)
.filter(\.$jobId == id.string)
.filter(\.$state != .pending)
.filter(\.$id == id.string)
.first()
.unwrap(or: QueuesFluentError.missingJob(id))
.flatMapThrowing { job in
return job.data
return JobData(
payload: Array(job.data.payload),
maxRetryCount: job.data.maxRetryCount,
jobName: job.data.jobName,
delayUntil: job.data.delayUntil,
queuedAt: job.data.queuedAt,
attempts: job.data.attempts ?? 0
)
}
}

public func set(_ id: JobIdentifier, to jobStorage: JobData) -> EventLoopFuture<Void> {
do {
let jobModel = try JobModel(jobId: id.string, queue: queueName.string, data: jobStorage)
// If the job must run at a later time, ensure it won't be picked earlier since
// we sort pending jobs by creation date when querying
jobModel.createdAt = jobStorage.delayUntil ?? Date()
return jobModel.save(on: db)
let jobModel = JobModel(id: id, queue: queueName.string, jobData: JobDataModel(jobData: jobStorage))
// If the job must run at a later time, ensure it won't be picked earlier since
// we sort pending jobs by date when querying
jobModel.runAtOrAfter = jobStorage.delayUntil ?? Date()

return jobModel.save(on: db).map { metadata in
return
}
catch {
return db.eventLoop.makeFailedFuture(QueuesFluentError.jobDataEncodingError(error.localizedDescription))
}
}

public func clear(_ id: JobIdentifier) -> EventLoopFuture<Void> {
// This does the equivalent of a Fluent Softdelete but sets the `state` to `completed`
return db.query(JobModel.self)
.filter(\.$jobId == id.string)
.filter(\.$id == id.string)
.filter(\.$state != .completed)
.first()
.unwrap(or: QueuesFluentError.missingJob(id))
Expand All @@ -60,7 +64,7 @@ extension FluentQueue: Queue {
return sqlDb
.update(JobModel.schema)
.set(SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.pending))
.where(SQLColumn("\(FieldKey.jobId)"), .equal, SQLBind(id.string))
.where(SQLColumn("\(FieldKey.id)"), .equal, SQLBind(id.string))
.run()
}

Expand All @@ -72,12 +76,12 @@ extension FluentQueue: Queue {

var selectQuery = sqlDb
.select()
.column("\(FieldKey.jobId)")
.column("\(FieldKey.id)")
.from(JobModel.schema)
.where(SQLColumn("\(FieldKey.state)"), .equal, SQLBind(QueuesFluentJobState.pending))
.where(SQLColumn("\(FieldKey.queue)"), .equal, SQLBind(self.queueName.string))
.where(SQLColumn("\(FieldKey.createdAt)"), .lessThanOrEqual, SQLBind(Date()))
.orderBy("\(FieldKey.createdAt)")
.where(SQLColumn("\(FieldKey.runAt)"), .lessThanOrEqual, SQLBind(Date()))
.orderBy("\(FieldKey.runAt)")
.limit(1)
if self.dbType != .sqlite {
selectQuery = selectQuery.lockingClause(SQLSkipLocked.forUpdateSkipLocked)
Expand All @@ -91,46 +95,9 @@ extension FluentQueue: Queue {
popProvider = MySQLPop()
case .sqlite:
popProvider = SqlitePop()
default:
return db.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound)
}
return popProvider.pop(db: db, select: selectQuery.query).optionalMap { id in
return JobIdentifier(string: id)
}
}

/// /!\ This is a non standard extension.
public func list(queue: String? = nil, state: QueuesFluentJobState = .pending) -> EventLoopFuture<[JobData]> {
guard let sqlDb = db as? SQLDatabase else {
return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound)
}
var query = sqlDb
.select()
.column("*")
.from(JobModel.schema)
.where(SQLColumn("\(FieldKey.state)"), .equal, SQLBind(state))
if let queue = queue {
query = query.where(SQLColumn("\(FieldKey.queue)"), .equal, SQLBind(queue))
}
if self.dbType != .sqlite {
query = query.lockingClause(SQLSkipLocked.forShareSkipLocked)
}
query = query.limit(1000)

var jobs = [JobData]()
return sqlDb.execute(sql: query.query) { (row) -> Void in
do {
let job = try row.decode(column: "\(FieldKey.data)", as: Data.self)
let jobData = try JSONDecoder().decode(JobData.self, from: job)
jobs.append(jobData)
}
catch {
return self.db.eventLoop.makeFailedFuture(QueuesFluentError.jobDataDecodingError("\(error)"))
.whenSuccess {$0}
}
}
.map {
return jobs
}
}
}
50 changes: 50 additions & 0 deletions Sources/QueuesFluentDriver/JobDataModel.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import Foundation
import Fluent
import Queues

extension FieldKey {
static var payload: Self { "payload" }
static var maxRetryCount: Self { "max_retries" }
static var attempts: Self { "attempt" }
static var delayUntil: Self { "delay_until" }
static var queuedAt: Self { "queued_at" }
static var jobName: Self { "job_name" }
}

/// Handles storage of a `JobData` into the database
final class JobDataModel: Fields {
required init() {}

/// The job data to be encoded.
@Field(key: .payload)
var payload: [UInt8]

/// The maxRetryCount for the `Job`.
@Field(key: .maxRetryCount)
var maxRetryCount: Int

/// The number of attempts made to run the `Job`.
@Field(key: .attempts)
var attempts: Int?

/// A date to execute this job after
@OptionalField(key: .delayUntil)
var delayUntil: Date?

/// The date this job was queued
@Field(key: .queuedAt)
var queuedAt: Date

/// The name of the `Job`
@Field(key: .jobName)
var jobName: String

init(jobData: JobData) {
self.payload = jobData.payload
self.maxRetryCount = jobData.maxRetryCount
self.attempts = jobData.attempts
self.delayUntil = jobData.delayUntil
self.jobName = jobData.jobName
self.queuedAt = jobData.queuedAt
}
}
32 changes: 13 additions & 19 deletions Sources/QueuesFluentDriver/JobModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,55 +11,49 @@ public enum QueuesFluentJobState: String, Codable, CaseIterable {
}

extension FieldKey {
static var jobId: Self { "job_id" }
static var queue: Self { "queue" }
static var data: Self { "data" }
static var state: Self { "state" }

static var createdAt: Self { "created_at" }
static var runAt: Self { "run_at" }
static var updatedAt: Self { "updated_at" }
static var deletedAt: Self { "deleted_at" }
}

class JobModel: Model {
public required init() {}

public static var schema = "_jobs"
public static var schema = "_jobs_meta"

/// The unique Job uuid
@ID(key: .id)
var id: UUID?

@Field(key: .jobId)
var jobId: String?
/// The unique Job ID
@ID(custom: .id, generatedBy: .user)
var id: String?

/// The Job key
@Field(key: .queue)
var queue: String

/// The Job data
@Field(key: .data)
var data: JobData

/// The current state of the Job
@Field(key: .state)
var state: QueuesFluentJobState

/// Creation date by default; `delayUntil` if it's a delayed job
@OptionalField(key: .createdAt)
var createdAt: Date?
/// Earliest date the job can run
@OptionalField(key: .runAt)
var runAtOrAfter: Date?

@Timestamp(key: .updatedAt, on: .update)
var updatedAt: Date?

@Timestamp(key: .deletedAt, on: .delete)
var deletedAt: Date?

@Group(key: .data)
var data: JobDataModel

init(jobId: String, queue: String, data: JobData) throws {
self.jobId = jobId
init(id: JobIdentifier, queue: String, jobData: JobDataModel) {
self.id = id.string
self.queue = queue
self.data = data
self.state = .pending
self.data = jobData
}
}
Loading

0 comments on commit e2ce677

Please sign in to comment.