Skip to content

Commit

Permalink
[flink] Skip listPartitions scan for FlinkRecomputeStatisticsProgram (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
WencongLiu authored Apr 25, 2024
1 parent f0c43a8 commit 63ef2fa
Showing 1 changed file with 24 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@
import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -125,6 +127,9 @@

/** Catalog for paimon. */
public class FlinkCatalog extends AbstractCatalog {

private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalog.class);

private final ClassLoader classLoader;

private final Catalog catalog;
Expand Down Expand Up @@ -822,6 +827,15 @@ public final List<String> listViews(String databaseName) throws CatalogException
@Override
public final List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
throws TableNotExistException, TableNotPartitionedException, CatalogException {
// The method is skipped if it is being called as part of FlinkRecomputeStatisticsProgram,
// since this program scans the entire table and all its partitions, which is a
// time-consuming operation. By returning an empty result, we can prompt the FlinkPlanner to
// use the FlinkTableSource#reportStatistics method to gather the necessary statistics.
if (isCalledFromFlinkRecomputeStatisticsProgram()) {
LOG.info(
"Skipping listPartitions method due to detection of FlinkRecomputeStatisticsProgram call.");
return Collections.emptyList();
}
return getPartitionSpecs(tablePath, null);
}

Expand Down Expand Up @@ -1057,4 +1071,14 @@ public Procedure getProcedure(ObjectPath procedurePath)
return ProcedureUtil.getProcedure(catalog, procedurePath)
.orElseThrow(() -> new ProcedureNotExistException(name, procedurePath));
}

private boolean isCalledFromFlinkRecomputeStatisticsProgram() {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
for (StackTraceElement stackTraceElement : stackTrace) {
if (stackTraceElement.getClassName().contains("FlinkRecomputeStatisticsProgram")) {
return true;
}
}
return false;
}
}

0 comments on commit 63ef2fa

Please sign in to comment.