From 2401f613d8cbc36946ce7ba95e6af06bf8a92754 Mon Sep 17 00:00:00 2001 From: wenmo <32723967+aiwenmo@users.noreply.github.com> Date: Wed, 13 Nov 2024 00:25:48 +0800 Subject: [PATCH] [Optimization-3909][core] Optimize FlinkDDL execution sequence --- .../org/dinky/trans/AbstractOperation.java | 15 +- .../java/org/dinky/trans/ExtendOperation.java | 3 + .../dinky/trans/ddl/CustomSetOperation.java | 12 +- .../dinky/trans/dml/ExecuteJarOperation.java | 2 +- .../dinky/data/result/SqlExplainResult.java | 19 ++ .../java/org/dinky/executor/Executor.java | 14 +- .../java/org/dinky/explainer/Explainer.java | 248 +++------------ .../main/java/org/dinky/job/JobBuilder.java | 16 + .../main/java/org/dinky/job/JobManager.java | 13 +- .../src/main/java/org/dinky/job/JobParam.java | 4 + .../org/dinky/job/builder/JobDDLBuilder.java | 295 +++++++++++++++++- .../dinky/job/builder/JobExecuteBuilder.java | 71 +++++ .../job/builder/JobJarStreamGraphBuilder.java | 7 + .../dinky/job/builder/JobTransBuilder.java | 88 ++++++ .../org/dinky/job/builder/JobUDFBuilder.java | 7 + .../java/org/dinky/function/util/UDFUtil.java | 4 + 16 files changed, 606 insertions(+), 212 deletions(-) diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/AbstractOperation.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/AbstractOperation.java index ceb178d958..c6b74607fb 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/AbstractOperation.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/AbstractOperation.java @@ -19,6 +19,10 @@ package org.dinky.trans; +import org.dinky.executor.CustomTableEnvironment; + +import org.apache.flink.table.operations.Operation; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +31,7 @@ * * @since 2021/6/14 18:18 */ -public class AbstractOperation { +public class AbstractOperation implements Operation { protected static final Logger logger = LoggerFactory.getLogger(AbstractOperation.class); @@ -50,4 +54,13 @@ public void setStatement(String statement) { public boolean noExecute() { return true; } + + public String explain(CustomTableEnvironment tEnv) { + return asSummaryString(); + } + + @Override + public String asSummaryString() { + return ""; + } } diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/ExtendOperation.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/ExtendOperation.java index 3a2b6d0aa8..2801e19444 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/ExtendOperation.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/ExtendOperation.java @@ -35,8 +35,11 @@ /** */ public interface ExtendOperation extends Operation { + Optional execute(CustomTableEnvironment tEnv); + String explain(CustomTableEnvironment tEnv); + TableResult TABLE_RESULT_OK = TableResultImpl.builder() .resultKind(ResultKind.SUCCESS) .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))) diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/ddl/CustomSetOperation.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/ddl/CustomSetOperation.java index 686d3db814..1b011ed6d9 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/ddl/CustomSetOperation.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/ddl/CustomSetOperation.java @@ -98,6 +98,11 @@ public Optional execute(CustomTableEnvironment tEnv) { return Optional.of(TABLE_RESULT_OK); } + @Override + public 
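The reworked contract pairs execute with explain on every extended operation, which is what lets the explain path drop its per-type special cases later in this patch. A minimal sketch of an implementer, assuming the interface's stripped generic is Optional<? extends TableResult> (the EchoOperation class is hypothetical):

    import java.util.Optional;

    import org.apache.flink.table.api.TableResult;

    import org.dinky.executor.CustomTableEnvironment;
    import org.dinky.trans.ExtendOperation;

    // Hypothetical operation showing the new execute/explain pair.
    public class EchoOperation implements ExtendOperation {

        private final String statement;

        public EchoOperation(String statement) {
            this.statement = statement;
        }

        @Override
        public Optional<? extends TableResult> execute(CustomTableEnvironment tEnv) {
            // No planner work needed; report the shared OK result.
            return Optional.of(TABLE_RESULT_OK);
        }

        @Override
        public String explain(CustomTableEnvironment tEnv) {
            // Mirrors AbstractOperation: the summary string doubles as the plan.
            return asSummaryString();
        }

        @Override
        public String asSummaryString() {
            return "ECHO: " + statement;
        }
    }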
String explain(CustomTableEnvironment tEnv) { + return asSummaryString(); + } + private void callSet(SetOperation setOperation, CustomTableEnvironment environment) { if (!setOperation.getKey().isPresent() || !setOperation.getValue().isPresent()) { return; @@ -121,6 +126,11 @@ private void setConfiguration(CustomTableEnvironment environment, Map statements) { + return tableEnvironment.getStreamGraphFromInserts(statements); } public ObjectNode getStreamGraph(List statements) { @@ -251,7 +255,7 @@ public ObjectNode getStreamGraph(List statements) { return getStreamGraphJsonNode(streamGraph); } - private ObjectNode getStreamGraphJsonNode(StreamGraph streamGraph) { + public ObjectNode getStreamGraphJsonNode(StreamGraph streamGraph) { JSONGenerator jsonGenerator = new JSONGenerator(streamGraph); String json = jsonGenerator.getJSON(); ObjectMapper mapper = new ObjectMapper(); @@ -261,7 +265,6 @@ private ObjectNode getStreamGraphJsonNode(StreamGraph streamGraph) { } catch (JsonProcessingException e) { logger.error("Get stream graph json node error.", e); } - return objectNode; } @@ -269,6 +272,11 @@ public StreamGraph getStreamGraph() { return environment.getStreamGraph(); } + public StreamGraph getStreamGraphFromCustomStatements(List statements) { + statements.forEach(this::executeSql); + return getStreamGraph(); + } + public ObjectNode getStreamGraphFromDataStream(List statements) { statements.forEach(this::executeSql); return getStreamGraphJsonNode(getStreamGraph()); diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index 98c3f63cb5..75e94d53b3 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -25,7 +25,6 @@ import org.dinky.data.model.LineageRel; import org.dinky.data.result.ExplainResult; import org.dinky.data.result.SqlExplainResult; -import org.dinky.executor.CustomTableEnvironment; import org.dinky.executor.Executor; import org.dinky.explainer.mock.MockStatementExplainer; import org.dinky.explainer.print_table.PrintStatementExplainer; @@ -36,29 +35,18 @@ import org.dinky.job.JobManager; import org.dinky.job.JobParam; import org.dinky.job.StatementParam; +import org.dinky.job.builder.JobDDLBuilder; +import org.dinky.job.builder.JobExecuteBuilder; +import org.dinky.job.builder.JobTransBuilder; import org.dinky.job.builder.JobUDFBuilder; import org.dinky.parser.SqlType; import org.dinky.trans.Operations; -import org.dinky.trans.ddl.CustomSetOperation; -import org.dinky.trans.dml.ExecuteJarOperation; -import org.dinky.trans.parse.AddFileSqlParseStrategy; -import org.dinky.trans.parse.AddJarSqlParseStrategy; -import org.dinky.trans.parse.ExecuteJarParseStrategy; -import org.dinky.trans.parse.SetSqlParseStrategy; import org.dinky.utils.DinkyClassLoaderUtil; -import org.dinky.utils.FlinkStreamEnvironmentUtil; import org.dinky.utils.IpUtil; -import org.dinky.utils.LogUtil; import org.dinky.utils.SqlUtil; -import org.dinky.utils.URLUtils; -import org.apache.flink.api.dag.Pipeline; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.rest.messages.JobPlanInfo; -import java.net.URL; -import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -73,7 +61,6 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.text.StrBuilder; -import cn.hutool.core.text.StrFormatter; import 
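With getStreamGraphJsonNode now public and a custom-statement variant added, any builder can turn a statement list into plan JSON without duplicating the JSONGenerator plumbing. A hedged usage sketch (executor is assumed to be an initialized org.dinky.executor.Executor and jobParam a populated JobParam):

    // Hedged sketch: plan JSON for custom (EXECUTE) statements.
    List<String> executeStatements = jobParam.getExecuteStatement();
    StreamGraph graph = executor.getStreamGraphFromCustomStatements(executeStatements);
    ObjectNode planJson = executor.getStreamGraphJsonNode(graph);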
cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; @@ -137,27 +124,7 @@ public JobParam pretreatStatements(String[] statements) { continue; } SqlType operationType = Operations.getOperationType(statement); - if (operationType.equals(SqlType.SET) && SetSqlParseStrategy.INSTANCE.match(statement)) { - CustomSetOperation customSetOperation = new CustomSetOperation(statement); - customSetOperation.execute(this.executor.getCustomTableEnvironment()); - } else if (operationType.equals(SqlType.ADD)) { - AddJarSqlParseStrategy.getAllFilePath(statement) - .forEach(t -> jobManager.getUdfPathContextHolder().addOtherPlugins(t)); - (executor.getDinkyClassLoader()) - .addURLs(URLUtils.getURLs( - jobManager.getUdfPathContextHolder().getOtherPluginsFiles())); - } else if (operationType.equals(SqlType.ADD_FILE)) { - AddFileSqlParseStrategy.getAllFilePath(statement) - .forEach(t -> jobManager.getUdfPathContextHolder().addFile(t)); - (executor.getDinkyClassLoader()) - .addURLs(URLUtils.getURLs( - jobManager.getUdfPathContextHolder().getFiles())); - } else if (operationType.equals(SqlType.ADD_JAR)) { - Configuration combinationConfig = getCombinationConfig(); - FileSystem.initialize(combinationConfig, null); - ddl.add(new StatementParam(statement, operationType)); - statementList.add(statement); - } else if (transSqlTypeSet.contains(operationType)) { + if (transSqlTypeSet.contains(operationType)) { trans.add(new StatementParam(statement, operationType)); statementList.add(statement); } else if (operationType.equals(SqlType.EXECUTE)) { @@ -172,10 +139,6 @@ public JobParam pretreatStatements(String[] statements) { PrintStatementExplainer.getCreateStatement(tableName, host, port), SqlType.CTAS)); } } else { - UDF udf = UDFUtil.toUDF(statement, jobManager.getDinkyClassLoader()); - if (Asserts.isNotNull(udf)) { - udfList.add(udf); - } ddl.add(new StatementParam(statement, operationType)); statementList.add(statement); } @@ -190,16 +153,6 @@ public JobParam pretreatStatements(String[] statements) { return jobParam; } - private Configuration getCombinationConfig() { - CustomTableEnvironment cte = executor.getCustomTableEnvironment(); - Configuration rootConfig = cte.getRootConfiguration(); - Configuration config = cte.getConfig().getConfiguration(); - Configuration combinationConfig = new Configuration(); - combinationConfig.addAll(rootConfig); - combinationConfig.addAll(config); - return combinationConfig; - } - public List parseUDFFromStatements(String[] statements) { List udfList = new ArrayList<>(); for (String statement : statements) { @@ -218,178 +171,81 @@ public ExplainResult explainSql(String statement) { log.info("Start explain FlinkSQL..."); JobParam jobParam; List sqlExplainRecords = new ArrayList<>(); - int index = 1; boolean correct = true; try { jobParam = pretreatStatements(SqlUtil.getStatements(statement)); + jobManager.setJobParam(jobParam); } catch (Exception e) { SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); resultBuilder.error(e.getMessage()).parseTrue(false); sqlExplainRecords.add(resultBuilder.build()); - log.error("failed pretreatStatements:", e); + log.error("Failed to pretreat statements:", e); return new ExplainResult(false, sqlExplainRecords.size(), sqlExplainRecords); } - for (StatementParam item : jobParam.getDdl()) { - SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); - try { - SqlExplainResult recordResult = executor.explainSqlRecord(item.getValue()); - if (Asserts.isNull(recordResult)) { - continue; - } - 
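With execution stripped out, pretreatStatements is now purely a classifier. A condensed sketch of the resulting dispatch (identifiers taken from the surrounding diff; statementList bookkeeping omitted):

    SqlType operationType = Operations.getOperationType(statement);
    if (transSqlTypeSet.contains(operationType)) {
        trans.add(new StatementParam(statement, operationType));   // INSERT / CTAS / ...
    } else if (operationType.equals(SqlType.EXECUTE)) {
        execute.add(new StatementParam(statement, operationType)); // custom pipelines
    } else {
        ddl.add(new StatementParam(statement, operationType));     // SET/ADD/ADD_FILE/ADD_JAR/CREATE all land here now
    }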
resultBuilder = SqlExplainResult.newBuilder(recordResult); - executor.executeSql(item.getValue()); - } catch (Exception e) { - String error = StrFormatter.format( - "Exception in executing FlinkSQL:\n{}\n{}", - SqlUtil.addLineNumber(item.getValue()), - LogUtil.getError(e)); - resultBuilder - .error(error) - .explainTrue(false) - .explainTime(LocalDateTime.now()) - .sql(item.getValue()) - .index(index); - sqlExplainRecords.add(resultBuilder.build()); + // step 1: explain and execute ddl + List ddlSqlExplainResults = + JobDDLBuilder.build(jobManager).explain(); + sqlExplainRecords.addAll(ddlSqlExplainResults); + for (SqlExplainResult item : ddlSqlExplainResults) { + if (!item.isParseTrue() || !item.isExplainTrue()) { correct = false; - log.error(error); - break; } - resultBuilder - .explainTrue(true) - .explainTime(LocalDateTime.now()) - .sql(item.getValue()) - .index(index++); - sqlExplainRecords.add(resultBuilder.build()); } if (correct && !jobParam.getTrans().isEmpty()) { - if (useStatementSet) { - List inserts = new ArrayList<>(); - for (StatementParam item : jobParam.getTrans()) { - if (item.getType().equals(SqlType.INSERT) || item.getType().equals(SqlType.CTAS)) { - inserts.add(item.getValue()); - } - } - if (!inserts.isEmpty()) { - SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); - String sqlSet = String.join(";\r\n ", inserts); - try { - resultBuilder - .explain(executor.explainStatementSet(inserts)) - .parseTrue(true) - .explainTrue(true); - } catch (Exception e) { - String error = LogUtil.getError(e); - resultBuilder.error(error).parseTrue(false).explainTrue(false); - correct = false; - log.error(error); - } finally { - resultBuilder - .type("Modify DML") - .explainTime(LocalDateTime.now()) - .sql(sqlSet) - .index(index); - sqlExplainRecords.add(resultBuilder.build()); - } - } - } else { - for (StatementParam item : jobParam.getTrans()) { - SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); - - try { - resultBuilder = SqlExplainResult.newBuilder(executor.explainSqlRecord(item.getValue())); - resultBuilder.parseTrue(true).explainTrue(true); - } catch (Exception e) { - String error = StrFormatter.format( - "Exception in executing FlinkSQL:\n{}\n{}", - SqlUtil.addLineNumber(item.getValue()), - e.getMessage()); - resultBuilder.error(error).parseTrue(false).explainTrue(false); - correct = false; - log.error(error); - } finally { - resultBuilder - .type("Modify DML") - .explainTime(LocalDateTime.now()) - .sql(item.getValue()) - .index(index++); - sqlExplainRecords.add(resultBuilder.build()); - } - } - } + // step 2: explain modifyOptions + sqlExplainRecords.addAll(JobTransBuilder.build(jobManager).explain()); + // step 3: explain pipeline + sqlExplainRecords.addAll(JobExecuteBuilder.build(jobManager).explain()); } - for (StatementParam item : jobParam.getExecute()) { - SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); - - try { - SqlExplainResult sqlExplainResult = executor.explainSqlRecord(item.getValue()); - if (Asserts.isNull(sqlExplainResult)) { - sqlExplainResult = new SqlExplainResult(); - } else if (ExecuteJarParseStrategy.INSTANCE.match(item.getValue())) { - - List allFileByAdd = jobManager.getAllFileSet(); - Pipeline pipeline = new ExecuteJarOperation(item.getValue()) - .explain(executor.getCustomTableEnvironment(), allFileByAdd); - sqlExplainResult.setExplain(FlinkStreamEnvironmentUtil.getStreamingPlanAsJSON(pipeline)); - } else { - executor.executeSql(item.getValue()); - } - resultBuilder 
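explainSql is reduced to an orchestrator over the three builders, with one renumbering pass at the end. A condensed sketch of the control flow introduced above (names from the diff):

    List<SqlExplainResult> records = new ArrayList<>(JobDDLBuilder.build(jobManager).explain());
    boolean correct = records.stream().allMatch(r -> r.isParseTrue() && r.isExplainTrue());
    if (correct && !jobParam.getTrans().isEmpty()) {
        records.addAll(JobTransBuilder.build(jobManager).explain());
        records.addAll(JobExecuteBuilder.build(jobManager).explain());
    }
    // finally: assign 1-based indexes and downgrade `correct` on any failed record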
= SqlExplainResult.newBuilder(sqlExplainResult); - resultBuilder.type("DATASTREAM").parseTrue(true); - } catch (Exception e) { - String error = StrFormatter.format( - "Exception in executing FlinkSQL:\n{}\n{}", - SqlUtil.addLineNumber(item.getValue()), - e.getMessage()); - resultBuilder - .error(error) - .explainTrue(false) - .explainTime(LocalDateTime.now()) - .sql(item.getValue()) - .index(index); - sqlExplainRecords.add(resultBuilder.build()); + int index = 1; + for (SqlExplainResult item : sqlExplainRecords) { + item.setIndex(index++); + if (!item.isParseTrue() || !item.isExplainTrue()) { correct = false; - log.error(error); - break; } - resultBuilder - .explainTrue(true) - .explainTime(LocalDateTime.now()) - .sql(item.getValue()) - .index(index++); - sqlExplainRecords.add(resultBuilder.build()); } log.info(StrUtil.format("A total of {} FlinkSQL have been Explained.", sqlExplainRecords.size())); return new ExplainResult(correct, sqlExplainRecords.size(), sqlExplainRecords); } public ObjectNode getStreamGraph(String statement) { - JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement)); - jobParam.getDdl().forEach(statementParam -> executor.executeSql(statementParam.getValue())); - - if (!jobParam.getTrans().isEmpty()) { - return executor.getStreamGraph(jobParam.getTransStatement()); - } - - if (!jobParam.getExecute().isEmpty()) { - List dataStreamPlans = - jobParam.getExecute().stream().map(StatementParam::getValue).collect(Collectors.toList()); - return executor.getStreamGraphFromDataStream(dataStreamPlans); + try { + JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement)); + jobManager.setJobParam(jobParam); + // step 1: execute ddl + JobDDLBuilder.build(jobManager).run(); + // step 2: get the stream graph of trans + if (!jobParam.getTrans().isEmpty()) { + return executor.getStreamGraphJsonNode( + JobTransBuilder.build(jobManager).getStreamGraph()); + } + // step 3: get the stream graph of pipeline + if (!jobParam.getExecute().isEmpty()) { + return executor.getStreamGraphJsonNode( + JobExecuteBuilder.build(jobManager).getStreamGraph()); + } + } catch (Exception e) { + throw new RuntimeException(e); } return mapper.createObjectNode(); } public JobPlanInfo getJobPlanInfo(String statement) { - JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement)); - jobParam.getDdl().forEach(statementParam -> executor.executeSql(statementParam.getValue())); - - if (!jobParam.getTrans().isEmpty()) { - return executor.getJobPlanInfo(jobParam.getTransStatement()); - } - - if (!jobParam.getExecute().isEmpty()) { - List dataStreamPlans = - jobParam.getExecute().stream().map(StatementParam::getValue).collect(Collectors.toList()); - return executor.getJobPlanInfoFromDataStream(dataStreamPlans); + try { + JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement)); + jobManager.setJobParam(jobParam); + // step 1: execute ddl + JobDDLBuilder.build(jobManager).run(); + // step 2: get the job plan info of trans + if (!jobParam.getTrans().isEmpty()) { + return JobTransBuilder.build(jobManager).getJobPlanInfo(); + } + // step 3: get the job plan info of pipeline + if (!jobParam.getExecute().isEmpty()) { + return JobExecuteBuilder.build(jobManager).getJobPlanInfo(); + } + } catch (Exception e) { + throw new RuntimeException(e); } throw new RuntimeException("Creating job plan fails because this job doesn't contain an insert statement."); } diff --git a/dinky-core/src/main/java/org/dinky/job/JobBuilder.java 
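Both graph entry points now share one shape: run the DDL builder, then delegate to whichever builder owns the remaining statements. A hedged usage sketch (executor, useStatementSet, jobManager and statement are assumed to be prepared by the caller):

    Explainer explainer = Explainer.build(executor, useStatementSet, jobManager);
    ObjectNode streamGraphJson = explainer.getStreamGraph(statement); // trans takes precedence over execute
    JobPlanInfo jobPlanInfo = explainer.getJobPlanInfo(statement);    // throws if no insert/pipeline statement exists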
b/dinky-core/src/main/java/org/dinky/job/JobBuilder.java
index c31f33bd55..497ebd8641 100644
--- a/dinky-core/src/main/java/org/dinky/job/JobBuilder.java
+++ b/dinky-core/src/main/java/org/dinky/job/JobBuilder.java
@@ -20,8 +20,14 @@
 package org.dinky.job;
 
 import org.dinky.data.enums.GatewayType;
+import org.dinky.data.result.SqlExplainResult;
 import org.dinky.executor.Executor;
 
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+
+import java.util.List;
+
 public abstract class JobBuilder {
 
     protected JobManager jobManager;
@@ -45,4 +51,14 @@ public JobBuilder(JobManager jobManager) {
     }
 
     public abstract void run() throws Exception;
+
+    public abstract List<SqlExplainResult> explain();
+
+    public StreamGraph getStreamGraph() {
+        return executor.getStreamGraph();
+    }
+
+    public JobPlanInfo getJobPlanInfo() {
+        return null;
+    }
 }
diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java
index 0e5d7410d9..9dd9e57947 100644
--- a/dinky-core/src/main/java/org/dinky/job/JobManager.java
+++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java
@@ -54,7 +54,6 @@
 import org.dinky.job.builder.JobExecuteBuilder;
 import org.dinky.job.builder.JobJarStreamGraphBuilder;
 import org.dinky.job.builder.JobTransBuilder;
-import org.dinky.job.builder.JobUDFBuilder;
 import org.dinky.parser.SqlType;
 import org.dinky.trans.Operations;
 import org.dinky.trans.parse.AddFileSqlParseStrategy;
@@ -293,13 +292,11 @@ public JobResult executeSql(String statement) throws Exception {
         jobParam = Explainer.build(executor, useStatementSet, this).pretreatStatements(SqlUtil.getStatements(statement));
         try {
-            // step 1: init udf
-            JobUDFBuilder.build(this).run();
-            // step 2: execute ddl
+            // step 1: execute ddl
             JobDDLBuilder.build(this).run();
-            // step 3: execute insert/select/show/desc/CTAS...
+            // step 2: execute insert/select/show/desc/CTAS...
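+            // Note: there is no separate "init udf" step anymore; CREATE FUNCTION
+            // statements are detected inside JobDDLBuilder and registered in batch,
+            // so SET/ADD side effects are already applied when UDFs are compiled.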
JobTransBuilder.build(this).run(); - // step 4: execute custom data stream task + // step 3: execute custom data stream task JobExecuteBuilder.build(this).run(); // finished job.setEndTime(LocalDateTime.now()); @@ -370,9 +367,7 @@ public static SelectResult getJobData(String jobId) { } public ExplainResult explainSql(String statement) { - return Explainer.build(executor, useStatementSet, this) - .initialize(config, statement) - .explainSql(statement); + return Explainer.build(executor, useStatementSet, this).explainSql(statement); } public ObjectNode getStreamGraph(String statement) { diff --git a/dinky-core/src/main/java/org/dinky/job/JobParam.java b/dinky-core/src/main/java/org/dinky/job/JobParam.java index fb2f7b29ff..8226938a58 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobParam.java +++ b/dinky-core/src/main/java/org/dinky/job/JobParam.java @@ -100,6 +100,10 @@ public List getTransStatement() { return trans.stream().map(StatementParam::getValue).collect(Collectors.toList()); } + public List getExecuteStatement() { + return execute.stream().map(StatementParam::getValue).collect(Collectors.toList()); + } + public void setTrans(List trans) { this.trans = trans; } diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobDDLBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobDDLBuilder.java index 64869061ec..739dd537a5 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobDDLBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobDDLBuilder.java @@ -19,10 +19,42 @@ package org.dinky.job.builder; +import static org.dinky.function.util.UDFUtil.*; + +import org.dinky.assertion.Asserts; +import org.dinky.data.model.SystemConfiguration; +import org.dinky.data.result.SqlExplainResult; +import org.dinky.executor.CustomTableEnvironment; +import org.dinky.function.data.model.UDF; +import org.dinky.function.util.UDFUtil; import org.dinky.job.JobBuilder; import org.dinky.job.JobManager; import org.dinky.job.StatementParam; +import org.dinky.parser.SqlType; +import org.dinky.trans.ddl.CustomSetOperation; +import org.dinky.trans.parse.AddFileSqlParseStrategy; +import org.dinky.trans.parse.AddJarSqlParseStrategy; +import org.dinky.utils.LogUtil; +import org.dinky.utils.SqlUtil; +import org.dinky.utils.URLUtils; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import java.io.File; +import java.net.URL; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.text.StrFormatter; +import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.RandomUtil; +import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; /** @@ -42,9 +74,270 @@ public static JobDDLBuilder build(JobManager jobManager) { @Override public void run() throws Exception { + List udfList = new ArrayList<>(); for (StatementParam item : jobParam.getDdl()) { jobManager.setCurrentSql(item.getValue()); - executor.executeSql(item.getValue()); + switch (item.getType()) { + case SET: + executeSet(item.getValue()); + break; + case ADD: + executeAdd(item.getValue()); + break; + case ADD_FILE: + executeAddFile(item.getValue()); + break; + case ADD_JAR: + executeAddJar(item.getValue()); + break; + case CREATE: + if (UDFUtil.isUdfStatement(item.getValue())) { + udfList.add(UDFUtil.toUDF(item.getValue(), executor.getDinkyClassLoader())); + } else { 
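+                        // Plain CREATE statements (tables, views, catalogs) still go
+                        // straight to the table environment; only statements matching
+                        // the UDF pattern were collected above for batched registration.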
+ executor.executeSql(item.getValue()); + } + break; + default: + executor.executeSql(item.getValue()); + } + } + if (!udfList.isEmpty()) { + executeCreateFunction(udfList); + } + } + + @Override + public List explain() { + List sqlExplainResults = new ArrayList<>(); + if (Asserts.isNullCollection(jobParam.getDdl())) { + return sqlExplainResults; + } + List udfList = new ArrayList<>(); + List udfStatements = new ArrayList<>(); + for (StatementParam item : jobParam.getDdl()) { + jobManager.setCurrentSql(item.getValue()); + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + try { + SqlExplainResult recordResult = null; + switch (item.getType()) { + case SET: + recordResult = explainSet(item.getValue()); + break; + case ADD: + recordResult = explainAdd(item.getValue()); + break; + case ADD_FILE: + recordResult = explainAddFile(item.getValue()); + break; + case ADD_JAR: + recordResult = explainAddJar(item.getValue()); + break; + case CREATE: + if (UDFUtil.isUdfStatement(item.getValue())) { + udfList.add(UDFUtil.toUDF(item.getValue(), executor.getDinkyClassLoader())); + udfStatements.add(item.getValue()); + } else { + recordResult = explainOtherDDL(item.getValue()); + } + break; + default: + recordResult = explainOtherDDL(item.getValue()); + } + if (Asserts.isNull(recordResult) || recordResult.isInvalid()) { + continue; + } + resultBuilder = SqlExplainResult.newBuilder(recordResult) + .type(item.getType().getType()); + } catch (Exception e) { + String error = StrFormatter.format( + "Exception in executing FlinkSQL:\n{}\n{}", + SqlUtil.addLineNumber(item.getValue()), + LogUtil.getError(e)); + resultBuilder + .type(item.getType().getType()) + .error(error) + .explainTrue(false) + .explainTime(LocalDateTime.now()) + .sql(item.getValue()); + log.error(error); + sqlExplainResults.add(resultBuilder.build()); + } + resultBuilder + .type(item.getType().getType()) + .explainTrue(true) + .explainTime(LocalDateTime.now()) + .sql(item.getValue()); + sqlExplainResults.add(resultBuilder.build()); + } + if (!udfList.isEmpty()) { + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + String udfStatement = StringUtils.join(udfStatements, ";\n"); + try { + explainCreateFunction(udfList); + } catch (Exception e) { + String error = StrFormatter.format( + "Exception in executing CreateFunction:\n{}\n{}", + SqlUtil.addLineNumber(udfStatement), + LogUtil.getError(e)); + resultBuilder + .type(SqlType.CREATE.getType()) + .error(error) + .explainTrue(false) + .explainTime(LocalDateTime.now()) + .sql(udfStatement); + log.error(error); + sqlExplainResults.add(resultBuilder.build()); + } + resultBuilder + .type(SqlType.CREATE.getType()) + .explainTrue(true) + .explainTime(LocalDateTime.now()) + .sql(udfStatement); + sqlExplainResults.add(resultBuilder.build()); + } + return sqlExplainResults; + } + + private void executeSet(String statement) { + CustomSetOperation customSetOperation = new CustomSetOperation(statement); + customSetOperation.execute(executor.getCustomTableEnvironment()); + } + + private void executeAdd(String statement) { + AddJarSqlParseStrategy.getAllFilePath(statement) + .forEach(t -> jobManager.getUdfPathContextHolder().addOtherPlugins(t)); + (executor.getDinkyClassLoader()) + .addURLs(URLUtils.getURLs(jobManager.getUdfPathContextHolder().getOtherPluginsFiles())); + } + + private void executeAddFile(String statement) { + AddFileSqlParseStrategy.getAllFilePath(statement) + .forEach(t -> jobManager.getUdfPathContextHolder().addFile(t)); + 
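+        // Register the collected files with the job's DinkyClassLoader so that
+        // later statements in the same session can resolve them.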
(executor.getDinkyClassLoader()) + .addURLs(URLUtils.getURLs(jobManager.getUdfPathContextHolder().getFiles())); + } + + private void executeAddJar(String statement) { + Configuration combinationConfig = getCombinationConfig(); + FileSystem.initialize(combinationConfig, null); + executor.executeSql(statement); + } + + private void executeCreateFunction(List udfList) { + Integer taskId = config.getTaskId(); + if (taskId == null) { + taskId = -RandomUtil.randomInt(0, 1000); + } + // 1. Obtain the path of the jar package and inject it into the remote environment + List jarFiles = + new ArrayList<>(jobManager.getUdfPathContextHolder().getAllFileSet()); + + String[] userCustomUdfJarPath = UDFUtil.initJavaUDF(udfList, taskId); + String[] jarPaths = CollUtil.removeNull(jarFiles).stream() + .map(File::getAbsolutePath) + .toArray(String[]::new); + if (GATEWAY_TYPE_MAP.get(SESSION).contains(runMode)) { + config.setJarFiles(jarPaths); } + + // 2.Compile Python + String[] pyPaths = UDFUtil.initPythonUDF( + udfList, runMode, config.getTaskId(), executor.getTableConfig().getConfiguration()); + + executor.initUDF(userCustomUdfJarPath); + executor.initUDF(jarPaths); + + if (ArrayUtil.isNotEmpty(pyPaths)) { + for (String pyPath : pyPaths) { + if (StrUtil.isNotBlank(pyPath)) { + jarFiles.add(new File(pyPath)); + jobManager.getUdfPathContextHolder().addPyUdfPath(new File(pyPath)); + } + } + } + if (ArrayUtil.isNotEmpty(userCustomUdfJarPath)) { + for (String jarPath : userCustomUdfJarPath) { + if (StrUtil.isNotBlank(jarPath)) { + jarFiles.add(new File(jarPath)); + jobManager.getUdfPathContextHolder().addUdfPath(new File(jarPath)); + } + } + } + + Set pyUdfFile = jobManager.getUdfPathContextHolder().getPyUdfFile(); + executor.initPyUDF( + SystemConfiguration.getInstances().getPythonHome(), + pyUdfFile.stream().map(File::getAbsolutePath).toArray(String[]::new)); + if (GATEWAY_TYPE_MAP.get(YARN).contains(runMode)) { + config.getGatewayConfig().setJarPaths(ArrayUtil.append(jarPaths, pyPaths)); + } + + try { + List jarList = CollUtil.newArrayList(URLUtils.getURLs(jarFiles)); + // 3.Write the required files for UDF + UDFUtil.writeManifest(taskId, jarList, jobManager.getUdfPathContextHolder()); + UDFUtil.addConfigurationClsAndJars( + jobManager.getExecutor().getCustomTableEnvironment(), + jarList, + CollUtil.newArrayList(URLUtils.getURLs(jarFiles))); + } catch (Exception e) { + throw new RuntimeException("add configuration failed: ", e); + } + + log.info(StrUtil.format("A total of {} UDF have been Init.", udfList.size() + pyUdfFile.size())); + log.info("Initializing Flink UDF...Finish"); + } + + private Configuration getCombinationConfig() { + CustomTableEnvironment cte = executor.getCustomTableEnvironment(); + Configuration rootConfig = cte.getRootConfiguration(); + Configuration config = cte.getConfig().getConfiguration(); + Configuration combinationConfig = new Configuration(); + combinationConfig.addAll(rootConfig); + combinationConfig.addAll(config); + return combinationConfig; + } + + private SqlExplainResult explainSet(String statement) { + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + CustomSetOperation customSetOperation = new CustomSetOperation(statement); + String explain = customSetOperation.explain(executor.getCustomTableEnvironment()); + customSetOperation.execute(executor.getCustomTableEnvironment()); + return resultBuilder.parseTrue(true).explainTrue(true).explain(explain).build(); + } + + private SqlExplainResult explainAdd(String statement) { + 
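+        // Each explain* variant delegates to its execute* counterpart, so explain
+        // carries the same side effects (classloader URLs, FileSystem config) as a run.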
SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + executeAdd(statement); + String explain = Arrays.toString( + URLUtils.getURLs(jobManager.getUdfPathContextHolder().getOtherPluginsFiles())); + return resultBuilder.parseTrue(true).explainTrue(true).explain(explain).build(); + } + + private SqlExplainResult explainAddFile(String statement) { + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + executeAddFile(statement); + String explain = Arrays.toString( + URLUtils.getURLs(jobManager.getUdfPathContextHolder().getFiles())); + return resultBuilder.parseTrue(true).explainTrue(true).explain(explain).build(); + } + + private SqlExplainResult explainAddJar(String statement) { + SqlExplainResult sqlExplainResult = executor.explainSqlRecord(statement); + executeAddJar(statement); + return sqlExplainResult; + } + + private SqlExplainResult explainCreateFunction(List udfList) { + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + executeCreateFunction(udfList); + String explain = udfList.toString(); + return resultBuilder.parseTrue(true).explainTrue(true).explain(explain).build(); + } + + private SqlExplainResult explainOtherDDL(String statement) { + SqlExplainResult sqlExplainResult = executor.explainSqlRecord(statement); + executor.executeSql(statement); + return sqlExplainResult; } } diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java index 3253393f97..b31844b9c0 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java @@ -23,6 +23,7 @@ import org.dinky.data.result.IResult; import org.dinky.data.result.InsertResult; import org.dinky.data.result.ResultBuilder; +import org.dinky.data.result.SqlExplainResult; import org.dinky.gateway.Gateway; import org.dinky.gateway.result.GatewayResult; import org.dinky.job.Job; @@ -30,19 +31,32 @@ import org.dinky.job.JobManager; import org.dinky.job.StatementParam; import org.dinky.parser.SqlType; +import org.dinky.trans.dml.ExecuteJarOperation; +import org.dinky.trans.parse.ExecuteJarParseStrategy; +import org.dinky.utils.FlinkStreamEnvironmentUtil; +import org.dinky.utils.SqlUtil; import org.dinky.utils.URLUtils; +import org.apache.flink.api.dag.Pipeline; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.streaming.api.graph.StreamGraph; +import java.net.URL; +import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.List; + +import cn.hutool.core.text.StrFormatter; +import lombok.extern.slf4j.Slf4j; /** * JobExecuteBuilder * */ +@Slf4j public class JobExecuteBuilder extends JobBuilder { public JobExecuteBuilder(JobManager jobManager) { @@ -124,4 +138,61 @@ public void run() throws Exception { } } } + + @Override + public List explain() { + List sqlExplainResults = new ArrayList<>(); + if (Asserts.isNullCollection(jobParam.getExecute())) { + return sqlExplainResults; + } + for (StatementParam item : jobParam.getExecute()) { + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + try { + SqlExplainResult sqlExplainResult = executor.explainSqlRecord(item.getValue()); + if (Asserts.isNull(sqlExplainResult)) { + 
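+                    // EXECUTE statements may yield no planner record; start from an
+                    // empty result and let the jar/pipeline branches below supply the plan.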
sqlExplainResult = new SqlExplainResult(); + } else if (ExecuteJarParseStrategy.INSTANCE.match(item.getValue())) { + List allFileByAdd = jobManager.getAllFileSet(); + Pipeline pipeline = new ExecuteJarOperation(item.getValue()) + .explain(executor.getCustomTableEnvironment(), allFileByAdd); + sqlExplainResult.setExplain(FlinkStreamEnvironmentUtil.getStreamingPlanAsJSON(pipeline)); + } else { + executor.executeSql(item.getValue()); + } + resultBuilder = SqlExplainResult.newBuilder(sqlExplainResult); + resultBuilder.type(item.getType().getType()).parseTrue(true); + } catch (Exception e) { + String error = StrFormatter.format( + "Exception in executing FlinkSQL:\n{}\n{}", + SqlUtil.addLineNumber(item.getValue()), + e.getMessage()); + resultBuilder + .type(item.getType().getType()) + .error(error) + .explainTrue(false) + .explainTime(LocalDateTime.now()) + .sql(item.getValue()); + sqlExplainResults.add(resultBuilder.build()); + log.error(error); + break; + } + resultBuilder + .type(item.getType().getType()) + .explainTrue(true) + .explainTime(LocalDateTime.now()) + .sql(item.getValue()); + sqlExplainResults.add(resultBuilder.build()); + } + return sqlExplainResults; + } + + @Override + public StreamGraph getStreamGraph() { + return executor.getStreamGraphFromCustomStatements(jobParam.getExecuteStatement()); + } + + @Override + public JobPlanInfo getJobPlanInfo() { + return executor.getJobPlanInfo(jobParam.getExecuteStatement()); + } } diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java index 3e8ee6c83a..182177b92b 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java @@ -23,6 +23,7 @@ import org.dinky.classloader.DinkyClassLoader; import org.dinky.data.exception.DinkyException; import org.dinky.data.result.InsertResult; +import org.dinky.data.result.SqlExplainResult; import org.dinky.gateway.Gateway; import org.dinky.gateway.config.GatewayConfig; import org.dinky.gateway.result.GatewayResult; @@ -55,6 +56,7 @@ import java.io.File; import java.net.URL; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; @@ -117,6 +119,11 @@ public void run() throws Exception { } } + @Override + public List explain() { + return Collections.emptyList(); + } + private GatewayResult submitGateway() throws Exception { configuration.set(PipelineOptions.JARS, getUris(job.getStatement())); config.addGatewayConfig(configuration); diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java index cf10e0518c..8b678c1803 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java @@ -25,6 +25,7 @@ import org.dinky.data.result.IResult; import org.dinky.data.result.InsertResult; import org.dinky.data.result.ResultBuilder; +import org.dinky.data.result.SqlExplainResult; import org.dinky.executor.Executor; import org.dinky.gateway.Gateway; import org.dinky.gateway.result.GatewayResult; @@ -36,20 +37,31 @@ import org.dinky.job.JobManager; import org.dinky.job.StatementParam; import org.dinky.parser.SqlType; +import org.dinky.utils.LogUtil; +import org.dinky.utils.SqlUtil; import org.dinky.utils.URLUtils; +import org.apache.commons.lang3.StringUtils; import 
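JobExecuteBuilder now owns graph resolution for EXECUTE statements, so callers go through the builder facade instead of the executor. A hedged sketch (jobManager is assumed to be initialized with a populated JobParam):

    StreamGraph pipelineGraph = JobExecuteBuilder.build(jobManager).getStreamGraph();
    JobPlanInfo pipelinePlan = JobExecuteBuilder.build(jobManager).getJobPlanInfo();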
org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.rest.messages.JobPlanInfo; +import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.table.api.TableResult; +import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.stream.Collectors; +import cn.hutool.core.text.StrFormatter; +import lombok.extern.slf4j.Slf4j; + /** * JobTransBuilder */ +@Slf4j public class JobTransBuilder extends JobBuilder { public JobTransBuilder(JobManager jobManager) { @@ -73,6 +85,82 @@ public void run() throws Exception { } } + @Override + public List explain() { + List sqlExplainResults = new ArrayList<>(); + if (Asserts.isNullCollection(jobParam.getTrans())) { + return sqlExplainResults; + } + if (useStatementSet) { + List inserts = new ArrayList<>(); + for (StatementParam item : jobParam.getTrans()) { + if (item.getType().equals(SqlType.INSERT) || item.getType().equals(SqlType.CTAS)) { + inserts.add(item.getValue()); + } + } + if (!inserts.isEmpty()) { + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + String sqlSet = StringUtils.join(inserts, ";\r"); + try { + resultBuilder + .explain(executor.explainStatementSet(inserts)) + .parseTrue(true) + .explainTrue(true); + } catch (Exception e) { + String error = LogUtil.getError(e); + resultBuilder + .type(SqlType.INSERT.getType()) + .error(error) + .parseTrue(false) + .explainTrue(false); + log.error(error); + } finally { + resultBuilder + .type(SqlType.INSERT.getType()) + .explainTime(LocalDateTime.now()) + .sql(sqlSet); + sqlExplainResults.add(resultBuilder.build()); + } + } + } else { + for (StatementParam item : jobParam.getTrans()) { + SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); + try { + resultBuilder = SqlExplainResult.newBuilder(executor.explainSqlRecord(item.getValue())); + resultBuilder.parseTrue(true).explainTrue(true); + } catch (Exception e) { + String error = StrFormatter.format( + "Exception in explaining FlinkSQL:\n{}\n{}", + SqlUtil.addLineNumber(item.getValue()), + e.getMessage()); + resultBuilder + .type(item.getType().getType()) + .error(error) + .parseTrue(false) + .explainTrue(false); + log.error(error); + } finally { + resultBuilder + .type(item.getType().getType()) + .explainTime(LocalDateTime.now()) + .sql(item.getValue()); + sqlExplainResults.add(resultBuilder.build()); + } + } + } + return sqlExplainResults; + } + + @Override + public StreamGraph getStreamGraph() { + return executor.getStreamGraphFromStatement(jobParam.getTransStatement()); + } + + @Override + public JobPlanInfo getJobPlanInfo() { + return executor.getJobPlanInfo(jobParam.getTransStatement()); + } + private boolean inferStatementSet() { boolean hasInsert = false; for (StatementParam item : jobParam.getTrans()) { diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java index c219c2337c..2977ff301f 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java @@ -25,6 +25,7 @@ import org.dinky.assertion.Asserts; import org.dinky.data.model.SystemConfiguration; +import org.dinky.data.result.SqlExplainResult; import org.dinky.function.data.model.UDF; import org.dinky.function.util.UDFUtil; import org.dinky.job.JobBuilder; @@ 
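When statement sets are enabled, all INSERT/CTAS statements are explained as a single unit, producing one SqlExplainResult; otherwise each trans statement is explained on its own. A hedged sketch of the single-unit path (the INSERT texts are illustrative):

    List<String> inserts = Arrays.asList(
            "INSERT INTO sink_a SELECT * FROM source",
            "INSERT INTO sink_b SELECT * FROM source");
    String combinedPlan = executor.explainStatementSet(inserts); // one plan for the whole set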
-34,6 +35,7 @@
 import java.io.File;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -125,4 +127,9 @@ public void run() throws Exception {
         log.info(StrUtil.format("A total of {} UDF have been Init.", udfList.size() + pyUdfFile.size()));
         log.info("Initializing Flink UDF...Finish");
     }
+
+    @Override
+    public List<SqlExplainResult> explain() {
+        return Collections.emptyList();
+    }
 }
diff --git a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java
index 24d86c33d3..65e425889a 100644
--- a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java
+++ b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java
@@ -304,6 +304,10 @@ public static boolean isUdfStatement(Pattern pattern, String statement) {
         return !StrUtil.isBlank(statement) && CollUtil.isNotEmpty(ReUtil.findAll(pattern, statement, 0));
     }
 
+    public static boolean isUdfStatement(String statement) {
+        return !StrUtil.isBlank(statement) && CollUtil.isNotEmpty(ReUtil.findAll(PATTERN, statement, 0));
+    }
+
     public static UDF toUDF(String statement, DinkyClassLoader classLoader) {
         if (isUdfStatement(PATTERN, statement)) {
             List<String> groups = CollUtil.removeEmpty(ReUtil.getAllGroups(PATTERN, statement));
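The new single-argument isUdfStatement overload simplifies call sites such as JobDDLBuilder's CREATE branch. A hedged usage sketch (the function text is illustrative; udfList is the batch registered later by executeCreateFunction):

    String stmt = "CREATE FUNCTION my_upper AS 'com.example.udf.MyUpper' LANGUAGE JAVA";
    if (UDFUtil.isUdfStatement(stmt)) {
        udfList.add(UDFUtil.toUDF(stmt, executor.getDinkyClassLoader()));
    }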