Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Reduce memory occupation of the delayed message queue #23611

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timer;
import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
import java.time.Clock;
import java.util.NavigableSet;
import java.util.TreeSet;
Expand All @@ -29,12 +35,14 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.longlong.Roaring64Bitmap;

@Slf4j
public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker {

protected final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();
// timestamp -> ledgerId -> entryId
// AVL tree -> OpenHashMap -> RoaringBitmap
protected final Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> priorityQueue = new Long2ObjectAVLTreeMap<>();

// If we detect that all messages have fixed delay time, such that the delivery is
// always going to be in FIFO order, then we can avoid pulling all the messages in
Expand All @@ -52,12 +60,16 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
// Track whether we have seen all messages with fixed delay so far.
private boolean messagesHaveFixedDelay = true;

// Number of low-order bits cleared from each delivery timestamp so that nearby timestamps share one bucket.
private int timestampPrecisionBitCnt = 0;
thetumbled marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Creates a tracker that reads the current time from {@link Clock#systemUTC()}.
 *
 * <p>All initialisation is done by the clock-accepting constructor this one delegates to;
 * that constructor also derives {@code timestampPrecisionBitCnt} from {@code tickTimeMillis},
 * so the value must not be computed a second time here.
 *
 * @param dispatcher the dispatcher this tracker serves
 * @param timer the timer handed to the parent tracker
 * @param tickTimeMillis the tick time in milliseconds; also determines the timestamp bucket size
 * @param isDelayedDeliveryDeliverAtTimeStrict forwarded unchanged to the parent tracker
 * @param fixedDelayDetectionLookahead forwarded unchanged to the delegated constructor
 */
InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
        long tickTimeMillis,
        boolean isDelayedDeliveryDeliverAtTimeStrict,
        long fixedDelayDetectionLookahead) {
    this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
            fixedDelayDetectionLookahead);
}

public InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
Expand All @@ -66,6 +78,35 @@ public InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsum
long fixedDelayDetectionLookahead) {
super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(tickTimeMillis);
}

/**
 * Computes how many low-order bits of a delivery timestamp may be discarded.
 *
 * <p>The tick time bounds the precision of delayed delivery, so delivery times can be
 * bucketed and several message ids grouped under the same bucket to reduce memory usage.
 * The result is {@code floor(log2(tickTimeMillis))}. For example, with a tick time of
 * 1000 ms the lower 9 bits are trimmed, because 2^9 ms = 512 ms &lt; 1 s while
 * 2^10 ms = 1024 ms &gt; 1 s.
 *
 * <p>A non-positive tick time yields {@code 0} (no trimming). Returning a negative count
 * would be used as a shift distance by {@code trimLowerBit}; Java masks shift distances to
 * six bits, so {@code -1} would act as 63 and collapse every non-negative timestamp to 0.
 *
 * @param tickTimeMillis the tick time in milliseconds
 * @return the number of low-order bits to trim, in the range {@code [0, 62]}
 */
private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) {
    if (tickTimeMillis <= 0) {
        return 0;
    }
    // Index of the highest set bit == floor(log2(tickTimeMillis)).
    return (Long.SIZE - 1) - Long.numberOfLeadingZeros(tickTimeMillis);
}

/**
 * Clears the lowest {@code bits} bits of {@code timestamp}, so that timestamps differing
 * only in those bits map to the same value and can share one bucket.
 *
 * @param timestamp the value to round down
 * @param bits how many low-order bits to zero out
 * @return {@code timestamp} with its lowest {@code bits} bits set to zero
 */
private static long trimLowerBit(long timestamp, int bits) {
    final long lowBitsMask = (1L << bits) - 1;
    return timestamp & ~lowBitsMask;
}

@Override
Expand All @@ -80,7 +121,10 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
deliverAt - clock.millis());
}

priorityQueue.add(deliverAt, ledgerId, entryId);
long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
priorityQueue.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>())
.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
.add(entryId);
updateTimer();

checkAndUpdateHighest(deliverAt);
Expand All @@ -105,7 +149,7 @@ private void checkAndUpdateHighest(long deliverAt) {
*/
@Override
public boolean hasMessageAvailable() {
boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= getCutoffTime();
boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.firstLongKey() <= getCutoffTime();
if (!hasMessageAvailable) {
updateTimer();
}
Expand All @@ -122,17 +166,40 @@ public NavigableSet<Position> getScheduledMessages(int maxMessages) {
long cutoffTime = getCutoffTime();

while (n > 0 && !priorityQueue.isEmpty()) {
long timestamp = priorityQueue.peekN1();
long timestamp = priorityQueue.firstLongKey();
if (timestamp > cutoffTime) {
break;
}

long ledgerId = priorityQueue.peekN2();
long entryId = priorityQueue.peekN3();
positions.add(PositionFactory.create(ledgerId, entryId));

priorityQueue.pop();
--n;
LongSet ledgerIdToDelete = new LongOpenHashSet();
Long2ObjectMap<Roaring64Bitmap> ledgerMap = priorityQueue.get(timestamp);
for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) {
long ledgerId = ledgerEntry.getLongKey();
Roaring64Bitmap entryIds = ledgerEntry.getValue();
if (entryIds.getLongCardinality() <= n) {
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
entryIds.forEach(entryId -> {
positions.add(PositionFactory.create(ledgerId, entryId));
});
n -= (int) entryIds.getLongCardinality();
ledgerIdToDelete.add(ledgerId);
} else {
long[] entryIdsArray = entryIds.toArray();
thetumbled marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < n; i++) {
positions.add(PositionFactory.create(ledgerId, entryIdsArray[i]));
entryIds.removeLong(entryIdsArray[i]);
}
n = 0;
}
if (n <= 0) {
break;
}
}
for (long ledgerId : ledgerIdToDelete) {
ledgerMap.remove(ledgerId);
}
if (ledgerMap.isEmpty()) {
priorityQueue.remove(timestamp);
}
}

if (log.isDebugEnabled()) {
Expand All @@ -157,18 +224,27 @@ public CompletableFuture<Void> clear() {

/**
 * Counts the tracked delayed messages by summing the cardinality of every entry-id bitmap
 * across all timestamp buckets and ledgers.
 *
 * <p>The map's own {@code size()} must not be used here: it is the number of timestamp
 * buckets, not the number of messages. The cost of this call grows with the number of
 * (timestamp, ledger) pairs currently held.
 *
 * @return the total number of delayed messages currently tracked
 */
@Override
public long getNumberOfDelayedMessages() {
    long total = 0;
    for (Long2ObjectMap<Roaring64Bitmap> ledgerMap : priorityQueue.values()) {
        for (Roaring64Bitmap entryIds : ledgerMap.values()) {
            total += entryIds.getLongCardinality();
        }
    }
    return total;
}

/**
 * Estimates the memory held by the tracked entry ids.
 *
 * <p>This method relies on {@code Roaring64Bitmap::getLongSizeInBytes}, summed over every
 * bitmap in every timestamp bucket. The figure is not accurate: that method overestimates
 * the memory usage considerably, and the sum covers the bitmaps only.
 *
 * @return the estimated memory usage of the buffer, in bytes
 */
@Override
public long getBufferMemoryUsage() {
    long bytes = 0;
    for (Long2ObjectMap<Roaring64Bitmap> ledgerMap : priorityQueue.values()) {
        for (Roaring64Bitmap entryIds : ledgerMap.values()) {
            bytes += entryIds.getLongSizeInBytes();
        }
    }
    return bytes;
}
thetumbled marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Closes the tracker and drops every tracked message.
 *
 * <p>The bucket map is a plain {@code Map} with no {@code close()} method of its own, so the
 * tracked entries are released by clearing it after the parent has been closed.
 */
@Override
public void close() {
    super.close();
    priorityQueue.clear();
}

@Override
Expand All @@ -181,6 +257,6 @@ && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead
}

/**
 * Returns the smallest key of the bucket map, i.e. the earliest delivery bucket.
 *
 * <p>Keys are delivery times with their low-order bits trimmed, so the value may be earlier
 * than the exact {@code deliverAt} of any message in that bucket.
 * NOTE(review): {@code firstLongKey()} presumably throws when the map is empty, so callers
 * are expected to check for emptiness first; confirm against the callers.
 *
 * @return the earliest bucket timestamp in milliseconds
 */
protected long nextDeliveryTime() {
    return priorityQueue.firstLongKey();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void run(Timeout timeout) throws Exception {
return;
}
try {
this.priorityQueue.peekN1();
this.priorityQueue.firstLongKey();
} catch (Exception e) {
e.printStackTrace();
exceptions[0] = e;
Expand Down
Loading