
[improve][broker] Avoid producer latency rise during internal ledger trimming operations #20649

Closed · wants to merge 3 commits
@@ -2451,15 +2451,15 @@ private void trimConsumedLedgersInBackground() {

@Override
public void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
- executor.execute(() -> internalTrimConsumedLedgers(promise));
+ scheduledExecutor.execute(() -> internalTrimConsumedLedgers(promise));
}

public void trimConsumedLedgersInBackground(boolean isTruncate, CompletableFuture<?> promise) {
Contributor

Both the trim ledgers task and internalAsyncAddEntry try to acquire the lock synchronized(ml). When the trim ledgers task runs in the thread executor, that avoids lock contention.

If we make the trim ledgers task run in another thread instead, the publish task will still have to wait for the synchronized(ml) lock. Maybe we need to improve that scenario first.
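
For illustration, a minimal sketch of the contention being described (assumed class and method names, not the real ManagedLedgerImpl): publish and trim both synchronize on the same per-ledger monitor, so moving trim onto another thread trades queueing behind it for blocking on the lock.

```java
// Minimal sketch of the contention pattern; names are illustrative only.
public class MlLockSketch {
    private final Object ml = new Object(); // stands in for the ManagedLedger monitor

    // Called on the publish path (compare internalAsyncAddEntry).
    void addEntry(byte[] payload) {
        synchronized (ml) {
            // append the entry to the current ledger's in-memory state
        }
    }

    // Called by the trim path, possibly from a different thread.
    void trimConsumedLedgers() {
        synchronized (ml) {
            // drop fully-consumed ledgers from the in-memory ledgers map
        }
    }
}
```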

Contributor

Maybe we can resolve this busy-thread problem first, because the synchronized block only affects a single ManagedLedger object, while one BK main executor thread may handle many trim tasks for different ManagedLedger objects.

Contributor Author

Why not do this in two steps?
Thread pool isolation is necessary; trim should not run in the main thread pool.
Then, as a second step, we can optimize how long trim itself takes.

Contributor
poorbarcode Jul 3, 2023

@315157973

Now the publish and trim ledgers tasks both run in the thread executor (it is a single thread pool, and tasks of the same topic are executed on the same thread [L-1]), right? [Q-1]

If yes [Q-1], the trim ledgers task blocks the publish task, which this PR tries to fix, right? [Q-2]

If yes [Q-2], both the trim ledgers task [L-2] and internalAsyncAddEntry [L-3] still try to acquire the same lock synchronized(ml), so the trim ledgers task can keep blocking the publish task even after this patch is merged, right? [Q-3]

[L-1]: https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L348
[L-2]: https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2608
[L-3]: https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L798
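
As a rough sketch of the single-thread-per-topic behaviour referenced in [L-1] (this is not Pulsar's actual executor, just an illustration of the ordering property): tasks that share a key always land on the same single-threaded executor, so a slow trim task queued for one topic delays the publish tasks of every topic hashed to that thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative key-ordered executor: same key => same thread, tasks run serially.
public class KeyOrderedExecutorSketch {
    private final ExecutorService[] threads;

    public KeyOrderedExecutorSketch(int numThreads) {
        threads = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = Executors.newSingleThreadExecutor();
        }
    }

    // All tasks for the same managed-ledger name execute on one thread, in order.
    public void execute(String ledgerName, Runnable task) {
        threads[Math.floorMod(ledgerName.hashCode(), threads.length)].execute(task);
    }
}
```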

Contributor Author

This lock is segmented and does not block any individual producer for long.
But if publish and trim are in the same thread pool, the thread pool will be occupied for a long time.
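
To illustrate the thread-pool occupancy point, a self-contained toy (not Pulsar code): even with no lock involved, a long task on a shared single-threaded executor delays every task queued behind it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorOccupancyDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        long start = System.nanoTime();
        // Stand-in for a trim pass that keeps the thread busy for ~50 ms.
        executor.execute(() -> sleep(50));
        // A queued "publish" callback now waits roughly those 50 ms before it runs.
        executor.execute(() -> System.out.printf("publish ran after %d ms%n",
                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)));
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```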

Contributor
poorbarcode Jul 3, 2023

@315157973

> This lock is segmented and does not block any individual producer for long.

Makes sense. The trim ledgers task has two sub-tasks: Memory changes and Persist changes. Memory changes takes the lock; Persist changes is an async task that does not take the lock.

But the sub-task Persist changes will not use the thread executor, right? [Q-1]

If yes [Q-1], this patch doesn't have much effect on this issue, right? [Q-2]
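
A rough sketch of the two sub-tasks as described above (assumed names and simplified logic, not the actual implementation): the in-memory update happens briefly under the ML lock, while persistence completes asynchronously without holding it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class TrimSketch {
    private final Object ml = new Object();            // stands in for synchronized(ml)
    private final List<Long> ledgers = new ArrayList<>();

    CompletableFuture<Void> trimConsumedLedgers(int consumedCount) {
        List<Long> toDelete;
        synchronized (ml) {                            // "Memory changes": short critical section
            toDelete = new ArrayList<>(ledgers.subList(0, Math.min(consumedCount, ledgers.size())));
            ledgers.removeAll(toDelete);
        }
        // "Persist changes": async metadata update + ledger deletion, lock not held.
        return persistLedgersMetadata()
                .thenRun(() -> toDelete.forEach(TrimSketch::deleteLedgerAsync));
    }

    private CompletableFuture<Void> persistLedgersMetadata() {
        return CompletableFuture.completedFuture(null); // placeholder for a metadata-store write
    }

    private static void deleteLedgerAsync(long ledgerId) {
        // placeholder for an async BookKeeper ledger deletion
    }
}
```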

Contributor Author
315157973 Jul 3, 2023

> If yes [Q-1], this patch doesn't have much effect on this issue, right? [Q-2]

Assuming each trim task adds 0.01 ms, with tens of thousands of partitions the total added latency becomes significant.
It gets especially bad when many small packets are sent to the cluster.
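
A back-of-the-envelope illustration of that argument (the numbers below are assumptions, not measurements):

```java
public class TrimCostEstimate {
    public static void main(String[] args) {
        double perTrimMillis = 0.01;   // assumed executor time added by one trim task
        int partitions = 20_000;       // assumed partition count on a single broker
        double totalMillis = perTrimMillis * partitions;
        // ~200 ms of extra executor occupancy per trim round across the broker
        System.out.printf("total added executor time: %.0f ms%n", totalMillis);
    }
}
```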

Contributor

@315157973

> Assuming each trim task adds 0.01 ms, with tens of thousands of partitions the total added latency becomes significant.
> It gets especially bad when many small packets are sent to the cluster.

I agree with you. But I'm wondering whether the current change actually solves the issue.
[Screenshot 2023-07-03 22:29:13: flow diagram of the trim ledgers logic]

In the picture above, the trim ledgers logic can be split into two sections: Verification (not in the lock block) and Execution (in the lock block), and Section Execution contains some async tasks that run in other threads. The three parts execute in the flow below.

| Without lock | In lock | In other threads |
| --- | --- | --- |
| Verification | Memory changes | Persist changes |

Since there is the lock synchronized(ml), we can only reduce the executor-thread cost of the Verification task, which is very simple: it only tries to acquire the lock and is done if that fails.

(Highlight) However, this change also makes contention on the lock synchronized(ml) more frequent, which can worsen performance.

For example, suppose topic-a and topic-b are assigned to the same thread, executor-1, and the trim ledgers tasks are executed in the scheduled executor thread. These tasks will then be executed as in the flow below.

| time | executor-1 | scheduled executor |
| --- | --- | --- |
| 1 | publish of topic-a | |
| 2 | publish of topic-b | |
| 3 | publish of topic-b | start trim ledgers of topic-a |
| 4 | publish of topic-b | get the ML lock of topic-a |
| 5 | publish of topic-b | |
| 6 | publish of topic-a | |
| 7 | waiting for the lock; all publishes of topic-a and topic-b are blocked until the trim ledgers task of topic-a completes | |

So the outcome at step-4 depends on timing: if the publish of topic-a runs before the trim ledgers task of topic-a takes the lock, performance is better; if the trim task takes the lock first, performance is worse. The more topics are assigned to the same thread, the better this change performs (maybe we can add a config to disable this feature; that would be helpful for users who have few topics).

Contributor Author

Users with fewer partitions submit fewer tasks to the thread pool

Contributor

This change can improve the performance if there are a lot of topics in the broker and there are few threads in the BK client thread pool.

Makes sense to me.

- executor.execute(() -> internalTrimLedgers(isTruncate, promise));
+ scheduledExecutor.execute(() -> internalTrimLedgers(isTruncate, promise));
}

private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> promise) {
- scheduledExecutor.schedule(() -> trimConsumedLedgersInBackground(isTruncate, promise),
+ scheduledExecutor.schedule(() -> internalTrimLedgers(isTruncate, promise),
100, TimeUnit.MILLISECONDS);
}

@@ -4235,9 +4235,9 @@ public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
futures.add(future);
}
CompletableFuture<Void> future = new CompletableFuture();
- FutureUtil.waitForAll(futures).thenAccept(p -> {
+ FutureUtil.waitForAll(futures).thenAcceptAsync(p -> {
internalTrimLedgers(true, future);
- }).exceptionally(e -> {
+ }, scheduledExecutor).exceptionally(e -> {
future.completeExceptionally(e);
return null;
});
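
For context on the hunk above, a small standalone illustration (not Pulsar code) of the difference being relied on: thenAccept may run its callback on whichever thread completes the future, whereas thenAcceptAsync(action, executor) always hands the callback to the supplied executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThenAcceptAsyncDemo {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
                r -> new Thread(r, "scheduled-executor"));

        CompletableFuture<Void> upstream = CompletableFuture.runAsync(() -> { /* upstream work */ });

        // May run on the thread that completed `upstream` (or the caller, if already complete).
        upstream.thenAccept(v ->
                System.out.println("thenAccept on " + Thread.currentThread().getName()));

        // Always dispatched to the supplied executor.
        upstream.thenAcceptAsync(v ->
                System.out.println("thenAcceptAsync on " + Thread.currentThread().getName()),
                scheduledExecutor);

        Thread.sleep(200);
        scheduledExecutor.shutdown();
        scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```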