Skip to content

Commit

Permalink
[improve][client] Introduce MessageIdDataInterface to avoid type asse…
Browse files Browse the repository at this point in the history
…rtion and cast
  • Loading branch information
BewareMyPower committed Dec 12, 2022
1 parent 346b04a commit ef4ce86
Show file tree
Hide file tree
Showing 32 changed files with 540 additions and 462 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.api.PulsarApiMessageId;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -337,8 +336,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
}
totalMessages++;
consumer1.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
receivedPtns.add(msgId.getPartitionIndex());
receivedPtns.add(((PulsarApiMessageId) msg.getMessageId()).getPartition());
}

assertTrue(Sets.difference(listener1.activePtns, receivedPtns).isEmpty());
Expand All @@ -354,8 +352,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
}
totalMessages++;
consumer2.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
receivedPtns.add(msgId.getPartitionIndex());
receivedPtns.add(((PulsarApiMessageId) msg.getMessageId()).getPartition());
}
assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty());
assertTrue(Sets.difference(listener2.activePtns, receivedPtns).isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -679,8 +678,7 @@ public void testSeekByFunction() throws Exception {
if (message == null) {
break;
}
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) message.getMessageId();
received.add(topicMessageId.getInnerMessageId());
received.add(message.getMessageId());
}
int msgNumFromPartition1 = list.size() / 2;
int msgNumFromPartition2 = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -768,7 +768,7 @@ public void testMessageIdForSubscribeToSinglePartition() throws Exception {

for (int i = 0; i < totalMessages; i ++) {
msg = consumer1.receive(5, TimeUnit.SECONDS);
Assert.assertEquals(((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(), 2);
Assert.assertEquals(((PulsarApiMessageId) msg.getMessageId()).getPartition(), 2);
consumer1.acknowledge(msg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarApiMessageId;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerInterceptor;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -176,10 +177,10 @@ private AckTestData prepareDataForAck(String topic) throws PulsarClientException
messageIds.add(message.getMessageId());
}
MessageId firstEntryMessageId = messageIds.get(0);
MessageId secondEntryMessageId = ((BatchMessageIdImpl) messageIds.get(1)).toMessageIdImpl();
MessageId secondEntryMessageId = MessageIdImpl.from((PulsarApiMessageId) messageIds.get(1));
// Verify messages 2 to N must be in the same entry
for (int i = 2; i < messageIds.size(); i++) {
assertEquals(((BatchMessageIdImpl) messageIds.get(i)).toMessageIdImpl(), secondEntryMessageId);
assertEquals(MessageIdImpl.from((PulsarApiMessageId) messageIds.get(i)), secondEntryMessageId);
}

assertTrue(interceptor.individualAckedMessageIdList.isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ public void producerSendAsync(TopicType topicType) throws PulsarClientException,
Message<byte[]> message = consumer.receive();
assertEquals(new String(message.getData()), messagePrefix + i);
MessageId messageId = message.getMessageId();
if (topicType == TopicType.PARTITIONED) {
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
}
assertTrue(messageIds.remove(messageId), "Failed to receive message");
}
log.info("Remaining message IDs = {}", messageIds);
Expand Down Expand Up @@ -166,9 +163,6 @@ public void producerSend(TopicType topicType) throws PulsarClientException, Puls

for (int i = 0; i < numberOfMessages; i++) {
MessageId messageId = consumer.receive().getMessageId();
if (topicType == TopicType.PARTITIONED) {
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
}
assertTrue(messageIds.remove(messageId), "Failed to receive Message");
}
log.info("Remaining message IDs = {}", messageIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.PulsarApiMessageId;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -291,7 +291,7 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
.negativeAckRedeliveryDelay(100, TimeUnit.SECONDS)
.subscribe();

MessageId messageId = new MessageIdImpl(3, 1, 0);
PulsarApiMessageId messageId = new MessageIdImpl(3, 1, 0);
TopicMessageIdImpl topicMessageId = new TopicMessageIdImpl("topic-1", "topic-1", messageId);
BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(3, 1, 0, 0);
BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(3, 1, 0, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarApiMessageId;
import org.apache.pulsar.client.cli.NoSplitter;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.RelativeTimeUtil;

@Parameters(commandDescription = "Operations on persistent topics. The persistent-topics "
Expand Down Expand Up @@ -611,12 +610,11 @@ void run() throws PulsarAdminException {
if (++position != 1) {
System.out.println("-------------------------------------------------------------------------\n");
}
if (msg.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId();
PulsarApiMessageId msgId = (PulsarApiMessageId) msg.getMessageId();
if (msgId.isBatch()) {
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
+ msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}
if (msg.getProperties().size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarApiMessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.cli.NoSplitter;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
Expand Down Expand Up @@ -1192,12 +1193,11 @@ void run() throws PulsarAdminException {
if (++position != 1) {
System.out.println("-------------------------------------------------------------------------\n");
}
PulsarApiMessageId msgId = (PulsarApiMessageId) msg.getMessageId();
if (message.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
+ msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}

Expand Down Expand Up @@ -1251,12 +1251,11 @@ void run() throws PulsarAdminException {
MessageImpl message =
(MessageImpl) getTopics().examineMessage(persistentTopic, initialPosition, messagePosition);

if (message.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
PulsarApiMessageId msgId = (PulsarApiMessageId) message.getMessageId();
if (msgId.isBatch()) {
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
+ msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}

Expand Down Expand Up @@ -1310,12 +1309,11 @@ void run() throws PulsarAdminException {
System.out.println("Cannot find any messages based on ledgerId:"
+ ledgerId + " entryId:" + entryId);
} else {
if (message.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
PulsarApiMessageId msgId = (PulsarApiMessageId) message.getMessageId();
if (msgId.isBatch()) {
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
+ msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarApiMessageId;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;

/**
Expand All @@ -31,7 +32,8 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable {

boolean isDuplicate(MessageId messageId);

CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties);
CompletableFuture<Void> addAcknowledgment(PulsarApiMessageId msgId, AckType ackType,
Map<String, Long> properties);

CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds, AckType ackType,
Map<String, Long> properties);
Expand Down
Loading

0 comments on commit ef4ce86

Please sign in to comment.