diff --git a/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java b/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java index 5f24ae19..95afd1e2 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java @@ -28,10 +28,12 @@ import java.io.Closeable; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.Collections; import java.util.Deque; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -55,7 +57,7 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable { private final ReentrantLock lock = new ReentrantLock(); @GuardedBy("lock") - private final List cachePages; + private final BlockingQueue cachePages; @GuardedBy("lock") private final Deque waiters; @@ -79,7 +81,7 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable { "Page size should be less than PER_REQUEST_MEMORY_SIZE. Page size is:" + " %s KB, PER_REQUEST_MEMORY_SIZE is %s KB.", pageSize / 1024, PER_REQUEST_MEMORY_SIZE / 1024)); - this.cachePages = new ArrayList<>(); + this.cachePages = new LinkedBlockingDeque<>(); this.pageUsage = 0; this.maxPages = maxPages; this.pageSize = pageSize; @@ -107,38 +109,33 @@ public static LazyMemorySegmentPool create(Configuration conf) { @Override public MemorySegment nextSegment(boolean waiting) { - return inLock( - lock, - () -> { - checkClosed(); - int freePages = freePages(); - if (freePages == 0) { - if (waiting) { - return waitForSegment(); - } else { - return null; - } - } - - if (cachePages.isEmpty()) { - int numPages = Math.min(freePages, perRequestPages); - for (int i = 0; i < numPages; i++) { - cachePages.add(MemorySegment.allocateHeapMemory(pageSize)); - } - } - - this.pageUsage++; - return cachePages.remove(this.cachePages.size() - 1); - }); + checkClosed(); + int freePages = freePages(); + if (freePages == 0) { + if (waiting) { + return waitForSegment(); + } else { + return null; + } + } + + if (cachePages.isEmpty()) { + int numPages = Math.min(freePages, perRequestPages); + for (int i = 0; i < numPages; i++) { + cachePages.add(MemorySegment.allocateHeapMemory(pageSize)); + } + } + + this.pageUsage++; + return cachePages.poll(); } private MemorySegment waitForSegment() { - Condition moreMemory = lock.newCondition(); - waiters.addLast(moreMemory); try { + MemorySegment memorySegment = null; while (cachePages.isEmpty()) { - boolean success = moreMemory.await(maxTimeToBlockMs, TimeUnit.MILLISECONDS); - if (!success) { + memorySegment = cachePages.poll(maxTimeToBlockMs, TimeUnit.MILLISECONDS); + if (Objects.isNull(memorySegment)) { throw new BufferExhaustedException( "Failed to allocate new segment within the configured max blocking time " + maxTimeToBlockMs @@ -152,12 +149,10 @@ private MemorySegment waitForSegment() { } checkClosed(); } - return cachePages.remove(cachePages.size() - 1); + return memorySegment; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new FlussRuntimeException(e); - } finally { - waiters.remove(moreMemory); } } @@ -177,18 +172,14 @@ public void returnPage(MemorySegment segment) { @Override public void returnAll(List memory) { - inLock( - lock, - () -> { - pageUsage -= memory.size(); - if (this.pageUsage < 0) { - throw new RuntimeException("Return too more memories."); - } - cachePages.addAll(memory); - for (int i = 0; i < memory.size() && !waiters.isEmpty(); i++) { - waiters.pollFirst().signal(); - } - }); + pageUsage -= memory.size(); + if (this.pageUsage < 0) { + throw new RuntimeException("Return too more memories."); + } + cachePages.addAll(memory); + for (int i = 0; i < memory.size() && !waiters.isEmpty(); i++) { + waiters.pollFirst().signal(); + } } @Override @@ -217,7 +208,7 @@ public int queued() { } @VisibleForTesting - public List getAllCachePages() { + public BlockingQueue getAllCachePages() { return cachePages; } }