Skip to content

Commit

Permalink
[core] FileBasedRandomInputView should respect LRU Cache (#2788)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jan 25, 2024
1 parent c408d50 commit 42e7f99
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,16 @@
/** Cache manager to cache bytes to paged {@link MemorySegment}s. */
public class CacheManager {

/**
* Refreshing the cache comes with some costs, so not every time we visit the CacheManager, but
* every 10 visits, refresh the LRU strategy.
*/
public static final int REFRESH_COUNT = 10;

private final Cache<CacheKey, CacheValue> cache;

private int fileReadCount;

public CacheManager(MemorySize maxMemorySize) {
this.cache =
Caffeine.newBuilder()
Expand All @@ -45,6 +53,7 @@ public CacheManager(MemorySize maxMemorySize) {
.removalListener(this::onRemoval)
.executor(MoreExecutors.directExecutor())
.build();
this.fileReadCount = 0;
}

@VisibleForTesting
Expand Down Expand Up @@ -83,7 +92,11 @@ private void onRemoval(CacheKey key, CacheValue value, RemovalCause cause) {
value.cleanCallback.accept(key.offset, key.length);
}

private static class CacheKey {
public int fileReadCount() {
return fileReadCount;
}

private class CacheKey {

private final RandomAccessFile file;
private final long offset;
Expand All @@ -99,6 +112,7 @@ private MemorySegment read() throws IOException {
byte[] bytes = new byte[length];
file.seek(offset);
file.readFully(bytes);
fileReadCount++;
return MemorySegment.wrap(bytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,26 @@
import java.util.List;
import java.util.Map;

import static org.apache.paimon.io.cache.CacheManager.REFRESH_COUNT;

/**
* A {@link SeekableDataInputView} to read bytes from {@link RandomAccessFile}, the bytes can be
* cached to {@link MemorySegment}s in {@link CacheManager}.
*/
public class CachedRandomInputView extends AbstractPagedInputView
public class FileBasedRandomInputView extends AbstractPagedInputView
implements SeekableDataInputView, Closeable {

private final RandomAccessFile file;
private final long fileLength;
private final CacheManager cacheManager;
private final Map<Integer, MemorySegment> segments;
private final Map<Integer, SegmentContainer> segments;
private final int segmentSize;
private final int segmentSizeBits;
private final int segmentSizeMask;

private int currentSegmentIndex;

public CachedRandomInputView(File file, CacheManager cacheManager, int segmentSize)
public FileBasedRandomInputView(File file, CacheManager cacheManager, int segmentSize)
throws FileNotFoundException {
this.file = new RandomAccessFile(file, "r");
this.fileLength = file.length();
Expand All @@ -73,14 +75,16 @@ public void setReadPosition(long position) {
}

private MemorySegment getCurrentPage() {
MemorySegment segment = segments.get(currentSegmentIndex);
if (segment == null) {
SegmentContainer container = segments.get(currentSegmentIndex);
if (container == null || container.accessCount == REFRESH_COUNT) {
long offset = segmentIndexToPosition(currentSegmentIndex);
int length = (int) Math.min(segmentSize, fileLength - offset);
segment = cacheManager.getPage(file, offset, length, this::invalidPage);
segments.put(currentSegmentIndex, segment);
container =
new SegmentContainer(
cacheManager.getPage(file, offset, length, this::invalidPage));
segments.put(currentSegmentIndex, container);
}
return segment;
return container.access();
}

@Override
Expand Down Expand Up @@ -127,4 +131,21 @@ private long segmentIndexToPosition(int segmentIndex) {
public RandomAccessFile file() {
return file;
}

private static class SegmentContainer {

private final MemorySegment segment;

private int accessCount;

private SegmentContainer(MemorySegment segment) {
this.segment = segment;
this.accessCount = 0;
}

private MemorySegment access() {
this.accessCount++;
return segment;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.paimon.lookup.hash;

import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.io.cache.CachedRandomInputView;
import org.apache.paimon.io.cache.FileBasedRandomInputView;
import org.apache.paimon.lookup.LookupStoreReader;
import org.apache.paimon.utils.FileBasedBloomFilter;
import org.apache.paimon.utils.MurmurHashUtils;
Expand Down Expand Up @@ -61,7 +61,7 @@ public class HashLookupStoreReader
// Offset of the data for different key length
private final long[] dataOffsets;
// File input view
private CachedRandomInputView inputView;
private FileBasedRandomInputView inputView;
// Buffers
private final byte[] slotBuffer;
@Nullable private FileBasedBloomFilter bloomFilter;
Expand All @@ -74,7 +74,7 @@ public class HashLookupStoreReader
}
LOG.info("Opening file {}", file.getName());
// Create Mapped file in read-only mode
inputView = new CachedRandomInputView(file, cacheManager, cachePageSize);
inputView = new FileBasedRandomInputView(file, cacheManager, cachePageSize);

// Open file and read metadata
long createdAt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@

import java.io.RandomAccessFile;

import static org.apache.paimon.io.cache.CacheManager.REFRESH_COUNT;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Util to apply a built bloom filter . */
public class FileBasedBloomFilter {

private static final int FORCE_REFRESH_CACHE = 30;

private final RandomAccessFile file;
private final CacheManager cacheManager;
private final BloomFilter filter;
private final long readOffset;
private final int readLength;

private int refreshCount;
private int accessCount;

public FileBasedBloomFilter(
RandomAccessFile file,
Expand All @@ -53,22 +52,22 @@ public FileBasedBloomFilter(
this.filter = new BloomFilter(numRecords, readLength);
this.readOffset = readOffset;
this.readLength = readLength;
this.refreshCount = 0;
this.accessCount = 0;
}

public boolean testHash(int hash) {
refreshCount++;
accessCount++;
// we should refresh cache in LRU, but we cannot refresh everytime, it is costly.
// so we introduce a refresh count to reduce refresh
if (refreshCount == FORCE_REFRESH_CACHE || filter.getMemorySegment() == null) {
if (accessCount == REFRESH_COUNT || filter.getMemorySegment() == null) {
MemorySegment segment =
cacheManager.getPage(
file,
readOffset,
readLength,
(position, length) -> filter.unsetMemorySegment());
filter.setMemorySegment(segment, 0);
refreshCount = 0;
accessCount = 0;
}
return filter.testHash(hash);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,29 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;

/** Test for {@link CachedRandomInputView}. */
public class CachedRandomInputViewTest {
/** Test for {@link FileBasedRandomInputView}. */
public class FileBasedRandomInputViewTest {

@TempDir Path tempDir;

private final ThreadLocalRandom rnd = ThreadLocalRandom.current();

@Test
public void testMatched() throws IOException {
innerTest(1024 * 512);
innerTest(1024 * 512, 5000);
}

@Test
public void testNotMatched() throws IOException {
innerTest(131092);
innerTest(131092, 1000);
}

@Test
public void testRandom() throws IOException {
innerTest(rnd.nextInt(5000, 100000));
innerTest(rnd.nextInt(5000, 100000), 100);
}

private void innerTest(int len) throws IOException {
private void innerTest(int len, int maxFileReadCount) throws IOException {
byte[] bytes = new byte[len];
MemorySegment segment = MemorySegment.wrap(bytes);
for (int i = 0; i < bytes.length; i++) {
Expand All @@ -66,7 +66,7 @@ private void innerTest(int len) throws IOException {

File file = writeFile(bytes);
CacheManager cacheManager = new CacheManager(MemorySize.ofKibiBytes(128));
CachedRandomInputView view = new CachedRandomInputView(file, cacheManager, 1024);
FileBasedRandomInputView view = new FileBasedRandomInputView(file, cacheManager, 1024);

// read first one
// this assertThatCode check the ConcurrentModificationException is not threw.
Expand All @@ -88,12 +88,16 @@ private void innerTest(int len) throws IOException {

// random read
for (int i = 0; i < 10000; i++) {
int position = rnd.nextInt(bytes.length - 8);
// hot key -> 10
int position = rnd.nextBoolean() ? 10 : rnd.nextInt(bytes.length - 8);
assertThatCode(() -> view.setReadPosition(position)).doesNotThrowAnyException();
assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(position));
}

view.close();

// hot key in LRU, should have good cache hit rate
assertThat(cacheManager.fileReadCount()).isLessThan(maxFileReadCount);
assertThat(cacheManager.cache().asMap().size()).isEqualTo(0);
}

Expand Down

0 comments on commit 42e7f99

Please sign in to comment.