Skip to content

Commit

Permalink
[fix][client] Fix wrong results of hasMessageAvailable and readNext a…
Browse files Browse the repository at this point in the history
…fter seeking by timestamp (apache#22363)

Co-authored-by: Lari Hotari <[email protected]>
(cherry picked from commit 149deaa)
(cherry picked from commit 1045f8b)
  • Loading branch information
BewareMyPower authored and mukesh-ctds committed Apr 17, 2024
1 parent d69b26d commit 62ff020
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
Expand All @@ -64,8 +65,8 @@
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand All @@ -75,7 +76,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {

private static final String subscription = "reader-sub";

@BeforeMethod
@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
Expand All @@ -87,7 +88,7 @@ protected void setup() throws Exception {
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
}

@AfterMethod(alwaysRun = true)
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
Expand Down Expand Up @@ -146,21 +147,41 @@ public void testReadMessageWithBatching() throws Exception {
testReadMessages(topic, true);
}

@Test
public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
@DataProvider
public static Object[][] seekBeforeHasMessageAvailable() {
return new Object[][] { { true }, { false } };
}

@Test(timeOut = 20000, dataProvider = "seekBeforeHasMessageAvailable")
public void testReadMessageWithBatchingWithMessageInclusive(boolean seekBeforeHasMessageAvailable)
throws Exception {
String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive";
Set<String> keys = publishMessages(topic, 10, true);

Reader<byte[]> reader = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
.startMessageIdInclusive().readerName(subscription).create();

while (reader.hasMessageAvailable()) {
Assert.assertTrue(keys.remove(reader.readNext().getKey()));
if (seekBeforeHasMessageAvailable) {
reader.seek(0L); // it should seek to the earliest
}

assertTrue(reader.hasMessageAvailable());
final Message<byte[]> msg = reader.readNext();
assertTrue(keys.remove(msg.getKey()));
// start from latest with start message inclusive should only read the last message in batch
assertEquals(keys.size(), 9);
Assert.assertFalse(keys.contains("key9"));
Assert.assertFalse(reader.hasMessageAvailable());

final MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
if (seekBeforeHasMessageAvailable) {
assertEquals(msgId.getBatchIndex(), 0);
assertFalse(keys.contains("key0"));
assertTrue(reader.hasMessageAvailable());
} else {
assertEquals(msgId.getBatchIndex(), 9);
assertFalse(reader.hasMessageAvailable());
assertFalse(keys.contains("key9"));
assertFalse(reader.hasMessageAvailable());
}
}

private void testReadMessages(String topic, boolean enableBatch) throws Exception {
Expand Down Expand Up @@ -258,7 +279,7 @@ public void testReadFromPartition() throws Exception {
@Test
public void testReaderWithTimeLong() throws Exception {
String ns = "my-property/my-ns";
String topic = "persistent://" + ns + "/testReadFromPartition";
String topic = "persistent://" + ns + "/testReaderWithTimeLong";
RetentionPolicies retention = new RetentionPolicies(-1, -1);
admin.namespaces().setRetention(ns, retention);

Expand Down Expand Up @@ -788,4 +809,46 @@ public void testHasMessageAvailableAfterSeek(boolean initializeLastMessageIdInBr
producer.send("msg");
assertTrue(reader.hasMessageAvailable());
}

@Test(dataProvider = "initializeLastMessageIdInBroker")
public void testHasMessageAvailableAfterSeekTimestamp(boolean initializeLastMessageIdInBroker) throws Exception {
final String topic = "persistent://my-property/my-ns/test-has-message-available-after-seek-timestamp";

@Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
final long timestampBeforeSend = System.currentTimeMillis();
final MessageId sentMsgId = producer.send("msg");

final List<MessageId> messageIds = new ArrayList<>();
messageIds.add(MessageId.earliest);
messageIds.add(sentMsgId);
messageIds.add(MessageId.latest);

for (MessageId messageId : messageIds) {
@Cleanup Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
.startMessageId(messageId).create();
if (initializeLastMessageIdInBroker) {
if (messageId == MessageId.earliest) {
assertTrue(reader.hasMessageAvailable());
} else {
assertFalse(reader.hasMessageAvailable());
}
} // else: lastMessageIdInBroker is earliest
reader.seek(System.currentTimeMillis());
assertFalse(reader.hasMessageAvailable());
}

for (MessageId messageId : messageIds) {
@Cleanup Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
.startMessageId(messageId).create();
if (initializeLastMessageIdInBroker) {
if (messageId == MessageId.earliest) {
assertTrue(reader.hasMessageAvailable());
} else {
assertFalse(reader.hasMessageAvailable());
}
} // else: lastMessageIdInBroker is earliest
reader.seek(timestampBeforeSend);
assertTrue(reader.hasMessageAvailable());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
private volatile boolean hasSoughtByTimestamp = false;
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T> conf,
Expand Down Expand Up @@ -2243,7 +2244,8 @@ public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
new PulsarClientException("Only support seek by messageId or timestamp"));
}

private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId,
Long seekTimestamp, String seekBy) {
AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
Expand All @@ -2260,18 +2262,20 @@ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek,
return FutureUtil.failedFuture(new IllegalStateException(message));
}
seekFuture = new CompletableFuture<>();
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs);
seekAsyncInternal(requestId, seek, seekId, seekTimestamp, seekBy, backoff, opTimeoutMs);
return seekFuture;
}

private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy,
private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, Long seekTimestamp, String seekBy,
final Backoff backoff, final AtomicLong remainingTime) {
ClientCnx cnx = cnx();
if (isConnected() && cnx != null) {
MessageIdAdv originSeekMessageId = seekMessageId;
seekMessageId = (MessageIdAdv) seekId;
log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);

final boolean originalHasSoughtByTimestamp = hasSoughtByTimestamp;
hasSoughtByTimestamp = (seekTimestamp != null);
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
acknowledgmentsGroupingTracker.flushAndClean();
Expand All @@ -2295,6 +2299,7 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
}
}).exceptionally(e -> {
seekMessageId = originSeekMessageId;
hasSoughtByTimestamp = originalHasSoughtByTimestamp;
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());

failSeek(
Expand All @@ -2317,7 +2322,7 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms",
topic, getHandlerName(), nextDelay);
remainingTime.addAndGet(-nextDelay);
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime);
seekAsyncInternal(requestId, seek, seekId, seekTimestamp, seekBy, backoff, remainingTime);
}, nextDelay, TimeUnit.MILLISECONDS);
}
}
Expand All @@ -2334,7 +2339,7 @@ public CompletableFuture<Void> seekAsync(long timestamp) {
String seekBy = String.format("the timestamp %d", timestamp);
long requestId = client.newRequestId();
return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp),
MessageId.earliest, seekBy);
MessageId.earliest, timestamp, seekBy);
}

@Override
Expand All @@ -2360,7 +2365,7 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
}
seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
}
return seekAsyncInternal(requestId, seek, messageId, seekBy);
return seekAsyncInternal(requestId, seek, messageId, null, seekBy);
}

public boolean hasMessageAvailable() throws PulsarClientException {
Expand All @@ -2380,13 +2385,15 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {

// we haven't read yet. use startMessageId for comparison
if (lastDequeuedMessageId == MessageId.earliest) {
// If the last seek is called with timestamp, startMessageId cannot represent the position to start, so we
// have to get the mark-delete position from the GetLastMessageId response to compare as well.
// if we are starting from latest, we should seek to the actual last message first.
// allow the last one to be read when read head inclusively.
if (MessageId.latest.equals(startMessageId)) {

final boolean hasSoughtByTimestamp = this.hasSoughtByTimestamp;
if (MessageId.latest.equals(startMessageId) || hasSoughtByTimestamp) {
CompletableFuture<GetLastMessageIdResponse> future = internalGetLastMessageIdAsync();
// if the consumer is configured to read inclusive then we need to seek to the last message
if (resetIncludeHead) {
if (resetIncludeHead && !hasSoughtByTimestamp) {
future = future.thenCompose((lastMessageIdResponse) ->
seekAsync(lastMessageIdResponse.lastMessageId)
.thenApply((ignore) -> lastMessageIdResponse));
Expand Down

0 comments on commit 62ff020

Please sign in to comment.