[core] Separate index cache and data cache #4438

Merged · 6 commits · Nov 11, 2024
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -423,6 +423,12 @@
<td>Double</td>
<td>Define the default false positive probability for lookup cache bloom filters.</td>
</tr>
<tr>
<td><h5>lookup.cache.high-priority-pool-ratio</h5></td>
<td style="word-wrap: break-word;">0.25</td>
Contributor commented:
What is the source of this default value? 0.1 or 0.25?

Contributor Author replied:
With 0.1 the index cache is about 25 MB; with 0.25 it is 64 MB. In my tests the bloom filter may occupy 5-10 MB, so I chose the bigger (64 MB) default index cache here.

<td>Double</td>
<td>The fraction of cache memory that is reserved for high-priority data such as indexes and filters.</td>
</tr>
<tr>
<td><h5>lookup.hash-load-factor</h5></td>
<td style="word-wrap: break-word;">0.75</td>
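For reference, the sizes quoted in the review thread above follow directly from the 256 MB default of lookup.cache.max-memory-size. A minimal standalone sketch (not part of this diff; the class name is made up for illustration):

// Illustrative only: how the high-priority ratio splits the default 256 MB lookup cache.
public class CacheSplitExample {
    public static void main(String[] args) {
        long maxBytes = 256L * 1024 * 1024; // lookup.cache.max-memory-size default

        for (double ratio : new double[] {0.1, 0.25}) {
            long indexBytes = (long) (maxBytes * ratio); // high-priority (index/filter) pool
            long dataBytes = (long) (maxBytes * (1 - ratio)); // remaining data pool
            System.out.printf(
                    "ratio=%.2f -> index cache %d MB, data cache %d MB%n",
                    ratio, indexBytes >> 20, dataBytes >> 20);
        }
    }
}

With ratio 0.1 this prints roughly 25 MB for the index cache; with the chosen default of 0.25 it prints 64 MB.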
@@ -18,7 +18,12 @@

package org.apache.paimon.benchmark.metric.cpu;

import java.io.*;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.HashMap;
@@ -52,7 +52,7 @@ public void testCache() throws Exception {
CacheKey key2 = CacheKey.forPageIndex(new RandomAccessFile(file2, "r"), 0, 0);

for (Cache.CacheType cacheType : Cache.CacheType.values()) {
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10));
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10), 0.1);
benchmark.addCase(
String.format("cache-%s", cacheType.toString()),
5,
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -915,6 +915,13 @@ public class CoreOptions implements Serializable {
.defaultValue(MemorySize.parse("256 mb"))
.withDescription("Max memory size for lookup cache.");

public static final ConfigOption<Double> LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO =
key("lookup.cache.high-priority-pool-ratio")
.doubleType()
.defaultValue(0.25)
.withDescription(
"The fraction of cache memory that is reserved for high-priority data like index, filter.");

public static final ConfigOption<Boolean> LOOKUP_CACHE_BLOOM_FILTER_ENABLED =
key("lookup.cache.bloom.filter.enabled")
.booleanType()
@@ -1832,6 +1839,10 @@ public MemorySize lookupCacheMaxMemory() {
return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE);
}

public double lookupCacheHighPrioPoolRatio() {
return options.get(LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO);
}

public long targetFileSize(boolean hasPrimaryKey) {
return options.getOptional(TARGET_FILE_SIZE)
.orElse(hasPrimaryKey ? VALUE_128_MB : VALUE_256_MB)
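As a usage sketch of the new option (illustrative; it assumes the Options-backed CoreOptions constructor and the Options.set API, and the class name is made up):

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

public class HighPrioRatioExample {
    public static void main(String[] args) {
        Options options = new Options();
        // Reserve 25% of the lookup cache for index/filter pages (the new default).
        options.set(CoreOptions.LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO, 0.25);

        CoreOptions coreOptions = new CoreOptions(options);
        System.out.println(coreOptions.lookupCacheHighPrioPoolRatio()); // 0.25
    }
}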
@@ -24,25 +24,31 @@
/** Key for cache manager. */
public interface CacheKey {

static CacheKey forPosition(RandomAccessFile file, long position, int length) {
return new PositionCacheKey(file, position, length);
static CacheKey forPosition(RandomAccessFile file, long position, int length, boolean isIndex) {
return new PositionCacheKey(file, position, length, isIndex);
}

static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int pageIndex) {
Contributor commented:
Can we remove this method? The invoker should always set the isIndex flag.

Contributor Author (@Aitozi) replied on Nov 5, 2024:
But for the pageIndex, we always store it in the data cache now, because we are unsure whether this page is an index page.

return new PageIndexCacheKey(file, pageSize, pageIndex);
return new PageIndexCacheKey(file, pageSize, pageIndex, false);
}

/** @return Whether this cache key is for the index cache. */
boolean isIndex();

/** Key for file position and length. */
class PositionCacheKey implements CacheKey {

private final RandomAccessFile file;
private final long position;
private final int length;
private final boolean isIndex;

private PositionCacheKey(RandomAccessFile file, long position, int length) {
private PositionCacheKey(
RandomAccessFile file, long position, int length, boolean isIndex) {
this.file = file;
this.position = position;
this.length = length;
this.isIndex = isIndex;
}

@Override
@@ -56,12 +62,18 @@ public boolean equals(Object o) {
PositionCacheKey that = (PositionCacheKey) o;
return position == that.position
&& length == that.length
&& isIndex == that.isIndex
&& Objects.equals(file, that.file);
}

@Override
public int hashCode() {
return Objects.hash(file, position, length);
return Objects.hash(file, position, length, isIndex);
}

@Override
public boolean isIndex() {
return isIndex;
}
}

@@ -71,17 +83,25 @@ class PageIndexCacheKey implements CacheKey {
private final RandomAccessFile file;
private final int pageSize;
private final int pageIndex;
private final boolean isIndex;

private PageIndexCacheKey(RandomAccessFile file, int pageSize, int pageIndex) {
private PageIndexCacheKey(
RandomAccessFile file, int pageSize, int pageIndex, boolean isIndex) {
this.file = file;
this.pageSize = pageSize;
this.pageIndex = pageIndex;
this.isIndex = isIndex;
}

public int pageIndex() {
return pageIndex;
}

@Override
public boolean isIndex() {
return isIndex;
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -93,12 +113,13 @@ public boolean equals(Object o) {
PageIndexCacheKey that = (PageIndexCacheKey) o;
return pageSize == that.pageSize
&& pageIndex == that.pageIndex
&& isIndex == that.isIndex
&& Objects.equals(file, that.file);
}

@Override
public int hashCode() {
return Objects.hash(file, pageSize, pageIndex);
return Objects.hash(file, pageSize, pageIndex, isIndex);
}
}
}
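A small sketch of how callers distinguish index reads from data reads with the new flag (illustrative; the package path, temp file, offsets and lengths are assumptions):

import org.apache.paimon.io.cache.CacheKey; // package path assumed

import java.io.File;
import java.io.RandomAccessFile;

public class CacheKeyExample {
    public static void main(String[] args) throws Exception {
        File tmp = File.createTempFile("lookup", ".data");
        try (RandomAccessFile file = new RandomAccessFile(tmp, "r")) {
            // Index data (e.g. a bloom filter region): routed to the high-priority index cache.
            CacheKey indexKey = CacheKey.forPosition(file, 0L, 1024, true);
            // A regular data block: routed to the data cache.
            CacheKey dataKey = CacheKey.forPosition(file, 4096L, 1024, false);

            System.out.println(indexKey.isIndex()); // true
            System.out.println(dataKey.isIndex()); // false
        } finally {
            tmp.delete();
        }
    }
}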
@@ -21,6 +21,10 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

@@ -29,31 +33,63 @@
/** Cache manager to cache bytes to paged {@link MemorySegment}s. */
public class CacheManager {

private static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);

/**
* Refreshing the cache comes with some cost, so the LRU strategy is not refreshed on every
* visit to the CacheManager, but only once every 10 visits.
*/
public static final int REFRESH_COUNT = 10;

private final Cache cache;
private final Cache dataCache;
private final Cache indexCache;

private int fileReadCount;

@VisibleForTesting
public CacheManager(MemorySize maxMemorySize) {
this(Cache.CacheType.GUAVA, maxMemorySize);
this(Cache.CacheType.GUAVA, maxMemorySize, 0);
}

public CacheManager(MemorySize dataMaxMemorySize, double highPriorityPoolRatio) {
this(Cache.CacheType.GUAVA, dataMaxMemorySize, highPriorityPoolRatio);
}

public CacheManager(Cache.CacheType cacheType, MemorySize maxMemorySize) {
this.cache = CacheBuilder.newBuilder(cacheType).maximumWeight(maxMemorySize).build();
public CacheManager(
Cache.CacheType cacheType, MemorySize maxMemorySize, double highPriorityPoolRatio) {
Preconditions.checkArgument(
highPriorityPoolRatio >= 0 && highPriorityPoolRatio < 1,
"The high priority pool ratio should in the range [0, 1).");
MemorySize indexCacheSize =
MemorySize.ofBytes((long) (maxMemorySize.getBytes() * highPriorityPoolRatio));
MemorySize dataCacheSize =
MemorySize.ofBytes((long) (maxMemorySize.getBytes() * (1 - highPriorityPoolRatio)));
this.dataCache = CacheBuilder.newBuilder(cacheType).maximumWeight(dataCacheSize).build();
if (highPriorityPoolRatio == 0) {
this.indexCache = dataCache;
} else {
this.indexCache =
CacheBuilder.newBuilder(cacheType).maximumWeight(indexCacheSize).build();
}
this.fileReadCount = 0;
LOG.info(
"Initialize cache manager with data cache of {} and index cache of {}.",
dataCacheSize,
indexCacheSize);
}

@VisibleForTesting
public Cache cache() {
return cache;
public Cache dataCache() {
return dataCache;
}

@VisibleForTesting
public Cache indexCache() {
return indexCache;
}

public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback callback) {
Cache cache = key.isIndex() ? indexCache : dataCache;
Cache.CacheValue value =
cache.get(
key,
@@ -70,7 +106,11 @@ public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback callback) {
}

public void invalidPage(CacheKey key) {
cache.invalidate(key);
if (key.isIndex()) {
indexCache.invalidate(key);
} else {
dataCache.invalidate(key);
}
}

public int fileReadCount() {
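A sketch of how the constructor splits the budget, and how a ratio of 0 falls back to a single shared cache (illustrative; the package path and class name are assumptions, and the sizes mirror the defaults discussed above):

import org.apache.paimon.io.cache.CacheManager; // package path assumed
import org.apache.paimon.options.MemorySize;

public class CacheManagerSplitExample {
    public static void main(String[] args) {
        // 256 MB total with 25% reserved for index/filter pages:
        // roughly a 64 MB index cache and a 192 MB data cache.
        CacheManager split = new CacheManager(MemorySize.parse("256 mb"), 0.25);
        System.out.println(split.indexCache() == split.dataCache()); // false

        // Ratio 0 keeps the old behavior: index and data share one cache.
        CacheManager shared = new CacheManager(MemorySize.parse("256 mb"), 0);
        System.out.println(shared.indexCache() == shared.dataCache()); // true
    }
}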
@@ -49,7 +49,6 @@ public BlockCache(RandomAccessFile file, CacheManager cacheManager) {
this.blocks = new HashMap<>();
}

// TODO separate index and data cache
private byte[] readFrom(long offset, int length) throws IOException {
byte[] buffer = new byte[length];
int read = channel.read(ByteBuffer.wrap(buffer), offset);
@@ -61,9 +60,9 @@ private byte[] readFrom(long offset, int length) throws IOException {
}

public MemorySegment getBlock(
long position, int length, Function<byte[], byte[]> decompressFunc) {
long position, int length, Function<byte[], byte[]> decompressFunc, boolean isIndex) {

CacheKey cacheKey = CacheKey.forPosition(file, position, length);
CacheKey cacheKey = CacheKey.forPosition(file, position, length, isIndex);

SegmentContainer container = blocks.get(cacheKey);
if (container == null || container.getAccessCount() == CacheManager.REFRESH_COUNT) {
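A hedged, self-contained sketch of the new getBlock signature routing one data block and one index block (illustrative; the package paths, temp-file setup, offsets, and identity decompression are assumptions):

import org.apache.paimon.io.cache.BlockCache; // package path assumed
import org.apache.paimon.io.cache.CacheManager; // package path assumed
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.options.MemorySize;

import java.io.File;
import java.io.RandomAccessFile;
import java.nio.file.Files;

public class BlockCacheExample {
    public static void main(String[] args) throws Exception {
        File file = File.createTempFile("blocks", ".data");
        Files.write(file.toPath(), new byte[8192]);

        CacheManager cacheManager = new CacheManager(MemorySize.parse("16 mb"), 0.25);
        try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
            BlockCache blockCache = new BlockCache(raf, cacheManager);

            // A data block: cached in the (larger) data pool.
            MemorySegment data = blockCache.getBlock(0, 4096, b -> b, false);
            // An index block (e.g. footer or block trailer): cached in the high-priority pool.
            MemorySegment index = blockCache.getBlock(4096, 4096, b -> b, true);

            System.out.println(data.size() + " / " + index.size());
        } finally {
            file.delete();
        }
    }
}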
@@ -38,11 +38,7 @@
import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* A {@link LookupStoreReader} for sort store.
*
* <p>TODO separate index cache and block cache.
*/
/** A {@link LookupStoreReader} for sort store. */
public class SortLookupStoreReader implements LookupStoreReader {

private final Comparator<MemorySlice> comparator;
@@ -68,7 +64,7 @@ public SortLookupStoreReader(
this.fileInput = PageFileInput.create(file, blockSize, null, fileSize, null);
this.blockCache = new BlockCache(fileInput.file(), cacheManager);
Footer footer = readFooter();
this.indexBlockIterator = readBlock(footer.getIndexBlockHandle()).iterator();
this.indexBlockIterator = readBlock(footer.getIndexBlockHandle(), true).iterator();
BloomFilterHandle handle = footer.getBloomFilterHandle();
if (handle != null) {
this.bloomFilter =
Expand All @@ -84,7 +80,7 @@ public SortLookupStoreReader(
private Footer readFooter() throws IOException {
MemorySegment footerData =
blockCache.getBlock(
fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH, b -> b);
fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH, b -> b, true);
return Footer.readFooter(MemorySlice.wrap(footerData).toInput());
}

@@ -111,23 +107,26 @@ public byte[] lookup(byte[] key) throws IOException {
}

private BlockIterator getNextBlock() throws IOException {
// Index block handle, pointing to the key/value position.
MemorySlice blockHandle = indexBlockIterator.next().getValue();
BlockReader dataBlock = openBlock(blockHandle);
BlockReader dataBlock =
readBlock(BlockHandle.readBlockHandle(blockHandle.toInput()), false);
return dataBlock.iterator();
}

private BlockReader openBlock(MemorySlice blockEntry) throws IOException {
BlockHandle blockHandle = BlockHandle.readBlockHandle(blockEntry.toInput());
return readBlock(blockHandle);
}

private BlockReader readBlock(BlockHandle blockHandle) {
/**
* @param blockHandle The block handle.
* @param index Whether to read the block as an index block.
* @return The reader of the target block.
*/
private BlockReader readBlock(BlockHandle blockHandle, boolean index) {
// read block trailer
MemorySegment trailerData =
blockCache.getBlock(
blockHandle.offset() + blockHandle.size(),
BlockTrailer.ENCODED_LENGTH,
b -> b);
b -> b,
true);
BlockTrailer blockTrailer =
BlockTrailer.readBlockTrailer(MemorySlice.wrap(trailerData).toInput());

@@ -166,7 +165,8 @@ private BlockReader readBlock(BlockHandle blockHandle) {
checkArgument(uncompressedLength == uncompressed.length);
return uncompressed;
}
});
},
index);
return new BlockReader(MemorySlice.wrap(unCompressedBlock), comparator);
}

@@ -55,7 +55,7 @@ public FileBasedBloomFilter(
this.readOffset = readOffset;
this.readLength = readLength;
this.accessCount = 0;
this.cacheKey = CacheKey.forPosition(input.file(), readOffset, readLength);
this.cacheKey = CacheKey.forPosition(input.file(), readOffset, readLength, true);
}

public boolean testHash(int hash) {
@@ -48,7 +48,7 @@ void testCaffeineCache() throws Exception {
CacheKey key2 = CacheKey.forPageIndex(new RandomAccessFile(file2, "r"), 0, 0);

for (Cache.CacheType cacheType : Cache.CacheType.values()) {
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10));
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10), 0.1);
byte[] value = new byte[6];
Arrays.fill(value, (byte) 1);
for (int i = 0; i < 10; i++) {
@@ -82,7 +82,7 @@ private void innerTest(int len, int maxFileReadCount) throws IOException {
}

File file = writeFile(bytes);
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofKibiBytes(128));
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofKibiBytes(128), 0);
FileBasedRandomInputView view =
new FileBasedRandomInputView(
PageFileInput.create(file, 1024, null, 0, null), cacheManager);
@@ -117,7 +117,8 @@ private void innerTest(int len, int maxFileReadCount) throws IOException {

// hot key in LRU, should have good cache hit rate
assertThat(cacheManager.fileReadCount()).isLessThan(maxFileReadCount);
assertThat(cacheManager.cache().asMap().size()).isEqualTo(0);
assertThat(cacheManager.dataCache().asMap().size()).isEqualTo(0);
assertThat(cacheManager.indexCache().asMap().size()).isEqualTo(0);
}

private File writeFile(byte[] bytes) throws IOException {
@@ -110,7 +110,8 @@ public void testNormal() throws IOException {
assertThat(reader.lookup(toBytes(VALUE_COUNT + 1000))).isNull();

reader.close();
assertThat(cacheManager.cache().asMap()).isEmpty();
assertThat(cacheManager.dataCache().asMap()).isEmpty();
assertThat(cacheManager.indexCache().asMap()).isEmpty();
}

@TestTemplate