Skip to content

Commit

Permalink
[fix][client] fix Reader.hasMessageAvailable might return true after …
Browse files Browse the repository at this point in the history
…seeking to latest (apache#22201)

(cherry picked from commit 95a53f3)
(cherry picked from commit 318ff33)
  • Loading branch information
BewareMyPower authored and mukesh-ctds committed Apr 17, 2024
1 parent 71ce79a commit 986e1dd
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -761,4 +762,30 @@ public void testReaderReconnectedFromNextEntry() throws Exception {
producer.close();
admin.topics().delete(topic, false);
}

@DataProvider
public static Object[][] initializeLastMessageIdInBroker() {
return new Object[][] { { true }, { false } };
}

@Test(dataProvider = "initializeLastMessageIdInBroker")
public void testHasMessageAvailableAfterSeek(boolean initializeLastMessageIdInBroker) throws Exception {
final String topic = "persistent://my-property/my-ns/test-has-message-available-after-seek";
@Cleanup Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
.startMessageId(MessageId.earliest).create();

@Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
producer.send("msg");

if (initializeLastMessageIdInBroker) {
assertTrue(reader.hasMessageAvailable());
} // else: lastMessageIdInBroker is earliest

reader.seek(MessageId.latest);
// lastMessageIdInBroker is the last message ID, while startMessageId is still earliest
assertFalse(reader.hasMessageAvailable());

producer.send("msg");
assertTrue(reader.hasMessageAvailable());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private volatile MessageIdAdv startMessageId;

private volatile MessageIdAdv seekMessageId;
private final AtomicBoolean duringSeek;
@VisibleForTesting
final AtomicReference<SeekStatus> seekStatus;
private volatile CompletableFuture<Void> seekFuture;

private final MessageIdAdv initialStartMessageId;

Expand Down Expand Up @@ -303,7 +305,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
stats = ConsumerStatsDisabled.INSTANCE;
}

duringSeek = new AtomicBoolean(false);
seekStatus = new AtomicReference<>(SeekStatus.NOT_STARTED);

// Create msgCrypto if not created already
if (conf.getCryptoKeyReader() != null) {
Expand Down Expand Up @@ -779,15 +781,15 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
closeConsumerTasks();
deregisterFromClientCnx();
client.cleanupConsumer(this);
clearReceiverQueue();
clearReceiverQueue(false);
return CompletableFuture.completedFuture(null);
}

log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}",
topic, subscription, cnx.ctx().channel(), consumerId);

long requestId = client.newRequestId();
if (duringSeek.get()) {
if (seekStatus.get() != SeekStatus.NOT_STARTED) {
acknowledgmentsGroupingTracker.flushAndClean();
}

Expand All @@ -798,7 +800,8 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
int currentSize;
synchronized (this) {
currentSize = incomingMessages.size();
startMessageId = clearReceiverQueue();
setClientCnx(cnx);
clearReceiverQueue(true);
if (possibleSendToDeadLetterTopicMessages != null) {
possibleSendToDeadLetterTopicMessages.clear();
}
Expand Down Expand Up @@ -836,7 +839,6 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
// synchronized this, because redeliverUnAckMessage eliminate the epoch inconsistency between them
final CompletableFuture<Void> future = new CompletableFuture<>();
synchronized (this) {
setClientCnx(cnx);
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
conf.isReplicateSubscriptionState(),
Expand Down Expand Up @@ -941,15 +943,24 @@ protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize
* Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was
* not seen by the application.
*/
private MessageIdAdv clearReceiverQueue() {
private void clearReceiverQueue(boolean updateStartMessageId) {
List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
incomingMessages.drainTo(currentMessageQueue);
resetIncomingMessageSize();

if (duringSeek.compareAndSet(true, false)) {
return seekMessageId;
CompletableFuture<Void> seekFuture = this.seekFuture;
MessageIdAdv seekMessageId = this.seekMessageId;

if (seekStatus.get() != SeekStatus.NOT_STARTED) {
if (updateStartMessageId) {
startMessageId = seekMessageId;
}
if (seekStatus.compareAndSet(SeekStatus.COMPLETED, SeekStatus.NOT_STARTED)) {
internalPinnedExecutor.execute(() -> seekFuture.complete(null));
}
return;
} else if (subscriptionMode == SubscriptionMode.Durable) {
return startMessageId;
return;
}

if (!currentMessageQueue.isEmpty()) {
Expand All @@ -966,15 +977,14 @@ private MessageIdAdv clearReceiverQueue() {
}
// release messages if they are pooled messages
currentMessageQueue.forEach(Message::release);
return previousMessage;
} else if (!lastDequeuedMessageId.equals(MessageId.earliest)) {
if (updateStartMessageId) {
startMessageId = previousMessage;
}
} else if (updateStartMessageId && !lastDequeuedMessageId.equals(MessageId.earliest)) {
// If the queue was empty we need to restart from the message just after the last one that has been dequeued
// in the past
return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId);
} else {
// No message was received or dequeued by this consumer. Next message would still be the startMessageId
return startMessageId;
}
startMessageId = new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId);
} // else: No message was received or dequeued by this consumer. Next message would still be the startMessageId
}

/**
Expand Down Expand Up @@ -2241,25 +2251,23 @@ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek,
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create();

CompletableFuture<Void> seekFuture = new CompletableFuture<>();
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture);
if (!seekStatus.compareAndSet(SeekStatus.NOT_STARTED, SeekStatus.IN_PROGRESS)) {
final String message = String.format(
"[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
topic, subscription, seekBy);
log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
topic, subscription, seekBy);
return FutureUtil.failedFuture(new IllegalStateException(message));
}
seekFuture = new CompletableFuture<>();
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs);
return seekFuture;
}

private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy,
final Backoff backoff, final AtomicLong remainingTime,
CompletableFuture<Void> seekFuture) {
final Backoff backoff, final AtomicLong remainingTime) {
ClientCnx cnx = cnx();
if (isConnected() && cnx != null) {
if (!duringSeek.compareAndSet(false, true)) {
final String message = String.format(
"[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
topic, subscription, seekBy);
log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
topic, subscription, seekBy);
seekFuture.completeExceptionally(new IllegalStateException(message));
return;
}
MessageIdAdv originSeekMessageId = seekMessageId;
seekMessageId = (MessageIdAdv) seekId;
log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
Expand All @@ -2271,14 +2279,25 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
lastDequeuedMessageId = MessageId.earliest;

clearIncomingMessages();
seekFuture.complete(null);
CompletableFuture<Void> future = null;
synchronized (this) {
if (!hasParentConsumer && cnx() == null) {
// It's during reconnection, complete the seek future after connection is established
seekStatus.set(SeekStatus.COMPLETED);
} else {
future = seekFuture;
startMessageId = seekMessageId;
seekStatus.set(SeekStatus.NOT_STARTED);
}
}
if (future != null) {
future.complete(null);
}
}).exceptionally(e -> {
// re-set duringSeek and seekMessageId if seek failed
seekMessageId = originSeekMessageId;
duringSeek.set(false);
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());

seekFuture.completeExceptionally(
failSeek(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to seek the subscription %s of the topic %s to %s",
subscription, topicName.toString(), seekBy)));
Expand All @@ -2287,7 +2306,7 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
} else {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
seekFuture.completeExceptionally(
failSeek(
new PulsarClientException.TimeoutException(
String.format("The subscription %s of the topic %s could not seek "
+ "withing configured timeout", subscription, topicName.toString())));
Expand All @@ -2298,11 +2317,18 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms",
topic, getHandlerName(), nextDelay);
remainingTime.addAndGet(-nextDelay);
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime, seekFuture);
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime);
}, nextDelay, TimeUnit.MILLISECONDS);
}
}

private void failSeek(Throwable throwable) {
CompletableFuture<Void> seekFuture = this.seekFuture;
if (seekStatus.compareAndSet(SeekStatus.IN_PROGRESS, SeekStatus.NOT_STARTED)) {
seekFuture.completeExceptionally(throwable);
}
}

@Override
public CompletableFuture<Void> seekAsync(long timestamp) {
String seekBy = String.format("the timestamp %d", timestamp);
Expand Down Expand Up @@ -2960,4 +2986,10 @@ boolean isAckReceiptEnabled() {

private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

@VisibleForTesting
enum SeekStatus {
NOT_STARTED,
IN_PROGRESS,
COMPLETED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,14 +283,14 @@ public void testSeekAsyncInternal() {

consumer.setClientCnx(cnx);
consumer.setState(HandlerState.State.Ready);
consumer.seekStatus.set(ConsumerImpl.SeekStatus.NOT_STARTED);

// when
CompletableFuture<Void> firstResult = consumer.seekAsync(1L);
CompletableFuture<Void> secondResult = consumer.seekAsync(1L);

clientReq.complete(null);

// then
assertTrue(firstResult.isDone());
assertTrue(secondResult.isCompletedExceptionally());
verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong());
Expand Down

0 comments on commit 986e1dd

Please sign in to comment.