Skip to content

Commit

Permalink
Debugging a normal job returns multiple query result sets
Browse files Browse the repository at this point in the history
  • Loading branch information
yangzehan committed Dec 20, 2023
1 parent 177330d commit 9bd0c19
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,6 @@ public interface DataBaseService extends ISuperService<DataBase> {
JobResult executeCommonSql(SqlDTO sqlDTO);

List<DataBase> selectListByKeyWord(String keyword);

JobResult StreamExecuteCommonSql(SqlDTO sqlDTO);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.dinky.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import org.apache.commons.lang3.StringUtils;
import org.dinky.assertion.Asserts;
import org.dinky.data.annotations.ProcessStep;
import org.dinky.data.constant.CommonConstant;
Expand All @@ -27,32 +30,23 @@
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.enums.ProcessStepType;
import org.dinky.data.enums.Status;
import org.dinky.data.model.Column;
import org.dinky.data.model.DataBase;
import org.dinky.data.model.QueryData;
import org.dinky.data.model.Schema;
import org.dinky.data.model.SqlGeneration;
import org.dinky.data.model.Table;
import org.dinky.data.model.*;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.job.JobResult;
import org.dinky.mapper.DataBaseMapper;
import org.dinky.metadata.driver.Driver;
import org.dinky.metadata.result.JdbcSelectResult;
import org.dinky.mybatis.service.impl.SuperServiceImpl;
import org.dinky.service.DataBaseService;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* DataBaseServiceImpl
Expand Down Expand Up @@ -311,4 +305,46 @@ public List<DataBase> selectListByKeyWord(String keyword) {
.or()
.like(DataBase::getNote, keyword));
}

@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE_COMMON_SQL)
@Override
public JobResult StreamExecuteCommonSql(SqlDTO sqlDTO) {
JobResult result = new JobResult();
result.setStatement(sqlDTO.getStatement());
result.setStartTime(LocalDateTime.now());

if (Asserts.isNull(sqlDTO.getDatabaseId())) {
result.setSuccess(false);
result.setError("please assign data source");
result.setEndTime(LocalDateTime.now());
return result;
}
DataBase dataBase = getById(sqlDTO.getDatabaseId());
if (Asserts.isNull(dataBase)) {
result.setSuccess(false);
result.setError("data source not exist.");
result.setEndTime(LocalDateTime.now());
return result;
}
try (Driver driver = Driver.build(dataBase.getDriverConfig())) {
Stream<JdbcSelectResult> jdbcSelectResultStream =
driver.StreamExecuteSql(sqlDTO.getStatement(), sqlDTO.getMaxRowNum());
List<JdbcSelectResult> jdbcSelectResults = jdbcSelectResultStream
.takeWhile(res -> {
if (!res.isSuccess()) {
result.setSuccess(false);
result.setError(res.getError());
result.setEndTime(LocalDateTime.now());
return false;
}
return true;
})
.collect(Collectors.toList());
if (result.getError() == null) {
result.setSuccess(true);
}
result.setResults(jdbcSelectResults);
return result;
}
}
}
102 changes: 42 additions & 60 deletions dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,30 @@

package org.dinky.service.impl;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.tree.Tree;
import cn.hutool.core.lang.tree.TreeNode;
import cn.hutool.core.lang.tree.TreeUtil;
import cn.hutool.core.text.StrFormatter;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.dinky.assertion.Asserts;
import org.dinky.config.Dialect;
import org.dinky.context.TenantContextHolder;
import org.dinky.data.annotations.ProcessStep;
import org.dinky.data.app.AppParamConfig;
import org.dinky.data.constant.CommonConstant;
import org.dinky.data.dto.AbstractStatementDTO;
import org.dinky.data.dto.DebugDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.dto.TaskSubmitDto;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.enums.JobStatus;
import org.dinky.data.enums.ProcessStepType;
Expand All @@ -38,14 +51,6 @@
import org.dinky.data.exception.NotSupportExplainExcepition;
import org.dinky.data.exception.SqlExplainExcepition;
import org.dinky.data.exception.TaskNotDoneException;
import org.dinky.data.model.Catalogue;
import org.dinky.data.model.ClusterConfiguration;
import org.dinky.data.model.ClusterInstance;
import org.dinky.data.model.DataBase;
import org.dinky.data.model.Savepoints;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.data.model.Task;
import org.dinky.data.model.TaskVersion;
import org.dinky.data.model.alert.AlertGroup;
import org.dinky.data.model.ext.TaskExtConfig;
import org.dinky.data.model.home.JobModelOverview;
Expand All @@ -72,65 +77,24 @@
import org.dinky.job.JobResult;
import org.dinky.mapper.TaskMapper;
import org.dinky.mybatis.service.impl.SuperServiceImpl;
import org.dinky.service.AlertGroupService;
import org.dinky.service.CatalogueService;
import org.dinky.service.ClusterConfigurationService;
import org.dinky.service.ClusterInstanceService;
import org.dinky.service.DataBaseService;
import org.dinky.service.FragmentVariableService;
import org.dinky.service.JobInstanceService;
import org.dinky.service.SavepointsService;
import org.dinky.service.TaskService;
import org.dinky.service.TaskVersionService;
import org.dinky.service.UDFTemplateService;
import org.dinky.service.UserService;
import org.dinky.service.task.BaseTask;
import org.dinky.utils.FragmentVariableUtils;
import org.dinky.utils.JsonUtils;
import org.dinky.utils.RunTimeUtil;
import org.dinky.utils.UDFUtils;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import javax.annotation.Resource;

import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.tree.Tree;
import cn.hutool.core.lang.tree.TreeNode;
import cn.hutool.core.lang.tree.TreeUtil;
import cn.hutool.core.text.StrFormatter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

/**
* TaskServiceImpl
Expand Down Expand Up @@ -201,6 +165,18 @@ public JobResult executeJob(TaskDTO task) throws Exception {
return jobResult;
}

@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
public JobResult executeJob(TaskDTO task, Boolean stream) throws Exception {
JobResult jobResult;
if (stream) {
jobResult = BaseTask.getTask(task).StreamExecute();
} else {
jobResult = BaseTask.getTask(task).execute();
}
log.info("execute job finished,status is {}", jobResult.getStatus());
return jobResult;
}

// Submit and export task
@ProcessStep(type = ProcessStepType.SUBMIT_BUILD_CONFIG)
public JobConfig buildJobSubmitConfig(TaskDTO task) {
Expand Down Expand Up @@ -330,7 +306,13 @@ public JobResult debugTask(DebugDTO debugDTO) throws Exception {
taskDTO.setStatementSet(false);
// 注解自调用会失效,这里通过获取对象方法绕过此限制
TaskServiceImpl taskServiceBean = applicationContext.getBean(TaskServiceImpl.class);
JobResult jobResult = taskServiceBean.executeJob(taskDTO);
JobResult jobResult = new JobResult();
if (Dialect.isCommonSql(taskDTO.getDialect())) {
jobResult = taskServiceBean.executeJob(taskDTO, true);
} else {
jobResult = taskServiceBean.executeJob(taskDTO);
}

if (Job.JobStatus.SUCCESS == jobResult.getStatus()) {
log.info("Job debug success");
Task task = new Task(debugDTO.getId(), jobResult.getJobInstanceId());
Expand Down
20 changes: 11 additions & 9 deletions dinky-admin/src/main/java/org/dinky/service/task/BaseTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@

package org.dinky.service.task;

import cn.hutool.cache.Cache;
import cn.hutool.cache.impl.TimedCache;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.ReflectUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.AllArgsConstructor;
import org.dinky.config.Dialect;
import org.dinky.data.annotations.SupportDialect;
import org.dinky.data.dto.TaskDTO;
Expand All @@ -31,15 +38,6 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.cache.Cache;
import cn.hutool.cache.impl.TimedCache;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.ReflectUtil;
import lombok.AllArgsConstructor;

@AllArgsConstructor
public abstract class BaseTask {

Expand Down Expand Up @@ -77,4 +75,8 @@ public static BaseTask getTask(TaskDTO taskDTO) {
}
throw new RuntimeException("Not support dialect: " + taskDTO.getDialect());
}

public JobResult StreamExecute() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.dinky.service.task;

import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.dinky.config.Dialect;
import org.dinky.data.annotations.SupportDialect;
import org.dinky.data.dto.SqlDTO;
Expand All @@ -29,9 +31,6 @@

import java.util.List;

import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@SupportDialect({
Dialect.SQL,
Expand Down Expand Up @@ -67,6 +66,15 @@ public JobResult execute() {
return jobResult;
}

@Override
public JobResult StreamExecute() {
log.info("Preparing to execute common sql...");
SqlDTO sqlDTO = SqlDTO.build(task.getStatement(), task.getDatabaseId(), null);
DataBaseService dataBaseService = SpringUtil.getBean(DataBaseService.class);
JobResult jobResult = dataBaseService.StreamExecuteCommonSql(sqlDTO);
return jobResult;
}

@Override
public boolean stop() {
return false;
Expand Down
5 changes: 5 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.dinky.job;

import org.dinky.data.result.IResult;
import org.dinky.metadata.result.JdbcSelectResult;

import java.time.LocalDateTime;
import java.util.List;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
Expand Down Expand Up @@ -95,6 +97,9 @@ public class JobResult {
@ApiModelProperty(value = "Result data of the job", dataType = "IResult", notes = "Result data of the job")
private IResult result;

@ApiModelProperty(value = "Result data of the job", dataType = "IResult", notes = "Result data of the job")
private List<JdbcSelectResult> results;

@ApiModelProperty(
value = "Start time of job execution",
dataType = "LocalDateTime",
Expand Down
Loading

0 comments on commit 9bd0c19

Please sign in to comment.