From 410681ad3ad5b4c3497cafe9f789059e592d9069 Mon Sep 17 00:00:00 2001 From: yuzelin <747884505@qq.com> Date: Mon, 18 Sep 2023 19:54:07 +0800 Subject: [PATCH] [improve] --- .../paimon/flink/procedure/CompactProcedure.java | 12 ++++++------ .../apache/paimon/flink/procedure/ProcedureBase.java | 7 +++++++ .../apache/paimon/flink/procedure/ProcedureUtil.java | 11 +---------- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java index 4ca38e8de5dc..cb8aa316272b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java @@ -18,6 +18,8 @@ package org.apache.paimon.flink.procedure; +import org.apache.paimon.catalog.AbstractCatalog; +import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.action.CompactAction; import org.apache.paimon.flink.action.SortCompactAction; @@ -48,12 +50,8 @@ */ public class CompactProcedure extends ProcedureBase { - private final String warehouse; - private final Map catalogOptions; - - public CompactProcedure(String warehouse, Map catalogOptions) { - this.warehouse = warehouse; - this.catalogOptions = catalogOptions; + public CompactProcedure(Catalog catalog) { + super(catalog); } public String[] call(ProcedureContext procedureContext, String tableId) throws Exception { @@ -76,6 +74,8 @@ public String[] call( String orderByColumns, String... partitionStrings) throws Exception { + String warehouse = ((AbstractCatalog) catalog).warehouse(); + Map catalogOptions = ((AbstractCatalog) catalog).options(); Identifier identifier = Identifier.fromString(tableId); CompactAction action; String jobName; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java index 061718bdf2da..c44fe2701bbe 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.procedure; +import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.flink.configuration.PipelineOptions; @@ -32,6 +33,12 @@ /** Base implementation for flink {@link Procedure}. */ public class ProcedureBase implements Procedure { + protected final Catalog catalog; + + ProcedureBase(Catalog catalog) { + this.catalog = catalog; + } + protected String[] execute(StreamExecutionEnvironment env, String defaultJobName) throws Exception { ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java index daf3a458c13e..e90450afcd9b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.procedure; -import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.action.CompactActionFactory; @@ -29,8 +28,6 @@ import java.util.List; import java.util.Optional; -import static org.apache.paimon.utils.Preconditions.checkArgument; - /** Utility methods for {@link Procedure}. */ public class ProcedureUtil { @@ -47,15 +44,9 @@ public static List listProcedures() { } public static Optional getProcedure(Catalog catalog, String procedureName) { - checkArgument( - catalog instanceof AbstractCatalog, - "Currently, only Paimon built-in AbstractCatalog supports procedure."); - AbstractCatalog abstractCatalog = (AbstractCatalog) catalog; switch (procedureName) { case CompactActionFactory.IDENTIFIER: - return Optional.of( - new CompactProcedure( - abstractCatalog.warehouse(), abstractCatalog.options())); + return Optional.of(new CompactProcedure(catalog)); default: return Optional.empty(); }