From 2a0c2a5117e32219a9d1117ef23eb34741e7d7bc Mon Sep 17 00:00:00 2001 From: Josh Wright Date: Sat, 27 Jul 2024 13:34:44 -0700 Subject: [PATCH] async streamify event bus --- Alchemy/Database/Rune/Model/Model+CRUD.swift | 80 +++++++------- Alchemy/Database/Rune/Model/Model.swift | 2 +- Alchemy/Database/Rune/Model/ModelEvents.swift | 22 ++-- Alchemy/Events/Event.swift | 4 +- Alchemy/Events/EventBus.swift | 101 ++++++++---------- Alchemy/Events/QueueableListener.swift | 2 +- .../Extensions/AsyncSequence+Utilities.swift | 7 ++ Example/App.swift | 2 +- Package.swift | 4 +- 9 files changed, 110 insertions(+), 114 deletions(-) create mode 100644 Alchemy/Utilities/Extensions/AsyncSequence+Utilities.swift diff --git a/Alchemy/Database/Rune/Model/Model+CRUD.swift b/Alchemy/Database/Rune/Model/Model+CRUD.swift index f1f93ece..fcbbd45e 100644 --- a/Alchemy/Database/Rune/Model/Model+CRUD.swift +++ b/Alchemy/Database/Rune/Model/Model+CRUD.swift @@ -220,9 +220,9 @@ extension Array where Element: Model { /// Inserts each element in this array to a database. public func insertAll(on db: Database = Element.database) async throws { - try await Element.willCreate(self) + Element.willCreate(self) try await Element.query(on: db).insert(try insertableFields(on: db)) - try await Element.didCreate(self) + Element.didCreate(self) } /// Inserts and returns each element in this array to a database. @@ -232,11 +232,11 @@ extension Array where Element: Model { func _insertReturnAll(on db: Database = Element.database, fieldOverrides: SQLFields = [:]) async throws -> Self { let fields = try insertableFields(on: db).map { $0 + fieldOverrides } - try await Element.willCreate(self) + Element.willCreate(self) let results = try await Element.query(on: db) .insertReturn(fields) .map { try $0.decodeModel(Element.self) } - try await Element.didCreate(results) + Element.didCreate(results) return results } @@ -250,27 +250,27 @@ extension Array where Element: Model { public func updateAll(on db: Database = Element.database, _ fields: SQLFields) async throws { let ids = map(\.id) let fields = touchUpdatedAt(on: db, fields) - try await Element.willUpdate(self) + Element.willUpdate(self) try await Element.query(on: db) .where(Element.idKey, in: ids) .update(fields) - try await Element.didUpdate(self) + Element.didUpdate(self) } // MARK: UPSERT public func upsertAll(on db: Database = Element.database, conflicts: [String] = Element.upsertConflictKeys) async throws { - try await Element.willUpsert(self) + Element.willUpsert(self) try await Element.query(on: db).upsert(try insertableFields(on: db), conflicts: conflicts) - try await Element.didUpsert(self) + Element.didUpsert(self) } public func upsertReturnAll(on db: Database = Element.database, conflicts: [String] = Element.upsertConflictKeys) async throws -> Self { - try await Element.willUpsert(self) + Element.willUpsert(self) let results = try await Element.query(on: db) .upsertReturn(try insertableFields(on: db), conflicts: conflicts) .map { try $0.decodeModel(Element.self) } - try await Element.didUpsert(results) + Element.didUpsert(results) return results } @@ -280,14 +280,14 @@ extension Array where Element: Model { /// array isn't actually in the database, it will be ignored. public func deleteAll(on db: Database = Element.database) async throws { let ids = map(\.id) - try await Element.willDelete(self) + Element.willDelete(self) try await Element.query(on: db) .where(Element.idKey, in: ids) .delete() forEach { ($0 as? any Model & SoftDeletes)?.deletedAt = Date() } - try await Element.didDelete(self) + Element.didDelete(self) } // MARK: Refresh @@ -339,53 +339,53 @@ extension Array where Element: Model { // MARK: Model Events extension Model { - static func didFetch(_ models: [Self]) async throws { - try await ModelDidFetch(models: models).fire() + static func didFetch(_ models: [Self]) { + ModelDidFetch(models: models).fire() } - static func willDelete(_ models: [Self]) async throws { - try await ModelWillDelete(models: models).fire() + static func willDelete(_ models: [Self]) { + ModelWillDelete(models: models).fire() } - static func didDelete(_ models: [Self]) async throws { - try await ModelDidDelete(models: models).fire() + static func didDelete(_ models: [Self]) { + ModelDidDelete(models: models).fire() } - fileprivate static func willCreate(_ models: [Self]) async throws { - try await ModelWillCreate(models: models).fire() - try await willSave(models) + fileprivate static func willCreate(_ models: [Self]) { + ModelWillCreate(models: models).fire() + willSave(models) } - fileprivate static func didCreate(_ models: [Self]) async throws { - try await ModelDidCreate(models: models).fire() - try await didSave(models) + fileprivate static func didCreate(_ models: [Self]) { + ModelDidCreate(models: models).fire() + didSave(models) } - fileprivate static func willUpsert(_ models: [Self]) async throws { - try await ModelWillUpsert(models: models).fire() - try await willSave(models) + fileprivate static func willUpsert(_ models: [Self]) { + ModelWillUpsert(models: models).fire() + willSave(models) } - fileprivate static func didUpsert(_ models: [Self]) async throws { - try await ModelDidUpsert(models: models).fire() - try await didSave(models) + fileprivate static func didUpsert(_ models: [Self]) { + ModelDidUpsert(models: models).fire() + didSave(models) } - fileprivate static func willUpdate(_ models: [Self]) async throws { - try await ModelWillUpdate(models: models).fire() - try await willSave(models) + fileprivate static func willUpdate(_ models: [Self]) { + ModelWillUpdate(models: models).fire() + willSave(models) } - fileprivate static func didUpdate(_ models: [Self]) async throws { - try await ModelDidUpdate(models: models).fire() - try await didSave(models) + fileprivate static func didUpdate(_ models: [Self]) { + ModelDidUpdate(models: models).fire() + didSave(models) } - private static func willSave(_ models: [Self]) async throws { - try await ModelWillSave(models: models).fire() + private static func willSave(_ models: [Self]) { + ModelWillSave(models: models).fire() } - private static func didSave(_ models: [Self]) async throws { - try await ModelDidSave(models: models).fire() + private static func didSave(_ models: [Self]) { + ModelDidSave(models: models).fire() } } diff --git a/Alchemy/Database/Rune/Model/Model.swift b/Alchemy/Database/Rune/Model/Model.swift index d06f5812..ba70c85a 100644 --- a/Alchemy/Database/Rune/Model/Model.swift +++ b/Alchemy/Database/Rune/Model/Model.swift @@ -108,7 +108,7 @@ extension Database { public func table(_ model: M.Type, as alias: String? = nil) -> Query { let tableName = alias.map { "\(model.table) AS \($0)" } ?? model.table return Query(db: self, table: tableName) - .didLoad { try await M.didFetch($0) } + .didLoad { M.didFetch($0) } } } diff --git a/Alchemy/Database/Rune/Model/ModelEvents.swift b/Alchemy/Database/Rune/Model/ModelEvents.swift index 55bb48b7..39fb9b87 100644 --- a/Alchemy/Database/Rune/Model/ModelEvents.swift +++ b/Alchemy/Database/Rune/Model/ModelEvents.swift @@ -44,46 +44,46 @@ public struct ModelDidSave: Event { extension EventBus { public func onDidFetch(_ type: M.Type, action: @escaping (ModelDidFetch) async throws -> Void) { - on(ModelDidFetch.self, handler: action) + listen(ModelDidFetch.self, handler: action) } public func onWillCreate(_ type: M.Type, action: @escaping (ModelWillCreate) async throws -> Void) { - on(ModelWillCreate.self, handler: action) + listen(ModelWillCreate.self, handler: action) } public func onDidCreate(_ type: M.Type, action: @escaping (ModelDidCreate) async throws -> Void) { - on(ModelDidCreate.self, handler: action) + listen(ModelDidCreate.self, handler: action) } public func onWillUpsert(_ type: M.Type, action: @escaping (ModelWillUpsert) async throws -> Void) { - on(ModelWillUpsert.self, handler: action) + listen(ModelWillUpsert.self, handler: action) } public func onDidUpsert(_ type: M.Type, action: @escaping (ModelDidUpsert) async throws -> Void) { - on(ModelDidUpsert.self, handler: action) + listen(ModelDidUpsert.self, handler: action) } public func onWillUpdate(_ type: M.Type, action: @escaping (ModelWillUpdate) async throws -> Void) { - on(ModelWillUpdate.self, handler: action) + listen(ModelWillUpdate.self, handler: action) } public func onDidUpdate(_ type: M.Type, action: @escaping (ModelDidUpdate) async throws -> Void) { - on(ModelDidUpdate.self, handler: action) + listen(ModelDidUpdate.self, handler: action) } public func onWillSave(_ type: M.Type, action: @escaping (ModelWillSave) async throws -> Void) { - on(ModelWillSave.self, handler: action) + listen(ModelWillSave.self, handler: action) } public func onDidSave(_ type: M.Type, action: @escaping (ModelDidSave) async throws -> Void) { - on(ModelDidSave.self, handler: action) + listen(ModelDidSave.self, handler: action) } public func onWillDelete(_ type: M.Type, action: @escaping (ModelWillDelete) async throws -> Void) { - on(ModelWillDelete.self, handler: action) + listen(ModelWillDelete.self, handler: action) } public func onDidDelete(_ type: M.Type, action: @escaping (ModelDidDelete) async throws -> Void) { - on(ModelDidDelete.self, handler: action) + listen(ModelDidDelete.self, handler: action) } } diff --git a/Alchemy/Events/Event.swift b/Alchemy/Events/Event.swift index 4fa68c7d..e2a21c2b 100644 --- a/Alchemy/Events/Event.swift +++ b/Alchemy/Events/Event.swift @@ -9,7 +9,7 @@ extension Event { public static var registrationKey: String { name(of: Self.self) } /// Fire this event on an `EventBus`. - public func fire(on bus: EventBus = Events) async throws { - try await bus.fire(self) + public func fire(on bus: EventBus = Events) { + bus.fire(self) } } diff --git a/Alchemy/Events/EventBus.swift b/Alchemy/Events/EventBus.swift index 22c47002..1deb5aa9 100644 --- a/Alchemy/Events/EventBus.swift +++ b/Alchemy/Events/EventBus.swift @@ -1,81 +1,68 @@ -import NIOConcurrencyHelpers +import AsyncAlgorithms public final class EventBus: IdentifiedService { public typealias Identifier = ServiceIdentifier - public typealias Handler = (E) async throws -> Void - private typealias AnyHandler = (Event) async throws -> Void + private struct EventJob: Job, Codable { + let event: E + let listenerId: String - private var listeners: [String: any Listener] = [:] - private var handlers: [String: [AnyHandler]] = [:] - private let lock = NIOLock() + func handle(context: JobContext) async throws { + guard let listener = Events.listeners[listenerId] as? any QueueableListener else { + throw JobError("Unable to find registered listener of type `\(listenerId)` to handle a queued event.") + } - public func on(_ event: E.Type, handler: @escaping Handler) { - lock.withLock { - handlers[E.registrationKey, default: []] += [convertHandler(handler)] + try await listener.handle(event: event) } } - public func register(listener: L) { - lock.withLock { - handlers[L.ObservedEvent.registrationKey, default: []] += [convertHandler(listener.handle)] - listeners[L.registryId] = listener - } - } + fileprivate var listeners: [String: any Listener] = [:] + private let channel = AsyncChannel() - public func register(listener: L) { - lock.withLock { - Jobs.register(EventJob.self) - handlers[L.ObservedEvent.registrationKey, default: []] += [convertHandler(listener.dispatch)] - listeners[L.registryId] = listener - } - } - - public func fire(_ event: E) async throws { - let handlers = lock.withLock { self.handlers[E.registrationKey] ?? [] } - for handle in handlers { - try await handle(event) - } + public func stream(of: E.Type) -> AsyncStream { + channel + .compactMap { $0 as? E } + .stream } - fileprivate func lookupListener(_ id: String, eventType: E.Type = E.self) throws -> any Listener { - guard let listener = Events.listeners[id] as? any Listener else { - throw JobError("Unable to find registered listener of type `\(id)` to handle a queued event.") + @discardableResult + public func listen(_ event: E.Type, handler: @escaping (E) async throws -> Void) -> Task { + Task { + for await event in stream(of: event) { + try await handler(event) + } } - - return listener } - private func convertHandler(_ handler: @escaping Handler) -> AnyHandler { - return { event in - guard let event = event as? E else { - Log.error("Event handler type mismatch for \(E.registrationKey)!") - return + @discardableResult + public func register(listener: L) -> Task { + listeners[L.registryId] = listener + return Task { + for await event in stream(of: L.ObservedEvent.self) { + try await listener.handle(event: event) } - - try await handler(event) } } -} - -private struct EventJob: Job, Codable { - let event: E - let listenerId: String - func handle(context: JobContext) async throws { - try await Events.lookupListener(listenerId, eventType: E.self).handle(event: event) + @discardableResult + public func register(listener: L) -> Task { + listeners[L.registryId] = listener + return Task { + for await event in stream(of: L.ObservedEvent.self) { + if listener.shouldQueue(event: event) { + try await listener.handle(event: event) + } else { + try await EventJob(event: event, listenerId: L.registryId) + .dispatch(on: listener.queue, channel: listener.channel) + } + } + } } -} - -extension QueueableListener { - fileprivate func dispatch(event: ObservedEvent) async throws { - guard shouldQueue(event: event) else { - try await handle(event: event) - return + + public func fire(_ event: E) { + Task { + await channel.send(event) } - - try await EventJob(event: event, listenerId: Self.registryId) - .dispatch(on: queue, channel: channel) } } diff --git a/Alchemy/Events/QueueableListener.swift b/Alchemy/Events/QueueableListener.swift index 299a1e22..6e8dfabe 100644 --- a/Alchemy/Events/QueueableListener.swift +++ b/Alchemy/Events/QueueableListener.swift @@ -1,5 +1,5 @@ /// A listener that handles its events on a background `Queue`. -public protocol QueueableListener: Listener where ObservedEvent: Codable { +public protocol QueueableListener: Listener where ObservedEvent: Codable { /// The queue where events will be dispatched. var queue: Queue { get } diff --git a/Alchemy/Utilities/Extensions/AsyncSequence+Utilities.swift b/Alchemy/Utilities/Extensions/AsyncSequence+Utilities.swift new file mode 100644 index 00000000..663b42f1 --- /dev/null +++ b/Alchemy/Utilities/Extensions/AsyncSequence+Utilities.swift @@ -0,0 +1,7 @@ +import ConcurrencyExtras + +extension AsyncSequence { + public var stream: AsyncStream { + eraseToStream() + } +} diff --git a/Example/App.swift b/Example/App.swift index a1b20696..e591c82f 100644 --- a/Example/App.swift +++ b/Example/App.swift @@ -3,7 +3,7 @@ import Alchemy @main @Application struct App { - func boot() throws { + func boot() { use(UserController()) } diff --git a/Package.swift b/Package.swift index 2239f2c3..2d6953dd 100644 --- a/Package.swift +++ b/Package.swift @@ -25,6 +25,7 @@ let package = Package( .package(url: "https://github.com/apple/swift-syntax", from: "510.0.0"), .package(url: "https://github.com/hummingbird-project/hummingbird", from: "2.0.0-rc.2"), .package(url: "https://github.com/onevcat/Rainbow", .upToNextMajor(from: "4.0.0")), + .package(url: "https://github.com/pointfreeco/swift-concurrency-extras", from: "1.1.0"), .package(url: "https://github.com/swift-server/async-http-client", from: "1.0.0"), .package(url: "https://github.com/swift-server/RediStack", from: "1.6.2"), .package(url: "https://github.com/vapor/async-kit", from: "1.0.0"), @@ -54,6 +55,7 @@ let package = Package( .target( name: "Alchemy", dependencies: [ + /// Experimental .product(name: "AlchemyX", package: "AlchemyX"), @@ -63,6 +65,7 @@ let package = Package( .product(name: "ArgumentParser", package: "swift-argument-parser"), .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), .product(name: "AsyncHTTPClient", package: "async-http-client"), + .product(name: "ConcurrencyExtras", package: "swift-concurrency-extras"), .product(name: "Cron", package: "cron"), .product(name: "Crypto", package: "swift-crypto"), .product(name: "HTTPTypes", package: "swift-http-types"), @@ -74,7 +77,6 @@ let package = Package( .product(name: "Pluralize", package: "pluralize"), .product(name: "Rainbow", package: "Rainbow"), - /// Databases .product(name: "AsyncKit", package: "async-kit"),