Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Separate index cache and data cache #4438

Merged
merged 6 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,6 @@
<td>MemorySize</td>
<td>Max disk size for lookup cache, you can use this option to limit the use of local disks.</td>
</tr>
<tr>
<td><h5>lookup.cache-max-memory-size</h5></td>
<td style="word-wrap: break-word;">256 mb</td>
<td>MemorySize</td>
<td>Max memory size for lookup cache.</td>
</tr>
<tr>
<td><h5>lookup.cache-spill-compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
Expand All @@ -423,12 +417,24 @@
<td>Double</td>
<td>Define the default false positive probability for lookup cache bloom filters.</td>
</tr>
<tr>
<td><h5>lookup.data-cache-max-memory-size</h5></td>
<td style="word-wrap: break-word;">256 mb</td>
<td>MemorySize</td>
<td>Max memory size for lookup data cache.</td>
</tr>
<tr>
<td><h5>lookup.hash-load-factor</h5></td>
<td style="word-wrap: break-word;">0.75</td>
<td>Float</td>
<td>The index load factor for lookup.</td>
</tr>
<tr>
<td><h5>lookup.index-cache-max-memory-size</h5></td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#rocksdb-state-backend

Can we just introduce an option like state.backend.rocksdb.memory.high-prio-pool-ratio?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

<td style="word-wrap: break-word;">64 mb</td>
<td>MemorySize</td>
<td>Max memory size for lookup index cache.</td>
</tr>
<tr>
<td><h5>lookup.local-file-type</h5></td>
<td style="word-wrap: break-word;">hash</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@

package org.apache.paimon.benchmark.metric.cpu;

import java.io.*;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public void testCache() throws Exception {
CacheKey key2 = CacheKey.forPageIndex(new RandomAccessFile(file2, "r"), 0, 0);

for (Cache.CacheType cacheType : Cache.CacheType.values()) {
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10));
CacheManager cacheManager =
new CacheManager(cacheType, MemorySize.ofBytes(10), MemorySize.ofBytes(10));
benchmark.addCase(
String.format("cache-%s", cacheType.toString()),
5,
Expand Down
21 changes: 16 additions & 5 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -909,11 +909,18 @@ public class CoreOptions implements Serializable {
.withDescription(
"Spill compression for lookup cache, currently zstd, none, lz4 and lzo are supported.");

public static final ConfigOption<MemorySize> LOOKUP_CACHE_MAX_MEMORY_SIZE =
key("lookup.cache-max-memory-size")
public static final ConfigOption<MemorySize> LOOKUP_DATA_CACHE_MAX_MEMORY_SIZE =
key("lookup.data-cache-max-memory-size")
.memoryType()
.defaultValue(MemorySize.parse("256 mb"))
.withDescription("Max memory size for lookup cache.");
.withFallbackKeys("lookup.cache-max-memory-size")
.withDescription("Max memory size for lookup data cache.");

public static final ConfigOption<MemorySize> LOOKUP_INDEX_CACHE_MAX_MEMORY_SIZE =
key("lookup.index-cache-max-memory-size")
.memoryType()
.defaultValue(MemorySize.parse("64 mb"))
.withDescription("Max memory size for lookup index cache.");

public static final ConfigOption<Boolean> LOOKUP_CACHE_BLOOM_FILTER_ENABLED =
key("lookup.cache.bloom.filter.enabled")
Expand Down Expand Up @@ -1828,8 +1835,12 @@ public LookupLocalFileType lookupLocalFileType() {
return options.get(LOOKUP_LOCAL_FILE_TYPE);
}

public MemorySize lookupCacheMaxMemory() {
return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE);
public MemorySize lookupDataCacheMaxMemory() {
return options.get(LOOKUP_DATA_CACHE_MAX_MEMORY_SIZE);
}

public MemorySize lookupIndexCacheMaxMemory() {
return options.get(LOOKUP_INDEX_CACHE_MAX_MEMORY_SIZE);
}

public long targetFileSize(boolean hasPrimaryKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,30 @@
/** Key for cache manager. */
public interface CacheKey {

static CacheKey forPosition(RandomAccessFile file, long position, int length) {
return new PositionCacheKey(file, position, length);
static CacheKey forPosition(RandomAccessFile file, long position, int length, boolean index) {
return new PositionCacheKey(file, position, length, index);
}

static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int pageIndex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this method? The invoker should always set the isIndex.

Copy link
Contributor Author

@Aitozi Aitozi Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But for the pageIndex, we always store it in the data cache now. Because we are unsure if this page is an index page.

return new PageIndexCacheKey(file, pageSize, pageIndex);
return new PageIndexCacheKey(file, pageSize, pageIndex, false);
}

/** @return Whether this cache key is for index cache. */
boolean isIndex();

/** Key for file position and length. */
class PositionCacheKey implements CacheKey {

private final RandomAccessFile file;
private final long position;
private final int length;
private final boolean index;

private PositionCacheKey(RandomAccessFile file, long position, int length) {
private PositionCacheKey(RandomAccessFile file, long position, int length, boolean index) {
this.file = file;
this.position = position;
this.length = length;
this.index = index;
}

@Override
Expand All @@ -56,12 +61,18 @@ public boolean equals(Object o) {
PositionCacheKey that = (PositionCacheKey) o;
return position == that.position
&& length == that.length
&& index == that.index
&& Objects.equals(file, that.file);
}

@Override
public int hashCode() {
return Objects.hash(file, position, length);
return Objects.hash(file, position, length, index);
}

@Override
public boolean isIndex() {
return index;
}
}

Expand All @@ -71,17 +82,25 @@ class PageIndexCacheKey implements CacheKey {
private final RandomAccessFile file;
private final int pageSize;
private final int pageIndex;
private final boolean index;

private PageIndexCacheKey(RandomAccessFile file, int pageSize, int pageIndex) {
private PageIndexCacheKey(
RandomAccessFile file, int pageSize, int pageIndex, boolean index) {
this.file = file;
this.pageSize = pageSize;
this.pageIndex = pageIndex;
this.index = index;
}

public int pageIndex() {
return pageIndex;
}

@Override
public boolean isIndex() {
return index;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -93,12 +112,13 @@ public boolean equals(Object o) {
PageIndexCacheKey that = (PageIndexCacheKey) o;
return pageSize == that.pageSize
&& pageIndex == that.pageIndex
&& index == that.index
&& Objects.equals(file, that.file);
}

@Override
public int hashCode() {
return Objects.hash(file, pageSize, pageIndex);
return Objects.hash(file, pageSize, pageIndex, index);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,40 @@ public class CacheManager {
*/
public static final int REFRESH_COUNT = 10;

private final Cache cache;
private final Cache dataCache;
private final Cache indexCache;

private int fileReadCount;

@VisibleForTesting
public CacheManager(MemorySize maxMemorySize) {
this(Cache.CacheType.GUAVA, maxMemorySize);
this(Cache.CacheType.GUAVA, maxMemorySize, maxMemorySize);
}

public CacheManager(MemorySize dataMaxMemorySize, MemorySize indexMaxMemorySize) {
this(Cache.CacheType.GUAVA, dataMaxMemorySize, indexMaxMemorySize);
}

public CacheManager(Cache.CacheType cacheType, MemorySize maxMemorySize) {
this.cache = CacheBuilder.newBuilder(cacheType).maximumWeight(maxMemorySize).build();
public CacheManager(
Cache.CacheType cacheType, MemorySize maxMemorySize, MemorySize indexMaxMemorySize) {
this.dataCache = CacheBuilder.newBuilder(cacheType).maximumWeight(maxMemorySize).build();
this.indexCache =
CacheBuilder.newBuilder(cacheType).maximumWeight(indexMaxMemorySize).build();
this.fileReadCount = 0;
}

@VisibleForTesting
public Cache cache() {
return cache;
public Cache dataCache() {
return dataCache;
}

@VisibleForTesting
public Cache indexCache() {
return indexCache;
}

public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback callback) {
Cache cache = key.isIndex() ? indexCache : dataCache;
Cache.CacheValue value =
cache.get(
key,
Expand All @@ -70,7 +85,11 @@ public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback cal
}

public void invalidPage(CacheKey key) {
cache.invalidate(key);
if (key.isIndex()) {
indexCache.invalidate(key);
} else {
dataCache.invalidate(key);
}
}

public int fileReadCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public BlockCache(RandomAccessFile file, CacheManager cacheManager) {
this.blocks = new HashMap<>();
}

// TODO separate index and data cache
private byte[] readFrom(long offset, int length) throws IOException {
byte[] buffer = new byte[length];
int read = channel.read(ByteBuffer.wrap(buffer), offset);
Expand All @@ -61,9 +60,9 @@ private byte[] readFrom(long offset, int length) throws IOException {
}

public MemorySegment getBlock(
long position, int length, Function<byte[], byte[]> decompressFunc) {
long position, int length, Function<byte[], byte[]> decompressFunc, boolean index) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename it to `isIndex`?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


CacheKey cacheKey = CacheKey.forPosition(file, position, length);
CacheKey cacheKey = CacheKey.forPosition(file, position, length, index);

SegmentContainer container = blocks.get(cacheKey);
if (container == null || container.getAccessCount() == CacheManager.REFRESH_COUNT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@
import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* A {@link LookupStoreReader} for sort store.
*
* <p>TODO separate index cache and block cache.
*/
/** A {@link LookupStoreReader} for sort store. */
public class SortLookupStoreReader implements LookupStoreReader {

private final Comparator<MemorySlice> comparator;
Expand All @@ -68,7 +64,7 @@ public SortLookupStoreReader(
this.fileInput = PageFileInput.create(file, blockSize, null, fileSize, null);
this.blockCache = new BlockCache(fileInput.file(), cacheManager);
Footer footer = readFooter();
this.indexBlockIterator = readBlock(footer.getIndexBlockHandle()).iterator();
this.indexBlockIterator = readBlock(footer.getIndexBlockHandle(), true).iterator();
BloomFilterHandle handle = footer.getBloomFilterHandle();
if (handle != null) {
this.bloomFilter =
Expand All @@ -84,7 +80,7 @@ public SortLookupStoreReader(
private Footer readFooter() throws IOException {
MemorySegment footerData =
blockCache.getBlock(
fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH, b -> b);
fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH, b -> b, true);
return Footer.readFooter(MemorySlice.wrap(footerData).toInput());
}

Expand All @@ -111,23 +107,26 @@ public byte[] lookup(byte[] key) throws IOException {
}

private BlockIterator getNextBlock() throws IOException {
// index block handle, point to the key, value position.
MemorySlice blockHandle = indexBlockIterator.next().getValue();
BlockReader dataBlock = openBlock(blockHandle);
BlockReader dataBlock =
readBlock(BlockHandle.readBlockHandle(blockHandle.toInput()), false);
return dataBlock.iterator();
}

private BlockReader openBlock(MemorySlice blockEntry) throws IOException {
BlockHandle blockHandle = BlockHandle.readBlockHandle(blockEntry.toInput());
return readBlock(blockHandle);
}

private BlockReader readBlock(BlockHandle blockHandle) {
/**
* @param blockHandle The block handle.
* @param index Whether to read the block as an index.
* @return The reader of the target block.
*/
private BlockReader readBlock(BlockHandle blockHandle, boolean index) {
// read block trailer
MemorySegment trailerData =
blockCache.getBlock(
blockHandle.offset() + blockHandle.size(),
BlockTrailer.ENCODED_LENGTH,
b -> b);
b -> b,
true);
BlockTrailer blockTrailer =
BlockTrailer.readBlockTrailer(MemorySlice.wrap(trailerData).toInput());

Expand Down Expand Up @@ -166,7 +165,8 @@ private BlockReader readBlock(BlockHandle blockHandle) {
checkArgument(uncompressedLength == uncompressed.length);
return uncompressed;
}
});
},
index);
return new BlockReader(MemorySlice.wrap(unCompressedBlock), comparator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public FileBasedBloomFilter(
this.readOffset = readOffset;
this.readLength = readLength;
this.accessCount = 0;
this.cacheKey = CacheKey.forPosition(input.file(), readOffset, readLength);
this.cacheKey = CacheKey.forPosition(input.file(), readOffset, readLength, true);
}

public boolean testHash(int hash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ void testCaffeineCache() throws Exception {
CacheKey key2 = CacheKey.forPageIndex(new RandomAccessFile(file2, "r"), 0, 0);

for (Cache.CacheType cacheType : Cache.CacheType.values()) {
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10));
CacheManager cacheManager =
new CacheManager(cacheType, MemorySize.ofBytes(10), MemorySize.ofBytes(10));
byte[] value = new byte[6];
Arrays.fill(value, (byte) 1);
for (int i = 0; i < 10; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ private void innerTest(int len, int maxFileReadCount) throws IOException {
}

File file = writeFile(bytes);
CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofKibiBytes(128));
CacheManager cacheManager =
new CacheManager(
cacheType, MemorySize.ofKibiBytes(128), MemorySize.ofKibiBytes(128));
FileBasedRandomInputView view =
new FileBasedRandomInputView(
PageFileInput.create(file, 1024, null, 0, null), cacheManager);
Expand Down Expand Up @@ -117,7 +119,8 @@ private void innerTest(int len, int maxFileReadCount) throws IOException {

// hot key in LRU, should have good cache hit rate
assertThat(cacheManager.fileReadCount()).isLessThan(maxFileReadCount);
assertThat(cacheManager.cache().asMap().size()).isEqualTo(0);
assertThat(cacheManager.dataCache().asMap().size()).isEqualTo(0);
assertThat(cacheManager.indexCache().asMap().size()).isEqualTo(0);
}

private File writeFile(byte[] bytes) throws IOException {
Expand Down
Loading
Loading