[Flink] Optimize Flink listPartitions speed #4495

Closed
wants to merge 1 commit
@@ -39,6 +39,7 @@
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Preconditions;

@@ -200,7 +201,17 @@ public void dropPartition(Identifier identifier, Map<String, String> partitionSpec
     @Override
     public List<PartitionEntry> listPartitions(Identifier identifier)
             throws TableNotExistException {
-        return getTable(identifier).newReadBuilder().newScan().listPartitionEntries();
+        return listPartitions(identifier, null);
     }

+    public List<PartitionEntry> listPartitions(
+            Identifier identifier, Map<String, String> partitionSpec)
+            throws TableNotExistException {
+        ReadBuilder readBuilder = getTable(identifier).newReadBuilder();
+        if (partitionSpec != null) {
+            readBuilder.withPartitionFilter(partitionSpec);
+        }
+        return readBuilder.newScan().listPartitionEntries();
+    }

     protected abstract void createDatabaseImpl(String name, Map<String, String> properties);
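As a hedged, caller-side sketch of the new overload introduced above (the catalog handle, table name, and partition values are hypothetical, not part of this PR):

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.manifest.PartitionEntry;

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    public class ListPartitionsSketch {
        // Lists only the partition entries whose values match the given spec,
        // so the scan does not have to enumerate every partition of the table.
        public static List<PartitionEntry> listOneDay(Catalog catalog) throws Exception {
            Identifier table = Identifier.create("default", "PartitionTable"); // hypothetical table
            Map<String, String> spec = Collections.singletonMap("dt", "2020-01-01"); // hypothetical filter
            return catalog.listPartitions(table, spec);
        }
    }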
10 changes: 10 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -254,6 +254,16 @@ void dropPartition(Identifier identifier, Map<String, String> partitions)
     */
    List<PartitionEntry> listPartitions(Identifier identifier) throws TableNotExistException;

+    /**
+     * Get the PartitionEntry of each partition of the table that matches the given partition spec.
+     *
+     * @param identifier path of the table to list partitions
+     * @param partitions the partition spec used to filter the listed partitions
+     * @throws TableNotExistException if the table does not exist
+     */
+    List<PartitionEntry> listPartitions(Identifier identifier, Map<String, String> partitions)
+            throws TableNotExistException;

    /**
     * Modify an existing table from a {@link SchemaChange}.
     *
@@ -170,6 +170,12 @@ public List<PartitionEntry> listPartitions(Identifier identifier)
         return wrapped.listPartitions(identifier);
     }

+    @Override
+    public List<PartitionEntry> listPartitions(
+            Identifier identifier, Map<String, String> partitions) throws TableNotExistException {
+        return wrapped.listPartitions(identifier, partitions);
+    }

     @Override
     public void repairCatalog() {
         wrapped.repairCatalog();
@@ -36,7 +36,6 @@
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
-import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;
@@ -1160,15 +1159,18 @@ private Table getPaimonTable(ObjectPath tablePath) throws TableNotExistException
     private List<PartitionEntry> getPartitionEntries(
             Table table, ObjectPath tablePath, @Nullable CatalogPartitionSpec partitionSpec)
             throws TableNotPartitionedException {
-        if (table.partitionKeys() == null || table.partitionKeys().size() == 0) {
+        try {
+            if (table.partitionKeys() == null || table.partitionKeys().isEmpty()) {
                 throw new TableNotPartitionedException(getName(), tablePath);
             }
+            if (partitionSpec != null && partitionSpec.getPartitionSpec() != null) {
+                return catalog.listPartitions(
+                        toIdentifier(tablePath), partitionSpec.getPartitionSpec());
+            }
+            return catalog.listPartitions(toIdentifier(tablePath));
+        } catch (Catalog.TableNotExistException | TableNotPartitionedException e) {
+            throw new TableNotPartitionedException(getName(), tablePath);
+        }
-
-        ReadBuilder readBuilder = table.newReadBuilder();
-        if (partitionSpec != null && partitionSpec.getPartitionSpec() != null) {
-            readBuilder.withPartitionFilter(partitionSpec.getPartitionSpec());
-        }
-        return readBuilder.newScan().listPartitionEntries();
     }

     private List<CatalogPartitionSpec> getPartitionSpecs(
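For context, a hedged sketch of how this Flink path is typically reached; the catalog name, table, and partition values are illustrative only, and it assumes a Flink version whose SHOW PARTITIONS statement accepts a PARTITION spec. With the change, the spec is forwarded to Catalog#listPartitions instead of being applied after a full ReadBuilder scan:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ShowPartitionsSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Assumes a Paimon catalog named "paimon" has already been registered with tEnv.
            tEnv.useCatalog("paimon");
            // SHOW PARTITIONS with a partition spec routes through getPartitionEntries above.
            tEnv.executeSql("SHOW PARTITIONS PartitionTable PARTITION (dt = '2020-01-01')").print();
        }
    }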
@@ -19,11 +19,20 @@
package org.apache.paimon.flink;

import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.TestableCachingCatalog;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.table.system.SinkTableLineageTable;
import org.apache.paimon.table.system.SourceTableLineageTable;
import org.apache.paimon.utils.BlockingIterator;
+import org.apache.paimon.utils.FakeTicker;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.catalog.CatalogPartition;
@@ -36,6 +45,8 @@

import javax.annotation.Nonnull;

+import java.io.IOException;
+import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -605,6 +616,49 @@ public void testShowPartitions() {
         assertThat(result).containsExactlyInAnyOrder(Row.of("dt=2020-01-02/hh=11"));
     }

+    @Test
+    void testShowPartitionsWithCachingCatalog() throws IOException {
+        String catalogName = "caching";
+        String warehouse = toWarehouse(path);
+        Options catalogOptions = new Options();
+        catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
+        CatalogContext catalogContext = CatalogContext.create(catalogOptions);
+        FileIO fileIO = FileIO.get(new Path(warehouse), catalogContext);
+        FileSystemCatalog catalog = new FileSystemCatalog(fileIO, new Path(toWarehouse(path)));
+        FakeTicker ticker = new FakeTicker();
+        TestableCachingCatalog testableCachingCatalog =
+                new TestableCachingCatalog(catalog, Duration.ofMinutes(5), ticker);
+        FlinkCatalog caching =
+                FlinkCatalogFactory.createCatalog(
+                        catalogName, testableCachingCatalog, catalogOptions);
+        tEnv.registerCatalog(catalogName, caching);
+        tEnv.useCatalog(catalogName);
+
+        tEnv.executeSql(
+                "CREATE TABLE PartitionTable (\n"
+                        + "    user_id BIGINT,\n"
+                        + "    item_id BIGINT,\n"
+                        + "    behavior STRING,\n"
+                        + "    dt STRING,\n"
+                        + "    hh STRING,\n"
+                        + "    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (dt, hh)");
+        sql("INSERT INTO PartitionTable SELECT 1, 1, 'a', '2020-01-01', '10'");
+        sql("INSERT INTO PartitionTable SELECT 2, 2, 'b', '2020-01-02', '11'");
+        sql("INSERT INTO PartitionTable SELECT 3, 3, 'c', '2020-01-03', '11'");
+        List<Row> result = sql("SHOW PARTITIONS PartitionTable");
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of("dt=2020-01-01/hh=10"),
+                        Row.of("dt=2020-01-02/hh=11"),
+                        Row.of("dt=2020-01-03/hh=11"));
+
+        assertThat(testableCachingCatalog.partitionCache().asMap())
+                .containsKey(new Identifier("default", "PartitionTable"));
+        List<Row> result2 = sql("SHOW PARTITIONS PartitionTable");
+        assertThat(result).isEqualTo(result2);
+    }

     @Test
     void testPKTableGetPartition() throws Exception {
         sql(