Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][client] PIP-393: Improve performance of Negative Acknowledgement #23600

Open
wants to merge 21 commits into
base: master
Choose a base branch
from

Conversation

thetumbled
Copy link
Member

@thetumbled thetumbled commented Nov 14, 2024

Implementation PR for PIP-393: #23601.

Motivation

There are many issues with the current implementation of Negative Acknowledgement in Pulsar:

  • the memory occupation is high.
  • the code execution efficiency is low.
  • the redelivery time is not accurate.
  • multiple negative ack for messages in the same entry(batch) will interfere with each other.
    All of these problem is severe and need to be solved.

Memory occupation is high

After the improvement of #23582, we have reduce half more memory occupation
of NegativeAcksTracker by replacing HashMap with ConcurrentLongLongPairHashMap. With 100w entry, the memory occupation decrease from 178Mb to 64Mb. With 1kw entry, the memory occupation decrease from 1132Mb to 512Mb.
The average memory occupation of each entry decrease from 1132MB/10000000=118byte to 512MB/10000000=53byte.

But it is not enough. Assuming that we negative ack message 1w/s, assigning 1h redelivery delay for each message,
the memory occupation of NegativeAcksTracker will be 3600*10000*53/1024/1024/1024=1.77GB, if the delay is 5h,
the required memory is 3600*10000*53/1024/1024/1024*5=8.88GB, which increase too fast.

Code execution efficiency is low

Currently, each time the timer task is triggered, it will iterate all the entries in NegativeAcksTracker.nackedMessages,
which is unnecessary. We can sort entries by timestamp and only iterate the entries that need to be redelivered.

Redelivery time is not accurate

Currently, the redelivery time is controlled by the timerIntervalNanos, which is 1/3 of the negativeAckRedeliveryDelay.
That means, if the negativeAckRedeliveryDelay is 1h, the check interval time will be 20min, which is unacceptable.

Multiple negative ack for messages in the same entry(batch) will interfere with each other

Currently, NegativeAcksTracker#nackedMessages map (ledgerId, entryId) to timestamp, which means multiple nacks from messages in the same batch share single one timestamp.
If we let msg1 redelivered 10s later, then let msg2 redelivered 20s later, these two messages are delivered 20s later together. msg1 will not be redelivered 10s later as the timestamp recorded in NegativeAcksTracker#nackedMessages is overrode by the second nack call.

we can reproduce this problem with test code below:

Consumer consumer = client.newConsumer()
                .topic("persistent://public/default/testNack")
                .subscriptionName("sub2")
                .subscriptionType(SubscriptionType.Shared)
                .negativeAckRedeliveryDelay(20, TimeUnit.SECONDS) // fixed delay with 20s.
                .subscribe();
        // receive first message and nack it.
        Message msg = consumer.receive();
        MessageIdAdv batchMessageId = (MessageIdAdv) msg.getMessageId();
        int batchIndex = batchMessageId.getBatchIndex();
        log.info("Message received, timestamp:{}, message id:{}, batch index:{}", getTime(), batchMessageId, batchIndex);
        consumer.negativeAcknowledge(msg);
        
        // receive the secode message and sleep for 10s, then nack it.
        msg = consumer.receive();
        batchMessageId = (MessageIdAdv) msg.getMessageId();
        batchIndex = batchMessageId.getBatchIndex();
        log.info("Message received, timestamp:{}, message id:{}, batch index:{}", getTime(), batchMessageId, batchIndex);
        Thread.sleep(10000);
        consumer.negativeAcknowledge(msg);

We expect the second message redelivered 10s later than the first message, as it call nack 10s later than the first one.
However, we will receive two messages together.
image

You can also reproduce this problem with the test code in this PR: org.apache.pulsar.client.impl.NegativeAcksTest#testNegativeAcksWithBatch

Modifications

Refactor the NegativeAcksTracker to solve the above problems.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: thetumbled#64

@thetumbled
Copy link
Member Author

I will implement a space efficient map structure for ConcurrentTripleLong2LongHashMap later, which for now use hashmap to ensure the logical correction.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

I will implement a space efficient map structure for ConcurrentTripleLong2LongHashMap later, which for now use hashmap to ensure the logical correction.

@thetumbled Fastutil could be a better source of space efficient map data structures. I believe that there's a templating solution where it's possible to generate code for efficient implementations. In this case, is there a need for the data structure to be concurrent? Following a single writer principle could result in simpler and more performant designs. One way to address message passing from other threads to a single writer thread is to use message passing queues from JCTools which we already use in Pulsar. Just some food for thought.

@thetumbled
Copy link
Member Author

I will implement a space efficient map structure for ConcurrentTripleLong2LongHashMap later, which for now use hashmap to ensure the logical correction.

@thetumbled Fastutil could be a better source of space efficient map data structures. I believe that there's a templating solution where it's possible to generate code for efficient implementations. In this case, is there a need for the data structure to be concurrent? Following a single writer principle could result in simpler and more performant designs. One way to address message passing from other threads to a single writer thread is to use message passing queues from JCTools which we already use in Pulsar. Just some food for thought.

If any solution from Fastutil prove to be more space efficient, i am glad to adopt it. Nack Tracker will consume enormous amount of memory, we have to choose the best one.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

If any solution from Fastutil prove to be more space efficient, i am glad to adopt it. Nack Tracker will consume enormous amount of memory, we have to choose the best one.

The main purpose of Fastutil is performance and space efficiency.
Instead of adding very complex keys such as triple keys, there's a chance to use a datastructure that is hierarchical. That's the approach I took in the PendingAcksMap class:

private final Long2ObjectSortedMap<Long2ObjectSortedMap<IntIntPair>> pendingAcks;

There's a huge benefit when the keys are primitives and you can only achieve that with a hierarchical data structure.
In the long run, I'd rather get rid of our custom collection implementations instead of adding more of them.
It's not trivial to create bug free collections. Using existing libraries for that purpose is strongly preferred.

@thetumbled Have you considered a hierarchical data structure? (such as map of maps)

@thetumbled
Copy link
Member Author

If any solution from Fastutil prove to be more space efficient, i am glad to adopt it. Nack Tracker will consume enormous amount of memory, we have to choose the best one.

The main purpose of Fastutil is performance and space efficiency. Instead of adding very complex keys such as triple keys, there's a chance to use a datastructure that is hierarchical. That's the approach I took in the PendingAcksMap class:

private final Long2ObjectSortedMap<Long2ObjectSortedMap<IntIntPair>> pendingAcks;

There's a huge benefit when the keys are primitives and you can only achieve that with a hierarchical data structure.
In the long run, I'd rather get rid of our custom collection implementations instead of adding more of them.
It's not trivial to create bug free collections. Using existing libraries for that purpose is strongly preferred.
@thetumbled Have you considered a hierarchical data structure? (such as map of maps)

It is a good point, i will test it.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

If any solution from Fastutil prove to be more space efficient, i am glad to adopt it. Nack Tracker will consume enormous amount of memory, we have to choose the best one.

The main purpose of Fastutil is performance and space efficiency. Instead of adding very complex keys such as triple keys, there's a chance to use a datastructure that is hierarchical. That's the approach I took in the PendingAcksMap class:

private final Long2ObjectSortedMap<Long2ObjectSortedMap<IntIntPair>> pendingAcks;

There's a huge benefit when the keys are primitives and you can only achieve that with a hierarchical data structure.
In the long run, I'd rather get rid of our custom collection implementations instead of adding more of them.
It's not trivial to create bug free collections. Using existing libraries for that purpose is strongly preferred.
@thetumbled Have you considered a hierarchical data structure? (such as map of maps)

It is a good point, i will test it.

@thetumbled In certain cases when tracking existence (true/false), it's worth considering to use space efficient bit maps. In Pulsar, we use the RoaringBitmap library.
I think that it should be used for storing nacks.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

I think that it should be used for storing nacks.

I guess it's not applicable in this case.

@thetumbled I checked the NegativeAcksTracker class and it seems that the actual key is (ledgerId, entryId).
The partitionIndex and timestamp are part of the value.
partitionIndex doesn't have to be a long value.

It's easy to implement (ledgerId, entryId) as map of maps.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

This is a very poor solution in the current implementation:

nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
if (timestamp < now) {
MessageId msgId = new MessageIdImpl(ledgerId, entryId,
// need to covert non-partitioned topic partition index to -1
(int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
}
});

There should be a separate datastructure (a list or queue) which contains the entries in timestamp order. The benefit of that is that iterating could stop after the timestamp condition no longer holds.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

@thetumbled It looks like there's no need for a map data structure in the first place. That's completely unnecessary for implementing NegativeAcksTracker

@thetumbled
Copy link
Member Author

@thetumbled It looks like there's no need for a map data structure in the first place. That's completely unnecessary for implementing NegativeAcksTracker

You are right. We need to improve the code execution efficiency too. some kind of structures sorted by timestamp.

@lhotari
Copy link
Member

lhotari commented Nov 14, 2024

You are right. We need to improve the code execution efficiency too. some kind of structures sorted by timestamp.

Fastutil contains multiple PriorityQueue implementations: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/PriorityQueue.html.
For example, this would work: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/ObjectArrayPriorityQueue.html
or this one: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/ObjectHeapPriorityQueue.html

@thetumbled thetumbled changed the title [fix][client] fix multiple nack from messages in the same batch interfere each other. [improve][client] PIP-393: Improve performance of Negative Acknowledgement Nov 15, 2024
@thetumbled
Copy link
Member Author

You are right. We need to improve the code execution efficiency too. some kind of structures sorted by timestamp.

Fastutil contains multiple PriorityQueue implementations: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/PriorityQueue.html. For example, this would work: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/ObjectArrayPriorityQueue.html or this one: https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/ObjectHeapPriorityQueue.html

I propose a pip to fix several issues with nack tracker, with a new data structure :

Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = new Long2ObjectAVLTreeMap<>();

This PR become the implementation PR for PIP-393: #23601.
I will implement PIP-393 soon.

Comment on lines +210 to +218
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be necessary to update pulsar-client-shaded and pulsar-client-all shading configuration to shade these libraries since they haven't been used on the client by now.

Avoiding client jar growth is something that we would need to address when adding new dependencies.

I realized that including the fastuti library will increase the client jar file size significantly. One detail about fastutil is that there's a smaller library alternative fastutil-core which includes a subset of the classes. fastutil is about 23MB and fastutil-core is about 6MB.

Picking only the classes that are needed from fastutil is possible with minimizeJar with maven's shade plugin, but since minimizeJar is a global setting, it would require building a minimized version of fastutil for the client usage to pick only the classes are needed since applying minimizeJar to the complete Pulsar client jar would be a very large change which could break things.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be necessary to update pulsar-client-shaded and pulsar-client-all shading configuration to shade these libraries since they haven't been used on the client by now.

I am not familiar with the shade part, hope this commit modify correctly.
28251b2

I realized that including the fastuti library will increase the client jar file size significantly. One detail about fastutil is that there's a smaller library alternative fastutil-core which includes a subset of the classes. fastutil is about 23MB and fastutil-core is about 6MB.
Picking only the classes that are needed from fastutil is possible with minimizeJar with maven's shade plugin, but since minimizeJar is a global setting, it would require building a minimized version of fastutil for the client usage to pick only the classes are needed since applying minimizeJar to the complete Pulsar client jar would be a very large change which could break things.

I have update the dependency to fastutil-core.
As for minimizeJar, it looks like we can minimize the size of jar to the minimum, but i think that we may improve pulsar with fastutil many places else, so some other data structures can be helpful too.

Copy link
Member

@lhotari lhotari Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment cannot be marked resolved before this is really addressed.

Due to the large size of both fastutil (23MB) and fastutil-core (6MB), It is necessary to add a new module that minimizes fastutil for the use of the shaded Pulsar client (in published pulsar-client and pulsar-client-all modules). The minimized fastutil module shouldn't be published to maven central at all, but it is necessary to make it a separate module since it's not possible to selectively minimize modules with the maven-shade-plugin.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a later step that we minimize the fastutil, right?
Currently, we need to fix the shade problem, is it not enough with following code?

<include>it.unimi.dsi:*</include>

I will appreciate it if you can help to shade the fastutil dependency.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you should use the fastutil-core instead of fastutil, and then we can make a new PR to reduce the fastutil-core size.

And then use <include>it.unimi.dsi:fastutil-core</include>.

pom.xml Outdated Show resolved Hide resolved
pulsar-client/pom.xml Outdated Show resolved Hide resolved
@thetumbled
Copy link
Member Author

There are some issues with the licence.

Run src/check-binary-license.sh ./distribution/server/target/apache-pulsar-*-bin.tar.gz
it.unimi.dsi-fastutil-core-8.5.14.jar unaccounted for in LICENSE

It looks like there are issues with the LICENSE/NOTICE.

Should we replace fastutil-core with fastutil? @lhotari

@lhotari
Copy link
Member

lhotari commented Nov 18, 2024

Should we replace fastutil-core with fastutil?

@thetumbled That's fine to complete the experiment. For a proper solution there will need to be a way to avoid the shaded jar files growing significantly. The proper solution would require a separate module for fastutil which minimizes the number of classes. I tried to explain that in my previous comment, #23600 (comment) . Minimizing all classes would most likely be a breaking change. Since you cannot specify minimizeJar for a specific dependency in maven-shade-plugin configuration, it's necessary to have a separate intermediate module for minimizing just fastutil.

@thetumbled
Copy link
Member Author

thetumbled commented Nov 18, 2024

Run src/check-binary-license.sh ./distribution/server/target/apache-pulsar-*-bin.tar.gz
it.unimi.dsi-fastutil-core-8.5.14.jar unaccounted for in LICENSE

I wonder whether it is the problem with the it.unimi.dsi-fastutil-core-8.5.14.jar, or i need to configure something to fix this?

@lhotari
Copy link
Member

lhotari commented Nov 18, 2024

I wonder whether it is the problem with the it.unimi.dsi-fastutil-core-8.5.14.jar, or i need to configure something to fix this?

@thetumbled When new dependencies are added, it's necessary to add them to LICENSE file(s). However, there's a bit of a problem in using fastutil-core together with fastutil since there would be duplication in the classpath. That's why it's better to simply use the fastutil dependency and add the minimizing solution in a later step.

@thetumbled
Copy link
Member Author

I wonder whether it is the problem with the it.unimi.dsi-fastutil-core-8.5.14.jar, or i need to configure something to fix this?

@thetumbled When new dependencies are added, it's necessary to add them to LICENSE file(s). However, there's a bit of a problem in using fastutil-core together with fastutil since there would be duplication in the classpath. That's why it's better to simply use the fastutil dependency and add the minimizing solution in a later step.

get it, i change it back to fastutil for now.

@thetumbled
Copy link
Member Author

The implementation is done, please help to review this PIP, thanks. @lhotari @BewareMyPower @nodece @poorbarcode @dao-jun

pulsar-client-all/pom.xml Outdated Show resolved Hide resolved
@@ -168,6 +168,7 @@
<include>org.tukaani:xz</include>
<!-- Issue #6834, Since Netty ByteBuf shaded, we need also shade this module -->
<include>org.apache.pulsar:pulsar-client-messagecrypto-bc</include>
<include>it.unimi.dsi:*</include>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't the correct solution for shading fastutil. Please see https://github.com/apache/pulsar/pull/23600/files#r1848136065

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have update the shade conf, please review again, thanks.

+ "than the expected time, but no more than 256ms. \nIf set to k, the redelivery time will be"
+ "bucketed by 2^k ms.\nIf the value is 0, the redelivery time will be accurate to ms."
)
private int negativeAckPrecisionBitCnt = 8;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you consider 0 as the default value? Users may wish to submit later, which is consistent with previous behavior.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to configure it as the best the confguration value. Without this enhancement, it is unrealistic to use negative acknowledgement in production. The memory occupation will inflate very fast.
But i will listen to the voice from the community, if more and more people think that disabling this feature by default is better, i will update it.

}

long backoffNs;
long backoffMs;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you use nanoseconds? Then you need to use System.nanoTime() instead of System.currentTimeMillis(), the System.nanoTime() is quickly based on JVM.

Copy link
Member Author

@thetumbled thetumbled Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Nano is not convenient for calculating, as we need to estimate the accuracy of timestamp.
    For example, with negativeAckPrecisionBitCnt = 10, we know that the redelivery time may be earlier at most 2^10=1024ms~=1s. We just trim the lower 8 bit to bucket the time.
    But with nano(1ms=1000000ns). we can't get a suitable conf value easily.
  • It is unnecessary to use timestamp in ns unit. As the tick time of the timer is 1ms, and the latency of message delivery is at ms level.

Comment on lines +210 to +218
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you should use the fastutil-core instead of fastutil, and then we can make a new PR to reduce the fastutil-core size.

And then use <include>it.unimi.dsi:fastutil-core</include>.

@thetumbled thetumbled added PIP release/4.0.1 doc-required Your PR changes impact docs and you will update later. labels Nov 22, 2024
@github-actions github-actions bot removed the doc-required Your PR changes impact docs and you will update later. label Nov 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs PIP release/4.0.1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants