Skip to content

Commit

Permalink
Introduce SendableOpaquePointer
Browse files Browse the repository at this point in the history
[Unsafe pointers are no longer `Sendable`](swiftlang/swift#70396 (comment)) starting with Swift 5.10, while a warning in
earlier versions. This PR introduces `SendableOpaquePointer` to wrap `OpaquePointer` and
marks it as `@unchecked Sendable`.

Resolves #159
  • Loading branch information
mimischi committed Nov 12, 2024
1 parent 1638b32 commit f84757b
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 24 deletions.
41 changes: 21 additions & 20 deletions Sources/Kafka/RDKafka/RDKafkaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,30 @@ public final class RDKafkaClient: Sendable {
}

/// Handle for the C library's Kafka instance.
private let kafkaHandle: OpaquePointer
private let kafkaHandle: SendableOpaquePointer
/// A logger.
private let logger: Logger

/// `librdkafka`'s `rd_kafka_queue_t` that events are received on.
private let queue: OpaquePointer
private let queueHandle: SendableOpaquePointer

// Use factory method to initialize
private init(
type: ClientType,
kafkaHandle: OpaquePointer,
kafkaHandle: SendableOpaquePointer,
logger: Logger
) {
self.kafkaHandle = kafkaHandle
self.logger = logger
self.queue = rd_kafka_queue_get_main(self.kafkaHandle)
self.queueHandle = .init(rd_kafka_queue_get_main(self.kafkaHandle.pointer))

rd_kafka_set_log_queue(self.kafkaHandle, self.queue)
rd_kafka_set_log_queue(self.kafkaHandle.pointer, self.queueHandle.pointer)
}

deinit {
// Loose reference to librdkafka's event queue
rd_kafka_queue_destroy(self.queue)
rd_kafka_destroy(kafkaHandle)
rd_kafka_queue_destroy(self.queueHandle.pointer)
rd_kafka_destroy(kafkaHandle.pointer)
}

/// Factory method creating a new instance of a ``RDKafkaClient``.
Expand Down Expand Up @@ -87,7 +87,8 @@ public final class RDKafkaClient: Sendable {
throw KafkaError.client(reason: errorString)
}

return RDKafkaClient(type: type, kafkaHandle: handle, logger: logger)
let kafkaHandle = SendableOpaquePointer(handle)
return RDKafkaClient(type: type, kafkaHandle: kafkaHandle, logger: logger)
}

/// Produce a message to the Kafka cluster.
Expand Down Expand Up @@ -212,7 +213,7 @@ public final class RDKafkaClient: Sendable {
assert(arguments.count == size)

return rd_kafka_produceva(
self.kafkaHandle,
self.kafkaHandle.pointer,
arguments,
arguments.count
)
Expand Down Expand Up @@ -304,7 +305,7 @@ public final class RDKafkaClient: Sendable {
events.reserveCapacity(maxEvents)

for _ in 0..<maxEvents {
let event = rd_kafka_queue_poll(self.queue, 0)
let event = rd_kafka_queue_poll(self.queueHandle.pointer, 0)
defer { rd_kafka_event_destroy(event) }

let rdEventType = rd_kafka_event_type(event)
Expand Down Expand Up @@ -437,7 +438,7 @@ public final class RDKafkaClient: Sendable {
/// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
func consumerPoll() throws -> KafkaConsumerMessage? {
guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle, 0) else {
guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, 0) else {
// No error, there might be no more messages
return nil
}
Expand All @@ -460,7 +461,7 @@ public final class RDKafkaClient: Sendable {
/// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs.
func subscribe(topicPartitionList: RDKafkaTopicPartitionList) throws {
try topicPartitionList.withListPointer { pointer in
let result = rd_kafka_subscribe(self.kafkaHandle, pointer)
let result = rd_kafka_subscribe(self.kafkaHandle.pointer, pointer)
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
throw KafkaError.rdKafkaError(wrapping: result)
}
Expand All @@ -471,7 +472,7 @@ public final class RDKafkaClient: Sendable {
/// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs.
func assign(topicPartitionList: RDKafkaTopicPartitionList) throws {
try topicPartitionList.withListPointer { pointer in
let result = rd_kafka_assign(self.kafkaHandle, pointer)
let result = rd_kafka_assign(self.kafkaHandle.pointer, pointer)
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
throw KafkaError.rdKafkaError(wrapping: result)
}
Expand Down Expand Up @@ -508,7 +509,7 @@ public final class RDKafkaClient: Sendable {

let error = changesList.withListPointer { listPointer in
return rd_kafka_commit(
self.kafkaHandle,
self.kafkaHandle.pointer,
listPointer,
1 // async = true
)
Expand Down Expand Up @@ -551,9 +552,9 @@ public final class RDKafkaClient: Sendable {

changesList.withListPointer { listPointer in
rd_kafka_commit_queue(
self.kafkaHandle,
self.kafkaHandle.pointer,
listPointer,
self.queue,
self.queueHandle.pointer,
nil,
opaquePointer
)
Expand All @@ -572,7 +573,7 @@ public final class RDKafkaClient: Sendable {
let queue = DispatchQueue(label: "com.swift-server.swift-kafka.flush")
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
queue.async {
let error = rd_kafka_flush(self.kafkaHandle, timeoutMilliseconds)
let error = rd_kafka_flush(self.kafkaHandle.pointer, timeoutMilliseconds)
if error != RD_KAFKA_RESP_ERR_NO_ERROR {
continuation.resume(throwing: KafkaError.rdKafkaError(wrapping: error))
} else {
Expand All @@ -587,7 +588,7 @@ public final class RDKafkaClient: Sendable {
///
/// Make sure to run poll loop until ``RDKafkaClient/consumerIsClosed`` returns `true`.
func consumerClose() throws {
let result = rd_kafka_consumer_close_queue(self.kafkaHandle, self.queue)
let result = rd_kafka_consumer_close_queue(self.kafkaHandle.pointer, self.queueHandle.pointer)
let kafkaError = rd_kafka_error_code(result)
if kafkaError != RD_KAFKA_RESP_ERR_NO_ERROR {
throw KafkaError.rdKafkaError(wrapping: kafkaError)
Expand All @@ -596,14 +597,14 @@ public final class RDKafkaClient: Sendable {

/// Returns `true` if the underlying `librdkafka` consumer is closed.
var isConsumerClosed: Bool {
rd_kafka_consumer_closed(self.kafkaHandle) == 1
rd_kafka_consumer_closed(self.kafkaHandle.pointer) == 1
}

/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
/// - Warning: Do not escape the pointer from the closure for later use.
/// - Parameter body: The closure will use the Kafka handle pointer.
@discardableResult
func withKafkaHandlePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
return try body(self.kafkaHandle)
return try body(self.kafkaHandle.pointer)
}
}
8 changes: 4 additions & 4 deletions Sources/Kafka/RDKafka/RDKafkaTopicHandles.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import NIOConcurrencyHelpers

/// Swift class that matches topic names with their respective `rd_kafka_topic_t` handles.
internal final class RDKafkaTopicHandles: Sendable {
private let _internal: NIOLockedValueBox<[String: OpaquePointer]>
private let _internal: NIOLockedValueBox<[String: SendableOpaquePointer]>

// Note: we retain the client to ensure it does not get
// deinitialized before rd_kafka_topic_destroy() is invoked (required)
Expand All @@ -31,7 +31,7 @@ internal final class RDKafkaTopicHandles: Sendable {
deinit {
self._internal.withLockedValue { dict in
for (_, topicHandle) in dict {
rd_kafka_topic_destroy(topicHandle)
rd_kafka_topic_destroy(topicHandle.pointer)
}
}
}
Expand Down Expand Up @@ -59,7 +59,7 @@ internal final class RDKafkaTopicHandles: Sendable {
) throws -> OpaquePointer {
try self._internal.withLockedValue { dict in
if let handle = dict[topic] {
return handle
return handle.pointer
} else {
let rdTopicConf = try RDKafkaTopicConfig.createFrom(topicConfiguration: topicConfiguration)
let newHandle = self.client.withKafkaHandlePointer { kafkaHandle in
Expand All @@ -76,7 +76,7 @@ internal final class RDKafkaTopicHandles: Sendable {
let error = KafkaError.rdKafkaError(wrapping: rd_kafka_last_error())
throw error
}
dict[topic] = newHandle
dict[topic] = .init(newHandle)
return newHandle
}
}
Expand Down
25 changes: 25 additions & 0 deletions Sources/Kafka/Utilities/SendableOpaquePointer.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-client open source project
//
// Copyright (c) 2024 Apple Inc. and the swift-kafka-client project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// A wrapper for the `OpaquePointer` used to represent different handles from `librdkafka`.
///
/// This wrapper silences `Sendable` warnings for the pointer introduced in Swift 5.10, and should
/// only be used for handles from `librdkafka` that are known to be thread-safe.
internal struct SendableOpaquePointer: @unchecked Sendable {
let pointer: OpaquePointer

init(_ pointer: OpaquePointer) {
self.pointer = pointer
}
}

0 comments on commit f84757b

Please sign in to comment.