diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 41bc6098e1a2f..d82f3d93f8f4c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Objects; import java.util.Optional; +import java.util.SortedMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; @@ -30,8 +31,10 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException; import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.protocol.Commands; @@ -78,7 +81,9 @@ public boolean expireMessages(int messageTTLInSeconds) { if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) { log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName, messageTTLInSeconds); - + // First filter the entire Ledger reached TTL based on the Ledger closing time to avoid client clock skew + checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds); + // Some part of entries in active Ledger may have reached TTL, so we need to continue searching. cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> { try { long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); @@ -99,6 +104,34 @@ public boolean expireMessages(int messageTTLInSeconds) { return false; } } + private void checkExpiryByLedgerClosureTime(ManagedCursor cursor, int messageTTLInSeconds) { + if (messageTTLInSeconds <= 0) { + return; + } + if (cursor instanceof ManagedCursorImpl managedCursor) { + ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) managedCursor.getManagedLedger(); + Position deletedPosition = managedCursor.getMarkDeletedPosition(); + SortedMap ledgerInfoSortedMap = + managedLedger.getLedgersInfo().subMap(deletedPosition.getLedgerId(), true, + managedLedger.getLedgersInfo().lastKey(), true); + MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null; + for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgerInfoSortedMap.values()) { + if (!ledgerInfo.hasTimestamp() || !MessageImpl.isEntryExpired(messageTTLInSeconds, + ledgerInfo.getTimestamp())) { + break; + } + info = ledgerInfo; + } + if (info != null && info.getLedgerId() > -1) { + PositionImpl position = PositionImpl.get(info.getLedgerId(), info.getEntries() - 1); + if (((PositionImpl) managedLedger.getLastConfirmedEntry()).compareTo(position) < 0) { + findEntryComplete(managedLedger.getLastConfirmedEntry(), null); + } else { + findEntryComplete(position, null); + } + } + } + } public boolean expireMessages(Position messagePosition) { // If it's beyond last position of this topic, do nothing. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index e77fd07c6ef8b..d56968c8f8e8e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -80,8 +80,11 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { public static byte[] createMessageWrittenToLedger(String msg) { + return createMessageWrittenToLedger(msg, System.currentTimeMillis()); + } + public static byte[] createMessageWrittenToLedger(String msg, long messageTimestamp) { MessageMetadata messageMetadata = new MessageMetadata() - .setPublishTime(System.currentTimeMillis()) + .setPublishTime(messageTimestamp) .setProducerName("createMessageWrittenToLedger") .setSequenceId(1); ByteBuf data = UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeBytes(msg.getBytes()); @@ -428,6 +431,26 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { } + @Test + public void testIncorrectClientClock() throws Exception { + final String ledgerAndCursorName = "testIncorrectClientClock"; + int maxTTLSeconds = 1; + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(1); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config); + ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); + // set client clock to 10 days later + long incorrectPublishTimestamp = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(10); + for (int i = 0; i < 10; i++) { + ledger.addEntry(createMessageWrittenToLedger("msg" + i, incorrectPublishTimestamp)); + } + assertEquals(ledger.getLedgersInfoAsList().size(), 10); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null); + Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds)); + monitor.expireMessages(maxTTLSeconds); + assertEquals(c1.getNumberOfEntriesInBacklog(true), 0); + } + @Test void testMessageExpiryWithPosition() throws Exception { final String ledgerAndCursorName = "testPersistentMessageExpiryWithPositionNonRecoverableLedgers";