diff --git a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift index d725cf4a..ec0bff9a 100644 --- a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift +++ b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift @@ -13,18 +13,333 @@ //===----------------------------------------------------------------------===// import Benchmark +import Crdkafka +import Dispatch +import struct Foundation.Date +import struct Foundation.UUID import Kafka +import Logging +import ServiceLifecycle let benchmarks = { + var uniqueTestTopic: String! + let messageCount: UInt = 1000 + Benchmark.defaultConfiguration = .init( - metrics: [.wallClock, .cpuTotal, .allocatedResidentMemory, .contextSwitches, .throughput] + .arc, + metrics: [ + .wallClock, + .cpuTotal, + .contextSwitches, + .throughput, + .allocatedResidentMemory, + ] + .arc, warmupIterations: 0, scalingFactor: .one, maxDuration: .seconds(5), - maxIterations: 100 + maxIterations: 100, + thresholds: [ + .wallClock: .init(relative: [.p90: 35]), + .cpuTotal: .init(relative: [.p90: 35]), + .allocatedResidentMemory: .init(relative: [.p90: 20]), + .contextSwitches: .init(relative: [.p90: 35]), + .throughput: .init(relative: [.p90: 35]), + .objectAllocCount: .init(relative: [.p90: 20]), + .retainCount: .init(relative: [.p90: 20]), + .releaseCount: .init(relative: [.p90: 20]), + .retainReleaseDelta: .init(relative: [.p90: 20]), + ] ) - Benchmark.setup = {} + Benchmark.setup = { + uniqueTestTopic = try await prepareTopic(messagesCount: messageCount, partitions: 6) + } + + Benchmark.teardown = { + if let uniqueTestTopic { + try deleteTopic(uniqueTestTopic) + } + uniqueTestTopic = nil + } + + Benchmark("SwiftKafkaConsumer_basic_consumer_messages_\(messageCount)") { benchmark in + let uniqueGroupID = UUID().uuidString + var consumerConfig = KafkaConsumerConfiguration( + consumptionStrategy: .group( + id: uniqueGroupID, + topics: [uniqueTestTopic] + ), + bootstrapBrokerAddresses: [brokerAddress] + ) + consumerConfig.autoOffsetReset = .beginning + consumerConfig.broker.addressFamily = .v4 + // We must specify it at least 10 otherwise CI will timeout + consumerConfig.pollInterval = .milliseconds(1) + + let consumer = try KafkaConsumer( + configuration: consumerConfig, + logger: .perfLogger + ) + + let serviceGroupConfiguration = ServiceGroupConfiguration(services: [consumer], gracefulShutdownSignals: [.sigterm, .sigint], logger: .perfLogger) + let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) + + try await withThrowingTaskGroup(of: Void.self) { group in + benchLog("Start consuming") + defer { + benchLog("Finish consuming") + } + // Run Task + group.addTask { + try await serviceGroup.run() + } + + // Second Consumer Task + group.addTask { + var ctr: UInt64 = 0 + var tmpCtr: UInt64 = 0 + let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1) + let totalStartDate = Date.timeIntervalSinceReferenceDate + var totalBytes: UInt64 = 0 + + try await benchmark.withMeasurement { + for try await record in consumer.messages { + ctr += 1 + totalBytes += UInt64(record.value.readableBytes) + + tmpCtr += 1 + if tmpCtr >= interval { + benchLog("read \(ctr * 100 / UInt64(messageCount))%") + tmpCtr = 0 + } + if ctr >= messageCount { + break + } + } + } + + let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate + let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 + 
benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") + } + + // Wait for second Consumer Task to complete + try await group.next() + // Shutdown the serviceGroup + await serviceGroup.triggerGracefulShutdown() + } + } + + Benchmark("SwiftKafkaConsumer_with_offset_commit_messages_\(messageCount)") { benchmark in + let uniqueGroupID = UUID().uuidString + var consumerConfig = KafkaConsumerConfiguration( + consumptionStrategy: .group( + id: uniqueGroupID, + topics: [uniqueTestTopic] + ), + bootstrapBrokerAddresses: [brokerAddress] + ) + consumerConfig.autoOffsetReset = .beginning + consumerConfig.broker.addressFamily = .v4 + consumerConfig.isAutoCommitEnabled = false + // We must specify it at least 10 otherwise CI will timeout + consumerConfig.pollInterval = .milliseconds(1) + + let consumer = try KafkaConsumer( + configuration: consumerConfig, + logger: .perfLogger + ) + + let serviceGroupConfiguration = ServiceGroupConfiguration(services: [consumer], gracefulShutdownSignals: [.sigterm, .sigint], logger: .perfLogger) + let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) + + try await withThrowingTaskGroup(of: Void.self) { group in + benchLog("Start consuming") + defer { + benchLog("Finish consuming") + } + // Run Task + group.addTask { + try await serviceGroup.run() + } + + // Second Consumer Task + group.addTask { + var ctr: UInt64 = 0 + var tmpCtr: UInt64 = 0 + let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1) + let totalStartDate = Date.timeIntervalSinceReferenceDate + var totalBytes: UInt64 = 0 + + try await benchmark.withMeasurement { + for try await record in consumer.messages { + try consumer.scheduleCommit(record) + + ctr += 1 + totalBytes += UInt64(record.value.readableBytes) + + tmpCtr += 1 + if tmpCtr >= interval { + benchLog("read \(ctr * 100 / UInt64(messageCount))%") + tmpCtr = 0 + } + if ctr >= messageCount { + break + } + } + } + + let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate + let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 + benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") + } + + // Wait for second Consumer Task to complete + try await group.next() + // Shutdown the serviceGroup + await serviceGroup.triggerGracefulShutdown() + } + } + + Benchmark("librdkafka_basic_consumer_messages_\(messageCount)") { benchmark in + let uniqueGroupID = UUID().uuidString + let rdKafkaConsumerConfig: [String: String] = [ + "group.id": uniqueGroupID, + "bootstrap.servers": "\(brokerAddress.host):\(brokerAddress.port)", + "broker.address.family": "v4", + "auto.offset.reset": "beginning", + ] + + let configPointer: OpaquePointer = rd_kafka_conf_new() + for (key, value) in rdKafkaConsumerConfig { + precondition(rd_kafka_conf_set(configPointer, key, value, nil, 0) == RD_KAFKA_CONF_OK) + } + + let kafkaHandle = rd_kafka_new(RD_KAFKA_CONSUMER, configPointer, nil, 0) + guard let kafkaHandle else { + preconditionFailure("Kafka handle was not created") + } + defer { + rd_kafka_destroy(kafkaHandle) + } + + rd_kafka_poll_set_consumer(kafkaHandle) + let subscriptionList = rd_kafka_topic_partition_list_new(1) + defer { + rd_kafka_topic_partition_list_destroy(subscriptionList) + } + rd_kafka_topic_partition_list_add( + subscriptionList, + uniqueTestTopic, + RD_KAFKA_PARTITION_UA + ) + rd_kafka_subscribe(kafkaHandle, subscriptionList) + rd_kafka_poll(kafkaHandle, 0) + + var ctr: UInt64 = 0 + var tmpCtr: 
UInt64 = 0 + + let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1) + let totalStartDate = Date.timeIntervalSinceReferenceDate + var totalBytes: UInt64 = 0 + + benchmark.withMeasurement { + while ctr < messageCount { + guard let record = rd_kafka_consumer_poll(kafkaHandle, 10) else { + continue + } + defer { + rd_kafka_message_destroy(record) + } + ctr += 1 + totalBytes += UInt64(record.pointee.len) + + tmpCtr += 1 + if tmpCtr >= interval { + benchLog("read \(ctr * 100 / UInt64(messageCount))%") + tmpCtr = 0 + } + } + } + + rd_kafka_consumer_close(kafkaHandle) + + let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate + let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 + benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") + } + + Benchmark("librdkafka_with_offset_commit_messages_\(messageCount)") { benchmark in + let uniqueGroupID = UUID().uuidString + let rdKafkaConsumerConfig: [String: String] = [ + "group.id": uniqueGroupID, + "bootstrap.servers": "\(brokerAddress.host):\(brokerAddress.port)", + "broker.address.family": "v4", + "auto.offset.reset": "beginning", + "enable.auto.commit": "false", + ] + + let configPointer: OpaquePointer = rd_kafka_conf_new() + for (key, value) in rdKafkaConsumerConfig { + precondition(rd_kafka_conf_set(configPointer, key, value, nil, 0) == RD_KAFKA_CONF_OK) + } + + let kafkaHandle = rd_kafka_new(RD_KAFKA_CONSUMER, configPointer, nil, 0) + guard let kafkaHandle else { + preconditionFailure("Kafka handle was not created") + } + defer { + rd_kafka_destroy(kafkaHandle) + } + + rd_kafka_poll_set_consumer(kafkaHandle) + let subscriptionList = rd_kafka_topic_partition_list_new(1) + defer { + rd_kafka_topic_partition_list_destroy(subscriptionList) + } + rd_kafka_topic_partition_list_add( + subscriptionList, + uniqueTestTopic, + RD_KAFKA_PARTITION_UA + ) + rd_kafka_subscribe(kafkaHandle, subscriptionList) + rd_kafka_poll(kafkaHandle, 0) + + var ctr: UInt64 = 0 + var tmpCtr: UInt64 = 0 + + let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1) + let totalStartDate = Date.timeIntervalSinceReferenceDate + var totalBytes: UInt64 = 0 + + benchmark.withMeasurement { + while ctr < messageCount { + guard let record = rd_kafka_consumer_poll(kafkaHandle, 10) else { + continue + } + defer { + rd_kafka_message_destroy(record) + } + guard record.pointee.err != RD_KAFKA_RESP_ERR__PARTITION_EOF else { + continue + } + let result = rd_kafka_commit_message(kafkaHandle, record, 0) + precondition(result == RD_KAFKA_RESP_ERR_NO_ERROR) + + ctr += 1 + totalBytes += UInt64(record.pointee.len) + + tmpCtr += 1 + if tmpCtr >= interval { + benchLog("read \(ctr * 100 / UInt64(messageCount))%") + tmpCtr = 0 + } + } + } + + rd_kafka_consumer_close(kafkaHandle) - Benchmark.teardown = {} + let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate + let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 + benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") + } } diff --git a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/Utilities.swift b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/Utilities.swift new file mode 100644 index 00000000..304dc1fb --- /dev/null +++ b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/Utilities.swift @@ -0,0 +1,128 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client 
open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Benchmark +import class Foundation.ProcessInfo +import struct Foundation.UUID +import Kafka +@_spi(Internal) import Kafka +import Logging +import ServiceLifecycle + +let brokerAddress = KafkaConfiguration.BrokerAddress( + host: ProcessInfo.processInfo.environment["KAFKA_HOST"] ?? "localhost", + port: 9092 +) + +extension Logger { + static let perfLogger = { + var logger = Logger(label: "perf logger") + logger.logLevel = .critical + return logger + }() +} + +// For perf tests debugging +func benchLog(_ log: @autoclosure () -> Logger.Message) { + #if DEBUG + Logger.perfLogger.info(log()) + #endif +} + +func createTopic(partitions: Int32) throws -> String { + var basicConfig = KafkaConsumerConfiguration( + consumptionStrategy: .group(id: "no-group", topics: []), + bootstrapBrokerAddresses: [brokerAddress] + ) + basicConfig.broker.addressFamily = .v4 + + let client = try RDKafkaClient.makeClientForTopics(config: basicConfig, logger: .perfLogger) + return try client._createUniqueTopic(partitions: partitions, timeout: 10 * 1000) +} + +func deleteTopic(_ topic: String) throws { + var basicConfig = KafkaConsumerConfiguration( + consumptionStrategy: .group(id: "no-group", topics: []), + bootstrapBrokerAddresses: [brokerAddress] + ) + basicConfig.broker.addressFamily = .v4 + + let client = try RDKafkaClient.makeClientForTopics(config: basicConfig, logger: .perfLogger) + try client._deleteTopic(topic, timeout: 10 * 1000) +} + +func prepareTopic(messagesCount: UInt, partitions: Int32 = -1, logger: Logger = .perfLogger) async throws -> String { + let uniqueTestTopic = try createTopic(partitions: partitions) + + benchLog("Created topic \(uniqueTestTopic)") + + benchLog("Generating \(messagesCount) messages") + let testMessages = _createTestMessages(topic: uniqueTestTopic, count: messagesCount) + benchLog("Finish generating \(messagesCount) messages") + + var producerConfig = KafkaProducerConfiguration(bootstrapBrokerAddresses: [brokerAddress]) + producerConfig.broker.addressFamily = .v4 + + let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: producerConfig, logger: logger) + + let serviceGroupConfiguration = ServiceGroupConfiguration(services: [producer], gracefulShutdownSignals: [.sigterm, .sigint], logger: logger) + let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) + + try await withThrowingTaskGroup(of: Void.self) { group in + benchLog("Start producing \(messagesCount) messages") + defer { + benchLog("Finish producing") + } + // Run Task + group.addTask { + try await serviceGroup.run() + } + + // Producer Task + group.addTask { + try await _sendAndAcknowledgeMessages( + producer: producer, + events: acks, + messages: testMessages, + skipConsistencyCheck: true + ) + } + + // Wait for Producer Task to complete + try await group.next() + await serviceGroup.triggerGracefulShutdown() + } + + return uniqueTestTopic +} + +extension Benchmark { + @discardableResult + func withMeasurement(_ body: () throws -> T) rethrows -> T { + self.startMeasurement() + defer { + self.stopMeasurement() + } + return try body() + } + + @discardableResult + func withMeasurement(_ 
body: () async throws -> T) async rethrows -> T { + self.startMeasurement() + defer { + self.stopMeasurement() + } + return try await body() + } +} diff --git a/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift index 87f0a50b..1971d9e0 100644 --- a/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift +++ b/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift @@ -22,7 +22,19 @@ let benchmarks = { warmupIterations: 0, scalingFactor: .one, maxDuration: .seconds(5), - maxIterations: 100 + maxIterations: 100, + thresholds: [ + // Thresholds are wild guess mostly. Have to adjust with time. + .wallClock: .init(relative: [.p90: 10]), + .cpuTotal: .init(relative: [.p90: 10]), + .allocatedResidentMemory: .init(relative: [.p90: 20]), + .contextSwitches: .init(relative: [.p90: 10]), + .throughput: .init(relative: [.p90: 10]), + .objectAllocCount: .init(relative: [.p90: 10]), + .retainCount: .init(relative: [.p90: 10]), + .releaseCount: .init(relative: [.p90: 10]), + .retainReleaseDelta: .init(relative: [.p90: 10]), + ] ) Benchmark.setup = {} diff --git a/Benchmarks/Package.swift b/Benchmarks/Package.swift index e27d3106..4ea81f8c 100644 --- a/Benchmarks/Package.swift +++ b/Benchmarks/Package.swift @@ -22,7 +22,7 @@ let package = Package( ], dependencies: [ .package(path: "../"), - .package(url: "https://github.com/ordo-one/package-benchmark.git", from: "1.11.1"), + .package(url: "https://github.com/ordo-one/package-benchmark.git", from: "1.22.3"), ], targets: [ .executableTarget( diff --git a/Sources/Crdkafka/librdkafka b/Sources/Crdkafka/librdkafka index 95a542c8..267367c9 160000 --- a/Sources/Crdkafka/librdkafka +++ b/Sources/Crdkafka/librdkafka @@ -1 +1 @@ -Subproject commit 95a542c87c61d2c45b445f91c73dd5442eb04f3c +Subproject commit 267367c9475c2154e72eafe6ff1957518cb2ed1a diff --git a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift index dae5691f..2d76e2da 100644 --- a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift +++ b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift @@ -12,7 +12,6 @@ // //===----------------------------------------------------------------------===// -import Crdkafka import struct Foundation.UUID public struct KafkaConsumerConfiguration { @@ -23,41 +22,10 @@ public struct KafkaConsumerConfiguration { /// Default: `.milliseconds(100)` public var pollInterval: Duration = .milliseconds(100) - /// A struct representing different back pressure strategies for consuming messages in ``KafkaConsumer``. - public struct BackPressureStrategy: Sendable, Hashable { - enum _BackPressureStrategy: Sendable, Hashable { - case watermark(low: Int, high: Int) - } - - let _internal: _BackPressureStrategy - - private init(backPressureStrategy: _BackPressureStrategy) { - self._internal = backPressureStrategy - } - - /// A back pressure strategy based on high and low watermarks. - /// - /// The consumer maintains a buffer size between a low watermark and a high watermark - /// to control the flow of incoming messages. - /// - /// - Parameter low: The lower threshold for the buffer size (low watermark). - /// - Parameter high: The upper threshold for the buffer size (high watermark). 
- public static func watermark(low: Int, high: Int) -> BackPressureStrategy { - return .init(backPressureStrategy: .watermark(low: low, high: high)) - } - } - - /// The backpressure strategy to be used for message consumption. - /// See ``KafkaConsumerConfiguration/BackPressureStrategy-swift.struct`` for more information. - public var backPressureStrategy: BackPressureStrategy = .watermark( - low: 10, - high: 50 - ) - /// A struct representing the different Kafka message consumption strategies. public struct ConsumptionStrategy: Sendable, Hashable { enum _ConsumptionStrategy: Sendable, Hashable { - case partition(topic: String, partition: KafkaPartition, offset: KafkaOffset) + case partition(groupID: String?, topic: String, partition: KafkaPartition, offset: KafkaOffset) case group(groupID: String, topics: [String]) } @@ -72,14 +40,16 @@ public struct KafkaConsumerConfiguration { /// /// - Parameters: /// - partition: The partition of the topic to consume from. + /// - groupID: The ID of the consumer group to commit to. Defaults to no group ID. Specifying a group ID is useful if partitions assignment is manually managed but committed offsets should still be tracked in a consumer group. /// - topic: The name of the Kafka topic. /// - offset: The offset to start consuming from. Defaults to the end of the Kafka partition queue (meaning wait for the next produced message). public static func partition( _ partition: KafkaPartition, + groupID: String? = nil, topic: String, offset: KafkaOffset = .end ) -> ConsumptionStrategy { - return .init(consumptionStrategy: .partition(topic: topic, partition: partition, offset: offset)) + return .init(consumptionStrategy: .partition(groupID: groupID, topic: topic, partition: partition, offset: offset)) } /// A consumption strategy based on consumer group membership. @@ -261,12 +231,17 @@ extension KafkaConsumerConfiguration { var resultDict: [String: String] = [:] switch self.consumptionStrategy._internal { - case .partition: - // Although an assignment is not related to a consumer group, - // librdkafka requires us to set a `group.id`. - // This is a known issue: - // https://github.com/edenhill/librdkafka/issues/3261 - resultDict["group.id"] = UUID().uuidString + case .partition(groupID: let groupID, topic: _, partition: _, offset: _): + if let groupID = groupID { + resultDict["group.id"] = groupID + } else { + // Although an assignment is not related to a consumer group, + // librdkafka requires us to set a `group.id`. + // This is a known issue: + // https://github.com/edenhill/librdkafka/issues/3261 + resultDict["group.id"] = UUID().uuidString + } + case .group(groupID: let groupID, topics: _): resultDict["group.id"] = groupID } diff --git a/Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift b/Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift new file mode 100644 index 00000000..58f2453d --- /dev/null +++ b/Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift @@ -0,0 +1,157 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2022 Apple Inc. 
and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Crdkafka +import struct Foundation.UUID +import Logging + +@_spi(Internal) +extension RDKafkaClient { + /// Create a topic with a unique name (`UUID`). + /// Blocks for a maximum of `timeout` milliseconds. + /// - Parameter partitions: Partitions in topic (default: -1 - default for broker) + /// - Parameter timeout: Timeout in milliseconds. + /// - Returns: Name of newly created topic. + /// - Throws: A ``KafkaError`` if the topic creation failed. + public func _createUniqueTopic(partitions: Int32 = -1, timeout: Int32) throws -> String { + let uniqueTopicName = UUID().uuidString + + let errorChars = UnsafeMutablePointer.allocate(capacity: RDKafkaClient.stringSize) + defer { errorChars.deallocate() } + + guard let newTopic = rd_kafka_NewTopic_new( + uniqueTopicName, + partitions, + -1, // use default replication_factor + errorChars, + RDKafkaClient.stringSize + ) else { + let errorString = String(cString: errorChars) + throw KafkaError.topicCreation(reason: errorString) + } + defer { rd_kafka_NewTopic_destroy(newTopic) } + + try self.withKafkaHandlePointer { kafkaHandle in + let resultQueue = rd_kafka_queue_new(kafkaHandle) + defer { rd_kafka_queue_destroy(resultQueue) } + + var newTopicsArray: [OpaquePointer?] = [newTopic] + rd_kafka_CreateTopics( + kafkaHandle, + &newTopicsArray, + 1, + nil, + resultQueue + ) + + guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else { + throw KafkaError.topicCreation(reason: "No CreateTopics result after 10s timeout") + } + defer { rd_kafka_event_destroy(resultEvent) } + + let resultCode = rd_kafka_event_error(resultEvent) + guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: resultCode) + } + + guard let topicsResultEvent = rd_kafka_event_CreateTopics_result(resultEvent) else { + throw KafkaError.topicCreation(reason: "Received event that is not of type rd_kafka_CreateTopics_result_t") + } + + var resultTopicCount = 0 + let topicResults = rd_kafka_CreateTopics_result_topics( + topicsResultEvent, + &resultTopicCount + ) + + guard resultTopicCount == 1, let topicResult = topicResults?[0] else { + throw KafkaError.topicCreation(reason: "Received less/more than one topic result") + } + + let topicResultError = rd_kafka_topic_result_error(topicResult) + guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: topicResultError) + } + + let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult)) + guard receivedTopicName == uniqueTopicName else { + throw KafkaError.topicCreation(reason: "Received topic result for topic with different name") + } + } + + return uniqueTopicName + } + + /// Delete a topic. + /// Blocks for a maximum of `timeout` milliseconds. + /// - Parameter topic: Topic to delete. + /// - Parameter timeout: Timeout in milliseconds. + /// - Throws: A ``KafkaError`` if the topic deletion failed. 
+ public func _deleteTopic(_ topic: String, timeout: Int32) throws { + let deleteTopic = rd_kafka_DeleteTopic_new(topic) + defer { rd_kafka_DeleteTopic_destroy(deleteTopic) } + + try self.withKafkaHandlePointer { kafkaHandle in + let resultQueue = rd_kafka_queue_new(kafkaHandle) + defer { rd_kafka_queue_destroy(resultQueue) } + + var deleteTopicsArray: [OpaquePointer?] = [deleteTopic] + rd_kafka_DeleteTopics( + kafkaHandle, + &deleteTopicsArray, + 1, + nil, + resultQueue + ) + + guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else { + throw KafkaError.topicDeletion(reason: "No DeleteTopics result after 10s timeout") + } + defer { rd_kafka_event_destroy(resultEvent) } + + let resultCode = rd_kafka_event_error(resultEvent) + guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: resultCode) + } + + guard let topicsResultEvent = rd_kafka_event_DeleteTopics_result(resultEvent) else { + throw KafkaError.topicDeletion(reason: "Received event that is not of type rd_kafka_DeleteTopics_result_t") + } + + var resultTopicCount = 0 + let topicResults = rd_kafka_DeleteTopics_result_topics( + topicsResultEvent, + &resultTopicCount + ) + + guard resultTopicCount == 1, let topicResult = topicResults?[0] else { + throw KafkaError.topicDeletion(reason: "Received less/more than one topic result") + } + + let topicResultError = rd_kafka_topic_result_error(topicResult) + guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: topicResultError) + } + + let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult)) + guard receivedTopicName == topic else { + throw KafkaError.topicDeletion(reason: "Received topic result for topic with different name") + } + } + } + + public static func makeClientForTopics(config: KafkaConsumerConfiguration, logger: Logger) throws -> RDKafkaClient { + return try Self.makeClient(type: .consumer, configDictionary: config.dictionary, events: [], logger: logger) + } +} diff --git a/Sources/Kafka/ForTesting/TestMessages.swift b/Sources/Kafka/ForTesting/TestMessages.swift new file mode 100644 index 00000000..f9df6224 --- /dev/null +++ b/Sources/Kafka/ForTesting/TestMessages.swift @@ -0,0 +1,104 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2022 Apple Inc. 
and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import struct Foundation.Date +import NIOCore + +@_spi(Internal) +public enum _TestMessagesError: Error { + case deliveryReportsIdsIncorrect + case deliveryReportsNotAllMessagesAcknoledged + case deliveryReportsIncorrect +} + +@_spi(Internal) +public func _createTestMessages( + topic: String, + headers: [KafkaHeader] = [], + count: UInt +) -> [KafkaProducerMessage] { + return Array(0..], + skipConsistencyCheck: Bool = false +) async throws { + var messageIDs = Set() + messageIDs.reserveCapacity(messages.count) + + for message in messages { + while true { + do { + messageIDs.insert(try producer.send(message)) + break + } catch let error as KafkaError where error.description.contains("Queue full") { + // That means we have to flush queue immediately but there is no interface for that + // producer.flush() + } + } + } + + var receivedDeliveryReports = Set() + receivedDeliveryReports.reserveCapacity(messages.count) + + for await event in events { + switch event { + case .deliveryReports(let deliveryReports): + for deliveryReport in deliveryReports { + receivedDeliveryReports.insert(deliveryReport) + } + default: + break // Ignore any other events + } + + if receivedDeliveryReports.count >= messages.count { + break + } + } + + guard Set(receivedDeliveryReports.map(\.id)) == messageIDs else { + throw _TestMessagesError.deliveryReportsIdsIncorrect + } + + let acknowledgedMessages: [KafkaAcknowledgedMessage] = receivedDeliveryReports.compactMap { + guard case .acknowledged(let receivedMessage) = $0.status else { + return nil + } + return receivedMessage + } + + guard messages.count == acknowledgedMessages.count else { + throw _TestMessagesError.deliveryReportsNotAllMessagesAcknoledged + } + if skipConsistencyCheck { + return + } + for message in messages { + guard acknowledgedMessages.contains(where: { $0.topic == message.topic }), + acknowledgedMessages.contains(where: { $0.key == ByteBuffer(string: message.key!) }), + acknowledgedMessages.contains(where: { $0.value == ByteBuffer(string: message.value) }) else { + throw _TestMessagesError.deliveryReportsIncorrect + } + } +} diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index c5ec5226..2fdead66 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -34,23 +34,6 @@ extension KafkaConsumerEventsDelegate: NIOAsyncSequenceProducerDelegate { } } -// MARK: - KafkaConsumerMessagesDelegate - -/// `NIOAsyncSequenceProducerDelegate` for ``KafkaConsumerMessages``. -internal struct KafkaConsumerMessagesDelegate: Sendable { - let stateMachine: NIOLockedValueBox -} - -extension KafkaConsumerMessagesDelegate: NIOAsyncSequenceProducerDelegate { - func produceMore() { - self.stateMachine.withLockedValue { $0.produceMore() } - } - - func didTerminate() { - self.stateMachine.withLockedValue { $0.finishMessageConsumption() } - } -} - // MARK: - KafkaConsumerEvents /// `AsyncSequence` implementation for handling ``KafkaConsumerEvent``s emitted by Kafka. @@ -78,60 +61,62 @@ public struct KafkaConsumerEvents: Sendable, AsyncSequence { /// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). 
public struct KafkaConsumerMessages: Sendable, AsyncSequence { - let stateMachine: NIOLockedValueBox + typealias LockedMachine = NIOLockedValueBox + + let stateMachine: LockedMachine + let pollInterval: Duration public typealias Element = KafkaConsumerMessage - typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark - typealias WrappedSequence = NIOThrowingAsyncSequenceProducer< - Result, - Error, - BackPressureStrategy, - KafkaConsumerMessagesDelegate - > - let wrappedSequence: WrappedSequence /// `AsynceIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``). public struct AsyncIterator: AsyncIteratorProtocol { - let stateMachine: NIOLockedValueBox - var wrappedIterator: WrappedSequence.AsyncIterator? + private let stateMachineHolder: MachineHolder + let pollInterval: Duration - public mutating func next() async throws -> Element? { - guard let result = try await self.wrappedIterator?.next() else { - self.deallocateIterator() - return nil + private final class MachineHolder: Sendable { // only for deinit + let stateMachine: LockedMachine + + init(stateMachine: LockedMachine) { + self.stateMachine = stateMachine } - switch result { - case .success(let message): - let action = self.stateMachine.withLockedValue { $0.storeOffset() } + deinit { + self.stateMachine.withLockedValue { $0.finishMessageConsumption() } + } + } + + init(stateMachine: LockedMachine, pollInterval: Duration) { + self.stateMachineHolder = .init(stateMachine: stateMachine) + self.pollInterval = pollInterval + } + + public func next() async throws -> Element? { + // swift-kafka-client issue: https://github.com/swift-server/swift-kafka-client/issues/165 + // Currently use Task.sleep() if no new messages, should use task executor preference when implemented: + // https://github.com/apple/swift-evolution/blob/main/proposals/0417-task-executor-preference.md + while !Task.isCancelled { + let action = self.stateMachineHolder.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() } + switch action { - case .storeOffset(let client): - do { - try client.storeMessageOffset(message) - } catch { - self.deallocateIterator() - throw error + case .poll(let client): + if let message = try client.consumerPoll() { // non-blocking call + return message } - return message - case .terminateConsumerSequence: - self.deallocateIterator() + try await Task.sleep(for: self.pollInterval) + case .suspendPollLoop: + try await Task.sleep(for: self.pollInterval) // not started yet + case .terminatePollLoop: return nil } - case .failure(let error): - self.deallocateIterator() - throw error } - } - - private mutating func deallocateIterator() { - self.wrappedIterator = nil + return nil } } public func makeAsyncIterator() -> AsyncIterator { return AsyncIterator( stateMachine: self.stateMachine, - wrappedIterator: self.wrappedSequence.makeAsyncIterator() + pollInterval: self.pollInterval ) } } @@ -140,13 +125,6 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { /// A ``KafkaConsumer `` can be used to consume messages from a Kafka cluster. public final class KafkaConsumer: Sendable, Service { - typealias Producer = NIOThrowingAsyncSequenceProducer< - Result, - Error, - NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, - KafkaConsumerMessagesDelegate - > - /// The configuration object of the consumer client. private let configuration: KafkaConsumerConfiguration /// A logger. 
@@ -178,30 +156,14 @@ public final class KafkaConsumer: Sendable, Service { self.stateMachine = stateMachine self.logger = logger - let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence( - elementType: Result.self, - backPressureStrategy: { - switch configuration.backPressureStrategy._internal { - case .watermark(let lowWatermark, let highWatermark): - return NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark( - lowWatermark: lowWatermark, - highWatermark: highWatermark - ) - } - }(), - finishOnDeinit: true, - delegate: KafkaConsumerMessagesDelegate(stateMachine: self.stateMachine) - ) - self.messages = KafkaConsumerMessages( stateMachine: self.stateMachine, - wrappedSequence: sourceAndSequence.sequence + pollInterval: configuration.pollInterval ) self.stateMachine.withLockedValue { $0.initialize( - client: client, - source: sourceAndSequence.source + client: client ) } } @@ -359,25 +321,12 @@ public final class KafkaConsumer: Sendable, Service { private func _run() async throws { switch self.configuration.consumptionStrategy._internal { - case .partition(topic: let topic, partition: let partition, offset: let offset): + case .partition(groupID: _, topic: let topic, partition: let partition, offset: let offset): try self.assign(topic: topic, partition: partition, offset: offset) case .group(groupID: _, topics: let topics): try self.subscribe(topics: topics) } - - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - try await self.eventRunLoop() - } - - group.addTask { - try await self.messageRunLoop() - } - - // Throw when one of the two child task throws - try await group.next() - try await group.next() - } + try await self.eventRunLoop() } /// Run loop polling Kafka for new events. @@ -403,83 +352,6 @@ public final class KafkaConsumer: Sendable, Service { } } - /// Run loop polling Kafka for new consumer messages. - private func messageRunLoop() async throws { - while !Task.isCancelled { - let nextAction = self.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() } - switch nextAction { - case .pollForAndYieldMessages(let client, let source): - // Poll for new consumer messages. - let messageResults = self.batchConsumerPoll(client: client) - if messageResults.isEmpty { - self.stateMachine.withLockedValue { $0.waitForNewMessages() } - } else { - let yieldResult = source.yield(contentsOf: messageResults) - switch yieldResult { - case .produceMore: - break - case .stopProducing: - self.stateMachine.withLockedValue { $0.stopProducing() } - case .dropped: - return - } - } - case .pollForMessagesIfAvailable(let client, let source): - let messageResults = self.batchConsumerPoll(client: client) - if messageResults.isEmpty { - // Still no new messages, so sleep. - try await Task.sleep(for: self.configuration.pollInterval) - } else { - // New messages were produced to the partition that we previously finished reading. - let yieldResult = source.yield(contentsOf: messageResults) - switch yieldResult { - case .produceMore: - break - case .stopProducing: - self.stateMachine.withLockedValue { $0.stopProducing() } - case .dropped: - return - } - } - case .suspendPollLoop: - try await Task.sleep(for: self.configuration.pollInterval) - case .terminatePollLoop: - return - } - } - } - - /// Read `maxMessages` consumer messages from Kafka. - /// - /// - Parameters: - /// - client: Client used for handling the connection to the Kafka cluster. - /// - maxMessages: Maximum amount of consumer messages to read in this invocation. 
- private func batchConsumerPoll( - client: RDKafkaClient, - maxMessages: Int = 100 - ) -> [Result] { - var messageResults = [Result]() - messageResults.reserveCapacity(maxMessages) - - for _ in 0..? - do { - if let message = try client.consumerPoll() { - result = .success(message) - } - } catch { - result = .failure(error) - } - - guard let result else { - return messageResults - } - messageResults.append(result) - } - - return messageResults - } - /// Mark all messages up to the passed message in the topic as read. /// Schedules a commit and returns immediately. /// Any errors encountered after scheduling the commit will be discarded. @@ -572,25 +444,6 @@ public final class KafkaConsumer: Sendable, Service { extension KafkaConsumer { /// State machine representing the state of the ``KafkaConsumer``. struct StateMachine: Sendable { - /// State of the event loop fetching new consumer messages. - enum MessagePollLoopState { - /// The sequence can take more messages. - /// - /// - Parameter source: The source for yielding new messages. - case running(source: Producer.Source) - /// Sequence suspended due to back pressure. - /// - /// - Parameter source: The source for yielding new messages. - case suspended(source: Producer.Source) - /// We have read to the end of a partition and are now waiting for new messages - /// to be produced. - /// - /// - Parameter source: The source for yielding new messages. - case waitingForMessages(source: Producer.Source) - /// The sequence has finished, and no more messages will be produced. - case finished - } - /// The state of the ``StateMachine``. enum State: Sendable { /// The state machine has been initialized with init() but is not yet Initialized @@ -602,14 +455,13 @@ extension KafkaConsumer { /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter source: The source for yielding new messages. case initializing( - client: RDKafkaClient, - source: Producer.Source + client: RDKafkaClient ) /// The ``KafkaConsumer`` is consuming messages. /// /// - Parameter client: Client used for handling the connection to the Kafka cluster. /// - Parameter state: State of the event loop fetching new consumer messages. - case running(client: RDKafkaClient, messagePollLoopState: MessagePollLoopState) + case running(client: RDKafkaClient) /// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked. /// We are now in the process of commiting our last state to the broker. /// @@ -625,15 +477,13 @@ extension KafkaConsumer { /// Delayed initialization of `StateMachine` as the `source` and the `pollClosure` are /// not yet available when the normal initialization occurs. mutating func initialize( - client: RDKafkaClient, - source: Producer.Source + client: RDKafkaClient ) { guard case .uninitialized = self.state else { fatalError("\(#function) can only be invoked in state .uninitialized, but was invoked in state \(self.state)") } self.state = .initializing( - client: client, - source: source + client: client ) } @@ -657,7 +507,7 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .running(let client, _): + case .running(let client): return .pollForEvents(client: client) case .finishing(let client): if client.isConsumerClosed { @@ -676,20 +526,7 @@ extension KafkaConsumer { /// Poll for a new ``KafkaConsumerMessage``. 
/// /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - case pollForAndYieldMessages( - client: RDKafkaClient, - source: Producer.Source - ) - /// Poll for a new ``KafkaConsumerMessage`` or sleep for ``KafkaConsumerConfiguration/pollInterval`` - /// if there are no new messages to read from the partition. - /// - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements. - case pollForMessagesIfAvailable( - client: RDKafkaClient, - source: Producer.Source - ) + case poll(client: RDKafkaClient) /// Sleep for ``KafkaConsumerConfiguration/pollInterval``. case suspendPollLoop /// Terminate the poll loop. @@ -705,18 +542,9 @@ extension KafkaConsumer { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: - fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .running(let client, let consumerState): - switch consumerState { - case .running(let source): - return .pollForAndYieldMessages(client: client, source: source) - case .suspended(source: _): - return .suspendPollLoop - case .waitingForMessages(let source): - return .pollForMessagesIfAvailable(client: client, source: source) - case .finished: - return .terminatePollLoop - } + return .suspendPollLoop + case .running(let client): + return .poll(client: client) case .finishing, .finished: return .terminatePollLoop } @@ -738,8 +566,8 @@ extension KafkaConsumer { switch self.state { case .uninitialized: fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing(let client, let source): - self.state = .running(client: client, messagePollLoopState: .running(source: source)) + case .initializing(let client): + self.state = .running(client: client) return .setUpConnection(client: client) case .running: fatalError("\(#function) should not be invoked more than once") @@ -750,30 +578,6 @@ extension KafkaConsumer { } } - /// Action to take when wanting to store a message offset (to be auto-committed by `librdkafka`). - enum StoreOffsetAction { - /// Store the message offset with the given `client`. - /// - Parameter client: Client used for handling the connection to the Kafka cluster. - case storeOffset(client: RDKafkaClient) - /// The consumer is in the process of `.finishing` or even `.finished`. - /// Stop yielding new elements and terminate the asynchronous sequence. - case terminateConsumerSequence - } - - /// Get action to take when wanting to store a message offset (to be auto-committed by `librdkafka`). - func storeOffset() -> StoreOffsetAction { - switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing: - fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .running(let client, _): - return .storeOffset(client: client) - case .finishing, .finished: - return .terminateConsumerSequence - } - } - /// Action to be taken when wanting to do a commit. enum CommitAction { /// Do a commit. 
@@ -794,7 +598,7 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .running(let client, _): + case .running(let client): return .commit(client: client) case .finishing, .finished: return .throwClosedError @@ -819,7 +623,7 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages") - case .running(let client, _): + case .running(let client): self.state = .finishing(client: client) return .triggerGracefulShutdown(client: client) case .finishing, .finished: @@ -827,71 +631,6 @@ extension KafkaConsumer { } } - // MARK: - Consumer Messages Poll Loop Actions - - /// The partition that was previously finished reading has got new messages produced to it. - mutating func newMessagesProduced() { - guard case .running(let client, let consumerState) = self.state else { - fatalError("\(#function) invoked while still in state \(self.state)") - } - - switch consumerState { - case .running, .suspended, .finished: - fatalError("\(#function) should not be invoked in state \(self.state)") - case .waitingForMessages(let source): - self.state = .running(client: client, messagePollLoopState: .running(source: source)) - } - } - - /// The consumer has read to the end of a partition and shall now go into a sleep loop until new messages are produced. - mutating func waitForNewMessages() { - guard case .running(let client, let consumerState) = self.state else { - fatalError("\(#function) invoked while still in state \(self.state)") - } - - switch consumerState { - case .running(let source): - self.state = .running(client: client, messagePollLoopState: .waitingForMessages(source: source)) - case .suspended, .waitingForMessages, .finished: - fatalError("\(#function) should not be invoked in state \(self.state)") - } - } - - /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to produce more messages. - mutating func produceMore() { - switch self.state { - case .uninitialized: - fatalError("\(#function) invoked while still in state \(self.state)") - case .initializing: - break // This case can be triggered by the KafkaConsumerMessagesDeletgate - case .running(let client, let consumerState): - switch consumerState { - case .running, .waitingForMessages, .finished: - break - case .suspended(let source): - self.state = .running(client: client, messagePollLoopState: .running(source: source)) - } - case .finishing, .finished: - break - } - } - - /// ``KafkaConsumerMessages``'s back pressure mechanism asked us to temporarily stop producing messages. - mutating func stopProducing() { - guard case .running(let client, let consumerState) = self.state else { - fatalError("\(#function) invoked while still in state \(self.state)") - } - - switch consumerState { - case .suspended, .finished: - break - case .running(let source): - self.state = .running(client: client, messagePollLoopState: .suspended(source: source)) - case .waitingForMessages(let source): - self.state = .running(client: client, messagePollLoopState: .suspended(source: source)) - } - } - /// The ``KafkaConsumerMessages`` asynchronous sequence was terminated. 
mutating func finishMessageConsumption() { switch self.state { @@ -899,8 +638,8 @@ extension KafkaConsumer { fatalError("\(#function) invoked while still in state \(self.state)") case .initializing: self.state = .finished - case .running(let client, _): - self.state = .running(client: client, messagePollLoopState: .finished) + case .running: + self.state = .finished case .finishing, .finished: break } diff --git a/Sources/Kafka/KafkaProducer.swift b/Sources/Kafka/KafkaProducer.swift index 4ac95995..de953312 100644 --- a/Sources/Kafka/KafkaProducer.swift +++ b/Sources/Kafka/KafkaProducer.swift @@ -238,8 +238,10 @@ public final class KafkaProducer: Service, Sendable { 0...Int(Int32.max) ~= self.configuration.flushTimeoutMilliseconds, "Flush timeout outside of valid range \(0...Int32.max)" ) + defer { // we should finish source indefinetely of exception in client.flush() + source?.finish() + } try await client.flush(timeoutMilliseconds: Int32(self.configuration.flushTimeoutMilliseconds)) - source?.finish() return case .terminatePollLoop: return diff --git a/Sources/Kafka/KafkaProducerMessage.swift b/Sources/Kafka/KafkaProducerMessage.swift index f9bc1ec2..5b17d983 100644 --- a/Sources/Kafka/KafkaProducerMessage.swift +++ b/Sources/Kafka/KafkaProducerMessage.swift @@ -48,7 +48,7 @@ public struct KafkaProducerMessage.allocate(capacity: RDKafkaClient.stringSize) @@ -149,7 +146,7 @@ final class RDKafkaClient: Sendable { } if error != RD_KAFKA_RESP_ERR_NO_ERROR { - throw KafkaError.rdKafkaError(wrapping: rd_kafka_last_error()) + throw KafkaError.rdKafkaError(wrapping: error) } } @@ -492,40 +489,6 @@ final class RDKafkaClient: Sendable { } } - /// Store `message`'s offset for next auto-commit. - /// - /// - Important: `enable.auto.offset.store` must be set to `false` when using this API. - func storeMessageOffset(_ message: KafkaConsumerMessage) throws { - // The offset committed is always the offset of the next requested message. - // Thus, we increase the offset of the current message by one before committing it. - // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 - let changesList = RDKafkaTopicPartitionList() - changesList.setOffset( - topic: message.topic, - partition: message.partition, - offset: Int64(message.offset.rawValue + 1) - ) - - let error = changesList.withListPointer { listPointer in - rd_kafka_offsets_store( - self.kafkaHandle, - listPointer - ) - } - - if error != RD_KAFKA_RESP_ERR_NO_ERROR { - // Ignore RD_KAFKA_RESP_ERR__STATE error. - // RD_KAFKA_RESP_ERR__STATE indicates an attempt to commit to an unassigned partition, - // which can occur during rebalancing or when the consumer is shutting down. - // See "Upgrade considerations" for more details: https://github.com/confluentinc/librdkafka/releases/tag/v1.9.0 - // Since Kafka Consumers are designed for at-least-once processing, failing to commit here is acceptable. - if error == RD_KAFKA_RESP_ERR__STATE { - return - } - throw KafkaError.rdKafkaError(wrapping: error) - } - } - /// Non-blocking "fire-and-forget" commit of a `message`'s offset to Kafka. /// Schedules a commit and returns immediately. /// Any errors encountered after scheduling the commit will be discarded. 
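
For context on the configuration change above: the `partition` consumption strategy now accepts an optional `groupID`, so a manually assigned consumer can still record committed offsets under a consumer group, and `scheduleCommit(_:)` (used in the benchmarks) performs the non-blocking commit when auto-commit is disabled. The following is a minimal usage sketch and is not part of this patch; the broker address, topic name, partition number, group ID, wrapper function name, and the `KafkaPartition(rawValue:)` initializer are illustrative assumptions.

```swift
import Kafka
import Logging
import ServiceLifecycle

// Sketch only: consume one manually assigned partition while committing
// offsets to a consumer group. Broker/topic/partition/group values are made up.
func runManuallyAssignedConsumer() async throws {
    var config = KafkaConsumerConfiguration(
        consumptionStrategy: .partition(
            KafkaPartition(rawValue: 0),          // assumed RawRepresentable initializer
            groupID: "manual-assignment-group",   // new optional parameter from this change
            topic: "example-topic"
        ),
        bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)]
    )
    config.isAutoCommitEnabled = false            // commits are scheduled explicitly below

    let logger = Logger(label: "example-consumer")
    let consumer = try KafkaConsumer(configuration: config, logger: logger)
    let serviceGroup = ServiceGroup(
        configuration: ServiceGroupConfiguration(
            services: [consumer],
            gracefulShutdownSignals: [.sigterm, .sigint],
            logger: logger
        )
    )

    try await withThrowingTaskGroup(of: Void.self) { group in
        group.addTask {
            // Drives polling of the consumer.
            try await serviceGroup.run()
        }
        group.addTask {
            for try await message in consumer.messages {
                // Fire-and-forget commit of this message's offset to the group.
                try consumer.scheduleCommit(message)
            }
        }
        try await group.next()
        await serviceGroup.triggerGracefulShutdown()
    }
}
```

If `groupID` is omitted, the configuration hunk above falls back to a random UUID `group.id`, since librdkafka requires one even for manual assignment.
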
diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift index e6cf82e5..d23942c5 100644 --- a/Tests/IntegrationTests/KafkaTests.swift +++ b/Tests/IntegrationTests/KafkaTests.swift @@ -12,8 +12,10 @@ // //===----------------------------------------------------------------------===// +import Atomics import struct Foundation.UUID @testable import Kafka +@_spi(Internal) import Kafka import NIOCore import ServiceLifecycle import XCTest @@ -79,7 +81,9 @@ final class KafkaTests: XCTestCase { events: [], logger: .kafkaTest ) - try client._deleteTopic(self.uniqueTestTopic, timeout: 10 * 1000) + if let uniqueTestTopic = self.uniqueTestTopic { + try client._deleteTopic(uniqueTestTopic, timeout: 10 * 1000) + } self.bootstrapBrokerAddress = nil self.producerConfig = nil @@ -445,7 +449,7 @@ final class KafkaTests: XCTestCase { try await group.next() // Verify that we receive the first message - var consumerIterator = consumer.messages.makeAsyncIterator() + let consumerIterator = consumer.messages.makeAsyncIterator() let consumedMessage = try await consumerIterator.next() XCTAssertEqual(testMessages.first!.topic, consumedMessage!.topic) @@ -599,65 +603,175 @@ final class KafkaTests: XCTestCase { } } - // MARK: - Helpers - - private static func createTestMessages( - topic: String, - headers: [KafkaHeader] = [], - count: UInt - ) -> [KafkaProducerMessage] { - return Array(0..] - ) async throws { - var messageIDs = Set() + let numOfMessages: UInt = 1000 + let testMessages = Self.createTestMessages(topic: uniqueTestTopic, count: numOfMessages) + let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: producerConfig, logger: .kafkaTest) + + let producerServiceGroupConfiguration = ServiceGroupConfiguration(services: [producer], gracefulShutdownSignals: [.sigterm, .sigint], logger: .kafkaTest) + let producerServiceGroup = ServiceGroup(configuration: producerServiceGroupConfiguration) + + try await withThrowingTaskGroup(of: Void.self) { group in + // Run Task + group.addTask { + try await producerServiceGroup.run() + } + + // Producer Task + group.addTask { + try await Self.sendAndAcknowledgeMessages( + producer: producer, + events: acks, + messages: testMessages, + skipConsistencyCheck: true + ) + } - for message in messages { - messageIDs.insert(try producer.send(message)) + // Wait for Producer Task to complete + try await group.next() + // Shutdown the serviceGroup + await producerServiceGroup.triggerGracefulShutdown() } - var receivedDeliveryReports = Set() + // MARK: Consumer - for await event in events { - switch event { - case .deliveryReports(let deliveryReports): - for deliveryReport in deliveryReports { - receivedDeliveryReports.insert(deliveryReport) - } - default: - break // Ignore any other events + let uniqueGroupID = UUID().uuidString + + var consumer1Config = KafkaConsumerConfiguration( + consumptionStrategy: .group( + id: uniqueGroupID, + topics: [uniqueTestTopic] + ), + bootstrapBrokerAddresses: [bootstrapBrokerAddress] + ) + consumer1Config.autoOffsetReset = .beginning + consumer1Config.broker.addressFamily = .v4 + consumer1Config.pollInterval = .milliseconds(1) + consumer1Config.isAutoCommitEnabled = false + + let consumer1 = try KafkaConsumer( + configuration: consumer1Config, + logger: .kafkaTest + ) + + var consumer2Config = KafkaConsumerConfiguration( + consumptionStrategy: .group( + id: uniqueGroupID, + topics: [uniqueTestTopic] + ), + bootstrapBrokerAddresses: [bootstrapBrokerAddress] + ) + consumer2Config.autoOffsetReset = 
.beginning
+        consumer2Config.broker.addressFamily = .v4
+        consumer2Config.pollInterval = .milliseconds(1)
+        consumer2Config.isAutoCommitEnabled = false
+
+        let consumer2 = try KafkaConsumer(
+            configuration: consumer2Config,
+            logger: .kafkaTest
+        )
+
+        let serviceGroupConfiguration1 = ServiceGroupConfiguration(services: [consumer1], gracefulShutdownSignals: [.sigterm, .sigint], logger: .kafkaTest)
+        let serviceGroup1 = ServiceGroup(configuration: serviceGroupConfiguration1)
+
+        let serviceGroupConfiguration2 = ServiceGroupConfiguration(services: [consumer2], gracefulShutdownSignals: [.sigterm, .sigint], logger: .kafkaTest)
+        let serviceGroup2 = ServiceGroup(configuration: serviceGroupConfiguration2)
+
+        let sharedCtr = ManagedAtomic(0)
+
+        try await withThrowingTaskGroup(of: Void.self) { group in
+            // Run task for the 1st consumer
+            group.addTask {
+                try await serviceGroup1.run()
+            }
+            // Run task for the 2nd consumer
+            group.addTask {
+                try await Task.sleep(for: .seconds(20)) // wait a bit so that the first consumer has formed its queue
+                try await serviceGroup2.run()
             }
-            if receivedDeliveryReports.count >= messages.count {
-                break
+            // First consumer task
+            group.addTask {
+                // 6 partitions
+                for try await record in consumer1.messages {
+                    sharedCtr.wrappingIncrement(ordering: .relaxed)
+
+                    try consumer1.scheduleCommit(record) // commit from time to time
+                    try await Task.sleep(for: .milliseconds(100)) // don't read all messages before the 2nd consumer joins
+                }
             }
-        }
-        XCTAssertEqual(Set(receivedDeliveryReports.map(\.id)), messageIDs)
+            // Second consumer task
+            group.addTask {
+                // 6 partitions
+                for try await record in consumer2.messages {
+                    sharedCtr.wrappingIncrement(ordering: .relaxed)

-        let acknowledgedMessages: [KafkaAcknowledgedMessage] = receivedDeliveryReports.compactMap {
-            guard case .acknowledged(let receivedMessage) = $0.status else {
-                return nil
+                    try consumer2.scheduleCommit(record) // commit from time to time
+                }
             }
-            return receivedMessage
-        }
-        XCTAssertEqual(messages.count, acknowledgedMessages.count)
-        for message in messages {
-            XCTAssertTrue(acknowledgedMessages.contains(where: { $0.topic == message.topic }))
-            XCTAssertTrue(acknowledgedMessages.contains(where: { $0.key == ByteBuffer(string: message.key!) }))
-            XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == ByteBuffer(string: message.value) }))
+            // Monitoring task
+            group.addTask {
+                while true {
+                    let currentCtr = sharedCtr.load(ordering: .relaxed)
+                    guard currentCtr >= numOfMessages else {
+                        try await Task.sleep(for: .seconds(5)) // wait in case new messages arrive
+                        continue
+                    }
+                    try await Task.sleep(for: .seconds(5)) // wait for any extra messages
+                    await serviceGroup1.triggerGracefulShutdown()
+                    await serviceGroup2.triggerGracefulShutdown()
+                    break
+                }
+            }
+
+            try await group.next()
+            try await group.next()
+            try await group.next()
+
+            // Wait for the second consumer task to complete
+            let totalCtr = sharedCtr.load(ordering: .relaxed)
+            XCTAssertEqual(totalCtr, Int(numOfMessages))
         }
     }
+
+    // MARK: - Helpers
+
+    private static func createTestMessages(
+        topic: String,
+        headers: [KafkaHeader] = [],
+        count: UInt
+    ) -> [KafkaProducerMessage] {
+        return _createTestMessages(topic: topic, headers: headers, count: count)
+    }
+
+    private static func sendAndAcknowledgeMessages(
+        producer: KafkaProducer,
+        events: KafkaProducerEvents,
+        messages: [KafkaProducerMessage],
+        skipConsistencyCheck: Bool = false
+    ) async throws {
+        return try await _sendAndAcknowledgeMessages(producer: producer, events: events, messages: messages, skipConsistencyCheck: skipConsistencyCheck)
+    }
 }
diff --git a/Tests/IntegrationTests/Utilities.swift b/Tests/IntegrationTests/Utilities.swift
index db86c0a0..94dbf374 100644
--- a/Tests/IntegrationTests/Utilities.swift
+++ b/Tests/IntegrationTests/Utilities.swift
@@ -12,9 +12,7 @@
 //
 //===----------------------------------------------------------------------===//

-import Crdkafka
 import struct Foundation.UUID
-@testable import Kafka
 import Logging

 extension Logger {
@@ -24,159 +22,3 @@ extension Logger {
         return logger
     }
 }
-
-extension RDKafkaClient {
-//    func createUniqueTopic(timeout: Int32 = 10000) async throws -> String {
-//        try await withCheckedThrowingContinuation { continuation in
-//            do {
-//                let uniqueTopic = try self._createUniqueTopic(timeout: timeout)
-//                continuation.resume(returning: uniqueTopic)
-//            } catch {
-//                continuation.resume(throwing: error)
-//            }
-//        }
-//    }
-
-    /// Create a topic with a unique name (`UUID`).
-    /// Blocks for a maximum of `timeout` milliseconds.
-    /// - Parameter timeout: Timeout in milliseconds.
-    /// - Returns: Name of newly created topic.
-    /// - Throws: A ``KafkaError`` if the topic creation failed.
-    func _createUniqueTopic(timeout: Int32) throws -> String {
-        let uniqueTopicName = UUID().uuidString
-
-        let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: RDKafkaClient.stringSize)
-        defer { errorChars.deallocate() }
-
-        guard let newTopic = rd_kafka_NewTopic_new(
-            uniqueTopicName,
-            -1, // use default num_partitions
-            -1, // use default replication_factor
-            errorChars,
-            RDKafkaClient.stringSize
-        ) else {
-            let errorString = String(cString: errorChars)
-            throw KafkaError.topicCreation(reason: errorString)
-        }
-        defer { rd_kafka_NewTopic_destroy(newTopic) }
-
-        try self.withKafkaHandlePointer { kafkaHandle in
-            let resultQueue = rd_kafka_queue_new(kafkaHandle)
-            defer { rd_kafka_queue_destroy(resultQueue) }
-
-            var newTopicsArray: [OpaquePointer?] = [newTopic]
-            rd_kafka_CreateTopics(
-                kafkaHandle,
-                &newTopicsArray,
-                1,
-                nil,
-                resultQueue
-            )
-
-            guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else {
-                throw KafkaError.topicCreation(reason: "No CreateTopics result after 10s timeout")
-            }
-            defer { rd_kafka_event_destroy(resultEvent) }
-
-            let resultCode = rd_kafka_event_error(resultEvent)
-            guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else {
-                throw KafkaError.rdKafkaError(wrapping: resultCode)
-            }
-
-            guard let topicsResultEvent = rd_kafka_event_CreateTopics_result(resultEvent) else {
-                throw KafkaError.topicCreation(reason: "Received event that is not of type rd_kafka_CreateTopics_result_t")
-            }
-
-            var resultTopicCount = 0
-            let topicResults = rd_kafka_CreateTopics_result_topics(
-                topicsResultEvent,
-                &resultTopicCount
-            )
-
-            guard resultTopicCount == 1, let topicResult = topicResults?[0] else {
-                throw KafkaError.topicCreation(reason: "Received less/more than one topic result")
-            }
-
-            let topicResultError = rd_kafka_topic_result_error(topicResult)
-            guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else {
-                throw KafkaError.rdKafkaError(wrapping: topicResultError)
-            }
-
-            let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult))
-            guard receivedTopicName == uniqueTopicName else {
-                throw KafkaError.topicCreation(reason: "Received topic result for topic with different name")
-            }
-        }
-
-        return uniqueTopicName
-    }
-
-//    func deleteTopic(_ topic: String, timeout: Int32 = 10000) async throws {
-//        try await withCheckedThrowingContinuation { continuation in
-//            do {
-//                try self._deleteTopic(topic, timeout: timeout)
-//                continuation.resume()
-//            } catch {
-//                continuation.resume(throwing: error)
-//            }
-//        }
-//    }
-
-    /// Delete a topic.
-    /// Blocks for a maximum of `timeout` milliseconds.
-    /// - Parameter topic: Topic to delete.
-    /// - Parameter timeout: Timeout in milliseconds.
-    /// - Throws: A ``KafkaError`` if the topic deletion failed.
-    func _deleteTopic(_ topic: String, timeout: Int32) throws {
-        let deleteTopic = rd_kafka_DeleteTopic_new(topic)
-        defer { rd_kafka_DeleteTopic_destroy(deleteTopic) }
-
-        try self.withKafkaHandlePointer { kafkaHandle in
-            let resultQueue = rd_kafka_queue_new(kafkaHandle)
-            defer { rd_kafka_queue_destroy(resultQueue) }
-
-            var deleteTopicsArray: [OpaquePointer?] = [deleteTopic]
-            rd_kafka_DeleteTopics(
-                kafkaHandle,
-                &deleteTopicsArray,
-                1,
-                nil,
-                resultQueue
-            )
-
-            guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else {
-                throw KafkaError.topicDeletion(reason: "No DeleteTopics result after 10s timeout")
-            }
-            defer { rd_kafka_event_destroy(resultEvent) }
-
-            let resultCode = rd_kafka_event_error(resultEvent)
-            guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else {
-                throw KafkaError.rdKafkaError(wrapping: resultCode)
-            }
-
-            guard let topicsResultEvent = rd_kafka_event_DeleteTopics_result(resultEvent) else {
-                throw KafkaError.topicDeletion(reason: "Received event that is not of type rd_kafka_DeleteTopics_result_t")
-            }
-
-            var resultTopicCount = 0
-            let topicResults = rd_kafka_DeleteTopics_result_topics(
-                topicsResultEvent,
-                &resultTopicCount
-            )
-
-            guard resultTopicCount == 1, let topicResult = topicResults?[0] else {
-                throw KafkaError.topicDeletion(reason: "Received less/more than one topic result")
-            }
-
-            let topicResultError = rd_kafka_topic_result_error(topicResult)
-            guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else {
-                throw KafkaError.rdKafkaError(wrapping: topicResultError)
-            }
-
-            let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult))
-            guard receivedTopicName == topic else {
-                throw KafkaError.topicDeletion(reason: "Received topic result for topic with different name")
-            }
-        }
-    }
-}
diff --git a/dev/test-benchmark-thresholds.sh b/dev/test-benchmark-thresholds.sh
new file mode 100644
index 00000000..731c3e97
--- /dev/null
+++ b/dev/test-benchmark-thresholds.sh
@@ -0,0 +1,42 @@
+#!/bin/bash
+##===----------------------------------------------------------------------===##
+##
+## This source file is part of the swift-kafka-client open source project
+##
+## Copyright (c) YEARS Apple Inc. and the swift-kafka-client project authors
+## Licensed under Apache License v2.0
+##
+## See LICENSE.txt for license information
+## See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
+##
+## SPDX-License-Identifier: Apache-2.0
+##
+##===----------------------------------------------------------------------===##
+
+cd Benchmarks
+swift package --disable-sandbox benchmark baseline update PR --no-progress
+git checkout main
+swift package --disable-sandbox benchmark baseline update main --no-progress
+
+swift package benchmark baseline check main PR
+BENCHMARK_RESULT=$?
+
+echo "Retcode is $BENCHMARK_RESULT"
+
+if [ $BENCHMARK_RESULT -eq 0 ]; then
+    echo "Benchmark results are the same as main"
+fi
+
+if [ $BENCHMARK_RESULT -eq 4 ]; then
+    echo "Benchmark results are better than main"
+fi
+
+if [ $BENCHMARK_RESULT -eq 1 ]; then
+    echo "Benchmark failed"
+    exit 1
+fi
+
+if [ $BENCHMARK_RESULT -eq 2 ]; then
+    echo "Benchmark results are worse than main"
+    exit 1
+fi
diff --git a/docker/docker-compose.2204.57.yaml b/docker/docker-compose.2204.57.yaml
index af7cda0c..a465a610 100644
--- a/docker/docker-compose.2204.57.yaml
+++ b/docker/docker-compose.2204.57.yaml
@@ -15,6 +15,7 @@ services:
   test:
     image: swift-kafka-client:22.04-5.7
     environment:
+      - SWIFT_VERSION=5.7
       - WARN_AS_ERROR_ARG=-Xswiftc -warnings-as-errors
       - STRICT_CONCURRENCY_ARG=-Xswiftc -strict-concurrency=complete
       # - SANITIZER_ARG=--sanitize=thread # TSan broken still
diff --git a/docker/docker-compose.2204.58.yaml b/docker/docker-compose.2204.58.yaml
index 521c6ac9..47b02679 100644
--- a/docker/docker-compose.2204.58.yaml
+++ b/docker/docker-compose.2204.58.yaml
@@ -15,6 +15,7 @@ services:
   test:
     image: swift-kafka-client:22.04-5.8
     environment:
+      - SWIFT_VERSION=5.8
       - WARN_AS_ERROR_ARG=-Xswiftc -warnings-as-errors
       - IMPORT_CHECK_ARG=--explicit-target-dependency-import-check error
       - STRICT_CONCURRENCY_ARG=-Xswiftc -strict-concurrency=complete
diff --git a/docker/docker-compose.2204.59.yaml b/docker/docker-compose.2204.59.yaml
index e0a562d7..8d9cf29d 100644
--- a/docker/docker-compose.2204.59.yaml
+++ b/docker/docker-compose.2204.59.yaml
@@ -15,6 +15,7 @@ services:
   test:
     image: swift-kafka-client:22.04-5.9
     environment:
+      - SWIFT_VERSION=5.9
       - WARN_AS_ERROR_ARG=-Xswiftc -warnings-as-errors
       - IMPORT_CHECK_ARG=--explicit-target-dependency-import-check error
       - STRICT_CONCURRENCY_ARG=-Xswiftc -strict-concurrency=complete
diff --git a/docker/docker-compose.2204.main.yaml b/docker/docker-compose.2204.main.yaml
index b4e098cf..acac1a54 100644
--- a/docker/docker-compose.2204.main.yaml
+++ b/docker/docker-compose.2204.main.yaml
@@ -11,6 +11,7 @@ services:
   test:
     image: swift-kafka-client:22.04-main
     environment:
+      - SWIFT_VERSION=main
       - WARN_AS_ERROR_ARG=-Xswiftc -warnings-as-errors
       - IMPORT_CHECK_ARG=--explicit-target-dependency-import-check error
       - STRICT_CONCURRENCY_ARG=-Xswiftc -strict-concurrency=complete
diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml
index d8789f7c..10f1665c 100644
--- a/docker/docker-compose.yaml
+++ b/docker/docker-compose.yaml
@@ -50,12 +50,12 @@ services:
     <<: *common
     depends_on: [kafka, runtime-setup]
     environment:
+      SWIFT_VERSION: 5.7
       KAFKA_HOST: kafka
     command: >
       /bin/bash -xcl "
        swift build --build-tests $${SANITIZER_ARG-} && \
-       swift $${SWIFT_TEST_VERB-test} $${WARN_AS_ERROR_ARG-} $${SANITIZER_ARG-} $${IMPORT_CHECK_ARG-} $${STRICT_CONCURRENCY_ARG-} && \
-       cd Benchmarks && swift package --disable-sandbox benchmark baseline check --check-absolute-path Thresholds/$${SWIFT_VERSION-}/
+       swift $${SWIFT_TEST_VERB-test} $${WARN_AS_ERROR_ARG-} $${SANITIZER_ARG-} $${IMPORT_CHECK_ARG-} $${STRICT_CONCURRENCY_ARG-}
       "

   benchmark:
@@ -73,7 +73,7 @@ services:
     depends_on: [kafka, runtime-setup]
     environment:
       KAFKA_HOST: kafka
-    command: /bin/bash -xcl "cd Benchmarks && swift package --disable-sandbox --scratch-path .build/$${SWIFT_VERSION-}/ --allow-writing-to-package-directory benchmark --format metricP90AbsoluteThresholds --path Thresholds/$${SWIFT_VERSION-}/"
+    command: /bin/bash -xcl "cd Benchmarks && swift package --disable-sandbox --scratch-path .build/$${SWIFT_VERSION-}/ --allow-writing-to-package-directory benchmark --format metricP90AbsoluteThresholds --path Thresholds/$${SWIFT_VERSION-}/ --no-progress"

 # util