From b8af7ea36c7036e6e6d9114ccf995f9ba5575aa9 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Tue, 19 Nov 2024 15:13:43 +0800 Subject: [PATCH 1/8] add code. --- .../InMemoryDelayedDeliveryTracker.java | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index bdc6e4c814e33..ba9fa1c788189 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -24,17 +24,24 @@ import java.util.NavigableSet; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; + +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; +import org.roaringbitmap.longlong.Roaring64Bitmap; @Slf4j public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker { - protected final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue(); + // timestamp -> ledgerId -> entryId + protected final Long2ObjectSortedMap> priorityQueue + = new Long2ObjectAVLTreeMap<>(); // If we detect that all messages have fixed delay time, such that the delivery is // always going to be in FIFO order, then we can avoid pulling all the messages in @@ -52,12 +59,16 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack // Track whether we have seen all messages with fixed delay so far. private boolean messagesHaveFixedDelay = true; + // + private int timestampPrecisionBitCnt = 8; + InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead) { this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead); + this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(tickTimeMillis); } public InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer, @@ -66,6 +77,35 @@ public InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsum long fixedDelayDetectionLookahead) { super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict); this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(tickTimeMillis); + } + + /** + * The tick time is used to determine the precision of the delivery time. As the redelivery time + * is not accurate, we can bucket the delivery time and group multiple message ids into the same + * bucket to reduce the memory usage. THe default value is 1 second, which means we accept 1 second + * deviation for the delivery time, so that we can trim the lower 9 bits of the delivery time, because + * 2**9ms = 512ms < 1s, 2**10ms = 1024ms > 1s. + * @param tickTimeMillis + * @return + */ + private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) { + int bitCnt = 0; + while (tickTimeMillis > 0) { + tickTimeMillis >>= 1; + bitCnt++; + } + return bitCnt-1; + } + + /** + * trim the lower bits of the timestamp to reduce the memory usage. + * @param timestamp + * @param bits + * @return + */ + private static long trimLowerBit(long timestamp, int bits) { + return timestamp & (-1L << bits); } @Override @@ -80,6 +120,19 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { deliverAt - clock.millis()); } + long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); + if (priorityQueue.containsKey(timestamp)) { + Long2ObjectMap ledgerMap = priorityQueue.get(timestamp); + Roaring64Bitmap entryIds = ledgerMap.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); + entryIds.add(entryId); + } else { + Roaring64Bitmap entryIds = new Roaring64Bitmap(); + entryIds.add(entryId); + Long2ObjectMap ledgerMap = new Long2ObjectAVLTreeMap<>(); + ledgerMap.put(ledgerId, entryIds); + priorityQueue.put(timestamp, ledgerMap); + } + priorityQueue.add(deliverAt, ledgerId, entryId); updateTimer(); From 932b84145ee2129e4fd1caf1720e48dec0861354 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Tue, 19 Nov 2024 15:59:00 +0800 Subject: [PATCH 2/8] add code. --- .../InMemoryDelayedDeliveryTracker.java | 75 ++++++++++++------- 1 file changed, 50 insertions(+), 25 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index ba9fa1c788189..e9cee32ee253d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -27,7 +27,10 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongSet; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; @@ -121,19 +124,9 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { } long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); - if (priorityQueue.containsKey(timestamp)) { - Long2ObjectMap ledgerMap = priorityQueue.get(timestamp); - Roaring64Bitmap entryIds = ledgerMap.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); - entryIds.add(entryId); - } else { - Roaring64Bitmap entryIds = new Roaring64Bitmap(); - entryIds.add(entryId); - Long2ObjectMap ledgerMap = new Long2ObjectAVLTreeMap<>(); - ledgerMap.put(ledgerId, entryIds); - priorityQueue.put(timestamp, ledgerMap); - } - - priorityQueue.add(deliverAt, ledgerId, entryId); + priorityQueue.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>()) + .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()) + .add(entryId); updateTimer(); checkAndUpdateHighest(deliverAt); @@ -158,7 +151,7 @@ private void checkAndUpdateHighest(long deliverAt) { */ @Override public boolean hasMessageAvailable() { - boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= getCutoffTime(); + boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.firstLongKey() <= getCutoffTime(); if (!hasMessageAvailable) { updateTimer(); } @@ -175,17 +168,40 @@ public NavigableSet getScheduledMessages(int maxMessages) { long cutoffTime = getCutoffTime(); while (n > 0 && !priorityQueue.isEmpty()) { - long timestamp = priorityQueue.peekN1(); + long timestamp = priorityQueue.firstLongKey(); if (timestamp > cutoffTime) { break; } - long ledgerId = priorityQueue.peekN2(); - long entryId = priorityQueue.peekN3(); - positions.add(PositionFactory.create(ledgerId, entryId)); - - priorityQueue.pop(); - --n; + LongSet ledgerIdToDelete = new LongOpenHashSet(); + Long2ObjectMap ledgerMap = priorityQueue.get(timestamp); + for (Long2ObjectMap.Entry ledgerEntry : ledgerMap.long2ObjectEntrySet()) { + long ledgerId = ledgerEntry.getLongKey(); + Roaring64Bitmap entryIds = ledgerEntry.getValue(); + if (entryIds.getLongCardinality() <= n) { + entryIds.forEach(entryId -> { + positions.add(PositionFactory.create(ledgerId, entryId)); + }); + n -= (int) entryIds.getLongCardinality(); + ledgerIdToDelete.add(ledgerId); + } else { + long[] entryIdsArray = entryIds.toArray(); + for (int i = 0; i < n; i++) { + positions.add(PositionFactory.create(ledgerId, entryIdsArray[i])); + entryIds.removeLong(entryIdsArray[i]); + } + n = 0; + } + if (n <= 0) { + break; + } + } + for (long ledgerId : ledgerIdToDelete) { + ledgerMap.remove(ledgerId); + } + if (ledgerMap.isEmpty()) { + priorityQueue.remove(timestamp); + } } if (log.isDebugEnabled()) { @@ -210,18 +226,27 @@ public CompletableFuture clear() { @Override public long getNumberOfDelayedMessages() { - return priorityQueue.size(); + return priorityQueue.values().stream().mapToLong( + ledgerMap -> ledgerMap.values().stream().mapToLong( + Roaring64Bitmap::getLongCardinality).sum()).sum(); } + /** + * This method rely on Roaring64Bitmap::getLongSizeInBytes to calculate the memory usage of the buffer. + * The memory usage of the buffer is not accurate, because Roaring64Bitmap::getLongSizeInBytes will + * overestimate the memory usage of the buffer a lot. + * @return the memory usage of the buffer + */ @Override public long getBufferMemoryUsage() { - return priorityQueue.bytesCapacity(); + return priorityQueue.values().stream().mapToLong( + ledgerMap -> ledgerMap.values().stream().mapToLong( + Roaring64Bitmap::getLongSizeInBytes).sum()).sum(); } @Override public void close() { super.close(); - priorityQueue.close(); } @Override @@ -234,6 +259,6 @@ && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead } protected long nextDeliveryTime() { - return priorityQueue.peekN1(); + return priorityQueue.firstLongKey(); } } From e8f6dc0a68ff737b1f7ff48a8d5f48ceef6e39ba Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Tue, 19 Nov 2024 16:21:47 +0800 Subject: [PATCH 3/8] add code. --- .../broker/delayed/InMemoryDelayedDeliveryTracker.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index e9cee32ee253d..3219a1fd2f26e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -43,6 +43,7 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker { // timestamp -> ledgerId -> entryId + // AVL tree -> OpenHashMap -> RoaringBitmap protected final Long2ObjectSortedMap> priorityQueue = new Long2ObjectAVLTreeMap<>(); @@ -62,8 +63,8 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack // Track whether we have seen all messages with fixed delay so far. private boolean messagesHaveFixedDelay = true; - // - private int timestampPrecisionBitCnt = 8; + // The bit count to trim to reduce memory occupation. + private int timestampPrecisionBitCnt = 0; InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, From 6b59b5e8669d0d349c5a006672627ea7c14e3c57 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Tue, 19 Nov 2024 17:21:25 +0800 Subject: [PATCH 4/8] fix check. --- .../delayed/InMemoryDelayedDeliveryTracker.java | 15 ++++++--------- .../delayed/InMemoryDeliveryTrackerTest.java | 2 +- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 3219a1fd2f26e..7beda66fd9089 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -20,23 +20,21 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.util.Timer; -import java.time.Clock; -import java.util.NavigableSet; -import java.util.TreeSet; -import java.util.concurrent.CompletableFuture; - import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; import it.unimi.dsi.fastutil.longs.LongOpenHashSet; import it.unimi.dsi.fastutil.longs.LongSet; +import java.time.Clock; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; -import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; import org.roaringbitmap.longlong.Roaring64Bitmap; @Slf4j @@ -44,8 +42,7 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack // timestamp -> ledgerId -> entryId // AVL tree -> OpenHashMap -> RoaringBitmap - protected final Long2ObjectSortedMap> priorityQueue - = new Long2ObjectAVLTreeMap<>(); + protected final Long2ObjectSortedMap> priorityQueue = new Long2ObjectAVLTreeMap<>(); // If we detect that all messages have fixed delay time, such that the delivery is // always going to be in FIFO order, then we can avoid pulling all the messages in @@ -99,7 +96,7 @@ private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) { tickTimeMillis >>= 1; bitCnt++; } - return bitCnt-1; + return bitCnt - 1; } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index ff7763927d888..2433086163d34 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -230,7 +230,7 @@ public void run(Timeout timeout) throws Exception { return; } try { - this.priorityQueue.peekN1(); + this.priorityQueue.firstLongKey(); } catch (Exception e) { e.printStackTrace(); exceptions[0] = e; From cb6dc4508f478785ac69e34e3ccb81113754c591 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Wed, 20 Nov 2024 10:40:23 +0800 Subject: [PATCH 5/8] add cdoe. --- .../broker/delayed/InMemoryDelayedDeliveryTracker.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 7beda66fd9089..81246844740b3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -61,7 +61,7 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack private boolean messagesHaveFixedDelay = true; // The bit count to trim to reduce memory occupation. - private int timestampPrecisionBitCnt = 0; + private final int timestampPrecisionBitCnt; InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, @@ -69,7 +69,6 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack long fixedDelayDetectionLookahead) { this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead); - this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(tickTimeMillis); } public InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer, @@ -176,11 +175,12 @@ public NavigableSet getScheduledMessages(int maxMessages) { for (Long2ObjectMap.Entry ledgerEntry : ledgerMap.long2ObjectEntrySet()) { long ledgerId = ledgerEntry.getLongKey(); Roaring64Bitmap entryIds = ledgerEntry.getValue(); - if (entryIds.getLongCardinality() <= n) { + int cardinality = (int) entryIds.getLongCardinality(); + if (cardinality <= n) { entryIds.forEach(entryId -> { positions.add(PositionFactory.create(ledgerId, entryId)); }); - n -= (int) entryIds.getLongCardinality(); + n -= cardinality; ledgerIdToDelete.add(ledgerId); } else { long[] entryIdsArray = entryIds.toArray(); From 07e3902fa3145bd52d6f0ba25d67f829a0c2886d Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Wed, 20 Nov 2024 11:41:05 +0800 Subject: [PATCH 6/8] fix. --- .../InMemoryDelayedDeliveryTracker.java | 26 +++++++++---------- .../delayed/InMemoryDeliveryTrackerTest.java | 6 ++--- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 81246844740b3..6b94ca31b69e5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -42,7 +42,7 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack // timestamp -> ledgerId -> entryId // AVL tree -> OpenHashMap -> RoaringBitmap - protected final Long2ObjectSortedMap> priorityQueue = new Long2ObjectAVLTreeMap<>(); + protected final Long2ObjectSortedMap> delayedMessageMap = new Long2ObjectAVLTreeMap<>(); // If we detect that all messages have fixed delay time, such that the delivery is // always going to be in FIFO order, then we can avoid pulling all the messages in @@ -95,7 +95,7 @@ private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) { tickTimeMillis >>= 1; bitCnt++; } - return bitCnt - 1; + return bitCnt > 0 ? bitCnt - 1 : 0; } /** @@ -121,7 +121,7 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { } long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); - priorityQueue.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>()) + delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>()) .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()) .add(entryId); updateTimer(); @@ -148,7 +148,7 @@ private void checkAndUpdateHighest(long deliverAt) { */ @Override public boolean hasMessageAvailable() { - boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.firstLongKey() <= getCutoffTime(); + boolean hasMessageAvailable = !delayedMessageMap.isEmpty() && delayedMessageMap.firstLongKey() <= getCutoffTime(); if (!hasMessageAvailable) { updateTimer(); } @@ -164,14 +164,14 @@ public NavigableSet getScheduledMessages(int maxMessages) { NavigableSet positions = new TreeSet<>(); long cutoffTime = getCutoffTime(); - while (n > 0 && !priorityQueue.isEmpty()) { - long timestamp = priorityQueue.firstLongKey(); + while (n > 0 && !delayedMessageMap.isEmpty()) { + long timestamp = delayedMessageMap.firstLongKey(); if (timestamp > cutoffTime) { break; } LongSet ledgerIdToDelete = new LongOpenHashSet(); - Long2ObjectMap ledgerMap = priorityQueue.get(timestamp); + Long2ObjectMap ledgerMap = delayedMessageMap.get(timestamp); for (Long2ObjectMap.Entry ledgerEntry : ledgerMap.long2ObjectEntrySet()) { long ledgerId = ledgerEntry.getLongKey(); Roaring64Bitmap entryIds = ledgerEntry.getValue(); @@ -198,7 +198,7 @@ public NavigableSet getScheduledMessages(int maxMessages) { ledgerMap.remove(ledgerId); } if (ledgerMap.isEmpty()) { - priorityQueue.remove(timestamp); + delayedMessageMap.remove(timestamp); } } @@ -206,7 +206,7 @@ public NavigableSet getScheduledMessages(int maxMessages) { log.debug("[{}] Get scheduled messages - found {}", dispatcher.getName(), positions.size()); } - if (priorityQueue.isEmpty()) { + if (delayedMessageMap.isEmpty()) { // Reset to initial state highestDeliveryTimeTracked = 0; messagesHaveFixedDelay = true; @@ -218,13 +218,13 @@ public NavigableSet getScheduledMessages(int maxMessages) { @Override public CompletableFuture clear() { - this.priorityQueue.clear(); + this.delayedMessageMap.clear(); return CompletableFuture.completedFuture(null); } @Override public long getNumberOfDelayedMessages() { - return priorityQueue.values().stream().mapToLong( + return delayedMessageMap.values().stream().mapToLong( ledgerMap -> ledgerMap.values().stream().mapToLong( Roaring64Bitmap::getLongCardinality).sum()).sum(); } @@ -237,7 +237,7 @@ public long getNumberOfDelayedMessages() { */ @Override public long getBufferMemoryUsage() { - return priorityQueue.values().stream().mapToLong( + return delayedMessageMap.values().stream().mapToLong( ledgerMap -> ledgerMap.values().stream().mapToLong( Roaring64Bitmap::getLongSizeInBytes).sum()).sum(); } @@ -257,6 +257,6 @@ && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead } protected long nextDeliveryTime() { - return priorityQueue.firstLongKey(); + return delayedMessageMap.firstLongKey(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index 2433086163d34..d19841017a21c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -92,7 +92,7 @@ public Object[][] provider(Method method) throws Exception { false, 0) }}; case "testAddMessageWithStrictDelay" -> new Object[][]{{ - new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock, + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 0, clock, true, 0) }}; case "testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> new Object[][]{{ @@ -100,7 +100,7 @@ public Object[][] provider(Method method) throws Exception { true, 0) }}; case "testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict" -> new Object[][]{{ - new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100000, clock, + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, true, 0) }}; case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict" -> new Object[][]{{ @@ -230,7 +230,7 @@ public void run(Timeout timeout) throws Exception { return; } try { - this.priorityQueue.firstLongKey(); + this.delayedMessageMap.firstLongKey(); } catch (Exception e) { e.printStackTrace(); exceptions[0] = e; From 5f28e080b62b2e5482645ecffc1f316231b2b4af Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Wed, 20 Nov 2024 11:50:26 +0800 Subject: [PATCH 7/8] fix test. --- .../pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index d19841017a21c..dc6f623c82b57 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -92,7 +92,7 @@ public Object[][] provider(Method method) throws Exception { false, 0) }}; case "testAddMessageWithStrictDelay" -> new Object[][]{{ - new InMemoryDelayedDeliveryTracker(dispatcher, timer, 0, clock, + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, true, 0) }}; case "testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> new Object[][]{{ @@ -108,7 +108,7 @@ public Object[][] provider(Method method) throws Exception { true, 0) }}; case "testWithFixedDelays", "testWithMixedDelays","testWithNoDelays" -> new Object[][]{{ - new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500, clock, + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 8, clock, true, 100) }}; default -> new Object[][]{{ From d06eea6026f7af1de0197486aa3ed824ae3a937c Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Wed, 20 Nov 2024 12:04:19 +0800 Subject: [PATCH 8/8] fix checkstyle. --- .../broker/delayed/InMemoryDelayedDeliveryTracker.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 6b94ca31b69e5..5796fcbd78550 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -42,7 +42,8 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack // timestamp -> ledgerId -> entryId // AVL tree -> OpenHashMap -> RoaringBitmap - protected final Long2ObjectSortedMap> delayedMessageMap = new Long2ObjectAVLTreeMap<>(); + protected final Long2ObjectSortedMap> + delayedMessageMap = new Long2ObjectAVLTreeMap<>(); // If we detect that all messages have fixed delay time, such that the delivery is // always going to be in FIFO order, then we can avoid pulling all the messages in @@ -148,7 +149,8 @@ private void checkAndUpdateHighest(long deliverAt) { */ @Override public boolean hasMessageAvailable() { - boolean hasMessageAvailable = !delayedMessageMap.isEmpty() && delayedMessageMap.firstLongKey() <= getCutoffTime(); + boolean hasMessageAvailable = !delayedMessageMap.isEmpty() + && delayedMessageMap.firstLongKey() <= getCutoffTime(); if (!hasMessageAvailable) { updateTimer(); }