
Commit

Merge branch 'dev' of https://github.com/DataLinkDC/dinky into dev
actions-user committed Dec 7, 2024
2 parents 47eef1b + 0da1714 commit 914b84a
Showing 44 changed files with 494 additions and 1,521 deletions.
@@ -19,6 +19,8 @@

package org.dinky.data.dto;

import org.dinky.data.model.ext.TaskExtConfig;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
@@ -41,9 +43,6 @@ public class StudioLineageDTO extends AbstractStatementDTO {
notes = "Flag indicating whether to use Statement Set")
private Boolean statementSet;

@ApiModelProperty(value = "Type", dataType = "Integer", example = "1", notes = "The type of the SQL query")
private Integer type;

@ApiModelProperty(value = "Dialect", dataType = "String", example = "MySQL", notes = "The SQL dialect")
private String dialect;

@@ -56,4 +55,10 @@ public class StudioLineageDTO extends AbstractStatementDTO {

@ApiModelProperty(value = "Task ID", dataType = "Integer", example = "1", notes = "The identifier of the task")
private Integer taskId;

@ApiModelProperty(
value = "Configuration JSON",
dataType = "TaskExtConfig",
notes = "Extended configuration in JSON format for the task")
private TaskExtConfig configJson;
}
@@ -94,14 +94,13 @@ public Map<String, String> getUdfReferMaps() {
return Asserts.isNotNullCollection(udfRefer)
? udfRefer.stream()
.filter(item -> item.getClassName() != null)
.map(t -> {
.peek(t -> {
if (StringUtils.isEmpty(t.getName())) {
String name = t.getClassName()
.substring(t.getClassName().lastIndexOf(".") + 1);
name = name.substring(0, 1).toLowerCase() + name.substring(1);
t.setName(name);
}
return t;
})
.collect(Collectors.toConcurrentMap(TaskUdfRefer::getClassName, TaskUdfRefer::getName))
: new HashMap<>();
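For context on the .map → .peek change above, here is a minimal, self-contained sketch of the same pattern (class, package, and data names are illustrative, not from the Dinky codebase): peek is used for its side effect of filling in a default function name derived from the class name, so the lambda no longer has to return the element.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class UdfReferNameSketch {

    // Illustrative stand-in for TaskUdfRefer: a class name plus an optional display name.
    static class UdfRefer {
        String className;
        String name;

        UdfRefer(String className, String name) {
            this.className = className;
            this.name = name;
        }
    }

    public static void main(String[] args) {
        List<UdfRefer> refers = List.of(
                new UdfRefer("org.example.udf.MyUpper", null),
                new UdfRefer("org.example.udf.SumAgg", "sumAgg"));

        Map<String, String> referMap = refers.stream()
                .filter(r -> r.className != null)
                // peek mutates the element in place; with map the lambda had to return t explicitly.
                .peek(r -> {
                    if (r.name == null || r.name.isEmpty()) {
                        String simple = r.className.substring(r.className.lastIndexOf('.') + 1);
                        r.name = Character.toLowerCase(simple.charAt(0)) + simple.substring(1);
                    }
                })
                .collect(Collectors.toConcurrentMap(r -> r.className, r -> r.name));

        // Prints something like {org.example.udf.MyUpper=myUpper, org.example.udf.SumAgg=sumAgg} (order not guaranteed).
        System.out.println(referMap);
    }
}

Worth noting: the Stream javadoc describes peek as existing mainly to support debugging, so using it for mutation is a pragmatic shortcut rather than a documented contract; it keeps the pipeline terse, and the terminal toConcurrentMap still consumes every element, so the side effect runs for each one.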
@@ -122,6 +122,9 @@ public LineageResult getLineage(StudioLineageDTO studioCADTO) {
TaskDTO taskDTO = taskService.getTaskInfoById(studioCADTO.getTaskId());
taskDTO.setStatement(taskService.buildEnvSql(taskDTO) + studioCADTO.getStatement());
JobConfig jobConfig = taskDTO.getJobConfig();
jobConfig.setUdfRefer(studioCADTO.getConfigJson().getUdfReferMaps());
jobConfig.setConfigJson(studioCADTO.getConfigJson().getCustomConfigMaps());

return LineageBuilder.getColumnLineageByLogicalPlan(taskDTO.getStatement(), jobConfig);
}
}
@@ -63,7 +63,6 @@
import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.explainer.sqllineage.SQLLineageBuilder;
import org.dinky.function.FunctionFactory;
import org.dinky.function.compiler.CustomStringJavaCompiler;
import org.dinky.function.data.model.UDF;
import org.dinky.function.pool.UdfCodePool;
@@ -107,7 +106,6 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -583,15 +581,14 @@ public boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle) thro
task.setVersionId(taskVersionId);
if (Dialect.isUDF(task.getDialect())) {
// compile udf class
UDF udf = UDFUtils.taskToUDF(task.buildTask());
try {
FunctionFactory.initUDF(Collections.singletonList(udf), task.getId());
UDF udf = UDFUtils.taskToUDF(task.buildTask());
UdfCodePool.addOrUpdate(udf);
} catch (Throwable e) {
throw new BusException(
"UDF compilation failed and cannot be published. The error message is as follows:"
+ e.getMessage());
}
UdfCodePool.addOrUpdate(udf);
}
} else {
if (Dialect.isUDF(task.getDialect())
@@ -39,6 +39,10 @@

@AllArgsConstructor
public abstract class BaseTask {

private static final Set<Class<?>> taskRegistry =
ClassUtil.scanPackageBySuper(BaseTask.class.getPackage().getName(), BaseTask.class);

final TaskDTO task;

public abstract JobResult execute() throws Exception;
@@ -58,9 +62,7 @@ public ObjectNode getJobPlan() throws NotSupportExplainExcepition {
}

public static BaseTask getTask(TaskDTO taskDTO) {
Set<Class<?>> classes =
ClassUtil.scanPackageBySuper(BaseTask.class.getPackage().getName(), BaseTask.class);
for (Class<?> clazz : classes) {
for (Class<?> clazz : taskRegistry) {
SupportDialect annotation = clazz.getAnnotation(SupportDialect.class);
if (annotation != null) {
for (Dialect dialect : annotation.value()) {
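A note on the BaseTask hunk above: hoisting the Hutool classpath scan into a static field means the relatively expensive package scan runs once at class-initialization time instead of on every getTask call. A rough, standalone sketch of the same pattern, using an illustrative annotation rather than Dinky's @SupportDialect:

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Set;

import cn.hutool.core.util.ClassUtil;

public abstract class TaskRegistrySketch {

    // Illustrative stand-in for Dinky's @SupportDialect annotation.
    @Retention(RetentionPolicy.RUNTIME)
    public @interface Supports {
        String[] value();
    }

    // Scanned once when the class is first loaded, then reused for every lookup.
    private static final Set<Class<?>> TASK_REGISTRY =
            ClassUtil.scanPackageBySuper(TaskRegistrySketch.class.getPackage().getName(), TaskRegistrySketch.class);

    public static Class<?> resolve(String dialect) {
        for (Class<?> clazz : TASK_REGISTRY) {
            Supports supports = clazz.getAnnotation(Supports.class);
            if (supports == null) {
                continue;
            }
            for (String supported : supports.value()) {
                if (supported.equalsIgnoreCase(dialect)) {
                    return clazz;
                }
            }
        }
        throw new IllegalArgumentException("No task implementation registered for dialect: " + dialect);
    }
}

The trade-off of the static field is that subclasses loaded after class initialization are not picked up, which is acceptable when the set of task implementations is fixed at build time.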
@@ -24,6 +24,7 @@
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.job.JobResult;
import org.dinky.job.runner.FlinkJarUtil;

import java.util.List;

@@ -53,8 +54,9 @@ public boolean stop() {

@Override
public ObjectNode getJobPlan() {
String statement = task.getStatement();
try {
return jobManager.getJarStreamGraphJson(task.getStatement());
return FlinkJarUtil.getJobPlan(statement, jobManager);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -29,8 +29,6 @@
import org.dinky.job.JobResult;
import org.dinky.utils.UDFUtils;

import java.util.Collections;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.exceptions.ExceptionUtil;

@@ -47,7 +45,7 @@ public JobResult execute() throws Exception {
jobResult.setStatus(Job.JobStatus.SUCCESS);
try {
UDF udf = UDFUtils.taskToUDF(BeanUtil.toBean(task, Task.class));
FunctionFactory.initUDF(Collections.singletonList(udf), task.getId());
FunctionFactory.initUDF(udf, task.getId());
} catch (Exception e) {
jobResult.setSuccess(false);
jobResult.setError(ExceptionUtil.getRootCauseMessage(e));
8 changes: 7 additions & 1 deletion dinky-admin/src/main/java/org/dinky/utils/UDFUtils.java
@@ -23,6 +23,8 @@
import org.dinky.data.exception.BusException;
import org.dinky.data.model.Task;
import org.dinky.data.model.udf.UDFManage;
import org.dinky.function.compiler.FunctionCompiler;
import org.dinky.function.compiler.FunctionPackage;
import org.dinky.function.data.model.UDF;
import org.dinky.function.util.UDFUtil;

@@ -33,11 +35,15 @@ public class UDFUtils extends UDFUtil {
public static UDF taskToUDF(Task task) {
if (Asserts.isNotNull(task.getConfigJson())
&& Asserts.isNotNull(task.getConfigJson().getUdfConfig())) {
return UDF.builder()
UDF udf = UDF.builder()
.className(task.getConfigJson().getUdfConfig().getClassName())
.code(task.getStatement())
.functionLanguage(FunctionLanguage.valueOf(task.getDialect().toUpperCase()))
.build();

FunctionCompiler.getCompilerByTask(udf, task.getConfigJson().getCustomConfigMaps(), task.getId());
FunctionPackage.bale(udf, task.getId());
return udf;
} else {
throw new BusException("udf `class` config is null,please check your udf task config");
}
20 changes: 13 additions & 7 deletions dinky-admin/src/main/resources/DinkyFlinkDockerfile
@@ -1,22 +1,28 @@
# Used to build the Dinky environment
ARG FLINK_VERSION=1.14.5
ARG FLINK_BIG_VERSION=1.14
ARG FLINK_VERSION=1.20.0
ARG FLINK_BIG_VERSION=1.20

FROM flink:${FLINK_VERSION}
FROM flink:${FLINK_VERSION}-scala_2.12-java8

ARG FLINK_VERSION
ARG FLINK_BIG_VERSION
ENV PYTHON_HOME /opt/miniconda3

USER root
RUN wget "https://s3.jcloud.sjtu.edu.cn/899a892efef34b1b944a19981040f55b-oss01/anaconda/miniconda/Miniconda3-py38_4.9.2-Linux-x86_64.sh" -O "miniconda.sh" && chmod +x miniconda.sh
RUN ./miniconda.sh -b -p $PYTHON_HOME && chown -R flink $PYTHON_HOME && ls $PYTHON_HOME

RUN ./miniconda.sh -b -p $PYTHON_HOME && chown -R flink $PYTHON_HOME && ls $PYTHON_HOME
USER flink

ENV PATH $PYTHON_HOME/bin:$PATH
RUN pip install "apache-flink==${FLINK_VERSION}" -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com

RUN cp /opt/flink/opt/flink-python_* /opt/flink/lib/
RUN pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
RUN rm -rf /opt/miniconda3/lib/python3.8/site-packages/ruamel*
RUN pip install "apache-flink==${FLINK_VERSION}" -i https://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com

RUN cp /opt/flink/opt/flink-python* /opt/flink/lib/
RUN rm -f /opt/flink/lib/flink-table-planner-loader*.jar

RUN cp /opt/flink/opt/flink-table-planner* /opt/flink/lib/

RUN wget -O dinky-app-${FLINK_BIG_VERSION}.jar - ${DINKY_HTTP}/downloadAppJar/${FLINK_BIG_VERSION} | mv dinky-app-${FLINK_BIG_VERSION}.jar
# RUN wget -O dinky-app-${FLINK_BIG_VERSION}.jar - ${DINKY_HTTP}/downloadAppJar/${FLINK_BIG_VERSION} | mv dinky-app-${FLINK_BIG_VERSION}.jar
@@ -22,15 +22,13 @@
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor(onConstructor = @__(@JsonCreator))
@AllArgsConstructor
public class DingTalkParams {

private String webhook = "";
@@ -22,15 +22,13 @@
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor(onConstructor = @__(@JsonCreator))
@AllArgsConstructor
public class EmailParams {

private List<String> receivers = new ArrayList<>();
@@ -22,14 +22,12 @@
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor(onConstructor = @__(@JsonCreator))
@NoArgsConstructor
@AllArgsConstructor
public class FeiShuParams {

@@ -23,14 +23,12 @@

import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor(onConstructor = @__(@JsonCreator))
@NoArgsConstructor
@AllArgsConstructor
public class HttpParams {

@@ -22,14 +22,12 @@
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor(onConstructor = @__(@JsonCreator))
@NoArgsConstructor
@AllArgsConstructor
public class WechatParams {

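The five alert-params classes above (DingTalkParams, EmailParams, FeiShuParams, HttpParams, WechatParams) all drop the onConstructor = @__(@JsonCreator) hook from their Lombok constructors. As a small hedged illustration (the field names are made up, not the full DingTalkParams), Jackson can still bind such a bean through the no-args constructor and the Lombok-generated setters:

import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class AlertParamsBindingSketch {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WebhookParams {
        private String webhook = "";
        private String secret = "";
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Jackson instantiates WebhookParams via the no-args constructor and fills fields through setters.
        WebhookParams params = mapper.readValue(
                "{\"webhook\":\"https://example.com/hook\",\"secret\":\"s3cr3t\"}", WebhookParams.class);
        System.out.println(params.getWebhook()); // https://example.com/hook
    }
}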
@@ -46,7 +46,9 @@ public void addFile(File file) {
}

public void addPyUdfPath(File file) {
getPyUdfFile().add(file);
Set<File> pyUdfFile = getPyUdfFile();
pyUdfFile.add(file);
addUdfPath(file);
}

public void addOtherPlugins(File file) {
46 changes: 17 additions & 29 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
@@ -31,16 +31,12 @@
import org.dinky.executor.Executor;
import org.dinky.explainer.mock.MockStatementExplainer;
import org.dinky.function.data.model.UDF;
import org.dinky.function.pool.UdfCodePool;
import org.dinky.function.util.UDFUtil;
import org.dinky.interceptor.FlinkInterceptor;
import org.dinky.job.JobConfig;
import org.dinky.job.JobManager;
import org.dinky.job.JobParam;
import org.dinky.job.JobRunnerFactory;
import org.dinky.job.JobStatementPlan;
import org.dinky.job.builder.JobUDFBuilder;
import org.dinky.trans.Operations;
import org.dinky.utils.DinkyClassLoaderUtil;
import org.dinky.utils.LogUtil;
import org.dinky.utils.SqlUtil;

@@ -84,19 +80,6 @@ public static Explainer build(Executor executor, boolean useStatementSet, JobMan
return new Explainer(executor, useStatementSet, jobManager);
}

public Explainer initialize(JobConfig config, String statement) {
DinkyClassLoaderUtil.initClassLoader(config, jobManager.getDinkyClassLoader());
String[] statements = SqlUtil.getStatements(SqlUtil.removeNote(statement));
List<UDF> udfs = parseUDFFromStatements(statements);
jobManager.setJobParam(new JobParam(udfs));
try {
JobUDFBuilder.build(jobManager).run();
} catch (Exception e) {
e.printStackTrace();
}
return this;
}

public JobStatementPlan parseStatements(String[] statements) {
JobStatementPlan jobStatementPlanWithMock = new JobStatementPlan();
generateUDFStatement(jobStatementPlanWithMock);
@@ -115,7 +98,9 @@ private void generateUDFStatement(JobStatementPlan jobStatementPlan) {
List<String> udfStatements = new ArrayList<>();
Optional.ofNullable(jobManager.getConfig().getUdfRefer())
.ifPresent(t -> t.forEach((key, value) -> {
String sql = String.format("create temporary function %s as '%s'", value, key);
UDF udf = UdfCodePool.getUDF(key);
String sql = String.format(
"create temporary function %s as '%s' language %s", value, key, udf.getFunctionLanguage());
udfStatements.add(sql);
}));
for (String udfStatement : udfStatements) {
Expand Down Expand Up @@ -217,24 +202,27 @@ public List<LineageRel> getLineage(String statement) {
.fragment(true)
.statementSet(useStatementSet)
.parallelism(1)
.udfRefer(jobManager.getConfig().getUdfRefer())
.configJson(executor.getTableConfig().getConfiguration().toMap())
.build();
jobManager.setConfig(jobConfig);
jobManager.setExecutor(executor);
this.initialize(jobConfig, statement);

List<LineageRel> lineageRelList = new ArrayList<>();
for (String item : SqlUtil.getStatements(statement)) {
String[] statements = SqlUtil.getStatements(statement);
JobStatementPlan jobStatementPlan = parseStatements(statements);
List<JobStatement> statementList = jobStatementPlan.getJobStatementList();
JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(jobManager);

for (JobStatement item : statementList) {
String sql = item.getStatement();
SqlType sqlType = item.getSqlType();

try {
String sql = FlinkInterceptor.pretreatStatement(executor, item);
if (Asserts.isNullString(sql)) {
continue;
}
SqlType operationType = Operations.getOperationType(sql);
if (operationType.equals(SqlType.INSERT)) {
if (sqlType.equals(SqlType.INSERT)) {
lineageRelList.addAll(executor.getLineage(sql));
} else if (!operationType.equals(SqlType.SELECT) && !operationType.equals(SqlType.PRINT)) {
executor.executeSql(sql);
} else if (!sqlType.equals(SqlType.SELECT) && !sqlType.equals(SqlType.PRINT)) {
jobRunnerFactory.getJobRunner(item.getStatementType()).run(item);
}
} catch (Exception e) {
log.error("Exception occurred while fetching lineage information", e);
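Relating to the generateUDFStatement hunk in Explainer above: the generated DDL now carries a LANGUAGE clause looked up from the UDF code pool, which matters once Python functions are referenced (Flink defaults to LANGUAGE JAVA when the clause is omitted). A simplified sketch of the statements that end up being produced; the map contents and the language lookup are illustrative stand-ins, not Dinky's actual UdfCodePool:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UdfStatementSketch {

    enum FunctionLanguage { JAVA, SCALA, PYTHON }

    public static void main(String[] args) {
        // key = fully qualified class name, value = function name to register (illustrative data).
        Map<String, String> udfRefer = new LinkedHashMap<>();
        udfRefer.put("org.example.udf.MyUpper", "myUpper");
        udfRefer.put("example.py_udf.my_len", "myLen");

        // Stand-in for the pool lookup that resolves each UDF's implementation language.
        Map<String, FunctionLanguage> languages = Map.of(
                "org.example.udf.MyUpper", FunctionLanguage.JAVA,
                "example.py_udf.my_len", FunctionLanguage.PYTHON);

        List<String> statements = new ArrayList<>();
        udfRefer.forEach((className, name) -> statements.add(String.format(
                "create temporary function %s as '%s' language %s", name, className, languages.get(className))));

        statements.forEach(System.out::println);
        // create temporary function myUpper as 'org.example.udf.MyUpper' language JAVA
        // create temporary function myLen as 'example.py_udf.my_len' language PYTHON
    }
}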