Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][client] PIP-224 Part 2: Add TopicMessageIdSerDes #20

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -337,7 +336,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
}
totalMessages++;
consumer1.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
receivedPtns.add(msgId.getPartitionIndex());
}

Expand All @@ -354,7 +353,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
}
totalMessages++;
consumer2.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
receivedPtns.add(msgId.getPartitionIndex());
}
assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -679,8 +678,7 @@ public void testSeekByFunction() throws Exception {
if (message == null) {
break;
}
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) message.getMessageId();
received.add(topicMessageId.getInnerMessageId());
received.add(MessageIdImpl.convertToMessageIdImpl(message.getMessageId()));
}
int msgNumFromPartition1 = list.size() / 2;
int msgNumFromPartition2 = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,54 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class MultiTopicsConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerTest.class);
private ScheduledExecutorService internalExecutorServiceDelegate;

@BeforeMethod(alwaysRun = true)
@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterMethod(alwaysRun = true)
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
Expand Down Expand Up @@ -231,4 +242,113 @@ public void testBatchReceiveAckTimeout()
Assert.assertEquals(consumer.batchReceive().size(), 1);
});
}

@Test(timeOut = 30000)
public void testAcknowledgeWrongMessageId() throws Exception {
final var topic1 = newTopicName();
final var topic2 = newTopicName();

@Cleanup final var singleTopicConsumer = pulsarClient.newConsumer()
.topic(topic1)
.subscriptionName("sub-1")
.isAckReceiptEnabled(true)
.subscribe();
assertTrue(singleTopicConsumer instanceof ConsumerImpl);

@Cleanup final var multiTopicsConsumer = pulsarClient.newConsumer()
.topics(List.of(topic1, topic2))
.subscriptionName("sub-2")
.isAckReceiptEnabled(true)
.subscribe();
assertTrue(multiTopicsConsumer instanceof MultiTopicsConsumerImpl);

@Cleanup final var producer = pulsarClient.newProducer().topic(topic1).create();
final var nonTopicMessageIds = new ArrayList<MessageId>();
nonTopicMessageIds.add(producer.send(new byte[]{ 0x00 }));
nonTopicMessageIds.add(singleTopicConsumer.receive().getMessageId());

// Multi-topics consumers can only acknowledge TopicMessageId, otherwise NotAllowedException will be thrown
for (var msgId : nonTopicMessageIds) {
assertFalse(msgId instanceof TopicMessageId);
Assert.assertThrows(PulsarClientException.NotAllowedException.class,
() -> multiTopicsConsumer.acknowledge(msgId));
Assert.assertThrows(PulsarClientException.NotAllowedException.class,
() -> multiTopicsConsumer.acknowledge(Collections.singletonList(msgId)));
Assert.assertThrows(PulsarClientException.NotAllowedException.class,
() -> multiTopicsConsumer.acknowledgeCumulative(msgId));
}

// Single-topic consumer can acknowledge TopicMessageId
final var topicMessageId = multiTopicsConsumer.receive().getMessageId();
assertTrue(topicMessageId instanceof TopicMessageId);
assertFalse(topicMessageId instanceof MessageIdImpl);
singleTopicConsumer.acknowledge(topicMessageId);
}

@DataProvider
public static Object[][] messageIdFromProducer() {
return new Object[][] { { true }, { false } };
}

@Test(timeOut = 30000, dataProvider = "messageIdFromProducer")
public void testSeekCustomTopicMessageId(boolean messageIdFromProducer) throws Exception {
final var topic = TopicName.get(newTopicName()).toString();
final var numPartitions = 3;
admin.topics().createPartitionedTopic(topic, numPartitions);

@Cleanup final var producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.messageRouter(new MessageRouter() {
int index = 0;
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return index++ % metadata.numPartitions();
}
})
.create();
@Cleanup final var consumer = pulsarClient.newConsumer(Schema.INT32).topic(topic)
.subscriptionName("sub").subscribe();
assertTrue(consumer instanceof MultiTopicsConsumerImpl);

final var msgIds = new HashMap<String, List<MessageId>>();
final var numMessagesPerPartition = 10;
final var numMessages = numPartitions * numMessagesPerPartition;
for (int i = 0; i < numMessages; i++) {
var msgId = (MessageIdImpl) producer.send(i);
if (messageIdFromProducer) {
msgIds.computeIfAbsent(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + msgId.getPartitionIndex(),
__ -> new ArrayList<>()).add(msgId);
} else {
var topicMessageId = (TopicMessageId) consumer.receive().getMessageId();
msgIds.computeIfAbsent(topicMessageId.getOwnerTopic(), __ -> new ArrayList<>()).add(topicMessageId);
}
}

final var partitions = IntStream.range(0, numPartitions)
.mapToObj(i -> topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i)
.collect(Collectors.toSet());
assertEquals(msgIds.keySet(), partitions);

for (var partition : partitions) {
final var msgIdList = msgIds.get(partition);
assertEquals(msgIdList.size(), numMessagesPerPartition);
if (messageIdFromProducer) {
consumer.seek(TopicMessageId.create(partition, msgIdList.get(numMessagesPerPartition / 2)));
} else {
consumer.seek(msgIdList.get(numMessagesPerPartition / 2));
}
}

var topicMsgIds = new HashMap<String, List<TopicMessageId>>();
for (int i = 0; i < ((numMessagesPerPartition / 2 - 1) * numPartitions); i++) {
TopicMessageId topicMessageId = (TopicMessageId) consumer.receive().getMessageId();
topicMsgIds.computeIfAbsent(topicMessageId.getOwnerTopic(), __ -> new ArrayList<>()).add(topicMessageId);
}
assertEquals(topicMsgIds.keySet(), partitions);
for (var partition : partitions) {
assertEquals(topicMsgIds.get(partition),
msgIds.get(partition).subList(numMessagesPerPartition / 2 + 1, numMessagesPerPartition));
}
consumer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -768,7 +767,7 @@ public void testMessageIdForSubscribeToSinglePartition() throws Exception {

for (int i = 0; i < totalMessages; i ++) {
msg = consumer1.receive(5, TimeUnit.SECONDS);
Assert.assertEquals(((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(), 2);
Assert.assertEquals(MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()).getPartitionIndex(), 2);
consumer1.acknowledge(msg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void producerSendAsync(TopicType topicType) throws PulsarClientException,
assertEquals(new String(message.getData()), messagePrefix + i);
MessageId messageId = message.getMessageId();
if (topicType == TopicType.PARTITIONED) {
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
}
assertTrue(messageIds.remove(messageId), "Failed to receive message");
}
Expand Down Expand Up @@ -166,9 +166,6 @@ public void producerSend(TopicType topicType) throws PulsarClientException, Puls

for (int i = 0; i < numberOfMessages; i++) {
MessageId messageId = consumer.receive().getMessageId();
if (topicType == TopicType.PARTITIONED) {
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
}
assertTrue(messageIds.remove(messageId), "Failed to receive Message");
}
log.info("Remaining message IDs = {}", messageIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.awaitility.Awaitility;
import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
Expand Down Expand Up @@ -292,7 +293,7 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
.subscribe();

MessageId messageId = new MessageIdImpl(3, 1, 0);
TopicMessageIdImpl topicMessageId = new TopicMessageIdImpl("topic-1", "topic-1", messageId);
TopicMessageId topicMessageId = TopicMessageId.create("topic-1", messageId);
BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(3, 1, 0, 0);
BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(3, 1, 0, 1);
BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(3, 1, 0, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,9 @@ CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message,
* <li><code>MessageId.latest</code> : Reset the subscription on the latest message in the topic
* </ul>
*
* <p>Note: For multi-topics consumer, you can only seek to the earliest or latest message.
* <p>Note: For multi-topics consumer, if `messageId` is a {@link TopicMessageId}, the seek operation will happen
* on the owner topic of the message, which is returned by {@link TopicMessageId#getOwnerTopic()}. Otherwise, you
* can only seek to the earliest or latest message for all topics subscribed.
*
* @param messageId
* the message id where to reposition the subscription
Expand Down Expand Up @@ -519,19 +521,7 @@ CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message,
CompletableFuture<Void> seekAsync(Function<String, Object> function);

/**
* Reset the subscription associated with this consumer to a specific message id.
*
* <p>The message id can either be a specific message or represent the first or last messages in the topic.
* <ul>
* <li><code>MessageId.earliest</code> : Reset the subscription on the earliest message available in the topic
* <li><code>MessageId.latest</code> : Reset the subscription on the latest message in the topic
* </ul>
*
* <p>Note: For multi-topics consumer, you can only seek to the earliest or latest message.
*
* @param messageId
* the message id where to reposition the subscription
* @return a future to track the completion of the seek operation
* The asynchronous version of {@link Consumer#seek(MessageId)}.
*/
CompletableFuture<Void> seekAsync(MessageId messageId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public interface MessageAcknowledger {
*
* @throws PulsarClientException.AlreadyClosedException}
* if the consumer was already closed
* @throws PulsarClientException.NotAllowedException
* if `messageId` is not a {@link TopicMessageId} when multiple topics are subscribed
*/
void acknowledge(MessageId messageId) throws PulsarClientException;

Expand All @@ -59,6 +61,8 @@ default void acknowledge(Message<?> message) throws PulsarClientException {
/**
* Acknowledge the consumption of a list of message.
* @param messageIdList the list of message IDs.
* @throws PulsarClientException.NotAllowedException
* if any message id in the list is not a {@link TopicMessageId} when multiple topics are subscribed
*/
void acknowledge(List<MessageId> messageIdList) throws PulsarClientException;

Expand All @@ -82,6 +86,8 @@ default void acknowledge(Messages<?> messages) throws PulsarClientException {
* The {@code MessageId} to be cumulatively acknowledged
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
* @throws PulsarClientException.NotAllowedException
* if `messageId` is not a {@link TopicMessageId} when multiple topics are subscribed
*/
void acknowledgeCumulative(MessageId messageId) throws PulsarClientException;

Expand Down
Loading