diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index ebc208a8d86..33e698b00c1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -198,10 +198,12 @@ public long removeMessage(final List msgs) { MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); if (prev != null) { removedCnt--; - msgSize.addAndGet(0 - msg.getBody().length); + msgSize.addAndGet(-msg.getBody().length); } } - msgCount.addAndGet(removedCnt); + if (msgCount.addAndGet(removedCnt) == 0) { + msgSize.set(0); + } if (!msgTreeMap.isEmpty()) { result = msgTreeMap.firstKey(); @@ -264,9 +266,12 @@ public long commit() { this.treeMapLock.writeLock().lockInterruptibly(); try { Long offset = this.consumingMsgOrderlyTreeMap.lastKey(); - msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size()); - for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) { - msgSize.addAndGet(0 - msg.getBody().length); + if (msgCount.addAndGet(-this.consumingMsgOrderlyTreeMap.size()) == 0) { + msgSize.set(0); + } else { + for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) { + msgSize.addAndGet(-msg.getBody().length); + } } this.consumingMsgOrderlyTreeMap.clear(); if (offset != null) { @@ -426,8 +431,8 @@ public void fillProcessQueueInfo(final ProcessQueueInfo info) { info.setCachedMsgMinOffset(this.msgTreeMap.firstKey()); info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey()); info.setCachedMsgCount(this.msgTreeMap.size()); - info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024))); } + info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024))); if (!this.consumingMsgOrderlyTreeMap.isEmpty()) { info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey());