From 4042a95c20ca3444c43c92912bdf5c0fa3f7125f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 21 Dec 2022 13:00:38 +0800 Subject: [PATCH 1/3] [PIP-224] Part 1: Add TopicMessageId for seek and acknowledge Master Issue: https://github.com/apache/pulsar/issues/18616 ### Motivation Introduce `TopicMessageId` to support getting the owner topic of a `MessageId`. When a `MessageId` is retrieved from a received message, the owner topic will be correctly set by the client library. When it's returned by `Producer#send`, this PR provides a `TopicMessageId#create` method to configure the owner topic. `acknowledge` APIs are affected only for the error cases: when a `MessageId` other than a `TopicMessageId` is accepted on a multi-topics consumer, `PulsarClientException.NotAllowedException` will be thrown. The semantic of the `seek(MessageId)` API is changed. Now if a `TopicMessageId` is accepted on a multi-topics consumer, the seek behavior will happen on the internal consumer of the owner topic. ### Modifications - Add the `TopicMessageId` interface. - In `MultiTopicsConsumerImpl#doAcknowledge`, complete the future with `NotAllowedException` if the argument is not a `TopicMessageId`. - In `MultiTopicsConsumerImpl#seekAsync`, when the argument is a `TopicMessageId`, find the internal consumer according to the owner topic and pass the argument to it if it exists. - In `ConsumerImpl#seekAsync`, get the inner message ID of the `TopicMessageId` so that now a single-topic consumer can also accept a `TopicMessageId` to seek. Besides the main modifications above, this patch does some refactorings to avoid direct access to `TopicMessageIdImpl`: - Deprecated `getTopicName` method by trimming the partition suffix of the owner topic in `getOriginTopicNameStr`. - Deprecated `getTopicPartitionName` by `getOwnerTopic`. - `getInnerMessageId` cannot be deprecated because we still need to convert `TopicMessageId` to `MessageIdImpl` in many cases (because we cannot get the fields like ledger id). Instead of deprecating it, use `MessageIdImpl.convertToMessageIdImpl` to replace it. - In `convertToMessageIdImpl`, for a customized `TopicMessageId` implementation, use serialization and deserialization to get the `MessageIdImpl` object. ### Verifications Add the following tests to `MultiTopicsConsumerTest`: - `testAcknowledgeWrongMessageId`: verify the correct exceptions are thrown in `acknowledge` APIs - `testSeekCustomTopicMessageId`: verify the new seek semantics for a `TopicMessageId`, including the existing `TopicMessageIdImpl` and the customized implementation by `TopicMessageId#create` ### TODO - Add a standard SerDes class for `TopicMessageId` - Apply `TopicMessageId` into `getLastMessageId` related APIs. - Deprecate the `getInnerMessageId` after PIP-229 is approved. --- .../service/PersistentFailoverE2ETest.java | 5 +- .../broker/service/SubscriptionSeekTest.java | 4 +- .../client/api/MultiTopicsConsumerTest.java | 134 +++++++++++++++++- .../api/PartitionedProducerConsumerTest.java | 3 +- .../pulsar/client/impl/MessageIdTest.java | 5 +- .../pulsar/client/impl/NegativeAcksTest.java | 3 +- .../apache/pulsar/client/api/Consumer.java | 18 +-- .../client/api/MessageAcknowledger.java | 6 + .../pulsar/client/api/TopicMessageId.java | 78 ++++++++++ .../client/impl/BatchMessageIdImpl.java | 5 +- .../pulsar/client/impl/ConsumerImpl.java | 43 +++--- .../pulsar/client/impl/MessageIdImpl.java | 25 ++-- .../client/impl/MultiTopicsConsumerImpl.java | 115 +++++++++------ .../client/impl/NegativeAcksTracker.java | 5 +- .../client/impl/TopicMessageIdImpl.java | 10 +- .../pulsar/client/impl/TopicMessageImpl.java | 7 +- .../UnAckedTopicMessageRedeliveryTracker.java | 9 +- .../impl/UnAckedTopicMessageTracker.java | 5 +- .../pulsar/client/impl/MessageTest.java | 4 +- .../functions/utils/FunctionCommon.java | 5 +- .../pulsar/websocket/ConsumerHandler.java | 5 +- .../integration/semantics/SemanticsTest.java | 4 +- 22 files changed, 360 insertions(+), 138 deletions(-) create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index 7be0590fe53af..ffc1444676b23 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -45,7 +45,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; @@ -337,7 +336,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { } totalMessages++; consumer1.acknowledge(msg); - MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId()); + MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()); receivedPtns.add(msgId.getPartitionIndex()); } @@ -354,7 +353,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { } totalMessages++; consumer2.acknowledge(msg); - MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId()); + MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()); receivedPtns.add(msgId.getPartitionIndex()); } assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index b6f1771c08882..2c2f62529d20a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -50,7 +50,6 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.RelativeTimeUtil; import org.awaitility.Awaitility; @@ -679,8 +678,7 @@ public void testSeekByFunction() throws Exception { if (message == null) { break; } - TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) message.getMessageId(); - received.add(topicMessageId.getInnerMessageId()); + received.add(MessageIdImpl.convertToMessageIdImpl(message.getMessageId())); } int msgNumFromPartition1 = list.size() / 2; int msgNumFromPartition2 = 1; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java index 6bd11de5a2f88..b8ea87ab4016e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java @@ -23,8 +23,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -32,34 +39,38 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import lombok.Cleanup; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.naming.TopicName; import org.awaitility.Awaitility; import org.mockito.AdditionalAnswers; import org.mockito.Mockito; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker") public class MultiTopicsConsumerTest extends ProducerConsumerBase { - private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerTest.class); private ScheduledExecutorService internalExecutorServiceDelegate; - @BeforeMethod(alwaysRun = true) + @BeforeClass(alwaysRun = true) @Override protected void setup() throws Exception { super.internalSetup(); super.producerBaseSetup(); } - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) @Override protected void cleanup() throws Exception { super.internalCleanup(); @@ -231,4 +242,113 @@ public void testBatchReceiveAckTimeout() Assert.assertEquals(consumer.batchReceive().size(), 1); }); } + + @Test(timeOut = 30000) + public void testAcknowledgeWrongMessageId() throws Exception { + final var topic1 = newTopicName(); + final var topic2 = newTopicName(); + + @Cleanup final var singleTopicConsumer = pulsarClient.newConsumer() + .topic(topic1) + .subscriptionName("sub-1") + .isAckReceiptEnabled(true) + .subscribe(); + assertTrue(singleTopicConsumer instanceof ConsumerImpl); + + @Cleanup final var multiTopicsConsumer = pulsarClient.newConsumer() + .topics(List.of(topic1, topic2)) + .subscriptionName("sub-2") + .isAckReceiptEnabled(true) + .subscribe(); + assertTrue(multiTopicsConsumer instanceof MultiTopicsConsumerImpl); + + @Cleanup final var producer = pulsarClient.newProducer().topic(topic1).create(); + final var nonTopicMessageIds = new ArrayList(); + nonTopicMessageIds.add(producer.send(new byte[]{ 0x00 })); + nonTopicMessageIds.add(singleTopicConsumer.receive().getMessageId()); + + // Multi-topics consumers can only acknowledge TopicMessageId, otherwise NotAllowedException will be thrown + for (var msgId : nonTopicMessageIds) { + assertFalse(msgId instanceof TopicMessageId); + Assert.assertThrows(PulsarClientException.NotAllowedException.class, + () -> multiTopicsConsumer.acknowledge(msgId)); + Assert.assertThrows(PulsarClientException.NotAllowedException.class, + () -> multiTopicsConsumer.acknowledge(Collections.singletonList(msgId))); + Assert.assertThrows(PulsarClientException.NotAllowedException.class, + () -> multiTopicsConsumer.acknowledgeCumulative(msgId)); + } + + // Single-topic consumer can acknowledge TopicMessageId + final var topicMessageId = multiTopicsConsumer.receive().getMessageId(); + assertTrue(topicMessageId instanceof TopicMessageId); + assertFalse(topicMessageId instanceof MessageIdImpl); + singleTopicConsumer.acknowledge(topicMessageId); + } + + @DataProvider + public static Object[][] messageIdFromProducer() { + return new Object[][] { { true }, { false } }; + } + + @Test(timeOut = 30000, dataProvider = "messageIdFromProducer") + public void testSeekCustomTopicMessageId(boolean messageIdFromProducer) throws Exception { + final var topic = TopicName.get(newTopicName()).toString(); + final var numPartitions = 3; + admin.topics().createPartitionedTopic(topic, numPartitions); + + @Cleanup final var producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .messageRouter(new MessageRouter() { + int index = 0; + @Override + public int choosePartition(Message msg, TopicMetadata metadata) { + return index++ % metadata.numPartitions(); + } + }) + .create(); + @Cleanup final var consumer = pulsarClient.newConsumer(Schema.INT32).topic(topic) + .subscriptionName("sub").subscribe(); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); + + final var msgIds = new HashMap>(); + final var numMessagesPerPartition = 10; + final var numMessages = numPartitions * numMessagesPerPartition; + for (int i = 0; i < numMessages; i++) { + var msgId = (MessageIdImpl) producer.send(i); + if (messageIdFromProducer) { + msgIds.computeIfAbsent(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + msgId.getPartitionIndex(), + __ -> new ArrayList<>()).add(msgId); + } else { + var topicMessageId = (TopicMessageId) consumer.receive().getMessageId(); + msgIds.computeIfAbsent(topicMessageId.getOwnerTopic(), __ -> new ArrayList<>()).add(topicMessageId); + } + } + + final var partitions = IntStream.range(0, numPartitions) + .mapToObj(i -> topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i) + .collect(Collectors.toSet()); + assertEquals(msgIds.keySet(), partitions); + + for (var partition : partitions) { + final var msgIdList = msgIds.get(partition); + assertEquals(msgIdList.size(), numMessagesPerPartition); + if (messageIdFromProducer) { + consumer.seek(TopicMessageId.create(partition, msgIdList.get(numMessagesPerPartition / 2))); + } else { + consumer.seek(msgIdList.get(numMessagesPerPartition / 2)); + } + } + + var topicMsgIds = new HashMap>(); + for (int i = 0; i < ((numMessagesPerPartition / 2 - 1) * numPartitions); i++) { + TopicMessageId topicMessageId = (TopicMessageId) consumer.receive().getMessageId(); + topicMsgIds.computeIfAbsent(topicMessageId.getOwnerTopic(), __ -> new ArrayList<>()).add(topicMessageId); + } + assertEquals(topicMsgIds.keySet(), partitions); + for (var partition : partitions) { + assertEquals(topicMsgIds.get(partition), + msgIds.get(partition).subList(numMessagesPerPartition / 2 + 1, numMessagesPerPartition)); + } + consumer.close(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java index e0e1bf20e7cbc..cd384e587898d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java @@ -39,7 +39,6 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PartitionedProducerImpl; -import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.common.naming.TopicName; import org.awaitility.Awaitility; @@ -768,7 +767,7 @@ public void testMessageIdForSubscribeToSinglePartition() throws Exception { for (int i = 0; i < totalMessages; i ++) { msg = consumer1.receive(5, TimeUnit.SECONDS); - Assert.assertEquals(((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(), 2); + Assert.assertEquals(MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()).getPartitionIndex(), 2); consumer1.acknowledge(msg); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java index c4cdcbd19d575..ceb5c51e6aa77 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java @@ -119,7 +119,7 @@ public void producerSendAsync(TopicType topicType) throws PulsarClientException, assertEquals(new String(message.getData()), messagePrefix + i); MessageId messageId = message.getMessageId(); if (topicType == TopicType.PARTITIONED) { - messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId(); + messageId = MessageIdImpl.convertToMessageIdImpl(messageId); } assertTrue(messageIds.remove(messageId), "Failed to receive message"); } @@ -166,9 +166,6 @@ public void producerSend(TopicType topicType) throws PulsarClientException, Puls for (int i = 0; i < numberOfMessages; i++) { MessageId messageId = consumer.receive().getMessageId(); - if (topicType == TopicType.PARTITIONED) { - messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId(); - } assertTrue(messageIds.remove(messageId), "Failed to receive Message"); } log.info("Remaining message IDs = {}", messageIds); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 679d6c1a19e51..b4d01e263bc7a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicMessageId; import org.awaitility.Awaitility; import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; @@ -292,7 +293,7 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception { .subscribe(); MessageId messageId = new MessageIdImpl(3, 1, 0); - TopicMessageIdImpl topicMessageId = new TopicMessageIdImpl("topic-1", "topic-1", messageId); + TopicMessageId topicMessageId = TopicMessageId.create("topic-1", messageId); BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(3, 1, 0, 0); BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(3, 1, 0, 1); BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(3, 1, 0, 2); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java index 9a3ef7833dfd5..694099004965a 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -471,7 +471,9 @@ CompletableFuture reconsumeLaterCumulativeAsync(Message message, *
  • MessageId.latest : Reset the subscription on the latest message in the topic * * - *

    Note: For multi-topics consumer, you can only seek to the earliest or latest message. + *

    Note: For multi-topics consumer, if `messageId` is a {@link TopicMessageId}, the seek operation will happen + * on the owner topic of the message, which is returned by {@link TopicMessageId#getOwnerTopic()}. Otherwise, you + * can only seek to the earliest or latest message for all topics subscribed. * * @param messageId * the message id where to reposition the subscription @@ -519,19 +521,7 @@ CompletableFuture reconsumeLaterCumulativeAsync(Message message, CompletableFuture seekAsync(Function function); /** - * Reset the subscription associated with this consumer to a specific message id. - * - *

    The message id can either be a specific message or represent the first or last messages in the topic. - *

      - *
    • MessageId.earliest : Reset the subscription on the earliest message available in the topic - *
    • MessageId.latest : Reset the subscription on the latest message in the topic - *
    - * - *

    Note: For multi-topics consumer, you can only seek to the earliest or latest message. - * - * @param messageId - * the message id where to reposition the subscription - * @return a future to track the completion of the seek operation + * The asynchronous version of {@link Consumer#seek(MessageId)}. */ CompletableFuture seekAsync(MessageId messageId); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java index c0a53983c5adb..d1bab3abb5b62 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java @@ -49,6 +49,8 @@ public interface MessageAcknowledger { * * @throws PulsarClientException.AlreadyClosedException} * if the consumer was already closed + * @throws PulsarClientException.NotAllowedException + * if `messageId` is not a {@link TopicMessageId} when multiple topics are subscribed */ void acknowledge(MessageId messageId) throws PulsarClientException; @@ -59,6 +61,8 @@ default void acknowledge(Message message) throws PulsarClientException { /** * Acknowledge the consumption of a list of message. * @param messageIdList the list of message IDs. + * @throws PulsarClientException.NotAllowedException + * if any message id in the list is not a {@link TopicMessageId} when multiple topics are subscribed */ void acknowledge(List messageIdList) throws PulsarClientException; @@ -82,6 +86,8 @@ default void acknowledge(Messages messages) throws PulsarClientException { * The {@code MessageId} to be cumulatively acknowledged * @throws PulsarClientException.AlreadyClosedException * if the consumer was already closed + * @throws PulsarClientException.NotAllowedException + * if `messageId` is not a {@link TopicMessageId} when multiple topics are subscribed */ void acknowledgeCumulative(MessageId messageId) throws PulsarClientException; diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java new file mode 100644 index 0000000000000..a5a944eff72ba --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +/** + * The MessageId used for a consumer that subscribes multiple topics or partitioned topics. + * + *

    + * It's guaranteed that {@link Message#getMessageId()} must return a TopicMessageId instance if the Message is received + * from a consumer that subscribes multiple topics or partitioned topics. + * The topic name used in APIs related to this class like `getOwnerTopic` and `create` must be the full topic name. For + * example, "my-topic" is invalid while "persistent://public/default/my-topic" is valid. + * If the topic is a partitioned topic, the topic name should be the name of the specific partition, e.g. + * "persistent://public/default/my-topic-partition-0". + *

    + */ +public interface TopicMessageId extends MessageId { + + /** + * Return the owner topic name of a message. + * + * @return the owner topic + */ + String getOwnerTopic(); + + static TopicMessageId create(String topic, MessageId messageId) { + if (messageId instanceof TopicMessageId) { + return (TopicMessageId) messageId; + } + return new TopicMessageId() { + @Override + public String getOwnerTopic() { + return topic; + } + + @Override + public byte[] toByteArray() { + return messageId.toByteArray(); + } + + @Override + public int compareTo(MessageId o) { + return messageId.compareTo(o); + } + + @Override + public boolean equals(Object obj) { + return messageId.equals(obj); + } + + @Override + public int hashCode() { + return messageId.hashCode(); + } + + @Override + public String toString() { + return messageId.toString(); + } + }; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index 7e3a143dff8e0..ed28082ff6a30 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -20,6 +20,7 @@ import javax.annotation.Nonnull; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.TopicMessageId; /** */ @@ -77,8 +78,8 @@ public int compareTo(@Nonnull MessageId o) { this.ledgerId, this.entryId, this.partitionIndex, this.batchIndex, other.ledgerId, other.entryId, other.partitionIndex, batchIndex ); - } else if (o instanceof TopicMessageIdImpl) { - return compareTo(((TopicMessageIdImpl) o).getInnerMessageId()); + } else if (o instanceof TopicMessageId) { + return compareTo(MessageIdImpl.convertToMessageIdImpl(o)); } else { throw new UnsupportedOperationException("Unknown MessageId type: " + o.getClass().getName()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 8fef739983648..a59a67adbaab4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -79,6 +79,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -533,7 +534,7 @@ protected CompletableFuture> internalBatchReceiveAsync() { protected CompletableFuture doAcknowledge(MessageId messageId, AckType ackType, Map properties, TransactionImpl txn) { - checkArgument(messageId instanceof MessageIdImpl); + messageId = MessageIdImpl.convertToMessageIdImpl(messageId); if (getState() != State.Ready && getState() != State.Connecting) { stats.incrementNumAcksFailed(); PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState()); @@ -590,10 +591,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a .InvalidMessageException("Cannot handle message with null messageId")); } - if (messageId instanceof TopicMessageIdImpl) { - messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId(); - } - checkArgument(messageId instanceof MessageIdImpl); + messageId = MessageIdImpl.convertToMessageIdImpl(messageId); if (getState() != State.Ready && getState() != State.Connecting) { stats.incrementNumAcksFailed(); PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState()); @@ -629,7 +627,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a if (retryLetterProducer != null) { try { MessageImpl retryMessage = (MessageImpl) getMessageImpl(message); - String originMessageIdStr = getOriginMessageIdStr(message); + String originMessageIdStr = message.getMessageId().toString(); String originTopicNameStr = getOriginTopicNameStr(message); SortedMap propertiesMap = getPropertiesMap(message, originMessageIdStr, originTopicNameStr); @@ -721,22 +719,19 @@ private SortedMap getPropertiesMap(Message message, return propertiesMap; } - private String getOriginMessageIdStr(Message message) { - if (message instanceof TopicMessageImpl) { - return ((TopicMessageIdImpl) message.getMessageId()).getInnerMessageId().toString(); - } else if (message instanceof MessageImpl) { - return message.getMessageId().toString(); - } - return null; - } - private String getOriginTopicNameStr(Message message) { - if (message instanceof TopicMessageImpl) { - return ((TopicMessageIdImpl) message.getMessageId()).getTopicName(); - } else if (message instanceof MessageImpl) { + MessageId messageId = message.getMessageId(); + if (messageId instanceof TopicMessageId) { + String topic = ((TopicMessageId) messageId).getOwnerTopic(); + int index = topic.lastIndexOf(TopicName.PARTITIONED_TOPIC_SUFFIX); + if (index < 0) { + return topic; + } else { + return topic.substring(0, index); + } + } else { return message.getTopicName(); } - return null; } private MessageImpl getMessageImpl(Message message) { @@ -2008,7 +2003,7 @@ private CompletableFuture processPossibleToDLQ(MessageIdImpl messageId) MessageIdImpl finalMessageId = messageId; deadLetterProducer.thenAcceptAsync(producerDLQ -> { for (MessageImpl message : finalDeadLetterMessages) { - String originMessageIdStr = getOriginMessageIdStr(message); + String originMessageIdStr = message.getMessageId().toString(); String originTopicNameStr = getOriginTopicNameStr(message); TypedMessageBuilder typedMessageBuilderNew = producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) @@ -2177,7 +2172,8 @@ public CompletableFuture seekAsync(long timestamp) { } @Override - public CompletableFuture seekAsync(MessageId messageId) { + public CompletableFuture seekAsync(MessageId originalMessageId) { + final MessageIdImpl messageId = MessageIdImpl.convertToMessageIdImpl(originalMessageId); String seekBy = String.format("the message %s", messageId.toString()); return seekAsyncCheckState(seekBy).orElseGet(() -> { long requestId = client.newRequestId(); @@ -2197,8 +2193,8 @@ public CompletableFuture seekAsync(MessageId messageId) { seek = Commands.newSeek(consumerId, requestId, msgId.getFirstChunkMessageId().getLedgerId(), msgId.getFirstChunkMessageId().getEntryId(), new long[0]); } else { - MessageIdImpl msgId = (MessageIdImpl) messageId; - seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), new long[0]); + seek = Commands.newSeek( + consumerId, requestId, messageId.getLedgerId(), messageId.getEntryId(), new long[0]); } return seekAsyncInternal(requestId, seek, messageId, seekBy); }); @@ -2573,6 +2569,7 @@ void grabCnx() { this.connectionHandler.grabCnx(); } + @Deprecated public String getTopicNameWithoutPartition() { return topicNameWithoutPartition; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 02298e0f9d66d..1a0f491a6a7bb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -27,7 +27,9 @@ import java.util.Objects; import javax.annotation.Nonnull; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.common.api.proto.MessageIdData; +import org.apache.pulsar.common.classification.InterfaceStability; import org.apache.pulsar.common.naming.TopicName; public class MessageIdImpl implements MessageId { @@ -116,15 +118,20 @@ public static MessageId fromByteArray(byte[] data) throws IOException { return messageId; } + @InterfaceStability.Unstable public static MessageIdImpl convertToMessageIdImpl(MessageId messageId) { - if (messageId instanceof BatchMessageIdImpl) { - return (BatchMessageIdImpl) messageId; - } else if (messageId instanceof MessageIdImpl) { - return (MessageIdImpl) messageId; - } else if (messageId instanceof TopicMessageIdImpl) { - return convertToMessageIdImpl(((TopicMessageIdImpl) messageId).getInnerMessageId()); + if (messageId instanceof TopicMessageId) { + if (messageId instanceof TopicMessageIdImpl) { + return (MessageIdImpl) ((TopicMessageIdImpl) messageId).getInnerMessageId(); + } else { + try { + return (MessageIdImpl) MessageId.fromByteArray(messageId.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } - return null; + return (MessageIdImpl) messageId; } public static MessageId fromByteArrayWithTopic(byte[] data, String topicName) throws IOException { @@ -210,8 +217,8 @@ public int compareTo(@Nonnull MessageId o) { this.ledgerId, this.entryId, this.partitionIndex, NO_BATCH, other.ledgerId, other.entryId, other.partitionIndex, batchIndex ); - } else if (o instanceof TopicMessageIdImpl) { - return compareTo(((TopicMessageIdImpl) o).getInnerMessageId()); + } else if (o instanceof TopicMessageId) { + return compareTo(convertToMessageIdImpl(o)); } else { throw new UnsupportedOperationException("Unknown MessageId type: " + o.getClass().getName()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 224276ba5f08f..341a91e97348e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -59,6 +59,7 @@ import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.util.ConsumerName; @@ -290,8 +291,7 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer, boolean batchR // Must be called from the internalPinnedExecutor thread private void messageReceived(ConsumerImpl consumer, Message message) { checkArgument(message instanceof MessageImpl); - TopicMessageImpl topicMessage = new TopicMessageImpl<>(consumer.getTopic(), - consumer.getTopicNameWithoutPartition(), message, consumer); + TopicMessageImpl topicMessage = new TopicMessageImpl<>(consumer.getTopic(), message, consumer); if (log.isDebugEnabled()) { log.debug("[{}][{}] Received message from topics-consumer {}", @@ -443,26 +443,26 @@ protected CompletableFuture> internalReceiveAsync() { protected CompletableFuture doAcknowledge(MessageId messageId, AckType ackType, Map properties, TransactionImpl txnImpl) { - checkArgument(messageId instanceof TopicMessageIdImpl); - TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; + if (!(messageId instanceof TopicMessageId)) { + return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException( + "Only TopicMessageId is allowed to acknowledge for a multi-topics consumer, while messageId is " + + messageId.getClass().getName())); + } if (getState() != State.Ready) { return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed")); } + TopicMessageId topicMessageId = (TopicMessageId) messageId; + ConsumerImpl consumer = consumers.get(topicMessageId.getOwnerTopic()); + if (consumer == null) { + return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); + } + MessageId innerMessageId = MessageIdImpl.convertToMessageIdImpl(topicMessageId); if (ackType == AckType.Cumulative) { - Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName()); - if (individualConsumer != null) { - MessageId innerId = topicMessageId.getInnerMessageId(); - return individualConsumer.acknowledgeCumulativeAsync(innerId); - } else { - return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); - } + return consumer.acknowledgeCumulativeAsync(innerMessageId); } else { - ConsumerImpl consumer = consumers.get(topicMessageId.getTopicPartitionName()); - - MessageId innerId = topicMessageId.getInnerMessageId(); - return consumer.doAcknowledgeWithTxn(innerId, ackType, properties, txnImpl) + return consumer.doAcknowledgeWithTxn(innerMessageId, ackType, properties, txnImpl) .thenRun(() -> unAckedMessageTracker.remove(topicMessageId)); } @@ -473,31 +473,34 @@ protected CompletableFuture doAcknowledge(List messageIdList, AckType ackType, Map properties, TransactionImpl txn) { + for (MessageId messageId : messageIdList) { + if (!(messageId instanceof TopicMessageId)) { + return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException( + "Only TopicMessageId is allowed to acknowledge for a multi-topics consumer, while messageId is " + + messageId.getClass().getName())); + } + } List> resultFutures = new ArrayList<>(); if (ackType == AckType.Cumulative) { messageIdList.forEach(messageId -> resultFutures.add(doAcknowledge(messageId, ackType, properties, txn))); - return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); } else { if (getState() != State.Ready) { return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed")); } Map> topicToMessageIdMap = new HashMap<>(); for (MessageId messageId : messageIdList) { - if (!(messageId instanceof TopicMessageIdImpl)) { - return FutureUtil.failedFuture( - new IllegalArgumentException("messageId is not instance of TopicMessageIdImpl")); - } - TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; - topicToMessageIdMap.putIfAbsent(topicMessageId.getTopicPartitionName(), new ArrayList<>()); - topicToMessageIdMap.get(topicMessageId.getTopicPartitionName()).add(topicMessageId.getInnerMessageId()); + TopicMessageId topicMessageId = (TopicMessageId) messageId; + topicToMessageIdMap.putIfAbsent(topicMessageId.getOwnerTopic(), new ArrayList<>()); + topicToMessageIdMap.get(topicMessageId.getOwnerTopic()) + .add(MessageIdImpl.convertToMessageIdImpl(topicMessageId)); } topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> { ConsumerImpl consumer = consumers.get(topicPartitionName); resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn) .thenAccept((res) -> messageIdList.forEach(unAckedMessageTracker::remove))); }); - return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); } + return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); } @Override @@ -510,21 +513,25 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a return FutureUtil.failedFuture(new PulsarClientException .InvalidMessageException("Cannot handle message with null messageId")); } - checkArgument(messageId instanceof TopicMessageIdImpl); - TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; + if (!(messageId instanceof TopicMessageId)) { + return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException( + "Only TopicMessageId is allowed for reconsumeLater for a multi-topics consumer, while messageId is " + + message.getClass().getName())); + } + TopicMessageId topicMessageId = (TopicMessageId) messageId; if (getState() != State.Ready) { return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed")); } if (ackType == AckType.Cumulative) { - Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName()); + Consumer individualConsumer = consumers.get(topicMessageId.getOwnerTopic()); if (individualConsumer != null) { return individualConsumer.reconsumeLaterCumulativeAsync(message, delayTime, unit); } else { return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); } } else { - ConsumerImpl consumer = consumers.get(topicMessageId.getTopicPartitionName()); + ConsumerImpl consumer = consumers.get(topicMessageId.getOwnerTopic()); return consumer.doReconsumeLater(message, ackType, customProperties, delayTime, unit) .thenRun(() ->unAckedMessageTracker.remove(topicMessageId)); } @@ -532,20 +539,18 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a @Override public void negativeAcknowledge(MessageId messageId) { - checkArgument(messageId instanceof TopicMessageIdImpl); - TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; + checkArgument(messageId instanceof TopicMessageId); + TopicMessageId topicMessageId = (TopicMessageId) messageId; - ConsumerImpl consumer = consumers.get(topicMessageId.getTopicPartitionName()); - consumer.negativeAcknowledge(topicMessageId.getInnerMessageId()); + ConsumerImpl consumer = consumers.get(topicMessageId.getOwnerTopic()); + consumer.negativeAcknowledge(MessageIdImpl.convertToMessageIdImpl(topicMessageId)); } @Override public void negativeAcknowledge(Message message) { MessageId messageId = message.getMessageId(); - checkArgument(messageId instanceof TopicMessageIdImpl); - TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; - - ConsumerImpl consumer = consumers.get(topicMessageId.getTopicPartitionName()); + checkArgument(messageId instanceof TopicMessageId); + ConsumerImpl consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic()); consumer.negativeAcknowledge(message); } @@ -680,7 +685,9 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { return; } - checkArgument(messageIds.stream().findFirst().get() instanceof TopicMessageIdImpl); + for (MessageId messageId : messageIds) { + checkArgument(messageId instanceof TopicMessageId); + } if (conf.getSubscriptionType() != SubscriptionType.Shared && conf.getSubscriptionType() != SubscriptionType.Key_Shared) { @@ -689,12 +696,12 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { return; } removeExpiredMessagesFromQueue(messageIds); - messageIds.stream().map(messageId -> (TopicMessageIdImpl) messageId) - .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName, Collectors.toSet())) + messageIds.stream().map(messageId -> (TopicMessageId) messageId) + .collect(Collectors.groupingBy(TopicMessageId::getOwnerTopic, Collectors.toSet())) .forEach((topicName, messageIds1) -> consumers.get(topicName) .redeliverUnacknowledgedMessages(messageIds1.stream() - .map(mid -> mid.getInnerMessageId()).collect(Collectors.toSet()))); + .map(MessageIdImpl::convertToMessageIdImpl).collect(Collectors.toSet()))); resumeReceivingFromPausedConsumersIfNeeded(); } @@ -748,19 +755,35 @@ public CompletableFuture seekAsync(Function function) { @Override public CompletableFuture seekAsync(MessageId messageId) { - MessageIdImpl targetMessageId = MessageIdImpl.convertToMessageIdImpl(messageId); - if (targetMessageId == null || isIllegalMultiTopicsMessageId(messageId)) { + final Consumer internalConsumer; + if (messageId instanceof TopicMessageId) { + TopicMessageId topicMessageId = (TopicMessageId) messageId; + internalConsumer = consumers.get(topicMessageId.getOwnerTopic()); + if (internalConsumer == null) { + return FutureUtil.failedFuture(new PulsarClientException.NotAllowedException( + "The owner topic " + topicMessageId.getOwnerTopic() + " is not subscribed")); + } + } else { + internalConsumer = null; + } + if (internalConsumer == null && isIllegalMultiTopicsMessageId(messageId)) { return FutureUtil.failedFuture( new PulsarClientException("Illegal messageId, messageId can only be earliest/latest") ); } - List> futures = new ArrayList<>(consumers.size()); - consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(targetMessageId))); + + final CompletableFuture seekFuture; + if (internalConsumer == null) { + List> futures = new ArrayList<>(consumers.size()); + consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(messageId))); + seekFuture = FutureUtil.waitForAll(futures); + } else { + seekFuture = internalConsumer.seekAsync(messageId); + } unAckedMessageTracker.clear(); clearIncomingMessages(); - - return FutureUtil.waitForAll(futures); + return seekFuture; } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index 8680f0f0e6c73..70d57db3bb691 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -95,10 +95,7 @@ public synchronized void add(Message message) { } private synchronized void add(MessageId messageId, int redeliveryCount) { - if (messageId instanceof TopicMessageIdImpl) { - TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; - messageId = topicMessageId.getInnerMessageId(); - } + messageId = MessageIdImpl.convertToMessageIdImpl(messageId); if (messageId instanceof BatchMessageIdImpl) { BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java index c20960950d54e..941f18cf65a2c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java @@ -19,8 +19,9 @@ package org.apache.pulsar.client.impl; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.TopicMessageId; -public class TopicMessageIdImpl implements MessageId { +public class TopicMessageIdImpl implements TopicMessageId { /** This topicPartitionName is get from ConsumerImpl, it contains partition part. */ private final String topicPartitionName; @@ -37,6 +38,7 @@ public TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId * Get the topic name without partition part of this message. * @return the name of the topic on which this message was published */ + @Deprecated public String getTopicName() { return this.topicName; } @@ -45,6 +47,7 @@ public String getTopicName() { * Get the topic name which contains partition part for this message. * @return the topic name which contains Partition part */ + @Deprecated public String getTopicPartitionName() { return this.topicPartitionName; } @@ -77,4 +80,9 @@ public boolean equals(Object obj) { public int compareTo(MessageId o) { return messageId.compareTo(o); } + + @Override + public String getOwnerTopic() { + return topicPartitionName; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index f6c33cc930ff6..c3fcb0a16a383 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -36,14 +36,13 @@ public class TopicMessageImpl implements Message { final ConsumerImpl receivedByconsumer; TopicMessageImpl(String topicPartitionName, - String topicName, Message msg, ConsumerImpl receivedByConsumer) { this.topicPartitionName = topicPartitionName; this.receivedByconsumer = receivedByConsumer; this.msg = msg; - this.messageId = new TopicMessageIdImpl(topicPartitionName, topicName, msg.getMessageId()); + this.messageId = new TopicMessageIdImpl(topicPartitionName, topicPartitionName, msg.getMessageId()); } /** @@ -59,6 +58,7 @@ public String getTopicName() { * Get the topic name which contains partition part for this message. * @return the topic name which contains Partition part */ + @Deprecated public String getTopicPartitionName() { return topicPartitionName; } @@ -68,8 +68,9 @@ public MessageId getMessageId() { return messageId; } + @Deprecated public MessageId getInnerMessageId() { - return messageId.getInnerMessageId(); + return MessageIdImpl.convertToMessageIdImpl(messageId); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java index 907e6109e196b..823dd4ad5f488 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.Map.Entry; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; public class UnAckedTopicMessageRedeliveryTracker extends UnAckedMessageRedeliveryTracker { @@ -41,8 +42,8 @@ public int removeTopicMessages(String topicName) { Entry> entry = iterator.next(); UnackMessageIdWrapper messageIdWrapper = entry.getKey(); MessageId messageId = messageIdWrapper.getMessageId(); - if (messageId instanceof TopicMessageIdImpl - && ((TopicMessageIdImpl) messageId).getTopicPartitionName().contains(topicName)) { + if (messageId instanceof TopicMessageId + && ((TopicMessageId) messageId).getOwnerTopic().contains(topicName)) { HashSet exist = redeliveryMessageIdPartitionMap.get(messageIdWrapper); entry.getValue().remove(messageIdWrapper); iterator.remove(); @@ -54,8 +55,8 @@ public int removeTopicMessages(String topicName) { Iterator iteratorAckTimeOut = ackTimeoutMessages.keySet().iterator(); while (iterator.hasNext()) { MessageId messageId = iteratorAckTimeOut.next(); - if (messageId instanceof TopicMessageIdImpl - && ((TopicMessageIdImpl) messageId).getTopicPartitionName().contains(topicName)) { + if (messageId instanceof TopicMessageId + && ((TopicMessageId) messageId).getOwnerTopic().contains(topicName)) { iterator.remove(); removed++; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java index e8e80c5e69025..1cbab5844046f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.Map.Entry; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; public class UnAckedTopicMessageTracker extends UnAckedMessageTracker { @@ -39,8 +40,8 @@ public int removeTopicMessages(String topicName) { while (iterator.hasNext()) { Entry> entry = iterator.next(); MessageId messageId = entry.getKey(); - if (messageId instanceof TopicMessageIdImpl - && ((TopicMessageIdImpl) messageId).getTopicPartitionName().contains(topicName)) { + if (messageId instanceof TopicMessageId + && ((TopicMessageId) messageId).getOwnerTopic().contains(topicName)) { entry.getValue().remove(messageId); iterator.remove(); removed++; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java index ad56df918c051..feb4539971a97 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java @@ -63,7 +63,7 @@ public void testTopicMessageImplReplicatedInfo() { ByteBuffer payload = ByteBuffer.wrap(new byte[0]); MessageImpl msg = MessageImpl.create(builder, payload, Schema.BYTES, null); msg.setMessageId(new MessageIdImpl(-1, -1, -1)); - TopicMessageImpl topicMessage = new TopicMessageImpl<>(topicName, topicName, msg, null); + TopicMessageImpl topicMessage = new TopicMessageImpl<>(topicName, msg, null); assertTrue(topicMessage.isReplicated()); assertEquals(msg.getReplicatedFrom(), from); @@ -76,7 +76,7 @@ public void testTopicMessageImplNoReplicatedInfo() { ByteBuffer payload = ByteBuffer.wrap(new byte[0]); MessageImpl msg = MessageImpl.create(builder, payload, Schema.BYTES, null); msg.setMessageId(new MessageIdImpl(-1, -1, -1)); - TopicMessageImpl topicMessage = new TopicMessageImpl<>(topicName, topicName, msg, null); + TopicMessageImpl topicMessage = new TopicMessageImpl<>(topicName, msg, null); assertFalse(topicMessage.isReplicated()); assertNull(topicMessage.getReplicatedFrom()); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java index 96f6f4708a7f1..bda99a39478a3 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java @@ -47,7 +47,6 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Utils; import org.apache.pulsar.common.nar.NarClassLoader; @@ -315,9 +314,7 @@ public static String getFullyQualifiedInstanceId(String tenant, String namespace } public static final long getSequenceId(MessageId messageId) { - MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl) - ? ((TopicMessageIdImpl) messageId).getInnerMessageId() - : messageId); + MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(messageId); long ledgerId = msgId.getLedgerId(); long entryId = msgId.getEntryId(); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index 1aa99722512ba..579b423339911 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.impl.ConsumerBuilderImpl; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; @@ -292,8 +293,8 @@ private void checkResumeReceive() { private void handleAck(ConsumerCommand command) throws IOException { // We should have received an ack - MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId), - topic.toString()); + TopicMessageId msgId = TopicMessageId.create(topic.toString(), + MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId))); if (log.isDebugEnabled()) { log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(), subscription, msgId, getRemote().getInetSocketAddress().toString()); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java index 0d523ff56eae9..76f09675245d1 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java @@ -40,8 +40,8 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.impl.BatchMessageIdImpl; -import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; import org.testng.annotations.Test; import org.testng.collections.Lists; @@ -219,7 +219,7 @@ private void testSubscriptionInitialPosition(int numTopics) throws Exception { Message m = consumer.receive(); int topicIdx; if (numTopics > 1) { - String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicPartitionName(); + String topic = ((TopicMessageId) m.getMessageId()).getOwnerTopic(); String[] topicParts = StringUtils.split(topic, '-'); topicIdx = Integer.parseInt(topicParts[topicParts.length - 1]); From 421064617f203721c43c11f8529b4eefacd10ffa Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 30 Jan 2023 16:20:18 +0800 Subject: [PATCH 2/3] Add TopicMessageIdSerDes --- .../client/api/TopicMessageIdSerDes.java | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageIdSerDes.java diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageIdSerDes.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageIdSerDes.java new file mode 100644 index 0000000000000..8e1d7dd92daec --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageIdSerDes.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +/** + * To keep the backward compatibility, {@link TopicMessageId#toByteArray()} should not serialize the owner topic. This + * class provides a convenient way for users to serialize a TopicMessageId with its owner topic serialized. + *

    + * The format is: + * 1. 4 bytes that represent the length of the serialized topic name. + * 2. N bytes that represent the UTF-8 serialized topic name. + * 3. Serialized bytes from {@link MessageId#toByteArray()}. + */ +public class TopicMessageIdSerDes { + + public static byte[] serialize(TopicMessageId topicMessageId) { + final byte[] topicBytes = topicMessageId.getOwnerTopic().getBytes(StandardCharsets.UTF_8); + final byte[] messageIdBytes = topicMessageId.toByteArray(); + + int topicLength = topicBytes.length; + final byte[] serialized = new byte[4 + topicLength + messageIdBytes.length]; + serialized[0] = (byte) (topicLength >>> 24); + serialized[1] = (byte) (topicLength >>> 16); + serialized[2] = (byte) (topicLength >>> 8); + serialized[3] = (byte) topicLength; + + System.arraycopy(topicBytes, 0, serialized, 4, topicLength); + System.arraycopy(messageIdBytes, 0, serialized, 4 + topicLength, messageIdBytes.length); + return serialized; + } + + public static TopicMessageId deserialize(byte[] bytes) throws IOException { + if (bytes.length < 4) { + throw new IOException("No length field"); + } + int topicLength = 0; + for (int i = 0; i < 4; i++) { + topicLength <<= 8; + topicLength |= bytes[i] & 0xFF; + } + if (bytes.length < 4 + topicLength) { + throw new IOException("Read topic length " + topicLength + ", while there is only " + (bytes.length - 4) + + " bytes remained"); + } + + final byte[] topicBytes = new byte[topicLength]; + System.arraycopy(bytes, 4, topicBytes, 0, topicLength); + final String topic = new String(topicBytes, StandardCharsets.UTF_8); + + final byte[] messageIdBytes = new byte[bytes.length - 4 - topicLength]; + System.arraycopy(bytes, 4 + topicLength, messageIdBytes, 0, messageIdBytes.length); + final MessageId messageId = MessageId.fromByteArray(messageIdBytes); + + return TopicMessageId.create(topic, messageId); + } +} From 81961107c0d5490427634f1e62380425084b9c09 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 30 Jan 2023 16:41:03 +0800 Subject: [PATCH 3/3] Add tests --- .../impl/MessageIdSerializationTest.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java index 7f029635241de..35eca790e54e7 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java @@ -21,6 +21,8 @@ import static org.testng.Assert.assertEquals; import java.io.IOException; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.TopicMessageId; +import org.apache.pulsar.client.api.TopicMessageIdSerDes; import org.testng.annotations.Test; public class MessageIdSerializationTest { @@ -59,4 +61,27 @@ public void testProtobufSerializationNull() throws Exception { void testProtobufSerializationEmpty() throws Exception { MessageId.fromByteArray(new byte[0]); } + + @Test + public void testTopicMessageIdSerDes() throws Exception { + String topic = "persistent://public/default/my-topic"; + MessageId id1 = new MessageIdImpl(1L, 2L, 3); + MessageId id2 = new BatchMessageIdImpl(1L, 2L, 3, 4); + + TopicMessageId topicMessageId1 = convert(topic, id1); + assertEquals(topicMessageId1.getOwnerTopic(), topic); + assertEquals(topicMessageId1.toByteArray(), id1.toByteArray()); + + TopicMessageId topicMessageId2 = convert(topic, id2); + assertEquals(topicMessageId2.getOwnerTopic(), topic); + assertEquals(topicMessageId2.toByteArray(), id2.toByteArray()); + } + + private static TopicMessageId convert(String topic, MessageId messageId) throws IOException { + return TopicMessageIdSerDes.deserialize( + TopicMessageIdSerDes.serialize( + TopicMessageId.create(topic, messageId) + ) + ); + } }