diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java
index 3102d0700f..2220cb56c0 100644
--- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java
+++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java
@@ -44,11 +44,14 @@
 import org.dinky.trans.parse.AddJarSqlParseStrategy;
 import org.dinky.trans.parse.ExecuteJarParseStrategy;
 import org.dinky.url.RsURLStreamHandlerFactory;
+import org.dinky.utils.FlinkStreamEnvironmentUtil;
 import org.dinky.utils.SqlUtil;
 import org.dinky.utils.ZipUtils;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.execution.JobClient;
@@ -252,20 +255,33 @@ public static Optional<JobClient> executeJarJob(String type, Executor executor,
         String sqlStatement = executor.pretreatStatement(statement);
         if (ExecuteJarParseStrategy.INSTANCE.match(sqlStatement)) {
             ExecuteJarOperation executeJarOperation = new ExecuteJarOperation(sqlStatement);
-            StreamGraph streamGraph = executeJarOperation.getStreamGraph(executor.getCustomTableEnvironment());
+            Pipeline pipeline = executeJarOperation.getStreamGraph(executor.getCustomTableEnvironment());
             ReadableConfig configuration =
                     executor.getStreamExecutionEnvironment().getConfiguration();
-            streamGraph
-                    .getExecutionConfig()
-                    .configure(configuration, Thread.currentThread().getContextClassLoader());
-            streamGraph.getCheckpointConfig().configure(configuration);
-            streamGraph.setJobName(executor.getExecutorConfig().getJobName());
-            String savePointPath = executor.getExecutorConfig().getSavePointPath();
-            if (Asserts.isNotNullString(savePointPath)) {
-                streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
-                        savePointPath, configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
+            if (pipeline instanceof StreamGraph) {
+                // stream job
+                StreamGraph streamGraph = (StreamGraph) pipeline;
+                streamGraph
+                        .getExecutionConfig()
+                        .configure(configuration, Thread.currentThread().getContextClassLoader());
+                streamGraph.getCheckpointConfig().configure(configuration);
+                streamGraph.setJobName(executor.getExecutorConfig().getJobName());
+                String savePointPath = executor.getExecutorConfig().getSavePointPath();
+                if (Asserts.isNotNullString(savePointPath)) {
+                    streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
+                            savePointPath,
+                            configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
+                }
+            } else if (pipeline instanceof Plan) {
+                // batch job
+                Plan plan = (Plan) pipeline;
+                plan.getExecutionConfig()
+                        .configure(configuration, Thread.currentThread().getContextClassLoader());
+                plan.setJobName(executor.getExecutorConfig().getJobName());
             }
-            JobClient client = executor.getStreamExecutionEnvironment().executeAsync(streamGraph);
+
+            JobClient client =
+                    FlinkStreamEnvironmentUtil.executeAsync(pipeline, executor.getStreamExecutionEnvironment());
             jobClient = Optional.of(client);
             break;
         }
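Context for the branch above: `getStreamGraph(...)` now returns `org.apache.flink.api.dag.Pipeline`, whose two concrete shapes here are `StreamGraph` (a DataStream program) and the legacy `Plan` (a DataSet program); only the former carries checkpoint and savepoint settings, so the shared part of the dispatch is just the job name. A minimal sketch of that shared part; `applyJobName` is a hypothetical helper, not code from this change:

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.streaming.api.graph.StreamGraph;

public final class PipelineDispatch {

    private PipelineDispatch() {}

    /** Applies a job name to either kind of Pipeline an EXECUTE JAR statement can produce. */
    public static void applyJobName(Pipeline pipeline, String jobName) {
        if (pipeline instanceof StreamGraph) {
            // DataStream programs translate to a StreamGraph
            ((StreamGraph) pipeline).setJobName(jobName);
        } else if (pipeline instanceof Plan) {
            // legacy DataSet (batch) programs translate to a Plan
            ((Plan) pipeline).setJobName(jobName);
        } else {
            throw new IllegalArgumentException("Unknown pipeline type: " + pipeline);
        }
    }
}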
diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java
index ac7bf1f3fb..dd1bb84b6e 100644
--- a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java
+++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java
@@ -23,6 +23,7 @@
 import org.dinky.trans.AbstractOperation;
 import org.dinky.trans.ExtendOperation;
 import org.dinky.trans.parse.ExecuteJarParseStrategy;
+import org.dinky.utils.FlinkStreamEnvironmentUtil;
 import org.dinky.utils.URLUtils;
 
 import org.apache.commons.lang.StringUtils;
@@ -32,7 +33,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableResult;
 
 import java.io.File;
@@ -60,7 +61,8 @@ public ExecuteJarOperation(String statement) {
 
     @Override
     public Optional<? extends TableResult> execute(CustomTableEnvironment tEnv) {
         try {
-            tEnv.getStreamExecutionEnvironment().execute(getStreamGraph(tEnv));
+            StreamExecutionEnvironment streamExecutionEnvironment = tEnv.getStreamExecutionEnvironment();
+            FlinkStreamEnvironmentUtil.executeAsync(getStreamGraph(tEnv), streamExecutionEnvironment);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -68,16 +70,16 @@ public Optional<? extends TableResult> execute(CustomTableEnvironment tEnv) {
         return Optional.of(TABLE_RESULT_OK);
     }
 
-    public StreamGraph getStreamGraph(CustomTableEnvironment tEnv) {
+    public Pipeline getStreamGraph(CustomTableEnvironment tEnv) {
         return getStreamGraph(tEnv, Collections.emptyList());
     }
 
-    public StreamGraph getStreamGraph(CustomTableEnvironment tEnv, List<URL> classpaths) {
+    public Pipeline getStreamGraph(CustomTableEnvironment tEnv, List<URL> classpaths) {
         JarSubmitParam submitParam = JarSubmitParam.build(statement);
         return getStreamGraph(submitParam, tEnv, classpaths);
     }
 
-    public static StreamGraph getStreamGraph(
+    public static Pipeline getStreamGraph(
             JarSubmitParam submitParam, CustomTableEnvironment tEnv, List<URL> classpaths) {
         SavepointRestoreSettings savepointRestoreSettings = StrUtil.isBlank(submitParam.getSavepointPath())
                 ? SavepointRestoreSettings.none()
@@ -110,8 +112,7 @@ public static StreamGraph getStreamGraph(
                     : tEnv.getStreamExecutionEnvironment().getParallelism();
             Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, configuration, parallelism, true);
             program.close();
-            Assert.isTrue(pipeline instanceof StreamGraph, "can not translate");
-            return (StreamGraph) pipeline;
+            return pipeline;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -146,11 +147,11 @@ public String asSummaryString() {
         return statement;
     }
 
-    public StreamGraph explain(CustomTableEnvironment tEnv) {
+    public Pipeline explain(CustomTableEnvironment tEnv) {
         return getStreamGraph(tEnv);
     }
 
-    public StreamGraph explain(CustomTableEnvironment tEnv, List<URL> classpaths) {
+    public Pipeline explain(CustomTableEnvironment tEnv, List<URL> classpaths) {
         return getStreamGraph(tEnv, classpaths);
     }
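For reference, the `Pipeline` that `getStreamGraph(...)` returns is produced by running the jar's `main()` against the Flink client planner; dropping the old `Assert.isTrue(pipeline instanceof StreamGraph, ...)` is what lets batch (`Plan`) programs through. A reduced, self-contained sketch of that path; the jar path and entry class are placeholders:

import java.io.File;

import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;

public class PipelineFromJar {
    public static void main(String[] args) throws Exception {
        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(new File("/path/to/user-job.jar")) // placeholder
                .setEntryPointClassName("com.example.Main") // placeholder
                .build();
        // The last flag suppresses the user main()'s stdout while the plan is extracted.
        Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, new Configuration(), 1, true);
        program.close();
        // A DataStream main() yields a StreamGraph here, a DataSet main() a Plan.
        System.out.println(pipeline.getClass().getSimpleName());
    }
}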
diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkStreamEnvironmentUtil.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkStreamEnvironmentUtil.java
new file mode 100644
index 0000000000..d0d6931879
--- /dev/null
+++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/utils/FlinkStreamEnvironmentUtil.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.utils;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.PlanTranslator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
+import org.apache.flink.core.execution.PipelineExecutorFactory;
+import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import cn.hutool.core.util.ReflectUtil;
+
+public enum FlinkStreamEnvironmentUtil {
+    ;
+
+    public static JobClient executeAsync(Pipeline pipeline, StreamExecutionEnvironment env) throws Exception {
+        Configuration configuration = new Configuration((Configuration) env.getConfiguration());
+        checkNotNull(pipeline, "pipeline cannot be null.");
+        checkNotNull(
+                configuration.get(DeploymentOptions.TARGET),
+                "No execution.target specified in your configuration file.");
+
+        PipelineExecutorServiceLoader executorServiceLoader =
+                (PipelineExecutorServiceLoader) ReflectUtil.getFieldValue(env, "executorServiceLoader");
+        ClassLoader userClassloader = (ClassLoader) ReflectUtil.getFieldValue(env, "userClassloader");
+        final PipelineExecutorFactory executorFactory = executorServiceLoader.getExecutorFactory(configuration);
+
+        checkNotNull(
+                executorFactory,
+                "Cannot find compatible factory for specified execution.target (=%s)",
+                configuration.get(DeploymentOptions.TARGET));
+
+        CompletableFuture<JobClient> jobClientFuture =
+                executorFactory.getExecutor(configuration).execute(pipeline, configuration, userClassloader);
+
+        List<JobListener> jobListeners = env.getJobListeners();
+        try {
+            JobClient jobClient = jobClientFuture.get();
+            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
+            return jobClient;
+        } catch (ExecutionException executionException) {
+            final Throwable strippedException = ExceptionUtils.stripExecutionException(executionException);
+            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));
+
+            throw new FlinkException(
+                    String.format("Failed to execute job '%s'.", ReflectUtil.invoke(pipeline, "getJobName")),
+                    strippedException);
+        }
+    }
+
+    public static String getStreamingPlanAsJSON(Pipeline pipeline) {
+        if (pipeline instanceof StreamGraph) {
+            return ((StreamGraph) pipeline).getStreamingPlanAsJSON();
+        } else if (pipeline instanceof Plan) {
+            PlanTranslator planTranslator = new PlanTranslator();
+            return planTranslator.translateToJSONExecutionPlan(pipeline);
+        } else {
+            throw new IllegalArgumentException("Unknown pipeline type: " + pipeline);
+        }
+    }
+
+    public static JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) {
+        if (pipeline instanceof StreamGraph) {
+            return ((StreamGraph) pipeline).getJobGraph();
+        } else if (pipeline instanceof Plan) {
+            Plan plan = (Plan) pipeline;
+            PlanTranslator planTranslator = new PlanTranslator();
+            return planTranslator.translateToJobGraph(pipeline, configuration, plan.getDefaultParallelism());
+        } else {
+            throw new IllegalArgumentException("Unknown pipeline type: " + pipeline);
+        }
+    }
+}
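`executeAsync` above mirrors `StreamExecutionEnvironment#executeAsync` while accepting any `Pipeline`; because `executorServiceLoader` and `userClassloader` are private fields of the environment, it reads them via hutool reflection, which will break if a future Flink release renames those fields. A hedged usage sketch against a local execution target (assumes the local executor from flink-clients is on the classpath):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.dinky.utils.FlinkStreamEnvironmentUtil;

public class SubmitPipelineExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // executeAsync requires execution.target to be set
        conf.set(DeploymentOptions.TARGET, "local");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements(1, 2, 3).print();
        // getStreamGraph() returns a Pipeline subtype, so it fits executeAsync directly
        JobClient client = FlinkStreamEnvironmentUtil.executeAsync(env.getStreamGraph(), env);
        System.out.println("submitted job " + client.getJobID());
    }
}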
diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java
index 59fb4ac5c3..e0f9f9fd95 100644
--- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java
+++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java
@@ -44,15 +44,16 @@
 import org.dinky.trans.parse.ExecuteJarParseStrategy;
 import org.dinky.trans.parse.SetSqlParseStrategy;
 import org.dinky.utils.DinkyClassLoaderUtil;
+import org.dinky.utils.FlinkStreamEnvironmentUtil;
 import org.dinky.utils.IpUtil;
 import org.dinky.utils.LogUtil;
 import org.dinky.utils.SqlUtil;
 import org.dinky.utils.URLUtils;
 
+import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
-import org.apache.flink.streaming.api.graph.StreamGraph;
 
 import java.net.URL;
 import java.time.LocalDateTime;
@@ -312,9 +313,9 @@ public ExplainResult explainSql(String statement) {
             } else if (ExecuteJarParseStrategy.INSTANCE.match(item.getValue())) {
 
                 List<URL> allFileByAdd = jobManager.getAllFileSet();
-                StreamGraph streamGraph = new ExecuteJarOperation(item.getValue())
+                Pipeline pipeline = new ExecuteJarOperation(item.getValue())
                         .explain(executor.getCustomTableEnvironment(), allFileByAdd);
-                sqlExplainResult.setExplain(streamGraph.getStreamingPlanAsJSON());
+                sqlExplainResult.setExplain(FlinkStreamEnvironmentUtil.getStreamingPlanAsJSON(pipeline));
             } else {
                 executor.executeSql(item.getValue());
             }
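The explain path now renders whichever plan representation the jar produced: `getStreamingPlanAsJSON` on a `StreamGraph`, `PlanTranslator#translateToJSONExecutionPlan` on a `Plan`. Roughly what it yields for a stream pipeline, as a local sketch producing the same JSON plan the Flink web UI consumes:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.dinky.utils.FlinkStreamEnvironmentUtil;

public class ExplainJarExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b").print();
        // For a batch Plan the util falls back to PlanTranslator#translateToJSONExecutionPlan
        String planJson = FlinkStreamEnvironmentUtil.getStreamingPlanAsJSON(env.getStreamGraph());
        System.out.println(planJson);
    }
}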
diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java
index 528b9571c2..1d890f30d5 100644
--- a/dinky-core/src/main/java/org/dinky/job/JobManager.java
+++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java
@@ -61,11 +61,14 @@
 import org.dinky.trans.parse.AddFileSqlParseStrategy;
 import org.dinky.trans.parse.AddJarSqlParseStrategy;
 import org.dinky.utils.DinkyClassLoaderUtil;
+import org.dinky.utils.FlinkStreamEnvironmentUtil;
 import org.dinky.utils.JsonUtils;
 import org.dinky.utils.LogUtil;
 import org.dinky.utils.SqlUtil;
 import org.dinky.utils.URLUtils;
 
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
@@ -246,9 +249,10 @@ public boolean close() {
     }
 
     public ObjectNode getJarStreamGraphJson(String statement) {
-        StreamGraph streamGraph =
-                JobJarStreamGraphBuilder.build(this).getJarStreamGraph(statement, getDinkyClassLoader());
-        return JsonUtils.parseObject(JsonPlanGenerator.generatePlan(streamGraph.getJobGraph()));
+        Pipeline pipeline = JobJarStreamGraphBuilder.build(this).getJarStreamGraph(statement, getDinkyClassLoader());
+        Configuration configuration = Configuration.fromMap(getExecutorConfig().getConfig());
+        JobGraph jobGraph = FlinkStreamEnvironmentUtil.getJobGraph(pipeline, configuration);
+        return JsonUtils.parseObject(JsonPlanGenerator.generatePlan(jobGraph));
     }
 
     @ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
@@ -260,17 +264,21 @@ public JobResult executeJarSql(String statement) throws Exception {
         job = Job.build(runMode, config, executorConfig, executor, statement, useGateway);
         ready();
         JobJarStreamGraphBuilder jobJarStreamGraphBuilder = JobJarStreamGraphBuilder.build(this);
-        StreamGraph streamGraph = jobJarStreamGraphBuilder.getJarStreamGraph(statement, getDinkyClassLoader());
+        Pipeline pipeline = jobJarStreamGraphBuilder.getJarStreamGraph(statement, getDinkyClassLoader());
         Configuration configuration =
                 executor.getCustomTableEnvironment().getConfig().getConfiguration();
-        if (Asserts.isNotNullString(config.getSavePointPath())) {
-            streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
-                    config.getSavePointPath(),
-                    configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
+        if (pipeline instanceof StreamGraph) {
+            if (Asserts.isNotNullString(config.getSavePointPath())) {
+                ((StreamGraph) pipeline)
+                        .setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
+                                config.getSavePointPath(),
+                                configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
+            }
         }
         try {
             if (!useGateway) {
-                JobClient jobClient = executor.getStreamExecutionEnvironment().executeAsync(streamGraph);
+                JobClient jobClient =
+                        FlinkStreamEnvironmentUtil.executeAsync(pipeline, executor.getStreamExecutionEnvironment());
                 if (Asserts.isNotNull(jobClient)) {
                     job.setJobId(jobClient.getJobID().toHexString());
                     job.setJids(new ArrayList<String>() {
@@ -292,8 +300,12 @@ public JobResult executeJarSql(String statement) throws Exception {
                 config.getGatewayConfig().setSql(statement);
                 gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar(getUdfPathContextHolder());
             } else {
-                streamGraph.setJobName(config.getJobName());
-                JobGraph jobGraph = streamGraph.getJobGraph();
+                if (pipeline instanceof StreamGraph) {
+                    ((StreamGraph) pipeline).setJobName(config.getJobName());
+                } else if (pipeline instanceof Plan) {
+                    ((Plan) pipeline).setJobName(config.getJobName());
+                }
+                JobGraph jobGraph = FlinkStreamEnvironmentUtil.getJobGraph(pipeline, configuration);
                 GatewayConfig gatewayConfig = config.getGatewayConfig();
                 List<String> uriList = jobJarStreamGraphBuilder.getUris(statement);
                 String[] jarPaths = uriList.stream()
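`getJarStreamGraphJson` now goes `Pipeline` to `JobGraph` to JSON, so stream and batch jars share one rendering path. The conversion step in isolation, sketched with a locally built `StreamGraph` standing in for the jar's pipeline:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.dinky.utils.FlinkStreamEnvironmentUtil;

public class JarPlanJson {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2).print();
        // getJobGraph dispatches on the pipeline type; a Plan goes through PlanTranslator
        JobGraph jobGraph = FlinkStreamEnvironmentUtil.getJobGraph(env.getStreamGraph(), new Configuration());
        System.out.println(JsonPlanGenerator.generatePlan(jobGraph));
    }
}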
diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java
index 4cd6653076..ed28ebad37 100644
--- a/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java
+++ b/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java
@@ -34,7 +34,7 @@
 import org.dinky.utils.DinkyClassLoaderUtil;
 import org.dinky.utils.SqlUtil;
 
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.api.dag.Pipeline;
 
 import java.io.File;
 import java.net.URL;
@@ -60,7 +60,7 @@ public static JobJarStreamGraphBuilder build(JobManager jobManager) {
     @Override
     public void run() throws Exception {}
 
-    public StreamGraph getJarStreamGraph(String statement, DinkyClassLoader dinkyClassLoader) {
+    public Pipeline getJarStreamGraph(String statement, DinkyClassLoader dinkyClassLoader) {
         DinkyClassLoaderUtil.initClassLoader(config, dinkyClassLoader);
         String[] statements = SqlUtil.getStatements(statement);
         ExecuteJarOperation executeJarOperation = null;
diff --git a/dinky-web/src/pages/DevOps/JobDetail/JobOverview/components/JobDesc.tsx b/dinky-web/src/pages/DevOps/JobDetail/JobOverview/components/JobDesc.tsx
index 9930111ee0..bc2d52e5a6 100644
--- a/dinky-web/src/pages/DevOps/JobDetail/JobOverview/components/JobDesc.tsx
+++ b/dinky-web/src/pages/DevOps/JobDetail/JobOverview/components/JobDesc.tsx
@@ -88,7 +88,8 @@ const JobDesc = (props: JobProps) => {
-          {jobDetail?.jobDataDto?.config['execution-config']['restart-strategy']}
+          {jobDetail?.jobDataDto?.config?.executionConfig?.['restart-strategy'] ??
+            jobDetail?.jobDataDto?.config?.['execution-config']?.['restart-strategy']}
@@ -119,7 +120,7 @@ const JobDesc = (props: JobProps) => {
-          {jobDetail?.jobDataDto?.config['execution-config']['job-parallelism']}
+          {jobDetail?.jobDataDto?.config?.['execution-config']?.jobParallelism}