[core] Allow cache and refresh partitions for CachingCatalog (#4427)
mxdzs0612 authored Nov 5, 2024
1 parent 8fc5b69 commit 5a8bde0
Showing 5 changed files with 138 additions and 23 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -44,6 +44,12 @@
<td>Duration</td>
<td>Controls the duration for which databases and tables in the catalog are cached.</td>
</tr>
<tr>
<td><h5>cache.partition.max-num</h5></td>
<td style="word-wrap: break-word;">0</td>
<td>Long</td>
<td>Controls the maximum number of partitions that are cached in the catalog.</td>
</tr>
<tr>
<td><h5>cache.manifest.max-memory</h5></td>
<td style="word-wrap: break-word;">(none)</td>

CatalogOptions.java

@@ -98,6 +98,13 @@ public class CatalogOptions {
.withDescription(
"Controls the duration for which databases and tables in the catalog are cached.");

public static final ConfigOption<Long> CACHE_PARTITION_MAX_NUM =
key("cache.partition.max-num")
.longType()
.defaultValue(0L)
.withDescription(
"Controls the max number for which partitions in the catalog are cached.");

public static final ConfigOption<MemorySize> CACHE_MANIFEST_SMALL_FILE_MEMORY =
key("cache.manifest.small-file-memory")
.memoryType()
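For illustration, a minimal sketch of how the new partition cache might be enabled when wrapping a catalog. CACHE_PARTITION_MAX_NUM and CachingCatalog.tryToCreate come from this change; the typed Options.set call and the surrounding setup are assumptions rather than part of the commit.

import org.apache.paimon.catalog.CachingCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

public class PartitionCacheSetup {

    // Wraps an existing catalog so that up to 10,000 partition entries may be cached in total.
    public static Catalog wrapWithPartitionCache(Catalog delegate) {
        Options options = new Options();
        // The default of 0 leaves the partition cache disabled (partitionCache stays null).
        options.set(CatalogOptions.CACHE_PARTITION_MAX_NUM, 10_000L); // assumed typed setter
        // tryToCreate wraps the delegate in a CachingCatalog when catalog caching is enabled.
        return CachingCatalog.tryToCreate(delegate, options);
    }
}

Because the cache is built with weigher(this::weigh) and maximumWeight(cachedPartitionMaxNum), the configured limit bounds the total number of cached PartitionEntry objects across all tables, not the number of tables.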
CachingCatalog.java

@@ -19,6 +19,7 @@
package org.apache.paimon.catalog;

import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
@@ -52,6 +53,7 @@
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;

/** A {@link Catalog} to cache databases and tables and manifests. */
@@ -61,26 +63,31 @@ public class CachingCatalog extends DelegateCatalog {

protected final Cache<String, Map<String, String>> databaseCache;
protected final Cache<Identifier, Table> tableCache;
@Nullable protected final Cache<Identifier, List<PartitionEntry>> partitionCache;
@Nullable protected final SegmentsCache<Path> manifestCache;
private final long cachedPartitionMaxNum;

public CachingCatalog(Catalog wrapped) {
this(
wrapped,
CACHE_EXPIRATION_INTERVAL_MS.defaultValue(),
CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(),
CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes(),
CACHE_PARTITION_MAX_NUM.defaultValue());
}

public CachingCatalog(
Catalog wrapped,
Duration expirationInterval,
MemorySize manifestMaxMemory,
long manifestCacheThreshold) {
long manifestCacheThreshold,
long cachedPartitionMaxNum) {
this(
wrapped,
expirationInterval,
manifestMaxMemory,
manifestCacheThreshold,
cachedPartitionMaxNum,
Ticker.systemTicker());
}

@@ -89,6 +96,7 @@ public CachingCatalog(
Duration expirationInterval,
MemorySize manifestMaxMemory,
long manifestCacheThreshold,
long cachedPartitionMaxNum,
Ticker ticker) {
super(wrapped);
if (expirationInterval.isZero() || expirationInterval.isNegative()) {
@@ -111,7 +119,19 @@ public CachingCatalog(
.expireAfterAccess(expirationInterval)
.ticker(ticker)
.build();
this.partitionCache =
cachedPartitionMaxNum == 0
? null
: Caffeine.newBuilder()
.softValues()
.executor(Runnable::run)
.expireAfterAccess(expirationInterval)
.weigher(this::weigh)
.maximumWeight(cachedPartitionMaxNum)
.ticker(ticker)
.build();
this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);
this.cachedPartitionMaxNum = cachedPartitionMaxNum;
}

public static Catalog tryToCreate(Catalog catalog, Options options) {
@@ -131,7 +151,8 @@ public static Catalog tryToCreate(Catalog catalog, Options options) {
catalog,
options.get(CACHE_EXPIRATION_INTERVAL_MS),
manifestMaxMemory,
manifestThreshold);
manifestThreshold,
options.get(CACHE_PARTITION_MAX_NUM));
}

@Override
@@ -227,6 +248,51 @@ private void putTableCache(Identifier identifier, Table table) {
tableCache.put(identifier, table);
}

public List<PartitionEntry> getPartitions(Identifier identifier) throws TableNotExistException {
Table table = this.getTable(identifier);
if (partitionCacheEnabled(table)) {
List<PartitionEntry> partitions = partitionCache.getIfPresent(identifier);
if (partitions == null || partitions.isEmpty()) {
partitions = this.refreshPartitions(identifier);
}
return partitions;
}
return ((FileStoreTable) table).newSnapshotReader().partitionEntries();
}

public List<PartitionEntry> refreshPartitions(Identifier identifier)
throws TableNotExistException {
Table table = this.getTable(identifier);
List<PartitionEntry> partitions =
((FileStoreTable) table).newSnapshotReader().partitionEntries();
if (partitionCacheEnabled(table)
&& partitionCache.asMap().values().stream().mapToInt(List::size).sum()
< this.cachedPartitionMaxNum) {
partitionCache.put(identifier, partitions);
}
return partitions;
}

private boolean partitionCacheEnabled(Table table) {
return partitionCache != null
&& table instanceof FileStoreTable
&& !table.partitionKeys().isEmpty();
}

private int weigh(Identifier identifier, List<PartitionEntry> partitions) {
return partitions.size();
}

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException, PartitionNotExistException {
wrapped.dropPartition(identifier, partitions);
if (partitionCache != null) {
partitionCache.invalidate(identifier);
}
}

private class TableInvalidatingRemovalListener implements RemovalListener<Identifier, Table> {
@Override
public void onRemoval(Identifier identifier, Table table, @NonNull RemovalCause cause) {
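A possible caller-side view of the new partition methods, assuming the catalog has been wrapped as sketched above and the identifier resolves to a partitioned FileStoreTable: getPartitions serves the cached list and falls back to a refresh when nothing (or an empty list) is cached, refreshPartitions always rereads the latest snapshot, and dropPartition invalidates the cached list for that table. The partition spec below is a hypothetical example.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.paimon.catalog.CachingCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.manifest.PartitionEntry;

public class PartitionCacheUsage {

    public static void listAndRefresh(CachingCatalog catalog) throws Exception {
        Identifier ident = new Identifier("db", "tbl");

        // First call loads partition entries from the latest snapshot and caches them.
        List<PartitionEntry> cached = catalog.getPartitions(ident);

        // Force a reread, e.g. after another writer committed new partitions.
        List<PartitionEntry> fresh = catalog.refreshPartitions(ident);
        System.out.println(cached.size() + " cached, " + fresh.size() + " after refresh");

        // Dropping a partition invalidates this table's cached partition list,
        // so the next getPartitions call reloads it.
        Map<String, String> spec = new HashMap<>();
        spec.put("f0", "a"); // hypothetical spec for a table partitioned by f0
        catalog.dropPartition(ident, spec);
    }
}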
CachingCatalogTest.java

@@ -20,8 +20,10 @@

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
@@ -32,6 +34,8 @@
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.FakeTicker;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -44,13 +48,16 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.paimon.data.BinaryString.fromString;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
@@ -113,15 +120,15 @@ public void testTableExpiresAfterInterval() throws Exception {
Table table = catalog.getTable(tableIdent);

// Ensure table is cached with full ttl remaining upon creation
assertThat(catalog.cache().asMap()).containsKey(tableIdent);
assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.remainingAgeFor(tableIdent)).isPresent().get().isEqualTo(EXPIRATION_TTL);

ticker.advance(HALF_OF_EXPIRATION);
assertThat(catalog.cache().asMap()).containsKey(tableIdent);
assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);

ticker.advance(HALF_OF_EXPIRATION.plus(Duration.ofSeconds(10)));
assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
assertThat(catalog.getTable(tableIdent))
.as("CachingCatalog should return a new instance after expiration")
.isNotSameAs(table);
@@ -135,11 +142,11 @@ public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog() throws Exception {
Identifier tableIdent = new Identifier("db", "tbl");
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
catalog.getTable(tableIdent);
assertThat(catalog.cache().asMap()).containsKey(tableIdent);
assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(Duration.ZERO);

ticker.advance(HALF_OF_EXPIRATION);
assertThat(catalog.cache().asMap()).containsKey(tableIdent);
assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
assertThat(catalog.remainingAgeFor(tableIdent))
.isPresent()
@@ -148,7 +155,7 @@

Duration oneMinute = Duration.ofMinutes(1L);
ticker.advance(oneMinute);
assertThat(catalog.cache().asMap()).containsKey(tableIdent);
assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent))
.isPresent()
.get()
@@ -175,17 +182,17 @@ public void testCacheExpirationEagerlyRemovesSysTables() throws Exception {
Identifier tableIdent = new Identifier("db", "tbl");
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
Table table = catalog.getTable(tableIdent);
assertThat(catalog.cache().asMap()).containsKey(tableIdent);
assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO);

ticker.advance(HALF_OF_EXPIRATION);
assertThat(catalog.cache().asMap()).containsKey(tableIdent);
assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);

for (Identifier sysTable : sysTables(tableIdent)) {
catalog.getTable(sysTable);
}
assertThat(catalog.cache().asMap()).containsKeys(sysTables(tableIdent));
assertThat(catalog.tableCache().asMap()).containsKeys(sysTables(tableIdent));
assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
.isNotEmpty()
.allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO));
@@ -209,17 +216,39 @@ public void testCacheExpirationEagerlyRemovesSysTables() throws Exception {

// Move time forward so the data table drops.
ticker.advance(HALF_OF_EXPIRATION);
assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);

Arrays.stream(sysTables(tableIdent))
.forEach(
sysTable ->
assertThat(catalog.cache().asMap())
assertThat(catalog.tableCache().asMap())
.as(
"When a data table expires, its sys tables should expire regardless of age")
.doesNotContainKeys(sysTable));
}

@Test
public void testPartitionCache() throws Exception {
TestableCachingCatalog catalog =
new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);

Identifier tableIdent = new Identifier("db", "tbl");
Schema schema =
new Schema(
RowType.of(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE).getFields(),
singletonList("f0"),
emptyList(),
Collections.emptyMap(),
"");
catalog.createTable(tableIdent, schema, false);
List<PartitionEntry> partitionEntryList = catalog.getPartitions(tableIdent);
assertThat(catalog.partitionCache().asMap()).containsKey(tableIdent);
List<PartitionEntry> partitionEntryListFromCache =
catalog.partitionCache().getIfPresent(tableIdent);
assertThat(partitionEntryListFromCache).isNotNull();
assertThat(partitionEntryListFromCache).containsAll(partitionEntryList);
}

@Test
public void testDeadlock() throws Exception {
Catalog underlyCatalog = this.catalog;
@@ -233,7 +262,7 @@ public void testDeadlock() throws Exception {
createdTables.add(tableIdent);
}

Cache<Identifier, Table> cache = catalog.cache();
Cache<Identifier, Table> cache = catalog.tableCache();
AtomicInteger cacheGetCount = new AtomicInteger(0);
AtomicInteger cacheCleanupCount = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
@@ -288,10 +317,10 @@ public void testInvalidateTableForChainedCachingCatalogs() throws Exception {
Identifier tableIdent = new Identifier("db", "tbl");
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
catalog.getTable(tableIdent);
assertThat(catalog.cache().asMap()).containsKey(tableIdent);
assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
catalog.dropTable(tableIdent, false);
assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
assertThat(wrappedCatalog.cache().asMap()).doesNotContainKey(tableIdent);
assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
assertThat(wrappedCatalog.tableCache().asMap()).doesNotContainKey(tableIdent);
}

public static Identifier[] sysTables(Identifier tableIdent) {
@@ -313,7 +342,8 @@ private void innerTestManifestCache(long manifestCacheThreshold) throws Exception {
this.catalog,
Duration.ofSeconds(10),
MemorySize.ofMebiBytes(1),
manifestCacheThreshold);
manifestCacheThreshold,
0L);
Identifier tableIdent = new Identifier("db", "tbl");
catalog.dropTable(tableIdent, true);
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
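A hedged sketch of a follow-up test that could sit next to testPartitionCache in this file, checking that refreshPartitions picks up data written after the first lookup. It reuses the class's existing fields and static imports (EXPIRATION_TTL, ticker, fromString) and assumes Paimon's batch write API (newBatchWriteBuilder, BatchTableWrite, BatchTableCommit); it is not part of this commit.

@Test
public void testRefreshPartitionsPicksUpNewData() throws Exception {
    TestableCachingCatalog catalog =
            new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
    Identifier tableIdent = new Identifier("db", "tbl");
    Schema schema =
            new Schema(
                    RowType.of(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE).getFields(),
                    singletonList("f0"),
                    emptyList(),
                    Collections.emptyMap(),
                    "");
    catalog.createTable(tableIdent, schema, false);

    // No data yet, so the first lookup caches an empty partition list.
    assertThat(catalog.getPartitions(tableIdent)).isEmpty();

    // Write a single row into partition f0='a' (assumed batch write API).
    Table table = catalog.getTable(tableIdent);
    BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
    try (BatchTableWrite write = writeBuilder.newWrite();
            BatchTableCommit commit = writeBuilder.newCommit()) {
        write.write(GenericRow.of(fromString("a"), fromString("b")));
        commit.commit(write.prepareCommit());
    }

    // refreshPartitions rereads the latest snapshot and repopulates the cache.
    List<PartitionEntry> refreshed = catalog.refreshPartitions(tableIdent);
    assertThat(refreshed).hasSize(1);
    assertThat(catalog.partitionCache().getIfPresent(tableIdent)).containsAll(refreshed);
}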
TestableCachingCatalog.java

@@ -18,13 +18,15 @@

package org.apache.paimon.catalog;

import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.table.Table;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;

import java.time.Duration;
import java.util.List;
import java.util.Optional;

/**
@@ -36,18 +38,22 @@ public class TestableCachingCatalog {
private final Duration cacheExpirationInterval;

public TestableCachingCatalog(Catalog catalog, Duration expirationInterval, Ticker ticker) {
super(catalog, expirationInterval, MemorySize.ZERO, Long.MAX_VALUE, ticker);
super(catalog, expirationInterval, MemorySize.ZERO, Long.MAX_VALUE, Long.MAX_VALUE, ticker);
this.cacheExpirationInterval = expirationInterval;
}

public Cache<Identifier, Table> cache() {
public Cache<Identifier, Table> tableCache() {
// cleanUp must be called as tests apply assertions directly on the underlying map, but
// metadata
// table map entries are cleaned up asynchronously.
// metadata table map entries are cleaned up asynchronously.
tableCache.cleanUp();
return tableCache;
}

public Cache<Identifier, List<PartitionEntry>> partitionCache() {
partitionCache.cleanUp();
return partitionCache;
}

public Optional<Duration> ageOf(Identifier identifier) {
return tableCache.policy().expireAfterAccess().get().ageOf(identifier);
}