diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java index b8ea87ab4016e..ad34b80287f1b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java @@ -24,10 +24,13 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import com.google.common.collect.Lists; +import com.google.common.collect.Range; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -42,6 +45,9 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.Cleanup; + +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -351,4 +357,66 @@ public int choosePartition(Message msg, TopicMetadata metadata) { } consumer.close(); } + + /** + * It tests acking of messageId created from byte[] and validates client acks messages successfully. + * @throws Exception + */ + @Test + public void testMultiTopicAckWithByteMessageId() throws Exception { + String topicName = newTopicName(); + int numPartitions = 2; + int numMessages = 100000; + admin.topics().createPartitionedTopic(topicName, numPartitions); + + Producer[] producers = new Producer[numPartitions]; + + for (int i = 0; i < numPartitions; i++) { + producers[i] = pulsarClient.newProducer(Schema.INT64) + // produce to each partition directly so that order can be maintained in sending + .topic(topicName + "-partition-" + i).enableBatching(true).maxPendingMessages(30000) + .maxPendingMessagesAcrossPartitions(60000).batchingMaxMessages(10000) + .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxBytes(4 * 1024 * 1024) + .blockIfQueueFull(true).create(); + } + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.INT64) + // consume on the partitioned topic + .topic(topicName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .receiverQueueSize(numMessages).subscriptionName(methodName).subscribe(); + + // produce sequence numbers to each partition topic + long sequenceNumber = 1L; + for (int i = 0; i < numMessages; i++) { + for (Producer producer : producers) { + producer.newMessage().value(sequenceNumber).sendAsync(); + } + sequenceNumber++; + } + for (Producer producer : producers) { + producer.flush(); + producer.close(); + } + + // receive and validate sequences in the partitioned topic + Map receivedSequences = new HashMap<>(); + int receivedCount = 0; + while (receivedCount < numPartitions * numMessages) { + Message message = consumer.receiveAsync().get(5, TimeUnit.SECONDS); + byte[] idByte = message.getMessageId().toByteArray(); + MessageId id = MessageId.fromByteArray(idByte); + consumer.acknowledge(id); + receivedCount++; + AtomicLong receivedSequenceCounter = receivedSequences.computeIfAbsent(message.getTopicName(), + k -> new AtomicLong(1L)); + Assert.assertEquals(message.getValue().longValue(), receivedSequenceCounter.getAndIncrement()); + } + Assert.assertEquals(numPartitions * numMessages, receivedCount); + consumer.close(); + + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName + "-partition-0", false).get().get(); + Range range = topic.getManagedLedger().getCursors().iterator().next().getLastIndividualDeletedRange(); + assertNull(range); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java index b70267bb0fb8b..10f286bde4d2b 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java @@ -60,6 +60,10 @@ public Impl(String topic, MessageId messageId) { this.messageId = (MessageIdAdv) messageId; } + protected MessageId getMessageId() { + return messageId; + } + @Override public byte[] toByteArray() { return messageId.toByteArray(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index e9cddeb65d7f2..73f10eab33a36 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -78,6 +78,11 @@ public byte[] toByteArray() { return toByteArray(batchIndex, batchSize); } + @Override + protected byte[] toByteArray(String topic) { + return toByteArray(batchIndex, batchSize, topic); + } + @Deprecated public boolean ackIndividual() { return MessageIdAdvUtils.acknowledge(this, true); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 83ee762578390..ea6c4a154dc9f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -23,6 +23,7 @@ import io.netty.util.concurrent.FastThreadLocal; import java.io.IOException; import java.util.Objects; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.common.api.proto.MessageIdData; @@ -93,7 +94,7 @@ public static MessageId fromByteArray(byte[] data) throws IOException { throw new IOException(e); } - MessageIdImpl messageId; + MessageId messageId; if (idData.hasBatchIndex()) { if (idData.hasBatchSize()) { messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), @@ -112,6 +113,11 @@ public static MessageId fromByteArray(byte[] data) throws IOException { messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition()); } + if (idData.hasTopicName()) { + String topicName = idData.getTopicName(); + messageId = new TopicMessageIdImpl(topicName, topicName, messageId); + } + return messageId; } @@ -174,7 +180,18 @@ protected MessageIdData writeMessageIdData(MessageIdData msgId, int batchIndex, // batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message protected byte[] toByteArray(int batchIndex, int batchSize) { + return toByteArray(batchIndex, batchSize, null); + } + + protected byte[] toByteArray(String topic) { + return toByteArray(-1, 0, topic); + } + + protected byte[] toByteArray(int batchIndex, int batchSize, String topicName) { MessageIdData msgId = writeMessageIdData(null, batchIndex, batchSize); + if (StringUtils.isNotBlank(topicName)) { + msgId.setTopicName(topicName); + } int size = msgId.getSerializedSize(); ByteBuf serialized = Unpooled.buffer(size, size); @@ -186,6 +203,6 @@ protected byte[] toByteArray(int batchIndex, int batchSize) { @Override public byte[] toByteArray() { // there is no message batch so we pass -1 - return toByteArray(-1, 0); + return toByteArray(-1, 0, null); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java index 189dc1c608379..2b0da33e6c80a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java @@ -62,4 +62,14 @@ public boolean equals(Object obj) { public int hashCode() { return super.hashCode(); } -} + + @Override + public byte[] toByteArray() { + MessageId id = getMessageId(); + if (id instanceof MessageIdImpl) { + return ((MessageIdImpl) id).toByteArray(-1, 0, getOwnerTopic()); + } else { + return id.toByteArray(); + } + } +} \ No newline at end of file diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index afe193eeb7e9d..ede9112ef2faa 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -62,9 +62,9 @@ message MessageIdData { optional int32 batch_index = 4 [default = -1]; repeated int64 ack_set = 5; optional int32 batch_size = 6; - // For the chunk message id, we need to specify the first chunk message id. optional MessageIdData first_chunk_message_id = 7; + optional string topicName = 8; } message KeyValue {