Poll for messages using TaskExecutor
#178
Conversation
We currently sleep for `pollInterval` when no new messages have been polled from the cluster. This leads to unnecessary slowness of the client. Instead of doing that, we now break up the polling of messages into two distinct steps:

1. Attempt to poll synchronously: if a message is polled, we return it. If there is no message, we immediately go to step 2.
2. We create a `DispatchQueue` and run the `consumerPoll` on it using `withTaskExecutorPreference`. We make the `consumerPoll` call wait for up to `pollInterval` before bailing.

This prevents us from sleeping on the running thread and frees up cycles to do other work if required.

Resolves swift-server#165
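For readers following along, here is a minimal sketch of the two-step approach described above. The types `Message` and `PollingClient` and the function `nextMessage` are placeholders invented for this sketch; the PR's actual `KafkaConsumer` code and librdkafka-backed client differ.

```swift
import Dispatch

// Placeholder stand-ins so the sketch is self-contained; the PR's real
// types (KafkaConsumerMessage, the rd_kafka-backed client, etc.) differ.
struct Message: Sendable {
    let value: String
}

protocol PollingClient: Sendable {
    /// Blocks for up to `milliseconds`, returning nil if no message arrived.
    func consumerPoll(for milliseconds: Int32) throws -> Message?
}

#if swift(>=6.0)
func nextMessage(
    client: some PollingClient,
    executor: any TaskExecutor,
    pollIntervalMilliseconds: Int32
) async throws -> Message? {
    // Step 1: a non-blocking poll on the current thread.
    if let message = try client.consumerPoll(for: 0) {
        return message
    }
    // Step 2: a blocking poll of up to the poll interval, run with a
    // preference for the DispatchQueue-backed task executor so the
    // cooperative thread pool is never blocked or put to sleep.
    return try await withTaskExecutorPreference(executor) {
        try client.consumerPoll(for: pollIntervalMilliseconds)
    }
}
#endif
```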
Whoops.
Can't use the current implementation before Swift 6.
Haven't done any benchmarks just yet. Would like to do some before we go about merging this.
The benchmark looks better, but the benchmark suite says all differences are negative, and it's using a positive difference as the threshold. So while this PR is better, the benchmark ends up failing?
Sources/Kafka/KafkaConsumer.swift (Outdated)

@@ -12,6 +12,7 @@
 //
 //===----------------------------------------------------------------------===//

+import Dispatch
Don't think we need this import here
Whoops. Left over from a refactor. Is there a linter that can help point out unused imports?
#if swift(>=6.0)
// Wait on a separate thread for the next message.
return try await withTaskExecutorPreference(queue) {
    try client.consumerPoll(for: Int32(self.pollInterval.inMilliseconds))
What happens after the timeout?
We attempt to retrieve a message for `self.pollInterval`. If there's still no message, we return `nil`, which is the same behavior as in the `if let` above. I'd expect we get caught up in the `while` loop on line 100 until we eventually receive a message.
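To make that concrete, here is a hedged sketch of the kind of loop that comment describes, reusing the placeholder `nextMessage`, `Message`, and `PollingClient` from the sketch under the description above; the real `while` loop on line 100 of `KafkaConsumer` is not reproduced here.

```swift
#if swift(>=6.0)
// Illustrative only: a nil result means the blocking poll timed out
// after the poll interval, so we loop and poll again. The calling
// thread never sleeps; the waiting happens on the dispatch-queue executor.
func waitForNextMessage(
    client: some PollingClient,
    executor: any TaskExecutor,
    pollIntervalMilliseconds: Int32
) async throws -> Message {
    while true {
        if let message = try await nextMessage(
            client: client,
            executor: executor,
            pollIntervalMilliseconds: pollIntervalMilliseconds
        ) {
            return message
        }
        // nil: no message arrived within the poll interval; try again.
    }
}
#endif
```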
public func enqueue(_ _job: consuming ExecutorJob) {
    let job = UnownedJob(_job)
    queue.async {
        job.runSynchronously(
@ktoso Should we call this `runSynchronously`, or the one that also takes `isolatedTo`?
Yes, the benchmarks improved significantly! Negative numbers mean fewer allocations.
It's just funny that the benchmarking suite thinks the benchmark failed, because we are below the positive threshold with our negative numbers :)
Also, should this PR, once approved, update the benchmark baseline as well?
Yes. We intentionally fail the benchmarks when we improve, so we set new thresholds to make sure we don't regress again.
There are different return codes so CI can choose how to handle that: for an expected improvement one would check in new baselines manually anyway; for an unexpected improvement something may be wrong with e.g. the benchmark setup. But super nice improvements here 👍🏻
@hassila Oh, fair enough. I've not thought about the unexpected improvement situation. Thanks for bringing that up!
let job = UnownedJob(_job)
queue.async {
    job.runSynchronously(
        on: self.asUnownedTaskExecutor()
If you're able to use the

public func runSynchronously(isolatedTo serialExecutor: UnownedSerialExecutor,
                             taskExecutor: UnownedTaskExecutor)

here and pass the queue's `UnownedSerialExecutor`, that would be preferable, as it would AFAIR be more correct in tracking where this is isolated to.

If this is a pain because the queue does not conform to `SerialExecutor` on some platforms still... then perhaps conditionalize it to platforms where it is? Or leave as is and let me know and we'll chase fixing the conformance.
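For illustration, a minimal sketch of what the suggested variant could look like on a platform where `DispatchSerialQueue` conforms to `SerialExecutor`. The type name `PollingTaskExecutor` and the queue label are made up for this sketch and are not the PR's actual code.

```swift
#if swift(>=6.0)
import Dispatch

// Sketch only. Assumes a platform where DispatchSerialQueue conforms
// to SerialExecutor and can hand out an UnownedSerialExecutor.
final class PollingTaskExecutor: TaskExecutor {
    private let queue = DispatchSerialQueue(label: "kafka.consumer.poll")

    func enqueue(_ _job: consuming ExecutorJob) {
        let job = UnownedJob(_job)
        queue.async {
            // Variant shown in the diff above: record only the task executor.
            // job.runSynchronously(on: self.asUnownedTaskExecutor())

            // Variant suggested in this comment: additionally record the
            // queue as the serial executor the job is isolated to.
            job.runSynchronously(
                isolatedTo: self.queue.asUnownedSerialExecutor(),
                taskExecutor: self.asUnownedTaskExecutor()
            )
        }
    }
}
#endif
```

Whether the extra isolation tracking is needed depends on whether anything is actually isolated to the queue; the reply below argues nothing is, since the queue is privately owned.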
Got you. Since the underlying queue here is owned by us, nothing should be isolated to it.
This looks good to me. Check the run func, but either way I think this is looking good.