Skip to content

Commit

Permalink
[Feature-1608][admin] Support get task lineage by taskid with openapi (
Browse files Browse the repository at this point in the history
…DataLinkDC#2486)

Co-authored-by: wenmo <[email protected]>
  • Loading branch information
aiwenmo and aiwenmo authored Nov 2, 2023
1 parent 09aa5c0 commit 2b67317
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 11 deletions.
15 changes: 15 additions & 0 deletions dinky-admin/src/main/java/org/dinky/controller/APIController.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,19 @@ public Result<JobInstance> getJobInstanceByTaskId(@RequestParam Integer id) {
public Result<String> exportSql(@RequestParam Integer id) {
return Result.succeed(taskService.exportSql(id));
}

@GetMapping("/getTaskLineage")
@ApiOperation("Get Task Lineage")
@Log(title = "Get Task Lineage", businessType = BusinessType.OTHER)
@ApiImplicitParam(
name = "id",
value = "Task Id",
required = true,
dataType = "Integer",
paramType = "query",
dataTypeClass = Integer.class)
public Result getTaskLineage(@RequestParam Integer id) {
taskService.initTenantByTaskId(id);
return Result.succeed(taskService.getTaskLineage(id), "获取成功");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import org.dinky.assertion.Asserts;
import org.dinky.data.annotation.Log;
import org.dinky.data.dto.StudioCADTO;
import org.dinky.data.dto.StudioDDLDTO;
import org.dinky.data.dto.StudioLineageDTO;
import org.dinky.data.dto.StudioMetaStoreDTO;
import org.dinky.data.enums.BusinessType;
import org.dinky.data.enums.Status;
Expand Down Expand Up @@ -112,7 +112,7 @@ public Result<JdbcSelectResult> getJobData(@RequestParam Integer taskId) {
required = true,
dataType = "StudioCADTO",
paramType = "body")
public Result<LineageResult> getLineage(@RequestBody StudioCADTO studioCADTO) {
public Result<LineageResult> getLineage(@RequestBody StudioLineageDTO studioCADTO) {
LineageResult lineage = studioService.getLineage(studioCADTO);
return Asserts.isNull(lineage) ? Result.failed("血缘分析异常") : Result.succeed(lineage, "血缘分析成功");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@Getter
@Setter
@ApiModel(value = "StudioCADTO", description = "DTO for Studio SQL query")
public class StudioCADTO extends AbstractStatementDTO {
public class StudioLineageDTO extends AbstractStatementDTO {

@ApiModelProperty(
value = "Use Statement Set",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.dinky.service;

import org.dinky.data.dto.StudioCADTO;
import org.dinky.data.dto.StudioDDLDTO;
import org.dinky.data.dto.StudioLineageDTO;
import org.dinky.data.dto.StudioMetaStoreDTO;
import org.dinky.data.model.Catalog;
import org.dinky.data.model.Column;
Expand All @@ -47,7 +47,7 @@ public interface StudioService {

SelectResult getJobData(String jobId);

LineageResult getLineage(StudioCADTO studioCADTO);
LineageResult getLineage(StudioLineageDTO studioCADTO);

List<JsonNode> listFlinkJobs(Integer clusterId);

Expand Down
9 changes: 9 additions & 0 deletions dinky-admin/src/main/java/org/dinky/service/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.dinky.data.model.Task;
import org.dinky.data.result.Result;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.job.JobResult;
Expand Down Expand Up @@ -270,4 +271,12 @@ public interface TaskService extends ISuperService<Task> {
* @return A {@link JobModelOverview} object representing the job model overview.
*/
JobModelOverview getJobStreamingOrBatchModelOverview();

/**
* Get the task with the given name and tenant ID.
*
* @param id The id of the task to get.
* @return A {@link LineageResult} object representing the found task lineage.
*/
LineageResult getTaskLineage(Integer id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.dinky.api.FlinkAPI;
import org.dinky.assertion.Asserts;
import org.dinky.config.Dialect;
import org.dinky.data.dto.StudioCADTO;
import org.dinky.data.dto.StudioDDLDTO;
import org.dinky.data.dto.StudioLineageDTO;
import org.dinky.data.dto.StudioMetaStoreDTO;
import org.dinky.data.model.Catalog;
import org.dinky.data.model.ClusterInstance;
Expand All @@ -38,6 +38,7 @@
import org.dinky.executor.CustomTableEnvironment;
import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.explainer.sqllineage.SQLLineageBuilder;
import org.dinky.job.JobConfig;
import org.dinky.job.JobManager;
import org.dinky.metadata.driver.Driver;
Expand Down Expand Up @@ -104,7 +105,7 @@ public SelectResult getJobData(String jobId) {
}

@Override
public LineageResult getLineage(StudioCADTO studioCADTO) {
public LineageResult getLineage(StudioLineageDTO studioCADTO) {
// TODO 添加ProcessStep
if (Asserts.isNotNullString(studioCADTO.getDialect())
&& !Dialect.FLINK_SQL.equalsVal(studioCADTO.getDialect())) {
Expand All @@ -118,10 +119,9 @@ public LineageResult getLineage(StudioCADTO studioCADTO) {
return null;
}
if (Dialect.DORIS.equalsVal(studioCADTO.getDialect())) {
return org.dinky.explainer.sqllineage.LineageBuilder.getSqlLineage(
studioCADTO.getStatement(), "mysql", dataBase.getDriverConfig());
return SQLLineageBuilder.getSqlLineage(studioCADTO.getStatement(), "mysql", dataBase.getDriverConfig());
} else {
return org.dinky.explainer.sqllineage.LineageBuilder.getSqlLineage(
return SQLLineageBuilder.getSqlLineage(
studioCADTO.getStatement(), studioCADTO.getDialect().toLowerCase(), dataBase.getDriverConfig());
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
import org.dinky.data.model.UDFTemplate;
import org.dinky.data.result.Result;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.explainer.sqllineage.SQLLineageBuilder;
import org.dinky.function.compiler.CustomStringJavaCompiler;
import org.dinky.function.pool.UdfCodePool;
import org.dinky.function.util.UDFUtil;
Expand Down Expand Up @@ -778,6 +781,28 @@ public Result<Tree<Integer>> queryAllCatalogue() {
TreeUtil.build(dealWithCatalogue(catalogueList), -1).get(0));
}

@Override
public LineageResult getTaskLineage(Integer id) {
TaskDTO task = getTaskInfoById(id);
if (!Dialect.isCommonSql(task.getDialect())) {
if (Asserts.isNull(task.getDatabaseId())) {
return null;
}
DataBase dataBase = dataBaseService.getById(task.getDatabaseId());
if (Asserts.isNull(dataBase)) {
return null;
}
if (task.getDialect().equalsIgnoreCase("doris") || task.getDialect().equalsIgnoreCase("starrocks")) {
return SQLLineageBuilder.getSqlLineage(task.getStatement(), "mysql", dataBase.getDriverConfig());
} else {
return SQLLineageBuilder.getSqlLineage(
task.getStatement(), task.getDialect().toLowerCase(), dataBase.getDriverConfig());
}
} else {
return LineageBuilder.getColumnLineageByLogicalPlan(buildEnvSql(task));
}
}

private List<TreeNode<Integer>> dealWithCatalogue(List<Catalogue> catalogueList) {
final List<TreeNode<Integer>> treeNodes = new ArrayList<>(8);
treeNodes.add(new TreeNode<>(-1, null, "全部", -1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class LineageBuilder {
public class SQLLineageBuilder {

public static LineageResult getSqlLineageByOne(String statement, String type) {
// TODO 改为ProcessStep注释
Expand Down

0 comments on commit 2b67317

Please sign in to comment.