Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce SendableOpaquePointer #176

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions Sources/Kafka/RDKafka/RDKafkaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,30 @@ public final class RDKafkaClient: Sendable {
}

/// Handle for the C library's Kafka instance.
private let kafkaHandle: OpaquePointer
private let kafkaHandle: SendableOpaquePointer
/// A logger.
private let logger: Logger

/// `librdkafka`'s `rd_kafka_queue_t` that events are received on.
private let queue: OpaquePointer
private let queueHandle: SendableOpaquePointer

// Use factory method to initialize
private init(
type: ClientType,
kafkaHandle: OpaquePointer,
kafkaHandle: SendableOpaquePointer,
logger: Logger
) {
self.kafkaHandle = kafkaHandle
self.logger = logger
self.queue = rd_kafka_queue_get_main(self.kafkaHandle)
self.queueHandle = .init(rd_kafka_queue_get_main(self.kafkaHandle.pointer))

rd_kafka_set_log_queue(self.kafkaHandle, self.queue)
rd_kafka_set_log_queue(self.kafkaHandle.pointer, self.queueHandle.pointer)
}

deinit {
// Loose reference to librdkafka's event queue
rd_kafka_queue_destroy(self.queue)
rd_kafka_destroy(kafkaHandle)
rd_kafka_queue_destroy(self.queueHandle.pointer)
rd_kafka_destroy(kafkaHandle.pointer)
}

/// Factory method creating a new instance of a ``RDKafkaClient``.
Expand Down Expand Up @@ -90,7 +90,8 @@ public final class RDKafkaClient: Sendable {
throw KafkaError.client(reason: errorString)
}

return RDKafkaClient(type: type, kafkaHandle: handle, logger: logger)
let kafkaHandle = SendableOpaquePointer(handle)
return RDKafkaClient(type: type, kafkaHandle: kafkaHandle, logger: logger)
}

/// Produce a message to the Kafka cluster.
Expand Down Expand Up @@ -215,7 +216,7 @@ public final class RDKafkaClient: Sendable {
assert(arguments.count == size)

return rd_kafka_produceva(
self.kafkaHandle,
self.kafkaHandle.pointer,
arguments,
arguments.count
)
Expand Down Expand Up @@ -307,7 +308,7 @@ public final class RDKafkaClient: Sendable {
events.reserveCapacity(maxEvents)

for _ in 0..<maxEvents {
let event = rd_kafka_queue_poll(self.queue, 0)
let event = rd_kafka_queue_poll(self.queueHandle.pointer, 0)
defer { rd_kafka_event_destroy(event) }

let rdEventType = rd_kafka_event_type(event)
Expand Down Expand Up @@ -446,7 +447,7 @@ public final class RDKafkaClient: Sendable {
/// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
func consumerPoll() throws -> KafkaConsumerMessage? {
guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle, 0) else {
guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, 0) else {
// No error, there might be no more messages
return nil
}
Expand All @@ -469,7 +470,7 @@ public final class RDKafkaClient: Sendable {
/// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs.
func subscribe(topicPartitionList: RDKafkaTopicPartitionList) throws {
try topicPartitionList.withListPointer { pointer in
let result = rd_kafka_subscribe(self.kafkaHandle, pointer)
let result = rd_kafka_subscribe(self.kafkaHandle.pointer, pointer)
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
throw KafkaError.rdKafkaError(wrapping: result)
}
Expand All @@ -480,7 +481,7 @@ public final class RDKafkaClient: Sendable {
/// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs.
func assign(topicPartitionList: RDKafkaTopicPartitionList) throws {
try topicPartitionList.withListPointer { pointer in
let result = rd_kafka_assign(self.kafkaHandle, pointer)
let result = rd_kafka_assign(self.kafkaHandle.pointer, pointer)
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
throw KafkaError.rdKafkaError(wrapping: result)
}
Expand Down Expand Up @@ -517,7 +518,7 @@ public final class RDKafkaClient: Sendable {

let error = changesList.withListPointer { listPointer in
rd_kafka_commit(
self.kafkaHandle,
self.kafkaHandle.pointer,
listPointer,
1 // async = true
)
Expand Down Expand Up @@ -560,9 +561,9 @@ public final class RDKafkaClient: Sendable {

changesList.withListPointer { listPointer in
rd_kafka_commit_queue(
self.kafkaHandle,
self.kafkaHandle.pointer,
listPointer,
self.queue,
self.queueHandle.pointer,
nil,
opaquePointer
)
Expand All @@ -581,7 +582,7 @@ public final class RDKafkaClient: Sendable {
let queue = DispatchQueue(label: "com.swift-server.swift-kafka.flush")
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
queue.async {
let error = rd_kafka_flush(self.kafkaHandle, timeoutMilliseconds)
let error = rd_kafka_flush(self.kafkaHandle.pointer, timeoutMilliseconds)
if error != RD_KAFKA_RESP_ERR_NO_ERROR {
continuation.resume(throwing: KafkaError.rdKafkaError(wrapping: error))
} else {
Expand All @@ -596,7 +597,7 @@ public final class RDKafkaClient: Sendable {
///
/// Make sure to run poll loop until ``RDKafkaClient/consumerIsClosed`` returns `true`.
func consumerClose() throws {
let result = rd_kafka_consumer_close_queue(self.kafkaHandle, self.queue)
let result = rd_kafka_consumer_close_queue(self.kafkaHandle.pointer, self.queueHandle.pointer)
let kafkaError = rd_kafka_error_code(result)
if kafkaError != RD_KAFKA_RESP_ERR_NO_ERROR {
throw KafkaError.rdKafkaError(wrapping: kafkaError)
Expand All @@ -605,14 +606,14 @@ public final class RDKafkaClient: Sendable {

/// Returns `true` if the underlying `librdkafka` consumer is closed.
var isConsumerClosed: Bool {
rd_kafka_consumer_closed(self.kafkaHandle) == 1
rd_kafka_consumer_closed(self.kafkaHandle.pointer) == 1
}

/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
/// - Warning: Do not escape the pointer from the closure for later use.
/// - Parameter body: The closure will use the Kafka handle pointer.
@discardableResult
func withKafkaHandlePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
try body(self.kafkaHandle)
try body(self.kafkaHandle.pointer)
}
}
8 changes: 4 additions & 4 deletions Sources/Kafka/RDKafka/RDKafkaTopicHandles.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import NIOConcurrencyHelpers

/// Swift class that matches topic names with their respective `rd_kafka_topic_t` handles.
internal final class RDKafkaTopicHandles: Sendable {
private let _internal: NIOLockedValueBox<[String: OpaquePointer]>
private let _internal: NIOLockedValueBox<[String: SendableOpaquePointer]>

// Note: we retain the client to ensure it does not get
// deinitialized before rd_kafka_topic_destroy() is invoked (required)
Expand All @@ -31,7 +31,7 @@ internal final class RDKafkaTopicHandles: Sendable {
deinit {
self._internal.withLockedValue { dict in
for (_, topicHandle) in dict {
rd_kafka_topic_destroy(topicHandle)
rd_kafka_topic_destroy(topicHandle.pointer)
}
}
}
Expand Down Expand Up @@ -59,7 +59,7 @@ internal final class RDKafkaTopicHandles: Sendable {
) throws -> OpaquePointer {
try self._internal.withLockedValue { dict in
if let handle = dict[topic] {
return handle
return handle.pointer
} else {
let rdTopicConf = try RDKafkaTopicConfig.createFrom(topicConfiguration: topicConfiguration)
let newHandle = self.client.withKafkaHandlePointer { kafkaHandle in
Expand All @@ -76,7 +76,7 @@ internal final class RDKafkaTopicHandles: Sendable {
let error = KafkaError.rdKafkaError(wrapping: rd_kafka_last_error())
throw error
}
dict[topic] = newHandle
dict[topic] = .init(newHandle)
return newHandle
}
}
Expand Down
25 changes: 25 additions & 0 deletions Sources/Kafka/Utilities/SendableOpaquePointer.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-client open source project
//
// Copyright (c) 2024 Apple Inc. and the swift-kafka-client project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// A wrapper for the `OpaquePointer` used to represent different handles from `librdkafka`.
///
/// This wrapper silences `Sendable` warnings for the pointer introduced in Swift 5.10, and should
/// only be used for handles from `librdkafka` that are known to be thread-safe.
struct SendableOpaquePointer: @unchecked Sendable {
let pointer: OpaquePointer
mimischi marked this conversation as resolved.
Show resolved Hide resolved

init(_ pointer: OpaquePointer) {
self.pointer = pointer
}
}
Loading