diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 84ba86124f8b..20a9cf9edcaa 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -423,6 +423,12 @@ Double Define the default false positive probability for lookup cache bloom filters. + +
lookup.cache.high-priority-pool-ratio
+ 0.25 + Double + The fraction of cache memory that is reserved for high-priority data like index, filter. +
lookup.hash-load-factor
0.75 diff --git a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/metric/cpu/SysInfoLinux.java b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/metric/cpu/SysInfoLinux.java index e4a5cfa570c7..041637c2dd2f 100644 --- a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/metric/cpu/SysInfoLinux.java +++ b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/metric/cpu/SysInfoLinux.java @@ -18,7 +18,12 @@ package org.apache.paimon.benchmark.metric.cpu; -import java.io.*; +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; import java.math.BigInteger; import java.nio.charset.Charset; import java.util.HashMap; diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/cache/CacheManagerBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/cache/CacheManagerBenchmark.java index 2c85621062fb..9a64322e0bde 100644 --- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/cache/CacheManagerBenchmark.java +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/cache/CacheManagerBenchmark.java @@ -52,7 +52,7 @@ public void testCache() throws Exception { CacheKey key2 = CacheKey.forPageIndex(new RandomAccessFile(file2, "r"), 0, 0); for (Cache.CacheType cacheType : Cache.CacheType.values()) { - CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10)); + CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10), 0.1); benchmark.addCase( String.format("cache-%s", cacheType.toString()), 5, diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index f4f7f41e2630..d22f8a9b8dac 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -915,6 +915,13 @@ public class CoreOptions implements Serializable { .defaultValue(MemorySize.parse("256 mb")) .withDescription("Max memory size for lookup cache."); + public static final ConfigOption LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO = + key("lookup.cache.high-priority-pool-ratio") + .doubleType() + .defaultValue(0.25) + .withDescription( + "The fraction of cache memory that is reserved for high-priority data like index, filter."); + public static final ConfigOption LOOKUP_CACHE_BLOOM_FILTER_ENABLED = key("lookup.cache.bloom.filter.enabled") .booleanType() @@ -1832,6 +1839,10 @@ public MemorySize lookupCacheMaxMemory() { return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE); } + public double lookupCacheHighPrioPoolRatio() { + return options.get(LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO); + } + public long targetFileSize(boolean hasPrimaryKey) { return options.getOptional(TARGET_FILE_SIZE) .orElse(hasPrimaryKey ? VALUE_128_MB : VALUE_256_MB) diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java index b313018d3589..11b8beb22c55 100644 --- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java +++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java @@ -24,25 +24,31 @@ /** Key for cache manager. */ public interface CacheKey { - static CacheKey forPosition(RandomAccessFile file, long position, int length) { - return new PositionCacheKey(file, position, length); + static CacheKey forPosition(RandomAccessFile file, long position, int length, boolean isIndex) { + return new PositionCacheKey(file, position, length, isIndex); } static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int pageIndex) { - return new PageIndexCacheKey(file, pageSize, pageIndex); + return new PageIndexCacheKey(file, pageSize, pageIndex, false); } + /** @return Whether this cache key is for index cache. */ + boolean isIndex(); + /** Key for file position and length. */ class PositionCacheKey implements CacheKey { private final RandomAccessFile file; private final long position; private final int length; + private final boolean isIndex; - private PositionCacheKey(RandomAccessFile file, long position, int length) { + private PositionCacheKey( + RandomAccessFile file, long position, int length, boolean isIndex) { this.file = file; this.position = position; this.length = length; + this.isIndex = isIndex; } @Override @@ -56,12 +62,18 @@ public boolean equals(Object o) { PositionCacheKey that = (PositionCacheKey) o; return position == that.position && length == that.length + && isIndex == that.isIndex && Objects.equals(file, that.file); } @Override public int hashCode() { - return Objects.hash(file, position, length); + return Objects.hash(file, position, length, isIndex); + } + + @Override + public boolean isIndex() { + return isIndex; } } @@ -71,17 +83,25 @@ class PageIndexCacheKey implements CacheKey { private final RandomAccessFile file; private final int pageSize; private final int pageIndex; + private final boolean isIndex; - private PageIndexCacheKey(RandomAccessFile file, int pageSize, int pageIndex) { + private PageIndexCacheKey( + RandomAccessFile file, int pageSize, int pageIndex, boolean isIndex) { this.file = file; this.pageSize = pageSize; this.pageIndex = pageIndex; + this.isIndex = isIndex; } public int pageIndex() { return pageIndex; } + @Override + public boolean isIndex() { + return isIndex; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -93,12 +113,13 @@ public boolean equals(Object o) { PageIndexCacheKey that = (PageIndexCacheKey) o; return pageSize == that.pageSize && pageIndex == that.pageIndex + && isIndex == that.isIndex && Objects.equals(file, that.file); } @Override public int hashCode() { - return Objects.hash(file, pageSize, pageIndex); + return Objects.hash(file, pageSize, pageIndex, isIndex); } } } diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java index 9e160aa0dc75..677d87d49909 100644 --- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java +++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java @@ -21,6 +21,10 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.options.MemorySize; +import org.apache.paimon.utils.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -29,31 +33,63 @@ /** Cache manager to cache bytes to paged {@link MemorySegment}s. */ public class CacheManager { + private static final Logger LOG = LoggerFactory.getLogger(CacheManager.class); + /** * Refreshing the cache comes with some costs, so not every time we visit the CacheManager, but * every 10 visits, refresh the LRU strategy. */ public static final int REFRESH_COUNT = 10; - private final Cache cache; + private final Cache dataCache; + private final Cache indexCache; private int fileReadCount; + @VisibleForTesting public CacheManager(MemorySize maxMemorySize) { - this(Cache.CacheType.GUAVA, maxMemorySize); + this(Cache.CacheType.GUAVA, maxMemorySize, 0); + } + + public CacheManager(MemorySize dataMaxMemorySize, double highPriorityPoolRatio) { + this(Cache.CacheType.GUAVA, dataMaxMemorySize, highPriorityPoolRatio); } - public CacheManager(Cache.CacheType cacheType, MemorySize maxMemorySize) { - this.cache = CacheBuilder.newBuilder(cacheType).maximumWeight(maxMemorySize).build(); + public CacheManager( + Cache.CacheType cacheType, MemorySize maxMemorySize, double highPriorityPoolRatio) { + Preconditions.checkArgument( + highPriorityPoolRatio >= 0 && highPriorityPoolRatio < 1, + "The high priority pool ratio should in the range [0, 1)."); + MemorySize indexCacheSize = + MemorySize.ofBytes((long) (maxMemorySize.getBytes() * highPriorityPoolRatio)); + MemorySize dataCacheSize = + MemorySize.ofBytes((long) (maxMemorySize.getBytes() * (1 - highPriorityPoolRatio))); + this.dataCache = CacheBuilder.newBuilder(cacheType).maximumWeight(dataCacheSize).build(); + if (highPriorityPoolRatio == 0) { + this.indexCache = dataCache; + } else { + this.indexCache = + CacheBuilder.newBuilder(cacheType).maximumWeight(indexCacheSize).build(); + } this.fileReadCount = 0; + LOG.info( + "Initialize cache manager with data cache of {} and index cache of {}.", + dataCacheSize, + indexCacheSize); } @VisibleForTesting - public Cache cache() { - return cache; + public Cache dataCache() { + return dataCache; + } + + @VisibleForTesting + public Cache indexCache() { + return indexCache; } public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback callback) { + Cache cache = key.isIndex() ? indexCache : dataCache; Cache.CacheValue value = cache.get( key, @@ -70,7 +106,11 @@ public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback cal } public void invalidPage(CacheKey key) { - cache.invalidate(key); + if (key.isIndex()) { + indexCache.invalidate(key); + } else { + dataCache.invalidate(key); + } } public int fileReadCount() { diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java index e181109044fe..0441a24f220e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java @@ -49,7 +49,6 @@ public BlockCache(RandomAccessFile file, CacheManager cacheManager) { this.blocks = new HashMap<>(); } - // TODO separate index and data cache private byte[] readFrom(long offset, int length) throws IOException { byte[] buffer = new byte[length]; int read = channel.read(ByteBuffer.wrap(buffer), offset); @@ -61,9 +60,9 @@ private byte[] readFrom(long offset, int length) throws IOException { } public MemorySegment getBlock( - long position, int length, Function decompressFunc) { + long position, int length, Function decompressFunc, boolean isIndex) { - CacheKey cacheKey = CacheKey.forPosition(file, position, length); + CacheKey cacheKey = CacheKey.forPosition(file, position, length, isIndex); SegmentContainer container = blocks.get(cacheKey); if (container == null || container.getAccessCount() == CacheManager.REFRESH_COUNT) { diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java index 727589bfd343..39997888ce92 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java @@ -38,11 +38,7 @@ import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c; import static org.apache.paimon.utils.Preconditions.checkArgument; -/** - * A {@link LookupStoreReader} for sort store. - * - *

TODO separate index cache and block cache. - */ +/** A {@link LookupStoreReader} for sort store. */ public class SortLookupStoreReader implements LookupStoreReader { private final Comparator comparator; @@ -68,7 +64,7 @@ public SortLookupStoreReader( this.fileInput = PageFileInput.create(file, blockSize, null, fileSize, null); this.blockCache = new BlockCache(fileInput.file(), cacheManager); Footer footer = readFooter(); - this.indexBlockIterator = readBlock(footer.getIndexBlockHandle()).iterator(); + this.indexBlockIterator = readBlock(footer.getIndexBlockHandle(), true).iterator(); BloomFilterHandle handle = footer.getBloomFilterHandle(); if (handle != null) { this.bloomFilter = @@ -84,7 +80,7 @@ public SortLookupStoreReader( private Footer readFooter() throws IOException { MemorySegment footerData = blockCache.getBlock( - fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH, b -> b); + fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH, b -> b, true); return Footer.readFooter(MemorySlice.wrap(footerData).toInput()); } @@ -111,23 +107,26 @@ public byte[] lookup(byte[] key) throws IOException { } private BlockIterator getNextBlock() throws IOException { + // index block handle, point to the key, value position. MemorySlice blockHandle = indexBlockIterator.next().getValue(); - BlockReader dataBlock = openBlock(blockHandle); + BlockReader dataBlock = + readBlock(BlockHandle.readBlockHandle(blockHandle.toInput()), false); return dataBlock.iterator(); } - private BlockReader openBlock(MemorySlice blockEntry) throws IOException { - BlockHandle blockHandle = BlockHandle.readBlockHandle(blockEntry.toInput()); - return readBlock(blockHandle); - } - - private BlockReader readBlock(BlockHandle blockHandle) { + /** + * @param blockHandle The block handle. + * @param index Whether read the block as an index. + * @return The reader of the target block. + */ + private BlockReader readBlock(BlockHandle blockHandle, boolean index) { // read block trailer MemorySegment trailerData = blockCache.getBlock( blockHandle.offset() + blockHandle.size(), BlockTrailer.ENCODED_LENGTH, - b -> b); + b -> b, + true); BlockTrailer blockTrailer = BlockTrailer.readBlockTrailer(MemorySlice.wrap(trailerData).toInput()); @@ -166,7 +165,8 @@ private BlockReader readBlock(BlockHandle blockHandle) { checkArgument(uncompressedLength == uncompressed.length); return uncompressed; } - }); + }, + index); return new BlockReader(MemorySlice.wrap(unCompressedBlock), comparator); } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java index 3d8751774cd1..ede7a8e3cfe6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java @@ -55,7 +55,7 @@ public FileBasedBloomFilter( this.readOffset = readOffset; this.readLength = readLength; this.accessCount = 0; - this.cacheKey = CacheKey.forPosition(input.file(), readOffset, readLength); + this.cacheKey = CacheKey.forPosition(input.file(), readOffset, readLength, true); } public boolean testHash(int hash) { diff --git a/paimon-common/src/test/java/org/apache/paimon/io/cache/CacheManagerTest.java b/paimon-common/src/test/java/org/apache/paimon/io/cache/CacheManagerTest.java index 6f8b4e60e99a..cf8076ac8b80 100644 --- a/paimon-common/src/test/java/org/apache/paimon/io/cache/CacheManagerTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/io/cache/CacheManagerTest.java @@ -48,7 +48,7 @@ void testCaffeineCache() throws Exception { CacheKey key2 = CacheKey.forPageIndex(new RandomAccessFile(file2, "r"), 0, 0); for (Cache.CacheType cacheType : Cache.CacheType.values()) { - CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10)); + CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10), 0.1); byte[] value = new byte[6]; Arrays.fill(value, (byte) 1); for (int i = 0; i < 10; i++) { diff --git a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java index 4c88ea343b6b..6486aead8c25 100644 --- a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java @@ -82,7 +82,7 @@ private void innerTest(int len, int maxFileReadCount) throws IOException { } File file = writeFile(bytes); - CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofKibiBytes(128)); + CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofKibiBytes(128), 0); FileBasedRandomInputView view = new FileBasedRandomInputView( PageFileInput.create(file, 1024, null, 0, null), cacheManager); @@ -117,7 +117,8 @@ private void innerTest(int len, int maxFileReadCount) throws IOException { // hot key in LRU, should have good cache hit rate assertThat(cacheManager.fileReadCount()).isLessThan(maxFileReadCount); - assertThat(cacheManager.cache().asMap().size()).isEqualTo(0); + assertThat(cacheManager.dataCache().asMap().size()).isEqualTo(0); + assertThat(cacheManager.indexCache().asMap().size()).isEqualTo(0); } private File writeFile(byte[] bytes) throws IOException { diff --git a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java index cbec6131d93e..7ba3f8283aea 100644 --- a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java @@ -110,7 +110,8 @@ public void testNormal() throws IOException { assertThat(reader.lookup(toBytes(VALUE_COUNT + 1000))).isNull(); reader.close(); - assertThat(cacheManager.cache().asMap()).isEmpty(); + assertThat(cacheManager.dataCache().asMap()).isEmpty(); + assertThat(cacheManager.indexCache().asMap()).isEmpty(); } @TestTemplate diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java index 51babc2889cb..d1471fd74afb 100644 --- a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java @@ -64,7 +64,7 @@ public void testProbe() throws IOException { Arrays.stream(inputs).forEach(i -> builder.addHash(Integer.hashCode(i))); File file = writeFile(segment.getArray()); - CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofMebiBytes(1)); + CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofMebiBytes(1), 0.1); FileBasedBloomFilter filter = new FileBasedBloomFilter( PageFileInput.create(file, 1024, null, 0, null), @@ -76,7 +76,8 @@ public void testProbe() throws IOException { Arrays.stream(inputs) .forEach(i -> Assertions.assertThat(filter.testHash(Integer.hashCode(i))).isTrue()); filter.close(); - Assertions.assertThat(cacheManager.cache().asMap()).isEmpty(); + Assertions.assertThat(cacheManager.dataCache().asMap()).isEmpty(); + Assertions.assertThat(cacheManager.indexCache().asMap()).isEmpty(); Assertions.assertThat(filter.bloomFilter().getMemorySegment()).isNull(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java index bbde3fd48580..ff99f06510c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java @@ -78,7 +78,9 @@ public MemoryFileStoreWrite( options.writeMaxWritersToSpill(), options.legacyPartitionName()); this.options = options; - this.cacheManager = new CacheManager(options.lookupCacheMaxMemory()); + this.cacheManager = + new CacheManager( + options.lookupCacheMaxMemory(), options.lookupCacheHighPrioPoolRatio()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index d5b392d9e099..8ff5ce7a6580 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -101,7 +101,9 @@ public LocalTableQuery(FileStoreTable table) { this.lookupStoreFactory = LookupStoreFactory.create( options, - new CacheManager(options.lookupCacheMaxMemory()), + new CacheManager( + options.lookupCacheMaxMemory(), + options.lookupCacheHighPrioPoolRatio()), new RowCompactedSerializer(keyType).createSliceComparator()); if (options.needLookup()) {