From 42e7f99aa41b0530b83d82e9f17715d02a1c4d99 Mon Sep 17 00:00:00 2001
From: Jingsong Lee
Date: Thu, 25 Jan 2024 11:27:40 +0800
Subject: [PATCH] [core] FileBasedRandomInputView should respect LRU Cache (#2788)

---
 .../apache/paimon/io/cache/CacheManager.java  | 16 +++++++-
 ...iew.java => FileBasedRandomInputView.java} | 37 +++++++++++++++----
 .../lookup/hash/HashLookupStoreReader.java    |  6 +--
 .../paimon/utils/FileBasedBloomFilter.java    | 13 +++----
 ...java => FileBasedRandomInputViewTest.java} | 20 ++++++----
 5 files changed, 65 insertions(+), 27 deletions(-)
 rename paimon-common/src/main/java/org/apache/paimon/io/cache/{CachedRandomInputView.java => FileBasedRandomInputView.java} (79%)
 rename paimon-common/src/test/java/org/apache/paimon/io/cache/{CachedRandomInputViewTest.java => FileBasedRandomInputViewTest.java} (84%)

diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
index 69dd9461f243..0634e1c07daa 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
@@ -35,8 +35,16 @@
 /** Cache manager to cache bytes to paged {@link MemorySegment}s. */
 public class CacheManager {
 
+    /**
+     * Refreshing the cache comes with some costs, so not every time we visit the CacheManager, but
+     * every 10 visits, refresh the LRU strategy.
+     */
+    public static final int REFRESH_COUNT = 10;
+
     private final Cache<CacheKey, CacheValue> cache;
 
+    private int fileReadCount;
+
     public CacheManager(MemorySize maxMemorySize) {
         this.cache =
                 Caffeine.newBuilder()
@@ -45,6 +53,7 @@ public CacheManager(MemorySize maxMemorySize) {
                         .removalListener(this::onRemoval)
                         .executor(MoreExecutors.directExecutor())
                         .build();
+        this.fileReadCount = 0;
     }
 
     @VisibleForTesting
@@ -83,7 +92,11 @@ private void onRemoval(CacheKey key, CacheValue value, RemovalCause cause) {
         value.cleanCallback.accept(key.offset, key.length);
     }
 
-    private static class CacheKey {
+    public int fileReadCount() {
+        return fileReadCount;
+    }
+
+    private class CacheKey {
 
         private final RandomAccessFile file;
         private final long offset;
@@ -99,6 +112,7 @@ private MemorySegment read() throws IOException {
             byte[] bytes = new byte[length];
             file.seek(offset);
             file.readFully(bytes);
+            fileReadCount++;
             return MemorySegment.wrap(bytes);
         }
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/CachedRandomInputView.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java
similarity index 79%
rename from paimon-common/src/main/java/org/apache/paimon/io/cache/CachedRandomInputView.java
rename to paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java
index 53acad5b8449..ef7594516a52 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CachedRandomInputView.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java
@@ -34,24 +34,26 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.io.cache.CacheManager.REFRESH_COUNT;
+
 /**
  * A {@link SeekableDataInputView} to read bytes from {@link RandomAccessFile}, the bytes can be
  * cached to {@link MemorySegment}s in {@link CacheManager}.
  */
-public class CachedRandomInputView extends AbstractPagedInputView
+public class FileBasedRandomInputView extends AbstractPagedInputView
         implements SeekableDataInputView, Closeable {
 
     private final RandomAccessFile file;
     private final long fileLength;
     private final CacheManager cacheManager;
-    private final Map<Integer, MemorySegment> segments;
+    private final Map<Integer, SegmentContainer> segments;
     private final int segmentSize;
     private final int segmentSizeBits;
     private final int segmentSizeMask;
 
     private int currentSegmentIndex;
 
-    public CachedRandomInputView(File file, CacheManager cacheManager, int segmentSize)
+    public FileBasedRandomInputView(File file, CacheManager cacheManager, int segmentSize)
             throws FileNotFoundException {
         this.file = new RandomAccessFile(file, "r");
         this.fileLength = file.length();
@@ -73,14 +75,16 @@ public void setReadPosition(long position) {
     }
 
     private MemorySegment getCurrentPage() {
-        MemorySegment segment = segments.get(currentSegmentIndex);
-        if (segment == null) {
+        SegmentContainer container = segments.get(currentSegmentIndex);
+        if (container == null || container.accessCount == REFRESH_COUNT) {
             long offset = segmentIndexToPosition(currentSegmentIndex);
             int length = (int) Math.min(segmentSize, fileLength - offset);
-            segment = cacheManager.getPage(file, offset, length, this::invalidPage);
-            segments.put(currentSegmentIndex, segment);
+            container =
+                    new SegmentContainer(
+                            cacheManager.getPage(file, offset, length, this::invalidPage));
+            segments.put(currentSegmentIndex, container);
         }
-        return segment;
+        return container.access();
     }
 
     @Override
@@ -127,4 +131,21 @@ private long segmentIndexToPosition(int segmentIndex) {
     public RandomAccessFile file() {
         return file;
     }
+
+    private static class SegmentContainer {
+
+        private final MemorySegment segment;
+
+        private int accessCount;
+
+        private SegmentContainer(MemorySegment segment) {
+            this.segment = segment;
+            this.accessCount = 0;
+        }
+
+        private MemorySegment access() {
+            this.accessCount++;
+            return segment;
+        }
+    }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java
index e092ca6d57cb..aae039f31198 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.lookup.hash;
 
 import org.apache.paimon.io.cache.CacheManager;
-import org.apache.paimon.io.cache.CachedRandomInputView;
+import org.apache.paimon.io.cache.FileBasedRandomInputView;
 import org.apache.paimon.lookup.LookupStoreReader;
 import org.apache.paimon.utils.FileBasedBloomFilter;
 import org.apache.paimon.utils.MurmurHashUtils;
@@ -61,7 +61,7 @@ public class HashLookupStoreReader
     // Offset of the data for different key length
     private final long[] dataOffsets;
     // File input view
-    private CachedRandomInputView inputView;
+    private FileBasedRandomInputView inputView;
     // Buffers
     private final byte[] slotBuffer;
     @Nullable private FileBasedBloomFilter bloomFilter;
@@ -74,7 +74,7 @@ public class HashLookupStoreReader
         }
         LOG.info("Opening file {}", file.getName());
         // Create Mapped file in read-only mode
-        inputView = new CachedRandomInputView(file, cacheManager, cachePageSize);
+        inputView = new FileBasedRandomInputView(file, cacheManager, cachePageSize);
 
         // Open file and read metadata
         long createdAt;
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
index 06edd34e9002..84fa35cfcf6b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
@@ -26,20 +26,19 @@
 
 import java.io.RandomAccessFile;
 
+import static org.apache.paimon.io.cache.CacheManager.REFRESH_COUNT;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Util to apply a built bloom filter . */
 public class FileBasedBloomFilter {
 
-    private static final int FORCE_REFRESH_CACHE = 30;
-
     private final RandomAccessFile file;
     private final CacheManager cacheManager;
     private final BloomFilter filter;
     private final long readOffset;
     private final int readLength;
 
-    private int refreshCount;
+    private int accessCount;
 
     public FileBasedBloomFilter(
             RandomAccessFile file,
@@ -53,14 +52,14 @@ public FileBasedBloomFilter(
         this.filter = new BloomFilter(numRecords, readLength);
         this.readOffset = readOffset;
         this.readLength = readLength;
-        this.refreshCount = 0;
+        this.accessCount = 0;
     }
 
     public boolean testHash(int hash) {
-        refreshCount++;
+        accessCount++;
         // we should refresh cache in LRU, but we cannot refresh everytime, it is costly.
         // so we introduce a refresh count to reduce refresh
-        if (refreshCount == FORCE_REFRESH_CACHE || filter.getMemorySegment() == null) {
+        if (accessCount == REFRESH_COUNT || filter.getMemorySegment() == null) {
             MemorySegment segment =
                     cacheManager.getPage(
                             file,
@@ -68,7 +67,7 @@ public boolean testHash(int hash) {
                             readLength,
                             (position, length) -> filter.unsetMemorySegment());
             filter.setMemorySegment(segment, 0);
-            refreshCount = 0;
+            accessCount = 0;
         }
         return filter.testHash(hash);
     }
diff --git a/paimon-common/src/test/java/org/apache/paimon/io/cache/CachedRandomInputViewTest.java b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
similarity index 84%
rename from paimon-common/src/test/java/org/apache/paimon/io/cache/CachedRandomInputViewTest.java
rename to paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
index c00dfdf27c9f..e1bfe16cf9f5 100644
--- a/paimon-common/src/test/java/org/apache/paimon/io/cache/CachedRandomInputViewTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
@@ -35,8 +35,8 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 
-/** Test for {@link CachedRandomInputView}. */
-public class CachedRandomInputViewTest {
+/** Test for {@link FileBasedRandomInputView}. */
+public class FileBasedRandomInputViewTest {
 
     @TempDir Path tempDir;
 
@@ -44,20 +44,20 @@ public class CachedRandomInputViewTest {
 
     @Test
     public void testMatched() throws IOException {
-        innerTest(1024 * 512);
+        innerTest(1024 * 512, 5000);
     }
 
     @Test
     public void testNotMatched() throws IOException {
-        innerTest(131092);
+        innerTest(131092, 1000);
     }
 
     @Test
     public void testRandom() throws IOException {
-        innerTest(rnd.nextInt(5000, 100000));
+        innerTest(rnd.nextInt(5000, 100000), 100);
     }
 
-    private void innerTest(int len) throws IOException {
+    private void innerTest(int len, int maxFileReadCount) throws IOException {
         byte[] bytes = new byte[len];
         MemorySegment segment = MemorySegment.wrap(bytes);
         for (int i = 0; i < bytes.length; i++) {
@@ -66,7 +66,7 @@ private void innerTest(int len) throws IOException {
 
         File file = writeFile(bytes);
         CacheManager cacheManager = new CacheManager(MemorySize.ofKibiBytes(128));
-        CachedRandomInputView view = new CachedRandomInputView(file, cacheManager, 1024);
+        FileBasedRandomInputView view = new FileBasedRandomInputView(file, cacheManager, 1024);
 
         // read first one
         // this assertThatCode check the ConcurrentModificationException is not threw.
@@ -88,12 +88,16 @@ private void innerTest(int len) throws IOException {
 
         // random read
         for (int i = 0; i < 10000; i++) {
-            int position = rnd.nextInt(bytes.length - 8);
+            // hot key -> 10
+            int position = rnd.nextBoolean() ? 10 : rnd.nextInt(bytes.length - 8);
             assertThatCode(() -> view.setReadPosition(position)).doesNotThrowAnyException();
             assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(position));
         }
 
         view.close();
+
+        // hot key in LRU, should have good cache hit rate
+        assertThat(cacheManager.fileReadCount()).isLessThan(maxFileReadCount);
         assertThat(cacheManager.cache().asMap().size()).isEqualTo(0);
     }
 