Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature]Flink sql task customized test case input #3974

Draft
wants to merge 3 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.dinky.data.enums.BusinessType;
import org.dinky.data.enums.Status;
import org.dinky.data.exception.NotSupportExplainExcepition;
import org.dinky.data.model.TableTestCase;
import org.dinky.data.model.job.JobInstance;
import org.dinky.data.result.ProTableResult;
import org.dinky.data.result.Result;
Expand All @@ -40,6 +41,7 @@

import java.util.List;

import org.dinky.service.TaskTestCaseService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
Expand Down Expand Up @@ -69,6 +71,7 @@ public class APIController {

private final TaskService taskService;
private final JobInstanceService jobInstanceService;
private final TaskTestCaseService taskTestCaseService;

@GetMapping("/version")
@ApiOperation(value = "Query Service Version", notes = "Query Dinky Service Version Number")
Expand Down Expand Up @@ -217,4 +220,9 @@ public Result getTaskLineage(@RequestParam Integer id) {
public ProTableResult<JobInstanceVo> listJobInstances(@RequestBody JsonNode para) {
return jobInstanceService.listJobInstances(para);
}

@GetMapping("/getTableTestCase")
public Result<TableTestCase> getTableTestCase(@RequestParam Integer taskId, @RequestParam String tableName) {
return Result.succeed(taskTestCaseService.getTestCaseByTaskIdAndTableName(taskId,tableName));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.dinky.controller;

import cn.dev33.satoken.annotation.SaCheckLogin;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dinky.data.annotations.CheckTaskOwner;
import org.dinky.data.annotations.TaskId;
import org.dinky.data.dto.TaskTestCaseListDTO;
import org.dinky.data.enums.Status;
import org.dinky.data.result.Result;
import org.dinky.service.TaskTestCaseService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@Api(tags = "Task Test Case Controller")
@RequestMapping("/api/testCase")
@SaCheckLogin
@RequiredArgsConstructor
public class TaskTestCaseController {
private final TaskTestCaseService taskTestCaseService;

@PostMapping("/listTaskTestCaseByStatement")
@ApiOperation("Query Test cases Bv TaskId")
@ApiImplicitParam(
name = "TaskTestcaseDTO",
value = "Task Test case DTO",
dataType = "TaskTestcaseDTO",
paramType = "body",
required = true
)
@CheckTaskOwner(checkParam = TaskId.class, checkInterface = TaskTestCaseService.class)
public Result<TaskTestCaseListDTO> getTestCaseBasedOnStatement(@RequestBody TaskTestCaseListDTO taskTestCaseListDTO) {
return Result.succeed(taskTestCaseService.getTestCaseBasedOnStatement(taskTestCaseListDTO));
}

@PostMapping("/saveDrUpdateTestCase")
@ApiOperation("Save Or Update Test Cases By TaskId")
@ApiImplicitParam(
name = "taskTestCaseDTO",
value = "task id and test cases of every source table",
dataType = "TaskTestCaseDTO",
paramType = "body",
required = true
)
@CheckTaskOwner(checkParam = TaskId.class, checkInterface = TaskTestCaseService.class)
public Result<Void> saveOrUpdateTestCase(@RequestBody TaskTestCaseListDTO taskTestCaseListDTO) {
try {
taskTestCaseService.savOrUpdateTestCase(taskTestCaseListDTO);
return Result.succeed(Status.MODIFY_SUCCESS);
} catch (Exception e) {
log.error(e.getMessage());
return Result.failed(Status.MODIFY_FAILED);

}
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.dinky.data.dto;


import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.dinky.data.annotations.TaskId;
import org.dinky.data.model.TableTestCase;

import java.util.List;

@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@ApiModel(value = "TaskTestCaseDTo", description = "Task Test Case DTo")
public class TaskTestCaseListDTO extends AbstractStatementDTO {

@ApiModelProperty(value = "taskId", required = true, dataType = "Integer")
@TaskId
private Integer taskId;
@ApiModelProperty(value = "testCaseList", required = true, dataType = "List", allowEmptyValue = true)
private List<TableTestCase> taskTableInputList;
}
49 changes: 49 additions & 0 deletions dinky-admin/src/main/java/org/dinky/data/model/TableTestCase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.dinky.data.model;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.dinky.data.typehandler.JSONObjectHandler;

import java.util.List;
import java.util.Map;

/**
* Test case for a table in a task
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@EqualsAndHashCode
@TableName("dinky_task_test_case")
@ApiModel(value = "Table Test Case", description = "Test cases for a table in a task")
public class TableTestCase {
@TableId(type = IdType.AUTO)
@ApiModelProperty(value = "ID", dataType = "Integer", example = "1", notes = "Unique identifier for a test case")
private Integer id;

@ApiModelProperty(value = "Task Id", dataType = "Integer", example = "1", notes = "test case related task id")
private Integer taskId;

@ApiModelProperty(value = "Task Name", dataType = "String", example = "inputTableName", notes = "test case related table name in a task")
private String tableName;

@ApiModelProperty(value = "columns", dataType = "String", example = "[\"columnA\", \"columnB\"]", notes = "table columns")
@TableField(typeHandler = JSONObjectHandler.class)
private List<String> columns;

@ApiModelProperty(value = "row data", dataType = "String", example = "[{\"columnA\":\"valueA\", \"columnB\":\"valueB\"}]")
@TableField(typeHandler = JSONObjectHandler.class)
private List<Map<String, String>> rowData;


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.dinky.mapper;

import org.dinky.data.model.TableTestCase;
import org.dinky.mybatis.mapper.SuperMapper;

public interface TaskTestCaseMapper extends SuperMapper<TableTestCase> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.dinky.service;

import org.dinky.data.dto.TaskTestCaseListDTO;
import org.dinky.data.model.TableTestCase;
import org.dinky.mybatis.service.ISuperService;

public interface TaskTestCaseService extends ISuperService<TableTestCase> {

TaskTestCaseListDTO getTestCaseBasedOnStatement(TaskTestCaseListDTO taskTestCaseListDTO);

void savOrUpdateTestCase(TaskTestCaseListDTO taskTestCaseListDTO);

Boolean checkTaskOperatePermission(Integer taskId);

TableTestCase getTestCaseByTaskIdAndTableName(Integer taskId, String tableName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package org.dinky.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dinky.assertion.Asserts;
import org.dinky.data.dto.TaskTestCaseListDTO;
import org.dinky.data.model.TableTestCase;
import org.dinky.executor.ExecutorConfig;
import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageColumn;
import org.dinky.explainer.lineage.LineageTable;
import org.dinky.mapper.TaskTestCaseMapper;
import org.dinky.mybatis.service.impl.SuperServiceImpl;
import org.dinky.service.TaskService;
import org.dinky.service.TaskTestCaseService;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

@Service
@RequiredArgsConstructor
@Slf4j
public class TaskTestCaseServiceImpl extends SuperServiceImpl<TaskTestCaseMapper, TableTestCase> implements TaskTestCaseService {

private final TaskService taskService;

@Override
public TaskTestCaseListDTO getTestCaseBasedOnStatement(TaskTestCaseListDTO taskTestCaseListDTO) {
// generate source tables and schemes
List<LineageTable> tableschemeParsedFromstatementlist = LineageBuilder.getSourceTablesByLogicalPlan(taskTestCaseListDTO.getStatement(), ExecutorConfig.DEFAULT);

// Merged test case list
List<TableTestCase> mergedTestCaseList = new ArrayList<>();

// get existed test cases from database
List<TableTestCase> testCaseFromDbList = baseMapper.selectList(
new LambdaQueryWrapper<TableTestCase>().eq(TableTestCase::getTaskId, taskTestCaseListDTO.getTaskId()));
Map<String, TableTestCase> testCaseFromDbMap = new HashMap<>();
testCaseFromDbList.forEach((testCase) -> testCaseFromDbMap.put(testCase.getTableName(), testCase));

// ergodic every table and fetch test case row data as much as possible
for (LineageTable tableScheme : tableschemeParsedFromstatementlist) {
// table parsed from the latest statement
String tableName = tableScheme.getName();
TableTestCase mergedTableTestCase = TableTestCase
.builder()
.tableName(tableName)
.columns(tableScheme.getColumns().stream().map(LineageColumn::getName).collect(Collectors.toList()))
.rowData(new ArrayList<>())
.build();
mergedTestCaseList.add(mergedTableTestCase);
// try to reuse test cases
if (testCaseFromDbMap.containsKey(tableName)) {
Set<String> latestColumnSet = new HashSet<>(mergedTableTestCase.getColumns());
List<Map<String, String>> rowDataListFromDb = testCaseFromDbMap.get(tableName).getRowData();
// handle every row data
mergedTableTestCase.getRowData().addAll(getRowDataCanBeUsedInNewScheme(latestColumnSet, rowDataListFromDb));
}
}

return TaskTestCaseListDTO.builder().taskTableInputList(mergedTestCaseList).build();
}

@Override
public void savOrUpdateTestCase(TaskTestCaseListDTO taskTestCaseListDTO) {
List<TableTestCase> testCaseList = taskTestCaseListDTO.getTaskTableInputList();

Map<String, TableTestCase> tableNameTestCaseMap = new HashMap<>();
if (Asserts.isNotNull(testCaseList)) {
// task id init
testCaseList.forEach((tableTestCase -> {
tableTestCase.setTaskId(taskTestCaseListDTO.getTaskId());
tableNameTestCaseMap.put(tableTestCase.getTableName(), tableTestCase);
}));
// update id in order to update existed records
List<TableTestCase> existedTestCaselist = baseMapper.selectList(new LambdaQueryWrapper<TableTestCase>().eq(TableTestCase::getTaskId, taskTestCaseListDTO.getTaskId()));
// delete test cases that do not exist in latest input source tables
List<Integer> deleteTestCaseIdlist = new ArrayList<>();

for (TableTestCase existedTestCase : existedTestCaselist) {
String existedTableName = existedTestCase.getTableName();
if (tableNameTestCaseMap.containsKey(existedTableName)) {
// based on existed id, existed table test case will be updated
TableTestCase tableTestCase = tableNameTestCaseMap.get(existedTableName);
tableTestCase.setId(existedTestCase.getId());
} else {
deleteTestCaseIdlist.add(existedTestCase.getId());
}
}

if (Asserts.isNotNullCollection(testCaseList)) {
baseMapper.insertOrUpdate(testCaseList);
}
if (Asserts.isNotNullCollection(deleteTestCaseIdlist)) {
baseMapper.deleteByIds(deleteTestCaseIdlist);
}

}

}

@Override
public Boolean checkTaskOperatePermission(Integer taskId) {
return taskService.checkTaskOperatePermission(taskId);
}

@Override
public TableTestCase getTestCaseByTaskIdAndTableName(Integer taskId, String tableName) {
return baseMapper.selectOne(new LambdaQueryWrapper<TableTestCase>().eq(TableTestCase::getTaskId, taskId).eq(TableTestCase::getTableName, tableName));
}

private List<Map<String, String>> getRowDataCanBeUsedInNewScheme
(Set<String> latestColumnSet, List<Map<String, String>> existingList) {
List<Map<String, String>> mergedRowDataList = new ArrayList<>();
// handle every row data
for (Map<String, String> rowData : existingList) {
Map<String, String> mergedRowData = new HashMap<>();
for (Map.Entry<String, String> entry : rowData.entrySet()) {
String existedColumnName = entry.getKey();
String existedColumnData = entry.getValue();
// keep the column data that still contains in latest scheme
if (latestColumnSet.contains(existedColumnName)) {
mergedRowData.put(existedColumnName, existedColumnData);
}
}
if (Asserts.isNotNullMap(mergedRowData)) {
mergedRowDataList.add(mergedRowData);
}
}
return mergedRowDataList;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for dinky_task_test_case
-- ----------------------------
CREATE TABLE IF NOT EXISTS `dinky_task_test_case` (
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
`task_id` INT(11) NOT NULL COMMENT 'taskId',
`table_name` VARCHAR(255) CHARACTER SET Utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'table name',
`columns` MEDIUMTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT 'columns',
`row_data` MEDIUMTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT 'row data',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
PRIMARY KEY (`id`),
UNIQUE INDEX `id_table_name_un_idx1` (`task_id`, `table_name`) USING BTREE,
FOREIGN KEY (`task_id`) REFERENCES `dinky_task` (`id`) ON DELETE CASCADE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'Test Case' ROW_FORMAT = Dynamic;


SET FOREIGN_KEY_CHECKS = 1;
Loading
Loading