Skip to content

Commit

Permalink
[Feature-2438][studio] Add Flink SQL debug (DataLinkDC#2439)
Browse files Browse the repository at this point in the history
Co-authored-by: wenmo <[email protected]>
  • Loading branch information
aiwenmo and aiwenmo authored Oct 26, 2023
1 parent 7bb0aae commit ffadb0f
Show file tree
Hide file tree
Showing 20 changed files with 246 additions and 30 deletions.
15 changes: 15 additions & 0 deletions dinky-admin/src/main/java/org/dinky/controller/TaskController.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.data.annotation.Log;
import org.dinky.data.annotations.ExecuteProcess;
import org.dinky.data.annotations.ProcessId;
import org.dinky.data.dto.DebugDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.enums.BusinessType;
Expand Down Expand Up @@ -81,6 +82,20 @@ public Result<JobResult> submitTask(@ProcessId @RequestParam Integer id) throws
}
}

@PostMapping("/debugTask")
@ApiOperation("Debug Task")
@Log(title = "Debug Task", businessType = BusinessType.DEBUG)
@ApiImplicitParam(
name = "debugTask",
value = "Debug Task",
required = true,
dataType = "DebugDTO",
paramType = "body")
public Result<JobResult> debugTask(@RequestBody DebugDTO debugDTO) throws Exception {
JobResult result = taskService.debugTask(debugDTO);
return Result.succeed(result, Status.EXECUTE_SUCCESS);
}

@GetMapping("/cancel")
@Log(title = "Cancel Flink Job", businessType = BusinessType.TRIGGER)
@ApiOperation("Cancel Flink Job")
Expand Down
69 changes: 69 additions & 0 deletions dinky-admin/src/main/java/org/dinky/data/dto/DebugDTO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data.dto;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;

/**
* Param for debug flink sql and common sql
*/
@Getter
@Setter
@ApiModel(value = "DebugDTO", description = "Param for debug flink sql and common sql")
public class DebugDTO {

@ApiModelProperty(
value = "Task ID",
dataType = "Integer",
example = "1",
notes = "The ID of Task which is debugged")
private Integer id;

@ApiModelProperty(
value = "Use Result",
dataType = "boolean",
example = "true",
notes = "Flag indicating whether to preview table result")
private boolean useResult = true;

@ApiModelProperty(
value = "Use ChangeLog",
dataType = "boolean",
example = "false",
notes = "Flag indicating whether to preview change log")
private boolean useChangeLog = false;

@ApiModelProperty(
value = "Use Auto Cancel",
dataType = "boolean",
example = "true",
notes = "Flag indicating whether to auto cancel after preview the maximum rows")
private boolean useAutoCancel = true;

@ApiModelProperty(
value = "Max Row Number",
dataType = "Integer",
example = "1000",
notes = "The maximum number of rows to preview")
private Integer maxRowNum = 1000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public enum BusinessType {
/** 提交 */
SUBMIT,

/** Debug */
DEBUG,

/** 执行 */
EXECUTE,

Expand Down
10 changes: 10 additions & 0 deletions dinky-admin/src/main/java/org/dinky/service/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.dinky.service;

import org.dinky.data.dto.AbstractStatementDTO;
import org.dinky.data.dto.DebugDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.enums.JobLifeCycle;
Expand Down Expand Up @@ -69,6 +70,15 @@ public interface TaskService extends ISuperService<Task> {
*/
JobResult submitTask(Integer id, String savePointPath) throws Exception;

/**
* Debug the given task and return the job result.
*
* @param debugDTO The param of preview task.
* @return A {@link JobResult} object representing the result of the submitted task.
* @throws ExcuteException If there is an error debugging the task.
*/
JobResult debugTask(DebugDTO debugDTO) throws Exception;

/**
* Restart the given task and return the job result.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.dinky.data.annotations.ProcessStep;
import org.dinky.data.constant.CommonConstant;
import org.dinky.data.dto.AbstractStatementDTO;
import org.dinky.data.dto.DebugDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.enums.JobLifeCycle;
Expand Down Expand Up @@ -196,8 +197,7 @@ public void preCheckTask(TaskDTO task) throws TaskNotDoneException, SqlExplainEx

@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
public JobResult executeJob(TaskDTO task) throws Exception {
JobResult jobResult;
jobResult = BaseTask.getTask(task).execute();
JobResult jobResult = BaseTask.getTask(task).execute();
log.info("execute job finished,status is {}", jobResult.getStatus());
return jobResult;
}
Expand Down Expand Up @@ -238,7 +238,7 @@ public String buildEnvSql(AbstractStatementDTO task) {
task.setVariables(fragmentVariableService.listEnabledVariables());
}
int envId = Optional.ofNullable(task.getEnvId()).orElse(-1);
if (envId >= 0) {
if (envId > 0) {
TaskDTO envTask = this.getTaskInfoById(task.getEnvId());
if (Asserts.isNotNull(envTask) && Asserts.isNotNullString(envTask.getStatement())) {
sql += envTask.getStatement() + CommonConstant.LineSep;
Expand Down Expand Up @@ -279,6 +279,34 @@ public JobResult submitTask(Integer id, String savePointPath) throws Exception {
return jobResult;
}

@Override
@ProcessStep(type = ProcessStepType.SUBMIT_TASK)
public JobResult debugTask(DebugDTO debugDTO) throws Exception {
initTenantByTaskId(debugDTO.getId());

TaskDTO taskDTO = this.getTaskInfoById(debugDTO.getId());
taskDTO.setUseResult(debugDTO.isUseResult());
taskDTO.setUseChangeLog(debugDTO.isUseChangeLog());
taskDTO.setUseAutoCancel(debugDTO.isUseAutoCancel());
taskDTO.setMaxRowNum(debugDTO.getMaxRowNum());
// 注解自调用会失效,这里通过获取对象方法绕过此限制
TaskServiceImpl taskServiceBean = applicationContext.getBean(TaskServiceImpl.class);
taskServiceBean.preCheckTask(taskDTO);

JobResult jobResult = taskServiceBean.executeJob(taskDTO);

if (Job.JobStatus.SUCCESS == jobResult.getStatus()) {
log.info("Job debug success");
Task task = new Task(debugDTO.getId(), jobResult.getJobInstanceId());
if (!this.updateById(task)) {
throw new BusException(Status.TASK_UPDATE_FAILED.getMessage());
}
} else {
log.error("Job debug failed, error: " + jobResult.getError());
}
return jobResult;
}

@Override
public JobResult restartTask(Integer id, String savePointPath) throws Exception {
TaskDTO task = this.getTaskInfoById(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
@Getter
public enum ProcessStepType {
SUBMIT_TASK("SUBMIT_TASK", Status.PROCESS_SUBMIT_SUBMITTASK),
SUBMIT_DEBUG("DEBUG_TASK", Status.PROCESS_SUBMIT_SUBMITTASK),
SUBMIT_PRECHECK("SUBMIT_PRECHECK", Status.PROCESS_SUBMIT_CHECKSQL),
SUBMIT_EXECUTE("SUBMIT_EXECUTE", Status.PROCESS_SUBMIT_EXECUTE),
SUBMIT_BUILD_CONFIG("SUBMIT_BUILD_CONFIG", Status.PROCESS_SUBMIT_BUILDCONFIG),
Expand Down
3 changes: 2 additions & 1 deletion dinky-common/src/main/java/org/dinky/data/enums/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public enum Status {
MOVE_FAILED(9028, "move.failed"),
TEST_CONNECTION_SUCCESS(9029, "test.connection.success"),
TEST_CONNECTION_FAILED(9030, "test.connection.failed"),

DEBUG_SUCCESS(9031, "debug.success"),
DEBUG_FAILED(9032, "debug.failed"),
/**
* user,tenant,role
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ delete.failed=Delete Failed
role.binding.user=Role Already Binding User , Can Not Delete
not.token=Can Not Read Token
execute.success=Execute Successfully
debug.success=Debug Successfully
debug.failed=Debug Failed
token.freezed=token has been frozen
menu.has.assign=Menu Has Assign , Can Not Delete
datasource.status.refresh.success=DataSource Status Refresh Success
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ delete.failed=删除失败
role.binding.user=该角色已绑定用户,无法删除
not.token=未能读取到有效 Token
execute.success=执行成功
debug.success=调试成功
debug.failed=调试失败
token.freezed=token 已被冻结
menu.has.assign=菜单已分配,不允许删除
datasource.status.refresh.success=数据源状态刷新成功
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data.result;

public abstract class AbstractResultBuilder {
protected String id;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@
public interface ResultBuilder {

static ResultBuilder build(
SqlType operationType, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) {
SqlType operationType,
String id,
Integer maxRowNum,
boolean isChangeLog,
boolean isAutoCancel,
String timeZone) {
switch (operationType) {
case SELECT:
return new SelectResultBuilder(maxRowNum, isChangeLog, isAutoCancel, timeZone);
return new SelectResultBuilder(id, maxRowNum, isChangeLog, isAutoCancel, timeZone);
case SHOW:
case DESC:
case DESCRIBE:
Expand Down
19 changes: 12 additions & 7 deletions dinky-core/src/main/java/org/dinky/data/result/ResultRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,21 @@ public class ResultRunnable implements Runnable {

private static final String nullColumn = "";
private final TableResult tableResult;
private final String id;
private final Integer maxRowNum;
private final boolean isChangeLog;
private final boolean isAutoCancel;
private final String timeZone;

public ResultRunnable(
TableResult tableResult, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) {
TableResult tableResult,
String id,
Integer maxRowNum,
boolean isChangeLog,
boolean isAutoCancel,
String timeZone) {
this.tableResult = tableResult;
this.id = id;
this.maxRowNum = maxRowNum;
this.isChangeLog = isChangeLog;
this.isAutoCancel = isAutoCancel;
Expand All @@ -67,16 +74,14 @@ public ResultRunnable(
public void run() {
try {
tableResult.getJobClient().ifPresent(jobClient -> {
String jobId = jobClient.getJobID().toHexString();
if (!ResultPool.containsKey(jobId)) {
ResultPool.put(new SelectResult(jobId, new ArrayList<>(), new LinkedHashSet<>()));
if (!ResultPool.containsKey(id)) {
ResultPool.put(new SelectResult(id, new ArrayList<>(), new LinkedHashSet<>()));
}

try {
if (isChangeLog) {
catchChangLog(ResultPool.get(jobId));
catchChangLog(ResultPool.get(id));
} else {
catchData(ResultPool.get(jobId));
catchData(ResultPool.get(id));
}
} catch (Exception e) {
log.error(String.format(e.toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@
*
* @since 2021/5/25 16:03
*/
public class SelectResultBuilder implements ResultBuilder {
public class SelectResultBuilder extends AbstractResultBuilder implements ResultBuilder {

private final Integer maxRowNum;
private final boolean isChangeLog;
private final boolean isAutoCancel;
private final String timeZone;

public SelectResultBuilder(Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) {
public SelectResultBuilder(
String id, Integer maxRowNum, boolean isChangeLog, boolean isAutoCancel, String timeZone) {
this.id = id;
this.maxRowNum = Asserts.isNotNull(maxRowNum) ? maxRowNum : 100;
this.isChangeLog = isChangeLog;
this.isAutoCancel = isAutoCancel;
Expand All @@ -46,7 +48,8 @@ public SelectResultBuilder(Integer maxRowNum, boolean isChangeLog, boolean isAut
public IResult getResult(TableResult tableResult) {
if (tableResult.getJobClient().isPresent()) {
String jobId = tableResult.getJobClient().get().getJobID().toHexString();
ResultRunnable runnable = new ResultRunnable(tableResult, maxRowNum, isChangeLog, isAutoCancel, timeZone);
ResultRunnable runnable =
new ResultRunnable(tableResult, id, maxRowNum, isChangeLog, isAutoCancel, timeZone);
Thread thread = new Thread(runnable, jobId);
thread.start();
return SelectResult.buildSuccess(jobId);
Expand Down
7 changes: 6 additions & 1 deletion dinky-core/src/main/java/org/dinky/job/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ public static Job init(
Executor executor,
String statement,
boolean useGateway) {
return new Job(jobConfig, type, JobStatus.INITIALIZE, statement, executorConfig, executor, useGateway);
Job job = new Job(jobConfig, type, JobStatus.INITIALIZE, statement, executorConfig, executor, useGateway);
if (!useGateway) {
job.setJobManagerAddress(executorConfig.getJobManagerAddress());
}
JobContextHolder.setJob(job);
return job;
}

public JobResult getJobResult() {
Expand Down
Loading

0 comments on commit ffadb0f

Please sign in to comment.