diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 4e4dc8273d3f0..cee3ea09968dc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -68,6 +68,7 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -813,4 +814,30 @@ public void testReaderReconnectedFromNextEntry() throws Exception { producer.close(); admin.topics().delete(topic, false); } + + @DataProvider + public static Object[][] initializeLastMessageIdInBroker() { + return new Object[][] { { true }, { false } }; + } + + @Test(dataProvider = "initializeLastMessageIdInBroker") + public void testHasMessageAvailableAfterSeek(boolean initializeLastMessageIdInBroker) throws Exception { + final String topic = "persistent://my-property/my-ns/test-has-message-available-after-seek"; + @Cleanup Reader reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1) + .startMessageId(MessageId.earliest).create(); + + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + producer.send("msg"); + + if (initializeLastMessageIdInBroker) { + assertTrue(reader.hasMessageAvailable()); + } // else: lastMessageIdInBroker is earliest + + reader.seek(MessageId.latest); + // lastMessageIdInBroker is the last message ID, while startMessageId is still earliest + assertFalse(reader.hasMessageAvailable()); + + producer.send("msg"); + assertTrue(reader.hasMessageAvailable()); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 5619837757363..c09e0afe58d4b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -167,7 +167,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private volatile MessageIdAdv startMessageId; private volatile MessageIdAdv seekMessageId; - private final AtomicBoolean duringSeek; + @VisibleForTesting + final AtomicReference seekStatus; + private volatile CompletableFuture seekFuture; private final MessageIdAdv initialStartMessageId; @@ -304,7 +306,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat stats = ConsumerStatsDisabled.INSTANCE; } - duringSeek = new AtomicBoolean(false); + seekStatus = new AtomicReference<>(SeekStatus.NOT_STARTED); // Create msgCrypto if not created already if (conf.getCryptoKeyReader() != null) { @@ -781,7 +783,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { closeConsumerTasks(); deregisterFromClientCnx(); client.cleanupConsumer(this); - clearReceiverQueue(); + clearReceiverQueue(false); return CompletableFuture.completedFuture(null); } @@ -789,7 +791,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { topic, subscription, cnx.ctx().channel(), consumerId); long requestId = client.newRequestId(); - if (duringSeek.get()) { + if (seekStatus.get() != SeekStatus.NOT_STARTED) { acknowledgmentsGroupingTracker.flushAndClean(); } @@ -800,7 +802,8 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { int currentSize; synchronized (this) { currentSize = incomingMessages.size(); - startMessageId = clearReceiverQueue(); + setClientCnx(cnx); + clearReceiverQueue(true); if (possibleSendToDeadLetterTopicMessages != null) { possibleSendToDeadLetterTopicMessages.clear(); } @@ -838,7 +841,6 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { // synchronized this, because redeliverUnAckMessage eliminate the epoch inconsistency between them final CompletableFuture future = new CompletableFuture<>(); synchronized (this) { - setClientCnx(cnx); ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted, conf.isReplicateSubscriptionState(), @@ -943,15 +945,24 @@ protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize * Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was * not seen by the application. */ - private MessageIdAdv clearReceiverQueue() { + private void clearReceiverQueue(boolean updateStartMessageId) { List> currentMessageQueue = new ArrayList<>(incomingMessages.size()); incomingMessages.drainTo(currentMessageQueue); resetIncomingMessageSize(); - if (duringSeek.compareAndSet(true, false)) { - return seekMessageId; + CompletableFuture seekFuture = this.seekFuture; + MessageIdAdv seekMessageId = this.seekMessageId; + + if (seekStatus.get() != SeekStatus.NOT_STARTED) { + if (updateStartMessageId) { + startMessageId = seekMessageId; + } + if (seekStatus.compareAndSet(SeekStatus.COMPLETED, SeekStatus.NOT_STARTED)) { + internalPinnedExecutor.execute(() -> seekFuture.complete(null)); + } + return; } else if (subscriptionMode == SubscriptionMode.Durable) { - return startMessageId; + return; } if (!currentMessageQueue.isEmpty()) { @@ -968,15 +979,14 @@ private MessageIdAdv clearReceiverQueue() { } // release messages if they are pooled messages currentMessageQueue.forEach(Message::release); - return previousMessage; - } else if (!lastDequeuedMessageId.equals(MessageId.earliest)) { + if (updateStartMessageId) { + startMessageId = previousMessage; + } + } else if (updateStartMessageId && !lastDequeuedMessageId.equals(MessageId.earliest)) { // If the queue was empty we need to restart from the message just after the last one that has been dequeued // in the past - return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId); - } else { - // No message was received or dequeued by this consumer. Next message would still be the startMessageId - return startMessageId; - } + startMessageId = new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId); + } // else: No message was received or dequeued by this consumer. Next message would still be the startMessageId } /** @@ -2249,25 +2259,23 @@ private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, .setMandatoryStop(0, TimeUnit.MILLISECONDS) .create(); - CompletableFuture seekFuture = new CompletableFuture<>(); - seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture); + if (!seekStatus.compareAndSet(SeekStatus.NOT_STARTED, SeekStatus.IN_PROGRESS)) { + final String message = String.format( + "[%s][%s] attempting to seek operation that is already in progress (seek by %s)", + topic, subscription, seekBy); + log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", + topic, subscription, seekBy); + return FutureUtil.failedFuture(new IllegalStateException(message)); + } + seekFuture = new CompletableFuture<>(); + seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs); return seekFuture; } private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy, - final Backoff backoff, final AtomicLong remainingTime, - CompletableFuture seekFuture) { + final Backoff backoff, final AtomicLong remainingTime) { ClientCnx cnx = cnx(); if (isConnected() && cnx != null) { - if (!duringSeek.compareAndSet(false, true)) { - final String message = String.format( - "[%s][%s] attempting to seek operation that is already in progress (seek by %s)", - topic, subscription, seekBy); - log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", - topic, subscription, seekBy); - seekFuture.completeExceptionally(new IllegalStateException(message)); - return; - } MessageIdAdv originSeekMessageId = seekMessageId; seekMessageId = (MessageIdAdv) seekId; log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy); @@ -2279,14 +2287,25 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S lastDequeuedMessageId = MessageId.earliest; clearIncomingMessages(); - seekFuture.complete(null); + CompletableFuture future = null; + synchronized (this) { + if (!hasParentConsumer && cnx() == null) { + // It's during reconnection, complete the seek future after connection is established + seekStatus.set(SeekStatus.COMPLETED); + } else { + future = seekFuture; + startMessageId = seekMessageId; + seekStatus.set(SeekStatus.NOT_STARTED); + } + } + if (future != null) { + future.complete(null); + } }).exceptionally(e -> { - // re-set duringSeek and seekMessageId if seek failed seekMessageId = originSeekMessageId; - duringSeek.set(false); log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage()); - seekFuture.completeExceptionally( + failSeek( PulsarClientException.wrap(e.getCause(), String.format("Failed to seek the subscription %s of the topic %s to %s", subscription, topicName.toString(), seekBy))); @@ -2295,7 +2314,7 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S } else { long nextDelay = Math.min(backoff.next(), remainingTime.get()); if (nextDelay <= 0) { - seekFuture.completeExceptionally( + failSeek( new PulsarClientException.TimeoutException( String.format("The subscription %s of the topic %s could not seek " + "withing configured timeout", subscription, topicName.toString()))); @@ -2306,11 +2325,18 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms", topic, getHandlerName(), nextDelay); remainingTime.addAndGet(-nextDelay); - seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime, seekFuture); + seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime); }, nextDelay, TimeUnit.MILLISECONDS); } } + private void failSeek(Throwable throwable) { + CompletableFuture seekFuture = this.seekFuture; + if (seekStatus.compareAndSet(SeekStatus.IN_PROGRESS, SeekStatus.NOT_STARTED)) { + seekFuture.completeExceptionally(throwable); + } + } + @Override public CompletableFuture seekAsync(long timestamp) { String seekBy = String.format("the timestamp %d", timestamp); @@ -2968,4 +2994,10 @@ boolean isAckReceiptEnabled() { private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); + @VisibleForTesting + enum SeekStatus { + NOT_STARTED, + IN_PROGRESS, + COMPLETED + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index 070919c57a420..9995246c175e1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -283,6 +283,7 @@ public void testSeekAsyncInternal() { consumer.setClientCnx(cnx); consumer.setState(HandlerState.State.Ready); + consumer.seekStatus.set(ConsumerImpl.SeekStatus.NOT_STARTED); // when CompletableFuture firstResult = consumer.seekAsync(1L); @@ -290,7 +291,6 @@ public void testSeekAsyncInternal() { clientReq.complete(null); - // then assertTrue(firstResult.isDone()); assertTrue(secondResult.isCompletedExceptionally()); verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong());