Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][client] PIP-393: Improve performance of Negative Acknowledgement #23601

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

thetumbled
Copy link
Member

@thetumbled thetumbled commented Nov 15, 2024

PIP: 393
Implementation PR: #23600.

Motivation

There are many issues with the current implementation of Negative Acknowledgement in Pulsar:

  • the memory occupation is high.
  • the code execution efficiency is low.
  • the redelivery time is not accurate.
  • multiple negative ack for messages in the same entry(batch) will interfere with each other.
    All of these problem is severe and need to be solved.

Modifications

Refactor the NegativeAcksTracker to solve the above problems.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added PIP doc-not-needed Your PR changes do not impact docs labels Nov 15, 2024
@thetumbled
Copy link
Member Author

thetumbled commented Nov 15, 2024

List some experiment data:

    static long trimLowerBit(long timestamp, int bits) {
        return timestamp & (-1L << bits);
    }

    public static void main(String[] args) throws IOException {
        ConcurrentLongLongPairHashMap map1 = ConcurrentLongLongPairHashMap.newBuilder()
                .autoShrink(true)
                .concurrencyLevel(16)
                .build();
        // timestamp -> ledgerId -> entryId, no need to batch index, if different messages have
        // different timestamp, there will be multiple entries in the map
        // AVL Tree -> LongOpenHashMap -> Roaring64Bitmap
        // there are many timestamp, a few ledgerId, many entryId
        Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> map2 = new Long2ObjectAVLTreeMap<>();
        
        long numMessages = 1000000, numLedgers=100;
        long numEntries = numMessages/numLedgers;
        long ledgerId, entryId, partitionIndex, timestamp=System.currentTimeMillis();
        for(long i=0; i<numLedgers; i++) {
            ledgerId = 10000+i;
            for(long j=0; j<numEntries; j++) {
                entryId = 10000+j;
                partitionIndex = 0;
                // 1ms per message
                timestamp++;
                map1.put(ledgerId, entryId, partitionIndex, timestamp);
                
                timestamp = trimLowerBit(timestamp, 8);
                if (map2.containsKey(timestamp)) {
                    Long2ObjectMap<Roaring64Bitmap> ledgerMap = map2.get(timestamp);
                    if (ledgerMap.containsKey(ledgerId)) {
                        ledgerMap.get(ledgerId).add(entryId);
                    } else {
                        Roaring64Bitmap entrySet = new Roaring64Bitmap();
                        entrySet.add(entryId);
                        ledgerMap.put(ledgerId, entrySet);
                    }
                } else {
                    Long2ObjectMap<Roaring64Bitmap> ledgerMap = new Long2ObjectOpenHashMap<>();
                    Roaring64Bitmap entrySet = new Roaring64Bitmap();
                    entrySet.add(entryId);
                    ledgerMap.put(ledgerId, entrySet);
                    map2.put(timestamp, ledgerMap);
                }
            }
        }
        try {
            Thread.sleep(10000000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

image

The memory occupation of the new data structure is down to 0.82MB, while ConcurrentLongLongPairHashMap need 64MB, HashMap need 178MB.

@thetumbled thetumbled added doc-required Your PR changes impact docs and you will update later. release/4.0.1 labels Nov 22, 2024
@github-actions github-actions bot removed the doc-required Your PR changes impact docs and you will update later. label Nov 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs PIP release/4.0.1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants