fix comments
Aitozi committed Nov 4, 2024
1 parent 1e55c5b commit ebcab76
Showing 11 changed files with 57 additions and 53 deletions.
20 changes: 10 additions & 10 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -399,6 +399,12 @@
 <td>MemorySize</td>
 <td>Max disk size for lookup cache, you can use this option to limit the use of local disks.</td>
 </tr>
+<tr>
+<td><h5>lookup.cache-max-memory-size</h5></td>
+<td style="word-wrap: break-word;">256 mb</td>
+<td>MemorySize</td>
+<td>Max memory size for lookup data cache.</td>
+</tr>
 <tr>
 <td><h5>lookup.cache-spill-compression</h5></td>
 <td style="word-wrap: break-word;">"zstd"</td>
@@ -418,23 +424,17 @@
 <td>Define the default false positive probability for lookup cache bloom filters.</td>
 </tr>
 <tr>
-<td><h5>lookup.data-cache-max-memory-size</h5></td>
-<td style="word-wrap: break-word;">256 mb</td>
-<td>MemorySize</td>
-<td>Max memory size for lookup data cache.</td>
+<td><h5>lookup.cache.high-prio-pool-ratio</h5></td>
+<td style="word-wrap: break-word;">0.25</td>
+<td>Double</td>
+<td>The fraction of cache memory that is reserved for high-priority data like index, filter.</td>
 </tr>
 <tr>
 <td><h5>lookup.hash-load-factor</h5></td>
 <td style="word-wrap: break-word;">0.75</td>
 <td>Float</td>
 <td>The index load factor for lookup.</td>
 </tr>
-<tr>
-<td><h5>lookup.index-cache-max-memory-size</h5></td>
-<td style="word-wrap: break-word;">64 mb</td>
-<td>MemorySize</td>
-<td>Max memory size for lookup index cache.</td>
-</tr>
 <tr>
 <td><h5>lookup.local-file-type</h5></td>
 <td style="word-wrap: break-word;">hash</td>
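To make the two new options concrete, here is a minimal sketch of setting them as raw table options; the option keys and defaults come from the table above, while the map itself is purely illustrative:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: option keys exactly as documented above.
    Map<String, String> lookupOptions = new HashMap<>();
    lookupOptions.put("lookup.cache-max-memory-size", "256 mb");    // total lookup cache budget
    lookupOptions.put("lookup.cache.high-prio-pool-ratio", "0.25"); // reserve 25% (here 64 mb) for index/filter data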
@@ -242,18 +242,6 @@
 <td>Boolean</td>
 <td>If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator.</td>
 </tr>
-<tr>
-<td><h5>sink.operator-uid.suffix</h5></td>
-<td style="word-wrap: break-word;">(none)</td>
-<td>String</td>
-<td>Set the uid suffix for the writer, dynamic bucket assigner and committer operators. The uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will automatically generate the operator uid, which may be incompatible when the topology changes.</td>
-</tr>
-<tr>
-<td><h5>source.operator-uid.suffix</h5></td>
-<td style="word-wrap: break-word;">(none)</td>
-<td>String</td>
-<td>Set the uid suffix for the source operators. After setting, the uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will automatically generate the operator uid, which may be incompatible when the topology changes.</td>
-</tr>
 <tr>
 <td><h5>source.checkpoint-align.enabled</h5></td>
 <td style="word-wrap: break-word;">false</td>
@@ -52,8 +52,7 @@ public void testCache() throws Exception {
         CacheKey key2 = CacheKey.forPageIndex(new RandomAccessFile(file2, "r"), 0, 0);
 
         for (Cache.CacheType cacheType : Cache.CacheType.values()) {
-            CacheManager cacheManager =
-                    new CacheManager(cacheType, MemorySize.ofBytes(10), MemorySize.ofBytes(10));
+            CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10), 0.1);
             benchmark.addCase(
                     String.format("cache-%s", cacheType.toString()),
                     5,
@@ -47,7 +47,7 @@
 /** Benchmark for measuring the throughput of writing for lookup. */
 @ExtendWith(ParameterizedTestExtension.class)
 public class LookupReaderBenchmark extends AbstractLookupBenchmark {
-    private static final int QUERY_KEY_COUNT = 10000;
+    private static final int QUERY_KEY_COUNT = 100000;
     private final int recordCount;
     private final boolean bloomFilterEnabled;
     @TempDir Path tempDir;
24 changes: 12 additions & 12 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -909,18 +909,18 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Spill compression for lookup cache, currently zstd, none, lz4 and lzo are supported.");
 
-    public static final ConfigOption<MemorySize> LOOKUP_DATA_CACHE_MAX_MEMORY_SIZE =
-            key("lookup.data-cache-max-memory-size")
+    public static final ConfigOption<MemorySize> LOOKUP_CACHE_MAX_MEMORY_SIZE =
+            key("lookup.cache-max-memory-size")
                     .memoryType()
                     .defaultValue(MemorySize.parse("256 mb"))
+                    .withFallbackKeys("lookup.cache-max-memory-size")
                     .withDescription("Max memory size for lookup data cache.");
 
-    public static final ConfigOption<MemorySize> LOOKUP_INDEX_CACHE_MAX_MEMORY_SIZE =
-            key("lookup.index-cache-max-memory-size")
-                    .memoryType()
-                    .defaultValue(MemorySize.parse("64 mb"))
-                    .withDescription("Max memory size for lookup index cache.");
+    public static final ConfigOption<Double> LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO =
+            key("lookup.cache.high-prio-pool-ratio")
+                    .doubleType()
+                    .defaultValue(0.25)
+                    .withDescription(
+                            "The fraction of cache memory that is reserved for high-priority data like index, filter.");
 
     public static final ConfigOption<Boolean> LOOKUP_CACHE_BLOOM_FILTER_ENABLED =
             key("lookup.cache.bloom.filter.enabled")
@@ -1835,12 +1835,12 @@ public LookupLocalFileType lookupLocalFileType() {
         return options.get(LOOKUP_LOCAL_FILE_TYPE);
     }
 
-    public MemorySize lookupDataCacheMaxMemory() {
-        return options.get(LOOKUP_DATA_CACHE_MAX_MEMORY_SIZE);
+    public MemorySize lookupCacheMaxMemory() {
+        return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE);
     }
 
-    public MemorySize lookupIndexCacheMaxMemory() {
-        return options.get(LOOKUP_INDEX_CACHE_MAX_MEMORY_SIZE);
+    public double lookupCacheHighPrioPoolRatio() {
+        return options.get(LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO);
     }
 
     public long targetFileSize(boolean hasPrimaryKey) {
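Taken together, the renamed size option and the new ratio option feed a single CacheManager. A minimal sketch of the wiring, assuming the Options/CoreOptions APIs and import paths visible in this diff (values illustrative):

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.io.cache.CacheManager;
    import org.apache.paimon.options.MemorySize;
    import org.apache.paimon.options.Options;

    Options conf = new Options();
    conf.set(CoreOptions.LOOKUP_CACHE_MAX_MEMORY_SIZE, MemorySize.parse("256 mb"));
    conf.set(CoreOptions.LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO, 0.25);

    CoreOptions options = new CoreOptions(conf);
    // 256 mb * 0.25 = 64 mb for the high-priority (index/filter) pool,
    // 256 mb * 0.75 = 192 mb for the data pool.
    CacheManager cacheManager =
            new CacheManager(options.lookupCacheMaxMemory(), options.lookupCacheHighPrioPoolRatio());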
@@ -21,6 +21,10 @@
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.utils.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -29,6 +33,8 @@
 /** Cache manager to cache bytes to paged {@link MemorySegment}s. */
 public class CacheManager {
 
+    private static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);
+
     /**
      * Refreshing the cache comes with some costs, so not every time we visit the CacheManager, but
      * every 10 visits, refresh the LRU strategy.
@@ -42,19 +48,34 @@ public class CacheManager {
 
     @VisibleForTesting
     public CacheManager(MemorySize maxMemorySize) {
-        this(Cache.CacheType.GUAVA, maxMemorySize, maxMemorySize);
+        this(Cache.CacheType.GUAVA, maxMemorySize, 0);
     }
 
-    public CacheManager(MemorySize dataMaxMemorySize, MemorySize indexMaxMemorySize) {
-        this(Cache.CacheType.GUAVA, dataMaxMemorySize, indexMaxMemorySize);
+    public CacheManager(MemorySize dataMaxMemorySize, double highPrioPoolRatio) {
+        this(Cache.CacheType.GUAVA, dataMaxMemorySize, highPrioPoolRatio);
     }
 
     public CacheManager(
-            Cache.CacheType cacheType, MemorySize maxMemorySize, MemorySize indexMaxMemorySize) {
-        this.dataCache = CacheBuilder.newBuilder(cacheType).maximumWeight(maxMemorySize).build();
-        this.indexCache =
-                CacheBuilder.newBuilder(cacheType).maximumWeight(indexMaxMemorySize).build();
+            Cache.CacheType cacheType, MemorySize maxMemorySize, double highPrioPoolRatio) {
+        Preconditions.checkArgument(
+                highPrioPoolRatio >= 0 && highPrioPoolRatio < 1,
+                "The high priority pool ratio should be in the range [0, 1).");
+        MemorySize indexCacheSize =
+                MemorySize.ofBytes((long) (maxMemorySize.getBytes() * highPrioPoolRatio));
+        MemorySize dataCacheSize =
+                MemorySize.ofBytes((long) (maxMemorySize.getBytes() * (1 - highPrioPoolRatio)));
+        this.dataCache = CacheBuilder.newBuilder(cacheType).maximumWeight(dataCacheSize).build();
+        if (highPrioPoolRatio == 0) {
+            this.indexCache = dataCache;
+        } else {
+            this.indexCache =
+                    CacheBuilder.newBuilder(cacheType).maximumWeight(indexCacheSize).build();
+        }
         this.fileReadCount = 0;
+        LOG.info(
+                "Initialized cache manager with data cache of {} and index cache of {}.",
+                dataCacheSize,
+                indexCacheSize);
     }
 
     @VisibleForTesting
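For reference, a short sketch of how the reworked constructors behave (sizes illustrative; the CacheType values are the ones the tests below iterate over):

    // Testing overload: ratio 0 means no separate high-priority pool,
    // so index pages share the single data cache.
    CacheManager forTests = new CacheManager(MemorySize.parse("128 mb"));

    // Ratio 0.25: 32 mb high-priority (index/filter) pool, 96 mb data pool.
    CacheManager split = new CacheManager(MemorySize.parse("128 mb"), 0.25);

    // Explicit cache implementation, as exercised by the benchmark and tests.
    CacheManager caffeine =
            new CacheManager(Cache.CacheType.CAFFEINE, MemorySize.parse("128 mb"), 0.25);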
@@ -48,8 +48,7 @@ void testCaffeineCache() throws Exception {
         CacheKey key2 = CacheKey.forPageIndex(new RandomAccessFile(file2, "r"), 0, 0);
 
         for (Cache.CacheType cacheType : Cache.CacheType.values()) {
-            CacheManager cacheManager =
-                    new CacheManager(cacheType, MemorySize.ofBytes(10), MemorySize.ofBytes(10));
+            CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofBytes(10), 0.1);
             byte[] value = new byte[6];
             Arrays.fill(value, (byte) 1);
             for (int i = 0; i < 10; i++) {
@@ -82,9 +82,7 @@ private void innerTest(int len, int maxFileReadCount) throws IOException {
         }
 
         File file = writeFile(bytes);
-        CacheManager cacheManager =
-                new CacheManager(
-                        cacheType, MemorySize.ofKibiBytes(128), MemorySize.ofKibiBytes(128));
+        CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofKibiBytes(128), 0.25);
         FileBasedRandomInputView view =
                 new FileBasedRandomInputView(
                         PageFileInput.create(file, 1024, null, 0, null), cacheManager);
@@ -64,8 +64,7 @@ public void testProbe() throws IOException {
         Arrays.stream(inputs).forEach(i -> builder.addHash(Integer.hashCode(i)));
         File file = writeFile(segment.getArray());
 
-        CacheManager cacheManager =
-                new CacheManager(cacheType, MemorySize.ofMebiBytes(1), MemorySize.ofMebiBytes(1));
+        CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofMebiBytes(1), 0.1);
         FileBasedBloomFilter filter =
                 new FileBasedBloomFilter(
                         PageFileInput.create(file, 1024, null, 0, null),
@@ -80,7 +80,7 @@ public MemoryFileStoreWrite(
         this.options = options;
         this.cacheManager =
                 new CacheManager(
-                        options.lookupDataCacheMaxMemory(), options.lookupIndexCacheMaxMemory());
+                        options.lookupCacheMaxMemory(), options.lookupCacheHighPrioPoolRatio());
     }
 
     @Override
@@ -102,7 +102,7 @@ public LocalTableQuery(FileStoreTable table) {
                 LookupStoreFactory.create(
                         options,
                         new CacheManager(
-                                options.lookupDataCacheMaxMemory(),
-                                options.lookupIndexCacheMaxMemory()),
+                                options.lookupCacheMaxMemory(),
+                                options.lookupCacheHighPrioPoolRatio()),
                         new RowCompactedSerializer(keyType).createSliceComparator());
 
