diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index ac391c1050340..2478a7a2538d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -121,8 +121,8 @@ private void checkExpiryByLedgerClosureTime(ManagedCursor cursor, int messageTTL managedLedger.getLedgersInfo().lastKey(), true); MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null; for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgerInfoSortedMap.values()) { - if (!ledgerInfo.hasTimestamp() || !MessageImpl.isEntryExpired(messageTTLInSeconds, - ledgerInfo.getTimestamp())) { + if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() == 0L + || !MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) { break; } info = ledgerInfo; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index ace552a55a72a..6883c0467e481 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -33,10 +33,8 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; - import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; - import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashSet; @@ -46,7 +44,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - +import java.util.concurrent.atomic.AtomicReference; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -72,11 +73,10 @@ import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.Test; -import javax.ws.rs.client.Entity; -import javax.ws.rs.core.MediaType; - @Test(groups = "broker") public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { @@ -463,6 +463,45 @@ public void testIncorrectClientClock() throws Exception { assertEquals(c1.getNumberOfEntriesInBacklog(true), 0); } + @Test + public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() throws Throwable { + final String ledgerAndCursorName = "testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger"; + int maxTTLSeconds = 1; + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(5); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config); + ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); + // set client clock to 10 days later + long incorrectPublishTimestamp = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(10); + for (int i = 0; i < 7; i++) { + ledger.addEntry(createMessageWrittenToLedger("msg" + i, incorrectPublishTimestamp)); + } + assertEquals(ledger.getLedgersInfoAsList().size(), 2); + PersistentTopic mock = mock(PersistentTopic.class); + when(mock.getName()).thenReturn("topicname"); + when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); + AsyncCallbacks.MarkDeleteCallback markDeleteCallback = + (AsyncCallbacks.MarkDeleteCallback) spy( + FieldUtils.readDeclaredField(monitor, "markDeleteCallback", true)); + FieldUtils.writeField(monitor, "markDeleteCallback", markDeleteCallback, true); + + AtomicReference throwableAtomicReference = new AtomicReference<>(); + Mockito.doAnswer(invocation -> { + ManagedLedgerException argument = invocation.getArgument(0, ManagedLedgerException.class); + throwableAtomicReference.set(argument); + return invocation.callRealMethod(); + }).when(markDeleteCallback).markDeleteFailed(any(), any()); + + PositionImpl position = (PositionImpl) ledger.getLastConfirmedEntry(); + c1.markDelete(position); + Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds)); + monitor.expireMessages(maxTTLSeconds); + assertEquals(c1.getNumberOfEntriesInBacklog(true), 0); + + Assert.assertNull(throwableAtomicReference.get()); + } + @Test void testMessageExpiryWithPosition() throws Exception { final String ledgerAndCursorName = "testPersistentMessageExpiryWithPositionNonRecoverableLedgers";