diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 4c36ad4db3c9..f7f6228669ec 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -39,6 +39,7 @@ import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.system.SystemTableLoader; import org.apache.paimon.utils.Preconditions; @@ -200,7 +201,17 @@ public void dropPartition(Identifier identifier, Map partitionSp @Override public List listPartitions(Identifier identifier) throws TableNotExistException { - return getTable(identifier).newReadBuilder().newScan().listPartitionEntries(); + return listPartitions(identifier, null); + } + + public List listPartitions( + Identifier identifier, Map partitionSpec) + throws TableNotExistException { + ReadBuilder readBuilder = getTable(identifier).newReadBuilder(); + if (partitionSpec != null) { + readBuilder.withPartitionFilter(partitionSpec); + } + return readBuilder.newScan().listPartitionEntries(); } protected abstract void createDatabaseImpl(String name, Map properties); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 6a6a047bd38c..6dc391056764 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -254,6 +254,16 @@ void dropPartition(Identifier identifier, Map partitions) */ List listPartitions(Identifier identifier) throws TableNotExistException; + /** + * Get PartitionEntry of filtered partitions of the table. + * + * @param identifier path of the table to list partitions + * @param partitions – the partition to be filtered + * @throws TableNotExistException if the table does not exist + */ + List listPartitions(Identifier identifier, Map partitions) + throws TableNotExistException; + /** * Modify an existing table from a {@link SchemaChange}. * diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index ec14d53a2b03..6a2937cfc9c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -170,6 +170,12 @@ public List listPartitions(Identifier identifier) return wrapped.listPartitions(identifier); } + @Override + public List listPartitions( + Identifier identifier, Map partitions) throws TableNotExistException { + return wrapped.listPartitions(identifier, partitions); + } + @Override public void repairCatalog() { wrapped.repairCatalog(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index cae6e6f0e367..c9a118b14ed9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -36,7 +36,6 @@ import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchWriteBuilder; -import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.Preconditions; @@ -1160,15 +1159,18 @@ private Table getPaimonTable(ObjectPath tablePath) throws TableNotExistException private List getPartitionEntries( Table table, ObjectPath tablePath, @Nullable CatalogPartitionSpec partitionSpec) throws TableNotPartitionedException { - if (table.partitionKeys() == null || table.partitionKeys().size() == 0) { + try { + if (table.partitionKeys() == null || table.partitionKeys().isEmpty()) { + throw new TableNotPartitionedException(getName(), tablePath); + } + if (partitionSpec != null && partitionSpec.getPartitionSpec() != null) { + return catalog.listPartitions( + toIdentifier(tablePath), partitionSpec.getPartitionSpec()); + } + return catalog.listPartitions(toIdentifier(tablePath)); + } catch (Catalog.TableNotExistException | TableNotPartitionedException e) { throw new TableNotPartitionedException(getName(), tablePath); } - - ReadBuilder readBuilder = table.newReadBuilder(); - if (partitionSpec != null && partitionSpec.getPartitionSpec() != null) { - readBuilder.withPartitionFilter(partitionSpec.getPartitionSpec()); - } - return readBuilder.newScan().listPartitionEntries(); } private List getPartitionSpecs( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index 439cdf958f50..447a339dae22 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -19,11 +19,20 @@ package org.apache.paimon.flink; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.FileSystemCatalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.catalog.TestableCachingCatalog; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; import org.apache.paimon.table.system.AllTableOptionsTable; import org.apache.paimon.table.system.CatalogOptionsTable; import org.apache.paimon.table.system.SinkTableLineageTable; import org.apache.paimon.table.system.SourceTableLineageTable; import org.apache.paimon.utils.BlockingIterator; +import org.apache.paimon.utils.FakeTicker; import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.catalog.CatalogPartition; @@ -36,6 +45,8 @@ import javax.annotation.Nonnull; +import java.io.IOException; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -605,6 +616,49 @@ public void testShowPartitions() { assertThat(result).containsExactlyInAnyOrder(Row.of("dt=2020-01-02/hh=11")); } + @Test + void testShowPartitionsWithCachingCatalog() throws IOException { + String catalogName = "caching"; + String warehouse = toWarehouse(path); + Options catalogOptions = new Options(); + catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse); + CatalogContext catalogContext = CatalogContext.create(catalogOptions); + FileIO fileIO = FileIO.get(new Path(warehouse), catalogContext); + FileSystemCatalog catalog = new FileSystemCatalog(fileIO, new Path(toWarehouse(path))); + FakeTicker ticker = new FakeTicker(); + TestableCachingCatalog testableCachingCatalog = + new TestableCachingCatalog(catalog, Duration.ofMinutes(5), ticker); + FlinkCatalog caching = + FlinkCatalogFactory.createCatalog( + catalogName, testableCachingCatalog, catalogOptions); + tEnv.registerCatalog(catalogName, caching); + tEnv.useCatalog(catalogName); + + tEnv.executeSql( + "CREATE TABLE PartitionTable (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " behavior STRING,\n" + + " dt STRING,\n" + + " hh STRING,\n" + + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n" + + ") PARTITIONED BY (dt, hh)"); + sql("INSERT INTO PartitionTable select 1,1,'a','2020-01-01','10'"); + sql("INSERT INTO PartitionTable select 2,2,'b','2020-01-02','11'"); + sql("INSERT INTO PartitionTable select 3,3,'c','2020-01-03','11'"); + List result = sql("SHOW PARTITIONS PartitionTable"); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of("dt=2020-01-01/hh=10"), + Row.of("dt=2020-01-02/hh=11"), + Row.of("dt=2020-01-03/hh=11")); + + assertThat(testableCachingCatalog.partitionCache().asMap()) + .containsKey(new Identifier("default", "PartitionTable")); + List result2 = sql("SHOW PARTITIONS PartitionTable"); + assertThat(result).isEqualTo(result2); + } + @Test void testPKTableGetPartition() throws Exception { sql(