Skip to content

Commit

Permalink
[improve][client] Add backoff for seek (apache#20963)
Browse files Browse the repository at this point in the history
(cherry picked from commit ee91edc)
(cherry picked from commit 9ea7f60)
  • Loading branch information
Technoboy- authored and mukesh-ctds committed Apr 17, 2024
1 parent 7325e46 commit 2b2b925
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2233,100 +2233,108 @@ public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
new PulsarClientException("Only support seek by messageId or timestamp"));
}

private Optional<CompletableFuture<Void>> seekAsyncCheckState(String seekBy) {
if (getState() == State.Closing || getState() == State.Closed) {
return Optional.of(FutureUtil
.failedFuture(new PulsarClientException.AlreadyClosedException(
String.format("The consumer %s was already closed when seeking the subscription %s of the"
+ " topic %s to %s", consumerName, subscription, topicName.toString(), seekBy))));
}

if (!isConnected()) {
return Optional.of(FutureUtil.failedFuture(new PulsarClientException(
String.format("The client is not connected to the broker when seeking the subscription %s of the "
+ "topic %s to %s", subscription, topicName.toString(), seekBy))));
}
private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create();

return Optional.empty();
CompletableFuture<Void> seekFuture = new CompletableFuture<>();
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture);
return seekFuture;
}

private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy,
final Backoff backoff, final AtomicLong remainingTime,
CompletableFuture<Void> seekFuture) {
ClientCnx cnx = cnx();
if (isConnected() && cnx != null) {
if (!duringSeek.compareAndSet(false, true)) {
final String message = String.format(
"[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
topic, subscription, seekBy);
log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
topic, subscription, seekBy);
seekFuture.completeExceptionally(new IllegalStateException(message));
return;
}
MessageIdAdv originSeekMessageId = seekMessageId;
seekMessageId = (MessageIdAdv) seekId;
log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);

if (!duringSeek.compareAndSet(false, true)) {
final String message = String.format(
"[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
topic, subscription, seekBy);
log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
topic, subscription, seekBy);
seekFuture.completeExceptionally(new IllegalStateException(message));
return seekFuture;
}

MessageIdAdv originSeekMessageId = seekMessageId;
seekMessageId = (MessageIdAdv) seekId;
log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);

cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
acknowledgmentsGroupingTracker.flushAndClean();
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
acknowledgmentsGroupingTracker.flushAndClean();

lastDequeuedMessageId = MessageId.earliest;
lastDequeuedMessageId = MessageId.earliest;

clearIncomingMessages();
seekFuture.complete(null);
}).exceptionally(e -> {
// re-set duringSeek and seekMessageId if seek failed
seekMessageId = originSeekMessageId;
duringSeek.set(false);
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
clearIncomingMessages();
seekFuture.complete(null);
}).exceptionally(e -> {
// re-set duringSeek and seekMessageId if seek failed
seekMessageId = originSeekMessageId;
duringSeek.set(false);
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());

seekFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to seek the subscription %s of the topic %s to %s",
subscription, topicName.toString(), seekBy)));
return null;
});
} else {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
seekFuture.completeExceptionally(
new PulsarClientException.TimeoutException(
String.format("The subscription %s of the topic %s could not seek "
+ "withing configured timeout", subscription, topicName.toString())));
return;
}

seekFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to seek the subscription %s of the topic %s to %s",
subscription, topicName.toString(), seekBy)));
return null;
});
return seekFuture;
((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms",
topic, getHandlerName(), nextDelay);
remainingTime.addAndGet(-nextDelay);
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime, seekFuture);
}, nextDelay, TimeUnit.MILLISECONDS);
}
}

@Override
public CompletableFuture<Void> seekAsync(long timestamp) {
String seekBy = String.format("the timestamp %d", timestamp);
return seekAsyncCheckState(seekBy).orElseGet(() -> {
long requestId = client.newRequestId();
return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp),
long requestId = client.newRequestId();
return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp),
MessageId.earliest, seekBy);
});
}

@Override
public CompletableFuture<Void> seekAsync(MessageId messageId) {
String seekBy = String.format("the message %s", messageId.toString());
return seekAsyncCheckState(seekBy).orElseGet(() -> {
long requestId = client.newRequestId();
final MessageIdAdv msgId = (MessageIdAdv) messageId;
final MessageIdAdv firstChunkMsgId = msgId.getFirstChunkMessageId();
final ByteBuf seek;
if (msgId.getFirstChunkMessageId() != null) {
seek = Commands.newSeek(consumerId, requestId, firstChunkMsgId.getLedgerId(),
firstChunkMsgId.getEntryId(), new long[0]);
long requestId = client.newRequestId();
final MessageIdAdv msgId = (MessageIdAdv) messageId;
final MessageIdAdv firstChunkMsgId = msgId.getFirstChunkMessageId();
final ByteBuf seek;
if (msgId.getFirstChunkMessageId() != null) {
seek = Commands.newSeek(consumerId, requestId, firstChunkMsgId.getLedgerId(),
firstChunkMsgId.getEntryId(), new long[0]);
} else {
final long[] ackSetArr;
if (MessageIdAdvUtils.isBatch(msgId)) {
final BitSetRecyclable ackSet = BitSetRecyclable.create();
ackSet.set(0, msgId.getBatchSize());
ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
ackSetArr = ackSet.toLongArray();
ackSet.recycle();
} else {
final long[] ackSetArr;
if (MessageIdAdvUtils.isBatch(msgId)) {
final BitSetRecyclable ackSet = BitSetRecyclable.create();
ackSet.set(0, msgId.getBatchSize());
ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
ackSetArr = ackSet.toLongArray();
ackSet.recycle();
} else {
ackSetArr = new long[0];
}
seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
ackSetArr = new long[0];
}
return seekAsyncInternal(requestId, seek, messageId, seekBy);
});
seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
}
return seekAsyncInternal(requestId, seek, messageId, seekBy);
}

public boolean hasMessageAvailable() throws PulsarClientException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
Expand All @@ -46,6 +48,7 @@
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -263,13 +266,21 @@ public void testTopicPriorityLevel() {
assertThat(consumer.getPriorityLevel()).isEqualTo(1);
}

@Test(invocationTimeOut = 1000)
@Test
public void testSeekAsyncInternal() {
// given
ClientCnx cnx = mock(ClientCnx.class);
CompletableFuture<ProducerResponse> clientReq = new CompletableFuture<>();
when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong())).thenReturn(clientReq);

ScheduledExecutorProvider provider = mock(ScheduledExecutorProvider.class);
ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
when(provider.getExecutor()).thenReturn(scheduledExecutorService);
when(consumer.getClient().getScheduledExecutorProvider()).thenReturn(provider);

CompletableFuture<Void> result = consumer.seekAsync(1L);
verify(scheduledExecutorService, atLeast(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));

consumer.setClientCnx(cnx);
consumer.setState(HandlerState.State.Ready);

Expand Down

0 comments on commit 2b2b925

Please sign in to comment.