Add headers to KafkaAcknowledgedMessage (#179)
When a message is sent to Kafka, the acknowledgement should also contain its headers.

### Motivation:

Resolves #144

### Modifications:

Moved the header-parsing helper from `KafkaConsumerMessage` to `RDKafkaClient` and reused it in the producer path so that headers are returned in the acknowledgement.

### Result:

Headers are now available in the acknowledgement for producers and remain available for consumers. A short usage sketch follows below.
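
For illustration only, here is a minimal sketch of producing a message with headers and reading them back from the delivery report. It assumes the README-style API names (`KafkaProducerConfiguration`, `KafkaConfiguration.BrokerAddress`, `makeProducerWithEvents`, the `.deliveryReports` event and `.acknowledged` status); exact signatures may differ, the broker address, topic, and header values are placeholders, and running the producer inside a `ServiceGroup` is omitted for brevity.

```swift
import Kafka
import Logging
import NIOCore

// Sketch only: API names assumed as noted above; service-group setup omitted.
let configuration = KafkaProducerConfiguration(
    bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)]
)
let (producer, events) = try KafkaProducer.makeProducerWithEvents(
    configuration: configuration,
    logger: Logger(label: "header-ack-example")
)

let message = KafkaProducerMessage(
    topic: "test-topic",
    headers: [KafkaHeader(key: "trace-id", value: ByteBuffer(string: "abc-123"))],
    key: "key",
    value: "Hello, World!"
)
_ = try producer.send(message)

for await event in events {
    guard case .deliveryReports(let reports) = event else { continue }
    for report in reports {
        if case .acknowledged(let acknowledgedMessage) = report.status {
            // With this change, the acknowledgement carries the original headers.
            print(acknowledgedMessage.headers.map(\.key)) // ["trace-id"]
        }
    }
}
```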

---------

Co-authored-by: Michael Gecht <[email protected]>
malletgu and mimischi authored Nov 15, 2024
1 parent 9986c5b commit 09aca6b
Showing 4 changed files with 83 additions and 81 deletions.
4 changes: 3 additions & 1 deletion Sources/Kafka/KafkaAcknowledgedMessage.swift
@@ -27,6 +27,8 @@ public struct KafkaAcknowledgedMessage {
public var value: ByteBuffer
/// The offset of the message in its partition.
public var offset: KafkaOffset
/// The headers of the message.
public var headers: [KafkaHeader]

/// Initialize ``KafkaAcknowledgedMessage`` from `rd_kafka_message_t` pointer.
/// - Throws: A ``KafkaAcknowledgedMessageError`` for failed acknowledgements or malformed messages.
@@ -53,7 +55,7 @@ public struct KafkaAcknowledgedMessage {
self.topic = topic

self.partition = KafkaPartition(rawValue: Int(rdKafkaMessage.partition))

self.headers = try RDKafkaClient.getHeaders(for: messagePointer)
if let keyPointer = rdKafkaMessage.key {
let keyBufferPointer = UnsafeRawBufferPointer(
start: keyPointer,
81 changes: 1 addition & 80 deletions Sources/Kafka/KafkaConsumerMessage.swift
@@ -66,7 +66,7 @@ public struct KafkaConsumerMessage {

self.partition = KafkaPartition(rawValue: Int(rdKafkaMessage.partition))

-self.headers = try Self.getHeaders(for: messagePointer)
+self.headers = try RDKafkaClient.getHeaders(for: messagePointer)

if let keyPointer = rdKafkaMessage.key {
let keyBufferPointer = UnsafeRawBufferPointer(
@@ -91,82 +91,3 @@ extension KafkaConsumerMessage: Hashable {}
// MARK: - KafkaConsumerMessage + Sendable

extension KafkaConsumerMessage: Sendable {}

// MARK: - Helpers

extension KafkaConsumerMessage {
/// Extract ``KafkaHeader``s from a `rd_kafka_message_t` pointer.
///
/// - Parameters:
/// - for: Pointer to the `rd_kafka_message_t` object to extract the headers from.
private static func getHeaders(
for messagePointer: UnsafePointer<rd_kafka_message_t>
) throws -> [KafkaHeader] {
var result: [KafkaHeader] = []
var headers: OpaquePointer?

var readStatus = rd_kafka_message_headers(messagePointer, &headers)

if readStatus == RD_KAFKA_RESP_ERR__NOENT {
// No Header Entries
return result
}

guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else {
throw KafkaError.rdKafkaError(wrapping: readStatus)
}

guard let headers else {
return result
}

let headerCount = rd_kafka_header_cnt(headers)
result.reserveCapacity(headerCount)

var headerIndex = 0

while readStatus != RD_KAFKA_RESP_ERR__NOENT && headerIndex < headerCount {
var headerKeyPointer: UnsafePointer<CChar>?
var headerValuePointer: UnsafeRawPointer?
var headerValueSize = 0

readStatus = rd_kafka_header_get_all(
headers,
headerIndex,
&headerKeyPointer,
&headerValuePointer,
&headerValueSize
)

if readStatus == RD_KAFKA_RESP_ERR__NOENT {
// No Header Entries
return result
}

guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else {
throw KafkaError.rdKafkaError(wrapping: readStatus)
}

guard let headerKeyPointer else {
fatalError("Found null pointer when reading KafkaConsumerMessage header key")
}
let headerKey = String(cString: headerKeyPointer)

var headerValue: ByteBuffer?
if let headerValuePointer, headerValueSize > 0 {
let headerValueBufferPointer = UnsafeRawBufferPointer(
start: headerValuePointer,
count: headerValueSize
)
headerValue = ByteBuffer(bytes: headerValueBufferPointer)
}

let newHeader = KafkaHeader(key: headerKey, value: headerValue)
result.append(newHeader)

headerIndex += 1
}

return result
}
}
76 changes: 76 additions & 0 deletions Sources/Kafka/RDKafka/RDKafkaClient.swift
@@ -15,6 +15,7 @@
import Crdkafka
import Dispatch
import Logging
import NIOCore

import class Foundation.JSONDecoder

@@ -616,4 +617,79 @@ public final class RDKafkaClient: Sendable {
func withKafkaHandlePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
try body(self.kafkaHandle.pointer)
}

/// Extract ``KafkaHeader``s from a `rd_kafka_message_t` pointer.
///
/// - Parameters:
/// - for: Pointer to the `rd_kafka_message_t` object to extract the headers from.
internal static func getHeaders(
for messagePointer: UnsafePointer<rd_kafka_message_t>
) throws -> [KafkaHeader] {
var result: [KafkaHeader] = []
var headers: OpaquePointer?

var readStatus = rd_kafka_message_headers(messagePointer, &headers)

if readStatus == RD_KAFKA_RESP_ERR__NOENT {
// No Header Entries
return result
}

guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else {
throw KafkaError.rdKafkaError(wrapping: readStatus)
}

guard let headers else {
return result
}

let headerCount = rd_kafka_header_cnt(headers)
result.reserveCapacity(headerCount)

var headerIndex = 0

while readStatus != RD_KAFKA_RESP_ERR__NOENT && headerIndex < headerCount {
var headerKeyPointer: UnsafePointer<CChar>?
var headerValuePointer: UnsafeRawPointer?
var headerValueSize = 0

readStatus = rd_kafka_header_get_all(
headers,
headerIndex,
&headerKeyPointer,
&headerValuePointer,
&headerValueSize
)

if readStatus == RD_KAFKA_RESP_ERR__NOENT {
// No Header Entries
return result
}

guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else {
throw KafkaError.rdKafkaError(wrapping: readStatus)
}

guard let headerKeyPointer else {
fatalError("Found null pointer when reading KafkaConsumerMessage header key")
}
let headerKey = String(cString: headerKeyPointer)

var headerValue: ByteBuffer?
if let headerValuePointer, headerValueSize > 0 {
let headerValueBufferPointer = UnsafeRawBufferPointer(
start: headerValuePointer,
count: headerValueSize
)
headerValue = ByteBuffer(bytes: headerValueBufferPointer)
}

let newHeader = KafkaHeader(key: headerKey, value: headerValue)
result.append(newHeader)

headerIndex += 1
}

return result
}
}
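
Because header values are optional `ByteBuffer`s, a small helper (hypothetical, not part of the library) can make them easier to inspect in tests or logs. It assumes `KafkaHeader` exposes the `key` and optional `ByteBuffer` `value` seen in the initializer above:

```swift
import Kafka
import NIOCore

// Hypothetical convenience, not part of the library: decode a header's value as UTF-8 text.
// A header may legitimately carry no value; that stays nil here.
func headerText(_ header: KafkaHeader) -> String? {
    header.value.map { String(buffer: $0) }
}

// Look up a single header by key; works for both KafkaAcknowledgedMessage.headers
// and KafkaConsumerMessage.headers, which share the same [KafkaHeader] type.
func firstHeaderText(key: String, in headers: [KafkaHeader]) -> String? {
    headers.first { $0.key == key }.flatMap(headerText)
}
```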
3 changes: 3 additions & 0 deletions Tests/KafkaTests/KafkaProducerTests.swift
@@ -82,8 +82,10 @@ final class KafkaProducerTests: XCTestCase {
}

let expectedTopic = "test-topic"
let headers = [KafkaHeader(key: "some", value: ByteBuffer.init(string: "test"))]
let message = KafkaProducerMessage(
topic: expectedTopic,
headers: headers,
key: "key",
value: "Hello, World!"
)
@@ -118,6 +120,7 @@
XCTAssertEqual(expectedTopic, receivedMessage.topic)
XCTAssertEqual(ByteBuffer(string: message.key!), receivedMessage.key)
XCTAssertEqual(ByteBuffer(string: message.value), receivedMessage.value)
XCTAssertEqual(headers, receivedMessage.headers)

// Shutdown the serviceGroup
await serviceGroup.triggerGracefulShutdown()
