From a0ac052e6e1a8a80794f8b8dc1a94764a67fff5a Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Wed, 17 Apr 2024 16:14:09 -0700 Subject: [PATCH 1/4] reduce stickyHash calculations ofr non-persistent topics --- .../pulsar/broker/service/Consumer.java | 21 +++++++++++++++---- ...tStickyKeyDispatcherMultipleConsumers.java | 21 +++++++++++++------ .../pulsar/common/protocol/Commands.java | 10 +++++++++ 3 files changed, 42 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 9485931304b0c..fd5af3a158b0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -289,16 +289,29 @@ public Future sendMessages(final List entries, EntryBatch totalChunkedMessages, redeliveryTracker, DEFAULT_CONSUMER_EPOCH); } + public Future sendMessages(final List entries, EntryBatchSizes batchSizes, + EntryBatchIndexesAcks batchIndexesAcks, + int totalMessages, long totalBytes, long totalChunkedMessages, + RedeliveryTracker redeliveryTracker, long epoch) { + return sendMessages(entries, null, batchSizes, batchIndexesAcks, totalMessages, totalBytes, + totalChunkedMessages, redeliveryTracker, epoch); + } + /** * Dispatch a list of entries to the consumer.
* It is also responsible to release entries data and recycle entries object. * * @return a SendMessageInfo object that contains the detail of what was sent to consumer */ - public Future sendMessages(final List entries, EntryBatchSizes batchSizes, + public Future sendMessages(final List entries, + final List stickyKeyHashes, + EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, - int totalMessages, long totalBytes, long totalChunkedMessages, - RedeliveryTracker redeliveryTracker, long epoch) { + int totalMessages, + long totalBytes, + long totalChunkedMessages, + RedeliveryTracker redeliveryTracker, + long epoch) { this.lastConsumedTimestamp = System.currentTimeMillis(); if (entries.isEmpty() || totalMessages == 0) { @@ -326,7 +339,7 @@ public Future sendMessages(final List entries, EntryBatch // because this consumer is possible to disconnect at this time. if (pendingAcks != null) { int batchSize = batchSizes.getBatchSize(i); - int stickyKeyHash = getStickyKeyHash(entry); + int stickyKeyHash = stickyKeyHashes == null ? getStickyKeyHash(entry) : stickyKeyHashes.get(i); long[] ackSet = batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i); if (ackSet != null) { unackedMessages -= (batchSize - BitSet.valueOf(ackSet).cardinality()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java index 2cad253f96ee2..17a0865f04e24 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java @@ -139,28 +139,37 @@ public void sendMessages(List entries) { final Map> groupedEntries = localGroupedEntries.get(); groupedEntries.clear(); + final Map> consumerStickyKeyHashesMap = new HashMap<>(); for (Entry entry : entries) { - Consumer consumer = selector.select(peekStickyKey(entry.getDataBuffer())); + byte[] stickyKey = peekStickyKey(entry.getDataBuffer()); + int stickyKeyHash = StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey); + + Consumer consumer = selector.select(stickyKeyHash); if (consumer != null) { - groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry); + int startingSize = Math.max(10, entries.size() / (2 * consumerSet.size())); + groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>(startingSize)).add(entry); + consumerStickyKeyHashesMap + .computeIfAbsent(consumer, k -> new ArrayList<>(startingSize)).add(stickyKeyHash); } else { entry.release(); } } for (Map.Entry> entriesByConsumer : groupedEntries.entrySet()) { - Consumer consumer = entriesByConsumer.getKey(); - List entriesForConsumer = entriesByConsumer.getValue(); + final Consumer consumer = entriesByConsumer.getKey(); + final List entriesForConsumer = entriesByConsumer.getValue(); + final List stickyKeysForConsumer = consumerStickyKeyHashesMap.get(consumer); SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForConsumer.size()); filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo, null, null, false, consumer); if (consumer.getAvailablePermits() > 0 && consumer.isWritable()) { - consumer.sendMessages(entriesForConsumer, batchSizes, null, sendMessageInfo.getTotalMessages(), + consumer.sendMessages(entriesForConsumer, stickyKeysForConsumer, batchSizes, + null, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), - getRedeliveryTracker()); + getRedeliveryTracker(), Commands.DEFAULT_CONSUMER_EPOCH); TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages()); } else { entriesForConsumer.forEach(e -> { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 3982900041813..35eaa87aa44ea 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1937,6 +1937,15 @@ public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, Str int readerIdx = metadataAndPayload.readerIndex(); MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); metadataAndPayload.readerIndex(readerIdx); + return peekStickyKey(metadata, topic, subscription); + } catch (Throwable t) { + log.error("[{}] [{}] Failed to peek sticky key from the message metadata", topic, subscription, t); + return Commands.NONE_KEY; + } + } + + public static byte[] peekStickyKey(MessageMetadata metadata, String topic, String subscription) { + try { if (metadata.hasOrderingKey()) { return metadata.getOrderingKey(); } else if (metadata.hasPartitionKey()) { @@ -1951,6 +1960,7 @@ public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, Str return Commands.NONE_KEY; } + public static int getCurrentProtocolVersion() { return CURRENT_PROTOCOL_VERSION; } From d0254141dcf313b33b8997920bb5482b58fa53a6 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Wed, 17 Apr 2024 16:34:12 -0700 Subject: [PATCH 2/4] unnecessary change --- .../org/apache/pulsar/common/protocol/Commands.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 35eaa87aa44ea..3982900041813 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1937,15 +1937,6 @@ public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, Str int readerIdx = metadataAndPayload.readerIndex(); MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); metadataAndPayload.readerIndex(readerIdx); - return peekStickyKey(metadata, topic, subscription); - } catch (Throwable t) { - log.error("[{}] [{}] Failed to peek sticky key from the message metadata", topic, subscription, t); - return Commands.NONE_KEY; - } - } - - public static byte[] peekStickyKey(MessageMetadata metadata, String topic, String subscription) { - try { if (metadata.hasOrderingKey()) { return metadata.getOrderingKey(); } else if (metadata.hasPartitionKey()) { @@ -1960,7 +1951,6 @@ public static byte[] peekStickyKey(MessageMetadata metadata, String topic, Strin return Commands.NONE_KEY; } - public static int getCurrentProtocolVersion() { return CURRENT_PROTOCOL_VERSION; } From 912c75556081ae1090a98d2432637134046ff35a Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Wed, 17 Apr 2024 17:04:31 -0700 Subject: [PATCH 3/4] plus fast thread local --- ...ersistentStickyKeyDispatcherMultipleConsumers.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java index 17a0865f04e24..fb7bd22de94a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java @@ -126,6 +126,14 @@ protected Map> initialValue() throws Exception { } }; + private static final FastThreadLocal>> localGroupedStickyKeyHashes = + new FastThreadLocal>>() { + @Override + protected Map> initialValue() throws Exception { + return new HashMap<>(); + } + }; + @Override public void sendMessages(List entries) { if (entries.isEmpty()) { @@ -139,7 +147,8 @@ public void sendMessages(List entries) { final Map> groupedEntries = localGroupedEntries.get(); groupedEntries.clear(); - final Map> consumerStickyKeyHashesMap = new HashMap<>(); + final Map> consumerStickyKeyHashesMap = localGroupedStickyKeyHashes.get(); + consumerStickyKeyHashesMap.clear(); for (Entry entry : entries) { byte[] stickyKey = peekStickyKey(entry.getDataBuffer()); From 6ed72ea4992cad9a99a0e42da29207cbd0c1f338 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Wed, 17 Apr 2024 19:07:48 -0700 Subject: [PATCH 4/4] fixed test --- ...ersistentStickyKeyDispatcherMultipleConsumersTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java index b2638d53ab1c3..6b0f48a57cfe3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -128,15 +128,15 @@ public void testSendMessage() throws BrokerServiceException { assertEquals(byteBuf.toString(UTF_8), "message" + index); }; return mockPromise; - }).when(consumerMock).sendMessages(any(List.class), any(EntryBatchSizes.class), any(), - anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + }).when(consumerMock).sendMessages(any(List.class), any(List.class), any(EntryBatchSizes.class), any(), + anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class), anyLong()); try { nonpersistentDispatcher.sendMessages(entries); } catch (Exception e) { fail("Failed to sendMessages.", e); } - verify(consumerMock, times(1)).sendMessages(any(List.class), any(EntryBatchSizes.class), - eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + verify(consumerMock, times(1)).sendMessages(any(List.class), any(List.class), any(EntryBatchSizes.class), + eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class), anyLong()); } @Test(timeOut = 10000)