From 44dde5f0250cd9785e583eef86fba50f8ddfcff9 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Thu, 16 Nov 2023 11:34:01 +0800 Subject: [PATCH] fix(store): fix potential concurrency issues (#648) Signed-off-by: SSpirits --- .../queue/DefaultLogicQueueStateMachine.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/store/src/main/java/com/automq/rocketmq/store/queue/DefaultLogicQueueStateMachine.java b/store/src/main/java/com/automq/rocketmq/store/queue/DefaultLogicQueueStateMachine.java index 48c66f6c2..aea094efe 100644 --- a/store/src/main/java/com/automq/rocketmq/store/queue/DefaultLogicQueueStateMachine.java +++ b/store/src/main/java/com/automq/rocketmq/store/queue/DefaultLogicQueueStateMachine.java @@ -288,10 +288,15 @@ private AckCommitter getAckCommitter(long consumerGroupId) { private AckCommitter getAckCommitter(long consumerGroupId, ByteBuffer serializedBitmapBuffer) { ConsumerGroupMetadata metadata = this.consumerGroupMetadataMap.computeIfAbsent(consumerGroupId, k -> new ConsumerGroupMetadata(consumerGroupId)); - return this.ackCommitterMap.computeIfAbsent(consumerGroupId, k -> new AckCommitter(metadata.getAckOffset(), offset -> { - metadata.setAckOffset(offset); - this.ackOffsetListeners.forEach(listener -> listener.onOffset(consumerGroupId, offset)); - }, serializedBitmapBuffer)); + return this.ackCommitterMap.computeIfAbsent(consumerGroupId, k -> + new AckCommitter( + metadata.getAckOffset(), + offset -> { + metadata.setAckOffset(offset); + this.ackOffsetListeners.forEach(listener -> listener.onOffset(consumerGroupId, offset)); + }, + serializedBitmapBuffer + )); } private AckCommitter getRetryAckCommitter(long consumerGroupId) { @@ -632,10 +637,10 @@ public int consumeTimes(long consumerGroupId, long offset) { } static class AckCommitter { - private long ackOffset; + private final long baseOffset; + private volatile long ackOffset; private final RoaringBitmap bitmap; private final Consumer ackAdvanceFn; - private final long baseOffset; public AckCommitter(long ackOffset, Consumer ackAdvanceFn) { this(ackOffset, ackAdvanceFn, null); @@ -654,7 +659,7 @@ public AckCommitter(long ackOffset, Consumer ackAdvanceFn, ByteBuffer seri } } - public void commitAck(long offset) { + public synchronized void commitAck(long offset) { if (offset >= ackOffset) { // TODO: how to handle overflow? int offsetInBitmap = (int) (offset - baseOffset); @@ -671,7 +676,7 @@ public void commitAck(long offset) { } // / - public ByteBuffer getSerializedBuffer() { + public synchronized ByteBuffer getSerializedBuffer() { int length = bitmap.serializedSizeInBytes() + Long.BYTES; ByteBuffer buffer = ByteBuffer.allocate(length); buffer.putLong(baseOffset);