Skip to content

Commit

Permalink
[core] Separate index cache and data cache (#4438)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Nov 11, 2024
1 parent ee8b1b1 commit 85f563b
Show file tree
Hide file tree
Showing 15 changed files with 133 additions and 44 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,12 @@
<td>Double</td>
<td>Define the default false positive probability for lookup cache bloom filters.</td>
</tr>
<tr>
<td><h5>lookup.cache.high-priority-pool-ratio</h5></td>
<td style="word-wrap: break-word;">0.25</td>
<td>Double</td>
<td>The fraction of cache memory that is reserved for high-priority data like index, filter.</td>
</tr>
<tr>
<td><h5>lookup.hash-load-factor</h5></td>
<td style="word-wrap: break-word;">0.75</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@

package org.apache.paimon.benchmark.metric.cpu;

import java.io.*;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testCache() throws Exception {
CacheKey key2 = CacheKey.forPageIndex(new RandomAccessFile(file2, "r"), 0, 0);

for (Cache.CacheType cacheType : Cache.CacheType.values()) {
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10));
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10), 0.1);
benchmark.addCase(
String.format("cache-%s", cacheType.toString()),
5,
Expand Down
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,13 @@ public class CoreOptions implements Serializable {
.defaultValue(MemorySize.parse("256 mb"))
.withDescription("Max memory size for lookup cache.");

public static final ConfigOption<Double> LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO =
key("lookup.cache.high-priority-pool-ratio")
.doubleType()
.defaultValue(0.25)
.withDescription(
"The fraction of cache memory that is reserved for high-priority data like index, filter.");

public static final ConfigOption<Boolean> LOOKUP_CACHE_BLOOM_FILTER_ENABLED =
key("lookup.cache.bloom.filter.enabled")
.booleanType()
Expand Down Expand Up @@ -1837,6 +1844,10 @@ public MemorySize lookupCacheMaxMemory() {
return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE);
}

public double lookupCacheHighPrioPoolRatio() {
return options.get(LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO);
}

public long targetFileSize(boolean hasPrimaryKey) {
return options.getOptional(TARGET_FILE_SIZE)
.orElse(hasPrimaryKey ? VALUE_128_MB : VALUE_256_MB)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,31 @@
/** Key for cache manager. */
public interface CacheKey {

static CacheKey forPosition(RandomAccessFile file, long position, int length) {
return new PositionCacheKey(file, position, length);
static CacheKey forPosition(RandomAccessFile file, long position, int length, boolean isIndex) {
return new PositionCacheKey(file, position, length, isIndex);
}

static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int pageIndex) {
return new PageIndexCacheKey(file, pageSize, pageIndex);
return new PageIndexCacheKey(file, pageSize, pageIndex, false);
}

/** @return Whether this cache key is for index cache. */
boolean isIndex();

/** Key for file position and length. */
class PositionCacheKey implements CacheKey {

private final RandomAccessFile file;
private final long position;
private final int length;
private final boolean isIndex;

private PositionCacheKey(RandomAccessFile file, long position, int length) {
private PositionCacheKey(
RandomAccessFile file, long position, int length, boolean isIndex) {
this.file = file;
this.position = position;
this.length = length;
this.isIndex = isIndex;
}

@Override
Expand All @@ -56,12 +62,18 @@ public boolean equals(Object o) {
PositionCacheKey that = (PositionCacheKey) o;
return position == that.position
&& length == that.length
&& isIndex == that.isIndex
&& Objects.equals(file, that.file);
}

@Override
public int hashCode() {
return Objects.hash(file, position, length);
return Objects.hash(file, position, length, isIndex);
}

@Override
public boolean isIndex() {
return isIndex;
}
}

Expand All @@ -71,17 +83,25 @@ class PageIndexCacheKey implements CacheKey {
private final RandomAccessFile file;
private final int pageSize;
private final int pageIndex;
private final boolean isIndex;

private PageIndexCacheKey(RandomAccessFile file, int pageSize, int pageIndex) {
private PageIndexCacheKey(
RandomAccessFile file, int pageSize, int pageIndex, boolean isIndex) {
this.file = file;
this.pageSize = pageSize;
this.pageIndex = pageIndex;
this.isIndex = isIndex;
}

public int pageIndex() {
return pageIndex;
}

@Override
public boolean isIndex() {
return isIndex;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -93,12 +113,13 @@ public boolean equals(Object o) {
PageIndexCacheKey that = (PageIndexCacheKey) o;
return pageSize == that.pageSize
&& pageIndex == that.pageIndex
&& isIndex == that.isIndex
&& Objects.equals(file, that.file);
}

@Override
public int hashCode() {
return Objects.hash(file, pageSize, pageIndex);
return Objects.hash(file, pageSize, pageIndex, isIndex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

Expand All @@ -29,31 +33,63 @@
/** Cache manager to cache bytes to paged {@link MemorySegment}s. */
public class CacheManager {

private static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);

/**
* Refreshing the cache comes with some costs, so not every time we visit the CacheManager, but
* every 10 visits, refresh the LRU strategy.
*/
public static final int REFRESH_COUNT = 10;

private final Cache cache;
private final Cache dataCache;
private final Cache indexCache;

private int fileReadCount;

@VisibleForTesting
public CacheManager(MemorySize maxMemorySize) {
this(Cache.CacheType.GUAVA, maxMemorySize);
this(Cache.CacheType.GUAVA, maxMemorySize, 0);
}

public CacheManager(MemorySize dataMaxMemorySize, double highPriorityPoolRatio) {
this(Cache.CacheType.GUAVA, dataMaxMemorySize, highPriorityPoolRatio);
}

public CacheManager(Cache.CacheType cacheType, MemorySize maxMemorySize) {
this.cache = CacheBuilder.newBuilder(cacheType).maximumWeight(maxMemorySize).build();
public CacheManager(
Cache.CacheType cacheType, MemorySize maxMemorySize, double highPriorityPoolRatio) {
Preconditions.checkArgument(
highPriorityPoolRatio >= 0 && highPriorityPoolRatio < 1,
"The high priority pool ratio should in the range [0, 1).");
MemorySize indexCacheSize =
MemorySize.ofBytes((long) (maxMemorySize.getBytes() * highPriorityPoolRatio));
MemorySize dataCacheSize =
MemorySize.ofBytes((long) (maxMemorySize.getBytes() * (1 - highPriorityPoolRatio)));
this.dataCache = CacheBuilder.newBuilder(cacheType).maximumWeight(dataCacheSize).build();
if (highPriorityPoolRatio == 0) {
this.indexCache = dataCache;
} else {
this.indexCache =
CacheBuilder.newBuilder(cacheType).maximumWeight(indexCacheSize).build();
}
this.fileReadCount = 0;
LOG.info(
"Initialize cache manager with data cache of {} and index cache of {}.",
dataCacheSize,
indexCacheSize);
}

@VisibleForTesting
public Cache cache() {
return cache;
public Cache dataCache() {
return dataCache;
}

@VisibleForTesting
public Cache indexCache() {
return indexCache;
}

public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback callback) {
Cache cache = key.isIndex() ? indexCache : dataCache;
Cache.CacheValue value =
cache.get(
key,
Expand All @@ -70,7 +106,11 @@ public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback cal
}

public void invalidPage(CacheKey key) {
cache.invalidate(key);
if (key.isIndex()) {
indexCache.invalidate(key);
} else {
dataCache.invalidate(key);
}
}

public int fileReadCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public BlockCache(RandomAccessFile file, CacheManager cacheManager) {
this.blocks = new HashMap<>();
}

// TODO separate index and data cache
private byte[] readFrom(long offset, int length) throws IOException {
byte[] buffer = new byte[length];
int read = channel.read(ByteBuffer.wrap(buffer), offset);
Expand All @@ -61,9 +60,9 @@ private byte[] readFrom(long offset, int length) throws IOException {
}

public MemorySegment getBlock(
long position, int length, Function<byte[], byte[]> decompressFunc) {
long position, int length, Function<byte[], byte[]> decompressFunc, boolean isIndex) {

CacheKey cacheKey = CacheKey.forPosition(file, position, length);
CacheKey cacheKey = CacheKey.forPosition(file, position, length, isIndex);

SegmentContainer container = blocks.get(cacheKey);
if (container == null || container.getAccessCount() == CacheManager.REFRESH_COUNT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@
import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* A {@link LookupStoreReader} for sort store.
*
* <p>TODO separate index cache and block cache.
*/
/** A {@link LookupStoreReader} for sort store. */
public class SortLookupStoreReader implements LookupStoreReader {

private final Comparator<MemorySlice> comparator;
Expand All @@ -68,7 +64,7 @@ public SortLookupStoreReader(
this.fileInput = PageFileInput.create(file, blockSize, null, fileSize, null);
this.blockCache = new BlockCache(fileInput.file(), cacheManager);
Footer footer = readFooter();
this.indexBlockIterator = readBlock(footer.getIndexBlockHandle()).iterator();
this.indexBlockIterator = readBlock(footer.getIndexBlockHandle(), true).iterator();
BloomFilterHandle handle = footer.getBloomFilterHandle();
if (handle != null) {
this.bloomFilter =
Expand All @@ -84,7 +80,7 @@ public SortLookupStoreReader(
private Footer readFooter() throws IOException {
MemorySegment footerData =
blockCache.getBlock(
fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH, b -> b);
fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH, b -> b, true);
return Footer.readFooter(MemorySlice.wrap(footerData).toInput());
}

Expand All @@ -111,23 +107,26 @@ public byte[] lookup(byte[] key) throws IOException {
}

private BlockIterator getNextBlock() throws IOException {
// index block handle, point to the key, value position.
MemorySlice blockHandle = indexBlockIterator.next().getValue();
BlockReader dataBlock = openBlock(blockHandle);
BlockReader dataBlock =
readBlock(BlockHandle.readBlockHandle(blockHandle.toInput()), false);
return dataBlock.iterator();
}

private BlockReader openBlock(MemorySlice blockEntry) throws IOException {
BlockHandle blockHandle = BlockHandle.readBlockHandle(blockEntry.toInput());
return readBlock(blockHandle);
}

private BlockReader readBlock(BlockHandle blockHandle) {
/**
* @param blockHandle The block handle.
* @param index Whether read the block as an index.
* @return The reader of the target block.
*/
private BlockReader readBlock(BlockHandle blockHandle, boolean index) {
// read block trailer
MemorySegment trailerData =
blockCache.getBlock(
blockHandle.offset() + blockHandle.size(),
BlockTrailer.ENCODED_LENGTH,
b -> b);
b -> b,
true);
BlockTrailer blockTrailer =
BlockTrailer.readBlockTrailer(MemorySlice.wrap(trailerData).toInput());

Expand Down Expand Up @@ -166,7 +165,8 @@ private BlockReader readBlock(BlockHandle blockHandle) {
checkArgument(uncompressedLength == uncompressed.length);
return uncompressed;
}
});
},
index);
return new BlockReader(MemorySlice.wrap(unCompressedBlock), comparator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public FileBasedBloomFilter(
this.readOffset = readOffset;
this.readLength = readLength;
this.accessCount = 0;
this.cacheKey = CacheKey.forPosition(input.file(), readOffset, readLength);
this.cacheKey = CacheKey.forPosition(input.file(), readOffset, readLength, true);
}

public boolean testHash(int hash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void testCaffeineCache() throws Exception {
CacheKey key2 = CacheKey.forPageIndex(new RandomAccessFile(file2, "r"), 0, 0);

for (Cache.CacheType cacheType : Cache.CacheType.values()) {
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10));
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10), 0.1);
byte[] value = new byte[6];
Arrays.fill(value, (byte) 1);
for (int i = 0; i < 10; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private void innerTest(int len, int maxFileReadCount) throws IOException {
}

File file = writeFile(bytes);
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofKibiBytes(128));
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofKibiBytes(128), 0);
FileBasedRandomInputView view =
new FileBasedRandomInputView(
PageFileInput.create(file, 1024, null, 0, null), cacheManager);
Expand Down Expand Up @@ -117,7 +117,8 @@ private void innerTest(int len, int maxFileReadCount) throws IOException {

// hot key in LRU, should have good cache hit rate
assertThat(cacheManager.fileReadCount()).isLessThan(maxFileReadCount);
assertThat(cacheManager.cache().asMap().size()).isEqualTo(0);
assertThat(cacheManager.dataCache().asMap().size()).isEqualTo(0);
assertThat(cacheManager.indexCache().asMap().size()).isEqualTo(0);
}

private File writeFile(byte[] bytes) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public void testNormal() throws IOException {
assertThat(reader.lookup(toBytes(VALUE_COUNT + 1000))).isNull();

reader.close();
assertThat(cacheManager.cache().asMap()).isEmpty();
assertThat(cacheManager.dataCache().asMap()).isEmpty();
assertThat(cacheManager.indexCache().asMap()).isEmpty();
}

@TestTemplate
Expand Down
Loading

0 comments on commit 85f563b

Please sign in to comment.