PIP-224: Introduce TopicMessageId for consumer's MessageId related APIs #18616

Open
BewareMyPower opened this issue Nov 24, 2022 · 8 comments
@BewareMyPower
Contributor

BewareMyPower commented Nov 24, 2022

Motivation

There are two ways to get a MessageId from consumer:

  • Message#getMessageId from a received message
  • Consumer#getLastMessageId

The returned MessageId can be used in seek and acknowledge (including acknowledgeCumulative). A MessageId represents the position of a message. For a consumer that subscribes to a single topic, this works as expected because the MessageId from a received message or returned by getLastMessageId always belongs to that topic. However, things are different for a consumer that subscribes to multiple topics (a multi-topics consumer).

For a multi-topics consumer:

  • getLastMessageId returns a MultiMessageIdImpl. However, this implementation is meaningless. It maintains a map from topic name to MessageId and is neither comparable nor serializable. It also makes little sense to compare a MessageId with a MultiMessageIdImpl that represents the positions of multiple messages.
  • seek only accepts an earliest or latest position. Maybe it's because there is no way to know which topic the message belongs to.
  • acknowledge only accepts a TopicMessageIdImpl implementation with a getTopicPartitionName() method to know which topic the message belongs to. However, when the topic is not subscribed by the consumer, the behavior is undefined. The current behaviors are:
    • If the topic is not subscribed, no error is returned.
    • If the MessageId is not a TopicMessageIdImpl, IllegalArgumentException will be thrown.

Goal

This proposal introduces a TopicMessageId interface that exposes a method to get a message's owner topic. With TopicMessageId, the behaviors described above can be defined clearly.

API Changes

/**
 * The MessageId used for a consumer that subscribes multiple topics or partitioned topics.
 *
 * <p>
 * It's guaranteed that {@link Message#getMessageId()} must return a TopicMessageId instance if the Message is received
 * from a consumer that subscribes multiple topics or partitioned topics.
 * The topic name used in APIs related to this class like `getOwnerTopic` and `create` must be the full topic name. For
 * example, "my-topic" is invalid while "persistent://public/default/my-topic" is valid.
 * If the topic is a partitioned topic, the topic name should be the name of the specific partition, e.g.
 * "persistent://public/default/my-topic-partition-0".
 * </p>
 */
public interface TopicMessageId extends MessageId {

    /**
     * Return the owner topic name of a message.
     *
     * @return the owner topic
     */
    String getOwnerTopic();

    static TopicMessageId create(String topic, MessageId messageId) {
        if (messageId instanceof TopicMessageId) {
            return (TopicMessageId) messageId;
        }
        return new TopicMessageId() {

            @Override
            public String getOwnerTopic() {
                return topic;
            }

            @Override
            public byte[] toByteArray() {
                return messageId.toByteArray();
            }

            @Override
            public int compareTo(MessageId o) {
                return messageId.compareTo(o);
            }
        };
    }
}

The create method can be used to replace the TopicMessageIdImpl implementation. Once we have a MessageId returned by Message#getMessageId(), we can use TopicMessageId.create(topic, msgId) to have a message id to seek or acknowledge.
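
As a rough illustration of how create behaves, here is a minimal, self-contained sketch. It does not use the Pulsar client: MessageId below is a reduced stand-in for the real interface, and SimpleMessageId is a hypothetical id type invented for this example. Only the create method itself is copied from the proposal.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Runnable demo; every type in this file is a stand-in, not the Pulsar client API.
class CreateSketch {
    public static void main(String[] args) {
        // Pretend this id came back from Producer#send, so it carries no topic.
        SimpleMessageId raw = new SimpleMessageId(5L, 10L);
        TopicMessageId wrapped =
                TopicMessageId.create("persistent://public/default/my-topic-partition-0", raw);

        System.out.println(wrapped.getOwnerTopic());
        // Wrapping an id that already is a TopicMessageId returns the same instance.
        System.out.println(TopicMessageId.create("persistent://public/default/other", wrapped) == wrapped);
        // Serialization delegates to the wrapped id, so the topic is not part of the bytes.
        System.out.println(Arrays.equals(wrapped.toByteArray(), raw.toByteArray()));
    }
}

// Stand-in for the real MessageId interface (assumption: only these methods matter here).
interface MessageId extends Comparable<MessageId> {
    byte[] toByteArray();
}

// The proposed interface, reduced to what the demo needs.
interface TopicMessageId extends MessageId {
    String getOwnerTopic();

    static TopicMessageId create(String topic, MessageId messageId) {
        if (messageId instanceof TopicMessageId) {
            return (TopicMessageId) messageId;
        }
        return new TopicMessageId() {
            @Override public String getOwnerTopic() { return topic; }
            @Override public byte[] toByteArray() { return messageId.toByteArray(); }
            @Override public int compareTo(MessageId o) { return messageId.compareTo(o); }
        };
    }
}

// Hypothetical id made of a non-negative ledger id and entry id.
final class SimpleMessageId implements MessageId {
    private final long ledgerId;
    private final long entryId;

    SimpleMessageId(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public byte[] toByteArray() {
        return ByteBuffer.allocate(2 * Long.BYTES).putLong(ledgerId).putLong(entryId).array();
    }

    @Override
    public int compareTo(MessageId o) {
        // Big-endian bytes of non-negative longs order the same way as (ledgerId, entryId).
        return Arrays.compareUnsigned(toByteArray(), o.toByteArray());
    }
}
```

Note that the topic argument is ignored when the id is already a TopicMessageId, so a caller cannot accidentally re-home an id that was received from a consumer.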

NOTE

This create method is actually not required for acknowledgment, because the MessageId in a received message is already the correct implementation. Existing code therefore works without explicitly using the new API.

It is only needed for seek on a multi-topics consumer, where a TopicMessageId is required to seek to a specific message.

Add a TopicMessageIdSerDes class to serialize and deserialize a TopicMessageId:

/**
 * To keep the backward compatibility, {@link TopicMessageId#toByteArray()} should not serialize the owner topic. This
 * class provides a convenient way for users to serialize a TopicMessageId with its owner topic serialized.
 */
class TopicMessageIdSerDes {
    public static byte[] serialize(TopicMessageId topicMessageId) {/* ... */}
    public static TopicMessageId deserialize(byte[] bytes) {/* ... */}
}

Here we don't override the toByteArray() method for backward compatibility because there might be existing code like:

var msg = multiTopicsConsumer.receive();
// msg.getMessageId() is a TopicMessageId
serialize(multiTopicsConsumer.getTopic(), msg.getMessageId().toByteArray());

This proposal doesn't add any acknowledge overload because there are already too many. But it makes the behavior of the existing ones clear:

/**
 * ...
 * @throws PulsarClientException.NotAllowedException if `messageId` is not a {@link TopicMessageId} when multiple topics are subscribed.
 */
void acknowledge(MessageId messageId) throws PulsarClientException;

// NOTE: the same goes for acknowledgeCumulative

/**
 * ...
 * @throws PulsarClientException.NotAllowedException if any message id in the list is not a {@link TopicMessageId} when multiple topics are subscribed.
 */
void acknowledge(List<MessageId> messageIdList) throws PulsarClientException;
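
The documented acknowledge behavior could be sketched roughly as follows. This is not the MultiTopicsConsumerImpl code: MessageId, TopicMessageId, NotAllowedException and MultiTopicsAcker are all simplified stand-ins defined here. Silently ignoring an id whose owner topic is not subscribed is an assumption that mirrors the current behavior described in the Motivation section; the proposal does not specify it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Runnable demo; all types below are stand-ins, not the Pulsar client API.
class AckSketch {
    public static void main(String[] args) {
        MultiTopicsAcker acker = new MultiTopicsAcker(Set.of("persistent://public/default/a"));
        try {
            acker.acknowledge(() -> "persistent://public/default/a"); // a TopicMessageId
            System.out.println("acknowledged: " + acker.acknowledged.size());
            acker.acknowledge(new MessageId() { }); // not a TopicMessageId
        } catch (NotAllowedException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}

// Marker stand-in for the real MessageId interface.
interface MessageId { }

interface TopicMessageId extends MessageId {
    String getOwnerTopic();
}

// Stand-in for PulsarClientException.NotAllowedException.
class NotAllowedException extends Exception {
    NotAllowedException(String message) {
        super(message);
    }
}

// Hypothetical model of a multi-topics consumer's acknowledge path.
class MultiTopicsAcker {
    private final Set<String> subscribedTopics;
    final List<TopicMessageId> acknowledged = new ArrayList<>();

    MultiTopicsAcker(Set<String> subscribedTopics) {
        this.subscribedTopics = subscribedTopics;
    }

    void acknowledge(MessageId messageId) throws NotAllowedException {
        if (!(messageId instanceof TopicMessageId)) {
            throw new NotAllowedException(
                    "Only TopicMessageId is allowed when multiple topics are subscribed");
        }
        TopicMessageId id = (TopicMessageId) messageId;
        // Assumption: an id for a topic that is not subscribed is ignored without an error.
        if (subscribedTopics.contains(id.getOwnerTopic())) {
            acknowledged.add(id);
        }
    }
}
```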

For seek operations, the semantics will be modified when the argument is a TopicMessageId:

    /**
     * ...
     * <p>Note: For multi-topics consumer, if `messageId` is a {@link TopicMessageId}, the seek operation will happen
     * on the owner topic of the message, which is returned by {@link TopicMessageId#getOwnerTopic()}. Otherwise, you
     * can only seek to the earliest or latest message.
     * ...
     */
    void seek(MessageId messageId) throws PulsarClientException;

i.e. for a multi-topics consumer, we can now seek the message id of a received message without any code change:

var msgId = multiTopicsConsumer.receive().getMessageId(); // it's a TopicMessageId
multiTopicsConsumer.seek(msgId); // seeks on the owner topic of msgId; before this PIP an exception would be thrown

It's because the MessageId of a message received from a multi-topics consumer is a TopicMessageId that already contains the correct topic name.

Those who want to convert a MessageId that might not be a TopicMessageId into a TopicMessageId, and who know the topic name, can perform the conversion explicitly:

consumer.seek(TopicMessageId.create(msg.getTopicName(), msgId));

It's used when users want to seek a MessageId returned by Producer#send. In this case, the MessageId is not a TopicMessageId and we have to pass the topic name to tell the multi-topics consumer which topic it belongs to.
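
The seek semantics described above might be modeled like this. It is only a sketch with stand-in types: MultiTopicsSeeker and its positions map are hypothetical, and throwing IllegalArgumentException for an unsubscribed owner topic or for a plain MessageId is an assumption, since the proposal does not name the exception for those cases.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Runnable demo; all types below are stand-ins, not the Pulsar client API.
class SeekSketch {
    public static void main(String[] args) {
        MultiTopicsSeeker seeker = new MultiTopicsSeeker(
                "persistent://public/default/a", "persistent://public/default/b");
        TopicMessageId id = () -> "persistent://public/default/a";
        seeker.seek(id); // only topic "a" moves
        System.out.println(seeker.positions.get("persistent://public/default/a") == id);
        seeker.seek(MessageId.EARLIEST); // every topic moves
        System.out.println(seeker.positions.get("persistent://public/default/b") == MessageId.EARLIEST);
    }
}

// Stand-in for the real MessageId, with sentinel positions.
interface MessageId {
    MessageId EARLIEST = new MessageId() { };
    MessageId LATEST = new MessageId() { };
}

interface TopicMessageId extends MessageId {
    String getOwnerTopic();
}

// Hypothetical model of seek routing on a multi-topics consumer.
class MultiTopicsSeeker {
    // Last requested position per topic, standing in for the internal per-topic consumers.
    final Map<String, MessageId> positions = new LinkedHashMap<>();

    MultiTopicsSeeker(String... topics) {
        for (String topic : topics) {
            positions.put(topic, MessageId.LATEST);
        }
    }

    void seek(MessageId messageId) {
        if (messageId instanceof TopicMessageId) {
            // Route the seek to the owner topic only.
            String owner = ((TopicMessageId) messageId).getOwnerTopic();
            if (!positions.containsKey(owner)) {
                throw new IllegalArgumentException("Topic is not subscribed: " + owner);
            }
            positions.put(owner, messageId);
        } else if (messageId == MessageId.EARLIEST || messageId == MessageId.LATEST) {
            // Earliest and latest apply to every subscribed topic.
            positions.replaceAll((topic, old) -> messageId);
        } else {
            throw new IllegalArgumentException(
                    "A multi-topics consumer needs a TopicMessageId to seek to a specific message");
        }
    }
}
```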

The semantics of getLastMessageIdAsync (and its synchronous counterpart getLastMessageId) change so that calling it on a multi-topics consumer is disallowed. Instead, these two APIs are added:

// NOTE: it's guaranteed that no duplicated owner topics should be returned
List<TopicMessageId> getLastMessageIds() throws PulsarClientException;
CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync();
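
One way a caller might consume the returned list is to index it by owner topic. The sketch below assumes a reduced stand-in for TopicMessageId with only getOwnerTopic, and the helper name byOwnerTopic is hypothetical. It leans on the "no duplicated owner topics" guarantee: the two-argument Collectors.toMap throws IllegalStateException on a duplicate key, so a violation would surface instead of silently dropping an id.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Runnable demo; TopicMessageId below is a stand-in, not the Pulsar client API.
class LastIdsSketch {
    // Hypothetical helper: index the last message ids by their owner topic.
    static Map<String, TopicMessageId> byOwnerTopic(List<TopicMessageId> lastIds) {
        return lastIds.stream()
                .collect(Collectors.toMap(TopicMessageId::getOwnerTopic, Function.identity()));
    }

    public static void main(String[] args) {
        TopicMessageId a = () -> "persistent://public/default/my-topic-partition-0";
        TopicMessageId b = () -> "persistent://public/default/my-topic-partition-1";
        Map<String, TopicMessageId> index = byOwnerTopic(List.of(a, b));
        System.out.println(index.keySet());
    }
}

// Reduced stand-in: only the owner topic matters for this example.
interface TopicMessageId {
    String getOwnerTopic();
}
```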

Implementation

The main changes are removing TopicMessageIdImpl and MultiMessageIdImpl, then adding the ability to seek or acknowledge a specific topic with a MessageId on a multi-topics consumer. Much of the existing code and logic can be reused.

Alternatives

https://lists.apache.org/thread/vydpjgyrfzkr45ho9r12sd2lw5c4mw6s proposes seeking or acknowledging with a topic and a MessageId. But it would add many public APIs. With this proposal, new APIs are only added for the missing features (e.g. seeking to a MessageId other than earliest or latest on a multi-topics consumer).

Adding a seek(String, MessageId) overload is also rejected for two reasons:

  1. There is only one valid topic for a given MessageId. Users have to get the correct topic name, e.g. my-topic-partition-2. Otherwise, the seek command might be sent to a wrong topic and unexpected behavior might happen.
  2. Users have to distinguish whether the consumer is a multi-topics consumer. If not, they need to use the original seek(MessageId) method. This makes the user experience inconsistent.

Adding a serialize method and a static deserialize method (like the toByteArray() and fromByteArray methods of MessageId) to TopicMessageId is rejected: the SerDes implementation of TopicMessageId should be unique. We should not allow other implementations.

Implementing Schema<TopicMessageId> is also rejected: Schema has other methods, like getSchemaInfo, that might confuse users. We only need a clear SerDes implementation of TopicMessageId.

Anything else?

See previous discussions related to the MessageId:

@BewareMyPower BewareMyPower changed the title PIP-XYZ: PIP-224: Introduce TopicMessageId for consumer's MessageId related APIs Nov 24, 2022
@codelipenghui
Contributor

codelipenghui commented Nov 28, 2022

acknowledge only accepts a TopicMessageIdImpl implementation with a getPartitionedTopic() method to know which topic the message belongs to.

It should be TopicMessageIdImpl.getTopicPartitionName() not getPartitionedTopic()

And we also missed the topic name when serializing the TopicMessageIdImpl, so we will lose the capability to deserialize the correct message ID from bytes. Users must have their own serialization implementation with the topic name and then use the method fromByteArrayWithTopic to recover a correct message ID. This looks inconvenient to use. I'm thinking whether we can add getTopicName() to MessageID directly and the topic name will also be serialized.

@BewareMyPower
Contributor Author

It should be TopicMessageIdImpl.getTopicPartitionName() not getPartitionedTopic()

Fixed.

I'm thinking whether we can add getTopicName() to MessageID directly and the topic name will also be serialized.

No. We should not. The topic name should only be added in seek or acknowledge to replace more overloads like void acknowledge(String topic, MessageId messageId).

The toByteArray() method could be affected. There is also the case where a MessageId is returned from Producer#send. In this case, the returned MessageId should only be used for serialization, e.g. writing the MessageId into another topic. If we added the method directly to MessageId, then to keep backward compatibility the implementation of getTopicName() would have to return null, which is not a good design.

@BewareMyPower
Contributor Author

BewareMyPower commented Nov 28, 2022

Serializing the topic name could also make it harder to interact with clients of other languages because the topic name is not described in PulsarApi.proto. To serialize a string, the charset should also be defined in the protocol.

If users want to serialize the topic name, they should define their own protocol like:

    public static byte[] serialize(String topic, MessageId messageId) {
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        byte[] messageIdBytes = messageId.toByteArray();
        byte[] bytes = new byte[topicBytes.length + messageIdBytes.length];
        System.arraycopy(topicBytes, 0, bytes, 0, topicBytes.length);
        System.arraycopy(messageIdBytes, 0, bytes, topicBytes.length, messageIdBytes.length);
        return bytes;
    }
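
A plain concatenation like this cannot be split back into its two parts unless the reader knows where the topic ends. A minimal round-trip sketch with a 4-byte length prefix is shown below. The wire format and the helper names topicOf and messageIdBytesOf are assumptions made for this example, not something defined by Pulsar; the message id is passed as raw bytes, which in real code would come from MessageId#toByteArray() and be restored with MessageId.fromByteArray.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical user-defined format: [int topicLength][topic as UTF-8][message id bytes].
class SerDesSketch {
    static byte[] serialize(String topic, byte[] messageIdBytes) {
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + topicBytes.length + messageIdBytes.length)
                .putInt(topicBytes.length) // length prefix lets the reader find the boundary
                .put(topicBytes)
                .put(messageIdBytes)
                .array();
    }

    // Read the topic name back from the serialized form.
    static String topicOf(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        byte[] topicBytes = new byte[buffer.getInt()];
        buffer.get(topicBytes);
        return new String(topicBytes, StandardCharsets.UTF_8);
    }

    // Everything after the topic is the serialized message id.
    static byte[] messageIdBytesOf(byte[] bytes) {
        int topicLength = ByteBuffer.wrap(bytes).getInt();
        return Arrays.copyOfRange(bytes, Integer.BYTES + topicLength, bytes.length);
    }

    public static void main(String[] args) {
        byte[] messageIdBytes = {8, 5, 16, 10}; // placeholder bytes, not a real message id
        byte[] bytes = serialize("persistent://public/default/my-topic-partition-0", messageIdBytes);
        System.out.println(topicOf(bytes));
        System.out.println(Arrays.equals(messageIdBytesOf(bytes), messageIdBytes));
    }
}
```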

@BewareMyPower
Contributor Author

BewareMyPower commented Jan 6, 2023

I will use 3 PRs to complete this proposal.

BewareMyPower added a commit to BewareMyPower/pulsar that referenced this issue Jan 7, 2023
Master Issue: apache#18616

### Motivation

Introduce `TopicMessageId` to support getting the owner topic of a
`MessageId`. When a `MessageId` is retrieved from a received message,
the owner topic will be correctly set by the client library. When it's
returned by `Producer#send`, this PR provides a `TopicMessageId#create`
method to configure the owner topic.

`acknowledge` APIs are affected only for the error cases: when a
`MessageId` other than a `TopicMessageId` is accepted on a multi-topics
consumer, `PulsarClientException.NotAllowedException` will be thrown.

The semantics of the `seek(MessageId)` API are changed. Now if a
`TopicMessageId` is passed to a multi-topics consumer, the seek
happens on the internal consumer of the owner topic.

### Modifications

- Add the `TopicMessageId` interface.
- In `MultiTopicsConsumerImpl#doAcknowledge`, complete the future with
  `NotAllowedException` if the argument is not a `TopicMessageId`.
- In `MultiTopicsConsumerImpl#seekAsync`, when the argument is a
  `TopicMessageId`, find the internal consumer according to the owner
  topic and pass the argument to it if it exists.
- In `ConsumerImpl#seekAsync`, get the inner message ID of the
  `TopicMessageId` so that now a single-topic consumer can also accept a
  `TopicMessageId` to seek.

Besides the main modifications above, this patch does some refactorings
to avoid direct access to `TopicMessageIdImpl`:
- Deprecated `getTopicName` method by trimming the partition suffix of
  the owner topic in `getOriginTopicNameStr`.
- Deprecated `getTopicPartitionName` by `getOwnerTopic`.
- `getInnerMessageId` cannot be deprecated because we still need to
  convert `TopicMessageId` to `MessageIdImpl` in many cases (because we
  cannot get the fields like ledger id). Instead of deprecating it,
  use `MessageIdImpl.convertToMessageIdImpl` to replace it.
- In `convertToMessageIdImpl`, for a customized `TopicMessageId`
  implementation, use serialization and deserialization to get the
  `MessageIdImpl` object.

### Verifications

Add the following tests to `MultiTopicsConsumerTest`:
- `testAcknowledgeWrongMessageId`: verify the correct exceptions are
  thrown in `acknowledge` APIs
- `testSeekCustomTopicMessageId`: verify the new seek semantics for a
  `TopicMessageId`, including the existing `TopicMessageIdImpl` and the
  customized implementation by `TopicMessageId#create`

### TODO
- Add a standard SerDes class for `TopicMessageId`
- Apply `TopicMessageId` into `getLastMessageId` related APIs.
- Deprecate the `getInnerMessageId` after PIP-229 is approved.
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this issue Jan 8, 2023
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this issue Jan 16, 2023
@github-actions

github-actions bot commented Mar 2, 2023

The issue had no activity for 30 days, mark with Stale label.

@rdhabalia
Contributor

@BewareMyPower : serializing and deserializing is expensive and on top of that having different APIs for different use cases is creating a really bad experience for users, and I strongly feel we should avoid such APIs and complexity if things can be solved with a simple straight forward change with the same API and without creating the bad user experience.
I think we should consider this simple change without costing performance and API incompatibility and confusing usage to the users. so, we should go with this obvious and straightforward change PR: #19944 as a lot of configurations and APIs are a huge pain point for Pulsar users already and I want to avoid doing it more and more in the Pulsar community.

BewareMyPower added a commit to BewareMyPower/pulsar that referenced this issue Apr 7, 2023
…LastMessageId

Master Issue: apache#18616

Fixes apache#4940

NOTE: This implementation is different from the original design of
PIP-224 that the method name is `getLastMessageIds` instead of
`getLastTopicMessageId`.

### Motivation

When a multi-topics consumer calls `getLastMessageId`, a
`MultiMessageIdImpl` instance will be returned. It contains a map of the
topic name and the latest message id of the topic. However, the
`MultiMessageIdImpl` cannot be used in any place of the API that accepts
a `MessageId` because all methods of the `MessageId` interface are not
implemented, including `compareTo` and `toByteArray`.

Therefore, users cannot do anything on such a `MessageId` implementation
except casting `MessageId` to `MultiMessageIdImpl` and get the internal
map.

### Modifications

- Throw an exception when calling `getLastMessageId` on a multi-topics
  consumer instead of returning a `MultiMessageIdImpl`.
- Remove the `MultiMessageIdImpl` implementation and its related tests.
- Add the `getLastMessageIds` methods to `Consumer`. It returns a list
  of `TopicMessageId` instances, each of them represents the last
  message id of the owner topic.
- Mark the `getLastMessageId` API as deprecated.

### Verifications

- Modify the `TopicsConsumerImplTest#testGetLastMessageId` to test the
  `getLastMessageIds` for a multi-topics consumer.
- Modify the `TopicReaderTest#testHasMessageAvailable` to test the
  `getLastMessageIds` for a single topic consumer.
@shibd
Member

shibd commented Apr 10, 2023

@BewareMyPower : serializing and deserializing is expensive and on top of that having different APIs for different use cases is creating a really bad experience for users, and I strongly feel we should avoid such APIs and complexity if things can be solved with a simple straight forward change with the same API and without creating the bad user experience. I think we should consider this simple change without costing performance and API incompatibility and confusing usage to the users. so, we should go with this obvious and straightforward change PR: #19944 as a lot of configurations and APIs are a huge pain point for Pulsar users already and I want to avoid doing it more and more in the Pulsar community.

I think it is necessary to introduce TopicMessageId interface.

If a user knows the topicName, ledgerId, entryId and partitionIndex and wants to create a MessageId to acknowledge, it can be constructed in the following way:

MessageId topicMsgId = TopicMessageId.create(topicName,
        DefaultImplementation.getDefaultImplementation().newMessageId(ledgerId, entryId, partitionIndex));
consumer.acknowledge(topicMsgId);

Without TopicMessageId interface, I'm not sure there is another way to construct it?

@github-actions

The issue had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label May 13, 2023