This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-18616: PIP-224: Introduce TopicMessageId for consumer's MessageId related APIs #5174

Open
sijie opened this issue Nov 25, 2022 · 0 comments

sijie commented Nov 25, 2022

Original Issue: apache#18616


Motivation

There are two ways to get a MessageId from a consumer:

  • Message#getMessageId from a received message
  • Consumer#getLastMessageId

The returned MessageId can be used in seek and acknowledge (including acknowledgeCumulative). A MessageId represents the position of a message. For a consumer that subscribes to a single topic, this works as expected because the MessageId from a received message or returned by getLastMessageId always belongs to that topic. However, things are different for a consumer that subscribes to multiple topics (a multi-topics consumer).

For a multi-topics consumer:

  • getLastMessageId returns a MultiMessageIdImpl. However, this implementation is not meaningful: it maintains a map from topic name to MessageId, and it is neither comparable nor serializable. It also makes little sense to compare a MessageId with a MultiMessageIdImpl that represents the positions of multiple messages.
  • seek only accepts the earliest or latest position, probably because there is no way to know which topic a message belongs to.
  • acknowledge only accepts a TopicMessageIdImpl, whose getTopicPartitionName() method tells the consumer which topic the message belongs to. However, when the consumer has not subscribed to that topic, the behavior is undefined. The current behaviors are:
    • If the topic is not subscribed, no error is returned.
    • If the MessageId is not a TopicMessageIdImpl, an IllegalArgumentException is thrown.

Goal

This proposal introduces a TopicMessageId interface that exposes a method to get a message's owner topic. With TopicMessageId, we can make these behaviors clearer.

API Changes

/**
 * The MessageId used for a consumer that subscribes multiple topics or partitioned topics.
 *
 * <p>
 * It's guaranteed that {@link Message#getMessageId()} must return a TopicMessageId instance if the Message is received
 * from a consumer that subscribes multiple topics or partitioned topics.
 * The topic name used in APIs related to this class like `getOwnerTopic` and `create` must be the full topic name. For
 * example, "my-topic" is invalid while "persistent://public/default/my-topic" is valid.
 * If the topic is a partitioned topic, the topic name should be the name of the specific partition, e.g.
 * "persistent://public/default/my-topic-partition-0".
 * </p>
 */
public interface TopicMessageId extends MessageId {

    /**
     * Return the owner topic name of a message.
     *
     * @return the owner topic
     */
    String getOwnerTopic();

    static TopicMessageId create(String topic, MessageId messageId) {
        if (messageId instanceof TopicMessageId) {
            return (TopicMessageId) messageId;
        }
        return new TopicMessageId() {

            @Override
            public String getOwnerTopic() {
                return topic;
            }

            @Override
            public byte[] toByteArray() {
                return messageId.toByteArray();
            }

            @Override
            public int compareTo(MessageId o) {
                return messageId.compareTo(o);
            }
        };
    }
}

The create method can be used to replace the TopicMessageIdImpl implementation. Once we have a MessageId returned by Message#getMessageId(), we can use TopicMessageId.create(topic, msgId) to get a message ID that can be passed to seek or acknowledge.

NOTE

This create method is not actually required for acknowledgment, because the MessageId of a received message should already be the correct implementation. Existing code therefore keeps working without explicitly using the new API.

It is only needed for the new seek APIs, because they accept a TopicMessageId.
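
As a rough illustration of how create behaves, the sketch below copies the create method from the proposal onto minimal stand-in interfaces so that it compiles without the Pulsar client. The stand-in MessageId, the SimpleMessageId class and its 16-byte layout are assumptions made for this example only; they are not Pulsar classes.

```java
import java.nio.ByteBuffer;

// Stand-in for org.apache.pulsar.client.api.MessageId, reduced to what this sketch needs.
interface MessageId extends Comparable<MessageId> {
    byte[] toByteArray();
}

// Stand-in for the proposed interface; create() follows the definition in the proposal.
interface TopicMessageId extends MessageId {
    String getOwnerTopic();

    static TopicMessageId create(String topic, MessageId messageId) {
        if (messageId instanceof TopicMessageId) {
            return (TopicMessageId) messageId;
        }
        return new TopicMessageId() {
            @Override public String getOwnerTopic() { return topic; }
            @Override public byte[] toByteArray() { return messageId.toByteArray(); }
            @Override public int compareTo(MessageId o) { return messageId.compareTo(o); }
        };
    }
}

// Hypothetical plain message id (ledger id + entry id), for illustration only.
final class SimpleMessageId implements MessageId {
    private final long ledgerId;
    private final long entryId;

    SimpleMessageId(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public byte[] toByteArray() {
        return ByteBuffer.allocate(2 * Long.BYTES).putLong(ledgerId).putLong(entryId).array();
    }

    @Override
    public int compareTo(MessageId o) {
        // Decode the other id from its bytes so that wrapped ids compare too.
        ByteBuffer other = ByteBuffer.wrap(o.toByteArray());
        int byLedger = Long.compare(ledgerId, other.getLong());
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.getLong());
    }
}

class CreateDemo {
    public static void main(String[] args) {
        MessageId plain = new SimpleMessageId(5, 10);
        TopicMessageId wrapped =
                TopicMessageId.create("persistent://public/default/my-topic-partition-0", plain);
        System.out.println(wrapped.getOwnerTopic());

        // An id that is already a TopicMessageId is returned as-is; the topic argument is ignored.
        TopicMessageId same = TopicMessageId.create("persistent://public/default/other", wrapped);
        System.out.println(same == wrapped);
    }
}
```

One consequence worth noting: because an existing TopicMessageId is returned unchanged, create cannot be used to re-label an ID with a different owner topic.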

Add a TopicMessageIdSerDes class to serialize and deserialize a TopicMessageId:

/**
 * To keep the backward compatibility, {@link TopicMessageId#toByteArray()} should not serialize the owner topic. This
 * class provides a convenient way for users to serialize a TopicMessageId with its owner topic serialized.
 */
class TopicMessageIdSerDes {
    public static byte[] serialize(TopicMessageId topicMessageId) {/* ... */}
    public static TopicMessageId deserialize(byte[] bytes) {/* ... */}
}

Here, toByteArray() is deliberately left unchanged for backward compatibility, because there might be existing code like:

var msg = multiTopicsConsumer.receive();
// msg.getMessageId() is a TopicMessageId
serialize(multiTopicsConsumer.getTopic(), msg.getMessageId().toByteArray());
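
The proposal leaves the bodies of serialize and deserialize open. Purely as an illustration of the idea (the owner topic travels next to the unchanged toByteArray() bytes), here is one possible layout: a 4-byte topic length, the UTF-8 topic name, then the message ID bytes. The layout, the class name TopicMessageIdSerDesSketch, the stand-in interfaces and SimpleTopicMessageId are all assumptions for this sketch and not the format Pulsar uses.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Stand-ins for the Pulsar interfaces, reduced to what this sketch needs.
interface MessageId {
    byte[] toByteArray();
}

interface TopicMessageId extends MessageId {
    String getOwnerTopic();
}

// Hypothetical concrete id that keeps the raw message id bytes plus the owner topic.
final class SimpleTopicMessageId implements TopicMessageId {
    private final String ownerTopic;
    private final byte[] idBytes;

    SimpleTopicMessageId(String ownerTopic, byte[] idBytes) {
        this.ownerTopic = ownerTopic;
        this.idBytes = idBytes.clone();
    }

    @Override public String getOwnerTopic() { return ownerTopic; }

    // The owner topic is NOT part of these bytes, as the proposal requires.
    @Override public byte[] toByteArray() { return idBytes.clone(); }
}

final class TopicMessageIdSerDesSketch {
    // Assumed layout: [int topicLength][topic as UTF-8][message id bytes].
    static byte[] serialize(TopicMessageId id) {
        byte[] topic = id.getOwnerTopic().getBytes(StandardCharsets.UTF_8);
        byte[] body = id.toByteArray();
        return ByteBuffer.allocate(Integer.BYTES + topic.length + body.length)
                .putInt(topic.length)
                .put(topic)
                .put(body)
                .array();
    }

    static TopicMessageId deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] topic = new byte[buf.getInt()];
        buf.get(topic);
        byte[] body = new byte[buf.remaining()];
        buf.get(body);
        return new SimpleTopicMessageId(new String(topic, StandardCharsets.UTF_8), body);
    }

    public static void main(String[] args) {
        TopicMessageId id = new SimpleTopicMessageId(
                "persistent://public/default/my-topic-partition-0", new byte[] {1, 2, 3});
        TopicMessageId back = deserialize(serialize(id));
        System.out.println(back.getOwnerTopic());
        System.out.println(Arrays.toString(back.toByteArray()));
    }
}
```

A real implementation would presumably rebuild the inner ID with MessageId.fromByteArray rather than keeping raw bytes, and would validate the length prefix before allocating.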

This proposal doesn't add any acknowledge overload because there are already too many overloads. It does, however, make the behavior clear:

/**
 * ...
 * @throws PulsarClientException.NotAllowedException if `messageId` is not a {@link TopicMessageId} when multiple topics are subscribed.
 */
void acknowledge(MessageId messageId) throws PulsarClientException;

// NOTE: the same goes for acknowledgeCumulative

/**
 * ...
 * @throws PulsarClientException.NotAllowedException if any message id in the list is not a {@link TopicMessageId} when multiple topics are subscribed.
 */
void acknowledge(List<MessageId> messageIdList) throws PulsarClientException;
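
The documented contract could be enforced with a check along these lines. This is only a sketch on stand-in types: NotAllowedException here stands in for PulsarClientException.NotAllowedException, AckValidationSketch is a hypothetical helper, and checking the whole list before acknowledging anything is an assumption, since the proposal does not specify the order.

```java
import java.util.List;

// Stand-ins for the Pulsar interfaces, reduced to what this sketch needs.
interface MessageId { }

interface TopicMessageId extends MessageId {
    String getOwnerTopic();
}

// Stand-in for PulsarClientException.NotAllowedException.
class NotAllowedException extends Exception {
    NotAllowedException(String message) {
        super(message);
    }
}

// Hypothetical helper a multi-topics consumer might call at the start of acknowledge.
final class AckValidationSketch {

    // Returns the id as a TopicMessageId, or rejects it as the Javadoc above describes.
    static TopicMessageId requireTopicMessageId(MessageId messageId) throws NotAllowedException {
        if (messageId instanceof TopicMessageId) {
            return (TopicMessageId) messageId;
        }
        throw new NotAllowedException(
                "Only a TopicMessageId can be acknowledged when multiple topics are subscribed");
    }

    // Assumption: every id is checked up front, so a bad id rejects the whole list.
    static void requireAllTopicMessageIds(List<MessageId> messageIdList) throws NotAllowedException {
        for (MessageId id : messageIdList) {
            requireTopicMessageId(id);
        }
    }

    public static void main(String[] args) {
        TopicMessageId valid = () -> "persistent://public/default/my-topic";
        MessageId plain = new MessageId() { };
        try {
            requireAllTopicMessageIds(List.of(valid, plain));
            System.out.println("accepted");
        } catch (NotAllowedException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```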

For seek operations, the semantics will be modified when the argument is a TopicMessageId:

    /**
     * ...
     * <p>Note: For multi-topics consumer, if `messageId` is a {@link TopicMessageId}, the seek operation will happen
     * on the owner topic of the message, which is returned by {@link TopicMessageId#getOwnerTopic()}. Otherwise, you
     * can only seek to the earliest or latest message.
     * ...
     */
    void seek(MessageId messageId) throws PulsarClientException;

That is, a multi-topics consumer can now seek to the message ID of a received message without any code change:

var msgId = multiTopicsConsumer.receive().getMessageId(); // it's a TopicMessageId
multiTopicsConsumer.seek(msgId); // seek(TopicMessageId) will be called, before this PIP an exception would be thrown

This works because the MessageId of a message received from a multi-topics consumer is a TopicMessageId that already contains the correct topic name.
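
The routing described above can be pictured with a small model. The sketch below is not the Pulsar implementation: SeekRoutingSketch is a hypothetical class that tracks one position per subscribed topic, EARLIEST and LATEST stand in for MessageId.earliest and MessageId.latest, and throwing when the owner topic is not subscribed is an assumption, because the proposal does not define that case.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-ins for the Pulsar interfaces, reduced to what this sketch needs.
interface MessageId { }

interface TopicMessageId extends MessageId {
    String getOwnerTopic();
}

// Hypothetical model of a multi-topics consumer that only tracks seek positions.
final class SeekRoutingSketch {
    // Stand-ins for MessageId.earliest and MessageId.latest.
    static final MessageId EARLIEST = new MessageId() { };
    static final MessageId LATEST = new MessageId() { };

    // One position per subscribed topic (one per internal single-topic consumer).
    private final Map<String, MessageId> positions = new LinkedHashMap<>();

    SeekRoutingSketch(String... topics) {
        for (String topic : topics) {
            positions.put(topic, LATEST);
        }
    }

    void seek(MessageId messageId) {
        if (messageId instanceof TopicMessageId) {
            // New semantics: only the owner topic is affected.
            String owner = ((TopicMessageId) messageId).getOwnerTopic();
            if (!positions.containsKey(owner)) {
                // Assumption: the proposal does not say what happens for an unsubscribed topic.
                throw new IllegalArgumentException("Not subscribed to " + owner);
            }
            positions.put(owner, messageId);
        } else if (messageId == EARLIEST || messageId == LATEST) {
            // Existing semantics: earliest/latest applies to every topic.
            positions.replaceAll((topic, old) -> messageId);
        } else {
            throw new IllegalArgumentException(
                    "A plain MessageId other than earliest/latest cannot be routed to a topic");
        }
    }

    MessageId positionOf(String topic) {
        return positions.get(topic);
    }

    public static void main(String[] args) {
        SeekRoutingSketch consumer = new SeekRoutingSketch(
                "persistent://public/default/a", "persistent://public/default/b");
        TopicMessageId id = () -> "persistent://public/default/a";
        consumer.seek(id);
        System.out.println(consumer.positionOf("persistent://public/default/a") == id);
        System.out.println(consumer.positionOf("persistent://public/default/b") == LATEST);
    }
}
```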

Users who know the topic name and want to convert a MessageId, which might not be a TopicMessageId, into a TopicMessageId can perform the conversion explicitly:

consumer.seek(TopicMessageId.create(msg.getTopicName(), msgId));

This is useful when users want to seek to a MessageId returned by Producer#send. In this case, the MessageId is not a TopicMessageId, so we have to pass the topic name to tell the multi-topics consumer which topic it belongs to.

For the getLastMessageIdAsync method, change the semantics to disallow it on a multi-topics consumer. Instead, add these two APIs:

// NOTE: it's guaranteed that no duplicated owner topics should be returned
List<TopicMessageId> getLastTopicMessageId() throws PulsarClientException;
CompletableFuture<List<TopicMessageId>> getLastTopicMessageIdAsync();
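
Since each owner topic appears at most once in the returned list, a caller could index the result by topic. The helper below is a hypothetical sketch on stand-in interfaces, not part of the proposed API. Collectors.toMap throws IllegalStateException on a duplicate key, so a violation of the guarantee would surface immediately instead of silently dropping an entry.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Stand-ins for the Pulsar interfaces, reduced to what this sketch needs.
interface MessageId { }

interface TopicMessageId extends MessageId {
    String getOwnerTopic();
}

// Hypothetical helper for callers of getLastTopicMessageId().
final class LastIdsSketch {

    // Builds a lookup from owner topic to the last message id of that topic.
    static Map<String, TopicMessageId> byOwnerTopic(List<TopicMessageId> lastIds) {
        return lastIds.stream()
                .collect(Collectors.toMap(TopicMessageId::getOwnerTopic, Function.identity()));
    }

    public static void main(String[] args) {
        TopicMessageId p0 = () -> "persistent://public/default/my-topic-partition-0";
        TopicMessageId p1 = () -> "persistent://public/default/my-topic-partition-1";
        Map<String, TopicMessageId> lastIds = byOwnerTopic(List.of(p0, p1));
        System.out.println(lastIds.keySet());
    }
}
```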

Implementation

The main changes are removing TopicMessageIdImpl and MultiMessageIdImpl, then adding the ability to seek or acknowledge a specific topic with a MessageId on a multi-topics consumer. Much of the existing code and logic can be reused.

Alternatives

https://lists.apache.org/thread/vydpjgyrfzkr45ho9r12sd2lw5c4mw6s proposes seeking or acknowledging with a topic and a MessageId, but that would add many public APIs. With this proposal, new APIs are only added for the missing features (e.g. seeking to a MessageId other than earliest or latest on a multi-topics consumer).

Adding a seek(String, MessageId) overload is also rejected, for two reasons:

  1. There is only one valid topic for a given MessageId. Users have to get the correct topic name, e.g. my-topic-partition-2. Otherwise, the seek command might be sent to the wrong topic and unexpected behavior might happen.
  2. Users have to distinguish whether the consumer is a multi-topics consumer. If it is not, they need to use the original seek(MessageId) method. This makes the user experience inconsistent.

Adding a serialize method and a static deserialize method (like the toByteArray() and fromByteArray methods of MessageId) to TopicMessageId is rejected: the SerDes implementation of TopicMessageId should be unique. We should not allow other implementations.

Implementing Schema<TopicMessageId> is also rejected: Schema has other methods, like getSchemaInfo, that might confuse users. We only need a clear SerDes implementation of TopicMessageId.

Anything else?

See previous discussions related to the MessageId:

@sijie sijie added the PIP label Nov 25, 2022
@sijie sijie added the Stale label Mar 2, 2023