diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java index d6024ebb88cf..1ba31816d3e7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java @@ -39,6 +39,7 @@ import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Weigher; import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors; import javax.annotation.Nullable; @@ -50,6 +51,8 @@ import java.time.Duration; import java.util.Arrays; import java.util.Comparator; +import java.util.HashSet; +import java.util.Set; import java.util.TreeSet; import java.util.function.Function; import java.util.function.Supplier; @@ -72,6 +75,7 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable { private final LookupStoreFactory lookupStoreFactory; private final Cache lookupFiles; private final Function bfGenerator; + private final Set cachedFiles; public LookupLevels( Levels levels, @@ -81,9 +85,8 @@ public LookupLevels( IOFunction> fileReaderFactory, Supplier localFileFactory, LookupStoreFactory lookupStoreFactory, - Duration fileRetention, - MemorySize maxDiskSize, - Function bfGenerator) { + Function bfGenerator, + Cache lookupFiles) { this.levels = levels; this.keyComparator = keyComparator; this.keySerializer = new RowCompactedSerializer(keyType); @@ -91,18 +94,23 @@ public LookupLevels( this.fileReaderFactory = fileReaderFactory; this.localFileFactory = localFileFactory; this.lookupStoreFactory = lookupStoreFactory; - this.lookupFiles = - Caffeine.newBuilder() - .expireAfterAccess(fileRetention) - .maximumWeight(maxDiskSize.getKibiBytes()) - .weigher(this::fileWeigh) - .removalListener(this::removalCallback) - .executor(MoreExecutors.directExecutor()) - .build(); this.bfGenerator = bfGenerator; + this.lookupFiles = lookupFiles; + this.cachedFiles = new HashSet<>(); levels.addDropFileCallback(this); } + public static Cache createCache( + Duration fileRetention, MemorySize maxDiskSize) { + return Caffeine.newBuilder() + .expireAfterAccess(fileRetention) + .maximumWeight(maxDiskSize.getKibiBytes()) + .weigher((Weigher) LookupLevels::fileWeigh) + .removalListener(LookupLevels::removalCallback) + .executor(MoreExecutors.directExecutor()) + .build(); + } + public Levels getLevels() { return levels; } @@ -112,6 +120,11 @@ Cache lookupFiles() { return lookupFiles; } + @VisibleForTesting + Set cachedFiles() { + return cachedFiles; + } + @Override public void notifyDropFile(String file) { lookupFiles.invalidate(file); @@ -138,6 +151,7 @@ private T lookup(InternalRow key, DataFileMeta file) throws IOException { while (lookupFile == null || lookupFile.isClosed) { lookupFile = createLookupFile(file); + cachedFiles.add(file.fileName()); lookupFiles.put(file.fileName(), lookupFile); } @@ -151,11 +165,11 @@ private T lookup(InternalRow key, DataFileMeta file) throws IOException { key, lookupFile.remoteFile().level(), valueBytes, file.fileName()); } - private int fileWeigh(String file, LookupFile lookupFile) { + private static int fileWeigh(String file, LookupFile lookupFile) { return fileKibiBytes(lookupFile.localFile); } - private void removalCallback(String key, LookupFile file, RemovalCause cause) { + private static void removalCallback(String key, LookupFile file, RemovalCause cause) { if (file != null) { try { file.close(); @@ -204,26 +218,37 @@ private LookupFile createLookupFile(DataFileMeta file) throws IOException { context = kvWriter.close(); } - return new LookupFile(localFile, file, lookupStoreFactory.createReader(localFile, context)); + return new LookupFile( + localFile, file, lookupStoreFactory.createReader(localFile, context), cachedFiles); } @Override public void close() throws IOException { - lookupFiles.invalidateAll(); + Set toClean = new HashSet<>(cachedFiles); + for (String cachedFile : toClean) { + lookupFiles.invalidate(cachedFile); + } } - private static class LookupFile implements Closeable { + /** Lookup file. */ + public static class LookupFile implements Closeable { private final File localFile; private final DataFileMeta remoteFile; private final LookupStoreReader reader; + private final Set cachedFiles; private boolean isClosed = false; - public LookupFile(File localFile, DataFileMeta remoteFile, LookupStoreReader reader) { + public LookupFile( + File localFile, + DataFileMeta remoteFile, + LookupStoreReader reader, + Set cachedFiles) { this.localFile = localFile; this.remoteFile = remoteFile; this.reader = reader; + this.cachedFiles = cachedFiles; } @Nullable @@ -240,6 +265,7 @@ public DataFileMeta remoteFile() { public void close() throws IOException { reader.close(); isClosed = true; + cachedFiles.remove(remoteFile.fileName()); FileIOUtils.deleteFileOrDirectory(localFile); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index df3242141e50..6a5b30aeaa7e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -71,6 +71,8 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.UserDefinedSeqComparator; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,6 +104,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite { private final RowType keyType; private final RowType valueType; @Nullable private final RecordLevelExpire recordLevelExpire; + private Cache lookupFileCache; public KeyValueFileStoreWrite( FileIO fileIO, @@ -356,6 +359,13 @@ private LookupLevels createLookupLevels( cacheManager, new RowCompactedSerializer(keyType).createSliceComparator()); Options options = this.options.toConfiguration(); + if (lookupFileCache == null) { + lookupFileCache = + LookupLevels.createCache( + options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), + options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE)); + } + return new LookupLevels<>( levels, keyComparatorSupplier.get(), @@ -364,8 +374,15 @@ private LookupLevels createLookupLevels( readerFactory::createRecordReader, () -> ioManager.createChannel().getPathFile(), lookupStoreFactory, - options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), - options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE), - bfGenerator(options)); + bfGenerator(options), + lookupFileCache); + } + + @Override + public void close() throws Exception { + super.close(); + if (lookupFileCache != null) { + lookupFileCache.invalidateAll(); + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index 9181fdb16f8d..7475d6e8e1a2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -41,6 +41,8 @@ import org.apache.paimon.utils.KeyComparatorSupplier; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; + import javax.annotation.Nullable; import java.io.IOException; @@ -70,6 +72,8 @@ public class LocalTableQuery implements TableQuery { private IOManager ioManager; + private Cache lookupFileCache; + public LocalTableQuery(FileStoreTable table) { this.options = table.coreOptions(); this.tableView = new HashMap<>(); @@ -130,6 +134,13 @@ private void newLookupLevels(BinaryRow partition, int bucket, List KeyValueFileReaderFactory factory = readerFactoryBuilder.build(partition, bucket, DeletionVector.emptyFactory()); Options options = this.options.toConfiguration(); + if (lookupFileCache == null) { + lookupFileCache = + LookupLevels.createCache( + options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), + options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE)); + } + LookupLevels lookupLevels = new LookupLevels<>( levels, @@ -148,9 +159,8 @@ private void newLookupLevels(BinaryRow partition, int bucket, List .createChannel() .getPathFile(), lookupStoreFactory, - options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), - options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE), - bfGenerator(options)); + bfGenerator(options), + lookupFileCache); tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, lookupLevels); } @@ -202,6 +212,9 @@ public void close() throws IOException { bucket.getValue().close(); } } + if (lookupFileCache != null) { + lookupFileCache.invalidateAll(); + } tableView.clear(); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index 781bab0dfb1a..07972b28a2f4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -195,9 +195,8 @@ private LookupLevels createContainsLevels(Levels levels, MemorySize max () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), new HashLookupStoreFactory( new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75, "none"), - Duration.ofHours(1), - maxDiskSize, - rowCount -> BloomFilter.builder(rowCount, 0.01)); + rowCount -> BloomFilter.builder(rowCount, 0.01), + LookupLevels.createCache(Duration.ofHours(1), maxDiskSize)); } private KeyValue kv(int key, int value) { diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index 7b89f409dc8c..a5d58cf13cb1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -193,6 +193,8 @@ public void testMaxDiskSize() throws IOException { assertThat(kv.level()).isEqualTo(1); assertThat(kv.value().getInt(1)).isEqualTo(i); } + assertThat(lookupLevels.lookupFiles().asMap().keySet()) + .isEqualTo(lookupLevels.cachedFiles()); // some files are invalided long fileNumber = lookupLevels.lookupFiles().estimatedSize(); @@ -202,6 +204,7 @@ public void testMaxDiskSize() throws IOException { assertThat(fileNumber).isNotEqualTo(fileNum).isEqualTo(lookupFiles.length); lookupLevels.close(); + assertThat(lookupLevels.cachedFiles()).isEmpty(); assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(0); } @@ -271,9 +274,8 @@ private LookupLevels createLookupLevels(Levels levels, MemorySize maxD () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), new HashLookupStoreFactory( new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75, "none"), - Duration.ofHours(1), - maxDiskSize, - rowCount -> BloomFilter.builder(rowCount, 0.05)); + rowCount -> BloomFilter.builder(rowCount, 0.05), + LookupLevels.createCache(Duration.ofHours(1), maxDiskSize)); } private KeyValue kv(int key, int value) {