Skip to content

Commit

Permalink
[core] Manage the lookup file disk cache in task granularity (#3853)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Jul 31, 2024
1 parent d5290fc commit 29bc06c
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Weigher;
import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;

import javax.annotation.Nullable;
Expand All @@ -50,6 +51,8 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -72,6 +75,7 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
private final LookupStoreFactory lookupStoreFactory;
private final Cache<String, LookupFile> lookupFiles;
private final Function<Long, BloomFilter.Builder> bfGenerator;
private final Set<String> cachedFiles;

public LookupLevels(
Levels levels,
Expand All @@ -81,28 +85,32 @@ public LookupLevels(
IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
Supplier<File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
Duration fileRetention,
MemorySize maxDiskSize,
Function<Long, BloomFilter.Builder> bfGenerator) {
Function<Long, BloomFilter.Builder> bfGenerator,
Cache<String, LookupFile> lookupFiles) {
this.levels = levels;
this.keyComparator = keyComparator;
this.keySerializer = new RowCompactedSerializer(keyType);
this.valueProcessor = valueProcessor;
this.fileReaderFactory = fileReaderFactory;
this.localFileFactory = localFileFactory;
this.lookupStoreFactory = lookupStoreFactory;
this.lookupFiles =
Caffeine.newBuilder()
.expireAfterAccess(fileRetention)
.maximumWeight(maxDiskSize.getKibiBytes())
.weigher(this::fileWeigh)
.removalListener(this::removalCallback)
.executor(MoreExecutors.directExecutor())
.build();
this.bfGenerator = bfGenerator;
this.lookupFiles = lookupFiles;
this.cachedFiles = new HashSet<>();
levels.addDropFileCallback(this);
}

/**
 * Builds the Caffeine cache that holds {@link LookupFile}s keyed by data file name.
 *
 * <p>The method is static and the resulting cache is handed to the {@link LookupLevels}
 * constructor, so the caller decides which instances share one cache.
 *
 * @param fileRetention entries that are not accessed for this long are expired
 * @param maxDiskSize upper bound for the summed entry weights, read in KiB
 * @return a new, empty cache
 */
public static Cache<String, LookupFile> createCache(
Duration fileRetention, MemorySize maxDiskSize) {
return Caffeine.newBuilder()
.expireAfterAccess(fileRetention)
// Entry weights come from fileWeigh (fileKibiBytes of the local file), hence a KiB limit.
.maximumWeight(maxDiskSize.getKibiBytes())
// The explicit cast gives the method reference a concrete target type.
.weigher((Weigher<String, LookupFile>) LookupLevels::fileWeigh)
// Removed entries are passed to removalCallback, which closes the LookupFile.
.removalListener(LookupLevels::removalCallback)
// NOTE(review): a direct executor presumably runs the listener on the calling thread — confirm.
.executor(MoreExecutors.directExecutor())
.build();
}

/** Returns the {@link Levels} instance this object was constructed with. */
public Levels getLevels() {
return levels;
}
Expand All @@ -112,6 +120,11 @@ Cache<String, LookupFile> lookupFiles() {
return lookupFiles;
}

/**
 * Exposes the names of the files this instance has put into the lookup file cache and that
 * have not been closed since.
 *
 * <p>The returned set is the internal, mutable set itself, not a copy.
 */
@VisibleForTesting
Set<String> cachedFiles() {
return cachedFiles;
}

@Override
public void notifyDropFile(String file) {
lookupFiles.invalidate(file);
Expand All @@ -138,6 +151,7 @@ private T lookup(InternalRow key, DataFileMeta file) throws IOException {

while (lookupFile == null || lookupFile.isClosed) {
lookupFile = createLookupFile(file);
cachedFiles.add(file.fileName());
lookupFiles.put(file.fileName(), lookupFile);
}

Expand All @@ -151,11 +165,11 @@ private T lookup(InternalRow key, DataFileMeta file) throws IOException {
key, lookupFile.remoteFile().level(), valueBytes, file.fileName());
}

private int fileWeigh(String file, LookupFile lookupFile) {
private static int fileWeigh(String file, LookupFile lookupFile) {
return fileKibiBytes(lookupFile.localFile);
}

private void removalCallback(String key, LookupFile file, RemovalCause cause) {
private static void removalCallback(String key, LookupFile file, RemovalCause cause) {
if (file != null) {
try {
file.close();
Expand Down Expand Up @@ -204,26 +218,37 @@ private LookupFile createLookupFile(DataFileMeta file) throws IOException {
context = kvWriter.close();
}

return new LookupFile(localFile, file, lookupStoreFactory.createReader(localFile, context));
return new LookupFile(
localFile, file, lookupStoreFactory.createReader(localFile, context), cachedFiles);
}

/**
 * Releases the cache entries recorded in {@code cachedFiles}.
 *
 * <p>NOTE(review): this listing is a diff rendered without +/- markers; the
 * {@code invalidateAll()} line is presumably the removed pre-change statement and the loop its
 * replacement — confirm against the repository.
 */
@Override
public void close() throws IOException {
lookupFiles.invalidateAll();
// Iterate over a copy: LookupFile.close() removes its own name from cachedFiles, and it can
// be triggered by invalidate() through the cache's removal callback.
Set<String> toClean = new HashSet<>(cachedFiles);
for (String cachedFile : toClean) {
lookupFiles.invalidate(cachedFile);
}
}

private static class LookupFile implements Closeable {
/** Lookup file. */
public static class LookupFile implements Closeable {

private final File localFile;
private final DataFileMeta remoteFile;
private final LookupStoreReader reader;
private final Set<String> cachedFiles;

private boolean isClosed = false;

public LookupFile(File localFile, DataFileMeta remoteFile, LookupStoreReader reader) {
public LookupFile(
File localFile,
DataFileMeta remoteFile,
LookupStoreReader reader,
Set<String> cachedFiles) {
this.localFile = localFile;
this.remoteFile = remoteFile;
this.reader = reader;
this.cachedFiles = cachedFiles;
}

@Nullable
Expand All @@ -240,6 +265,7 @@ public DataFileMeta remoteFile() {
public void close() throws IOException {
// Closes the reader, marks this file as closed, drops its name from the cachedFiles set that
// was passed to the constructor, and finally deletes the local file.
// NOTE(review): none of these steps is in a finally block, so a failing reader.close() skips
// the flag, the set update and the delete — confirm this is acceptable.
reader.close();
// lookup() re-creates the lookup file when it sees this flag set.
isClosed = true;
cachedFiles.remove(remoteFile.fileName());
FileIOUtils.deleteFileOrDirectory(localFile);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.UserDefinedSeqComparator;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -102,6 +104,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
private final RowType keyType;
private final RowType valueType;
@Nullable private final RecordLevelExpire recordLevelExpire;
private Cache<String, LookupLevels.LookupFile> lookupFileCache;

public KeyValueFileStoreWrite(
FileIO fileIO,
Expand Down Expand Up @@ -356,6 +359,13 @@ private <T> LookupLevels<T> createLookupLevels(
cacheManager,
new RowCompactedSerializer(keyType).createSliceComparator());
Options options = this.options.toConfiguration();
if (lookupFileCache == null) {
lookupFileCache =
LookupLevels.createCache(
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
}

return new LookupLevels<>(
levels,
keyComparatorSupplier.get(),
Expand All @@ -364,8 +374,15 @@ private <T> LookupLevels<T> createLookupLevels(
readerFactory::createRecordReader,
() -> ioManager.createChannel().getPathFile(),
lookupStoreFactory,
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
bfGenerator(options));
bfGenerator(options),
lookupFileCache);
}

/**
 * Closes this write and then drops every entry of the lookup file cache owned by it.
 *
 * <p>The cache is invalidated in a {@code finally} block so that cached lookup files are
 * released even when {@code super.close()} throws.
 *
 * @throws Exception whatever {@code super.close()} throws
 */
@Override
public void close() throws Exception {
    try {
        super.close();
    } finally {
        // The cache is created lazily in createLookupLevels, so it may never have been built.
        if (lookupFileCache != null) {
            lookupFileCache.invalidateAll();
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;

import javax.annotation.Nullable;

import java.io.IOException;
Expand Down Expand Up @@ -70,6 +72,8 @@ public class LocalTableQuery implements TableQuery {

private IOManager ioManager;

private Cache<String, LookupLevels.LookupFile> lookupFileCache;

public LocalTableQuery(FileStoreTable table) {
this.options = table.coreOptions();
this.tableView = new HashMap<>();
Expand Down Expand Up @@ -130,6 +134,13 @@ private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta>
KeyValueFileReaderFactory factory =
readerFactoryBuilder.build(partition, bucket, DeletionVector.emptyFactory());
Options options = this.options.toConfiguration();
if (lookupFileCache == null) {
lookupFileCache =
LookupLevels.createCache(
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
}

LookupLevels<KeyValue> lookupLevels =
new LookupLevels<>(
levels,
Expand All @@ -148,9 +159,8 @@ private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta>
.createChannel()
.getPathFile(),
lookupStoreFactory,
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
bfGenerator(options));
bfGenerator(options),
lookupFileCache);

tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, lookupLevels);
}
Expand Down Expand Up @@ -202,6 +212,9 @@ public void close() throws IOException {
bucket.getValue().close();
}
}
if (lookupFileCache != null) {
lookupFileCache.invalidateAll();
}
tableView.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,8 @@ private LookupLevels<Boolean> createContainsLevels(Levels levels, MemorySize max
() -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75, "none"),
Duration.ofHours(1),
maxDiskSize,
rowCount -> BloomFilter.builder(rowCount, 0.01));
rowCount -> BloomFilter.builder(rowCount, 0.01),
LookupLevels.createCache(Duration.ofHours(1), maxDiskSize));
}

private KeyValue kv(int key, int value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ public void testMaxDiskSize() throws IOException {
assertThat(kv.level()).isEqualTo(1);
assertThat(kv.value().getInt(1)).isEqualTo(i);
}
assertThat(lookupLevels.lookupFiles().asMap().keySet())
.isEqualTo(lookupLevels.cachedFiles());

// some files are invalidated
long fileNumber = lookupLevels.lookupFiles().estimatedSize();
Expand All @@ -202,6 +204,7 @@ public void testMaxDiskSize() throws IOException {
assertThat(fileNumber).isNotEqualTo(fileNum).isEqualTo(lookupFiles.length);

lookupLevels.close();
assertThat(lookupLevels.cachedFiles()).isEmpty();
assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(0);
}

Expand Down Expand Up @@ -271,9 +274,8 @@ private LookupLevels<KeyValue> createLookupLevels(Levels levels, MemorySize maxD
() -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75, "none"),
Duration.ofHours(1),
maxDiskSize,
rowCount -> BloomFilter.builder(rowCount, 0.05));
rowCount -> BloomFilter.builder(rowCount, 0.05),
LookupLevels.createCache(Duration.ofHours(1), maxDiskSize));
}

private KeyValue kv(int key, int value) {
Expand Down

0 comments on commit 29bc06c

Please sign in to comment.