-
Notifications
You must be signed in to change notification settings - Fork 998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Separate index cache and data cache #4438
Changes from 5 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -915,6 +915,13 @@ public class CoreOptions implements Serializable { | |
.defaultValue(MemorySize.parse("256 mb")) | ||
.withDescription("Max memory size for lookup cache."); | ||
|
||
public static final ConfigOption<Double> LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO = | ||
key("lookup.cache.high-prio-pool-ratio") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think lookup.cache.high-priority-pool-ratio may be better? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
.doubleType() | ||
.defaultValue(0.25) | ||
.withDescription( | ||
"The fraction of cache memory that is reserved for high-priority data like index, filter."); | ||
|
||
public static final ConfigOption<Boolean> LOOKUP_CACHE_BLOOM_FILTER_ENABLED = | ||
key("lookup.cache.bloom.filter.enabled") | ||
.booleanType() | ||
|
@@ -1832,6 +1839,10 @@ public MemorySize lookupCacheMaxMemory() { | |
return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE); | ||
} | ||
|
||
public double lookupCacheHighPrioPoolRatio() { | ||
return options.get(LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO); | ||
} | ||
|
||
public long targetFileSize(boolean hasPrimaryKey) { | ||
return options.getOptional(TARGET_FILE_SIZE) | ||
.orElse(hasPrimaryKey ? VALUE_128_MB : VALUE_256_MB) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,25 +24,30 @@ | |
/** Key for cache manager. */ | ||
public interface CacheKey { | ||
|
||
static CacheKey forPosition(RandomAccessFile file, long position, int length) { | ||
return new PositionCacheKey(file, position, length); | ||
static CacheKey forPosition(RandomAccessFile file, long position, int length, boolean index) { | ||
return new PositionCacheKey(file, position, length, index); | ||
} | ||
|
||
static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int pageIndex) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we remove this method? The invoker should always set the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But for the |
||
return new PageIndexCacheKey(file, pageSize, pageIndex); | ||
return new PageIndexCacheKey(file, pageSize, pageIndex, false); | ||
} | ||
|
||
/** @return Whether this cache key is for index cache. */ | ||
boolean isIndex(); | ||
|
||
/** Key for file position and length. */ | ||
class PositionCacheKey implements CacheKey { | ||
|
||
private final RandomAccessFile file; | ||
private final long position; | ||
private final int length; | ||
private final boolean index; | ||
|
||
private PositionCacheKey(RandomAccessFile file, long position, int length) { | ||
private PositionCacheKey(RandomAccessFile file, long position, int length, boolean index) { | ||
this.file = file; | ||
this.position = position; | ||
this.length = length; | ||
this.index = index; | ||
} | ||
|
||
@Override | ||
|
@@ -56,12 +61,18 @@ public boolean equals(Object o) { | |
PositionCacheKey that = (PositionCacheKey) o; | ||
return position == that.position | ||
&& length == that.length | ||
&& index == that.index | ||
&& Objects.equals(file, that.file); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(file, position, length); | ||
return Objects.hash(file, position, length, index); | ||
} | ||
|
||
@Override | ||
public boolean isIndex() { | ||
return index; | ||
} | ||
} | ||
|
||
|
@@ -71,17 +82,25 @@ class PageIndexCacheKey implements CacheKey { | |
private final RandomAccessFile file; | ||
private final int pageSize; | ||
private final int pageIndex; | ||
private final boolean index; | ||
|
||
private PageIndexCacheKey(RandomAccessFile file, int pageSize, int pageIndex) { | ||
private PageIndexCacheKey( | ||
RandomAccessFile file, int pageSize, int pageIndex, boolean index) { | ||
this.file = file; | ||
this.pageSize = pageSize; | ||
this.pageIndex = pageIndex; | ||
this.index = index; | ||
} | ||
|
||
public int pageIndex() { | ||
return pageIndex; | ||
} | ||
|
||
@Override | ||
public boolean isIndex() { | ||
return index; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
|
@@ -93,12 +112,13 @@ public boolean equals(Object o) { | |
PageIndexCacheKey that = (PageIndexCacheKey) o; | ||
return pageSize == that.pageSize | ||
&& pageIndex == that.pageIndex | ||
&& index == that.index | ||
&& Objects.equals(file, that.file); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(file, pageSize, pageIndex); | ||
return Objects.hash(file, pageSize, pageIndex, index); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,10 @@ | |
import org.apache.paimon.annotation.VisibleForTesting; | ||
import org.apache.paimon.memory.MemorySegment; | ||
import org.apache.paimon.options.MemorySize; | ||
import org.apache.paimon.utils.Preconditions; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
|
||
|
@@ -29,31 +33,63 @@ | |
/** Cache manager to cache bytes to paged {@link MemorySegment}s. */ | ||
public class CacheManager { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(CacheManager.class); | ||
|
||
/** | ||
* Refreshing the cache comes with some costs, so not every time we visit the CacheManager, but | ||
* every 10 visits, refresh the LRU strategy. | ||
*/ | ||
public static final int REFRESH_COUNT = 10; | ||
|
||
private final Cache cache; | ||
private final Cache dataCache; | ||
private final Cache indexCache; | ||
|
||
private int fileReadCount; | ||
|
||
@VisibleForTesting | ||
public CacheManager(MemorySize maxMemorySize) { | ||
this(Cache.CacheType.GUAVA, maxMemorySize); | ||
this(Cache.CacheType.GUAVA, maxMemorySize, 0); | ||
} | ||
|
||
public CacheManager(MemorySize dataMaxMemorySize, double highPrioPoolRatio) { | ||
this(Cache.CacheType.GUAVA, dataMaxMemorySize, highPrioPoolRatio); | ||
} | ||
|
||
public CacheManager(Cache.CacheType cacheType, MemorySize maxMemorySize) { | ||
this.cache = CacheBuilder.newBuilder(cacheType).maximumWeight(maxMemorySize).build(); | ||
public CacheManager( | ||
Cache.CacheType cacheType, MemorySize maxMemorySize, double highPrioPoolRatio) { | ||
Preconditions.checkArgument( | ||
highPrioPoolRatio >= 0 && highPrioPoolRatio < 1, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we need to worry about floating-point precision here? It could cause this range check to give the wrong result. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hmm, I think it's OK here. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK. |
||
"The high priority pool ratio should in the range [0, 1)."); | ||
MemorySize indexCacheSize = | ||
MemorySize.ofBytes((long) (maxMemorySize.getBytes() * highPrioPoolRatio)); | ||
MemorySize dataCacheSize = | ||
MemorySize.ofBytes((long) (maxMemorySize.getBytes() * (1 - highPrioPoolRatio))); | ||
this.dataCache = CacheBuilder.newBuilder(cacheType).maximumWeight(dataCacheSize).build(); | ||
if (highPrioPoolRatio == 0) { | ||
this.indexCache = dataCache; | ||
} else { | ||
this.indexCache = | ||
CacheBuilder.newBuilder(cacheType).maximumWeight(indexCacheSize).build(); | ||
} | ||
this.fileReadCount = 0; | ||
LOG.info( | ||
"Initialize cache manager with data cache of {} and index cache of {}.", | ||
dataCacheSize, | ||
indexCacheSize); | ||
} | ||
|
||
@VisibleForTesting | ||
public Cache cache() { | ||
return cache; | ||
public Cache dataCache() { | ||
return dataCache; | ||
} | ||
|
||
@VisibleForTesting | ||
public Cache indexCache() { | ||
return indexCache; | ||
} | ||
|
||
public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback callback) { | ||
Cache cache = key.isIndex() ? indexCache : dataCache; | ||
Cache.CacheValue value = | ||
cache.get( | ||
key, | ||
|
@@ -70,7 +106,11 @@ public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback cal | |
} | ||
|
||
public void invalidPage(CacheKey key) { | ||
cache.invalidate(key); | ||
if (key.isIndex()) { | ||
indexCache.invalidate(key); | ||
} else { | ||
dataCache.invalidate(key); | ||
} | ||
} | ||
|
||
public int fileReadCount() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,7 +49,6 @@ public BlockCache(RandomAccessFile file, CacheManager cacheManager) { | |
this.blocks = new HashMap<>(); | ||
} | ||
|
||
// TODO separate index and data cache | ||
private byte[] readFrom(long offset, int length) throws IOException { | ||
byte[] buffer = new byte[length]; | ||
int read = channel.read(ByteBuffer.wrap(buffer), offset); | ||
|
@@ -61,9 +60,9 @@ private byte[] readFrom(long offset, int length) throws IOException { | |
} | ||
|
||
public MemorySegment getBlock( | ||
long position, int length, Function<byte[], byte[]> decompressFunc) { | ||
long position, int length, Function<byte[], byte[]> decompressFunc, boolean index) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe isIndex? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
|
||
CacheKey cacheKey = CacheKey.forPosition(file, position, length); | ||
CacheKey cacheKey = CacheKey.forPosition(file, position, length, index); | ||
|
||
SegmentContainer container = blocks.get(cacheKey); | ||
if (container == null || container.getAccessCount() == CacheManager.REFRESH_COUNT) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the basis for this default value? Should it be 0.1 or 0.25?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the default 256 MB lookup cache, a ratio of 0.1 gives an index cache of about 25 MB, and 0.25 gives 64 MB. In my tests, the bloom filter occupied 5-10 MB, so I chose the larger (64 MB) default index cache here.