diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index cee3ea09968dc4..2d3e8d4c6e978e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -66,8 +66,8 @@ import org.apache.pulsar.schema.Schemas; import org.awaitility.Awaitility; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -77,7 +77,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { private static final String subscription = "reader-sub"; - @BeforeMethod + @BeforeClass(alwaysRun = true) @Override protected void setup() throws Exception { super.internalSetup(); @@ -89,7 +89,7 @@ protected void setup() throws Exception { admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test")); } - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) @Override protected void cleanup() throws Exception { super.internalCleanup(); @@ -198,21 +198,41 @@ public void testReadMessageWithBatching() throws Exception { testReadMessages(topic, true); } - @Test - public void testReadMessageWithBatchingWithMessageInclusive() throws Exception { + @DataProvider + public static Object[][] seekBeforeHasMessageAvailable() { + return new Object[][] { { true }, { false } }; + } + + @Test(timeOut = 20000, dataProvider = "seekBeforeHasMessageAvailable") + public void testReadMessageWithBatchingWithMessageInclusive(boolean seekBeforeHasMessageAvailable) + throws Exception { String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive"; Set keys = publishMessages(topic, 10, true); Reader reader = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest) .startMessageIdInclusive().readerName(subscription).create(); - while (reader.hasMessageAvailable()) { - Assert.assertTrue(keys.remove(reader.readNext().getKey())); + if (seekBeforeHasMessageAvailable) { + reader.seek(0L); // it should seek to the earliest } + + assertTrue(reader.hasMessageAvailable()); + final Message msg = reader.readNext(); + assertTrue(keys.remove(msg.getKey())); // start from latest with start message inclusive should only read the last message in batch assertEquals(keys.size(), 9); - Assert.assertFalse(keys.contains("key9")); - Assert.assertFalse(reader.hasMessageAvailable()); + + final MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId(); + if (seekBeforeHasMessageAvailable) { + assertEquals(msgId.getBatchIndex(), 0); + assertFalse(keys.contains("key0")); + assertTrue(reader.hasMessageAvailable()); + } else { + assertEquals(msgId.getBatchIndex(), 9); + assertFalse(reader.hasMessageAvailable()); + assertFalse(keys.contains("key9")); + assertFalse(reader.hasMessageAvailable()); + } } private void testReadMessages(String topic, boolean enableBatch) throws Exception { @@ -310,7 +330,7 @@ public void testReadFromPartition() throws Exception { @Test public void testReaderWithTimeLong() throws Exception { String ns = "my-property/my-ns"; - String topic = "persistent://" + ns + "/testReadFromPartition"; + String topic = "persistent://" + ns + "/testReaderWithTimeLong"; RetentionPolicies retention = new RetentionPolicies(-1, -1); admin.namespaces().setRetention(ns, retention); @@ -840,4 +860,46 @@ public void testHasMessageAvailableAfterSeek(boolean initializeLastMessageIdInBr producer.send("msg"); assertTrue(reader.hasMessageAvailable()); } + + @Test(dataProvider = "initializeLastMessageIdInBroker") + public void testHasMessageAvailableAfterSeekTimestamp(boolean initializeLastMessageIdInBroker) throws Exception { + final String topic = "persistent://my-property/my-ns/test-has-message-available-after-seek-timestamp"; + + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + final long timestampBeforeSend = System.currentTimeMillis(); + final MessageId sentMsgId = producer.send("msg"); + + final List messageIds = new ArrayList<>(); + messageIds.add(MessageId.earliest); + messageIds.add(sentMsgId); + messageIds.add(MessageId.latest); + + for (MessageId messageId : messageIds) { + @Cleanup Reader reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1) + .startMessageId(messageId).create(); + if (initializeLastMessageIdInBroker) { + if (messageId == MessageId.earliest) { + assertTrue(reader.hasMessageAvailable()); + } else { + assertFalse(reader.hasMessageAvailable()); + } + } // else: lastMessageIdInBroker is earliest + reader.seek(System.currentTimeMillis()); + assertFalse(reader.hasMessageAvailable()); + } + + for (MessageId messageId : messageIds) { + @Cleanup Reader reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1) + .startMessageId(messageId).create(); + if (initializeLastMessageIdInBroker) { + if (messageId == MessageId.earliest) { + assertTrue(reader.hasMessageAvailable()); + } else { + assertFalse(reader.hasMessageAvailable()); + } + } // else: lastMessageIdInBroker is earliest + reader.seek(timestampBeforeSend); + assertTrue(reader.hasMessageAvailable()); + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 6c2ded819a56fe..5a0e5de330d319 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -218,6 +218,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final AtomicReference clientCnxUsedForConsumerRegistration = new AtomicReference<>(); private final List previousExceptions = new CopyOnWriteArrayList(); + private volatile boolean hasSoughtByTimestamp = false; static ConsumerImpl newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, @@ -2252,7 +2253,8 @@ public CompletableFuture seekAsync(Function function) { new PulsarClientException("Only support seek by messageId or timestamp")); } - private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) { + private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, + Long seekTimestamp, String seekBy) { AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); Backoff backoff = new BackoffBuilder() .setInitialTime(100, TimeUnit.MILLISECONDS) @@ -2269,11 +2271,11 @@ private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, return FutureUtil.failedFuture(new IllegalStateException(message)); } seekFuture = new CompletableFuture<>(); - seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs); + seekAsyncInternal(requestId, seek, seekId, seekTimestamp, seekBy, backoff, opTimeoutMs); return seekFuture; } - private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy, + private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, Long seekTimestamp, String seekBy, final Backoff backoff, final AtomicLong remainingTime) { ClientCnx cnx = cnx(); if (isConnected() && cnx != null) { @@ -2281,6 +2283,8 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S seekMessageId = (MessageIdAdv) seekId; log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy); + final boolean originalHasSoughtByTimestamp = hasSoughtByTimestamp; + hasSoughtByTimestamp = (seekTimestamp != null); cnx.sendRequestWithId(seek, requestId).thenRun(() -> { log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy); acknowledgmentsGroupingTracker.flushAndClean(); @@ -2304,6 +2308,7 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S } }).exceptionally(e -> { seekMessageId = originSeekMessageId; + hasSoughtByTimestamp = originalHasSoughtByTimestamp; log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage()); failSeek( @@ -2326,7 +2331,7 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms", topic, getHandlerName(), nextDelay); remainingTime.addAndGet(-nextDelay); - seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime); + seekAsyncInternal(requestId, seek, seekId, seekTimestamp, seekBy, backoff, remainingTime); }, nextDelay, TimeUnit.MILLISECONDS); } } @@ -2343,7 +2348,7 @@ public CompletableFuture seekAsync(long timestamp) { String seekBy = String.format("the timestamp %d", timestamp); long requestId = client.newRequestId(); return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp), - MessageId.earliest, seekBy); + MessageId.earliest, timestamp, seekBy); } @Override @@ -2369,7 +2374,7 @@ public CompletableFuture seekAsync(MessageId messageId) { } seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr); } - return seekAsyncInternal(requestId, seek, messageId, seekBy); + return seekAsyncInternal(requestId, seek, messageId, null, seekBy); } public boolean hasMessageAvailable() throws PulsarClientException { @@ -2389,13 +2394,15 @@ public CompletableFuture hasMessageAvailableAsync() { // we haven't read yet. use startMessageId for comparison if (lastDequeuedMessageId == MessageId.earliest) { + // If the last seek is called with timestamp, startMessageId cannot represent the position to start, so we + // have to get the mark-delete position from the GetLastMessageId response to compare as well. // if we are starting from latest, we should seek to the actual last message first. // allow the last one to be read when read head inclusively. - if (MessageId.latest.equals(startMessageId)) { - + final boolean hasSoughtByTimestamp = this.hasSoughtByTimestamp; + if (MessageId.latest.equals(startMessageId) || hasSoughtByTimestamp) { CompletableFuture future = internalGetLastMessageIdAsync(); // if the consumer is configured to read inclusive then we need to seek to the last message - if (resetIncludeHead) { + if (resetIncludeHead && !hasSoughtByTimestamp) { future = future.thenCompose((lastMessageIdResponse) -> seekAsync(lastMessageIdResponse.lastMessageId) .thenApply((ignore) -> lastMessageIdResponse));