Skip to content

Commit

Permalink
[ISSUE apache#7538] fix wrong cachedMsgSize if msg body is changed in…
Browse files Browse the repository at this point in the history
… consumer callback
  • Loading branch information
yuz10 authored Feb 2, 2024
1 parent 6616600 commit c833ff6
Showing 1 changed file with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,12 @@ public long removeMessage(final List<MessageExt> msgs) {
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
msgSize.addAndGet(0 - msg.getBody().length);
msgSize.addAndGet(-msg.getBody().length);
}
}
msgCount.addAndGet(removedCnt);
if (msgCount.addAndGet(removedCnt) == 0) {
msgSize.set(0);
}

if (!msgTreeMap.isEmpty()) {
result = msgTreeMap.firstKey();
Expand Down Expand Up @@ -264,9 +266,12 @@ public long commit() {
this.treeMapLock.writeLock().lockInterruptibly();
try {
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
msgSize.addAndGet(0 - msg.getBody().length);
if (msgCount.addAndGet(-this.consumingMsgOrderlyTreeMap.size()) == 0) {
msgSize.set(0);
} else {
for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
msgSize.addAndGet(-msg.getBody().length);
}
}
this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
Expand Down Expand Up @@ -426,8 +431,8 @@ public void fillProcessQueueInfo(final ProcessQueueInfo info) {
info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
info.setCachedMsgCount(this.msgTreeMap.size());
info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024)));
}
info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024)));

if (!this.consumingMsgOrderlyTreeMap.isEmpty()) {
info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey());
Expand Down

0 comments on commit c833ff6

Please sign in to comment.