
KafkaProducer: Backpressure for Ack'd messages #53

Conversation


@felixschlegel felixschlegel commented May 26, 2023

Motivation

Our current implementation of polling librdkafka for new message acknowledgements in KafkaProducer did not support backpressure. It also made use of some weak references, which we generally want to avoid for performance reasons.

Modifications

  • create a new class called KafkaBackPressurePollingSystem
  • implement backpressure for KafkaProducer's poll-loop with KafkaBackPressurePollingSystem
  • fix typo in SwiftKafkaTests

TODO

  • write sensible tests for this backpressure mechanism and the state machine
  • how to handle dependency injection into our KafkaBackPressurePollingSystem, given that some of its dependencies are not Sendable
  • discuss if we want to expose a Kafka.run() method to the user or if we want to keep the poll task to ourselves (cc: @FranzBusch )

///
/// - Parameter pollIntervalNanos: The poll interval in nanoseconds.
/// - Returns: An awaitable task representing the execution of the poll loop.
func run(pollIntervalNanos: UInt64) async {
Contributor:

Can we use a Duration here instead?

Contributor Author:

Yeah, that works; we'll have to update our minimum platform versions to macOS .v13 / iOS .v16 though 👍

typealias Producer = NIOAsyncSequenceProducer<Element, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, KafkaBackPressurePollingSystem>

/// The state machine that manages the system's state transitions.
let stateMachine: StateMachine
Contributor:

This is the main thing that we have to protect via a lock. Can you put this inside a NIOLockedValueBox, please?

Comment on lines 34 to 35
private let _client: NIOLockedValueBox<KafkaClient?>
private let _sequenceSource: NIOLockedValueBox<Producer.Source?>
Contributor:

Rather than having state outside of the state machine, just put it into the associated values of the state. This way they are protected by the lock. If an action has to do something with those, just return them in the action.
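A minimal sketch of that idea, using placeholder types rather than the actual SwiftKafka ones (Client and Source here stand in for KafkaClient and Producer.Source):

```swift
// Illustrative only: dependencies live in the state's associated values and are
// handed back to the caller as part of an action, so the lock around the state
// machine protects them as well.
struct StateMachine {
    final class Client {}   // placeholder for KafkaClient
    final class Source {}   // placeholder for Producer.Source

    enum State {
        case initial
        case producing(client: Client, source: Source)
        case finished
    }

    enum PollLoopAction {
        case pollAndSleep(client: Client)
        case shutDown
    }

    private var state: State = .initial

    mutating func startProducing(client: Client, source: Source) {
        self.state = .producing(client: client, source: source)
    }

    func nextPollLoopAction() -> PollLoopAction {
        switch self.state {
        case .producing(let client, _):
            // Return the client so the caller can perform the actual poll.
            return .pollAndSleep(client: client)
        case .initial, .finished:
            return .shutDown
        }
    }
}
```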

let action = self.stateMachine.nextPollLoopAction()

switch action {
case .poll:
Contributor:

Can we name this action pollAndSleep?

self.client?.withKafkaHandlePointer { handle in
rd_kafka_poll(handle, 0)
}
try? await Task.sleep(nanoseconds: pollIntervalNanos)
Contributor:

We shouldn't silence the error here. If our task got cancelled, we should exit out of the run loop. Importantly though, we probably want to finish the source then.
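A minimal sketch of that approach, assuming a Duration-based interval and hypothetical poll/finishSource closures:

```swift
// A cancelled Task.sleep ends the run loop and the caller finishes the source
// instead of silently swallowing the error.
func runLoop(
    pollInterval: Duration,
    poll: () -> Void,
    finishSource: () -> Void
) async {
    while true {
        poll()
        do {
            try await Task.sleep(for: pollInterval)
        } catch {
            // Task was cancelled: stop polling and finish the async sequence source.
            finishSource()
            return
        }
    }
}
```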

/// of `produceMore()` to continue producing messages.
case loopSuspended(CheckedContinuation<Void, Never>)
/// The system is shut down.
case shutDown
Contributor:

Can we rename this to finished to follow our other state machine naming?

}

/// The current state of the state machine.
let state = NIOLockedValueBox(State.initial)
Contributor:

Using the locked value box here around an enum has no effect. The enum itself is thread-safe. What is not thread-safe are the calls to the various mutating methods on the StateMachine.

/// The state machine used by the ``KafkaBackPressurePollingSystem``.
struct StateMachine {
/// The possible actions for the poll loop.
enum PollLoopAction {
Contributor:

Can we move the actions right above the method that returns them? This makes it easier to understand what is going on.

/// Triggers an event in the state machine, causing a state transition.
///
/// - Parameter event: The event to be triggered.
func eventTriggered(_ event: Event) {
Contributor:

eventTriggered is way too broad. Please introduce separate methods for all the various events you model here, e.g. pollSuspended, pollCancelled, etc.

import NIOConcurrencyHelpers
import NIOCore

// TODO: write test for evaluating right state transitions
Contributor:

To make this testable, instead of passing in a KafkaClient you can pass in a closure that polls. In the real code you pass in a closure that does the actual polling, and in the test you can pass in something you can assert on.
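A rough sketch of that injection pattern with simplified, hypothetical types (PollingSystem and pollOnce() are illustrative, not the actual API):

```swift
// Production code injects a closure that performs the real librdkafka poll,
// while tests inject a closure they can assert on.
final class PollingSystem {
    private let pollClosure: () -> Void

    init(pollClosure: @escaping () -> Void) {
        self.pollClosure = pollClosure
    }

    func pollOnce() {
        self.pollClosure()
    }
}

// In a test, count invocations instead of talking to a real Kafka cluster.
func testExample() {
    var pollCount = 0
    let system = PollingSystem { pollCount += 1 }
    system.pollOnce()
    assert(pollCount == 1)
}
```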

felixschlegel (Contributor Author):

Thanks for your review! I have implemented your changes and added 3 test cases 😄
I still get warnings about conforming to Sendable. Please see my comments inline!

/// The state machine used by the ``KafkaBackPressurePollingSystem``.
struct StateMachine: Sendable {
/// The ``NIOAsyncSequenceProducer.Source`` used for yielding the messages to the ``NIOAsyncSequenceProducer``.
var sequenceSource: Producer.Source? // TODO: make sendable?
Contributor Author:

NIOAsyncSequenceProducer.Source is not Sendable at the moment, which is giving us a warning here. Should I try making it Sendable in swift-nio, or do we have a workaround for our case here?

Contributor:

The StateMachine doesn't necessarily need to be Sendable: we are holding it in a lock, which makes it Sendable. We just need to use @unchecked Sendable on the KafkaBackPressurePollingSystem.
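A small sketch of that pattern with simplified placeholder types; the real KafkaBackPressurePollingSystem and its state machine are more involved:

```swift
import NIOConcurrencyHelpers

// The state machine is not Sendable itself, but every access goes through a
// lock, so the surrounding type can safely be marked @unchecked Sendable.
struct StateMachine {
    private(set) var pollCount = 0

    mutating func recordPoll() {
        self.pollCount += 1
    }
}

final class PollingSystem: @unchecked Sendable {
    private let stateMachine = NIOLockedValueBox(StateMachine())

    func recordPoll() {
        self.stateMachine.withLockedValue { $0.recordPoll() }
    }
}
```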

let stateMachineLock: NIOLockedValueBox<StateMachine>

/// Closure that takes care of polling `librdkafka` for new messages.
var pollClosure: (() -> Void)?
Contributor Author:

At the moment this is a var, as it captures self in KafkaProducer, so it can only get set after our KafkaProducer has been initialized. However, having a var here makes us not conform to Sendable.

Contributor:

We should move this closure into the state machine. Furthermore, I don't see why this should capture the KafkaProducer; the only thing we need to capture is the KafkaClient to call poll on it.

@felixschlegel felixschlegel requested a review from FranzBusch May 31, 2023 13:56
@FranzBusch FranzBusch left a comment:

We are getting close! The state machine looks already quite good we just have to move some of the things outside of the state machine. I left some comments inline

/// Closure that takes care of polling `librdkafka` for new messages.
var pollClosure: (() -> Void)?
/// The ``NIOAsyncSequenceProducer.Source`` used for yielding the messages to the ``NIOAsyncSequenceProducer``.
var sequenceSource: Producer.Source? {
Contributor:

Can't we pass this in the init or rather could we actually create the sequence in here and return it from a factory method?

/// - Parameter pollInterval: The desired time interval between two consecutive polls.
/// - Returns: An awaitable task representing the execution of the poll loop.
func run(pollInterval: Duration) async {
actionLoop: while true {
Contributor:

We don't need the label here; we can just use return instead of break further down.

do {
try await Task.sleep(for: pollInterval)
} catch {
self.stateMachineLock.withLockedValue { $0.shutDown() }
Contributor:

We probably have to call source.finish() here?

Contributor Author:

source.finish() gets invoked by shutDown() 👍

Contributor:

Yes, but that's inside the state machine, and we want to have this outside the state machine, since the state machine should not execute any side effects.

}

/// The delivery report callback function that handles acknowledged messages.
private(set) lazy var deliveryReportCallback: (UnsafePointer<rd_kafka_message_t>?) -> Void = { messagePointer in
Contributor:

In the scope of this PR, can we make a change so that we don't use UnsafePointer here but rather bridge the callback to something well-typed inside the method where we set the callback?
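A hypothetical sketch of the bridging idea using placeholder types instead of the real librdkafka declarations (CMessage stands in for rd_kafka_message_t):

```swift
// The unsafe pointer is converted to a typed value once, where the callback is
// registered, so the rest of the code only sees typed results.
struct CMessage {            // stand-in for rd_kafka_message_t
    var errorCode: Int32
    var payload: String
}

enum AcknowledgementResult {
    case success(String)
    case failure(errorCode: Int32)
}

// The only place touching the unsafe pointer: bridge and forward a typed value.
func makeDeliveryCallback(
    _ handler: @escaping (AcknowledgementResult) -> Void
) -> (UnsafePointer<CMessage>?) -> Void {
    return { messagePointer in
        guard let messagePointer = messagePointer else {
            return
        }
        let message = messagePointer.pointee
        let result: AcknowledgementResult = message.errorCode == 0
            ? .success(message.payload)
            : .failure(errorCode: message.errorCode)
        handler(result)
    }
}
```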

case .finished:
return
case .stopProducing(let existingContinuation) where existingContinuation != nil:
fatalError("Created leaking continuation")
Contributor:

Suggested change:
- fatalError("Created leaking continuation")
+ fatalError("Internal state inconsistency. Run loop is running more than once")


/// Shut down the state machine and finish producing elements.
mutating func shutDown() {
self.sequenceSource?.finish()
Contributor:

Same here please move this outside of the state machine

Comment on lines 246 to 250
case .stopProducing(let continuation):
continuation?.resume()
fallthrough
default:
self.sequenceSource?.finish()
Contributor:

Please apply all previous comments to this as well

}

/// Yields a new elements to the ``NIOAsyncSequenceProducer``.
fileprivate func yield(_ element: Element) -> Producer.Source.YieldResult {
Contributor:

Please yield outside of the state machine

///
/// - Parameter pollInterval: The desired time interval between two consecutive polls.
/// - Returns: An awaitable task representing the execution of the poll loop.
func run(pollInterval: Duration) async {
Contributor:

We should make sure the run() method is only called once. We can do this with a method in the state machine that transitions from initial to producing, or sets a flag on the initial state.
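A minimal sketch of such a guard, with simplified, hypothetical states and actions:

```swift
// Guard against run() being invoked twice via a state transition.
struct StateMachine {
    enum State {
        case initial
        case running
        case finished
    }

    enum RunAction {
        case startLoop
        case throwAlreadyRunning
        case returnImmediately
    }

    private var state: State = .initial

    mutating func run() -> RunAction {
        switch self.state {
        case .initial:
            self.state = .running
            return .startLoop
        case .running:
            return .throwAlreadyRunning
        case .finished:
            return .returnImmediately
        }
    }
}
```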

Motivation:

Our current implementation of polling `librdkafka` for new message
acknowledgements in `KafkaProducer` did not support backpressure. Also,
it made use of some `weak` references which we generally want to avoid
for performance reasons.

Modifications:

* create a new class called `KafkaBackPressurePollingSystem`
* implement backpressure for `KafkaProducer`'s poll-loop with
  `KafkaBackPressurePollingSystem`
* fix typo in `SwiftKafkaTests`
Modifications:

* upgrade minimum OS versions to support the `Duration` type
* change type of `KafkaBackPressurePollingSystem.run`'s `pollInterval`
  to `Duration`
* rename `PollLoopAction.poll` -> `PollLoopAction.pollAndSleep`
* rename `StateMachine.State.shutDown` -> `StateMachine.State.finished`
* move lock outside of `StateMachine`
* synchronize access to
  `KafkaBackPressurePollingSystem.client/producerSource` by moving them
  into the `StateMachine`
* break down `StateMachine.eventTriggered` into separate methods
* finish `sequenceSource` when `StateMachine.shutDown` is invoked
* shutdown poll loop when `Task.sleep` fails
Motivation:

We want to be able to inject a closure that contains the polling
mechanism into our `KafkaBackPressurePollingSystem` so that we can
assert that it has been called in testing.
Condition for checking leaking continuations in
`KafkaBackPressurePollingSystem.StateMachine.suspendLoop` was wrong.
Modifications:

* added test testing that invocations to the `pollClosure` in
  `KafkaBackPressurePollingSystem` only occur when backpressure allows
  it
* added test that makes sure the `KafkaBackPressurePollingSystem` shuts
  down properly when its `run()` method's parent task gets cancelled
Modifications:

* remove flaky tests testing that poll is not invoked when loop is
  suspended
* add dedicated test that tests the case mentioned above
@felixschlegel felixschlegel force-pushed the fs-kafka-producer-poll-refactoring branch from ca5bc3d to b7d5fec on June 1, 2023 11:56
Modifications:

* moved `KafkaAcknowledgedMessageAsyncSequnce` to
  `KafkaBackPressurePollingSystem`
* made `init` of `KafkaBackPressurePollingSystem` private and make it
  only instantiable via the
  `KafkaBackPressurePollingSystem.createSystemAndSequence` factory method
Modifications:

* remove `actionLoop` label and replace `break actionLoop` with plain
  `return`s
* remove unused type `KafkaBackPressurePollingSystem.StateMachine.Event`
* improve syntax of `.stopProducing` case in `suspendLoop()`
* better error message for `.stopProducing` case in `suspendLoop()`
* `KafkaBackPressurePollingSystem`: remove `default` and `fallthrough`s
  in `switch` statements
* `fatalError` if `KafkaBackPressurePollingSystem.run` is invoked more
  than once
Modifications:

* create a new enum `StateMachine.Command`
* return `StateMachine.Command` on the `StateMachine` transitions with
  side-effects
* execute `StateMachine.Command`s inside a dedicated command handler in
  `KafkaBackPressurePollingSystem`
* remove `KafkaBackPressurePollingSystem.StateMachine.yield(_:)` method
Modifications:

* move conversion of `rd_kafka_message_t` to
  `KafkaAcknowledgementResult` to `RDKafkaConfig` so that we can pass
  the `KafkaAcknowledgementResult` type as early as possible and don't
  have to bother with `UnsafePointer<rd_kafka_message_t>` in all our
  delivery callback logic
Modifications:

* add locked `running` variable to `KafkaBackPressurePollingSystem` to
  determine if `run` has been invoked already (bug fix: relying on
  `state` here was wrong, as methods other than `run()` can change the
  state from `.initial` to anything else)
* made `init` of `KafkaBackPressurePollingSystem` `internal` to avoid
  having a `NIOAsyncSequenceProducerDelegate` which could make our tests
  flaky
@felixschlegel felixschlegel requested a review from FranzBusch June 2, 2023 13:55
Motivation:

In `KafkaProducer`, instead of launching the `runTask` ourselves, we
expose a `run()` method to the user. By doing so, we can adapt to
the `Service` protocol from `swift-service-lifecycle` in the future.

Modifications:

* create a new method `KafkaProducer.run()` wrapping the
  `pollingSystem`'s `run()` method
* make tests use their own `runTask`
* adapt `KafkaProducer` example in `README`
@felixschlegel felixschlegel force-pushed the fs-kafka-producer-poll-refactoring branch from 521d9b2 to 52e8d25 on June 7, 2023 14:44
@felixschlegel felixschlegel requested a review from FranzBusch June 7, 2023 14:46
@felixschlegel felixschlegel force-pushed the fs-kafka-producer-poll-refactoring branch from 5da5e10 to 486cfb2 on June 7, 2023 15:29
Modifications:

* remove `ClosureWrapper` classes
* make `pollClosure` `Optional`
* make `pollClosure` a get/set property of `KafkaPollingSystem`
* adjust tests
Modifications:

* move common logic into `setUp()` method
}

group.addTask {
let messageID = try await producer.sendAsync(
Contributor:

As a follow-up, we should probably rename the method to send and just document that it is async under the hood, since we are not planning to add a sync variant to this type anymore.

Contributor Author:

#59

Comment on lines 31 to 49
var source: Producer.Source? {
get {
self.stateMachine.withLockedValue { $0.source }
}
set {
self.stateMachine.withLockedValue { $0.source = newValue }
}
}

/// Closure that is used to poll the upstream producer for new updates.
/// In our case the upstream producer is the Kafka cluster.
var pollClosure: (() -> Void)? {
get {
self.stateMachine.withLockedValue { $0.pollClosure }
}
set {
self.stateMachine.withLockedValue { $0.pollClosure = newValue }
}
}
Contributor:

Let's try to inject this via a factory method and init.

private let stateMachine: NIOLockedValueBox<StateMachine?>

init() {}

func initialize(pollClosure: @escaping () -> Void, source: Producer.Source) {
  ...
}

static func makePollingSystem() -> Self {
}

func run(pollInterval: Duration) async throws {
switch self.stateMachine.withLockedValue({ $0.run() }) {
case .alreadyClosed:
return
Contributor:

We should throw an error here

/// Yield new elements to the underlying `NIOAsyncSequenceProducer`.
func yield(_ element: Element) {
self.stateMachine.withLockedValue { stateMachine in
switch stateMachine.state {
Contributor:

Why are we switching on the state here? We should switch on the state inside the state machine and return an action.

}

/// The current state of the state machine.
var state: State
Contributor:

Suggested change:
- var state: State
+ private var state: State

/// Convert an unsafe`rd_kafka_message_t` object to a safe ``KafkaAcknowledgementResult``.
/// - Parameter messagePointer: An `UnsafePointer` pointing to the `rd_kafka_message_t` object in memory.
/// - Returns: A ``KafkaAcknowledgementResult``.
static func convertMessageToAcknowledgementResult(
Contributor:

Suggested change:
- static func convertMessageToAcknowledgementResult(
+ private static func convertMessageToAcknowledgementResult(

Comment on lines 73 to 75
let _ = Task {
try await producer.run()
}
Contributor:

Let's use task groups here instead of unstructured tasks
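A small sketch of what the structured version could look like; runPollLoop() and sendMessages() are hypothetical placeholders for the producer's run and send work:

```swift
// Run the poll loop as a child task of a task group instead of an unstructured Task.
func runPollLoop() async throws {
    try await Task.sleep(for: .milliseconds(10))
}

func sendMessages() async throws {
    try await Task.sleep(for: .milliseconds(10))
}

func example() async throws {
    try await withThrowingTaskGroup(of: Void.self) { group in
        group.addTask {
            try await runPollLoop()
        }
        group.addTask {
            try await sendMessages()
        }
        // Waiting on the group keeps both child tasks structured and cancels
        // them together if either one throws.
        try await group.waitForAll()
    }
}
```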

Comment on lines 41 to 43
self.runTask = Task {
try await self.sut.run(pollInterval: self.pollInterval)
}
Contributor:

Same here

XCTFail()
}

self.sut.produceMore()
Contributor:

Let's not call these directly but rather consume the sequence to trigger produceMore.

Motivation:

We don't want to expose source and poll closure from
`KafkaPollingSystem`.

Modifications:

* add new `func initialize(backPressureStrategy:, pollClosure:)` to
  `KafkaPollingSystem` to support delayed initialization (needed because
  the `AsyncSequence` relies on `KafkaPollingSystem` and
  `KafkaPollingSystem` relies on the `AsyncSequence.Source` (circular
  dependency))
* `KafkaPollingSystem.StateMachine`: add state `.uninitialized` for when
  the `StateMachine` exists but `initialize()` has not yet been invoked
  on it
* stop exposing `pollClosure` and `source` variables from
  `KafkaPollingSystem` / `KafkaPollingSystem.StateMachine`
* move `yield(_:)` state functionality inside of
  `KafkaPollingSystem.StateMachine`
* make `RDKafkaConfig.convertMessageToAcknowledgementResult` `private`
* add new error type: `KafkaError.pollLoop`
* throw error when invoking `KafkaPollingSystem.run()` on closed poll
  loop
@felixschlegel felixschlegel force-pushed the fs-kafka-producer-poll-refactoring branch from d73c035 to 30fe1bf on June 8, 2023 16:35
Modifications:

* refactor all tests into using task groups (structured concurrency)
* adjust `KafkaPollingSystemTests` to new interface
* add comments to task groups in `README`
* Bug fix: we were hot-looping because of missing return statements in
  `KafkaPollingSystem`'s poll loop
Motivation:

When testing with `swift test` in the terminal, the test
`testRunTaskCancellationThrowsRunError` created a leaking continuation.
This was because the run task was still creating a new
continuation although it had already been cancelled.

Modifications:

* add a `guard` statement in `KafkaPollingSystem.run()` to check if
  `Task` is cancelled before creating a new `continuation`

// This subscribes to the acknowledgements stream and immediately terminates the stream.
// Required to kill the run task.
var iterator: KafkaAsyncSequence<KafkaProducer.Acknowledgement>.AsyncIterator? = producer
Contributor Author:

I'm not extremely happy with this, but at the moment we need to subscribe to and then terminate the AsyncSequence to shut down the run task.

This kinda feels bad; do you have any ideas on how we can stop the run task from running forever when the user consumes no acknowledgement messages? @FranzBusch

Contributor:

Since we're dropping the iterator here, the didTerminate callback on the delegate should be triggered, which should lead the run() task to terminate. Can we check that didTerminate is triggered?

Contributor Author (@felixschlegel, Jun 12, 2023):

The solution I came up with now is quite simple: I am just invoking self.pollingSystem.terminate() inside of KafkaProducer.shutDownGracefully(). By doing that we don't have to worry about whether the user subscribes to our acknowledgements AsyncSequence or not.

@felixschlegel felixschlegel requested a review from FranzBusch June 9, 2023 13:45
Motivation:

By creating the `KafkaProducer` with a factory method that also returns
the `KafkaAsyncSequence` for receiving message acknowledgements, we
ensure that `KafkaAsyncSequence` will eventually get terminated as the
user now owns the memory reference to it.
It is important that the `KafkaAsyncSequence` gets terminated because
otherwise the `KafkaProducer`'s run task runs forever.

Modifications:

* make initializer of `KafkaProducer` `private`
* create new factory method `KafkaProducer.newProducer` for initializing
  the `KafkaProducer` alongside the acknowledgement `KafkaAsyncSequence`
* update tests
* update README
Motivation:

Previously, we had to wait for the `KafkaProducer.acknowledgements`
`AsyncSequence` delegate method `didTerminate()` to be invoked in order
to shut down the poll loop. This was an issue, as it led to the poll
loop never stopping when e.g. the user never subscribed to the
`acknowledgements` `AsyncSequence`, and therefore the `didTerminate()`
delegate method was never invoked.

Modifications:

* make `KafkaPollingSystem.terminate()` `internal`
* invoke `pollingSystem.terminate()` in
  `KafkaProducer.shutdownGracefully`
* remove poll loop shutdown workaround in test

@felixschlegel felixschlegel force-pushed the fs-kafka-producer-poll-refactoring branch from dfbbe7e to 5e295df on June 12, 2023 09:03
import NIOCore

/// Our `AsyncSequence` implementation wrapping an `NIOAsyncSequenceProducer`.
public struct KafkaAsyncSequence<Element>: AsyncSequence {
Contributor:

Just noticed that this is public. I would rather have two concrete AsyncSequences than a generic one here. We can use the generic one internally though. For this PR, that would mean a KafkaMessageAcknowledgements struct.
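A simplified sketch of that wrapping idea; an AsyncStream stands in here for the internal NIOAsyncSequenceProducer-backed sequence:

```swift
// Keep the generic machinery internal and expose a concrete, single-purpose
// AsyncSequence publicly.
public struct Acknowledgement {
    public let offset: Int
}

public struct KafkaMessageAcknowledgements: AsyncSequence {
    public typealias Element = Acknowledgement

    // Internal, generic machinery stays hidden behind the concrete type.
    internal let wrapped: AsyncStream<Acknowledgement>

    public struct AsyncIterator: AsyncIteratorProtocol {
        internal var wrapped: AsyncStream<Acknowledgement>.AsyncIterator

        public mutating func next() async -> Acknowledgement? {
            await self.wrapped.next()
        }
    }

    public func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(wrapped: self.wrapped.makeAsyncIterator())
    }
}
```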

}

/// Kill the ``KafkaPollingSystem``'s poll loop and free its resources.
func terminate(_ error: Error? = nil) {
Contributor:

Can this be private?

Contributor Author:

Yes this would be no problem because we can also invoke didTerminate() from outside, but I thought that the name terminate() made more sense, also because didTerminate() is a delegate method.

(My implementation needs an internal method for terminating the KafkaPollingSystem because I invoke this from KafkaPollingSystem.shutDownGracefully)

/// Determines the next action to be taken in the poll loop based on the current state.
///
/// - Returns: The next action for the poll loop.
func nextPollLoopAction() -> KafkaPollingSystem.StateMachine.PollLoopAction {
Contributor:

Do we really need a separate method for this?

Contributor Author:

Not really, but given that our stateMachine is private, this provides a safe way to access the poll loop's state for testing. There are not too many things we can assert on here in testing. Alternatively, we can make KafkaPollingSystem.yield(_:) return the next action, but this would also just serve the purpose of being able to assert on the KafkaPollingSystem. In general, all of this is quite hard, as the KafkaPollingSystem does not expose that much information to the outside world.

Comment on lines 269 to 273
case alreadyClosed
/// The poll loop is already running. ``run()`` should not be invoked more than once!
case alreadyRunning
/// The poll loop can now be started.
case startLoop
Contributor:

One last nit here w.r.t. naming. Can you make sure all actions are named in a way that tells what to do, e.g. throwError, like you named startLoop?

return .alreadyRunning
}
self.state = .idle(source: source, pollClosure: pollClosure, running: true)
case .producing(let source, let pollClosure, let running):
Contributor:

Can this happen? How can we be in producing before run was called?

Contributor Author:

I thought this might be possible, e.g. when produceMore() gets called on the NIOAsyncSequenceProducerDelegate before run() gets invoked.

Contributor:

That won't happen. produceMore() only gets called after yield() returns stopProducing

/// The possible actions for the poll loop.
enum PollLoopAction {
/// Ask `librdkafka` to receive new message acknowledgements at a given poll interval.
case pollAndSleep(pollClosure: (() -> Void)?)
Contributor:

This can't be optional, right?

Comment on lines 229 to 230
source: Producer.Source?,
pollClosure: (() -> Void)?,
Contributor:

Why are the source and the pollClosure optional here?

Contributor Author:

You're right, this was from a previous implementation!

case .idle(let source, _, _), .producing(let source, _, _), .stopProducing(let source, _, _, _):
// We can also yield when in .stopProducing,
// the AsyncSequenceProducer will buffer for us
let yieldResult = source?.yield(element)
Contributor:

Please don't yield inside the state machine. Yielding is a side effect, so please return that as an action.
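A minimal sketch of returning the yield as an action, with placeholder types (Source stands in for the real Producer.Source):

```swift
// The state machine only decides; the caller performs the side effect.
final class Source {
    func yield(_ element: Int) { /* hand the element to the consumer */ }
}

struct StateMachine {
    enum State {
        case producing(source: Source)
        case finished
    }

    enum YieldAction {
        case yield(to: Source)
        case drop
    }

    private var state: State

    init(source: Source) {
        self.state = .producing(source: source)
    }

    mutating func finish() {
        self.state = .finished
    }

    func yieldAction() -> YieldAction {
        switch self.state {
        case .producing(let source):
            return .yield(to: source)
        case .finished:
            return .drop
        }
    }
}

// The caller (outside the state machine) performs the side effect.
func yield(_ element: Int, using stateMachine: StateMachine) {
    switch stateMachine.yieldAction() {
    case .yield(let source):
        source.yield(element)
    case .drop:
        break
    }
}
```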

/// Send messages to the Kafka cluster.
/// Please make sure to explicitly call ``shutdownGracefully(timeout:)`` when the ``KafkaProducer`` is not used anymore.
/// - Note: When messages get published to a non-existent topic, a new topic is created using the ``KafkaTopicConfig``
/// configuration object (only works if server has `auto.create.topics.enable` property set).
public actor KafkaProducer {
public typealias Acknowledgement = Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>
Contributor:

Why are we typealiasing this here? Let's not introduce this, please, but spell it out.

Comment on lines 132 to 133
// Kill poll loop in polling system
self.pollingSystem.terminate()
Contributor:

I am not sure that this is enough. We should figure out what happens in a system where we don't poll anymore but still produce, e.g. nobody consumes the acknowledgements sequence and it gets back-pressured. Will rdkafka buffer then and at some point also apply back pressure, or will we just OOM?

Contributor Author:

I believe rd_kafka_flush() has us covered here (we already use this).

Modifications:

* `KafkaPollingSystem`: add `return` statement to every individual `switch` case
* remove unnecessary optional types
* remove typealias `KafkaProducer.Acknowledgement`
* move `StateMachine.yield` side effect outside of `StateMachine`
* rename `StateMachine.RunAction`s to be more imperative
Modifications:

* remove generic `KafkaAsyncSequence` and have a specialised
  `AsyncSequence` called `KafkaMessageAcknowledgements` for the
  `KafkaProducer`
@felixschlegel felixschlegel requested a review from FranzBusch June 12, 2023 17:25
Motivation:

Yielding before running resulted in a fatalError.

Modifications:

* remove message yields from testRunTaskCancellationThrowsRunError
Motivation:

We want to have a `KafkaProducer` that is not consuming any
acknowledgements. This means it is initialized without a
`deliveryReportCallback`, which in turn means that `librdkafka` will not
queue any incoming acknowledgements, preventing us from running out
of memory in that case.

Modifications:

* add two new factory methods for creating `KafkaProducer`:
    * `KafkaProducer.newProducer`
    * `KafkaProducer.newProducerWithAcknowledgements`
* update README
@felixschlegel felixschlegel force-pushed the fs-kafka-producer-poll-refactoring branch from de0d8e3 to ff2cb71 on June 14, 2023 13:03
felixschlegel (Contributor Author):

Having back pressure makes no sense, as librdkafka queues the messages regardless. Closing, but keeping the branch for reference.
