diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 82f9d6efc3985..45ad359862437 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -18,25 +18,20 @@ */ package org.apache.pulsar.broker.service.persistent; -import java.io.IOException; -import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; +import java.util.SortedMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; import javax.annotation.Nullable; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; -import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; -import org.apache.bookkeeper.mledger.ManagedLedger; -import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException; import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats; @@ -86,10 +81,12 @@ public boolean expireMessages(int messageTTLInSeconds) { if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) { log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName, messageTTLInSeconds); - Map ledgerIdToInfoCache = new ConcurrentHashMap<>(); + if (checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds)) { + return true; + } cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> { try { - long entryTimestamp = getEntryTimestamp(entry, ledgerIdToInfoCache, cursor); + long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp); } catch (Exception e) { log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e); @@ -108,31 +105,25 @@ public boolean expireMessages(int messageTTLInSeconds) { } } - private long getEntryTimestamp(Entry entry, - Map ledgerIdToInfoCache, - ManagedCursor cursor) throws IOException { - long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); - ManagedLedger managedLedger = cursor.getManagedLedger(); - ManagedLedgerConfig config = managedLedger.getConfig(); - MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgerIdToInfoCache.computeIfAbsent( - entry.getLedgerId(), (ledgerId) -> { - try { - return managedLedger.getLedgerInfo(ledgerId) - .get(config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS); - } catch (Exception e) { - return null; - } - }); - if (ledgerInfo == null) { - return entryTimestamp; - } - long maxAvailableTimestamp = ledgerInfo.getTimestamp() + config.getMaximumRolloverTimeMs(); - if (entryTimestamp > maxAvailableTimestamp) { - log.debug("Timestamp {} from {} {}:{} is incorrect, ledger is created at {}, please check client's clock", - entry.getLedgerId(), topicName, entry.getEntryId(), entryTimestamp, ledgerInfo.getTimestamp()); - return maxAvailableTimestamp; + private boolean checkExpiryByLedgerClosureTime(ManagedCursor cursor, int messageTTLInSeconds) { + if (cursor instanceof ManagedCursorImpl managedCursor) { + ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) managedCursor.getManagedLedger(); + Position deletedPosition = managedCursor.getMarkDeletedPosition(); + SortedMap ledgerInfoSortedMap = + managedLedger.getLedgersInfo() + .subMap(deletedPosition.getLedgerId(), managedLedger.getLedgersInfo().lastKey()); + long expiryLedgerId = -1; + for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgerInfoSortedMap.values()) { + if (!MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) { + break; + } + expiryLedgerId = ledgerInfo.getLedgerId(); + } + if (expiryLedgerId > -1) { + findEntryComplete(PositionImpl.get(managedLedger.getNextValidLedger(expiryLedgerId), 0), null); + } } - return entryTimestamp; + return false; } @Override