Skip to content

Commit

Permalink
Kafka Message Header API (#120)
Browse files Browse the repository at this point in the history
* Kafka Message Header API

Motivation:

Be able to attach headers to a `KafkaProducerMessage` and read out
headers attached to a `KafkaConsumerMessage`.

Modifications:

* create new type `struct KafkaHeader` representing a key-value pair of
  `String` key and `ByteBuffer` value
* add property `headers: [KafkaHeader]` to `KafkaConsumerMessage` and
  `KafkaConsumerMessage`
* use `rd_kafka_produceva` (varidadic arguments) to produce messages as
  `rd_kafka_produce` did not support setting message headers
* create helper class `RDKafkaUnsafeProducerMessage` that helps
  configuring the varidadic argument array for `rd_kafka_produceva`
* add new test asserting that both producing and consuming messages with
  message headers works

* Remove KafkaContiguousBytes TODOs

* Review Franz

Modifications:

* no copying of `KafkaProducerMessage` headers and values -> build
  scoped accessor helper that recursively accesses all underlying
  pointers of the `KafkaProducerMessage`'s `headers: [KafkaHeader]`
* only use `rd_kafka_produceva` when `message.headers.isEmpty == false`

* Review Franz: simplify recursion cases
  • Loading branch information
felixschlegel authored Aug 24, 2023
1 parent 1608c4a commit 8592c61
Show file tree
Hide file tree
Showing 6 changed files with 384 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ extension KafkaConfiguration {

/// Verify the identity of the broker.
///
/// Parameters:
/// - Parameters:
/// - trustRoots: File or directory path to CA certificate(s) for verifying the broker's key.
/// - certificateRevocationListPath: Path to CRL for verifying broker's certificate validity.
public static func verify(
Expand Down
83 changes: 83 additions & 0 deletions Sources/Kafka/KafkaConsumerMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public struct KafkaConsumerMessage {
public var topic: String
/// The partition that the message was received from.
public var partition: KafkaPartition
/// The headers of the message.
public var headers: [KafkaHeader]
/// The key of the message.
public var key: ByteBuffer?
/// The body of the message.
Expand Down Expand Up @@ -57,6 +59,8 @@ public struct KafkaConsumerMessage {

self.partition = KafkaPartition(rawValue: Int(rdKafkaMessage.partition))

self.headers = try Self.getHeaders(for: messagePointer)

if let keyPointer = rdKafkaMessage.key {
let keyBufferPointer = UnsafeRawBufferPointer(
start: keyPointer,
Expand All @@ -80,3 +84,82 @@ extension KafkaConsumerMessage: Hashable {}
// MARK: - KafkaConsumerMessage + Sendable

extension KafkaConsumerMessage: Sendable {}

// MARK: - Helpers

extension KafkaConsumerMessage {
/// Extract ``KafkaHeader``s from a `rd_kafka_message_t` pointer.
///
/// - Parameters:
/// - for: Pointer to the `rd_kafka_message_t` object to extract the headers from.
private static func getHeaders(
for messagePointer: UnsafePointer<rd_kafka_message_t>
) throws -> [KafkaHeader] {
var result: [KafkaHeader] = []
var headers: OpaquePointer?

var readStatus = rd_kafka_message_headers(messagePointer, &headers)

if readStatus == RD_KAFKA_RESP_ERR__NOENT {
// No Header Entries
return result
}

guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else {
throw KafkaError.rdKafkaError(wrapping: readStatus)
}

guard let headers else {
return result
}

let headerCount = rd_kafka_header_cnt(headers)
result.reserveCapacity(headerCount)

var headerIndex = 0

while readStatus != RD_KAFKA_RESP_ERR__NOENT && headerIndex < headerCount {
var headerKeyPointer: UnsafePointer<CChar>?
var headerValuePointer: UnsafeRawPointer?
var headerValueSize = 0

readStatus = rd_kafka_header_get_all(
headers,
headerIndex,
&headerKeyPointer,
&headerValuePointer,
&headerValueSize
)

if readStatus == RD_KAFKA_RESP_ERR__NOENT {
// No Header Entries
return result
}

guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else {
throw KafkaError.rdKafkaError(wrapping: readStatus)
}

guard let headerKeyPointer else {
fatalError("Found null pointer when reading KafkaConsumerMessage header key")
}
let headerKey = String(cString: headerKeyPointer)

var headerValue: ByteBuffer?
if let headerValuePointer, headerValueSize > 0 {
let headerValueBufferPointer = UnsafeRawBufferPointer(
start: headerValuePointer,
count: headerValueSize
)
headerValue = ByteBuffer(bytes: headerValueBufferPointer)
}

let newHeader = KafkaHeader(key: headerKey, value: headerValue)
result.append(newHeader)

headerIndex += 1
}

return result
}
}
38 changes: 38 additions & 0 deletions Sources/Kafka/KafkaHeader.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-client open source project
//
// Copyright (c) 2023 Apple Inc. and the swift-kafka-client project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore

/// A structure representing a header for a Kafka message.
/// Headers are key-value pairs that can be attached to Kafka messages to provide additional metadata.
public struct KafkaHeader: Sendable, Hashable {
/// The key associated with the header.
public var key: String

/// The value associated with the header.
public var value: ByteBuffer?

/// Initializes a new Kafka header with the provided key and optional value.
///
/// - Parameters:
/// - key: The key associated with the header.
/// - value: The optional binary value associated with the header.
public init(
key: String,
value: ByteBuffer? = nil
) {
self.key = key
self.value = value
}
}
15 changes: 12 additions & 3 deletions Sources/Kafka/KafkaProducerMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public struct KafkaProducerMessage<Key: KafkaContiguousBytes, Value: KafkaContig
/// This means the message will be automatically assigned a partition using the topic's partitioner function.
public var partition: KafkaPartition

/// The headers of the message.
public var headers: [KafkaHeader]

/// The optional key associated with the message.
/// If the ``KafkaPartition`` is ``KafkaPartition/unassigned``, the ``KafkaProducerMessage/key`` is used to ensure
/// that two ``KafkaProducerMessage``s with the same key still get sent to the same ``KafkaPartition``.
Expand All @@ -38,18 +41,21 @@ public struct KafkaProducerMessage<Key: KafkaContiguousBytes, Value: KafkaContig
/// - Parameters:
/// - topic: The topic the message will be sent to. Topics may be created by the `KafkaProducer` if non-existent.
/// - partition: The topic partition the message will be sent to. If not set explicitly, the partition will be assigned automatically.
/// - headers: The headers of the message.
/// - key: Used to guarantee that messages with the same key will be sent to the same partition so that their order is preserved.
/// - value: The message's value.
public init(
topic: String,
partition: KafkaPartition = .unassigned,
headers: [KafkaHeader] = [],
key: Key,
value: Value
) {
self.topic = topic
self.partition = partition
self.headers = headers
self.key = key
self.value = value
self.partition = partition
}
}

Expand All @@ -59,16 +65,19 @@ extension KafkaProducerMessage where Key == Never {
/// - Parameters:
/// - topic: The topic the message will be sent to. Topics may be created by the `KafkaProducer` if non-existent.
/// - partition: The topic partition the message will be sent to. If not set explicitly, the partition will be assigned automatically.
/// - headers: The headers of the message.
/// - value: The message body.
public init(
topic: String,
partition: KafkaPartition = .unassigned,
headers: [KafkaHeader] = [],
value: Value
) {
self.topic = topic
self.value = value
self.key = nil
self.partition = partition
self.headers = headers
self.key = nil
self.value = value
}
}

Expand Down
Loading

0 comments on commit 8592c61

Please sign in to comment.