diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 263fc4dfb85d5..3e5529dedbc86 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2233,100 +2233,108 @@ public CompletableFuture seekAsync(Function function) { new PulsarClientException("Only support seek by messageId or timestamp")); } - private Optional> seekAsyncCheckState(String seekBy) { - if (getState() == State.Closing || getState() == State.Closed) { - return Optional.of(FutureUtil - .failedFuture(new PulsarClientException.AlreadyClosedException( - String.format("The consumer %s was already closed when seeking the subscription %s of the" - + " topic %s to %s", consumerName, subscription, topicName.toString(), seekBy)))); - } - - if (!isConnected()) { - return Optional.of(FutureUtil.failedFuture(new PulsarClientException( - String.format("The client is not connected to the broker when seeking the subscription %s of the " - + "topic %s to %s", subscription, topicName.toString(), seekBy)))); - } + private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) { + AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); + Backoff backoff = new BackoffBuilder() + .setInitialTime(100, TimeUnit.MILLISECONDS) + .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) + .setMandatoryStop(0, TimeUnit.MILLISECONDS) + .create(); - return Optional.empty(); + CompletableFuture seekFuture = new CompletableFuture<>(); + seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture); + return seekFuture; } - private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) { - final CompletableFuture seekFuture = new CompletableFuture<>(); + private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy, + final Backoff backoff, final AtomicLong remainingTime, + CompletableFuture seekFuture) { ClientCnx cnx = cnx(); + if (isConnected() && cnx != null) { + if (!duringSeek.compareAndSet(false, true)) { + final String message = String.format( + "[%s][%s] attempting to seek operation that is already in progress (seek by %s)", + topic, subscription, seekBy); + log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", + topic, subscription, seekBy); + seekFuture.completeExceptionally(new IllegalStateException(message)); + return; + } + MessageIdAdv originSeekMessageId = seekMessageId; + seekMessageId = (MessageIdAdv) seekId; + log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy); - if (!duringSeek.compareAndSet(false, true)) { - final String message = String.format( - "[%s][%s] attempting to seek operation that is already in progress (seek by %s)", - topic, subscription, seekBy); - log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", - topic, subscription, seekBy); - seekFuture.completeExceptionally(new IllegalStateException(message)); - return seekFuture; - } - - MessageIdAdv originSeekMessageId = seekMessageId; - seekMessageId = (MessageIdAdv) seekId; - log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy); - - cnx.sendRequestWithId(seek, requestId).thenRun(() -> { - log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy); - acknowledgmentsGroupingTracker.flushAndClean(); + cnx.sendRequestWithId(seek, requestId).thenRun(() -> { + log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy); + acknowledgmentsGroupingTracker.flushAndClean(); - lastDequeuedMessageId = MessageId.earliest; + lastDequeuedMessageId = MessageId.earliest; - clearIncomingMessages(); - seekFuture.complete(null); - }).exceptionally(e -> { - // re-set duringSeek and seekMessageId if seek failed - seekMessageId = originSeekMessageId; - duringSeek.set(false); - log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage()); + clearIncomingMessages(); + seekFuture.complete(null); + }).exceptionally(e -> { + // re-set duringSeek and seekMessageId if seek failed + seekMessageId = originSeekMessageId; + duringSeek.set(false); + log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage()); + + seekFuture.completeExceptionally( + PulsarClientException.wrap(e.getCause(), + String.format("Failed to seek the subscription %s of the topic %s to %s", + subscription, topicName.toString(), seekBy))); + return null; + }); + } else { + long nextDelay = Math.min(backoff.next(), remainingTime.get()); + if (nextDelay <= 0) { + seekFuture.completeExceptionally( + new PulsarClientException.TimeoutException( + String.format("The subscription %s of the topic %s could not seek " + + "withing configured timeout", subscription, topicName.toString()))); + return; + } - seekFuture.completeExceptionally( - PulsarClientException.wrap(e.getCause(), - String.format("Failed to seek the subscription %s of the topic %s to %s", - subscription, topicName.toString(), seekBy))); - return null; - }); - return seekFuture; + ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).schedule(() -> { + log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms", + topic, getHandlerName(), nextDelay); + remainingTime.addAndGet(-nextDelay); + seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime, seekFuture); + }, nextDelay, TimeUnit.MILLISECONDS); + } } @Override public CompletableFuture seekAsync(long timestamp) { String seekBy = String.format("the timestamp %d", timestamp); - return seekAsyncCheckState(seekBy).orElseGet(() -> { - long requestId = client.newRequestId(); - return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp), + long requestId = client.newRequestId(); + return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp), MessageId.earliest, seekBy); - }); } @Override public CompletableFuture seekAsync(MessageId messageId) { String seekBy = String.format("the message %s", messageId.toString()); - return seekAsyncCheckState(seekBy).orElseGet(() -> { - long requestId = client.newRequestId(); - final MessageIdAdv msgId = (MessageIdAdv) messageId; - final MessageIdAdv firstChunkMsgId = msgId.getFirstChunkMessageId(); - final ByteBuf seek; - if (msgId.getFirstChunkMessageId() != null) { - seek = Commands.newSeek(consumerId, requestId, firstChunkMsgId.getLedgerId(), - firstChunkMsgId.getEntryId(), new long[0]); + long requestId = client.newRequestId(); + final MessageIdAdv msgId = (MessageIdAdv) messageId; + final MessageIdAdv firstChunkMsgId = msgId.getFirstChunkMessageId(); + final ByteBuf seek; + if (msgId.getFirstChunkMessageId() != null) { + seek = Commands.newSeek(consumerId, requestId, firstChunkMsgId.getLedgerId(), + firstChunkMsgId.getEntryId(), new long[0]); + } else { + final long[] ackSetArr; + if (MessageIdAdvUtils.isBatch(msgId)) { + final BitSetRecyclable ackSet = BitSetRecyclable.create(); + ackSet.set(0, msgId.getBatchSize()); + ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0)); + ackSetArr = ackSet.toLongArray(); + ackSet.recycle(); } else { - final long[] ackSetArr; - if (MessageIdAdvUtils.isBatch(msgId)) { - final BitSetRecyclable ackSet = BitSetRecyclable.create(); - ackSet.set(0, msgId.getBatchSize()); - ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0)); - ackSetArr = ackSet.toLongArray(); - ackSet.recycle(); - } else { - ackSetArr = new long[0]; - } - seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr); + ackSetArr = new long[0]; } - return seekAsyncInternal(requestId, seek, messageId, seekBy); - }); + seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr); + } + return seekAsyncInternal(requestId, seek, messageId, seekBy); } public boolean hasMessageAvailable() throws PulsarClientException { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index 5a223d5da15c0..070919c57a420 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -34,6 +35,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.Cleanup; @@ -46,6 +48,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.client.util.ScheduledExecutorProvider; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -263,13 +266,21 @@ public void testTopicPriorityLevel() { assertThat(consumer.getPriorityLevel()).isEqualTo(1); } - @Test(invocationTimeOut = 1000) + @Test public void testSeekAsyncInternal() { // given ClientCnx cnx = mock(ClientCnx.class); CompletableFuture clientReq = new CompletableFuture<>(); when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong())).thenReturn(clientReq); + ScheduledExecutorProvider provider = mock(ScheduledExecutorProvider.class); + ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); + when(provider.getExecutor()).thenReturn(scheduledExecutorService); + when(consumer.getClient().getScheduledExecutorProvider()).thenReturn(provider); + + CompletableFuture result = consumer.seekAsync(1L); + verify(scheduledExecutorService, atLeast(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + consumer.setClientCnx(cnx); consumer.setState(HandlerState.State.Ready);