[core] Manage the lookup file disk cache in task granularity #3853

Merged · 3 commits · Jul 31, 2024
@@ -39,6 +39,7 @@
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Weigher;
import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;

import javax.annotation.Nullable;
@@ -50,6 +51,8 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -72,6 +75,7 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
private final LookupStoreFactory lookupStoreFactory;
private final Cache<String, LookupFile> lookupFiles;
private final Function<Long, BloomFilter.Builder> bfGenerator;
private final Set<String> cachedFiles;

public LookupLevels(
Levels levels,
@@ -81,28 +85,32 @@ public LookupLevels(
IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
Supplier<File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
Duration fileRetention,
MemorySize maxDiskSize,
Function<Long, BloomFilter.Builder> bfGenerator) {
Function<Long, BloomFilter.Builder> bfGenerator,
Cache<String, LookupFile> lookupFiles) {
this.levels = levels;
this.keyComparator = keyComparator;
this.keySerializer = new RowCompactedSerializer(keyType);
this.valueProcessor = valueProcessor;
this.fileReaderFactory = fileReaderFactory;
this.localFileFactory = localFileFactory;
this.lookupStoreFactory = lookupStoreFactory;
this.lookupFiles =
Caffeine.newBuilder()
.expireAfterAccess(fileRetention)
.maximumWeight(maxDiskSize.getKibiBytes())
.weigher(this::fileWeigh)
.removalListener(this::removalCallback)
.executor(MoreExecutors.directExecutor())
.build();
this.bfGenerator = bfGenerator;
this.lookupFiles = lookupFiles;
this.cachedFiles = new HashSet<>();
levels.addDropFileCallback(this);
}

public static Cache<String, LookupFile> createCache(
Duration fileRetention, MemorySize maxDiskSize) {
return Caffeine.newBuilder()
.expireAfterAccess(fileRetention)
.maximumWeight(maxDiskSize.getKibiBytes())
.weigher((Weigher<String, LookupFile>) LookupLevels::fileWeigh)
.removalListener(LookupLevels::removalCallback)
.executor(MoreExecutors.directExecutor())
.build();
}

public Levels getLevels() {
return levels;
}
@@ -112,6 +120,11 @@ Cache<String, LookupFile> lookupFiles() {
return lookupFiles;
}

@VisibleForTesting
Set<String> cachedFiles() {
return cachedFiles;
}

@Override
public void notifyDropFile(String file) {
lookupFiles.invalidate(file);
@@ -138,6 +151,7 @@ private T lookup(InternalRow key, DataFileMeta file) throws IOException {

while (lookupFile == null || lookupFile.isClosed) {
lookupFile = createLookupFile(file);
cachedFiles.add(file.fileName());
lookupFiles.put(file.fileName(), lookupFile);
}

@@ -151,11 +165,11 @@ private T lookup(InternalRow key, DataFileMeta file) throws IOException {
key, lookupFile.remoteFile().level(), valueBytes, file.fileName());
}

private int fileWeigh(String file, LookupFile lookupFile) {
private static int fileWeigh(String file, LookupFile lookupFile) {
return fileKibiBytes(lookupFile.localFile);
}

private void removalCallback(String key, LookupFile file, RemovalCause cause) {
private static void removalCallback(String key, LookupFile file, RemovalCause cause) {
if (file != null) {
try {
file.close();
@@ -204,26 +218,37 @@ private LookupFile createLookupFile(DataFileMeta file) throws IOException {
context = kvWriter.close();
}

return new LookupFile(localFile, file, lookupStoreFactory.createReader(localFile, context));
return new LookupFile(
localFile, file, lookupStoreFactory.createReader(localFile, context), cachedFiles);
}

@Override
public void close() throws IOException {
A review thread on close():

Contributor: Should we clean all files when closing?

@Aitozi (Contributor Author, Jul 30, 2024): In here. And if a partition bucket writer is idle and closed, the lookup files for this writer will only be evicted based on size or LRU (there is no explicit close for that situation).

Contributor: But by default, there is no max size configuration.

@Aitozi: Evict by the retention time?

@Aitozi: Previously, the LookupLevels cache files were cleared when the writer became idle (no data written in the latest snapshot). Now the cache files remain after the idle writer is closed and are, by default, evicted only by the retention time.

Contributor: Yes, it is better to remove all files when closing this writer.

Contributor: You can manage a collection in LookupLevels.

@Aitozi: > You can manage a collection in LookupLevels.

Thanks, makes sense to me. (A standalone sketch of this pattern follows this file's diff.)

lookupFiles.invalidateAll();
Set<String> toClean = new HashSet<>(cachedFiles);
for (String cachedFile : toClean) {
lookupFiles.invalidate(cachedFile);
}
}

private static class LookupFile implements Closeable {
/** Lookup file. */
public static class LookupFile implements Closeable {

private final File localFile;
private final DataFileMeta remoteFile;
private final LookupStoreReader reader;
private final Set<String> cachedFiles;

private boolean isClosed = false;

public LookupFile(File localFile, DataFileMeta remoteFile, LookupStoreReader reader) {
public LookupFile(
File localFile,
DataFileMeta remoteFile,
LookupStoreReader reader,
Set<String> cachedFiles) {
this.localFile = localFile;
this.remoteFile = remoteFile;
this.reader = reader;
this.cachedFiles = cachedFiles;
}

@Nullable
@@ -240,6 +265,7 @@ public DataFileMeta remoteFile() {
public void close() throws IOException {
reader.close();
isClosed = true;
cachedFiles.remove(remoteFile.fileName());
FileIOUtils.deleteFileOrDirectory(localFile);
}
}
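
The review thread above converges on the ownership scheme this file implements: all LookupLevels instances in a task share one Caffeine cache, each LookupLevels tracks the file names it inserted in its own cachedFiles set, close() invalidates only those entries, and LookupFile#close de-registers a file when eviction deletes it. Below is a minimal, self-contained sketch of that pattern under stated assumptions: plain (non-shaded) Caffeine coordinates, and the hypothetical stand-ins Task and CachedFile in place of LookupLevels and LookupFile.

// A sketch only: stand-in types, non-shaded Caffeine, no Paimon classes.
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Weigher;

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;

class SharedDiskCacheSketch {

    // Mirrors the shape of LookupLevels.createCache: weight-bounded by file size,
    // access-expiring, and run on a same-thread executor so eviction deletes
    // local files synchronously instead of on a background pool.
    static Cache<String, CachedFile> createCache(Duration retention, long maxWeightKib) {
        return Caffeine.newBuilder()
                .expireAfterAccess(retention)
                .maximumWeight(maxWeightKib)
                .weigher((Weigher<String, CachedFile>) (name, file) -> file.sizeKib)
                .removalListener(
                        (String name, CachedFile file, RemovalCause cause) -> {
                            if (file != null) {
                                file.close(); // delete local file, de-register from owner
                            }
                        })
                .executor(Runnable::run)
                .build();
    }

    // Stands in for LookupLevels: remembers which cache entries it created.
    static class Task implements AutoCloseable {
        private final Cache<String, CachedFile> shared;
        private final Set<String> owned = new HashSet<>();

        Task(Cache<String, CachedFile> shared) {
            this.shared = shared;
        }

        void cache(String fileName, int sizeKib) {
            owned.add(fileName);
            // The entry carries its owner's set so eviction can de-register itself.
            shared.put(fileName, new CachedFile(fileName, sizeKib, owned));
        }

        @Override
        public void close() {
            // Invalidate only this task's entries; other tasks sharing the cache
            // keep theirs. Iterate over a copy because eviction mutates "owned".
            for (String fileName : new HashSet<>(owned)) {
                shared.invalidate(fileName);
            }
        }
    }

    // Stands in for LookupFile: closing removes the file from its owner's set,
    // mirroring LookupFile#close removing remoteFile.fileName() from cachedFiles.
    static class CachedFile {
        final String name;
        final int sizeKib;
        final Set<String> owner;

        CachedFile(String name, int sizeKib, Set<String> owner) {
            this.name = name;
            this.sizeKib = sizeKib;
            this.owner = owner;
        }

        void close() {
            owner.remove(name);
        }
    }
}

Two Task instances sharing one createCache() result give the "task granularity" of the PR title: closing one task deletes only its own local files, while the other task's files stay cached until retention or weight-based eviction.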
@@ -71,6 +71,8 @@
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.UserDefinedSeqComparator;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -102,6 +104,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
private final RowType keyType;
private final RowType valueType;
@Nullable private final RecordLevelExpire recordLevelExpire;
private Cache<String, LookupLevels.LookupFile> lookupFileCache;

public KeyValueFileStoreWrite(
FileIO fileIO,
@@ -356,6 +359,13 @@ private <T> LookupLevels<T> createLookupLevels(
cacheManager,
new RowCompactedSerializer(keyType).createSliceComparator());
Options options = this.options.toConfiguration();
if (lookupFileCache == null) {
lookupFileCache =
LookupLevels.createCache(
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
}

return new LookupLevels<>(
levels,
keyComparatorSupplier.get(),
@@ -364,8 +374,15 @@
readerFactory::createRecordReader,
() -> ioManager.createChannel().getPathFile(),
lookupStoreFactory,
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
bfGenerator(options));
bfGenerator(options),
lookupFileCache);
}

@Override
public void close() throws Exception {
super.close();
if (lookupFileCache != null) {
lookupFileCache.invalidateAll();
}
}
}
@@ -41,6 +41,8 @@
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;

import javax.annotation.Nullable;

import java.io.IOException;
@@ -70,6 +72,8 @@ public class LocalTableQuery implements TableQuery {

private IOManager ioManager;

private Cache<String, LookupLevels.LookupFile> lookupFileCache;

public LocalTableQuery(FileStoreTable table) {
this.options = table.coreOptions();
this.tableView = new HashMap<>();
@@ -130,6 +134,13 @@ private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta>
KeyValueFileReaderFactory factory =
readerFactoryBuilder.build(partition, bucket, DeletionVector.emptyFactory());
Options options = this.options.toConfiguration();
if (lookupFileCache == null) {
lookupFileCache =
LookupLevels.createCache(
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
}

LookupLevels<KeyValue> lookupLevels =
new LookupLevels<>(
levels,
@@ -148,9 +159,8 @@ private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta>
.createChannel()
.getPathFile(),
lookupStoreFactory,
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
bfGenerator(options));
bfGenerator(options),
lookupFileCache);

tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, lookupLevels);
}
@@ -202,6 +212,9 @@ public void close() throws IOException {
bucket.getValue().close();
}
}
if (lookupFileCache != null) {
lookupFileCache.invalidateAll();
}
tableView.clear();
}
}
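
Both call sites, KeyValueFileStoreWrite above and LocalTableQuery here, follow the same lifecycle: lazily create one shared cache the first time lookup levels are built, hand it to every LookupLevels, and invalidateAll() on close. A condensed sketch of that owner-side lifecycle, reusing the hypothetical SharedDiskCacheSketch helpers from the earlier sketch (not a Paimon class):

import com.github.benmanes.caffeine.cache.Cache;

import java.time.Duration;

class LookupCacheOwner implements AutoCloseable {

    private Cache<String, SharedDiskCacheSketch.CachedFile> lookupFileCache; // lazy

    Cache<String, SharedDiskCacheSketch.CachedFile> cacheFor(Duration retention, long maxKib) {
        if (lookupFileCache == null) { // first lookup structure built by this owner
            lookupFileCache = SharedDiskCacheSketch.createCache(retention, maxKib);
        }
        return lookupFileCache;
    }

    @Override
    public void close() {
        if (lookupFileCache != null) {
            // The removal listener fires for every entry, deleting local files.
            lookupFileCache.invalidateAll();
        }
    }
}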
@@ -195,9 +195,8 @@ private LookupLevels<Boolean> createContainsLevels(Levels levels, MemorySize max
() -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75, "none"),
Duration.ofHours(1),
maxDiskSize,
rowCount -> BloomFilter.builder(rowCount, 0.01));
rowCount -> BloomFilter.builder(rowCount, 0.01),
LookupLevels.createCache(Duration.ofHours(1), maxDiskSize));
}

private KeyValue kv(int key, int value) {
@@ -193,6 +193,8 @@ public void testMaxDiskSize() throws IOException {
assertThat(kv.level()).isEqualTo(1);
assertThat(kv.value().getInt(1)).isEqualTo(i);
}
assertThat(lookupLevels.lookupFiles().asMap().keySet())
.isEqualTo(lookupLevels.cachedFiles());

// some files are invalidated
long fileNumber = lookupLevels.lookupFiles().estimatedSize();
@@ -202,6 +204,7 @@
assertThat(fileNumber).isNotEqualTo(fileNum).isEqualTo(lookupFiles.length);

lookupLevels.close();
assertThat(lookupLevels.cachedFiles()).isEmpty();
assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(0);
}

Expand Down Expand Up @@ -271,9 +274,8 @@ private LookupLevels<KeyValue> createLookupLevels(Levels levels, MemorySize maxD
() -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
new HashLookupStoreFactory(
new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75, "none"),
Duration.ofHours(1),
maxDiskSize,
rowCount -> BloomFilter.builder(rowCount, 0.05));
rowCount -> BloomFilter.builder(rowCount, 0.05),
LookupLevels.createCache(Duration.ofHours(1), maxDiskSize));
}

private KeyValue kv(int key, int value) {