Skip to content

Commit

Permalink
[ISSUE apache#4584] Add new persist method to update consume offset t…
Browse files Browse the repository at this point in the history
…o remote server.
  • Loading branch information
ShannonDing authored Aug 23, 2022
1 parent ec8a93d commit 2930994
Showing 1 changed file with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
* @deprecated
* Default pulling consumer.
* This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumer} is recommend to use
* in the scenario of actively pulling messages.
* @deprecated Default pulling consumer. This class will be removed in 2022, and a better implementation {@link
* DefaultLitePullConsumer} is recommend to use in the scenario of actively pulling messages.
*/
@Deprecated
public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
Expand Down Expand Up @@ -109,6 +107,7 @@ public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
public DefaultMQPullConsumer(final String namespace, final String consumerGroup) {
this(namespace, consumerGroup, null);
}

/**
* Constructor specifying namespace, consumer group and RPC hook.
*
Expand All @@ -127,7 +126,8 @@ public DefaultMQPullConsumer(final String namespace, final String consumerGroup,
*/
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
public void createTopic(String key, String newTopic, int queueNum,
Map<String, String> attributes) throws MQClientException {
createTopic(key, withNamespace(newTopic), queueNum, 0, null);
}

Expand All @@ -136,7 +136,8 @@ public void createTopic(String key, String newTopic, int queueNum, Map<String, S
*/
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag,
Map<String, String> attributes) throws MQClientException {
this.defaultMQPullConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
}

Expand Down Expand Up @@ -352,7 +353,8 @@ public void pull(MessageQueue mq, String subExpression, long offset, int maxNums
}

@Override
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, int maxSize, PullCallback pullCallback,
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, int maxSize,
PullCallback pullCallback,
long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, maxSize, pullCallback, timeout);
Expand Down Expand Up @@ -460,4 +462,8 @@ public int getMaxReconsumeTimes() {
public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
this.maxReconsumeTimes = maxReconsumeTimes;
}

public void persist(MessageQueue mq) {
this.getOffsetStore().persist(queueWithNamespace(mq));
}
}

0 comments on commit 2930994

Please sign in to comment.