diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index b9d74c8124dd..94f11c6c484a 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -44,6 +44,12 @@
Duration |
Controls the duration for which databases and tables in the catalog are cached. |
+
+ cache.partition.max-num |
+ 0 |
+ Long |
+ Controls the max number for which partitions in the catalog are cached. |
+
cache.manifest.max-memory |
(none) |
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 081668675dd9..0d8a9290a53f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -98,6 +98,13 @@ public class CatalogOptions {
.withDescription(
"Controls the duration for which databases and tables in the catalog are cached.");
+ public static final ConfigOption CACHE_PARTITION_MAX_NUM =
+ key("cache.partition.max-num")
+ .longType()
+ .defaultValue(0L)
+ .withDescription(
+ "Controls the max number for which partitions in the catalog are cached.");
+
public static final ConfigOption CACHE_MANIFEST_SMALL_FILE_MEMORY =
key("cache.manifest.small-file-memory")
.memoryType()
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 0777759456f8..444a828af43e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -19,6 +19,7 @@
package org.apache.paimon.catalog;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
@@ -52,6 +53,7 @@
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
+import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
/** A {@link Catalog} to cache databases and tables and manifests. */
@@ -61,26 +63,31 @@ public class CachingCatalog extends DelegateCatalog {
protected final Cache> databaseCache;
protected final Cache tableCache;
+ @Nullable protected final Cache> partitionCache;
@Nullable protected final SegmentsCache manifestCache;
+ private final long cachedPartitionMaxNum;
public CachingCatalog(Catalog wrapped) {
this(
wrapped,
CACHE_EXPIRATION_INTERVAL_MS.defaultValue(),
CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(),
- CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
+ CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes(),
+ CACHE_PARTITION_MAX_NUM.defaultValue());
}
public CachingCatalog(
Catalog wrapped,
Duration expirationInterval,
MemorySize manifestMaxMemory,
- long manifestCacheThreshold) {
+ long manifestCacheThreshold,
+ long cachedPartitionMaxNum) {
this(
wrapped,
expirationInterval,
manifestMaxMemory,
manifestCacheThreshold,
+ cachedPartitionMaxNum,
Ticker.systemTicker());
}
@@ -89,6 +96,7 @@ public CachingCatalog(
Duration expirationInterval,
MemorySize manifestMaxMemory,
long manifestCacheThreshold,
+ long cachedPartitionMaxNum,
Ticker ticker) {
super(wrapped);
if (expirationInterval.isZero() || expirationInterval.isNegative()) {
@@ -111,7 +119,19 @@ public CachingCatalog(
.expireAfterAccess(expirationInterval)
.ticker(ticker)
.build();
+ this.partitionCache =
+ cachedPartitionMaxNum == 0
+ ? null
+ : Caffeine.newBuilder()
+ .softValues()
+ .executor(Runnable::run)
+ .expireAfterAccess(expirationInterval)
+ .weigher(this::weigh)
+ .maximumWeight(cachedPartitionMaxNum)
+ .ticker(ticker)
+ .build();
this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);
+ this.cachedPartitionMaxNum = cachedPartitionMaxNum;
}
public static Catalog tryToCreate(Catalog catalog, Options options) {
@@ -131,7 +151,8 @@ public static Catalog tryToCreate(Catalog catalog, Options options) {
catalog,
options.get(CACHE_EXPIRATION_INTERVAL_MS),
manifestMaxMemory,
- manifestThreshold);
+ manifestThreshold,
+ options.get(CACHE_PARTITION_MAX_NUM));
}
@Override
@@ -227,6 +248,51 @@ private void putTableCache(Identifier identifier, Table table) {
tableCache.put(identifier, table);
}
+ public List getPartitions(Identifier identifier) throws TableNotExistException {
+ Table table = this.getTable(identifier);
+ if (partitionCacheEnabled(table)) {
+ List partitions;
+ partitions = partitionCache.getIfPresent(identifier);
+ if (partitions == null || partitions.isEmpty()) {
+ partitions = this.refreshPartitions(identifier);
+ }
+ return partitions;
+ }
+ return ((FileStoreTable) table).newSnapshotReader().partitionEntries();
+ }
+
+ public List refreshPartitions(Identifier identifier)
+ throws TableNotExistException {
+ Table table = this.getTable(identifier);
+ List partitions =
+ ((FileStoreTable) table).newSnapshotReader().partitionEntries();
+ if (partitionCacheEnabled(table)
+ && partitionCache.asMap().values().stream().mapToInt(List::size).sum()
+ < this.cachedPartitionMaxNum) {
+ partitionCache.put(identifier, partitions);
+ }
+ return partitions;
+ }
+
+ private boolean partitionCacheEnabled(Table table) {
+ return partitionCache != null
+ && table instanceof FileStoreTable
+ && !table.partitionKeys().isEmpty();
+ }
+
+ private int weigh(Identifier identifier, List partitions) {
+ return partitions.size();
+ }
+
+ @Override
+ public void dropPartition(Identifier identifier, Map partitions)
+ throws TableNotExistException, PartitionNotExistException {
+ wrapped.dropPartition(identifier, partitions);
+ if (partitionCache != null) {
+ partitionCache.invalidate(identifier);
+ }
+ }
+
private class TableInvalidatingRemovalListener implements RemovalListener {
@Override
public void onRemoval(Identifier identifier, Table table, @NonNull RemovalCause cause) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index d1f7eeb8a56d..d645c46bf656 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -20,8 +20,10 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
@@ -32,6 +34,8 @@
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.FakeTicker;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -44,6 +48,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
@@ -51,6 +56,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
import static org.apache.paimon.data.BinaryString.fromString;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
@@ -113,15 +120,15 @@ public void testTableExpiresAfterInterval() throws Exception {
Table table = catalog.getTable(tableIdent);
// Ensure table is cached with full ttl remaining upon creation
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.remainingAgeFor(tableIdent)).isPresent().get().isEqualTo(EXPIRATION_TTL);
ticker.advance(HALF_OF_EXPIRATION);
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
ticker.advance(HALF_OF_EXPIRATION.plus(Duration.ofSeconds(10)));
- assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
assertThat(catalog.getTable(tableIdent))
.as("CachingCatalog should return a new instance after expiration")
.isNotSameAs(table);
@@ -135,11 +142,11 @@ public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog() throws Exce
Identifier tableIdent = new Identifier("db", "tbl");
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
catalog.getTable(tableIdent);
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(Duration.ZERO);
ticker.advance(HALF_OF_EXPIRATION);
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
assertThat(catalog.remainingAgeFor(tableIdent))
.isPresent()
@@ -148,7 +155,7 @@ public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog() throws Exce
Duration oneMinute = Duration.ofMinutes(1L);
ticker.advance(oneMinute);
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent))
.isPresent()
.get()
@@ -175,17 +182,17 @@ public void testCacheExpirationEagerlyRemovesSysTables() throws Exception {
Identifier tableIdent = new Identifier("db", "tbl");
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
Table table = catalog.getTable(tableIdent);
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO);
ticker.advance(HALF_OF_EXPIRATION);
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
for (Identifier sysTable : sysTables(tableIdent)) {
catalog.getTable(sysTable);
}
- assertThat(catalog.cache().asMap()).containsKeys(sysTables(tableIdent));
+ assertThat(catalog.tableCache().asMap()).containsKeys(sysTables(tableIdent));
assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
.isNotEmpty()
.allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO));
@@ -209,17 +216,39 @@ public void testCacheExpirationEagerlyRemovesSysTables() throws Exception {
// Move time forward so the data table drops.
ticker.advance(HALF_OF_EXPIRATION);
- assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
Arrays.stream(sysTables(tableIdent))
.forEach(
sysTable ->
- assertThat(catalog.cache().asMap())
+ assertThat(catalog.tableCache().asMap())
.as(
"When a data table expires, its sys tables should expire regardless of age")
.doesNotContainKeys(sysTable));
}
+ @Test
+ public void testPartitionCache() throws Exception {
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker);
+
+ Identifier tableIdent = new Identifier("db", "tbl");
+ Schema schema =
+ new Schema(
+ RowType.of(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE).getFields(),
+ singletonList("f0"),
+ emptyList(),
+ Collections.emptyMap(),
+ "");
+ catalog.createTable(tableIdent, schema, false);
+ List partitionEntryList = catalog.getPartitions(tableIdent);
+ assertThat(catalog.partitionCache().asMap()).containsKey(tableIdent);
+ List partitionEntryListFromCache =
+ catalog.partitionCache().getIfPresent(tableIdent);
+ assertThat(partitionEntryListFromCache).isNotNull();
+ assertThat(partitionEntryListFromCache).containsAll(partitionEntryList);
+ }
+
@Test
public void testDeadlock() throws Exception {
Catalog underlyCatalog = this.catalog;
@@ -233,7 +262,7 @@ public void testDeadlock() throws Exception {
createdTables.add(tableIdent);
}
- Cache cache = catalog.cache();
+ Cache cache = catalog.tableCache();
AtomicInteger cacheGetCount = new AtomicInteger(0);
AtomicInteger cacheCleanupCount = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
@@ -288,10 +317,10 @@ public void testInvalidateTableForChainedCachingCatalogs() throws Exception {
Identifier tableIdent = new Identifier("db", "tbl");
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
catalog.getTable(tableIdent);
- assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
catalog.dropTable(tableIdent, false);
- assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
- assertThat(wrappedCatalog.cache().asMap()).doesNotContainKey(tableIdent);
+ assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
+ assertThat(wrappedCatalog.tableCache().asMap()).doesNotContainKey(tableIdent);
}
public static Identifier[] sysTables(Identifier tableIdent) {
@@ -313,7 +342,8 @@ private void innerTestManifestCache(long manifestCacheThreshold) throws Exceptio
this.catalog,
Duration.ofSeconds(10),
MemorySize.ofMebiBytes(1),
- manifestCacheThreshold);
+ manifestCacheThreshold,
+ 0L);
Identifier tableIdent = new Identifier("db", "tbl");
catalog.dropTable(tableIdent, true);
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
index 159f5edaef1f..4c70a0232c44 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
@@ -18,6 +18,7 @@
package org.apache.paimon.catalog;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.table.Table;
@@ -25,6 +26,7 @@
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
import java.time.Duration;
+import java.util.List;
import java.util.Optional;
/**
@@ -36,18 +38,22 @@ public class TestableCachingCatalog extends CachingCatalog {
private final Duration cacheExpirationInterval;
public TestableCachingCatalog(Catalog catalog, Duration expirationInterval, Ticker ticker) {
- super(catalog, expirationInterval, MemorySize.ZERO, Long.MAX_VALUE, ticker);
+ super(catalog, expirationInterval, MemorySize.ZERO, Long.MAX_VALUE, Long.MAX_VALUE, ticker);
this.cacheExpirationInterval = expirationInterval;
}
- public Cache cache() {
+ public Cache tableCache() {
// cleanUp must be called as tests apply assertions directly on the underlying map, but
- // metadata
- // table map entries are cleaned up asynchronously.
+ // metadata table map entries are cleaned up asynchronously.
tableCache.cleanUp();
return tableCache;
}
+ public Cache> partitionCache() {
+ partitionCache.cleanUp();
+ return partitionCache;
+ }
+
public Optional ageOf(Identifier identifier) {
return tableCache.policy().expireAfterAccess().get().ageOf(identifier);
}