diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index 06906ff..0350576 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -32,30 +32,30 @@ public final class RDKafkaClient: Sendable { } /// Handle for the C library's Kafka instance. - private let kafkaHandle: OpaquePointer + private let kafkaHandle: SendableOpaquePointer /// A logger. private let logger: Logger /// `librdkafka`'s `rd_kafka_queue_t` that events are received on. - private let queue: OpaquePointer + private let queueHandle: SendableOpaquePointer // Use factory method to initialize private init( type: ClientType, - kafkaHandle: OpaquePointer, + kafkaHandle: SendableOpaquePointer, logger: Logger ) { self.kafkaHandle = kafkaHandle self.logger = logger - self.queue = rd_kafka_queue_get_main(self.kafkaHandle) + self.queueHandle = .init(rd_kafka_queue_get_main(self.kafkaHandle.pointer)) - rd_kafka_set_log_queue(self.kafkaHandle, self.queue) + rd_kafka_set_log_queue(self.kafkaHandle.pointer, self.queueHandle.pointer) } deinit { // Loose reference to librdkafka's event queue - rd_kafka_queue_destroy(self.queue) - rd_kafka_destroy(kafkaHandle) + rd_kafka_queue_destroy(self.queueHandle.pointer) + rd_kafka_destroy(kafkaHandle.pointer) } /// Factory method creating a new instance of a ``RDKafkaClient``. @@ -90,7 +90,8 @@ public final class RDKafkaClient: Sendable { throw KafkaError.client(reason: errorString) } - return RDKafkaClient(type: type, kafkaHandle: handle, logger: logger) + let kafkaHandle = SendableOpaquePointer(handle) + return RDKafkaClient(type: type, kafkaHandle: kafkaHandle, logger: logger) } /// Produce a message to the Kafka cluster. @@ -215,7 +216,7 @@ public final class RDKafkaClient: Sendable { assert(arguments.count == size) return rd_kafka_produceva( - self.kafkaHandle, + self.kafkaHandle.pointer, arguments, arguments.count ) @@ -307,7 +308,7 @@ public final class RDKafkaClient: Sendable { events.reserveCapacity(maxEvents) for _ in 0.. KafkaConsumerMessage? { - guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle, 0) else { + guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, 0) else { // No error, there might be no more messages return nil } @@ -469,7 +470,7 @@ public final class RDKafkaClient: Sendable { /// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs. func subscribe(topicPartitionList: RDKafkaTopicPartitionList) throws { try topicPartitionList.withListPointer { pointer in - let result = rd_kafka_subscribe(self.kafkaHandle, pointer) + let result = rd_kafka_subscribe(self.kafkaHandle.pointer, pointer) if result != RD_KAFKA_RESP_ERR_NO_ERROR { throw KafkaError.rdKafkaError(wrapping: result) } @@ -480,7 +481,7 @@ public final class RDKafkaClient: Sendable { /// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs. func assign(topicPartitionList: RDKafkaTopicPartitionList) throws { try topicPartitionList.withListPointer { pointer in - let result = rd_kafka_assign(self.kafkaHandle, pointer) + let result = rd_kafka_assign(self.kafkaHandle.pointer, pointer) if result != RD_KAFKA_RESP_ERR_NO_ERROR { throw KafkaError.rdKafkaError(wrapping: result) } @@ -517,7 +518,7 @@ public final class RDKafkaClient: Sendable { let error = changesList.withListPointer { listPointer in rd_kafka_commit( - self.kafkaHandle, + self.kafkaHandle.pointer, listPointer, 1 // async = true ) @@ -560,9 +561,9 @@ public final class RDKafkaClient: Sendable { changesList.withListPointer { listPointer in rd_kafka_commit_queue( - self.kafkaHandle, + self.kafkaHandle.pointer, listPointer, - self.queue, + self.queueHandle.pointer, nil, opaquePointer ) @@ -581,7 +582,7 @@ public final class RDKafkaClient: Sendable { let queue = DispatchQueue(label: "com.swift-server.swift-kafka.flush") try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in queue.async { - let error = rd_kafka_flush(self.kafkaHandle, timeoutMilliseconds) + let error = rd_kafka_flush(self.kafkaHandle.pointer, timeoutMilliseconds) if error != RD_KAFKA_RESP_ERR_NO_ERROR { continuation.resume(throwing: KafkaError.rdKafkaError(wrapping: error)) } else { @@ -596,7 +597,7 @@ public final class RDKafkaClient: Sendable { /// /// Make sure to run poll loop until ``RDKafkaClient/consumerIsClosed`` returns `true`. func consumerClose() throws { - let result = rd_kafka_consumer_close_queue(self.kafkaHandle, self.queue) + let result = rd_kafka_consumer_close_queue(self.kafkaHandle.pointer, self.queueHandle.pointer) let kafkaError = rd_kafka_error_code(result) if kafkaError != RD_KAFKA_RESP_ERR_NO_ERROR { throw KafkaError.rdKafkaError(wrapping: kafkaError) @@ -605,7 +606,7 @@ public final class RDKafkaClient: Sendable { /// Returns `true` if the underlying `librdkafka` consumer is closed. var isConsumerClosed: Bool { - rd_kafka_consumer_closed(self.kafkaHandle) == 1 + rd_kafka_consumer_closed(self.kafkaHandle.pointer) == 1 } /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle. @@ -613,6 +614,6 @@ public final class RDKafkaClient: Sendable { /// - Parameter body: The closure will use the Kafka handle pointer. @discardableResult func withKafkaHandlePointer(_ body: (OpaquePointer) throws -> T) rethrows -> T { - try body(self.kafkaHandle) + try body(self.kafkaHandle.pointer) } } diff --git a/Sources/Kafka/RDKafka/RDKafkaTopicHandles.swift b/Sources/Kafka/RDKafka/RDKafkaTopicHandles.swift index dc9e6ff..3d31f67 100644 --- a/Sources/Kafka/RDKafka/RDKafkaTopicHandles.swift +++ b/Sources/Kafka/RDKafka/RDKafkaTopicHandles.swift @@ -17,7 +17,7 @@ import NIOConcurrencyHelpers /// Swift class that matches topic names with their respective `rd_kafka_topic_t` handles. internal final class RDKafkaTopicHandles: Sendable { - private let _internal: NIOLockedValueBox<[String: OpaquePointer]> + private let _internal: NIOLockedValueBox<[String: SendableOpaquePointer]> // Note: we retain the client to ensure it does not get // deinitialized before rd_kafka_topic_destroy() is invoked (required) @@ -31,7 +31,7 @@ internal final class RDKafkaTopicHandles: Sendable { deinit { self._internal.withLockedValue { dict in for (_, topicHandle) in dict { - rd_kafka_topic_destroy(topicHandle) + rd_kafka_topic_destroy(topicHandle.pointer) } } } @@ -59,7 +59,7 @@ internal final class RDKafkaTopicHandles: Sendable { ) throws -> OpaquePointer { try self._internal.withLockedValue { dict in if let handle = dict[topic] { - return handle + return handle.pointer } else { let rdTopicConf = try RDKafkaTopicConfig.createFrom(topicConfiguration: topicConfiguration) let newHandle = self.client.withKafkaHandlePointer { kafkaHandle in @@ -76,7 +76,7 @@ internal final class RDKafkaTopicHandles: Sendable { let error = KafkaError.rdKafkaError(wrapping: rd_kafka_last_error()) throw error } - dict[topic] = newHandle + dict[topic] = .init(newHandle) return newHandle } } diff --git a/Sources/Kafka/Utilities/SendableOpaquePointer.swift b/Sources/Kafka/Utilities/SendableOpaquePointer.swift new file mode 100644 index 0000000..12ae530 --- /dev/null +++ b/Sources/Kafka/Utilities/SendableOpaquePointer.swift @@ -0,0 +1,25 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2024 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// A wrapper for the `OpaquePointer` used to represent different handles from `librdkafka`. +/// +/// This wrapper silences `Sendable` warnings for the pointer introduced in Swift 5.10, and should +/// only be used for handles from `librdkafka` that are known to be thread-safe. +struct SendableOpaquePointer: @unchecked Sendable { + let pointer: OpaquePointer + + init(_ pointer: OpaquePointer) { + self.pointer = pointer + } +}