Skip to content

Commit

Permalink
[improve][client] PIP-229: Add a common interface to get fields of Me…
Browse files Browse the repository at this point in the history
…ssageIdData

Master issue: apache#18950

### Motivation

We need a common interface to get fields of the MessageIdData. After
that, we won't need to assert a MessageId implementation is an instance
of a specific class. And we can pass our customized MessageId
implementation to APIs like `acknowledge` and `seek`.

### Modifications

- Add `MessageIdAdv` to get fields of `MessageIdData`, make all
  MessageId implementations inherit it (except `MultiMessageIdImpl`).
- Add `MessageIdAdvUtils` for the most common used methods.
- Replace `BatchMessageAcker` with the `BitSet` for ACK.
- Remove `TopicMessageIdImpl#getInnerMessageId` since a
  `TopicMessageIdImpl` can be treated as its underlying `MessageId`
  implementation now.
- Remove `instanceof BatchMessageIdImpl` checks in `pulsar-client`
  module by casting to `MessageIdAdv`.

After this refactoring, the 3rd party library will no longer need to
cast a `MessageId` to a specific implementation. It only needs to cast
`MessageId` to `MessageIdAdv`. Users can also implement their own util
class so the methods of `MessageIdAdvUtils` are all not public.

### Verifications

Add `CustomMessageIdTest` to verify a simple MessageIdAdv implementation
that only has the (ledger id, entry id, batch idx, batch size) fields
also works for seek and acknowledgment.
  • Loading branch information
BewareMyPower committed Feb 3, 2023
1 parent 31fe347 commit fdd32c0
Showing 35 changed files with 671 additions and 805 deletions.
Original file line number Diff line number Diff line change
@@ -40,11 +40,11 @@
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -336,8 +336,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
}
totalMessages++;
consumer1.acknowledge(msg);
MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
receivedPtns.add(msgId.getPartitionIndex());
receivedPtns.add(((MessageIdAdv) msg.getMessageId()).getPartitionIndex());
}

assertTrue(Sets.difference(listener1.activePtns, receivedPtns).isEmpty());
@@ -353,8 +352,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
}
totalMessages++;
consumer2.acknowledge(msg);
MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
receivedPtns.add(msgId.getPartitionIndex());
receivedPtns.add(((MessageIdAdv) msg.getMessageId()).getPartitionIndex());
}
assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty());
assertTrue(Sets.difference(listener2.activePtns, receivedPtns).isEmpty());
Original file line number Diff line number Diff line change
@@ -678,7 +678,7 @@ public void testSeekByFunction() throws Exception {
if (message == null) {
break;
}
received.add(MessageIdImpl.convertToMessageIdImpl(message.getMessageId()));
received.add(message.getMessageId());
}
int msgNumFromPartition1 = list.size() / 2;
int msgNumFromPartition2 = 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class CustomMessageIdTest extends ProducerConsumerBase {

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider
public static Object[][] enableBatching() {
return new Object[][]{
{ true },
{ false }
};
}

@Test
public void testSeek() throws Exception {
final var topic = "persistent://my-property/my-ns/test-seek-" + System.currentTimeMillis();
@Cleanup final var producer = pulsarClient.newProducer(Schema.INT32).topic(topic).create();
final var msgIds = new ArrayList<SimpleMessageIdImpl>();
for (int i = 0; i < 10; i++) {
msgIds.add(new SimpleMessageIdImpl((MessageIdAdv) producer.send(i)));
}
@Cleanup final var consumer = pulsarClient.newConsumer(Schema.INT32)
.topic(topic).subscriptionName("sub").subscribe();
consumer.seek(msgIds.get(6));
final var msg = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(msg);
assertEquals(msg.getValue(), 7);
}

@Test(dataProvider = "enableBatching")
public void testAcknowledgment(boolean enableBatching) throws Exception {
final var topic = "persistent://my-property/my-ns/test-ack-"
+ enableBatching + System.currentTimeMillis();
final var producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(enableBatching)
.batchingMaxMessages(10)
.batchingMaxPublishDelay(300, TimeUnit.MILLISECONDS)
.create();
final var consumer = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("sub")
.enableBatchIndexAcknowledgment(true)
.isAckReceiptEnabled(true)
.subscribe();
for (int i = 0; i < 10; i++) {
producer.sendAsync(i);
}
final var msgIds = new ArrayList<SimpleMessageIdImpl>();
for (int i = 0; i < 10; i++) {
final var msg = consumer.receive();
final var msgId = new SimpleMessageIdImpl((MessageIdAdv) msg.getMessageId());
msgIds.add(msgId);
if (enableBatching) {
assertTrue(msgId.getBatchIndex() >= 0 && msgId.getBatchSize() > 0);
} else {
assertFalse(msgId.getBatchIndex() >= 0 && msgId.getBatchSize() > 0);
}
}
consumer.acknowledgeCumulative(msgIds.get(8));
consumer.redeliverUnacknowledgedMessages();
final var msg = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(msg);
assertEquals(msg.getValue(), 9);
}

private record SimpleMessageIdImpl(long ledgerId, long entryId, int batchIndex, int batchSize)
implements MessageIdAdv {

public SimpleMessageIdImpl(MessageIdAdv msgId) {
this(msgId.getLedgerId(), msgId.getEntryId(), msgId.getBatchIndex(), msgId.getBatchSize());
}

@Override
public byte[] toByteArray() {
return new byte[0]; // never used
}

@Override
public long getLedgerId() {
return ledgerId;
}

@Override
public long getEntryId() {
return entryId;
}

@Override
public int getBatchIndex() {
return batchIndex;
}

@Override
public int getBatchSize() {
return batchSize;
}
}
}
Original file line number Diff line number Diff line change
@@ -767,7 +767,7 @@ public void testMessageIdForSubscribeToSinglePartition() throws Exception {

for (int i = 0; i < totalMessages; i ++) {
msg = consumer1.receive(5, TimeUnit.SECONDS);
Assert.assertEquals(MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()).getPartitionIndex(), 2);
Assert.assertEquals(((MessageIdAdv) msg.getMessageId()).getPartitionIndex(), 2);
consumer1.acknowledge(msg);
}

Original file line number Diff line number Diff line change
@@ -176,10 +176,10 @@ private AckTestData prepareDataForAck(String topic) throws PulsarClientException
messageIds.add(message.getMessageId());
}
MessageId firstEntryMessageId = messageIds.get(0);
MessageId secondEntryMessageId = ((BatchMessageIdImpl) messageIds.get(1)).toMessageIdImpl();
MessageId secondEntryMessageId = MessageIdAdvUtils.discardBatch(messageIds.get(1));
// Verify messages 2 to N must be in the same entry
for (int i = 2; i < messageIds.size(); i++) {
assertEquals(((BatchMessageIdImpl) messageIds.get(i)).toMessageIdImpl(), secondEntryMessageId);
assertEquals(MessageIdAdvUtils.discardBatch(messageIds.get(i)), secondEntryMessageId);
}

assertTrue(interceptor.individualAckedMessageIdList.isEmpty());
Original file line number Diff line number Diff line change
@@ -118,9 +118,6 @@ public void producerSendAsync(TopicType topicType) throws PulsarClientException,
Message<byte[]> message = consumer.receive();
assertEquals(new String(message.getData()), messagePrefix + i);
MessageId messageId = message.getMessageId();
if (topicType == TopicType.PARTITIONED) {
messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
}
assertTrue(messageIds.remove(messageId), "Failed to receive message");
}
log.info("Remaining message IDs = {}", messageIds);
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import java.util.BitSet;

/**
* The {@link MessageId} interface provided for advanced users.
* <p>
* All built-in MessageId implementations should be able to be cast to MessageIdAdv.
* </p>
*/
public interface MessageIdAdv extends MessageId {

/**
* Get the ledger ID.
*
* @return the ledger ID
*/
long getLedgerId();

/**
* Get the entry ID.
*
* @return the entry ID
*/
long getEntryId();

/**
* Get the partition index.
*
* @return -1 if the message is from a non-partitioned topic, otherwise the non-negative partition index
*/
default int getPartitionIndex() {
return -1;
}

/**
* Get the batch index.
*
* @return -1 if the message is not in a batch
*/
default int getBatchIndex() {
return -1;
}

/**
* Get the batch size.
*
* @return 0 if the message is not in a batch
*/
default int getBatchSize() {
return 0;
}

/**
* Get the BitSet that indicates which messages in the batch.
*
* @implNote The message IDs of a batch should share a BitSet. For example, given 3 messages in the same batch whose
* size is 3, all message IDs of them should return "111" (i.e. a BitSet whose size is 3 and all bits are 1). If the
* 1st message has been acknowledged, the returned BitSet should become "011" (i.e. the 1st bit become 0).
*
* @return null if the message is a non-batched message
*/
default BitSet getAckSet() {
return null;
}

/**
* Get the message ID of the first chunk if the current message ID represents the position of a chunked message.
*
* @implNote A chunked message is distributed across different BookKeeper entries. The message ID of a chunked
* message is composed of two message IDs that represent positions of the first and the last chunk. The message ID
* itself represents the position of the last chunk.
*
* @return null if the message is not a chunked message
*/
default MessageIdAdv getFirstChunkMessageId() {
return null;
}

/**
* The default implementation of {@link Comparable#compareTo(Object)}.
*/
default int compareTo(MessageId o) {
if (!(o instanceof MessageIdAdv)) {
throw new UnsupportedOperationException("Unknown MessageId type: "
+ ((o != null) ? o.getClass().getName() : "null"));
}
final MessageIdAdv other = (MessageIdAdv) o;
int result = Long.compare(this.getLedgerId(), other.getLedgerId());
if (result != 0) {
return result;
}
result = Long.compare(this.getEntryId(), other.getEntryId());
if (result != 0) {
return result;
}
// TODO: Correct the following compare logics, see https://github.com/apache/pulsar/pull/18981
result = Integer.compare(this.getPartitionIndex(), other.getPartitionIndex());
if (result != 0) {
return result;
}
return Integer.compare(this.getBatchIndex(), other.getBatchIndex());
}
}
Loading

0 comments on commit fdd32c0

Please sign in to comment.