Skip to content

Commit

Permalink
fix(store): fix potential concurrency issues (#648)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Nov 16, 2023
1 parent caee819 commit 44dde5f
Showing 1 changed file with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,15 @@ private AckCommitter getAckCommitter(long consumerGroupId) {

private AckCommitter getAckCommitter(long consumerGroupId, ByteBuffer serializedBitmapBuffer) {
ConsumerGroupMetadata metadata = this.consumerGroupMetadataMap.computeIfAbsent(consumerGroupId, k -> new ConsumerGroupMetadata(consumerGroupId));
return this.ackCommitterMap.computeIfAbsent(consumerGroupId, k -> new AckCommitter(metadata.getAckOffset(), offset -> {
metadata.setAckOffset(offset);
this.ackOffsetListeners.forEach(listener -> listener.onOffset(consumerGroupId, offset));
}, serializedBitmapBuffer));
return this.ackCommitterMap.computeIfAbsent(consumerGroupId, k ->
new AckCommitter(
metadata.getAckOffset(),
offset -> {
metadata.setAckOffset(offset);
this.ackOffsetListeners.forEach(listener -> listener.onOffset(consumerGroupId, offset));
},
serializedBitmapBuffer
));
}

private AckCommitter getRetryAckCommitter(long consumerGroupId) {
Expand Down Expand Up @@ -632,10 +637,10 @@ public int consumeTimes(long consumerGroupId, long offset) {
}

static class AckCommitter {
private long ackOffset;
private final long baseOffset;
private volatile long ackOffset;
private final RoaringBitmap bitmap;
private final Consumer<Long> ackAdvanceFn;
private final long baseOffset;

public AckCommitter(long ackOffset, Consumer<Long> ackAdvanceFn) {
this(ackOffset, ackAdvanceFn, null);
Expand All @@ -654,7 +659,7 @@ public AckCommitter(long ackOffset, Consumer<Long> ackAdvanceFn, ByteBuffer seri
}
}

public void commitAck(long offset) {
public synchronized void commitAck(long offset) {
if (offset >= ackOffset) {
// TODO: how to handle overflow?
int offsetInBitmap = (int) (offset - baseOffset);
Expand All @@ -671,7 +676,7 @@ public void commitAck(long offset) {
}

// <baseOffset>/<bitmap>
public ByteBuffer getSerializedBuffer() {
public synchronized ByteBuffer getSerializedBuffer() {
int length = bitmap.serializedSizeInBytes() + Long.BYTES;
ByteBuffer buffer = ByteBuffer.allocate(length);
buffer.putLong(baseOffset);
Expand Down

0 comments on commit 44dde5f

Please sign in to comment.