Skip to content

Commit

Permalink
use BlockingQueue in LazyMemorySegmentPool
Browse files Browse the repository at this point in the history
  • Loading branch information
0rrange committed Dec 4, 2024
1 parent 2f33802 commit 7d52985
Showing 1 changed file with 37 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@

import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -55,7 +57,7 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable {
private final ReentrantLock lock = new ReentrantLock();

@GuardedBy("lock")
private final List<MemorySegment> cachePages;
private final BlockingQueue<MemorySegment> cachePages;

@GuardedBy("lock")
private final Deque<Condition> waiters;
Expand All @@ -79,7 +81,7 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable {
"Page size should be less than PER_REQUEST_MEMORY_SIZE. Page size is:"
+ " %s KB, PER_REQUEST_MEMORY_SIZE is %s KB.",
pageSize / 1024, PER_REQUEST_MEMORY_SIZE / 1024));
this.cachePages = new ArrayList<>();
this.cachePages = new LinkedBlockingDeque<>();
this.pageUsage = 0;
this.maxPages = maxPages;
this.pageSize = pageSize;
Expand Down Expand Up @@ -107,38 +109,33 @@ public static LazyMemorySegmentPool create(Configuration conf) {

@Override
public MemorySegment nextSegment(boolean waiting) {
return inLock(
lock,
() -> {
checkClosed();
int freePages = freePages();
if (freePages == 0) {
if (waiting) {
return waitForSegment();
} else {
return null;
}
}

if (cachePages.isEmpty()) {
int numPages = Math.min(freePages, perRequestPages);
for (int i = 0; i < numPages; i++) {
cachePages.add(MemorySegment.allocateHeapMemory(pageSize));
}
}

this.pageUsage++;
return cachePages.remove(this.cachePages.size() - 1);
});
checkClosed();
int freePages = freePages();
if (freePages == 0) {
if (waiting) {
return waitForSegment();
} else {
return null;
}
}

if (cachePages.isEmpty()) {
int numPages = Math.min(freePages, perRequestPages);
for (int i = 0; i < numPages; i++) {
cachePages.add(MemorySegment.allocateHeapMemory(pageSize));
}
}

this.pageUsage++;
return cachePages.poll();
}

private MemorySegment waitForSegment() {
Condition moreMemory = lock.newCondition();
waiters.addLast(moreMemory);
try {
MemorySegment memorySegment = null;
while (cachePages.isEmpty()) {
boolean success = moreMemory.await(maxTimeToBlockMs, TimeUnit.MILLISECONDS);
if (!success) {
memorySegment = cachePages.poll(maxTimeToBlockMs, TimeUnit.MILLISECONDS);
if (Objects.isNull(memorySegment)) {
throw new BufferExhaustedException(
"Failed to allocate new segment within the configured max blocking time "
+ maxTimeToBlockMs
Expand All @@ -152,12 +149,10 @@ private MemorySegment waitForSegment() {
}
checkClosed();
}
return cachePages.remove(cachePages.size() - 1);
return memorySegment;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new FlussRuntimeException(e);
} finally {
waiters.remove(moreMemory);
}
}

Expand All @@ -177,18 +172,14 @@ public void returnPage(MemorySegment segment) {

@Override
public void returnAll(List<MemorySegment> memory) {
inLock(
lock,
() -> {
pageUsage -= memory.size();
if (this.pageUsage < 0) {
throw new RuntimeException("Return too more memories.");
}
cachePages.addAll(memory);
for (int i = 0; i < memory.size() && !waiters.isEmpty(); i++) {
waiters.pollFirst().signal();
}
});
pageUsage -= memory.size();
if (this.pageUsage < 0) {
throw new RuntimeException("Return too more memories.");
}
cachePages.addAll(memory);
for (int i = 0; i < memory.size() && !waiters.isEmpty(); i++) {
waiters.pollFirst().signal();
}
}

@Override
Expand Down Expand Up @@ -217,7 +208,7 @@ public int queued() {
}

@VisibleForTesting
public List<MemorySegment> getAllCachePages() {
public BlockingQueue<MemorySegment> getAllCachePages() {
return cachePages;
}
}

0 comments on commit 7d52985

Please sign in to comment.