From 2f4c8d0d67f0513acc8868cf44b3672b13839c07 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Tue, 5 Nov 2024 13:48:46 +0800 Subject: [PATCH] [core] Introduce listPartitions interface to Catalog --- .../paimon/catalog/AbstractCatalog.java | 7 +++ .../apache/paimon/catalog/CachingCatalog.java | 52 ++++++------------- .../org/apache/paimon/catalog/Catalog.java | 9 ++++ .../paimon/catalog/DelegateCatalog.java | 7 +++ .../paimon/catalog/CachingCatalogTest.java | 2 +- 5 files changed, 41 insertions(+), 36 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 1b390bc66ad2..b3f255f10a30 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -24,6 +24,7 @@ import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; import org.apache.paimon.lineage.LineageMetaFactory; +import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.Lock; @@ -198,6 +199,12 @@ public void dropPartition(Identifier identifier, Map partitionSp } } + @Override + public List listPartitions(Identifier identifier) + throws TableNotExistException { + return getTable(identifier).newReadBuilder().newScan().listPartitionEntries(); + } + protected abstract void createDatabaseImpl(String name, Map properties); @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index 444a828af43e..b1f683e4014d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -34,6 +34,7 @@ import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalListener; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Weigher; import org.checkerframework.checker.nullness.qual.NonNull; import org.slf4j.Logger; @@ -63,9 +64,10 @@ public class CachingCatalog extends DelegateCatalog { protected final Cache> databaseCache; protected final Cache tableCache; - @Nullable protected final Cache> partitionCache; @Nullable protected final SegmentsCache manifestCache; - private final long cachedPartitionMaxNum; + + // partition cache will affect data latency + @Nullable protected final Cache> partitionCache; public CachingCatalog(Catalog wrapped) { this( @@ -126,12 +128,13 @@ public CachingCatalog( .softValues() .executor(Runnable::run) .expireAfterAccess(expirationInterval) - .weigher(this::weigh) + .weigher( + (Weigher>) + (identifier, v) -> v.size()) .maximumWeight(cachedPartitionMaxNum) .ticker(ticker) .build(); this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold); - this.cachedPartitionMaxNum = cachedPartitionMaxNum; } public static Catalog tryToCreate(Catalog catalog, Options options) { @@ -248,40 +251,19 @@ private void putTableCache(Identifier identifier, Table table) { tableCache.put(identifier, table); } - public List getPartitions(Identifier identifier) throws TableNotExistException { - Table table = this.getTable(identifier); - if (partitionCacheEnabled(table)) { - List partitions; - partitions = partitionCache.getIfPresent(identifier); - if (partitions == null || partitions.isEmpty()) { - partitions = this.refreshPartitions(identifier); - } - return partitions; - } - return ((FileStoreTable) table).newSnapshotReader().partitionEntries(); - } - - public List refreshPartitions(Identifier identifier) + @Override + public List listPartitions(Identifier identifier) throws TableNotExistException { - Table table = this.getTable(identifier); - List partitions = - ((FileStoreTable) table).newSnapshotReader().partitionEntries(); - if (partitionCacheEnabled(table) - && partitionCache.asMap().values().stream().mapToInt(List::size).sum() - < this.cachedPartitionMaxNum) { - partitionCache.put(identifier, partitions); + if (partitionCache == null) { + return wrapped.listPartitions(identifier); } - return partitions; - } - private boolean partitionCacheEnabled(Table table) { - return partitionCache != null - && table instanceof FileStoreTable - && !table.partitionKeys().isEmpty(); - } - - private int weigh(Identifier identifier, List partitions) { - return partitions.size(); + List result = partitionCache.getIfPresent(identifier); + if (result == null) { + result = wrapped.listPartitions(identifier); + partitionCache.put(identifier, result); + } + return result; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 84e924a2ab07..c72c354e45e9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -21,6 +21,7 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -290,6 +291,14 @@ void createPartition(Identifier identifier, Map partitionSpec) void dropPartition(Identifier identifier, Map partitions) throws TableNotExistException, PartitionNotExistException; + /** + * Get PartitionEntry of all partitions of the table. + * + * @param identifier path of the table to list partitions + * @throws TableNotExistException if the table does not exist + */ + List listPartitions(Identifier identifier) throws TableNotExistException; + /** * Modify an existing table from a {@link SchemaChange}. * diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 0f9cf2b649ae..01719e59029d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -20,6 +20,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -192,6 +193,12 @@ public void dropPartition(Identifier identifier, Map partitions) wrapped.dropPartition(identifier, partitions); } + @Override + public List listPartitions(Identifier identifier) + throws TableNotExistException { + return wrapped.listPartitions(identifier); + } + @Override public void repairCatalog() { wrapped.repairCatalog(); diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java index d645c46bf656..0cdd9486cfc1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java @@ -241,7 +241,7 @@ public void testPartitionCache() throws Exception { Collections.emptyMap(), ""); catalog.createTable(tableIdent, schema, false); - List partitionEntryList = catalog.getPartitions(tableIdent); + List partitionEntryList = catalog.listPartitions(tableIdent); assertThat(catalog.partitionCache().asMap()).containsKey(tableIdent); List partitionEntryListFromCache = catalog.partitionCache().getIfPresent(tableIdent);