diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index 0b7be6b..bd3f554 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -12,7 +12,6 @@ // //===----------------------------------------------------------------------===// -import Dispatch import Logging import NIOConcurrencyHelpers import NIOCore @@ -74,7 +73,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { private let stateMachineHolder: MachineHolder let pollInterval: Duration #if swift(>=6.0) - let queue: NaiveQueueExecutor + private let queue: DispatchQueueTaskExecutor #endif private final class MachineHolder: Sendable { // only for deinit @@ -93,7 +92,9 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { self.stateMachineHolder = .init(stateMachine: stateMachine) self.pollInterval = pollInterval #if swift(>=6.0) - self.queue = NaiveQueueExecutor(DispatchQueue(label: "com.swift-server.swift-kafka.message-consumer")) + self.queue = DispatchQueueTaskExecutor( + DispatchQueue(label: "com.swift-server.swift-kafka.message-consumer") + ) #endif } @@ -111,6 +112,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { #if swift(>=6.0) // Wait on a separate thread for the next message. + // The call below will block for `pollInterval`. return try await withTaskExecutorPreference(queue) { try client.consumerPoll(for: Int32(self.pollInterval.inMilliseconds)) } diff --git a/Sources/Kafka/Utilities/NaiveQueueExecutor.swift b/Sources/Kafka/Utilities/DispatchQueueTaskExecutor.swift similarity index 94% rename from Sources/Kafka/Utilities/NaiveQueueExecutor.swift rename to Sources/Kafka/Utilities/DispatchQueueTaskExecutor.swift index 991e100..2a2051b 100644 --- a/Sources/Kafka/Utilities/NaiveQueueExecutor.swift +++ b/Sources/Kafka/Utilities/DispatchQueueTaskExecutor.swift @@ -15,7 +15,7 @@ #if swift(>=6.0) import Dispatch -final class NaiveQueueExecutor: TaskExecutor { +final class DispatchQueueTaskExecutor: TaskExecutor { let queue: DispatchQueue init(_ queue: DispatchQueue) {