From 2930994a37e065b946977d74f4ad3966f8963267 Mon Sep 17 00:00:00 2001 From: dinglei Date: Tue, 23 Aug 2022 13:57:35 +0800 Subject: [PATCH] [ISSUE #4584] Add new persist method to update consume offset to remote server. --- .../consumer/DefaultMQPullConsumer.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index 48e7a3ace89..2747fabbbac 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -36,10 +36,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException; /** - * @deprecated - * Default pulling consumer. - * This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumer} is recommend to use - * in the scenario of actively pulling messages. + * @deprecated Default pulling consumer. This class will be removed in 2022, and a better implementation {@link + * DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages. */ @Deprecated public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer { @@ -109,6 +107,7 @@ public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) { public DefaultMQPullConsumer(final String namespace, final String consumerGroup) { this(namespace, consumerGroup, null); } + /** * Constructor specifying namespace, consumer group and RPC hook. * @@ -127,7 +126,8 @@ public DefaultMQPullConsumer(final String namespace, final String consumerGroup, */ @Deprecated @Override - public void createTopic(String key, String newTopic, int queueNum, Map attributes) throws MQClientException { + public void createTopic(String key, String newTopic, int queueNum, + Map attributes) throws MQClientException { createTopic(key, withNamespace(newTopic), queueNum, 0, null); } @@ -136,7 +136,8 @@ public void createTopic(String key, String newTopic, int queueNum, Map attributes) throws MQClientException { + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, + Map attributes) throws MQClientException { this.defaultMQPullConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag); } @@ -352,7 +353,8 @@ public void pull(MessageQueue mq, String subExpression, long offset, int maxNums } @Override - public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, int maxSize, PullCallback pullCallback, + public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, int maxSize, + PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, maxSize, pullCallback, timeout); @@ -460,4 +462,8 @@ public int getMaxReconsumeTimes() { public void setMaxReconsumeTimes(final int maxReconsumeTimes) { this.maxReconsumeTimes = maxReconsumeTimes; } + + public void persist(MessageQueue mq) { + this.getOffsetStore().persist(queueWithNamespace(mq)); + } }