Skip to content

Commit

Permalink
Address reviewer comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mimischi committed Nov 15, 2024
1 parent a84f1c0 commit 5565624
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
8 changes: 5 additions & 3 deletions Sources/Kafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
//
//===----------------------------------------------------------------------===//

import Dispatch
import Logging
import NIOConcurrencyHelpers
import NIOCore
Expand Down Expand Up @@ -74,7 +73,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
private let stateMachineHolder: MachineHolder
let pollInterval: Duration
#if swift(>=6.0)
let queue: NaiveQueueExecutor
private let queue: DispatchQueueTaskExecutor
#endif

private final class MachineHolder: Sendable { // only for deinit
Expand All @@ -93,7 +92,9 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
self.stateMachineHolder = .init(stateMachine: stateMachine)
self.pollInterval = pollInterval
#if swift(>=6.0)
self.queue = NaiveQueueExecutor(DispatchQueue(label: "com.swift-server.swift-kafka.message-consumer"))
self.queue = DispatchQueueTaskExecutor(
DispatchQueue(label: "com.swift-server.swift-kafka.message-consumer")
)
#endif
}

Expand All @@ -111,6 +112,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {

#if swift(>=6.0)
// Wait on a separate thread for the next message.
// The call below will block for `pollInterval`.
return try await withTaskExecutorPreference(queue) {
try client.consumerPoll(for: Int32(self.pollInterval.inMilliseconds))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#if swift(>=6.0)
import Dispatch

final class NaiveQueueExecutor: TaskExecutor {
final class DispatchQueueTaskExecutor: TaskExecutor {
let queue: DispatchQueue

init(_ queue: DispatchQueue) {
Expand Down

0 comments on commit 5565624

Please sign in to comment.