diff --git a/dinky-admin/src/main/java/org/dinky/data/dto/StudioLineageDTO.java b/dinky-admin/src/main/java/org/dinky/data/dto/StudioLineageDTO.java index 2c7bf58389..9ba550e2c7 100644 --- a/dinky-admin/src/main/java/org/dinky/data/dto/StudioLineageDTO.java +++ b/dinky-admin/src/main/java/org/dinky/data/dto/StudioLineageDTO.java @@ -19,6 +19,8 @@ package org.dinky.data.dto; +import org.dinky.data.model.ext.TaskExtConfig; + import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Getter; @@ -41,9 +43,6 @@ public class StudioLineageDTO extends AbstractStatementDTO { notes = "Flag indicating whether to use Statement Set") private Boolean statementSet; - @ApiModelProperty(value = "Type", dataType = "Integer", example = "1", notes = "The type of the SQL query") - private Integer type; - @ApiModelProperty(value = "Dialect", dataType = "String", example = "MySQL", notes = "The SQL dialect") private String dialect; @@ -56,4 +55,10 @@ public class StudioLineageDTO extends AbstractStatementDTO { @ApiModelProperty(value = "Task ID", dataType = "Integer", example = "1", notes = "The identifier of the task") private Integer taskId; + + @ApiModelProperty( + value = "Configuration JSON", + dataType = "TaskExtConfig", + notes = "Extended configuration in JSON format for the task") + private TaskExtConfig configJson; } diff --git a/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskExtConfig.java b/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskExtConfig.java index df437da715..8c07bed1ec 100644 --- a/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskExtConfig.java +++ b/dinky-admin/src/main/java/org/dinky/data/model/ext/TaskExtConfig.java @@ -94,14 +94,13 @@ public Map getUdfReferMaps() { return Asserts.isNotNullCollection(udfRefer) ? udfRefer.stream() .filter(item -> item.getClassName() != null) - .map(t -> { + .peek(t -> { if (StringUtils.isEmpty(t.getName())) { String name = t.getClassName() .substring(t.getClassName().lastIndexOf(".") + 1); name = name.substring(0, 1).toLowerCase() + name.substring(1); t.setName(name); } - return t; }) .collect(Collectors.toConcurrentMap(TaskUdfRefer::getClassName, TaskUdfRefer::getName)) : new HashMap<>(); diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/StudioServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/StudioServiceImpl.java index fc34addda9..6f516974d2 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/StudioServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/StudioServiceImpl.java @@ -122,6 +122,9 @@ public LineageResult getLineage(StudioLineageDTO studioCADTO) { TaskDTO taskDTO = taskService.getTaskInfoById(studioCADTO.getTaskId()); taskDTO.setStatement(taskService.buildEnvSql(taskDTO) + studioCADTO.getStatement()); JobConfig jobConfig = taskDTO.getJobConfig(); + jobConfig.setUdfRefer(studioCADTO.getConfigJson().getUdfReferMaps()); + jobConfig.setConfigJson(studioCADTO.getConfigJson().getCustomConfigMaps()); + return LineageBuilder.getColumnLineageByLogicalPlan(taskDTO.getStatement(), jobConfig); } } diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index b0bb24b11a..652523a30d 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -63,7 +63,6 @@ import org.dinky.explainer.lineage.LineageBuilder; import org.dinky.explainer.lineage.LineageResult; import org.dinky.explainer.sqllineage.SQLLineageBuilder; -import org.dinky.function.FunctionFactory; import org.dinky.function.compiler.CustomStringJavaCompiler; import org.dinky.function.data.model.UDF; import org.dinky.function.pool.UdfCodePool; @@ -107,7 +106,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Base64; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -583,15 +581,14 @@ public boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle) thro task.setVersionId(taskVersionId); if (Dialect.isUDF(task.getDialect())) { // compile udf class - UDF udf = UDFUtils.taskToUDF(task.buildTask()); try { - FunctionFactory.initUDF(Collections.singletonList(udf), task.getId()); + UDF udf = UDFUtils.taskToUDF(task.buildTask()); + UdfCodePool.addOrUpdate(udf); } catch (Throwable e) { throw new BusException( "UDF compilation failed and cannot be published. The error message is as follows:" + e.getMessage()); } - UdfCodePool.addOrUpdate(udf); } } else { if (Dialect.isUDF(task.getDialect()) diff --git a/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java b/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java index bb41ea9db1..f13659fd3f 100644 --- a/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java +++ b/dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java @@ -39,6 +39,10 @@ @AllArgsConstructor public abstract class BaseTask { + + private static final Set> taskRegistry = + ClassUtil.scanPackageBySuper(BaseTask.class.getPackage().getName(), BaseTask.class); + final TaskDTO task; public abstract JobResult execute() throws Exception; @@ -58,9 +62,7 @@ public ObjectNode getJobPlan() throws NotSupportExplainExcepition { } public static BaseTask getTask(TaskDTO taskDTO) { - Set> classes = - ClassUtil.scanPackageBySuper(BaseTask.class.getPackage().getName(), BaseTask.class); - for (Class clazz : classes) { + for (Class clazz : taskRegistry) { SupportDialect annotation = clazz.getAnnotation(SupportDialect.class); if (annotation != null) { for (Dialect dialect : annotation.value()) { diff --git a/dinky-admin/src/main/java/org/dinky/service/task/FlinkJarSqlTask.java b/dinky-admin/src/main/java/org/dinky/service/task/FlinkJarSqlTask.java index de2716d3fb..5fb7fb7ca0 100644 --- a/dinky-admin/src/main/java/org/dinky/service/task/FlinkJarSqlTask.java +++ b/dinky-admin/src/main/java/org/dinky/service/task/FlinkJarSqlTask.java @@ -24,6 +24,7 @@ import org.dinky.data.dto.TaskDTO; import org.dinky.data.result.SqlExplainResult; import org.dinky.job.JobResult; +import org.dinky.job.runner.FlinkJarUtil; import java.util.List; @@ -53,8 +54,9 @@ public boolean stop() { @Override public ObjectNode getJobPlan() { + String statement = task.getStatement(); try { - return jobManager.getJarStreamGraphJson(task.getStatement()); + return FlinkJarUtil.getJobPlan(statement, jobManager); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/dinky-admin/src/main/java/org/dinky/service/task/UdfTask.java b/dinky-admin/src/main/java/org/dinky/service/task/UdfTask.java index 094670fdc2..66ea4945c7 100644 --- a/dinky-admin/src/main/java/org/dinky/service/task/UdfTask.java +++ b/dinky-admin/src/main/java/org/dinky/service/task/UdfTask.java @@ -29,8 +29,6 @@ import org.dinky.job.JobResult; import org.dinky.utils.UDFUtils; -import java.util.Collections; - import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.exceptions.ExceptionUtil; @@ -47,7 +45,7 @@ public JobResult execute() throws Exception { jobResult.setStatus(Job.JobStatus.SUCCESS); try { UDF udf = UDFUtils.taskToUDF(BeanUtil.toBean(task, Task.class)); - FunctionFactory.initUDF(Collections.singletonList(udf), task.getId()); + FunctionFactory.initUDF(udf, task.getId()); } catch (Exception e) { jobResult.setSuccess(false); jobResult.setError(ExceptionUtil.getRootCauseMessage(e)); diff --git a/dinky-admin/src/main/java/org/dinky/utils/UDFUtils.java b/dinky-admin/src/main/java/org/dinky/utils/UDFUtils.java index 5345f81968..5bb06d0ac7 100644 --- a/dinky-admin/src/main/java/org/dinky/utils/UDFUtils.java +++ b/dinky-admin/src/main/java/org/dinky/utils/UDFUtils.java @@ -23,6 +23,8 @@ import org.dinky.data.exception.BusException; import org.dinky.data.model.Task; import org.dinky.data.model.udf.UDFManage; +import org.dinky.function.compiler.FunctionCompiler; +import org.dinky.function.compiler.FunctionPackage; import org.dinky.function.data.model.UDF; import org.dinky.function.util.UDFUtil; @@ -33,11 +35,15 @@ public class UDFUtils extends UDFUtil { public static UDF taskToUDF(Task task) { if (Asserts.isNotNull(task.getConfigJson()) && Asserts.isNotNull(task.getConfigJson().getUdfConfig())) { - return UDF.builder() + UDF udf = UDF.builder() .className(task.getConfigJson().getUdfConfig().getClassName()) .code(task.getStatement()) .functionLanguage(FunctionLanguage.valueOf(task.getDialect().toUpperCase())) .build(); + + FunctionCompiler.getCompilerByTask(udf, task.getConfigJson().getCustomConfigMaps(), task.getId()); + FunctionPackage.bale(udf, task.getId()); + return udf; } else { throw new BusException("udf `class` config is null,please check your udf task config"); } diff --git a/dinky-admin/src/main/resources/DinkyFlinkDockerfile b/dinky-admin/src/main/resources/DinkyFlinkDockerfile index db725a3595..710e66aa77 100644 --- a/dinky-admin/src/main/resources/DinkyFlinkDockerfile +++ b/dinky-admin/src/main/resources/DinkyFlinkDockerfile @@ -1,8 +1,8 @@ # 用来构建dinky环境 -ARG FLINK_VERSION=1.14.5 -ARG FLINK_BIG_VERSION=1.14 +ARG FLINK_VERSION=1.20.0 +ARG FLINK_BIG_VERSION=1.20 -FROM flink:${FLINK_VERSION} +FROM flink:${FLINK_VERSION}-scala_2.12-java8 ARG FLINK_VERSION ARG FLINK_BIG_VERSION @@ -10,13 +10,19 @@ ENV PYTHON_HOME /opt/miniconda3 USER root RUN wget "https://s3.jcloud.sjtu.edu.cn/899a892efef34b1b944a19981040f55b-oss01/anaconda/miniconda/Miniconda3-py38_4.9.2-Linux-x86_64.sh" -O "miniconda.sh" && chmod +x miniconda.sh -RUN ./miniconda.sh -b -p $PYTHON_HOME && chown -R flink $PYTHON_HOME && ls $PYTHON_HOME +RUN ./miniconda.sh -b -p $PYTHON_HOME && chown -R flink $PYTHON_HOME && ls $PYTHON_HOME USER flink ENV PATH $PYTHON_HOME/bin:$PATH -RUN pip install "apache-flink==${FLINK_VERSION}" -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com -RUN cp /opt/flink/opt/flink-python_* /opt/flink/lib/ +RUN pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com +RUN rm -rf /opt/miniconda3/lib/python3.8/site-packages/ruamel* +RUN pip install "apache-flink==${FLINK_VERSION}" -i https://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com + +RUN cp /opt/flink/opt/flink-python* /opt/flink/lib/ +RUN rm -f /opt/flink/lib/flink-table-planner-loader*.jar + +RUN cp /opt/flink/opt/flink-table-planner* /opt/flink/lib/ -RUN wget -O dinky-app-${FLINK_BIG_VERSION}.jar - ${DINKY_HTTP}/downloadAppJar/${FLINK_BIG_VERSION} | mv dinky-app-${FLINK_BIG_VERSION}.jar \ No newline at end of file +# RUN wget -O dinky-app-${FLINK_BIG_VERSION}.jar - ${DINKY_HTTP}/downloadAppJar/${FLINK_BIG_VERSION} | mv dinky-app-${FLINK_BIG_VERSION}.jar diff --git a/dinky-alert/dinky-alert-dingtalk/src/main/java/org/dinky/alert/dingtalk/params/DingTalkParams.java b/dinky-alert/dinky-alert-dingtalk/src/main/java/org/dinky/alert/dingtalk/params/DingTalkParams.java index 610321e330..87a884a39d 100644 --- a/dinky-alert/dinky-alert-dingtalk/src/main/java/org/dinky/alert/dingtalk/params/DingTalkParams.java +++ b/dinky-alert/dinky-alert-dingtalk/src/main/java/org/dinky/alert/dingtalk/params/DingTalkParams.java @@ -22,15 +22,13 @@ import java.util.ArrayList; import java.util.List; -import com.fasterxml.jackson.annotation.JsonCreator; - import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor -@AllArgsConstructor(onConstructor = @__(@JsonCreator)) +@AllArgsConstructor public class DingTalkParams { private String webhook = ""; diff --git a/dinky-alert/dinky-alert-email/src/main/java/org/dinky/alert/email/params/EmailParams.java b/dinky-alert/dinky-alert-email/src/main/java/org/dinky/alert/email/params/EmailParams.java index 723a6c8fd4..d2dc63e333 100644 --- a/dinky-alert/dinky-alert-email/src/main/java/org/dinky/alert/email/params/EmailParams.java +++ b/dinky-alert/dinky-alert-email/src/main/java/org/dinky/alert/email/params/EmailParams.java @@ -22,15 +22,13 @@ import java.util.ArrayList; import java.util.List; -import com.fasterxml.jackson.annotation.JsonCreator; - import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor -@AllArgsConstructor(onConstructor = @__(@JsonCreator)) +@AllArgsConstructor public class EmailParams { private List receivers = new ArrayList<>(); diff --git a/dinky-alert/dinky-alert-feishu/src/main/java/org/dinky/alert/feishu/params/FeiShuParams.java b/dinky-alert/dinky-alert-feishu/src/main/java/org/dinky/alert/feishu/params/FeiShuParams.java index 23f0ef91ee..4c7af8048d 100644 --- a/dinky-alert/dinky-alert-feishu/src/main/java/org/dinky/alert/feishu/params/FeiShuParams.java +++ b/dinky-alert/dinky-alert-feishu/src/main/java/org/dinky/alert/feishu/params/FeiShuParams.java @@ -22,14 +22,12 @@ import java.util.ArrayList; import java.util.List; -import com.fasterxml.jackson.annotation.JsonCreator; - import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data -@NoArgsConstructor(onConstructor = @__(@JsonCreator)) +@NoArgsConstructor @AllArgsConstructor public class FeiShuParams { diff --git a/dinky-alert/dinky-alert-http/src/main/java/org/dinky/alert/http/params/HttpParams.java b/dinky-alert/dinky-alert-http/src/main/java/org/dinky/alert/http/params/HttpParams.java index dfa19d825f..29130260ed 100644 --- a/dinky-alert/dinky-alert-http/src/main/java/org/dinky/alert/http/params/HttpParams.java +++ b/dinky-alert/dinky-alert-http/src/main/java/org/dinky/alert/http/params/HttpParams.java @@ -23,14 +23,12 @@ import java.util.List; -import com.fasterxml.jackson.annotation.JsonCreator; - import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data -@NoArgsConstructor(onConstructor = @__(@JsonCreator)) +@NoArgsConstructor @AllArgsConstructor public class HttpParams { diff --git a/dinky-alert/dinky-alert-wechat/src/main/java/org/dinky/alert/wechat/params/WechatParams.java b/dinky-alert/dinky-alert-wechat/src/main/java/org/dinky/alert/wechat/params/WechatParams.java index ead608b8fa..c46ded152c 100644 --- a/dinky-alert/dinky-alert-wechat/src/main/java/org/dinky/alert/wechat/params/WechatParams.java +++ b/dinky-alert/dinky-alert-wechat/src/main/java/org/dinky/alert/wechat/params/WechatParams.java @@ -22,14 +22,12 @@ import java.util.ArrayList; import java.util.List; -import com.fasterxml.jackson.annotation.JsonCreator; - import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data -@NoArgsConstructor(onConstructor = @__(@JsonCreator)) +@NoArgsConstructor @AllArgsConstructor public class WechatParams { diff --git a/dinky-common/src/main/java/org/dinky/context/FlinkUdfPathContextHolder.java b/dinky-common/src/main/java/org/dinky/context/FlinkUdfPathContextHolder.java index febbfdbcb1..d47d7886ba 100644 --- a/dinky-common/src/main/java/org/dinky/context/FlinkUdfPathContextHolder.java +++ b/dinky-common/src/main/java/org/dinky/context/FlinkUdfPathContextHolder.java @@ -46,7 +46,9 @@ public void addFile(File file) { } public void addPyUdfPath(File file) { - getPyUdfFile().add(file); + Set pyUdfFile = getPyUdfFile(); + pyUdfFile.add(file); + addUdfPath(file); } public void addOtherPlugins(File file) { diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index 39d86200e4..0971e510c1 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -31,16 +31,12 @@ import org.dinky.executor.Executor; import org.dinky.explainer.mock.MockStatementExplainer; import org.dinky.function.data.model.UDF; +import org.dinky.function.pool.UdfCodePool; import org.dinky.function.util.UDFUtil; -import org.dinky.interceptor.FlinkInterceptor; import org.dinky.job.JobConfig; import org.dinky.job.JobManager; -import org.dinky.job.JobParam; import org.dinky.job.JobRunnerFactory; import org.dinky.job.JobStatementPlan; -import org.dinky.job.builder.JobUDFBuilder; -import org.dinky.trans.Operations; -import org.dinky.utils.DinkyClassLoaderUtil; import org.dinky.utils.LogUtil; import org.dinky.utils.SqlUtil; @@ -84,19 +80,6 @@ public static Explainer build(Executor executor, boolean useStatementSet, JobMan return new Explainer(executor, useStatementSet, jobManager); } - public Explainer initialize(JobConfig config, String statement) { - DinkyClassLoaderUtil.initClassLoader(config, jobManager.getDinkyClassLoader()); - String[] statements = SqlUtil.getStatements(SqlUtil.removeNote(statement)); - List udfs = parseUDFFromStatements(statements); - jobManager.setJobParam(new JobParam(udfs)); - try { - JobUDFBuilder.build(jobManager).run(); - } catch (Exception e) { - e.printStackTrace(); - } - return this; - } - public JobStatementPlan parseStatements(String[] statements) { JobStatementPlan jobStatementPlanWithMock = new JobStatementPlan(); generateUDFStatement(jobStatementPlanWithMock); @@ -115,7 +98,9 @@ private void generateUDFStatement(JobStatementPlan jobStatementPlan) { List udfStatements = new ArrayList<>(); Optional.ofNullable(jobManager.getConfig().getUdfRefer()) .ifPresent(t -> t.forEach((key, value) -> { - String sql = String.format("create temporary function %s as '%s'", value, key); + UDF udf = UdfCodePool.getUDF(key); + String sql = String.format( + "create temporary function %s as '%s' language %s", value, key, udf.getFunctionLanguage()); udfStatements.add(sql); })); for (String udfStatement : udfStatements) { @@ -217,24 +202,27 @@ public List getLineage(String statement) { .fragment(true) .statementSet(useStatementSet) .parallelism(1) + .udfRefer(jobManager.getConfig().getUdfRefer()) .configJson(executor.getTableConfig().getConfiguration().toMap()) .build(); jobManager.setConfig(jobConfig); jobManager.setExecutor(executor); - this.initialize(jobConfig, statement); List lineageRelList = new ArrayList<>(); - for (String item : SqlUtil.getStatements(statement)) { + String[] statements = SqlUtil.getStatements(statement); + JobStatementPlan jobStatementPlan = parseStatements(statements); + List statementList = jobStatementPlan.getJobStatementList(); + JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(jobManager); + + for (JobStatement item : statementList) { + String sql = item.getStatement(); + SqlType sqlType = item.getSqlType(); + try { - String sql = FlinkInterceptor.pretreatStatement(executor, item); - if (Asserts.isNullString(sql)) { - continue; - } - SqlType operationType = Operations.getOperationType(sql); - if (operationType.equals(SqlType.INSERT)) { + if (sqlType.equals(SqlType.INSERT)) { lineageRelList.addAll(executor.getLineage(sql)); - } else if (!operationType.equals(SqlType.SELECT) && !operationType.equals(SqlType.PRINT)) { - executor.executeSql(sql); + } else if (!sqlType.equals(SqlType.SELECT) && !sqlType.equals(SqlType.PRINT)) { + jobRunnerFactory.getJobRunner(item.getStatementType()).run(item); } } catch (Exception e) { log.error("Exception occurred while fetching lineage information", e); diff --git a/dinky-core/src/main/java/org/dinky/job/JobBuilder.java b/dinky-core/src/main/java/org/dinky/job/JobBuilder.java deleted file mode 100644 index 497ebd8641..0000000000 --- a/dinky-core/src/main/java/org/dinky/job/JobBuilder.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.job; - -import org.dinky.data.enums.GatewayType; -import org.dinky.data.result.SqlExplainResult; -import org.dinky.executor.Executor; - -import org.apache.flink.runtime.rest.messages.JobPlanInfo; -import org.apache.flink.streaming.api.graph.StreamGraph; - -import java.util.List; - -public abstract class JobBuilder { - - protected JobManager jobManager; - protected JobConfig config; - protected JobParam jobParam; - protected GatewayType runMode; - protected Executor executor; - protected boolean useStatementSet; - protected boolean useGateway; - protected Job job; - - public JobBuilder(JobManager jobManager) { - this.jobManager = jobManager; - this.config = jobManager.getConfig(); - this.jobParam = jobManager.getJobParam(); - this.runMode = jobManager.getRunMode(); - this.executor = jobManager.getExecutor(); - this.useStatementSet = jobManager.isUseStatementSet(); - this.useGateway = jobManager.isUseGateway(); - this.job = jobManager.getJob(); - } - - public abstract void run() throws Exception; - - public abstract List explain(); - - public StreamGraph getStreamGraph() { - return executor.getStreamGraph(); - } - - public JobPlanInfo getJobPlanInfo() { - return null; - } -} diff --git a/dinky-core/src/main/java/org/dinky/job/JobConfig.java b/dinky-core/src/main/java/org/dinky/job/JobConfig.java index 9ddb8070ee..5e812393e0 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobConfig.java +++ b/dinky-core/src/main/java/org/dinky/job/JobConfig.java @@ -29,6 +29,7 @@ import org.dinky.gateway.enums.SavePointStrategy; import org.dinky.gateway.model.FlinkClusterConfig; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.RestOptions; @@ -42,12 +43,14 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.extern.slf4j.Slf4j; /** * JobConfig * * @since 2021/6/27 18:45 */ +@Slf4j @Data @Builder @AllArgsConstructor @@ -257,9 +260,18 @@ public void buildGatewayConfig(FlinkClusterConfig config) { Assert.notNull(customConfig.getValue(), "Custom flink config has null value"); flinkConfig.getConfiguration().put(customConfig.getName(), customConfig.getValue()); } + + Map configuration = flinkConfig.getConfiguration(); + + // In Kubernetes mode, must set jobmanager.memory.process.size. + if (StringUtils.isBlank(configuration.get("jobmanager.memory.process.size"))) { + log.warn("In Kubernetes mode, please configure 'jobmanager.memory.process.size', default 2048m"); + configuration.put("jobmanager.memory.process.size", "2048m"); + } + // Load job configuration content afterwards - flinkConfig.getConfiguration().putAll(getConfigJson()); - flinkConfig.getConfiguration().put(CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism)); + configuration.putAll(getConfigJson()); + configuration.put(CoreOptions.DEFAULT_PARALLELISM.key(), String.valueOf(parallelism)); flinkConfig.setJobName(getJobName()); gatewayConfig = GatewayConfig.build(config); diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java index f25fa56dd0..a703c63665 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobManager.java +++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java @@ -52,26 +52,19 @@ import org.dinky.gateway.result.GatewayResult; import org.dinky.gateway.result.SavePointResult; import org.dinky.gateway.result.TestResult; -import org.dinky.job.builder.JobJarStreamGraphBuilder; import org.dinky.job.runner.JobJarRunner; import org.dinky.trans.Operations; import org.dinky.trans.parse.AddFileSqlParseStrategy; import org.dinky.trans.parse.AddJarSqlParseStrategy; import org.dinky.utils.DinkyClassLoaderUtil; -import org.dinky.utils.FlinkStreamEnvironmentUtil; -import org.dinky.utils.JsonUtils; import org.dinky.utils.LogUtil; import org.dinky.utils.SqlUtil; import org.dinky.utils.URLUtils; -import org.apache.flink.api.dag.Pipeline; -import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.PipelineOptions; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; -import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.table.api.TableResult; import org.apache.flink.yarn.configuration.YarnConfigOptions; @@ -248,13 +241,6 @@ public boolean close() { return true; } - public ObjectNode getJarStreamGraphJson(String statement) { - Pipeline pipeline = JobJarStreamGraphBuilder.build(this).getJarStreamGraph(statement, getDinkyClassLoader()); - Configuration configuration = Configuration.fromMap(getExecutorConfig().getConfig()); - JobGraph jobGraph = FlinkStreamEnvironmentUtil.getJobGraph(pipeline, configuration); - return JsonUtils.parseObject(JsonPlanGenerator.generatePlan(jobGraph)); - } - @ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE) public JobResult executeJarSql(String statement) throws Exception { List statements = Arrays.stream(SqlUtil.getStatements(statement)) diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobDDLBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobDDLBuilder.java deleted file mode 100644 index b973065cb8..0000000000 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobDDLBuilder.java +++ /dev/null @@ -1,365 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.job.builder; - -import static org.dinky.function.util.UDFUtil.*; - -import org.dinky.assertion.Asserts; -import org.dinky.data.job.SqlType; -import org.dinky.data.model.SystemConfiguration; -import org.dinky.data.result.SqlExplainResult; -import org.dinky.executor.CustomTableEnvironment; -import org.dinky.function.data.model.UDF; -import org.dinky.function.util.UDFUtil; -import org.dinky.job.JobBuilder; -import org.dinky.job.JobManager; -import org.dinky.job.StatementParam; -import org.dinky.trans.ddl.CustomSetOperation; -import org.dinky.trans.parse.AddFileSqlParseStrategy; -import org.dinky.trans.parse.AddJarSqlParseStrategy; -import org.dinky.utils.LogUtil; -import org.dinky.utils.SqlUtil; -import org.dinky.utils.URLUtils; - -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileSystem; - -import java.io.File; -import java.net.URL; -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; - -import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.text.StrFormatter; -import cn.hutool.core.util.ArrayUtil; -import cn.hutool.core.util.RandomUtil; -import cn.hutool.core.util.StrUtil; -import lombok.extern.slf4j.Slf4j; - -/** - * JobDDLBuilder - * - */ -@Slf4j -public class JobDDLBuilder extends JobBuilder { - - public JobDDLBuilder(JobManager jobManager) { - super(jobManager); - } - - public static JobDDLBuilder build(JobManager jobManager) { - return new JobDDLBuilder(jobManager); - } - - @Override - public void run() throws Exception { - List udfList = new ArrayList<>(); - List udfStatements = new ArrayList<>(); - for (StatementParam item : jobParam.getDdl()) { - jobManager.setCurrentSql(item.getValue()); - switch (item.getType()) { - case SET: - executeSet(item.getValue()); - break; - case ADD: - executeAdd(item.getValue()); - break; - case ADD_FILE: - executeAddFile(item.getValue()); - break; - case ADD_JAR: - executeAddJar(item.getValue()); - break; - case CREATE: - if (UDFUtil.isUdfStatement(item.getValue())) { - UDF udf = toUDF(item.getValue(), executor.getDinkyClassLoader()); - if (Asserts.isNotNull(udf)) { - udfList.add(UDFUtil.toUDF(item.getValue(), executor.getDinkyClassLoader())); - } - udfStatements.add(item.getValue()); - } else { - executor.executeSql(item.getValue()); - } - break; - default: - executor.executeSql(item.getValue()); - } - } - if (!udfList.isEmpty()) { - compileUDF(udfList); - } - if (!udfStatements.isEmpty()) { - executeCreateFunction(udfStatements); - } - } - - @Override - public List explain() { - List sqlExplainResults = new ArrayList<>(); - if (Asserts.isNullCollection(jobParam.getDdl())) { - return sqlExplainResults; - } - List udfList = new ArrayList<>(); - List udfStatements = new ArrayList<>(); - for (StatementParam item : jobParam.getDdl()) { - jobManager.setCurrentSql(item.getValue()); - SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); - try { - SqlExplainResult recordResult = null; - switch (item.getType()) { - case SET: - recordResult = explainSet(item.getValue()); - break; - case ADD: - recordResult = explainAdd(item.getValue()); - break; - case ADD_FILE: - recordResult = explainAddFile(item.getValue()); - break; - case ADD_JAR: - recordResult = explainAddJar(item.getValue()); - break; - case CREATE: - if (UDFUtil.isUdfStatement(item.getValue())) { - udfList.add(UDFUtil.toUDF(item.getValue(), executor.getDinkyClassLoader())); - udfStatements.add(item.getValue()); - } else { - recordResult = explainOtherDDL(item.getValue()); - } - break; - default: - recordResult = explainOtherDDL(item.getValue()); - } - if (Asserts.isNull(recordResult) || recordResult.isInvalid()) { - continue; - } - resultBuilder = SqlExplainResult.newBuilder(recordResult) - .type(item.getType().getType()); - } catch (Exception e) { - String error = StrFormatter.format( - "Exception in executing FlinkSQL:\n{}\n{}", - SqlUtil.addLineNumber(item.getValue()), - LogUtil.getError(e)); - resultBuilder - .type(item.getType().getType()) - .error(error) - .explainTrue(false) - .explainTime(LocalDateTime.now()) - .sql(item.getValue()); - log.error(error); - sqlExplainResults.add(resultBuilder.build()); - } - resultBuilder - .type(item.getType().getType()) - .explainTrue(true) - .explainTime(LocalDateTime.now()) - .sql(item.getValue()); - sqlExplainResults.add(resultBuilder.build()); - } - if (!udfList.isEmpty()) { - SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); - String udfStatement = StringUtils.join(udfStatements, ";\n"); - try { - SqlExplainResult recordResult = null; - recordResult = explainCreateFunction(udfList, udfStatements); - resultBuilder = SqlExplainResult.newBuilder(recordResult); - } catch (Exception e) { - String error = StrFormatter.format( - "Exception in executing CreateFunction:\n{}\n{}", - SqlUtil.addLineNumber(udfStatement), - LogUtil.getError(e)); - resultBuilder - .type(SqlType.CREATE.getType()) - .error(error) - .explainTrue(false) - .explainTime(LocalDateTime.now()) - .sql(udfStatement); - log.error(error); - sqlExplainResults.add(resultBuilder.build()); - } - resultBuilder - .type(SqlType.CREATE.getType()) - .explainTrue(true) - .explainTime(LocalDateTime.now()) - .sql(udfStatement); - sqlExplainResults.add(resultBuilder.build()); - } - return sqlExplainResults; - } - - private void executeSet(String statement) { - CustomSetOperation customSetOperation = new CustomSetOperation(statement); - customSetOperation.execute(executor.getCustomTableEnvironment()); - } - - private void executeAdd(String statement) { - AddJarSqlParseStrategy.getAllFilePath(statement) - .forEach(t -> jobManager.getUdfPathContextHolder().addOtherPlugins(t)); - (executor.getDinkyClassLoader()) - .addURLs(URLUtils.getURLs(jobManager.getUdfPathContextHolder().getOtherPluginsFiles())); - } - - private void executeAddFile(String statement) { - AddFileSqlParseStrategy.getAllFilePath(statement) - .forEach(t -> jobManager.getUdfPathContextHolder().addFile(t)); - (executor.getDinkyClassLoader()) - .addURLs(URLUtils.getURLs(jobManager.getUdfPathContextHolder().getFiles())); - } - - private void executeAddJar(String statement) { - Configuration combinationConfig = getCombinationConfig(); - FileSystem.initialize(combinationConfig, null); - executor.executeSql(statement); - } - - private void executeCreateFunction(List udfStatements) { - for (String statement : udfStatements) { - executor.executeSql(statement); - } - } - - private void compileUDF(List udfList) { - Integer taskId = config.getTaskId(); - if (taskId == null) { - taskId = -RandomUtil.randomInt(0, 1000); - } - // 1. Obtain the path of the jar package and inject it into the remote environment - List jarFiles = - new ArrayList<>(jobManager.getUdfPathContextHolder().getAllFileSet()); - - String[] userCustomUdfJarPath = UDFUtil.initJavaUDF(udfList, taskId); - String[] jarPaths = CollUtil.removeNull(jarFiles).stream() - .map(File::getAbsolutePath) - .toArray(String[]::new); - if (GATEWAY_TYPE_MAP.get(SESSION).contains(runMode)) { - config.setJarFiles(jarPaths); - } - - // 2.Compile Python - String[] pyPaths = UDFUtil.initPythonUDF( - udfList, runMode, config.getTaskId(), executor.getTableConfig().getConfiguration()); - - executor.initUDF(userCustomUdfJarPath); - executor.initUDF(jarPaths); - - if (ArrayUtil.isNotEmpty(pyPaths)) { - for (String pyPath : pyPaths) { - if (StrUtil.isNotBlank(pyPath)) { - jarFiles.add(new File(pyPath)); - jobManager.getUdfPathContextHolder().addPyUdfPath(new File(pyPath)); - } - } - } - if (ArrayUtil.isNotEmpty(userCustomUdfJarPath)) { - for (String jarPath : userCustomUdfJarPath) { - if (StrUtil.isNotBlank(jarPath)) { - jarFiles.add(new File(jarPath)); - jobManager.getUdfPathContextHolder().addUdfPath(new File(jarPath)); - } - } - } - - Set pyUdfFile = jobManager.getUdfPathContextHolder().getPyUdfFile(); - executor.initPyUDF( - SystemConfiguration.getInstances().getPythonHome(), - pyUdfFile.stream().map(File::getAbsolutePath).toArray(String[]::new)); - if (GATEWAY_TYPE_MAP.get(YARN).contains(runMode)) { - config.getGatewayConfig().setJarPaths(ArrayUtil.append(jarPaths, pyPaths)); - } - - try { - List jarList = CollUtil.newArrayList(URLUtils.getURLs(jarFiles)); - // 3.Write the required files for UDF - UDFUtil.writeManifest(taskId, jarList, jobManager.getUdfPathContextHolder()); - UDFUtil.addConfigurationClsAndJars( - jobManager.getExecutor().getCustomTableEnvironment(), - jarList, - CollUtil.newArrayList(URLUtils.getURLs(jarFiles))); - } catch (Exception e) { - throw new RuntimeException("add configuration failed: ", e); - } - - log.info(StrUtil.format("A total of {} UDF have been Init.", udfList.size() + pyUdfFile.size())); - log.info("Initializing Flink UDF...Finish"); - } - - private Configuration getCombinationConfig() { - CustomTableEnvironment cte = executor.getCustomTableEnvironment(); - Configuration rootConfig = cte.getRootConfiguration(); - Configuration config = cte.getConfig().getConfiguration(); - Configuration combinationConfig = new Configuration(); - combinationConfig.addAll(rootConfig); - combinationConfig.addAll(config); - return combinationConfig; - } - - private SqlExplainResult explainSet(String statement) { - SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); - CustomSetOperation customSetOperation = new CustomSetOperation(statement); - String explain = customSetOperation.explain(executor.getCustomTableEnvironment()); - customSetOperation.execute(executor.getCustomTableEnvironment()); - return resultBuilder.parseTrue(true).explainTrue(true).explain(explain).build(); - } - - private SqlExplainResult explainAdd(String statement) { - SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); - executeAdd(statement); - String explain = Arrays.toString( - URLUtils.getURLs(jobManager.getUdfPathContextHolder().getOtherPluginsFiles())); - return resultBuilder.parseTrue(true).explainTrue(true).explain(explain).build(); - } - - private SqlExplainResult explainAddFile(String statement) { - SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); - executeAddFile(statement); - String explain = Arrays.toString( - URLUtils.getURLs(jobManager.getUdfPathContextHolder().getFiles())); - return resultBuilder.parseTrue(true).explainTrue(true).explain(explain).build(); - } - - private SqlExplainResult explainAddJar(String statement) { - SqlExplainResult sqlExplainResult = executor.explainSqlRecord(statement); - executeAddJar(statement); - return sqlExplainResult; - } - - private SqlExplainResult explainCreateFunction(List udfList, List udfStatements) { - SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); - compileUDF(udfList); - executeCreateFunction(udfStatements); - String explain = udfList.toString(); - return resultBuilder - .type(SqlType.CREATE.getType()) - .parseTrue(true) - .explainTrue(true) - .explain(explain) - .build(); - } - - private SqlExplainResult explainOtherDDL(String statement) { - SqlExplainResult sqlExplainResult = executor.explainSqlRecord(statement); - executor.executeSql(statement); - return sqlExplainResult; - } -} diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java deleted file mode 100644 index 743afb9570..0000000000 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobExecuteBuilder.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.job.builder; - -import org.dinky.assertion.Asserts; -import org.dinky.data.job.SqlType; -import org.dinky.data.result.IResult; -import org.dinky.data.result.InsertResult; -import org.dinky.data.result.ResultBuilder; -import org.dinky.data.result.SqlExplainResult; -import org.dinky.gateway.Gateway; -import org.dinky.gateway.result.GatewayResult; -import org.dinky.job.Job; -import org.dinky.job.JobBuilder; -import org.dinky.job.JobManager; -import org.dinky.job.StatementParam; -import org.dinky.trans.dml.ExecuteJarOperation; -import org.dinky.trans.parse.ExecuteJarParseStrategy; -import org.dinky.utils.FlinkStreamEnvironmentUtil; -import org.dinky.utils.SqlUtil; -import org.dinky.utils.URLUtils; - -import org.apache.flink.api.dag.Pipeline; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; -import org.apache.flink.runtime.rest.messages.JobPlanInfo; -import org.apache.flink.streaming.api.graph.StreamGraph; - -import java.net.URL; -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.List; - -import cn.hutool.core.text.StrFormatter; -import lombok.extern.slf4j.Slf4j; - -/** - * JobExecuteBuilder - * - */ -@Slf4j -public class JobExecuteBuilder extends JobBuilder { - - public JobExecuteBuilder(JobManager jobManager) { - super(jobManager); - } - - public static JobExecuteBuilder build(JobManager jobManager) { - return new JobExecuteBuilder(jobManager); - } - - @Override - public void run() throws Exception { - if (!jobParam.getExecute().isEmpty()) { - if (useGateway) { - for (StatementParam item : jobParam.getExecute()) { - executor.executeSql(item.getValue()); - if (!useStatementSet) { - break; - } - } - GatewayResult gatewayResult = null; - config.addGatewayConfig(executor.getSetConfig()); - config.addGatewayConfig( - executor.getCustomTableEnvironment().getConfig().getConfiguration()); - config.getGatewayConfig().setSql(jobParam.getParsedSql()); - - if (runMode.isApplicationMode()) { - gatewayResult = Gateway.build(config.getGatewayConfig()) - .submitJar(executor.getDinkyClassLoader().getUdfPathContextHolder()); - } else { - StreamGraph streamGraph = executor.getStreamGraph(); - streamGraph.setJobName(config.getJobName()); - JobGraph jobGraph = streamGraph.getJobGraph(); - if (Asserts.isNotNullString(config.getSavePointPath())) { - jobGraph.setSavepointRestoreSettings( - SavepointRestoreSettings.forPath(config.getSavePointPath(), true)); - } - gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph); - } - job.setResult(InsertResult.success(gatewayResult.getId())); - job.setJobId(gatewayResult.getId()); - job.setJids(gatewayResult.getJids()); - job.setJobManagerAddress(URLUtils.formatAddress(gatewayResult.getWebURL())); - - if (gatewayResult.isSuccess()) { - job.setStatus(Job.JobStatus.SUCCESS); - } else { - job.setStatus(Job.JobStatus.FAILED); - job.setError(gatewayResult.getError()); - } - } else { - for (StatementParam item : jobParam.getExecute()) { - executor.executeSql(item.getValue()); - if (!useStatementSet) { - break; - } - } - JobClient jobClient = executor.executeAsync(config.getJobName()); - if (Asserts.isNotNull(jobClient)) { - job.setJobId(jobClient.getJobID().toHexString()); - job.setJids(new ArrayList() { - - { - add(job.getJobId()); - } - }); - } - if (config.isUseResult()) { - IResult result = ResultBuilder.build( - SqlType.EXECUTE, - job.getId().toString(), - config.getMaxRowNum(), - config.isUseChangeLog(), - config.isUseAutoCancel(), - executor.getTimeZone()) - .getResult(null); - job.setResult(result); - } - } - } - } - - @Override - public List explain() { - List sqlExplainResults = new ArrayList<>(); - if (Asserts.isNullCollection(jobParam.getExecute())) { - return sqlExplainResults; - } - for (StatementParam item : jobParam.getExecute()) { - SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); - try { - SqlExplainResult sqlExplainResult = executor.explainSqlRecord(item.getValue()); - if (!sqlExplainResult.isInvalid()) { - sqlExplainResult = new SqlExplainResult(); - } else if (ExecuteJarParseStrategy.INSTANCE.match(item.getValue())) { - List allFileByAdd = jobManager.getAllFileSet(); - Pipeline pipeline = new ExecuteJarOperation(item.getValue()) - .explain(executor.getCustomTableEnvironment(), allFileByAdd); - sqlExplainResult.setExplain(FlinkStreamEnvironmentUtil.getStreamingPlanAsJSON(pipeline)); - } else { - executor.executeSql(item.getValue()); - } - resultBuilder = SqlExplainResult.newBuilder(sqlExplainResult); - resultBuilder.type(item.getType().getType()).parseTrue(true); - } catch (Exception e) { - String error = StrFormatter.format( - "Exception in executing FlinkSQL:\n{}\n{}", - SqlUtil.addLineNumber(item.getValue()), - e.getMessage()); - resultBuilder - .type(item.getType().getType()) - .error(error) - .explainTrue(false) - .explainTime(LocalDateTime.now()) - .sql(item.getValue()); - sqlExplainResults.add(resultBuilder.build()); - log.error(error); - break; - } - resultBuilder - .type(item.getType().getType()) - .explainTrue(true) - .explainTime(LocalDateTime.now()) - .sql(item.getValue()); - sqlExplainResults.add(resultBuilder.build()); - } - return sqlExplainResults; - } - - @Override - public StreamGraph getStreamGraph() { - return executor.getStreamGraphFromCustomStatements(jobParam.getExecuteStatement()); - } - - @Override - public JobPlanInfo getJobPlanInfo() { - return executor.getJobPlanInfo(); - } -} diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java deleted file mode 100644 index 9268f840c4..0000000000 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobJarStreamGraphBuilder.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.job.builder; - -import org.dinky.assertion.Asserts; -import org.dinky.classloader.DinkyClassLoader; -import org.dinky.data.exception.DinkyException; -import org.dinky.data.job.SqlType; -import org.dinky.data.model.JarSubmitParam; -import org.dinky.data.result.InsertResult; -import org.dinky.data.result.SqlExplainResult; -import org.dinky.gateway.Gateway; -import org.dinky.gateway.config.GatewayConfig; -import org.dinky.gateway.result.GatewayResult; -import org.dinky.job.Job; -import org.dinky.job.JobBuilder; -import org.dinky.job.JobManager; -import org.dinky.trans.Operations; -import org.dinky.trans.ddl.CustomSetOperation; -import org.dinky.trans.dml.ExecuteJarOperation; -import org.dinky.trans.parse.AddFileSqlParseStrategy; -import org.dinky.trans.parse.AddJarSqlParseStrategy; -import org.dinky.trans.parse.ExecuteJarParseStrategy; -import org.dinky.trans.parse.SetSqlParseStrategy; -import org.dinky.utils.DinkyClassLoaderUtil; -import org.dinky.utils.FlinkStreamEnvironmentUtil; -import org.dinky.utils.SqlUtil; -import org.dinky.utils.URLUtils; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.dag.Pipeline; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.PipelineOptions; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; -import org.apache.flink.streaming.api.graph.StreamGraph; - -import java.io.File; -import java.net.URL; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; - -import cn.hutool.core.lang.Assert; -import lombok.extern.slf4j.Slf4j; - -/** - * JobJarStreamGraphBuilder - */ -@Slf4j -public class JobJarStreamGraphBuilder extends JobBuilder { - - private final Configuration configuration; - - public JobJarStreamGraphBuilder(JobManager jobManager) { - super(jobManager); - configuration = executor.getCustomTableEnvironment().getConfig().getConfiguration(); - } - - public static JobJarStreamGraphBuilder build(JobManager jobManager) { - return new JobJarStreamGraphBuilder(jobManager); - } - - private Pipeline getPipeline() { - Pipeline pipeline = getJarStreamGraph(job.getStatement(), jobManager.getDinkyClassLoader()); - if (pipeline instanceof StreamGraph) { - if (Asserts.isNotNullString(config.getSavePointPath())) { - ((StreamGraph) pipeline) - .setSavepointRestoreSettings(SavepointRestoreSettings.forPath( - config.getSavePointPath(), - configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE))); - } - } - return pipeline; - } - - @Override - public void run() throws Exception { - if (!useGateway) { - submitNormal(); - } else { - GatewayResult gatewayResult; - if (runMode.isApplicationMode()) { - gatewayResult = submitGateway(); - } else { - gatewayResult = submitNormalWithGateway(); - } - job.setResult(InsertResult.success(gatewayResult.getId())); - job.setJobId(gatewayResult.getId()); - job.setJids(gatewayResult.getJids()); - job.setJobManagerAddress(URLUtils.formatAddress(gatewayResult.getWebURL())); - - if (gatewayResult.isSuccess()) { - job.setStatus(Job.JobStatus.SUCCESS); - } else { - job.setStatus(Job.JobStatus.FAILED); - job.setError(gatewayResult.getError()); - log.error(gatewayResult.getError()); - } - } - } - - @Override - public List explain() { - return Collections.emptyList(); - } - - private GatewayResult submitGateway() throws Exception { - configuration.set(PipelineOptions.JARS, getUris(job.getStatement())); - config.addGatewayConfig(configuration); - config.getGatewayConfig().setSql(job.getStatement()); - return Gateway.build(config.getGatewayConfig()).submitJar(jobManager.getUdfPathContextHolder()); - } - - private GatewayResult submitNormalWithGateway() { - Pipeline pipeline = getPipeline(); - if (pipeline instanceof StreamGraph) { - ((StreamGraph) pipeline).setJobName(config.getJobName()); - } else if (pipeline instanceof Plan) { - ((Plan) pipeline).setJobName(config.getJobName()); - } - JobGraph jobGraph = FlinkStreamEnvironmentUtil.getJobGraph(pipeline, configuration); - GatewayConfig gatewayConfig = config.getGatewayConfig(); - List uriList = getUris(job.getStatement()); - String[] jarPaths = uriList.stream() - .map(URLUtils::toFile) - .map(File::getAbsolutePath) - .toArray(String[]::new); - gatewayConfig.setJarPaths(jarPaths); - return Gateway.build(gatewayConfig).submitJobGraph(jobGraph); - } - - private void submitNormal() throws Exception { - JobClient jobClient = - FlinkStreamEnvironmentUtil.executeAsync(getPipeline(), executor.getStreamExecutionEnvironment()); - if (Asserts.isNotNull(jobClient)) { - job.setJobId(jobClient.getJobID().toHexString()); - job.setJids(new ArrayList() { - { - add(job.getJobId()); - } - }); - job.setStatus(Job.JobStatus.SUCCESS); - } else { - job.setStatus(Job.JobStatus.FAILED); - } - } - - public Pipeline getJarStreamGraph(String statement, DinkyClassLoader dinkyClassLoader) { - DinkyClassLoaderUtil.initClassLoader(config, dinkyClassLoader); - String[] statements = SqlUtil.getStatements(statement); - ExecuteJarOperation executeJarOperation = null; - for (String sql : statements) { - String sqlStatement = executor.pretreatStatement(sql); - if (ExecuteJarParseStrategy.INSTANCE.match(sqlStatement)) { - executeJarOperation = new ExecuteJarOperation(sqlStatement); - break; - } - SqlType operationType = Operations.getOperationType(sqlStatement); - if (operationType.equals(SqlType.SET) && SetSqlParseStrategy.INSTANCE.match(sqlStatement)) { - CustomSetOperation customSetOperation = new CustomSetOperation(sqlStatement); - customSetOperation.execute(this.executor.getCustomTableEnvironment()); - } else if (operationType.equals(SqlType.ADD)) { - Set files = AddJarSqlParseStrategy.getAllFilePath(sqlStatement); - files.forEach(executor::addJar); - files.forEach(jobManager.getUdfPathContextHolder()::addOtherPlugins); - } else if (operationType.equals(SqlType.ADD_FILE)) { - Set files = AddFileSqlParseStrategy.getAllFilePath(sqlStatement); - files.forEach(executor::addJar); - files.forEach(jobManager.getUdfPathContextHolder()::addFile); - } - } - Assert.notNull(executeJarOperation, () -> new DinkyException("Not found execute jar operation.")); - List urLs = jobManager.getAllFileSet(); - return executeJarOperation.explain(executor.getCustomTableEnvironment(), urLs); - } - - public List getUris(String statement) { - String[] statements = SqlUtil.getStatements(statement); - List uriList = new ArrayList<>(); - for (String sql : statements) { - String sqlStatement = executor.pretreatStatement(sql); - if (ExecuteJarParseStrategy.INSTANCE.match(sqlStatement)) { - uriList.add(JarSubmitParam.getInfo(statement).getUri()); - break; - } - } - return uriList; - } -} diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java deleted file mode 100644 index 1360bcc962..0000000000 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java +++ /dev/null @@ -1,295 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.job.builder; - -import org.dinky.assertion.Asserts; -import org.dinky.constant.FlinkSQLConstant; -import org.dinky.data.enums.GatewayType; -import org.dinky.data.job.SqlType; -import org.dinky.data.result.IResult; -import org.dinky.data.result.InsertResult; -import org.dinky.data.result.ResultBuilder; -import org.dinky.data.result.SqlExplainResult; -import org.dinky.executor.Executor; -import org.dinky.gateway.Gateway; -import org.dinky.gateway.result.GatewayResult; -import org.dinky.interceptor.FlinkInterceptor; -import org.dinky.interceptor.FlinkInterceptorResult; -import org.dinky.job.Job; -import org.dinky.job.JobBuilder; -import org.dinky.job.JobConfig; -import org.dinky.job.JobManager; -import org.dinky.job.StatementParam; -import org.dinky.utils.LogUtil; -import org.dinky.utils.SqlUtil; -import org.dinky.utils.URLUtils; - -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; -import org.apache.flink.runtime.rest.messages.JobPlanInfo; -import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.table.api.TableResult; - -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.UUID; -import java.util.stream.Collectors; - -import cn.hutool.core.text.StrFormatter; -import lombok.extern.slf4j.Slf4j; - -/** - * JobTransBuilder - */ -@Slf4j -public class JobTransBuilder extends JobBuilder { - - public JobTransBuilder(JobManager jobManager) { - super(jobManager); - } - - public static JobTransBuilder build(JobManager jobManager) { - return new JobTransBuilder(jobManager); - } - - @Override - public void run() throws Exception { - if (jobParam.getTrans().isEmpty()) { - return; - } - - if (inferStatementSet()) { - handleStatementSet(); - } else { - handleNonStatementSet(); - } - } - - @Override - public List explain() { - List sqlExplainResults = new ArrayList<>(); - if (Asserts.isNullCollection(jobParam.getTrans())) { - return sqlExplainResults; - } - if (inferStatementSet()) { - List inserts = new ArrayList<>(); - for (StatementParam item : jobParam.getTrans()) { - if (item.getType().equals(SqlType.INSERT) || item.getType().equals(SqlType.CTAS)) { - inserts.add(item.getValue()); - } - } - if (!inserts.isEmpty()) { - SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); - String sqlSet = StringUtils.join(inserts, ";\r"); - try { - resultBuilder.explain(null).parseTrue(true).explainTrue(true); - } catch (Exception e) { - String error = LogUtil.getError(e); - resultBuilder - .type(SqlType.INSERT.getType()) - .error(error) - .parseTrue(false) - .explainTrue(false); - log.error(error); - } finally { - resultBuilder - .type(SqlType.INSERT.getType()) - .explainTime(LocalDateTime.now()) - .sql(sqlSet); - sqlExplainResults.add(resultBuilder.build()); - } - } - } else { - for (StatementParam item : jobParam.getTrans()) { - SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); - try { - resultBuilder = SqlExplainResult.newBuilder(executor.explainSqlRecord(item.getValue())); - resultBuilder.parseTrue(true).explainTrue(true); - } catch (Exception e) { - String error = StrFormatter.format( - "Exception in explaining FlinkSQL:\n{}\n{}", - SqlUtil.addLineNumber(item.getValue()), - e.getMessage()); - resultBuilder - .type(item.getType().getType()) - .error(error) - .parseTrue(false) - .explainTrue(false); - log.error(error); - } finally { - resultBuilder - .type(item.getType().getType()) - .explainTime(LocalDateTime.now()) - .sql(item.getValue()); - sqlExplainResults.add(resultBuilder.build()); - } - } - } - return sqlExplainResults; - } - - @Override - public StreamGraph getStreamGraph() { - return executor.getStreamGraphFromStatement(null); - } - - @Override - public JobPlanInfo getJobPlanInfo() { - return executor.getJobPlanInfo(); - } - - private boolean inferStatementSet() { - boolean hasInsert = false; - for (StatementParam item : jobParam.getTrans()) { - if (item.getType().equals(SqlType.INSERT)) { - hasInsert = true; - break; - } - } - return hasInsert; - } - - private void handleStatementSet() throws Exception { - List inserts = - jobParam.getTrans().stream().map(StatementParam::getValue).collect(Collectors.toList()); - if (useGateway) { - processWithGateway(inserts); - return; - } - processWithoutGateway(inserts); - } - - private void handleNonStatementSet() throws Exception { - if (useGateway) { - processSingleInsertWithGateway(); - return; - } - processFirstStatement(); - } - - private void processWithGateway(List inserts) throws Exception { - jobManager.setCurrentSql(String.join(FlinkSQLConstant.SEPARATOR, inserts)); - GatewayResult gatewayResult = submitByGateway(inserts); - setJobResultFromGatewayResult(gatewayResult); - } - - private void processWithoutGateway(List inserts) throws Exception { - if (!inserts.isEmpty()) { - jobManager.setCurrentSql(String.join(FlinkSQLConstant.SEPARATOR, inserts)); - TableResult tableResult = executor.executeStatementSet(inserts); - updateJobWithTableResult(tableResult); - } - } - - private void processSingleInsertWithGateway() throws Exception { - List singleInsert = - Collections.singletonList(jobParam.getTrans().get(0).getValue()); - job.setPipeline(jobParam.getTrans().get(0).getType().isPipeline()); - processWithGateway(singleInsert); - } - - private void processFirstStatement() throws Exception { - if (jobParam.getTrans().isEmpty()) { - return; - } - // Only process the first statement when not using statement set - StatementParam item = jobParam.getTrans().get(0); - job.setPipeline(item.getType().isPipeline()); - jobManager.setCurrentSql(item.getValue()); - processSingleStatement(item); - } - - private void processSingleStatement(StatementParam item) throws Exception { - FlinkInterceptorResult flinkInterceptorResult = FlinkInterceptor.build(executor, item.getValue()); - if (Asserts.isNotNull(flinkInterceptorResult.getTableResult())) { - updateJobWithTableResult(flinkInterceptorResult.getTableResult(), item.getType()); - } else if (!flinkInterceptorResult.isNoExecute()) { - TableResult tableResult = executor.executeSql(item.getValue()); - updateJobWithTableResult(tableResult, item.getType()); - } - } - - private void setJobResultFromGatewayResult(GatewayResult gatewayResult) { - job.setResult(InsertResult.success(gatewayResult.getId())); - job.setJobId(gatewayResult.getId()); - job.setJids(gatewayResult.getJids()); - job.setJobManagerAddress(URLUtils.formatAddress(gatewayResult.getWebURL())); - job.setStatus(gatewayResult.isSuccess() ? Job.JobStatus.SUCCESS : Job.JobStatus.FAILED); - if (!gatewayResult.isSuccess()) { - job.setError(gatewayResult.getError()); - } - } - - private void updateJobWithTableResult(TableResult tableResult) { - updateJobWithTableResult(tableResult, SqlType.INSERT); - } - - private void updateJobWithTableResult(TableResult tableResult, SqlType sqlType) { - if (tableResult.getJobClient().isPresent()) { - job.setJobId(tableResult.getJobClient().get().getJobID().toHexString()); - job.setJids(Collections.singletonList(job.getJobId())); - } else if (!sqlType.getCategory().getHasJobClient()) { - job.setJobId(UUID.randomUUID().toString().replace("-", "")); - job.setJids(Collections.singletonList(job.getJobId())); - } - - if (config.isUseResult()) { - IResult result = ResultBuilder.build( - sqlType, - job.getId().toString(), - config.getMaxRowNum(), - config.isUseChangeLog(), - config.isUseAutoCancel(), - executor.getTimeZone(), - jobManager.getConfig().isMockSinkFunction()) - .getResultWithPersistence(tableResult, jobManager.getHandler()); - job.setResult(result); - } - } - - private GatewayResult submitByGateway(List inserts) { - JobConfig config = jobManager.getConfig(); - GatewayType runMode = jobManager.getRunMode(); - Executor executor = jobManager.getExecutor(); - - GatewayResult gatewayResult = null; - - // Use gateway need to build gateway config, include flink configuration. - config.addGatewayConfig(executor.getCustomTableEnvironment().getConfig().getConfiguration()); - config.getGatewayConfig().setSql(jobParam.getParsedSql()); - if (runMode.isApplicationMode()) { - // Application mode need to submit dinky-app.jar that in the hdfs or image. - gatewayResult = Gateway.build(config.getGatewayConfig()) - .submitJar(executor.getDinkyClassLoader().getUdfPathContextHolder()); - } else { - JobGraph jobGraph = executor.getJobGraphFromInserts(null); - // Perjob mode need to set savepoint restore path, when recovery from savepoint. - if (Asserts.isNotNullString(config.getSavePointPath())) { - jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(config.getSavePointPath(), true)); - } - // Perjob mode need to submit job graph. - gatewayResult = Gateway.build(config.getGatewayConfig()).submitJobGraph(jobGraph); - } - return gatewayResult; - } -} diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java deleted file mode 100644 index 2977ff301f..0000000000 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobUDFBuilder.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.job.builder; - -import static org.dinky.function.util.UDFUtil.GATEWAY_TYPE_MAP; -import static org.dinky.function.util.UDFUtil.SESSION; -import static org.dinky.function.util.UDFUtil.YARN; - -import org.dinky.assertion.Asserts; -import org.dinky.data.model.SystemConfiguration; -import org.dinky.data.result.SqlExplainResult; -import org.dinky.function.data.model.UDF; -import org.dinky.function.util.UDFUtil; -import org.dinky.job.JobBuilder; -import org.dinky.job.JobManager; -import org.dinky.utils.URLUtils; - -import java.io.File; -import java.net.URL; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; - -import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.ArrayUtil; -import cn.hutool.core.util.RandomUtil; -import cn.hutool.core.util.StrUtil; -import lombok.extern.slf4j.Slf4j; - -/** - * JobUDFBuilder - * - */ -@Slf4j -public class JobUDFBuilder extends JobBuilder { - - public JobUDFBuilder(JobManager jobManager) { - super(jobManager); - } - - public static JobUDFBuilder build(JobManager jobManager) { - return new JobUDFBuilder(jobManager); - } - - @Override - public void run() throws Exception { - Asserts.checkNotNull(jobParam, "No executable statement."); - List udfList = jobManager.getJobParam().getUdfList(); - Integer taskId = config.getTaskId(); - if (taskId == null) { - taskId = -RandomUtil.randomInt(0, 1000); - } - // 1. Obtain the path of the jar package and inject it into the remote environment - List jarFiles = - new ArrayList<>(jobManager.getUdfPathContextHolder().getAllFileSet()); - - String[] userCustomUdfJarPath = UDFUtil.initJavaUDF(udfList, taskId); - String[] jarPaths = CollUtil.removeNull(jarFiles).stream() - .map(File::getAbsolutePath) - .toArray(String[]::new); - if (GATEWAY_TYPE_MAP.get(SESSION).contains(runMode)) { - config.setJarFiles(jarPaths); - } - - // 2.Compile Python - String[] pyPaths = UDFUtil.initPythonUDF( - udfList, runMode, config.getTaskId(), executor.getTableConfig().getConfiguration()); - - executor.initUDF(userCustomUdfJarPath); - executor.initUDF(jarPaths); - - if (ArrayUtil.isNotEmpty(pyPaths)) { - for (String pyPath : pyPaths) { - if (StrUtil.isNotBlank(pyPath)) { - jarFiles.add(new File(pyPath)); - jobManager.getUdfPathContextHolder().addPyUdfPath(new File(pyPath)); - } - } - } - if (ArrayUtil.isNotEmpty(userCustomUdfJarPath)) { - for (String jarPath : userCustomUdfJarPath) { - if (StrUtil.isNotBlank(jarPath)) { - jarFiles.add(new File(jarPath)); - jobManager.getUdfPathContextHolder().addUdfPath(new File(jarPath)); - } - } - } - - Set pyUdfFile = jobManager.getUdfPathContextHolder().getPyUdfFile(); - executor.initPyUDF( - SystemConfiguration.getInstances().getPythonHome(), - pyUdfFile.stream().map(File::getAbsolutePath).toArray(String[]::new)); - if (GATEWAY_TYPE_MAP.get(YARN).contains(runMode)) { - config.getGatewayConfig().setJarPaths(ArrayUtil.append(jarPaths, pyPaths)); - } - - try { - List jarList = CollUtil.newArrayList(URLUtils.getURLs(jarFiles)); - // 3.Write the required files for UDF - UDFUtil.writeManifest(taskId, jarList, jobManager.getUdfPathContextHolder()); - UDFUtil.addConfigurationClsAndJars( - jobManager.getExecutor().getCustomTableEnvironment(), - jarList, - CollUtil.newArrayList(URLUtils.getURLs(jarFiles))); - } catch (Exception e) { - throw new RuntimeException("add configuration failed: ", e); - } - - log.info(StrUtil.format("A total of {} UDF have been Init.", udfList.size() + pyUdfFile.size())); - log.info("Initializing Flink UDF...Finish"); - } - - @Override - public List explain() { - return Collections.emptyList(); - } -} diff --git a/dinky-core/src/main/java/org/dinky/job/runner/FlinkJarUtil.java b/dinky-core/src/main/java/org/dinky/job/runner/FlinkJarUtil.java new file mode 100644 index 0000000000..f4cec513dc --- /dev/null +++ b/dinky-core/src/main/java/org/dinky/job/runner/FlinkJarUtil.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.job.runner; + +import org.dinky.classloader.DinkyClassLoader; +import org.dinky.data.exception.DinkyException; +import org.dinky.data.job.SqlType; +import org.dinky.executor.Executor; +import org.dinky.job.JobConfig; +import org.dinky.job.JobManager; +import org.dinky.trans.Operations; +import org.dinky.trans.ddl.CustomSetOperation; +import org.dinky.trans.dml.ExecuteJarOperation; +import org.dinky.trans.parse.AddFileSqlParseStrategy; +import org.dinky.trans.parse.AddJarSqlParseStrategy; +import org.dinky.trans.parse.ExecuteJarParseStrategy; +import org.dinky.trans.parse.SetSqlParseStrategy; +import org.dinky.utils.DinkyClassLoaderUtil; +import org.dinky.utils.FlinkStreamEnvironmentUtil; +import org.dinky.utils.JsonUtils; +import org.dinky.utils.SqlUtil; + +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; + +import java.io.File; +import java.net.URL; +import java.util.List; +import java.util.Set; + +import com.fasterxml.jackson.databind.node.ObjectNode; + +import cn.hutool.core.lang.Assert; + +public class FlinkJarUtil { + public static ObjectNode getJobPlan(String statement, JobManager jobManager) { + try { + Pipeline pipeline = getJarStreamGraph(statement, jobManager.getDinkyClassLoader(), jobManager); + Configuration configuration = + Configuration.fromMap(jobManager.getExecutorConfig().getConfig()); + JobGraph jobGraph = FlinkStreamEnvironmentUtil.getJobGraph(pipeline, configuration); + return JsonUtils.parseObject(JsonPlanGenerator.generatePlan(jobGraph)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static Pipeline getJarStreamGraph( + String statement, DinkyClassLoader dinkyClassLoader, JobManager jobManager) { + Executor executor = jobManager.getExecutor(); + JobConfig config = jobManager.getConfig(); + + DinkyClassLoaderUtil.initClassLoader(config, dinkyClassLoader); + String[] statements = SqlUtil.getStatements(statement); + ExecuteJarOperation executeJarOperation = null; + for (String sql : statements) { + String sqlStatement = executor.pretreatStatement(sql); + if (ExecuteJarParseStrategy.INSTANCE.match(sqlStatement)) { + executeJarOperation = new ExecuteJarOperation(sqlStatement); + break; + } + SqlType operationType = Operations.getOperationType(sqlStatement); + if (operationType.equals(SqlType.SET) && SetSqlParseStrategy.INSTANCE.match(sqlStatement)) { + CustomSetOperation customSetOperation = new CustomSetOperation(sqlStatement); + customSetOperation.execute(executor.getCustomTableEnvironment()); + } else if (operationType.equals(SqlType.ADD)) { + Set files = AddJarSqlParseStrategy.getAllFilePath(sqlStatement); + files.forEach(executor::addJar); + files.forEach(jobManager.getUdfPathContextHolder()::addOtherPlugins); + } else if (operationType.equals(SqlType.ADD_FILE)) { + Set files = AddFileSqlParseStrategy.getAllFilePath(sqlStatement); + files.forEach(executor::addJar); + files.forEach(jobManager.getUdfPathContextHolder()::addFile); + } + } + Assert.notNull(executeJarOperation, () -> new DinkyException("Not found execute jar operation.")); + List urLs = jobManager.getAllFileSet(); + return executeJarOperation.explain(executor.getCustomTableEnvironment(), urLs); + } +} diff --git a/dinky-core/src/main/java/org/dinky/job/runner/JobDDLRunner.java b/dinky-core/src/main/java/org/dinky/job/runner/JobDDLRunner.java index 8852ae023f..19eb7b2a5f 100644 --- a/dinky-core/src/main/java/org/dinky/job/runner/JobDDLRunner.java +++ b/dinky-core/src/main/java/org/dinky/job/runner/JobDDLRunner.java @@ -26,8 +26,10 @@ import org.dinky.data.model.SystemConfiguration; import org.dinky.data.result.SqlExplainResult; import org.dinky.executor.CustomTableEnvironment; +import org.dinky.function.constant.PathConstant; import org.dinky.function.data.model.UDF; import org.dinky.function.util.UDFUtil; +import org.dinky.job.Job; import org.dinky.job.JobManager; import org.dinky.trans.parse.AddFileSqlParseStrategy; import org.dinky.trans.parse.AddJarSqlParseStrategy; @@ -37,26 +39,25 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.table.catalog.FunctionLanguage; import java.io.File; -import java.net.URL; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.Set; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.io.FileUtil; +import cn.hutool.core.lang.Opt; import cn.hutool.core.util.ArrayUtil; -import cn.hutool.core.util.RandomUtil; -import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; @Slf4j public class JobDDLRunner extends AbstractJobRunner { - private List udfStatements = new ArrayList<>(); - public JobDDLRunner(JobManager jobManager) { this.jobManager = jobManager; } @@ -76,22 +77,19 @@ public void run(JobStatement jobStatement) throws Exception { break; case CREATE: if (UDFUtil.isUdfStatement(jobStatement.getStatement())) { - udfStatements.add(jobStatement.getStatement()); + executeCreateFunction(jobStatement.getStatement()); break; } default: jobManager.getExecutor().executeSql(jobStatement.getStatement()); } - if (jobStatement.isFinalCreateFunctionStatement() && !udfStatements.isEmpty()) { - executeCreateFunction(udfStatements); - } } @Override public SqlExplainResult explain(JobStatement jobStatement) { SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder(); try { - SqlExplainResult recordResult = null; + SqlExplainResult recordResult; switch (jobStatement.getSqlType()) { case ADD: recordResult = explainAdd(jobStatement.getStatement()); @@ -104,16 +102,13 @@ public SqlExplainResult explain(JobStatement jobStatement) { break; case CREATE: if (UDFUtil.isUdfStatement(jobStatement.getStatement())) { - udfStatements.add(jobStatement.getStatement()); + executeCreateFunction(jobStatement.getStatement()); recordResult = jobManager.getExecutor().explainSqlRecord(jobStatement.getStatement()); break; } default: recordResult = explainOtherDDL(jobStatement.getStatement()); } - if (jobStatement.isFinalCreateFunctionStatement() && !udfStatements.isEmpty()) { - explainCreateFunction(jobStatement); - } if (Asserts.isNull(recordResult)) { return resultBuilder.invalid().build(); } @@ -133,10 +128,9 @@ public SqlExplainResult explain(JobStatement jobStatement) { .sql(jobStatement.getStatement()) .index(jobStatement.getIndex()); log.error(error); - } finally { - resultBuilder.explainTime(LocalDateTime.now()); - return resultBuilder.build(); } + resultBuilder.explainTime(LocalDateTime.now()); + return resultBuilder.build(); } private void executeAdd(String statement) { @@ -163,19 +157,48 @@ private void executeAddJar(String statement) { jobManager.getExecutor().executeSql(statement); } - private void executeCreateFunction(List udfStatements) { - List udfList = new ArrayList<>(); - for (String statement : udfStatements) { - UDF udf = toUDF(statement, jobManager.getExecutor().getDinkyClassLoader()); - if (Asserts.isNotNull(udf)) { - udfList.add(UDFUtil.toUDF(statement, jobManager.getExecutor().getDinkyClassLoader())); - } + private void executeCreateFunction(String udfStatement) { + UDF udf = toUDF(udfStatement, jobManager.getExecutor().getDinkyClassLoader()); + if (udf != null) { + // 创建文件路径快捷链接 + copyUdfFileLinkAndAddToClassloader(udf, udf.getName()); } - if (!udfList.isEmpty()) { - compileUDF(udfList); + jobManager.getExecutor().executeSql(udfStatement); + } + + /** + * zh : 把编译打包好的udf文件,copy link 到当前任务下,并把产物添加到Classloader,最后add jar到各种执行模式 + * + * @param udf udf + * @param udfName udf 名称 + */ + private void copyUdfFileLinkAndAddToClassloader(UDF udf, String udfName) { + Integer jobId = Opt.ofNullable(jobManager.getJob()).map(Job::getId).orElse(null); + File udfLinkFile; + if (jobId == null) { + udfLinkFile = new File(udf.getCompilePackagePath()); + } else { + String udfFilePath = PathConstant.getTaskUdfPath(jobManager.getJob().getId()); + String udfPath = udf.getCompilePackagePath(); + String udfPathSuffix = FileUtil.getSuffix(udfPath); + Path linkFilePath = new File(udfFilePath + udfName + "." + udfPathSuffix).toPath(); + try { + FileUtil.mkParentDirs(linkFilePath); + Files.createSymbolicLink(linkFilePath, new File(udfPath).toPath()); + } catch (IOException e) { + throw new RuntimeException(e); + } + udfLinkFile = linkFilePath.toFile(); } - for (String statement : udfStatements) { - jobManager.getExecutor().executeSql(statement); + if (udf.getFunctionLanguage().equals(FunctionLanguage.PYTHON)) { + jobManager.getUdfPathContextHolder().addPyUdfPath(udfLinkFile); + jobManager + .getExecutor() + .initPyUDF(SystemConfiguration.getInstances().getPythonHome(), udfLinkFile.getAbsolutePath()); + } else { + jobManager.getUdfPathContextHolder().addUdfPath(udfLinkFile); + jobManager.getDinkyClassLoader().addURLs(CollUtil.newArrayList(udfLinkFile)); + jobManager.getExecutor().addJar(udfLinkFile); } } @@ -201,15 +224,6 @@ private SqlExplainResult explainAddJar(String statement) { return sqlExplainResult; } - private SqlExplainResult explainCreateFunction(JobStatement jobStatement) { - udfStatements.add(jobStatement.getStatement()); - SqlExplainResult sqlExplainResult = explainOtherDDL(jobStatement.getStatement()); - if (jobStatement.isFinalCreateFunctionStatement()) { - executeCreateFunction(udfStatements); - } - return sqlExplainResult; - } - private SqlExplainResult explainOtherDDL(String statement) { SqlExplainResult sqlExplainResult = jobManager.getExecutor().explainSqlRecord(statement); jobManager.getExecutor().executeSql(statement); @@ -225,74 +239,4 @@ private Configuration getCombinationConfig() { combinationConfig.addAll(config); return combinationConfig; } - - private void compileUDF(List udfList) { - Integer taskId = jobManager.getConfig().getTaskId(); - if (taskId == null) { - taskId = -RandomUtil.randomInt(0, 1000); - } - // 1. Obtain the path of the jar package and inject it into the remote environment - List jarFiles = - new ArrayList<>(jobManager.getUdfPathContextHolder().getAllFileSet()); - - String[] userCustomUdfJarPath = UDFUtil.initJavaUDF(udfList, taskId); - String[] jarPaths = CollUtil.removeNull(jarFiles).stream() - .map(File::getAbsolutePath) - .toArray(String[]::new); - if (GATEWAY_TYPE_MAP.get(SESSION).contains(jobManager.getRunMode())) { - jobManager.getConfig().setJarFiles(jarPaths); - } - - // 2.Compile Python - String[] pyPaths = UDFUtil.initPythonUDF( - udfList, - jobManager.getRunMode(), - jobManager.getConfig().getTaskId(), - jobManager.getExecutor().getTableConfig().getConfiguration()); - - jobManager.getExecutor().initUDF(userCustomUdfJarPath); - jobManager.getExecutor().initUDF(jarPaths); - - if (ArrayUtil.isNotEmpty(pyPaths)) { - for (String pyPath : pyPaths) { - if (StrUtil.isNotBlank(pyPath)) { - jarFiles.add(new File(pyPath)); - jobManager.getUdfPathContextHolder().addPyUdfPath(new File(pyPath)); - } - } - } - if (ArrayUtil.isNotEmpty(userCustomUdfJarPath)) { - for (String jarPath : userCustomUdfJarPath) { - if (StrUtil.isNotBlank(jarPath)) { - jarFiles.add(new File(jarPath)); - jobManager.getUdfPathContextHolder().addUdfPath(new File(jarPath)); - } - } - } - - Set pyUdfFile = jobManager.getUdfPathContextHolder().getPyUdfFile(); - jobManager - .getExecutor() - .initPyUDF( - SystemConfiguration.getInstances().getPythonHome(), - pyUdfFile.stream().map(File::getAbsolutePath).toArray(String[]::new)); - if (GATEWAY_TYPE_MAP.get(YARN).contains(jobManager.getRunMode())) { - jobManager.getConfig().getGatewayConfig().setJarPaths(ArrayUtil.append(jarPaths, pyPaths)); - } - - try { - List jarList = CollUtil.newArrayList(URLUtils.getURLs(jarFiles)); - // 3.Write the required files for UDF - UDFUtil.writeManifest(taskId, jarList, jobManager.getUdfPathContextHolder()); - UDFUtil.addConfigurationClsAndJars( - jobManager.getExecutor().getCustomTableEnvironment(), - jarList, - CollUtil.newArrayList(URLUtils.getURLs(jarFiles))); - } catch (Exception e) { - throw new RuntimeException("add configuration failed: ", e); - } - - log.info(StrUtil.format("A total of {} UDF have been Init.", udfList.size() + pyUdfFile.size())); - log.info("Initializing Flink UDF...Finish"); - } } diff --git a/dinky-function/src/main/java/org/dinky/function/FunctionFactory.java b/dinky-function/src/main/java/org/dinky/function/FunctionFactory.java index 1fa1748122..ed48f4d9db 100644 --- a/dinky-function/src/main/java/org/dinky/function/FunctionFactory.java +++ b/dinky-function/src/main/java/org/dinky/function/FunctionFactory.java @@ -33,26 +33,34 @@ public class FunctionFactory { /** * UDF compilation & packaging initialization(udf编译 & 打包 初始化) * @param udfClassList udf列表 - * @param missionId 当前任务id + * @param taskId 当前任务id * @return 打包过后的路径 */ - public static UDFPath initUDF(List udfClassList, Integer missionId) { - return initUDF(udfClassList, missionId, new Configuration()); + public static UDFPath initUDF(List udfClassList, Integer taskId) { + return initUDF(udfClassList, taskId, new Configuration()); + } + + public static void initUDF(UDF udf, Integer taskId) { + // 编译 + FunctionCompiler.getCompiler(udf, new Configuration(), taskId); + + // 打包 + FunctionPackage.bale(udf, taskId); } /** * UDF compilation & packaging initialization(udf编译 & 打包 初始化) * * @param udfClassList udf列表 - * @param missionId 当前任务id + * @param taskId 当前任务id * @return 打包过后的路径 */ - public static UDFPath initUDF(List udfClassList, Integer missionId, Configuration configuration) { + public static UDFPath initUDF(List udfClassList, Integer taskId, Configuration configuration) { // 编译 - FunctionCompiler.getCompiler(udfClassList, configuration, missionId); + FunctionCompiler.getCompiler(udfClassList, configuration, taskId); // 打包 - return FunctionPackage.bale(udfClassList, missionId); + return FunctionPackage.bale(udfClassList, taskId); } } diff --git a/dinky-function/src/main/java/org/dinky/function/compiler/FunctionCompiler.java b/dinky-function/src/main/java/org/dinky/function/compiler/FunctionCompiler.java index 42b1e6c6d3..4661b8d875 100644 --- a/dinky-function/src/main/java/org/dinky/function/compiler/FunctionCompiler.java +++ b/dinky-function/src/main/java/org/dinky/function/compiler/FunctionCompiler.java @@ -19,21 +19,27 @@ package org.dinky.function.compiler; -import org.dinky.assertion.Asserts; import org.dinky.function.data.model.UDF; import org.dinky.function.exception.UDFCompilerException; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import cn.hutool.core.lang.Singleton; import cn.hutool.core.util.StrUtil; /** @since 0.6.8 */ public interface FunctionCompiler { + Logger log = LoggerFactory.getLogger(FunctionCompiler.class); + Set COMPILER_CACHE = new HashSet<>(); /** @@ -41,22 +47,24 @@ public interface FunctionCompiler { * * @param udf udf * @param conf flink-conf - * @param missionId 任务id + * @param taskId 任务id * @return 是否成功 */ - boolean compiler(UDF udf, ReadableConfig conf, Integer missionId); + boolean compiler(UDF udf, ReadableConfig conf, Integer taskId); + static boolean getCompilerByTask(UDF udf, Map conf, Integer taskId) { + return getCompiler(udf, Configuration.fromMap(conf), taskId); + } /** * 编译 * * @param udf udf实例 * @param conf flink-conf - * @param missionId 任务id + * @param taskId 任务id * @return 编译状态 */ - static boolean getCompiler(UDF udf, ReadableConfig conf, Integer missionId) { - Asserts.checkNull(udf, "udf为空"); - Asserts.checkNull(udf.getCode(), "udf 代码为空"); + static boolean getCompiler(UDF udf, ReadableConfig conf, Integer taskId) { + log.info("Compiled UDF: {},; Language: {}", udf.getClassName(), udf.getFunctionLanguage()); String key = udf.getClassName() + udf.getFunctionLanguage(); if (COMPILER_CACHE.contains(key)) { @@ -65,13 +73,13 @@ static boolean getCompiler(UDF udf, ReadableConfig conf, Integer missionId) { boolean success; switch (udf.getFunctionLanguage()) { case JAVA: - success = Singleton.get(JavaCompiler.class).compiler(udf, conf, missionId); + success = Singleton.get(JavaCompiler.class).compiler(udf, conf, taskId); break; case SCALA: - success = Singleton.get(ScalaCompiler.class).compiler(udf, conf, missionId); + success = Singleton.get(ScalaCompiler.class).compiler(udf, conf, taskId); break; case PYTHON: - success = Singleton.get(PythonFunction.class).compiler(udf, conf, missionId); + success = Singleton.get(PythonFunction.class).compiler(udf, conf, taskId); break; default: throw UDFCompilerException.notSupportedException( @@ -88,11 +96,11 @@ static boolean getCompiler(UDF udf, ReadableConfig conf, Integer missionId) { * * @param udfList udf、实例列表 * @param conf flink-conf - * @param missionId 任务id + * @param taskId 任务id */ - static void getCompiler(List udfList, ReadableConfig conf, Integer missionId) { + static void getCompiler(List udfList, ReadableConfig conf, Integer taskId) { for (UDF udf : udfList) { - if (!getCompiler(udf, conf, missionId)) { + if (!getCompiler(udf, conf, taskId)) { throw new UDFCompilerException(StrUtil.format( "codeLanguage:{} , className:{} 编译失败", udf.getFunctionLanguage(), udf.getClassName())); } diff --git a/dinky-function/src/main/java/org/dinky/function/compiler/FunctionPackage.java b/dinky-function/src/main/java/org/dinky/function/compiler/FunctionPackage.java index 60b15a7340..d3dc365e69 100644 --- a/dinky-function/src/main/java/org/dinky/function/compiler/FunctionPackage.java +++ b/dinky-function/src/main/java/org/dinky/function/compiler/FunctionPackage.java @@ -19,6 +19,7 @@ package org.dinky.function.compiler; +import org.dinky.data.exception.BusException; import org.dinky.function.data.model.UDF; import org.dinky.function.data.model.UDFPath; @@ -32,19 +33,21 @@ public interface FunctionPackage { * 打包 * * @param udfList udf列表 - * @param missionId 任务id + * @param taskId 任务id * @return 文件绝对路径 */ - String[] pack(List udfList, Integer missionId); + String[] pack(List udfList, Integer taskId); + + String pack(UDF udf, Integer taskId); /** * 打包 * * @param udfList udf 列表 - * @param missionId 任务id + * @param taskId 任务id * @return 打包结果 */ - static UDFPath bale(List udfList, Integer missionId) { + static UDFPath bale(List udfList, Integer taskId) { List jvmList = new ArrayList<>(); List pythonList = new ArrayList<>(); for (UDF udf : udfList) { @@ -59,8 +62,26 @@ static UDFPath bale(List udfList, Integer missionId) { } } return UDFPath.builder() - .jarPaths(new JVMPackage().pack(jvmList, missionId)) - .pyPaths(new PythonFunction().pack(pythonList, missionId)) + .jarPaths(new JVMPackage().pack(jvmList, taskId)) + .pyPaths(new PythonFunction().pack(pythonList, taskId)) .build(); } + /** + * 打包 + * + * @param udf udf 列表 + * @param taskId 任务id + * @return 打包结果 + */ + static String bale(UDF udf, Integer taskId) { + switch (udf.getFunctionLanguage()) { + case JAVA: + case SCALA: + return new JVMPackage().pack(udf, taskId); + case PYTHON: + return new PythonFunction().pack(udf, taskId); + default: + throw new BusException(""); + } + } } diff --git a/dinky-function/src/main/java/org/dinky/function/compiler/JVMPackage.java b/dinky-function/src/main/java/org/dinky/function/compiler/JVMPackage.java index 5efbf9b170..e2b326df46 100644 --- a/dinky-function/src/main/java/org/dinky/function/compiler/JVMPackage.java +++ b/dinky-function/src/main/java/org/dinky/function/compiler/JVMPackage.java @@ -38,7 +38,7 @@ public class JVMPackage implements FunctionPackage { @Override - public String[] pack(List udfList, Integer missionId) { + public String[] pack(List udfList, Integer taskId) { if (CollUtil.isEmpty(udfList)) { return new String[0]; } @@ -62,7 +62,7 @@ public String[] pack(List udfList, Integer missionId) { fileInputStreams[i] = FileUtil.getInputStream(absoluteFilePath); } - String jarPath = PathConstant.getUdfPackagePath(missionId) + PathConstant.UDF_JAR_NAME; + String jarPath = PathConstant.getUdfPackagePath(taskId) + PathConstant.UDF_JAR_NAME; // 编译好的文件打包jar File file = FileUtil.file(jarPath); FileUtil.del(file); @@ -71,4 +71,22 @@ public String[] pack(List udfList, Integer missionId) { } return new String[] {jarPath}; } + + @Override + public String pack(UDF udf, Integer taskId) { + String className = udf.getClassName(); + String classFile = StrUtil.replace(className, ".", "/") + ".class"; + String absoluteFilePath = PathConstant.getUdfCompilerPath(FunctionLanguage.JAVA, classFile); + InputStream inputStream = FileUtil.getInputStream(absoluteFilePath); + + String jarPath = PathConstant.getUdfPackagePath(taskId) + PathConstant.UDF_JAR_NAME; + // 编译好的文件打包jar + File file = FileUtil.file(jarPath); + FileUtil.del(file); + try (ZipWriter zipWriter = new ZipWriter(file, Charset.defaultCharset())) { + zipWriter.add(classFile, inputStream); + } + udf.setCompilePackagePath(file.getAbsolutePath()); + return file.getAbsolutePath(); + } } diff --git a/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java b/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java index 0544badcb2..e66d78640c 100644 --- a/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java +++ b/dinky-function/src/main/java/org/dinky/function/compiler/JavaCompiler.java @@ -38,11 +38,11 @@ public class JavaCompiler implements FunctionCompiler { * * @param udf udf * @param conf flink-conf - * @param missionId 任务id + * @param taskId 任务id * @return 是否成功 */ @Override - public synchronized boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) { + public synchronized boolean compiler(UDF udf, ReadableConfig conf, Integer taskId) { // TODO 改为ProcessStep注释 log.info("Compiling java code, class: {}", udf.getClassName()); diff --git a/dinky-function/src/main/java/org/dinky/function/compiler/PythonFunction.java b/dinky-function/src/main/java/org/dinky/function/compiler/PythonFunction.java index e0c1c74660..dbc0616f44 100644 --- a/dinky-function/src/main/java/org/dinky/function/compiler/PythonFunction.java +++ b/dinky-function/src/main/java/org/dinky/function/compiler/PythonFunction.java @@ -32,6 +32,7 @@ import org.apache.flink.python.PythonOptions; import org.apache.flink.table.catalog.FunctionLanguage; +import java.io.BufferedInputStream; import java.io.File; import java.io.InputStream; import java.nio.charset.Charset; @@ -58,15 +59,15 @@ public class PythonFunction implements FunctionCompiler, FunctionPackage { * * @param udf udf * @param conf flink-conf - * @param missionId 任务id + * @param taskId 任务id * @return 是否成功 */ @Override - public boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) { + public boolean compiler(UDF udf, ReadableConfig conf, Integer taskId) { Asserts.checkNull(udf, "flink-config 不能为空"); // TODO 改为ProcessStep注释 - log.info("正在编译 python 代码 , class: " + udf.getClassName()); + log.info("正在编译 python 代码 , class: {}", udf.getClassName()); File pyFile = FileUtil.writeUtf8String( udf.getCode(), PathConstant.getUdfCompilerPath( @@ -97,7 +98,7 @@ public boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) { } @Override - public String[] pack(List udfList, Integer missionId) { + public String[] pack(List udfList, Integer taskId) { if (CollUtil.isEmpty(udfList)) { return new String[0]; } @@ -122,7 +123,7 @@ public String[] pack(List udfList, Integer missionId) { String[] paths = udfList.stream() .map(x -> StrUtil.split(x.getClassName(), ".").get(0) + ".py") .toArray(String[]::new); - String path = PathConstant.getUdfPackagePath(missionId, PathConstant.UDF_PYTHON_NAME); + String path = PathConstant.getUdfPackagePath(taskId, PathConstant.UDF_PYTHON_NAME); File file = FileUtil.file(path); FileUtil.del(file); try (ZipWriter zipWriter = new ZipWriter(file, Charset.defaultCharset())) { @@ -130,4 +131,23 @@ public String[] pack(List udfList, Integer missionId) { } return new String[] {path}; } + + @Override + public String pack(UDF udf, Integer taskId) { + File udfFile = FileUtil.writeUtf8String( + udf.getCode(), + PathConstant.getUdfCompilerPath( + FunctionLanguage.PYTHON, UDFUtil.getPyFileName(udf.getClassName()) + ".py")); + BufferedInputStream inputStream = FileUtil.getInputStream(udfFile); + + String fileName = StrUtil.split(udf.getClassName(), ".").get(0) + ".py"; + String path = PathConstant.getUdfPackagePath(taskId, PathConstant.UDF_PYTHON_NAME); + File file = FileUtil.file(path); + FileUtil.del(file); + try (ZipWriter zipWriter = new ZipWriter(file, Charset.defaultCharset())) { + zipWriter.add(fileName, inputStream); + } + udf.setCompilePackagePath(file.getAbsolutePath()); + return file.getAbsolutePath(); + } } diff --git a/dinky-function/src/main/java/org/dinky/function/compiler/ScalaCompiler.java b/dinky-function/src/main/java/org/dinky/function/compiler/ScalaCompiler.java index 3fd46bcbd8..f509544d58 100644 --- a/dinky-function/src/main/java/org/dinky/function/compiler/ScalaCompiler.java +++ b/dinky-function/src/main/java/org/dinky/function/compiler/ScalaCompiler.java @@ -34,7 +34,7 @@ public class ScalaCompiler implements FunctionCompiler { @Override - public boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) { + public boolean compiler(UDF udf, ReadableConfig conf, Integer taskId) { // TODO 改为ProcessStep注释 String className = udf.getClassName(); diff --git a/dinky-function/src/main/java/org/dinky/function/constant/PathConstant.java b/dinky-function/src/main/java/org/dinky/function/constant/PathConstant.java index 83ac2f2f55..b9157a4b8c 100644 --- a/dinky-function/src/main/java/org/dinky/function/constant/PathConstant.java +++ b/dinky-function/src/main/java/org/dinky/function/constant/PathConstant.java @@ -35,6 +35,8 @@ public class PathConstant { /** UDF path */ public static final String UDF_PATH = TMP_PATH + "udf" + File.separator; + public static final String TASK_PATH = TMP_PATH + "task"; + public static final String COMPILER = "compiler"; public static final String PACKAGE = "package"; /** UDF jar rules */ @@ -63,7 +65,11 @@ public static String getUdfCompilerPath(FunctionLanguage language) { return getPath(UDF_PATH, COMPILER, language.name()); } - public static String getUdfPackagePath(Integer missionId, Object... path) { - return getPath(UDF_PATH, missionId, PACKAGE, path); + public static String getUdfPackagePath(Integer taskId, Object... path) { + return getPath(UDF_PATH, taskId, PACKAGE, path); + } + + public static String getTaskUdfPath(Integer taskId) { + return getPath(TASK_PATH, taskId, "udf"); } } diff --git a/dinky-function/src/main/java/org/dinky/function/data/model/UDF.java b/dinky-function/src/main/java/org/dinky/function/data/model/UDF.java index 6cb8c1535c..dd4ea22841 100644 --- a/dinky-function/src/main/java/org/dinky/function/data/model/UDF.java +++ b/dinky-function/src/main/java/org/dinky/function/data/model/UDF.java @@ -41,4 +41,9 @@ public class UDF { FunctionLanguage functionLanguage; /** udf源代码 */ String code; + /** + * en: Compile the artifact path
+ * zh: 编译产物路径 + */ + String compilePackagePath; } diff --git a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java index 65e425889a..a41e8d1fef 100644 --- a/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java +++ b/dinky-function/src/main/java/org/dinky/function/util/UDFUtil.java @@ -27,7 +27,6 @@ import org.dinky.data.exception.DinkyException; import org.dinky.data.model.FlinkUdfManifest; import org.dinky.data.model.SystemConfiguration; -import org.dinky.executor.CustomTableEnvironment; import org.dinky.function.FunctionFactory; import org.dinky.function.compiler.CustomStringJavaCompiler; import org.dinky.function.compiler.CustomStringScalaCompiler; @@ -41,7 +40,6 @@ import org.apache.flink.client.python.PythonFunctionFactory; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.python.PythonOptions; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.FunctionLanguage; @@ -154,21 +152,21 @@ public static String templateParse(String dialect, String template, String class } } - public static String[] initJavaUDF(List udf, Integer missionId) { + public static String[] initJavaUDF(List udf, Integer taskId) { return FunctionFactory.initUDF( CollUtil.newArrayList( CollUtil.filterNew(udf, x -> x.getFunctionLanguage() != FunctionLanguage.PYTHON)), - missionId, + taskId, null) .getJarPaths(); } public static String[] initPythonUDF( - List udf, GatewayType gatewayType, Integer missionId, Configuration configuration) { + List udf, GatewayType gatewayType, Integer taskId, Configuration configuration) { return FunctionFactory.initUDF( CollUtil.newArrayList( CollUtil.filterNew(udf, x -> x.getFunctionLanguage() == FunctionLanguage.PYTHON)), - missionId, + taskId, configuration) .getPyPaths(); } @@ -343,6 +341,7 @@ public static UDF toUDF(String statement, DinkyClassLoader classLoader) { .className(className) .code(udf.getCode()) .functionLanguage(udf.getFunctionLanguage()) + .compilePackagePath(udf.getCompilePackagePath()) .build(); } String gitPackage = UdfCodePool.getGitPackage(className); @@ -449,15 +448,6 @@ private static List execPyAndGetUdfNameList(String pyPath, String pyFile } } - public static void addConfigurationClsAndJars( - CustomTableEnvironment customTableEnvironment, List jarList, List classpaths) { - customTableEnvironment.addConfiguration( - PipelineOptions.CLASSPATHS, - classpaths.stream().map(URL::toString).collect(Collectors.toList())); - customTableEnvironment.addConfiguration( - PipelineOptions.JARS, jarList.stream().map(URL::toString).collect(Collectors.toList())); - } - public static void writeManifest( Integer taskId, List jarPaths, FlinkUdfPathContextHolder udfPathContextHolder) { FlinkUdfManifest flinkUdfManifest = new FlinkUdfManifest(); diff --git a/dinky-web/src/locales/en-US/pages.ts b/dinky-web/src/locales/en-US/pages.ts index 6001e26e9b..199cc2d60c 100644 --- a/dinky-web/src/locales/en-US/pages.ts +++ b/dinky-web/src/locales/en-US/pages.ts @@ -578,6 +578,7 @@ export default { 'pages.datastudio.label.jobInfo.versionId': 'Version number', 'pages.datastudio.label.jobInfo.firstLevelOwner': 'Owner', 'pages.datastudio.label.jobInfo.secondLevelOwners': 'Maintainer', + 'pages.datastudio.label.jobInfo.className': 'ClassName', 'pages.datastudio.label.result.query.latest.data': 'Get the latest data', 'pages.datastudio.label.result.query.latest.data.truncate': 'The data is too long to be displayed in full', diff --git a/dinky-web/src/locales/zh-CN/pages.ts b/dinky-web/src/locales/zh-CN/pages.ts index 3c3751288a..4dbfa27767 100644 --- a/dinky-web/src/locales/zh-CN/pages.ts +++ b/dinky-web/src/locales/zh-CN/pages.ts @@ -515,6 +515,7 @@ export default { 'pages.datastudio.label.jobInfo.versionId': '版本号', 'pages.datastudio.label.jobInfo.firstLevelOwner': '责任人', 'pages.datastudio.label.jobInfo.secondLevelOwners': '维护人', + 'pages.datastudio.label.jobInfo.className': '类名', 'pages.datastudio.label.result.query.latest.data': '获取最新数据', 'pages.datastudio.label.result.query.latest.data.truncate': '数据过长无法全部显示', 'pages.datastudio.label.version': '版本历史', diff --git a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/SavePoint.tsx b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/SavePoint.tsx new file mode 100644 index 0000000000..263c1089ff --- /dev/null +++ b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/SavePoint.tsx @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { postAll } from '@/services/api'; +import { API_CONSTANTS } from '@/services/endpoints'; +import { l } from '@/utils/intl'; +import { ActionType, ProColumns, ProDescriptions, ProTable } from '@ant-design/pro-components'; +import { ProDescriptionsItemProps } from '@ant-design/pro-descriptions'; +import { Drawer } from 'antd'; +import { useRef, useState } from 'react'; + +export type SavePointData = { + id: number; + taskId: number; + name: string; + type: string; + path: string; + createTime: Date; +}; + +export const SavePoint = (props: { taskId: number }) => { + const { taskId } = props; + + const [row, setRow] = useState(); + const actionRef = useRef(); + actionRef.current?.reloadAndRest?.(); + + const columns: ProDescriptionsItemProps[] | ProColumns[] = [ + { + title: l('pages.task.savePointPath'), + dataIndex: 'path', + hideInForm: true, + hideInSearch: true + }, + { + title: l('global.table.createTime'), + dataIndex: 'createTime', + valueType: 'dateTime', + hideInForm: true, + hideInSearch: true, + render: (dom: any, entity: SavePointData) => { + return setRow(entity)}>{dom}; + } + } + ]; + + return ( + <> + + className={'datastudio-theme'} + actionRef={actionRef} + rowKey='id' + request={(params, sorter, filter) => + postAll(API_CONSTANTS.GET_SAVEPOINT_LIST, { ...params, sorter, filter }) + } + params={{ taskId }} + columns={columns as ProColumns[]} + search={false} + /> + { + setRow(undefined); + }} + closable={false} + > + {row?.name && ( + + column={2} + title={row?.name} + request={async () => ({ + data: row || {} + })} + params={{ + id: row?.name + }} + columns={columns as ProDescriptionsItemProps[]} + /> + )} + + + ); +}; diff --git a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/TaskInfo.tsx b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/TaskInfo.tsx index f6c2bd6e7e..cda34a8cc2 100644 --- a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/TaskInfo.tsx +++ b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/TaskInfo.tsx @@ -23,13 +23,15 @@ import Paragraph from 'antd/es/typography/Paragraph'; import { TaskState } from '@/pages/DataStudio/type'; import { showFirstLevelOwner, showSecondLevelOwners } from '@/pages/DataStudio/function'; import { UserBaseInfo } from '@/types/AuthCenter/data'; +import { isUDF } from '@/pages/DataStudio/Toolbar/Project/function'; export const TaskInfo = (props: { params: TaskState; users: UserBaseInfo.User[] }) => { const { - params: { taskId, name, dialect, versionId, firstLevelOwner, secondLevelOwners }, + params: { taskId, name, dialect, versionId, firstLevelOwner, secondLevelOwners, savePointPath }, users } = props; + console.log(savePointPath); return (
@@ -51,6 +53,11 @@ export const TaskInfo = (props: { params: TaskState; users: UserBaseInfo.User[] {showSecondLevelOwners(secondLevelOwners, users)} + {isUDF(dialect) && ( + + {savePointPath} + + )}
); diff --git a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx index 687f4a37f1..3c6affbe07 100644 --- a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx +++ b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx @@ -97,6 +97,7 @@ import { SseData, Topic } from '@/models/UseWebSocketModel'; import { ResourceInfo } from '@/types/RegCenter/data'; import { buildResourceTreeDataAtTreeForm } from '@/pages/RegCenter/Resource/components/FileTree/function'; import { ProFormDependency } from '@ant-design/pro-form'; +import { SavePoint } from '@/pages/DataStudio/CenterTabContent/SqlTask/SavePoint'; export type FlinkSqlProps = { showDesc: boolean; @@ -226,6 +227,15 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => { setLoading(false); }, []); + useEffect(() => { + setCurrentState((prevState) => ({ + ...prevState, + name: params.name, + note: params.note, + secondLevelOwners: params.secondLevelOwners, + firstLevelOwner: params.firstLevelOwner + })); + }, [params]); useEffect(() => { return subscribeTopic(Topic.TASK_RUN_INSTANCE, null, (data: SseData) => { if (data?.data?.RunningTaskId) { @@ -381,6 +391,11 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => { /> ) }); + rightToolbarItem.push({ + label: l('menu.datastudio.savePoint'), + key: 'savePoint', + children: + }); } rightToolbarItem.push({ @@ -434,9 +449,8 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => { }, [currentState, updateAction]); const handleLineage = useCallback(async () => { - const { type, dialect, databaseId, statement, envId, fragment, taskId } = currentState; + const { dialect, databaseId, statement, envId, fragment, taskId } = currentState; const params: StudioLineageParams = { - type: 1, // todo: 暂时写死 ,后续优化 dialect: dialect, envId: envId ?? -1, fragment: fragment, @@ -444,7 +458,8 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => { statementSet: true, databaseId: databaseId ?? 0, variables: {}, - taskId: taskId + taskId: taskId, + configJson: currentState.configJson }; const data = (await getDataByParams( API_CONSTANTS.STUDIO_GET_LINEAGE, diff --git a/dinky-web/src/pages/DataStudio/Toolbar/Project/index.tsx b/dinky-web/src/pages/DataStudio/Toolbar/Project/index.tsx index cee586c51e..dd5fc52c75 100644 --- a/dinky-web/src/pages/DataStudio/Toolbar/Project/index.tsx +++ b/dinky-web/src/pages/DataStudio/Toolbar/Project/index.tsx @@ -48,7 +48,6 @@ import { SseData, Topic } from '@/models/UseWebSocketModel'; export const Project = (props: any) => { const { - dispatch, centerContent, project: { expandKeys, selectedKeys }, action: { actionType, params }, diff --git a/dinky-web/src/pages/DataStudio/Toolbar/Resource/index.tsx b/dinky-web/src/pages/DataStudio/Toolbar/Resource/index.tsx index 331984a899..733bf6ed2c 100644 --- a/dinky-web/src/pages/DataStudio/Toolbar/Resource/index.tsx +++ b/dinky-web/src/pages/DataStudio/Toolbar/Resource/index.tsx @@ -309,6 +309,7 @@ const Resource = (props: { ghost size={'small'} bodyStyle={{ height: 'calc(100vh - 180px)', overflow: 'auto' }} + className={'datastudio-theme'} >