diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 0d0e75d1f69ae..d150689358a7f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -891,11 +891,7 @@ public List listProcedures(String dbName) */ public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException { - Procedure procedure = ProcedureUtil.getProcedure(catalog, procedurePath.getObjectName()); - if (procedure == null) { - throw new ProcedureNotExistException(name, procedurePath); - } - - return procedure; + return ProcedureUtil.getProcedure(catalog, procedurePath.getObjectName()) + .orElseThrow(() -> new ProcedureNotExistException(name, procedurePath)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java index 53a4dc748f3ae..3908b3e62b140 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.procedure.ProcedureContext; import org.apache.flink.table.procedures.Procedure; @@ -86,8 +87,8 @@ public String[] call( ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); String name = conf.getOptional(PipelineOptions.NAME).orElse("Compact job"); if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING) { - env.executeAsync(name); - return new String[] {"Running"}; + JobClient jobClient = env.executeAsync(name); + return new String[] {"JobID=" + jobClient.getJobID()}; } else { env.execute(name); return new String[] {"Success"}; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java index 94de201bc1ad6..15a5a186a5296 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java @@ -23,11 +23,10 @@ import org.apache.flink.table.procedures.Procedure; -import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; /** Utility methods for {@link Procedure}. */ public class ProcedureUtil { @@ -44,13 +43,12 @@ public static List listProcedures() { return Collections.unmodifiableList(SYSTEM_PROCEDURES); } - @Nullable - public static Procedure getProcedure(Catalog catalog, String procedureName) { + public static Optional getProcedure(Catalog catalog, String procedureName) { switch (procedureName) { case CompactActionFactory.IDENTIFIER: - return new CompactProcedure(catalog.warehouse(), catalog.options()); + return Optional.of(new CompactProcedure(catalog.warehouse(), catalog.options())); default: - return null; + return Optional.empty(); } } }