Skip to content

Commit

Permalink
[improve][client] PIP-224: Add getLastMessageIds API to deprecate get…
Browse files Browse the repository at this point in the history
…LastMessageId

Master Issue: apache#18616

Fixes apache#4940

NOTE: This implementation is different from the original design of
PIP-224 that the method name is `getLastMessageIds` instead of
`getLastTopicMessageId`.

### Motivation

When a multi-topics consumer calls `getLastMessageId`, a
`MultiMessageIdImpl` instance will be returned. It contains a map of the
topic name and the latest message id of the topic. However, the
`MultiMessageIdImpl` cannot be used in any place of the API that accepts
a `MessageId` because all methods of the `MessageId` interface are not
implemented, including `compareTo` and `toByteArray`.

Therefore, users cannot do anything on such a `MessageId` implementation
except casting `MessageId` to `MultiMessageIdImpl` and get the internal
map.

### Modifications

- Throw an exception when calling `getLastMessageId` on a multi-topics
  consumer instead of returning a `MultiMessageIdImpl`.
- Remove the `MultiMessageIdImpl` implementation and its related tests.
- Add the `getLastMessageIds` methods to `Consumer`. It returns a list
  of `TopicMessageId` instances, each of them represents the last
  message id of the owner topic.
- Mark the `getLastMessageId` API as deprecated.

### Verifications

- Modify the `TopicsConsumerImplTest#testGetLastMessageId` to test the
  `getLastMessageIds` for a multi-topics consumer.
- Modify the `TopicReaderTest#testHasMessageAvailable` to test the
  `getLastMessageIds` for a single topic consumer.
  • Loading branch information
BewareMyPower committed Apr 7, 2023
1 parent 3b118b6 commit 35b90d7
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 411 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1092,8 +1092,10 @@ public void testHasMessageAvailable() throws Exception {
assertFalse(messageId instanceof BatchMessageIdImpl);
ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
.startMessageId(messageId).startMessageIdInclusive().create();
MessageIdImpl lastMsgId = (MessageIdImpl) reader.getConsumer().getLastMessageId();
assertFalse(lastMsgId instanceof BatchMessageIdImpl);
List<TopicMessageId> lastMsgIds = reader.getConsumer().getLastMessageIds();
assertEquals(lastMsgIds.size(), 1);
assertEquals(lastMsgIds.get(0).getOwnerTopic(), topicName);
MessageIdAdv lastMsgId = (MessageIdAdv) lastMsgIds.get(0);
assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId());
assertEquals(lastMsgId.getEntryId(), messageId.getEntryId());
reader.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
Expand All @@ -42,7 +43,9 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
Expand Down Expand Up @@ -75,6 +78,7 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand Down Expand Up @@ -1097,6 +1101,11 @@ public void testGetLastMessageId() throws Exception {
admin.topics().createPartitionedTopic(topicName2, 2);
admin.topics().createPartitionedTopic(topicName3, 3);

final Set<String> topics = new HashSet<>();
topics.add(topicName1);
IntStream.range(0, 2).forEach(i -> topics.add(topicName2 + TopicName.PARTITIONED_TOPIC_SUFFIX + i));
IntStream.range(0, 3).forEach(i -> topics.add(topicName3 + TopicName.PARTITIONED_TOPIC_SUFFIX + i));

// 1. producer connect
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1)
.enableBatching(false)
Expand Down Expand Up @@ -1128,47 +1137,41 @@ public void testGetLastMessageId() throws Exception {
producer3.send((messagePredicate + "producer3-" + i).getBytes());
}

MessageId messageId = consumer.getLastMessageId();
assertTrue(messageId instanceof MultiMessageIdImpl);
MultiMessageIdImpl multiMessageId = (MultiMessageIdImpl) messageId;
Map<String, MessageId> map = multiMessageId.getMap();
assertEquals(map.size(), 6);
map.forEach((k, v) -> {
log.info("topic: {}, messageId:{} ", k, v.toString());
assertTrue(v instanceof MessageIdImpl);
MessageIdImpl messageId1 = (MessageIdImpl) v;
if (k.contains(topicName1)) {
assertEquals(messageId1.entryId, totalMessages - 1);
} else if (k.contains(topicName2)) {
assertEquals(messageId1.entryId, totalMessages / 2 - 1);
assertThrows(PulsarClientException.class, consumer::getLastMessageId);
List<TopicMessageId> msgIds = consumer.getLastMessageIds();
assertEquals(msgIds.size(), 6);
assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()), topics);
for (TopicMessageId msgId : msgIds) {
int numMessages = (int) ((MessageIdAdv) msgId).getEntryId() + 1;
if (msgId.getOwnerTopic().equals(topicName1)) {
assertEquals(numMessages, totalMessages);
} else if (msgId.getOwnerTopic().startsWith(topicName2)) {
assertEquals(numMessages, totalMessages / 2);
} else {
assertEquals(messageId1.entryId, totalMessages / 3 - 1);
assertEquals(numMessages, totalMessages / 3);
}
});
}

for (int i = 0; i < totalMessages; i++) {
producer1.send((messagePredicate + "producer1-" + i).getBytes());
producer2.send((messagePredicate + "producer2-" + i).getBytes());
producer3.send((messagePredicate + "producer3-" + i).getBytes());
}

messageId = consumer.getLastMessageId();
assertTrue(messageId instanceof MultiMessageIdImpl);
MultiMessageIdImpl multiMessageId2 = (MultiMessageIdImpl) messageId;
Map<String, MessageId> map2 = multiMessageId2.getMap();
assertEquals(map2.size(), 6);
map2.forEach((k, v) -> {
log.info("topic: {}, messageId:{} ", k, v.toString());
assertTrue(v instanceof MessageIdImpl);
MessageIdImpl messageId1 = (MessageIdImpl) v;
if (k.contains(topicName1)) {
assertEquals(messageId1.entryId, totalMessages * 2 - 1);
} else if (k.contains(topicName2)) {
assertEquals(messageId1.entryId, totalMessages - 1);
assertThrows(PulsarClientException.class, consumer::getLastMessageId);
msgIds = consumer.getLastMessageIds();
assertEquals(msgIds.size(), 6);
assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()), topics);
for (TopicMessageId msgId : msgIds) {
int numMessages = (int) ((MessageIdAdv) msgId).getEntryId() + 1;
if (msgId.getOwnerTopic().equals(topicName1)) {
assertEquals(numMessages, totalMessages * 2);
} else if (msgId.getOwnerTopic().startsWith(topicName2)) {
assertEquals(numMessages, totalMessages / 2 * 2);
} else {
assertEquals(messageId1.entryId, totalMessages * 2 / 3 - 1);
assertEquals(numMessages, totalMessages / 3 * 2);
}
});
}

consumer.unsubscribe();
consumer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -536,19 +537,38 @@ CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message,
CompletableFuture<Void> seekAsync(long timestamp);

/**
* Get the last message id available for consume.
* Get the last message id of the topic subscribed.
*
* @return the last message id.
* @return the last message id of the topic subscribed
* @throws PulsarClientException if multiple topics or partitioned topics are subscribed or failed because of a
* network issue
* NOTE: Use {@link Consumer#getLastMessageIds()} instead.
*/
@Deprecated
MessageId getLastMessageId() throws PulsarClientException;

/**
* Get the last message id available for consume.
*
* @return a future that can be used to track the completion of the operation.
* The asynchronous version of {@link Consumer#getLastMessageId()}.
* NOTE: Use {@link Consumer#getLastMessageIdsAsync()} instead.
*/
@Deprecated
CompletableFuture<MessageId> getLastMessageIdAsync();

/**
* Get all the last message id of the topics the consumer subscribed.
*
* @return the list of TopicMessageId instances of all the topics that the consumer subscribed
* @throws PulsarClientException if failed to get last message id.
* @apiNote It's guaranteed that the owner topic of each TopicMessageId in the returned list is different from owner
* topics of other TopicMessageId instances
*/
List<TopicMessageId> getLastMessageIds() throws PulsarClientException;

/**
* The asynchronous version of {@link Consumer#getLastMessageIds()}.
*/
CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync();

/**
* @return Whether the consumer is connected to the broker
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
Expand Down Expand Up @@ -745,6 +746,18 @@ public MessageId getLastMessageId() throws PulsarClientException {
@Override
public abstract CompletableFuture<MessageId> getLastMessageIdAsync();

@Override
public List<TopicMessageId> getLastMessageIds() throws PulsarClientException {
try {
return getLastMessageIdsAsync().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw PulsarClientException.unwrap(e);
} catch (ExecutionException e) {
throw PulsarClientException.unwrap(e);
}
}

private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) {
return SubscriptionType.Shared != type && SubscriptionType.Key_Shared != type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2341,6 +2341,12 @@ public CompletableFuture<MessageId> getLastMessageIdAsync() {
return internalGetLastMessageIdAsync().thenApply(r -> r.lastMessageId);
}

@Override
public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
return getLastMessageIdAsync()
.thenApply(msgId -> Collections.singletonList(TopicMessageId.create(topic, msgId)));
}

public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
Expand All @@ -49,7 +47,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
Expand Down Expand Up @@ -1470,30 +1467,20 @@ public Timeout getPartitionsAutoUpdateTimeout() {

@Override
public CompletableFuture<MessageId> getLastMessageIdAsync() {
CompletableFuture<MessageId> returnFuture = new CompletableFuture<>();

Map<String, CompletableFuture<MessageId>> messageIdFutures = consumers.entrySet().stream()
.map(entry -> Pair.of(entry.getKey(), entry.getValue().getLastMessageIdAsync()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));

CompletableFuture
.allOf(messageIdFutures.values().toArray(new CompletableFuture<?>[0]))
.whenComplete((ignore, ex) -> {
Builder<String, MessageId> builder = ImmutableMap.builder();
messageIdFutures.forEach((key, future) -> {
MessageId messageId;
try {
messageId = future.get();
} catch (Exception e) {
log.warn("[{}] Exception when topic {} getLastMessageId.", key, e);
messageId = MessageId.earliest;
}
builder.put(key, messageId);
});
returnFuture.complete(new MultiMessageIdImpl(builder.build()));
});
return FutureUtil.failedFuture(new PulsarClientException(
"getLastMessageIdAsync cannot be used on a multi-topics consumer"));
}

return returnFuture;
@Override
public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
final List<CompletableFuture<List<TopicMessageId>>> futures = consumers.values().stream()
.map(ConsumerImpl::getLastMessageIdsAsync)
.collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(__ -> {
final List<TopicMessageId> messageIds = new ArrayList<>();
futures.stream().map(CompletableFuture::join).forEach(messageIds::addAll);
return messageIds;
});
}

private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
Expand Down
5 changes: 0 additions & 5 deletions pulsar-client/src/main/resources/findbugsExclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -597,11 +597,6 @@
<Method name="getByteBuf"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Class name="org.apache.pulsar.client.impl.MultiMessageIdImpl"/>
<Method name="getMap"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Class name="org.apache.pulsar.client.impl.MultiTopicConsumerStatsRecorderImpl"/>
<Method name="getPartitionStats"/>
Expand Down
Loading

0 comments on commit 35b90d7

Please sign in to comment.