Skip to content

Commit

Permalink
[core] Avoid building the LookupFile cache repeatedly when the cache …
Browse files Browse the repository at this point in the history
…is nearly full.
  • Loading branch information
Aitozi committed Jul 31, 2024
1 parent 6d66fc1 commit 361fff0
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.Closeable;
Expand All @@ -42,11 +45,16 @@
/** Lookup file for cache remote file to local. */
public class LookupFile implements Closeable {

private static final Logger LOG = LoggerFactory.getLogger(LookupFile.class);

private final File localFile;
private final DataFileMeta remoteFile;
private final LookupStoreReader reader;
private final Runnable callback;

private long requestCount;
private long hitCount;
private boolean used;
private boolean isClosed = false;

public LookupFile(
Expand All @@ -60,7 +68,26 @@ public LookupFile(
@Nullable
public byte[] get(byte[] key) throws IOException {
checkArgument(!isClosed);
return reader.lookup(key);
requestCount++;
byte[] res = reader.lookup(key);
if (res != null) {
hitCount++;
}
return res;
}

public boolean isUsed() {
return used;
}

public LookupFile pin() {
this.used = true;
return this;
}

public LookupFile unPin() {
this.used = false;
return this;
}

public DataFileMeta remoteFile() {
Expand All @@ -76,6 +103,12 @@ public void close() throws IOException {
reader.close();
isClosed = true;
callback.run();
LOG.info(
"Delete Lookup file {} stats: requestCount={}, hitCount={}, size={}KB",
localFile,
requestCount,
hitCount,
localFile.length() >> 10);
FileIOUtils.deleteFileOrDirectory(localFile);
}

Expand All @@ -93,7 +126,7 @@ public static Cache<String, LookupFile> createCache(
}

private static int fileWeigh(String file, LookupFile lookupFile) {
return fileKibiBytes(lookupFile.localFile);
return lookupFile.isUsed() ? 0 : fileKibiBytes(lookupFile.localFile);
}

private static void removalCallback(String file, LookupFile lookupFile, RemovalCause cause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.utils.BloomFilter;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.IOFunction;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;

Expand Down Expand Up @@ -129,13 +130,16 @@ private T lookup(InternalRow key, SortedRun level) throws IOException {
private T lookup(InternalRow key, DataFileMeta file) throws IOException {
LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName());

while (lookupFile == null || lookupFile.isClosed()) {
if (lookupFile == null) {
lookupFile = createLookupFile(file);
lookupFileCache.put(file.fileName(), lookupFile);
lookupFileCache.put(file.fileName(), lookupFile.pin());
}
Preconditions.checkArgument(
!lookupFile.isClosed(), "The new create lookup file should not be closed.");

byte[] keyBytes = keySerializer.serializeToBytes(key);
byte[] valueBytes = lookupFile.get(keyBytes);
lookupFileCache.asMap().computeIfPresent(file.fileName(), (k, v) -> v.unPin());
if (valueBytes == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.paimon.utils.BloomFilter;
import org.apache.paimon.utils.FileStorePathFactory;

import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

Expand Down Expand Up @@ -169,7 +170,7 @@ public void testMultiFiles() throws IOException {
assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(0);
}

@Test
@RepeatedTest(value = 10)
public void testMaxDiskSize() throws IOException {
List<DataFileMeta> files = new ArrayList<>();
int fileNum = 10;
Expand Down

0 comments on commit 361fff0

Please sign in to comment.