From 63ef2fa7d13b5736a260b0972ed69b3a1146509f Mon Sep 17 00:00:00 2001 From: Wencong Liu <104502720+WencongLiu@users.noreply.github.com> Date: Thu, 25 Apr 2024 17:11:43 +0800 Subject: [PATCH] [flink] Skip listPartitions scan for FlinkRecomputeStatisticsProgram (#3261) --- .../org/apache/paimon/flink/FlinkCatalog.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 71fbf0fa669d..90effe57fe35 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -85,6 +85,8 @@ import org.apache.flink.table.procedures.Procedure; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -125,6 +127,9 @@ /** Catalog for paimon. */ public class FlinkCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalog.class); + private final ClassLoader classLoader; private final Catalog catalog; @@ -822,6 +827,15 @@ public final List listViews(String databaseName) throws CatalogException @Override public final List listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException { + // The method is skipped if it is being called as part of FlinkRecomputeStatisticsProgram, + // since this program scans the entire table and all its partitions, which is a + // time-consuming operation. By returning an empty result, we can prompt the FlinkPlanner to + // use the FlinkTableSource#reportStatistics method to gather the necessary statistics. + if (isCalledFromFlinkRecomputeStatisticsProgram()) { + LOG.info( + "Skipping listPartitions method due to detection of FlinkRecomputeStatisticsProgram call."); + return Collections.emptyList(); + } return getPartitionSpecs(tablePath, null); } @@ -1057,4 +1071,14 @@ public Procedure getProcedure(ObjectPath procedurePath) return ProcedureUtil.getProcedure(catalog, procedurePath) .orElseThrow(() -> new ProcedureNotExistException(name, procedurePath)); } + + private boolean isCalledFromFlinkRecomputeStatisticsProgram() { + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + for (StackTraceElement stackTraceElement : stackTrace) { + if (stackTraceElement.getClassName().contains("FlinkRecomputeStatisticsProgram")) { + return true; + } + } + return false; + } }