Skip to content

Commit

Permalink
Merge branch 'dev' of https://github.com/miaoze8/dinky into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
ze.miao committed Dec 9, 2024
2 parents 84a107f + 1f039d3 commit 7ee84fe
Show file tree
Hide file tree
Showing 56 changed files with 570 additions and 1,552 deletions.
9 changes: 7 additions & 2 deletions dinky-admin/src/main/java/org/dinky/configure/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,16 @@ public void addInterceptors(InterceptorRegistry registry) {
}))
.addPathPatterns("/api/**", "/openapi/**")
.excludePathPatterns(
"/api/login", "/api/ldap/ldapEnableStatus", "/download/**", "/druid/**", "/api/version");
"/api/login",
"/api/sysConfig/getNeededCfg",
"/api/sysConfig/setInitConfig",
"/download/**",
"/druid/**",
"/api/version");

registry.addInterceptor(new TenantInterceptor())
.addPathPatterns("/api/**")
.excludePathPatterns("/api/login", "/api/ldap/ldapEnableStatus")
.excludePathPatterns("/api/login", "/api/sysConfig/getNeededCfg", "/api/sysConfig/setInitConfig")
.addPathPatterns("/api/alertGroup/**")
.addPathPatterns("/api/alertHistory/**")
.addPathPatterns("/api/alertInstance/**")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.dinky.data.enums.BusinessType;
import org.dinky.data.enums.Status;
import org.dinky.data.exception.AuthException;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.data.model.rbac.User;
import org.dinky.data.result.Result;
import org.dinky.service.LdapService;
Expand All @@ -43,7 +42,6 @@
import org.springframework.web.bind.annotation.RestController;

import cn.dev33.satoken.annotation.SaCheckLogin;
import cn.dev33.satoken.annotation.SaIgnore;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
Expand All @@ -63,13 +61,6 @@ public class LdapController {
@Autowired
UserService userService;

@GetMapping("/ldapEnableStatus")
@SaIgnore
@ApiOperation("Get LDAP enable status")
public Result<Boolean> ldapStatus() {
return Result.succeed(SystemConfiguration.getInstances().getLdapEnable().getValue());
}

@GetMapping("/testConnection")
@ApiOperation("Test connection to LDAP server")
@SaCheckLogin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.dinky.data.dto;

import org.dinky.data.model.ext.TaskExtConfig;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
Expand All @@ -41,9 +43,6 @@ public class StudioLineageDTO extends AbstractStatementDTO {
notes = "Flag indicating whether to use Statement Set")
private Boolean statementSet;

@ApiModelProperty(value = "Type", dataType = "Integer", example = "1", notes = "The type of the SQL query")
private Integer type;

@ApiModelProperty(value = "Dialect", dataType = "String", example = "MySQL", notes = "The SQL dialect")
private String dialect;

Expand All @@ -56,4 +55,10 @@ public class StudioLineageDTO extends AbstractStatementDTO {

@ApiModelProperty(value = "Task ID", dataType = "Integer", example = "1", notes = "The identifier of the task")
private Integer taskId;

@ApiModelProperty(
value = "Configuration JSON",
dataType = "TaskExtConfig",
notes = "Extended configuration in JSON format for the task")
private TaskExtConfig configJson;
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,13 @@ public Map<String, String> getUdfReferMaps() {
return Asserts.isNotNullCollection(udfRefer)
? udfRefer.stream()
.filter(item -> item.getClassName() != null)
.map(t -> {
.peek(t -> {
if (StringUtils.isEmpty(t.getName())) {
String name = t.getClassName()
.substring(t.getClassName().lastIndexOf(".") + 1);
name = name.substring(0, 1).toLowerCase() + name.substring(1);
t.setName(name);
}
return t;
})
.collect(Collectors.toConcurrentMap(TaskUdfRefer::getClassName, TaskUdfRefer::getName))
: new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ public LineageResult getLineage(StudioLineageDTO studioCADTO) {
TaskDTO taskDTO = taskService.getTaskInfoById(studioCADTO.getTaskId());
taskDTO.setStatement(taskService.buildEnvSql(taskDTO) + studioCADTO.getStatement());
JobConfig jobConfig = taskDTO.getJobConfig();
jobConfig.setUdfRefer(studioCADTO.getConfigJson().getUdfReferMaps());
jobConfig.setConfigJson(studioCADTO.getConfigJson().getCustomConfigMaps());

return LineageBuilder.getColumnLineageByLogicalPlan(taskDTO.getStatement(), jobConfig);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.explainer.sqllineage.SQLLineageBuilder;
import org.dinky.function.FunctionFactory;
import org.dinky.function.compiler.CustomStringJavaCompiler;
import org.dinky.function.data.model.UDF;
import org.dinky.function.pool.UdfCodePool;
Expand Down Expand Up @@ -107,7 +106,6 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -583,15 +581,14 @@ public boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle) thro
task.setVersionId(taskVersionId);
if (Dialect.isUDF(task.getDialect())) {
// compile udf class
UDF udf = UDFUtils.taskToUDF(task.buildTask());
try {
FunctionFactory.initUDF(Collections.singletonList(udf), task.getId());
UDF udf = UDFUtils.taskToUDF(task.buildTask());
UdfCodePool.addOrUpdate(udf);
} catch (Throwable e) {
throw new BusException(
"UDF compilation failed and cannot be published. The error message is as follows:"
+ e.getMessage());
}
UdfCodePool.addOrUpdate(udf);
}
} else {
if (Dialect.isUDF(task.getDialect())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@

@AllArgsConstructor
public abstract class BaseTask {

private static final Set<Class<?>> taskRegistry =
ClassUtil.scanPackageBySuper(BaseTask.class.getPackage().getName(), BaseTask.class);

final TaskDTO task;

public abstract JobResult execute() throws Exception;
Expand All @@ -58,9 +62,7 @@ public ObjectNode getJobPlan() throws NotSupportExplainExcepition {
}

public static BaseTask getTask(TaskDTO taskDTO) {
Set<Class<?>> classes =
ClassUtil.scanPackageBySuper(BaseTask.class.getPackage().getName(), BaseTask.class);
for (Class<?> clazz : classes) {
for (Class<?> clazz : taskRegistry) {
SupportDialect annotation = clazz.getAnnotation(SupportDialect.class);
if (annotation != null) {
for (Dialect dialect : annotation.value()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.job.JobResult;
import org.dinky.job.runner.FlinkJarUtil;

import java.util.List;

Expand Down Expand Up @@ -53,8 +54,9 @@ public boolean stop() {

@Override
public ObjectNode getJobPlan() {
String statement = task.getStatement();
try {
return jobManager.getJarStreamGraphJson(task.getStatement());
return FlinkJarUtil.getJobPlan(statement, jobManager);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.dinky.job.JobResult;
import org.dinky.utils.UDFUtils;

import java.util.Collections;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.exceptions.ExceptionUtil;

Expand All @@ -47,7 +45,7 @@ public JobResult execute() throws Exception {
jobResult.setStatus(Job.JobStatus.SUCCESS);
try {
UDF udf = UDFUtils.taskToUDF(BeanUtil.toBean(task, Task.class));
FunctionFactory.initUDF(Collections.singletonList(udf), task.getId());
FunctionFactory.initUDF(udf, task.getId());
} catch (Exception e) {
jobResult.setSuccess(false);
jobResult.setError(ExceptionUtil.getRootCauseMessage(e));
Expand Down
8 changes: 7 additions & 1 deletion dinky-admin/src/main/java/org/dinky/utils/UDFUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.dinky.data.exception.BusException;
import org.dinky.data.model.Task;
import org.dinky.data.model.udf.UDFManage;
import org.dinky.function.compiler.FunctionCompiler;
import org.dinky.function.compiler.FunctionPackage;
import org.dinky.function.data.model.UDF;
import org.dinky.function.util.UDFUtil;

Expand All @@ -33,11 +35,15 @@ public class UDFUtils extends UDFUtil {
public static UDF taskToUDF(Task task) {
if (Asserts.isNotNull(task.getConfigJson())
&& Asserts.isNotNull(task.getConfigJson().getUdfConfig())) {
return UDF.builder()
UDF udf = UDF.builder()
.className(task.getConfigJson().getUdfConfig().getClassName())
.code(task.getStatement())
.functionLanguage(FunctionLanguage.valueOf(task.getDialect().toUpperCase()))
.build();

FunctionCompiler.getCompilerByTask(udf, task.getConfigJson().getCustomConfigMaps(), task.getId());
FunctionPackage.bale(udf, task.getId());
return udf;
} else {
throw new BusException("udf `class` config is null,please check your udf task config");
}
Expand Down
20 changes: 13 additions & 7 deletions dinky-admin/src/main/resources/DinkyFlinkDockerfile
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
# 用来构建dinky环境
ARG FLINK_VERSION=1.14.5
ARG FLINK_BIG_VERSION=1.14
ARG FLINK_VERSION=1.20.0
ARG FLINK_BIG_VERSION=1.20

FROM flink:${FLINK_VERSION}
FROM flink:${FLINK_VERSION}-scala_2.12-java8

ARG FLINK_VERSION
ARG FLINK_BIG_VERSION
ENV PYTHON_HOME /opt/miniconda3

USER root
RUN wget "https://s3.jcloud.sjtu.edu.cn/899a892efef34b1b944a19981040f55b-oss01/anaconda/miniconda/Miniconda3-py38_4.9.2-Linux-x86_64.sh" -O "miniconda.sh" && chmod +x miniconda.sh
RUN ./miniconda.sh -b -p $PYTHON_HOME && chown -R flink $PYTHON_HOME && ls $PYTHON_HOME

RUN ./miniconda.sh -b -p $PYTHON_HOME && chown -R flink $PYTHON_HOME && ls $PYTHON_HOME
USER flink

ENV PATH $PYTHON_HOME/bin:$PATH
RUN pip install "apache-flink==${FLINK_VERSION}" -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com

RUN cp /opt/flink/opt/flink-python_* /opt/flink/lib/
RUN pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
RUN rm -rf /opt/miniconda3/lib/python3.8/site-packages/ruamel*
RUN pip install "apache-flink==${FLINK_VERSION}" -i https://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com

RUN cp /opt/flink/opt/flink-python* /opt/flink/lib/
RUN rm -f /opt/flink/lib/flink-table-planner-loader*.jar

RUN cp /opt/flink/opt/flink-table-planner* /opt/flink/lib/

RUN wget -O dinky-app-${FLINK_BIG_VERSION}.jar - ${DINKY_HTTP}/downloadAppJar/${FLINK_BIG_VERSION} | mv dinky-app-${FLINK_BIG_VERSION}.jar
# RUN wget -O dinky-app-${FLINK_BIG_VERSION}.jar - ${DINKY_HTTP}/downloadAppJar/${FLINK_BIG_VERSION} | mv dinky-app-${FLINK_BIG_VERSION}.jar
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor(onConstructor = @__(@JsonCreator))
@AllArgsConstructor
public class DingTalkParams {

private String webhook = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor(onConstructor = @__(@JsonCreator))
@AllArgsConstructor
public class EmailParams {

private List<String> receivers = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor(onConstructor = @__(@JsonCreator))
@NoArgsConstructor
@AllArgsConstructor
public class FeiShuParams {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@

import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor(onConstructor = @__(@JsonCreator))
@NoArgsConstructor
@AllArgsConstructor
public class HttpParams {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor(onConstructor = @__(@JsonCreator))
@NoArgsConstructor
@AllArgsConstructor
public class WechatParams {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ default void addJar(File... jarPath) {
List<String> pathList =
Arrays.stream(URLUtil.getURLs(jarPath)).map(URL::toString).collect(Collectors.toList());
List<String> jars = configuration.get(PipelineOptions.JARS);
if (jars == null) {
if (CollUtil.isEmpty(jars)) {
addConfiguration(PipelineOptions.JARS, pathList);
} else {
CollUtil.addAll(jars, pathList);
Expand Down
Loading

0 comments on commit 7ee84fe

Please sign in to comment.