Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp #22363

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand All @@ -77,7 +77,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {

private static final String subscription = "reader-sub";

@BeforeMethod
@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
Expand All @@ -89,7 +89,7 @@ protected void setup() throws Exception {
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
}

@AfterMethod(alwaysRun = true)
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
Expand Down Expand Up @@ -198,21 +198,41 @@ public void testReadMessageWithBatching() throws Exception {
testReadMessages(topic, true);
}

@Test
public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
@DataProvider
public static Object[][] seekBeforeHasMessageAvailable() {
return new Object[][] { { true }, { false } };
}

@Test(timeOut = 20000, dataProvider = "seekBeforeHasMessageAvailable")
public void testReadMessageWithBatchingWithMessageInclusive(boolean seekBeforeHasMessageAvailable)
throws Exception {
String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive";
Set<String> keys = publishMessages(topic, 10, true);

Reader<byte[]> reader = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
.startMessageIdInclusive().readerName(subscription).create();

while (reader.hasMessageAvailable()) {
Assert.assertTrue(keys.remove(reader.readNext().getKey()));
if (seekBeforeHasMessageAvailable) {
reader.seek(0L); // it should seek to the earliest
}

assertTrue(reader.hasMessageAvailable());
final Message<byte[]> msg = reader.readNext();
assertTrue(keys.remove(msg.getKey()));
// start from latest with start message inclusive should only read the last message in batch
assertEquals(keys.size(), 9);
Assert.assertFalse(keys.contains("key9"));
Assert.assertFalse(reader.hasMessageAvailable());

final MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
if (seekBeforeHasMessageAvailable) {
assertEquals(msgId.getBatchIndex(), 0);
assertFalse(keys.contains("key0"));
assertTrue(reader.hasMessageAvailable());
} else {
assertEquals(msgId.getBatchIndex(), 9);
assertFalse(reader.hasMessageAvailable());
assertFalse(keys.contains("key9"));
assertFalse(reader.hasMessageAvailable());
}
}

private void testReadMessages(String topic, boolean enableBatch) throws Exception {
Expand Down Expand Up @@ -310,7 +330,7 @@ public void testReadFromPartition() throws Exception {
@Test
public void testReaderWithTimeLong() throws Exception {
String ns = "my-property/my-ns";
String topic = "persistent://" + ns + "/testReadFromPartition";
String topic = "persistent://" + ns + "/testReaderWithTimeLong";
RetentionPolicies retention = new RetentionPolicies(-1, -1);
admin.namespaces().setRetention(ns, retention);

Expand Down Expand Up @@ -840,4 +860,46 @@ public void testHasMessageAvailableAfterSeek(boolean initializeLastMessageIdInBr
producer.send("msg");
assertTrue(reader.hasMessageAvailable());
}

@Test(dataProvider = "initializeLastMessageIdInBroker")
public void testHasMessageAvailableAfterSeekTimestamp(boolean initializeLastMessageIdInBroker) throws Exception {
final String topic = "persistent://my-property/my-ns/test-has-message-available-after-seek-timestamp";

@Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
final long timestampBeforeSend = System.currentTimeMillis();
final MessageId sentMsgId = producer.send("msg");

final List<MessageId> messageIds = new ArrayList<>();
messageIds.add(MessageId.earliest);
messageIds.add(sentMsgId);
messageIds.add(MessageId.latest);

for (MessageId messageId : messageIds) {
@Cleanup Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
.startMessageId(messageId).create();
if (initializeLastMessageIdInBroker) {
if (messageId == MessageId.earliest) {
assertTrue(reader.hasMessageAvailable());
} else {
assertFalse(reader.hasMessageAvailable());
}
} // else: lastMessageIdInBroker is earliest
reader.seek(System.currentTimeMillis());
assertFalse(reader.hasMessageAvailable());
}

for (MessageId messageId : messageIds) {
@Cleanup Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
.startMessageId(messageId).create();
if (initializeLastMessageIdInBroker) {
if (messageId == MessageId.earliest) {
assertTrue(reader.hasMessageAvailable());
} else {
assertFalse(reader.hasMessageAvailable());
}
} // else: lastMessageIdInBroker is earliest
reader.seek(timestampBeforeSend);
assertTrue(reader.hasMessageAvailable());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
private volatile boolean hasSoughtByTimestamp = false;
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T> conf,
Expand Down Expand Up @@ -2252,7 +2253,8 @@ public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
new PulsarClientException("Only support seek by messageId or timestamp"));
}

private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId,
Long seekTimestamp, String seekBy) {
AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
Expand All @@ -2269,18 +2271,20 @@ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek,
return FutureUtil.failedFuture(new IllegalStateException(message));
}
seekFuture = new CompletableFuture<>();
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs);
seekAsyncInternal(requestId, seek, seekId, seekTimestamp, seekBy, backoff, opTimeoutMs);
return seekFuture;
}

private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy,
private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, Long seekTimestamp, String seekBy,
final Backoff backoff, final AtomicLong remainingTime) {
ClientCnx cnx = cnx();
if (isConnected() && cnx != null) {
MessageIdAdv originSeekMessageId = seekMessageId;
seekMessageId = (MessageIdAdv) seekId;
log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);

final boolean originalHasSoughtByTimestamp = hasSoughtByTimestamp;
hasSoughtByTimestamp = (seekTimestamp != null);
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
acknowledgmentsGroupingTracker.flushAndClean();
Expand All @@ -2304,6 +2308,7 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
}
}).exceptionally(e -> {
seekMessageId = originSeekMessageId;
hasSoughtByTimestamp = originalHasSoughtByTimestamp;
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());

failSeek(
Expand All @@ -2326,7 +2331,7 @@ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, S
log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms",
topic, getHandlerName(), nextDelay);
remainingTime.addAndGet(-nextDelay);
seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime);
seekAsyncInternal(requestId, seek, seekId, seekTimestamp, seekBy, backoff, remainingTime);
}, nextDelay, TimeUnit.MILLISECONDS);
}
}
Expand All @@ -2343,7 +2348,7 @@ public CompletableFuture<Void> seekAsync(long timestamp) {
String seekBy = String.format("the timestamp %d", timestamp);
long requestId = client.newRequestId();
return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp),
MessageId.earliest, seekBy);
MessageId.earliest, timestamp, seekBy);
}

@Override
Expand All @@ -2369,7 +2374,7 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
}
seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
}
return seekAsyncInternal(requestId, seek, messageId, seekBy);
return seekAsyncInternal(requestId, seek, messageId, null, seekBy);
}

public boolean hasMessageAvailable() throws PulsarClientException {
Expand All @@ -2389,13 +2394,15 @@ public CompletableFuture<Boolean> hasMessageAvailableAsync() {

// we haven't read yet. use startMessageId for comparison
if (lastDequeuedMessageId == MessageId.earliest) {
// If the last seek is called with timestamp, startMessageId cannot represent the position to start, so we
// have to get the mark-delete position from the GetLastMessageId response to compare as well.
// if we are starting from latest, we should seek to the actual last message first.
// allow the last one to be read when read head inclusively.
if (MessageId.latest.equals(startMessageId)) {

final boolean hasSoughtByTimestamp = this.hasSoughtByTimestamp;
if (MessageId.latest.equals(startMessageId) || hasSoughtByTimestamp) {
CompletableFuture<GetLastMessageIdResponse> future = internalGetLastMessageIdAsync();
// if the consumer is configured to read inclusive then we need to seek to the last message
if (resetIncludeHead) {
if (resetIncludeHead && !hasSoughtByTimestamp) {
future = future.thenCompose((lastMessageIdResponse) ->
seekAsync(lastMessageIdResponse.lastMessageId)
.thenApply((ignore) -> lastMessageIdResponse));
Expand Down
Loading