From 8aa4cf041eb23914bc2b29b25048d2e187edca03 Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Mon, 8 Jan 2024 10:22:14 +0800 Subject: [PATCH] perf(s3stream): use read lock rather than write lock in append (#886) Signed-off-by: Ning Yu --- .../main/java/com/automq/stream/s3/S3Storage.java | 3 ++- .../main/java/com/automq/stream/s3/S3Stream.java | 13 ++++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index fa5721124..298ed9f67 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -789,7 +789,7 @@ public List after(WalWriteRequest request) { return Collections.emptyList(); } - List rst = new ArrayList<>(); + LinkedList rst = new LinkedList<>(); WalWriteRequest poll = streamRequests.poll(); assert poll == peek; rst.add(poll); @@ -801,6 +801,7 @@ public List after(WalWriteRequest request) { } poll = streamRequests.poll(); assert poll == peek; + assert poll.record.getBaseOffset() == rst.getLast().record.getLastOffset(); rst.add(poll); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index 0eff5f3a2..f4c83445c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -77,6 +78,7 @@ public class S3Stream implements Stream { private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); + private final ReentrantLock appendLock = new ReentrantLock(); private final Set> pendingAppends = ConcurrentHashMap.newKeySet(); private final Set> pendingFetches = ConcurrentHashMap.newKeySet(); private final AsyncNetworkBandwidthLimiter networkInboundLimiter; @@ -134,14 +136,19 @@ public long nextOffset() { @WithSpan public CompletableFuture append(AppendContext context, RecordBatch recordBatch) { TimerUtil timerUtil = new TimerUtil(); - writeLock.lock(); + readLock.lock(); S3StreamMetricsManager.recordOperationLatency(MetricsLevel.DEBUG, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM_WRITE_LOCK); try { CompletableFuture cf = exec(() -> { if (networkInboundLimiter != null) { networkInboundLimiter.forceConsume(recordBatch.rawPayload().remaining()); } - return append0(context, recordBatch); + appendLock.lock(); + try { + return append0(context, recordBatch); + } finally { + appendLock.unlock(); + } }, LOGGER, "append"); pendingAppends.add(cf); cf.whenComplete((nil, ex) -> { @@ -150,7 +157,7 @@ public CompletableFuture append(AppendContext context, RecordBatch }); return cf; } finally { - writeLock.unlock(); + readLock.unlock(); } }