Skip to content

Commit

Permalink
use BlockingQueue in LazyMemorySegmentPool
Browse files Browse the repository at this point in the history
  • Loading branch information
0rrange committed Dec 6, 2024
1 parent 2f33802 commit 7273a9c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static com.alibaba.fluss.utils.Preconditions.checkArgument;
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
Expand All @@ -55,7 +59,7 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable {
private final ReentrantLock lock = new ReentrantLock();

@GuardedBy("lock")
private final List<MemorySegment> cachePages;
private final BlockingQueue<MemorySegment> cachePages;

@GuardedBy("lock")
private final Deque<Condition> waiters;
Expand All @@ -79,7 +83,7 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable {
"Page size should be less than PER_REQUEST_MEMORY_SIZE. Page size is:"
+ " %s KB, PER_REQUEST_MEMORY_SIZE is %s KB.",
pageSize / 1024, PER_REQUEST_MEMORY_SIZE / 1024));
this.cachePages = new ArrayList<>();
this.cachePages = new LinkedBlockingQueue<>();
this.pageUsage = 0;
this.maxPages = maxPages;
this.pageSize = pageSize;
Expand Down Expand Up @@ -107,52 +111,44 @@ public static LazyMemorySegmentPool create(Configuration conf) {

@Override
public MemorySegment nextSegment(boolean waiting) {
return inLock(
inLock(
lock,
() -> {
checkClosed();
int freePages = freePages();
if (freePages == 0) {
if (waiting) {
return waitForSegment();
} else {
return null;
}
}

if (cachePages.isEmpty()) {
int numPages = Math.min(freePages, perRequestPages);
for (int i = 0; i < numPages; i++) {
cachePages.add(MemorySegment.allocateHeapMemory(pageSize));
}
}

this.pageUsage++;
return cachePages.remove(this.cachePages.size() - 1);
if (freePages != 0) {
this.pageUsage++;
}
});
return waitForSegment(waiting);
}

private MemorySegment waitForSegment() {
private MemorySegment waitForSegment(boolean waiting) {
Condition moreMemory = lock.newCondition();
waiters.addLast(moreMemory);
try {
while (cachePages.isEmpty()) {
boolean success = moreMemory.await(maxTimeToBlockMs, TimeUnit.MILLISECONDS);
if (!success) {
throw new BufferExhaustedException(
"Failed to allocate new segment within the configured max blocking time "
+ maxTimeToBlockMs
+ " ms. Total memory: "
+ totalSize()
+ " bytes. Available memory: "
+ freePages() * pageSize
+ " bytes. page size: "
+ pageSize
+ " bytes");
}
checkClosed();
MemorySegment memorySegment = cachePages.poll(waiting ? maxTimeToBlockMs : 0, TimeUnit.MILLISECONDS);
if (Objects.isNull(memorySegment) && waiting) {
throw new BufferExhaustedException(
"Failed to allocate new segment within the configured max blocking time "
+ maxTimeToBlockMs
+ " ms. Total memory: "
+ totalSize()
+ " bytes. Available memory: "
+ freePages() * pageSize
+ " bytes. page size: "
+ pageSize
+ " bytes");
}
return cachePages.remove(cachePages.size() - 1);
checkClosed();
return memorySegment;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new FlussRuntimeException(e);
Expand Down Expand Up @@ -218,6 +214,6 @@ public int queued() {

@VisibleForTesting
public List<MemorySegment> getAllCachePages() {
return cachePages;
return new ArrayList<>(cachePages);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;

import static com.alibaba.fluss.utils.function.ThrowingRunnable.unchecked;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -52,6 +53,7 @@ void testNextSegmentWaiter() throws InterruptedException {
CountDownLatch returnAllLatch = asyncReturnAll(source, Arrays.asList(ms1, ms2));
CountDownLatch getNextSegmentLatch = asyncGetNextSegment(source);
assertThat(getNextSegmentLatch.getCount()).isEqualTo(1);
assertThat(source.queued()).isEqualTo(1);
returnAllLatch.countDown();
assertThat(getNextSegmentLatch.await(Long.MAX_VALUE, TimeUnit.SECONDS)).isTrue();
}
Expand All @@ -77,6 +79,28 @@ void testIllegalArgument() {
.hasMessage("Return too more memories.");
}

@Test
void testMultiThreadCallNextSegment() throws InterruptedException {
LazyMemorySegmentPool source = buildLazyMemorySegmentSource(1, 10);
assertThat(source.pageSize()).isEqualTo(10);
assertThat(source.freePages()).isEqualTo(1);

CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(() -> {
source.nextSegment(true);
countDownLatch.countDown();
}).start();
new Thread(() -> {
source.nextSegment(true);
countDownLatch.countDown();
}).start();

boolean await = countDownLatch.await(2000, TimeUnit.MILLISECONDS);
assertThat(await).isFalse();
assertThat(source.freePages()).isEqualTo(0);
assertThat(source.queued()).isEqualTo(1);
}

private LazyMemorySegmentPool buildLazyMemorySegmentSource(int maxPages, int pageSize) {
return new LazyMemorySegmentPool(maxPages, pageSize, Long.MAX_VALUE);
}
Expand Down

0 comments on commit 7273a9c

Please sign in to comment.