diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index c2eb957ee605d..424081b904c81 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -1096,6 +1096,12 @@ public void testHasMessageAvailable() throws Exception { assertFalse(lastMsgId instanceof BatchMessageIdImpl); assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId()); assertEquals(lastMsgId.getEntryId(), messageId.getEntryId()); + List lastMsgIds = reader.getConsumer().getLastMessageIds(); + assertEquals(lastMsgIds.size(), 1); + assertEquals(lastMsgIds.get(0).getOwnerTopic(), topicName); + MessageIdAdv lastMsgIdAdv = (MessageIdAdv) lastMsgIds.get(0); + assertEquals(lastMsgIdAdv.getLedgerId(), messageId.getLedgerId()); + assertEquals(lastMsgIdAdv.getEntryId(), messageId.getEntryId()); reader.close(); CountDownLatch latch = new CountDownLatch(numOfMessage); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index ce4a0ae86ac4e..73fe97996424c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; @@ -42,7 +43,9 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.api.TopicMetadata; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; @@ -1097,6 +1100,11 @@ public void testGetLastMessageId() throws Exception { admin.topics().createPartitionedTopic(topicName2, 2); admin.topics().createPartitionedTopic(topicName3, 3); + final Set topics = new HashSet<>(); + topics.add(topicName1); + IntStream.range(0, 2).forEach(i -> topics.add(topicName2 + TopicName.PARTITIONED_TOPIC_SUFFIX + i)); + IntStream.range(0, 3).forEach(i -> topics.add(topicName3 + TopicName.PARTITIONED_TOPIC_SUFFIX + i)); + // 1. producer connect Producer producer1 = pulsarClient.newProducer().topic(topicName1) .enableBatching(false) @@ -1146,12 +1154,27 @@ public void testGetLastMessageId() throws Exception { } }); + List msgIds = consumer.getLastMessageIds(); + assertEquals(msgIds.size(), 6); + assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()), topics); + for (TopicMessageId msgId : msgIds) { + int numMessages = (int) ((MessageIdAdv) msgId).getEntryId() + 1; + if (msgId.getOwnerTopic().equals(topicName1)) { + assertEquals(numMessages, totalMessages); + } else if (msgId.getOwnerTopic().startsWith(topicName2)) { + assertEquals(numMessages, totalMessages / 2); + } else { + assertEquals(numMessages, totalMessages / 3); + } + } + for (int i = 0; i < totalMessages; i++) { producer1.send((messagePredicate + "producer1-" + i).getBytes()); producer2.send((messagePredicate + "producer2-" + i).getBytes()); producer3.send((messagePredicate + "producer3-" + i).getBytes()); } + messageId = consumer.getLastMessageId(); assertTrue(messageId instanceof MultiMessageIdImpl); MultiMessageIdImpl multiMessageId2 = (MultiMessageIdImpl) messageId; @@ -1170,6 +1193,20 @@ public void testGetLastMessageId() throws Exception { } }); + msgIds = consumer.getLastMessageIds(); + assertEquals(msgIds.size(), 6); + assertEquals(msgIds.stream().map(TopicMessageId::getOwnerTopic).collect(Collectors.toSet()), topics); + for (TopicMessageId msgId : msgIds) { + int numMessages = (int) ((MessageIdAdv) msgId).getEntryId() + 1; + if (msgId.getOwnerTopic().equals(topicName1)) { + assertEquals(numMessages, totalMessages * 2); + } else if (msgId.getOwnerTopic().startsWith(topicName2)) { + assertEquals(numMessages, totalMessages); + } else { + assertEquals(numMessages, totalMessages / 3 * 2); + } + } + consumer.unsubscribe(); consumer.close(); producer1.close(); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java index 694099004965a..88ad24fe1f484 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import java.io.Closeable; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -539,16 +540,36 @@ CompletableFuture reconsumeLaterCumulativeAsync(Message message, * Get the last message id available for consume. * * @return the last message id. + * @apiNote If the consumer is a multi-topics consumer, the returned value cannot be used anywhere. + * @deprecated Use {@link Consumer#getLastMessageIds()} instead. */ + @Deprecated MessageId getLastMessageId() throws PulsarClientException; /** * Get the last message id available for consume. * * @return a future that can be used to track the completion of the operation. + * @deprecated Use {@link Consumer#getLastMessageIdsAsync()}} instead. */ + @Deprecated CompletableFuture getLastMessageIdAsync(); + /** + * Get all the last message id of the topics the consumer subscribed. + * + * @return the list of TopicMessageId instances of all the topics that the consumer subscribed + * @throws PulsarClientException if failed to get last message id. + * @apiNote It's guaranteed that the owner topic of each TopicMessageId in the returned list is different from owner + * topics of other TopicMessageId instances + */ + List getLastMessageIds() throws PulsarClientException; + + /** + * The asynchronous version of {@link Consumer#getLastMessageIds()}. + */ + CompletableFuture> getLastMessageIdsAsync(); + /** * @return Whether the consumer is connected to the broker */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 973b3302f4199..0db2a8e0ab9f5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.transaction.TransactionImpl; @@ -730,6 +731,7 @@ public void close() throws PulsarClientException { public abstract CompletableFuture closeAsync(); + @Deprecated @Override public MessageId getLastMessageId() throws PulsarClientException { try { @@ -742,9 +744,22 @@ public MessageId getLastMessageId() throws PulsarClientException { } } + @Deprecated @Override public abstract CompletableFuture getLastMessageIdAsync(); + @Override + public List getLastMessageIds() throws PulsarClientException { + try { + return getLastMessageIdsAsync().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw PulsarClientException.unwrap(e); + } catch (ExecutionException e) { + throw PulsarClientException.unwrap(e); + } + } + private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) { return SubscriptionType.Shared != type && SubscriptionType.Key_Shared != type; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index fb372566426d3..cc01609319698 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2336,11 +2336,18 @@ private static final class GetLastMessageIdResponse { } } + @Deprecated @Override public CompletableFuture getLastMessageIdAsync() { return internalGetLastMessageIdAsync().thenApply(r -> r.lastMessageId); } + @Override + public CompletableFuture> getLastMessageIdsAsync() { + return getLastMessageIdAsync() + .thenApply(msgId -> Collections.singletonList(TopicMessageId.create(topic, msgId))); + } + public CompletableFuture internalGetLastMessageIdAsync() { if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java index 6e60239ffe537..f40e3476dd0e0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java @@ -29,6 +29,7 @@ * This is useful when MessageId is need for partition/multi-topics/pattern consumer. * e.g. seek(), ackCumulative(), getLastMessageId(). */ +@Deprecated public class MultiMessageIdImpl implements MessageId { @Getter private Map map; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 5fe0e4a82b840..ef0345de919ea 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1468,6 +1468,7 @@ public Timeout getPartitionsAutoUpdateTimeout() { return partitionsAutoUpdateTimeout; } + @Deprecated @Override public CompletableFuture getLastMessageIdAsync() { CompletableFuture returnFuture = new CompletableFuture<>(); @@ -1496,6 +1497,18 @@ public CompletableFuture getLastMessageIdAsync() { return returnFuture; } + @Override + public CompletableFuture> getLastMessageIdsAsync() { + final List>> futures = consumers.values().stream() + .map(ConsumerImpl::getLastMessageIdsAsync) + .collect(Collectors.toList()); + return FutureUtil.waitForAll(futures).thenApply(__ -> { + final List messageIds = new ArrayList<>(); + futures.stream().map(CompletableFuture::join).forEach(messageIds::addAll); + return messageIds; + }); + } + private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class); public static boolean isIllegalMultiTopicsMessageId(MessageId messageId) {