[Optimization] [Flink Jar] Optimize flink jar task submission to support batch tasks (DataLinkDC#3351)

Co-authored-by: zackyoungh <[email protected]>
zackyoungh authored Apr 4, 2024
1 parent 58afc08 commit 66c7194
Showing 7 changed files with 176 additions and 38 deletions.
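
The core of the change: jar submission previously assumed the compiled user program always yielded a StreamGraph, so batch (DataSet) programs, which compile to a Plan, could not be submitted. Every touch point now works against Flink's common Pipeline interface and dispatches on the concrete type. A condensed sketch of that dispatch, using Flink APIs only (the pipeline and jobName are assumed inputs; this is not the project's exact code):

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;

class PipelineDispatchSketch {
    // Streaming graphs take checkpoint/savepoint settings; batch plans only
    // carry an ExecutionConfig and a job name.
    static void configure(Pipeline pipeline, ReadableConfig conf, String jobName) {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (pipeline instanceof StreamGraph) { // stream job
            StreamGraph streamGraph = (StreamGraph) pipeline;
            streamGraph.getExecutionConfig().configure(conf, cl);
            streamGraph.getCheckpointConfig().configure(conf);
            streamGraph.setJobName(jobName);
        } else if (pipeline instanceof Plan) { // batch job
            Plan plan = (Plan) pipeline;
            plan.getExecutionConfig().configure(conf, cl);
            plan.setJobName(jobName);
        }
    }
}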
@@ -44,11 +44,14 @@
import org.dinky.trans.parse.AddJarSqlParseStrategy;
import org.dinky.trans.parse.ExecuteJarParseStrategy;
import org.dinky.url.RsURLStreamHandlerFactory;
+import org.dinky.utils.FlinkStreamEnvironmentUtil;
import org.dinky.utils.SqlUtil;
import org.dinky.utils.ZipUtils;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
@@ -252,20 +255,33 @@ public static Optional<JobClient> executeJarJob(String type, Executor executor,
String sqlStatement = executor.pretreatStatement(statement);
if (ExecuteJarParseStrategy.INSTANCE.match(sqlStatement)) {
ExecuteJarOperation executeJarOperation = new ExecuteJarOperation(sqlStatement);
-StreamGraph streamGraph = executeJarOperation.getStreamGraph(executor.getCustomTableEnvironment());
+Pipeline pipeline = executeJarOperation.getStreamGraph(executor.getCustomTableEnvironment());
ReadableConfig configuration =
executor.getStreamExecutionEnvironment().getConfiguration();
-streamGraph
-    .getExecutionConfig()
-    .configure(configuration, Thread.currentThread().getContextClassLoader());
-streamGraph.getCheckpointConfig().configure(configuration);
-streamGraph.setJobName(executor.getExecutorConfig().getJobName());
-String savePointPath = executor.getExecutorConfig().getSavePointPath();
-if (Asserts.isNotNullString(savePointPath)) {
-    streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
-        savePointPath, configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
+if (pipeline instanceof StreamGraph) {
+    // stream job
+    StreamGraph streamGraph = (StreamGraph) pipeline;
+    streamGraph
+        .getExecutionConfig()
+        .configure(configuration, Thread.currentThread().getContextClassLoader());
+    streamGraph.getCheckpointConfig().configure(configuration);
+    streamGraph.setJobName(executor.getExecutorConfig().getJobName());
+    String savePointPath = executor.getExecutorConfig().getSavePointPath();
+    if (Asserts.isNotNullString(savePointPath)) {
+        streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
+            savePointPath,
+            configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
+    }
+} else if (pipeline instanceof Plan) {
+    // batch job
+    Plan plan = (Plan) pipeline;
+    plan.getExecutionConfig()
+        .configure(configuration, Thread.currentThread().getContextClassLoader());
+    plan.setJobName(executor.getExecutorConfig().getJobName());
+}
-JobClient client = executor.getStreamExecutionEnvironment().executeAsync(streamGraph);
+
+JobClient client =
+    FlinkStreamEnvironmentUtil.executeAsync(pipeline, executor.getStreamExecutionEnvironment());
jobClient = Optional.of(client);
break;
}
ExecuteJarOperation.java
@@ -23,6 +23,7 @@
import org.dinky.trans.AbstractOperation;
import org.dinky.trans.ExtendOperation;
import org.dinky.trans.parse.ExecuteJarParseStrategy;
+import org.dinky.utils.FlinkStreamEnvironmentUtil;
import org.dinky.utils.URLUtils;

import org.apache.commons.lang.StringUtils;
@@ -32,7 +33,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;

import java.io.File;
@@ -60,24 +61,25 @@ public ExecuteJarOperation(String statement) {
@Override
public Optional<? extends TableResult> execute(CustomTableEnvironment tEnv) {
try {
-tEnv.getStreamExecutionEnvironment().execute(getStreamGraph(tEnv));
+StreamExecutionEnvironment streamExecutionEnvironment = tEnv.getStreamExecutionEnvironment();
+FlinkStreamEnvironmentUtil.executeAsync(getStreamGraph(tEnv), streamExecutionEnvironment);
} catch (Exception e) {
throw new RuntimeException(e);
}

return Optional.of(TABLE_RESULT_OK);
}

-public StreamGraph getStreamGraph(CustomTableEnvironment tEnv) {
+public Pipeline getStreamGraph(CustomTableEnvironment tEnv) {
return getStreamGraph(tEnv, Collections.emptyList());
}

-public StreamGraph getStreamGraph(CustomTableEnvironment tEnv, List<URL> classpaths) {
+public Pipeline getStreamGraph(CustomTableEnvironment tEnv, List<URL> classpaths) {
JarSubmitParam submitParam = JarSubmitParam.build(statement);
return getStreamGraph(submitParam, tEnv, classpaths);
}

-public static StreamGraph getStreamGraph(
+public static Pipeline getStreamGraph(
JarSubmitParam submitParam, CustomTableEnvironment tEnv, List<URL> classpaths) {
SavepointRestoreSettings savepointRestoreSettings = StrUtil.isBlank(submitParam.getSavepointPath())
? SavepointRestoreSettings.none()
@@ -110,8 +112,7 @@ public static StreamGraph getStreamGraph(
: tEnv.getStreamExecutionEnvironment().getParallelism();
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, configuration, parallelism, true);
program.close();
-Assert.isTrue(pipeline instanceof StreamGraph, "can not translate");
-return (StreamGraph) pipeline;
+return pipeline;
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -146,11 +147,11 @@ public String asSummaryString() {
return statement;
}

-public StreamGraph explain(CustomTableEnvironment tEnv) {
+public Pipeline explain(CustomTableEnvironment tEnv) {
return getStreamGraph(tEnv);
}

-public StreamGraph explain(CustomTableEnvironment tEnv, List<URL> classpaths) {
+public Pipeline explain(CustomTableEnvironment tEnv, List<URL> classpaths) {
return getStreamGraph(tEnv, classpaths);
}

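With ExecuteJarOperation now returning Pipeline from getStreamGraph and explain, callers must stop casting to StreamGraph and route the result through FlinkStreamEnvironmentUtil instead. A minimal sketch of the new calling convention (Dinky imports omitted since their paths are not shown in this diff; statement and tEnv are assumed to exist):

// Assumed: statement holds an EXECUTE JAR statement, tEnv a CustomTableEnvironment.
ExecuteJarOperation op = new ExecuteJarOperation(statement);
Pipeline pipeline = op.explain(tEnv); // StreamGraph for stream jobs, Plan for batch jobs
String planJson = FlinkStreamEnvironmentUtil.getStreamingPlanAsJSON(pipeline); // handles both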
FlinkStreamEnvironmentUtil.java (new file)
@@ -0,0 +1,107 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.utils;

import static org.apache.flink.util.Preconditions.checkNotNull;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.PlanTranslator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import cn.hutool.core.util.ReflectUtil;

public enum FlinkStreamEnvironmentUtil {
;

public static JobClient executeAsync(Pipeline pipeline, StreamExecutionEnvironment env) throws Exception {
Configuration configuration = new Configuration((Configuration) env.getConfiguration());
checkNotNull(pipeline, "pipeline cannot be null.");
checkNotNull(
configuration.get(DeploymentOptions.TARGET),
"No execution.target specified in your configuration file.");

PipelineExecutorServiceLoader executorServiceLoader =
(PipelineExecutorServiceLoader) ReflectUtil.getFieldValue(env, "executorServiceLoader");
ClassLoader userClassloader = (ClassLoader) ReflectUtil.getFieldValue(env, "userClassloader");
final PipelineExecutorFactory executorFactory = executorServiceLoader.getExecutorFactory(configuration);

checkNotNull(
executorFactory,
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));

CompletableFuture<JobClient> jobClientFuture =
executorFactory.getExecutor(configuration).execute(pipeline, configuration, userClassloader);

List<JobListener> jobListeners = env.getJobListeners();
try {
JobClient jobClient = jobClientFuture.get();
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
return jobClient;
} catch (ExecutionException executionException) {
final Throwable strippedException = ExceptionUtils.stripExecutionException(executionException);
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));

throw new FlinkException(
String.format("Failed to execute job '%s'.", ReflectUtil.invoke(pipeline, "getJobName")),
strippedException);
}
}

public static String getStreamingPlanAsJSON(Pipeline pipeline) {
if (pipeline instanceof StreamGraph) {
return ((StreamGraph) pipeline).getStreamingPlanAsJSON();
} else if (pipeline instanceof Plan) {
PlanTranslator planTranslator = new PlanTranslator();
return planTranslator.translateToJSONExecutionPlan(pipeline);
} else {
throw new IllegalArgumentException("Unknown pipeline type: " + pipeline);
}
}

public static JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) {
if (pipeline instanceof StreamGraph) {
return ((StreamGraph) pipeline).getJobGraph();
} else if (pipeline instanceof Plan) {
Plan plan = ((Plan) pipeline);
PlanTranslator planTranslator = new PlanTranslator();
return planTranslator.translateToJobGraph(pipeline, configuration, plan.getDefaultParallelism());
} else {
throw new IllegalArgumentException("Unknown pipeline type: " + pipeline);
}
}
}
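
A sketch of how the rest of the commit consumes this utility: executeAsync is the single submission entry point for both pipeline kinds, and getJobGraph serves the gateway path (the wrapper class and method below are illustrative, not part of the commit):

import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.dinky.utils.FlinkStreamEnvironmentUtil;

class SubmitSketch {
    static JobClient submit(Pipeline pipeline, StreamExecutionEnvironment env) throws Exception {
        // Direct submission: works for StreamGraph and Plan alike.
        JobClient client = FlinkStreamEnvironmentUtil.executeAsync(pipeline, env);

        // Gateway path: translate the same pipeline into a JobGraph first,
        // mirroring the cast the utility itself performs on env.getConfiguration().
        Configuration conf = new Configuration((Configuration) env.getConfiguration());
        JobGraph jobGraph = FlinkStreamEnvironmentUtil.getJobGraph(pipeline, conf);
        // ... jobGraph would then be handed to the gateway submitter.
        return client;
    }
}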
7 changes: 4 additions & 3 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
@@ -44,15 +44,16 @@
import org.dinky.trans.parse.ExecuteJarParseStrategy;
import org.dinky.trans.parse.SetSqlParseStrategy;
import org.dinky.utils.DinkyClassLoaderUtil;
+import org.dinky.utils.FlinkStreamEnvironmentUtil;
import org.dinky.utils.IpUtil;
import org.dinky.utils.LogUtil;
import org.dinky.utils.SqlUtil;
import org.dinky.utils.URLUtils;

+import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
-import org.apache.flink.streaming.api.graph.StreamGraph;

import java.net.URL;
import java.time.LocalDateTime;
@@ -312,9 +313,9 @@ public ExplainResult explainSql(String statement) {
} else if (ExecuteJarParseStrategy.INSTANCE.match(item.getValue())) {

List<URL> allFileByAdd = jobManager.getAllFileSet();
-StreamGraph streamGraph = new ExecuteJarOperation(item.getValue())
+Pipeline pipeline = new ExecuteJarOperation(item.getValue())
.explain(executor.getCustomTableEnvironment(), allFileByAdd);
-sqlExplainResult.setExplain(streamGraph.getStreamingPlanAsJSON());
+sqlExplainResult.setExplain(FlinkStreamEnvironmentUtil.getStreamingPlanAsJSON(pipeline));
} else {
executor.executeSql(item.getValue());
}
34 changes: 23 additions & 11 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
@@ -61,11 +61,14 @@
import org.dinky.trans.parse.AddFileSqlParseStrategy;
import org.dinky.trans.parse.AddJarSqlParseStrategy;
import org.dinky.utils.DinkyClassLoaderUtil;
+import org.dinky.utils.FlinkStreamEnvironmentUtil;
import org.dinky.utils.JsonUtils;
import org.dinky.utils.LogUtil;
import org.dinky.utils.SqlUtil;
import org.dinky.utils.URLUtils;

+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
@@ -246,9 +249,10 @@ public boolean close() {
}

public ObjectNode getJarStreamGraphJson(String statement) {
-StreamGraph streamGraph =
-    JobJarStreamGraphBuilder.build(this).getJarStreamGraph(statement, getDinkyClassLoader());
-return JsonUtils.parseObject(JsonPlanGenerator.generatePlan(streamGraph.getJobGraph()));
+Pipeline pipeline = JobJarStreamGraphBuilder.build(this).getJarStreamGraph(statement, getDinkyClassLoader());
+Configuration configuration = Configuration.fromMap(getExecutorConfig().getConfig());
+JobGraph jobGraph = FlinkStreamEnvironmentUtil.getJobGraph(pipeline, configuration);
+return JsonUtils.parseObject(JsonPlanGenerator.generatePlan(jobGraph));
}

@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
@@ -260,17 +264,21 @@ public JobResult executeJarSql(String statement) throws Exception {
job = Job.build(runMode, config, executorConfig, executor, statement, useGateway);
ready();
JobJarStreamGraphBuilder jobJarStreamGraphBuilder = JobJarStreamGraphBuilder.build(this);
-StreamGraph streamGraph = jobJarStreamGraphBuilder.getJarStreamGraph(statement, getDinkyClassLoader());
+Pipeline pipeline = jobJarStreamGraphBuilder.getJarStreamGraph(statement, getDinkyClassLoader());
Configuration configuration =
executor.getCustomTableEnvironment().getConfig().getConfiguration();
-if (Asserts.isNotNullString(config.getSavePointPath())) {
-    streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
-        config.getSavePointPath(),
-        configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
+if (pipeline instanceof StreamGraph) {
+    if (Asserts.isNotNullString(config.getSavePointPath())) {
+        ((StreamGraph) pipeline)
+            .setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
+                config.getSavePointPath(),
+                configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
+    }
+}
try {
if (!useGateway) {
-JobClient jobClient = executor.getStreamExecutionEnvironment().executeAsync(streamGraph);
+JobClient jobClient =
+    FlinkStreamEnvironmentUtil.executeAsync(pipeline, executor.getStreamExecutionEnvironment());
if (Asserts.isNotNull(jobClient)) {
job.setJobId(jobClient.getJobID().toHexString());
job.setJids(new ArrayList<String>() {
@@ -292,8 +300,12 @@ public JobResult executeJarSql(String statement) throws Exception {
config.getGatewayConfig().setSql(statement);
gatewayResult = Gateway.build(config.getGatewayConfig()).submitJar(getUdfPathContextHolder());
} else {
-streamGraph.setJobName(config.getJobName());
-JobGraph jobGraph = streamGraph.getJobGraph();
+if (pipeline instanceof StreamGraph) {
+    ((StreamGraph) pipeline).setJobName(config.getJobName());
+} else if (pipeline instanceof Plan) {
+    ((Plan) pipeline).setJobName(config.getJobName());
+}
+JobGraph jobGraph = FlinkStreamEnvironmentUtil.getJobGraph(pipeline, configuration);
GatewayConfig gatewayConfig = config.getGatewayConfig();
List<String> uriList = jobJarStreamGraphBuilder.getUris(statement);
String[] jarPaths = uriList.stream()
JobJarStreamGraphBuilder.java
@@ -34,7 +34,7 @@
import org.dinky.utils.DinkyClassLoaderUtil;
import org.dinky.utils.SqlUtil;

-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.api.dag.Pipeline;

import java.io.File;
import java.net.URL;
@@ -60,7 +60,7 @@ public static JobJarStreamGraphBuilder build(JobManager jobManager) {
@Override
public void run() throws Exception {}

-public StreamGraph getJarStreamGraph(String statement, DinkyClassLoader dinkyClassLoader) {
+public Pipeline getJarStreamGraph(String statement, DinkyClassLoader dinkyClassLoader) {
DinkyClassLoaderUtil.initClassLoader(config, dinkyClassLoader);
String[] statements = SqlUtil.getStatements(statement);
ExecuteJarOperation executeJarOperation = null;
@@ -88,7 +88,8 @@ const JobDesc = (props: JobProps) => {

<Descriptions.Item label={l('devops.jobinfo.config.RestartStrategy')}>
<Tag color='blue' title={'Restart Strategy'}>
-{jobDetail?.jobDataDto?.config['execution-config']['restart-strategy']}
+{jobDetail?.jobDataDto?.config?.executionConfig?.['restart-strategy'] ??
+    jobDetail?.jobDataDto?.config?.['execution-config']?.['restart-strategy']}
</Tag>
</Descriptions.Item>

@@ -119,7 +120,7 @@
</Descriptions.Item>

<Descriptions.Item label={l('devops.jobinfo.config.JobParallelism')}>
-{jobDetail?.jobDataDto?.config['execution-config']['job-parallelism']}
+{jobDetail?.jobDataDto?.config?.['execution-config']?.jobParallelism}
</Descriptions.Item>

<Descriptions.Item label={l('global.table.useTime')}>
