perf(s3stream): use read lock rather than write lock in append (#886)
Signed-off-by: Ning Yu <[email protected]>
Chillax-0v0 authored Jan 8, 2024
1 parent 1dab15a commit 8aa4cf0
Showing 2 changed files with 12 additions and 4 deletions.
s3stream/src/main/java/com/automq/stream/s3/S3Storage.java (2 additions, 1 deletion)

@@ -789,7 +789,7 @@ public List<WalWriteRequest> after(WalWriteRequest request) {
                 return Collections.emptyList();
             }
 
-            List<WalWriteRequest> rst = new ArrayList<>();
+            LinkedList<WalWriteRequest> rst = new LinkedList<>();
             WalWriteRequest poll = streamRequests.poll();
             assert poll == peek;
             rst.add(poll);
@@ -801,6 +801,7 @@ public List<WalWriteRequest> after(WalWriteRequest request) {
                 }
                 poll = streamRequests.poll();
                 assert poll == peek;
+                assert poll.record.getBaseOffset() == rst.getLast().record.getLastOffset();
                 rst.add(poll);
             }
 
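A note on the LinkedList change above: the new assertion calls rst.getLast(), which the plain List interface does not expose before Java 21, so the declared type switches to LinkedList and the tail lookup stays O(1). Below is a minimal, hypothetical sketch of the invariant that assertion enforces, using a simplified Rec record in place of the real WalWriteRequest and StreamRecordBatch types (run with -ea to enable assertions):

import java.util.LinkedList;

// Hypothetical stand-ins for WalWriteRequest and its record; not the real AutoMQ types.
public class ContiguitySketch {
    record Rec(long baseOffset, long lastOffset) {}

    public static void main(String[] args) {
        LinkedList<Rec> rst = new LinkedList<>();
        rst.add(new Rec(0, 10));     // first collected batch covers offsets [0, 10)
        Rec next = new Rec(10, 20);  // the next batch must start exactly where the previous one ended
        // Mirrors the added assertion: consecutive batches leave no offset gap.
        assert next.baseOffset() == rst.getLast().lastOffset();
        rst.add(next);
        System.out.println("contiguous up to offset " + rst.getLast().lastOffset());
    }
}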
s3stream/src/main/java/com/automq/stream/s3/S3Stream.java (10 additions, 3 deletions)

@@ -54,6 +54,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -77,6 +78,7 @@ public class S3Stream implements Stream {
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
     private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private final ReentrantLock appendLock = new ReentrantLock();
     private final Set<CompletableFuture<?>> pendingAppends = ConcurrentHashMap.newKeySet();
     private final Set<CompletableFuture<?>> pendingFetches = ConcurrentHashMap.newKeySet();
     private final AsyncNetworkBandwidthLimiter networkInboundLimiter;
@@ -134,14 +136,19 @@ public long nextOffset() {
     @WithSpan
     public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch) {
         TimerUtil timerUtil = new TimerUtil();
-        writeLock.lock();
+        readLock.lock();
         S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM_WRITE_LOCK);
         try {
             CompletableFuture<AppendResult> cf = exec(() -> {
                 if (networkInboundLimiter != null) {
                     networkInboundLimiter.forceConsume(recordBatch.rawPayload().remaining());
                 }
-                return append0(context, recordBatch);
+                appendLock.lock();
+                try {
+                    return append0(context, recordBatch);
+                } finally {
+                    appendLock.unlock();
+                }
             }, LOGGER, "append");
             pendingAppends.add(cf);
             cf.whenComplete((nil, ex) -> {
@@ -150,7 +157,7 @@ public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch) {
             });
             return cf;
         } finally {
-            writeLock.unlock();
+            readLock.unlock();
         }
     }
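For context on the locking change: append() now acquires the shared read lock, so concurrent appenders no longer serialize on the stream-wide write lock, while operations elsewhere in the class that take the write lock (not shown in this diff) still exclude in-flight appends; the new appendLock then keeps the actual append0 call, and with it offset assignment, strictly ordered. A minimal, self-contained sketch of that pattern, using hypothetical names rather than the real S3Stream API:

import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the read-lock-plus-append-lock pattern; not the real S3Stream class.
public class StreamLockSketch {
    private final ReentrantReadWriteLock lifecycleLock = new ReentrantReadWriteLock();
    private final ReentrantLock appendLock = new ReentrantLock();
    private long nextOffset = 0;
    private volatile boolean closed = false;

    // Many callers may append concurrently: they only share the read lock,
    // so they never block each other on the stream-wide lock.
    public long append(int recordCount) {
        lifecycleLock.readLock().lock();
        try {
            if (closed) {
                throw new IllegalStateException("stream closed");
            }
            // Offset assignment still has to be ordered, so a narrow lock guards just this step.
            appendLock.lock();
            try {
                long base = nextOffset;
                nextOffset += recordCount;
                return base;
            } finally {
                appendLock.unlock();
            }
        } finally {
            lifecycleLock.readLock().unlock();
        }
    }

    // Lifecycle operations take the write lock, so they still exclude all in-flight appends.
    public void close() {
        lifecycleLock.writeLock().lock();
        try {
            closed = true;
        } finally {
            lifecycleLock.writeLock().unlock();
        }
    }
}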
