Skip to content

Commit

Permalink
[Feature-2447][studio] Add FlinkSQL and common SQL debug preview (DataLinkDC#2449)
Browse files Browse the repository at this point in the history

Co-authored-by: wenmo <[email protected]>
  • Loading branch information
aiwenmo and aiwenmo authored Oct 27, 2023
1 parent 1cd2930 commit d9edebe
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@

import org.dinky.data.model.History;
import org.dinky.data.result.ProTableResult;
import org.dinky.data.result.Result;
import org.dinky.service.HistoryService;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -62,4 +65,14 @@ public class HistoryController {
public ProTableResult<History> listHistory(@RequestBody JsonNode para) {
    // Pagination/filter criteria arrive as raw JSON; the service layer
    // interprets them and builds the ProTable-shaped page result.
    final ProTableResult<History> page = historyService.selectForProTable(para);
    return page;
}

/**
 * Fetches the most recent execution {@link History} record for a task.
 * Note: despite the parameter name, {@code id} is a <em>task</em> id
 * (see the {@code @ApiImplicitParam} value), not a history-record id.
 *
 * @param id the task id whose latest history record is requested
 * @return a succeed Result wrapping the latest History; the payload may be
 *         null when the task has never been executed
 */
@GetMapping("/getLatestHistoryById")
@ApiOperation("Get latest history info by id")
@ApiImplicitParam(name = "id", value = "task id", dataType = "Integer", paramType = "query", required = true)
public Result<History> getLatestHistoryById(@RequestParam Integer id) {
    return Result.succeed(historyService.getLatestHistoryById(id));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,12 @@ public interface HistoryService extends ISuperService<History> {
*/
@Deprecated
boolean removeHistoryById(Integer id);

/**
 * Get the most recent execution history record for a task.
 *
 * @param id The ID of the task (matched against the history's task_id).
 * @return The latest History record, ordered by start time descending;
 *         may be null when the task has no recorded executions.
 */
History getLatestHistoryById(Integer id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;

/**
* HistoryServiceImpl
*
Expand All @@ -43,4 +45,12 @@ public boolean removeHistoryById(Integer id) {
}
return removeById(id);
}

/**
 * Returns the newest History row for the given task, or null when none exists.
 * Ordering is by start_time descending with a LIMIT 1 appended, so exactly
 * one row (at most) is fetched from the database.
 */
@Override
public History getLatestHistoryById(Integer id) {
    QueryWrapper<History> latestForTask = new QueryWrapper<>();
    latestForTask.eq("task_id", id).orderByDesc("start_time").last("limit 1");
    return baseMapper.selectOne(latestForTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private IResult executeMSFlinkSql(StudioMetaStoreDTO studioMetaStoreDTO) {

/**
 * Returns the cached preview result of the most recent common-SQL execution
 * for a task, as stored by the CommonSqlTask via ResultPool.
 *
 * @param taskId id of the task whose cached result is requested
 * @return the cached JdbcSelectResult, or null when nothing is cached or the
 *         cache entry has expired
 */
@Override
public JdbcSelectResult getCommonSqlData(Integer taskId) {
    // The pool stores IResult; common-SQL tasks cache a JdbcSelectResult,
    // so the downcast is expected to hold. NOTE(review): a ClassCastException
    // here would mean a non-JDBC result was cached under this task id.
    return (JdbcSelectResult) ResultPool.getCommonSqlCache(taskId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ public JobResult restartTask(Integer id, String savePointPath) throws Exception

@Override
public boolean cancelTaskJob(TaskDTO task) {
if (Dialect.isCommonSql(task.getDialect())) {
return true;
}
JobInstance jobInstance = jobInstanceService.getById(task.getJobInstanceId());
Assert.notNull(jobInstance, Status.JOB_INSTANCE_NOT_EXIST.getMessage());
ClusterInstance clusterInstance = clusterInstanceService.getById(jobInstance.getClusterId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
import org.dinky.data.annotation.SupportDialect;
import org.dinky.data.dto.SqlDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.result.ResultPool;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.job.JobResult;
import org.dinky.service.DataBaseService;

import java.util.List;
import java.util.concurrent.TimeUnit;

import cn.hutool.cache.Cache;
import cn.hutool.cache.impl.TimedCache;
import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -50,8 +48,6 @@
Dialect.PRESTO
})
public class CommonSqlTask extends BaseTask {
private static final Cache<Integer, JobResult> COMMON_SQL_SEARCH_CACHE =
new TimedCache<>(TimeUnit.MINUTES.toMillis(10));

public CommonSqlTask(TaskDTO task) {
super(task);
Expand All @@ -68,7 +64,9 @@ public JobResult execute() {
log.info("Preparing to execute common sql...");
SqlDTO sqlDTO = SqlDTO.build(task.getStatement(), task.getDatabaseId(), null);
DataBaseService dataBaseService = SpringUtil.getBean(DataBaseService.class);
return COMMON_SQL_SEARCH_CACHE.get(task.getId(), () -> dataBaseService.executeCommonSql(sqlDTO));
JobResult jobResult = dataBaseService.executeCommonSql(sqlDTO);
ResultPool.putCommonSqlCache(task.getId(), jobResult.getResult());
return jobResult;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.dinky.data.result;

import org.dinky.metadata.result.JdbcSelectResult;

import java.util.concurrent.TimeUnit;

import cn.hutool.cache.Cache;
Expand All @@ -35,7 +33,7 @@ public final class ResultPool {

private ResultPool() {}

private static final Cache<Integer, JdbcSelectResult> COMMON_SQL_SEARCH_CACHE =
private static final Cache<Integer, IResult> COMMON_SQL_SEARCH_CACHE =
new TimedCache<>(TimeUnit.MINUTES.toMillis(10));
private static final Cache<String, SelectResult> results = new TimedCache<>(TimeUnit.MINUTES.toMillis(10));

Expand All @@ -47,11 +45,11 @@ public static void put(SelectResult result) {
results.put(result.getJobId(), result);
}

public static void putCommonSqlCache(Integer taskId, JdbcSelectResult result) {
public static void putCommonSqlCache(Integer taskId, IResult result) {
COMMON_SQL_SEARCH_CACHE.put(taskId, result);
}

public static JdbcSelectResult getCommonSqlCache(Integer taskId) {
public static IResult getCommonSqlCache(Integer taskId) {
return COMMON_SQL_SEARCH_CACHE.get(taskId);
}

Expand Down
2 changes: 1 addition & 1 deletion dinky-web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
"redux-persist": "^6.0.0",
"remark-gfm": "^4.0.0",
"screenfull": "^6.0.2",
"sql-formatter": "^13.0.1",
"sql-formatter": "^13.0.2",
"styled-components": "^6.0.8",
"use-sse": "^2.0.1"
},
Expand Down
1 change: 0 additions & 1 deletion dinky-web/src/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ export async function getInitialState(): Promise<{

// ProLayout 支持的api https://procomponents.ant.design/components/layout
export const layout: RunTimeLayoutConfig = ({ initialState }) => {
console.log(initialState);
const fullscreen = initialState?.fullscreen;

const defaultSettings = {
Expand Down
19 changes: 10 additions & 9 deletions dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
import { isSql } from '@/pages/DataStudio/HeaderContainer/service';
import { StateType } from '@/pages/DataStudio/model';
import { postAll } from '@/services/api';
import { handleGetOption } from '@/services/BusinessCrud';
import {handleGetOption, handleGetOptionWithoutMsg} from '@/services/BusinessCrud';
import { API_CONSTANTS } from '@/services/endpoints';
import { transformTableDataToCsv } from '@/utils/function';
import { l } from '@/utils/intl';
Expand Down Expand Up @@ -142,18 +142,18 @@ const Result = (props: any) => {
} else {
// flink sql
// to do: get job data by history id list, not flink jid
if (current.jobInstanceId) {
const res = await postAll(API_CONSTANTS.GET_JOB_BY_ID, {
id: current.jobInstanceId
console.log(current);
if (current.id) {
const res = await handleGetOptionWithoutMsg(API_CONSTANTS.GET_LATEST_HISTORY_BY_ID, {
id: current.id
});
const jobData = res.datas;
if ('unknown' !== jobData.status.toLowerCase()) {
const jid = jobData.jid;
const historyData = res.datas;
if ('2' == historyData.status) {
const historyId = historyData.id;
const tableData = await handleGetOption('api/studio/getJobData', 'Get Data', {
jobId: jid
jobId: historyId
});
const datas = tableData.datas;
datas.jid = jid;
if (datas.success) {
params.resultData = datas;
setData(datas);
Expand Down Expand Up @@ -238,6 +238,7 @@ const Result = (props: any) => {
{data.columns ? (
<Table
columns={getColumns(data.columns)}
size="small"
dataSource={data.rowData?.map((item: any, index: number) => {
return { ...item, key: index };
})}
Expand Down
13 changes: 13 additions & 0 deletions dinky-web/src/services/BusinessCrud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,19 @@ export const handleGetOption = async (url: string, title: string, param: any) =>
}
};

/**
 * GET helper that, unlike handleGetOption, shows no success message.
 * On a non-success response code it surfaces the server's msg as a warning
 * and resolves to undefined.
 */
export const handleGetOptionWithoutMsg = async (url: string, param: any) => {
  try {
    const result = await getData(url, param);
    // Guard clause: anything but SUCCESS warns and yields undefined.
    if (result.code !== RESPONSE_CODE.SUCCESS) {
      WarningMessage(result.msg);
      return undefined;
    }
    return result;
  } catch (error) {
    // NOTE(review): network/parse errors are swallowed silently by design
    // (the "without msg" variant); callers must handle undefined.
    return undefined;
  }
};

export const handleData = async (url: string, id: any) => {
try {
const { code, datas } = await getInfoById(url, id);
Expand Down
1 change: 1 addition & 0 deletions dinky-web/src/services/endpoints.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ export enum API_CONSTANTS {
// ---- devops
GET_JOB_LIST = '/api/jobInstance',
GET_JOB_BY_ID = '/api/jobInstance/getOneById',
GET_LATEST_HISTORY_BY_ID = '/api/history/getLatestHistoryById',
GET_JOB_DETAIL = '/api/jobInstance/getJobInfoDetail',
REFRESH_JOB_DETAIL = '/api/jobInstance/refreshJobInfoDetail',
READ_CHECKPOINT = '/api/flinkConf/readCheckPoint',
Expand Down

0 comments on commit d9edebe

Please sign in to comment.