diff --git a/dinky-admin/src/main/java/org/dinky/controller/APIController.java b/dinky-admin/src/main/java/org/dinky/controller/APIController.java index 4a0d1c6b94..80f9df38c3 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/APIController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/APIController.java @@ -95,8 +95,10 @@ public Result submitTask(@RequestBody TaskSubmitDto submitDto) throws @GetMapping("/cancel") // @Log(title = "Cancel Flink Job", businessType = BusinessType.TRIGGER) @ApiOperation("Cancel Flink Job") - public Result cancel(@RequestParam Integer id) { - return Result.succeed(taskService.cancelTaskJob(taskService.getTaskInfoById(id)), Status.EXECUTE_SUCCESS); + public Result cancel( + @RequestParam Integer id, @RequestParam(defaultValue = "false") boolean withSavePoint) { + return Result.succeed( + taskService.cancelTaskJob(taskService.getTaskInfoById(id), withSavePoint), Status.EXECUTE_SUCCESS); } /** diff --git a/dinky-admin/src/main/java/org/dinky/controller/TaskController.java b/dinky-admin/src/main/java/org/dinky/controller/TaskController.java index 9966b34bf1..e7a5c2d329 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/TaskController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/TaskController.java @@ -106,8 +106,8 @@ public Result debugTask(@RequestBody TaskDTO task) throws Exception { @GetMapping("/cancel") @Log(title = "Cancel Flink Job", businessType = BusinessType.TRIGGER) @ApiOperation("Cancel Flink Job") - public Result cancel(@RequestParam Integer id) { - if (taskService.cancelTaskJob(taskService.getTaskInfoById(id))) { + public Result cancel(@RequestParam Integer id, @RequestParam(defaultValue = "false") boolean withSavePoint) { + if (taskService.cancelTaskJob(taskService.getTaskInfoById(id), withSavePoint)) { return Result.succeed(Status.EXECUTE_SUCCESS); } else { return Result.failed(Status.EXECUTE_FAILED); diff --git a/dinky-admin/src/main/java/org/dinky/service/TaskService.java b/dinky-admin/src/main/java/org/dinky/service/TaskService.java index 0bad25193c..6fd411e55b 100644 --- a/dinky-admin/src/main/java/org/dinky/service/TaskService.java +++ b/dinky-admin/src/main/java/org/dinky/service/TaskService.java @@ -114,7 +114,7 @@ public interface TaskService extends ISuperService { * @param task The {@link TaskDTO} object representing the task to cancel. * @return true if the task job is successfully cancelled, false otherwise. */ - boolean cancelTaskJob(TaskDTO task); + boolean cancelTaskJob(TaskDTO task, boolean withSavePoint); /** * Get the stream graph of the given task job. diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 7b6157afb0..cb8855ba06 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -367,7 +367,7 @@ public JobResult restartTask(Integer id, String savePointPath) throws Exception if (!Dialect.isCommonSql(task.getDialect()) && Asserts.isNotNull(task.getJobInstanceId())) { String status = jobInstanceService.getById(task.getJobInstanceId()).getStatus(); if (!JobStatus.isDone(status)) { - cancelTaskJob(task); + cancelTaskJob(task, true); } } return submitTask( @@ -375,7 +375,7 @@ public JobResult restartTask(Integer id, String savePointPath) throws Exception } @Override - public boolean cancelTaskJob(TaskDTO task) { + public boolean cancelTaskJob(TaskDTO task, boolean withSavePoint) { if (Dialect.isCommonSql(task.getDialect())) { return true; } @@ -385,7 +385,7 @@ public boolean cancelTaskJob(TaskDTO task) { Assert.notNull(clusterInstance, Status.CLUSTER_NOT_EXIST.getMessage()); JobManager jobManager = JobManager.build(buildJobConfig(task)); - return jobManager.cancel(jobInstance.getJid()); + return jobManager.cancel(jobInstance.getJid(), withSavePoint); } @Override diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java index 8163dc552a..2c53a6d2c0 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobManager.java +++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java @@ -28,6 +28,7 @@ import org.dinky.context.RowLevelPermissionsContext; import org.dinky.data.annotations.ProcessStep; import org.dinky.data.enums.ProcessStepType; +import org.dinky.data.exception.BusException; import org.dinky.data.model.SystemConfiguration; import org.dinky.data.result.ErrorResult; import org.dinky.data.result.ExplainResult; @@ -378,27 +379,32 @@ public String getJobPlanJson(String statement) { .getJsonPlan(); } - public boolean cancel(String jobId) { + public boolean cancel(String jobId, boolean withSavePoint) { if (useGateway && !useRestAPI) { config.getGatewayConfig() .setFlinkConfig(FlinkConfig.build(jobId, ActionType.CANCEL.getValue(), null, null)); Gateway.build(config.getGatewayConfig()).savepointJob(); return true; - } else if (useRestAPI) { + } else if (useRestAPI && withSavePoint) { try { // Try to savepoint, if it fails, it will stop normally(尝试进行savepoint,如果失败,即普通停止) savepoint(jobId, SavePointType.CANCEL, null); return true; } catch (Exception e) { - return FlinkAPI.build(config.getAddress()).stop(jobId); + log.warn("Stop with savcePoint failed: {}, will try normal rest api stop", e.getMessage()); + return cancelNormal(jobId); } } else { - try { - return FlinkAPI.build(config.getAddress()).stop(jobId); - } catch (Exception e) { - log.error("停止作业时集群不存在: " + e); - } - return false; + return cancelNormal(jobId); + } + } + + public boolean cancelNormal(String jobId) { + try { + return FlinkAPI.build(config.getAddress()).stop(jobId); + } catch (Exception e) { + log.error("stop flink job failed:", e); + throw new BusException(e.getMessage()); } } diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx index 9a86f6fe19..58070156d0 100644 --- a/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx +++ b/dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx @@ -17,7 +17,8 @@ * */ -import { handleGetOption, handleOption } from '@/services/BusinessCrud'; +import {handleGetOption, handleOption} from '@/services/BusinessCrud'; +import {API_CONSTANTS} from "@/services/endpoints"; export async function explainSql(title: string, params: any) { return handleOption('/api/task/explainSql', title, params); @@ -32,13 +33,22 @@ export async function debugTask(title: string, params: any) { } export async function executeSql(title: string, id: number) { - return handleGetOption('/api/task/submitTask', title, { id }); + return handleGetOption('/api/task/submitTask', title, {id}); } -export function cancelTask(title: string, id: number) { - return handleGetOption('api/task/cancel', title, { id }); +export function cancelTask(title: string, id: number, withSavePoint: boolean = true) { + return handleGetOption(API_CONSTANTS.CANCEL_JOB, title, {id, withSavePoint}); } +export function restartTask(title: string, id: number, isOnLine:boolean) { + return handleGetOption(API_CONSTANTS.RESTART_TASK, title, {id, isOnLine}); +} +export function savePointTask(title: string, taskId: number, savePointType:string) { + return handleGetOption(API_CONSTANTS.SAVEPOINT, title, {taskId, savePointType}); +} + + + export function changeTaskLife(title = '', id: number, life: number) { - return handleGetOption('api/task/changeTaskLife', title, { taskId: id, lifeCycle: life }); + return handleGetOption('api/task/changeTaskLife', title, {taskId: id, lifeCycle: life}); } diff --git a/dinky-web/src/pages/DevOps/JobDetail/JobOperator/JobOperator.tsx b/dinky-web/src/pages/DevOps/JobDetail/JobOperator/JobOperator.tsx index 7155908b9d..4e96a10469 100644 --- a/dinky-web/src/pages/DevOps/JobDetail/JobOperator/JobOperator.tsx +++ b/dinky-web/src/pages/DevOps/JobDetail/JobOperator/JobOperator.tsx @@ -17,15 +17,15 @@ * */ -import { cancelTask } from '@/pages/DataStudio/HeaderContainer/service'; -import { JOB_LIFE_CYCLE } from '@/pages/DevOps/constants'; -import { isStatusDone } from '@/pages/DevOps/function'; -import { getData, postAll } from '@/services/api'; -import { API_CONSTANTS } from '@/services/endpoints'; -import { Jobs } from '@/types/DevOps/data'; -import { l } from '@/utils/intl'; -import { EllipsisOutlined, RedoOutlined } from '@ant-design/icons'; -import { Button, Dropdown, message, Modal, Space } from 'antd'; +import {cancelTask, restartTask, savePointTask} from '@/pages/DataStudio/HeaderContainer/service'; +import {JOB_LIFE_CYCLE} from '@/pages/DevOps/constants'; +import {isStatusDone} from '@/pages/DevOps/function'; +import {getData, postAll} from '@/services/api'; +import {API_CONSTANTS} from '@/services/endpoints'; +import {Jobs} from '@/types/DevOps/data'; +import {l} from '@/utils/intl'; +import {EllipsisOutlined, RedoOutlined} from '@ant-design/icons'; +import {Button, Dropdown, message, Modal, Space} from 'antd'; const operatorType = { RESTART_JOB: 'restart', @@ -40,52 +40,37 @@ export type OperatorType = { refesh: (isForce: boolean) => void; }; const JobOperator = (props: OperatorType) => { - const { jobDetail, refesh } = props; + const {jobDetail, refesh} = props; const webUri = `/api/flink/${jobDetail?.history?.jobManagerAddress}/#/job/running/${jobDetail?.instance?.jid}/overview`; const handleJobOperator = (key: string) => { Modal.confirm({ - title: l('devops.jobinfo.job.key', '', { key: key }), - content: l('devops.jobinfo.job.keyConfirm', '', { key: key }), + title: l('devops.jobinfo.job.key', '', {key: key}), + content: l('devops.jobinfo.job.keyConfirm', '', {key: key}), okText: l('button.confirm'), cancelText: l('button.cancel'), onOk: async () => { if (key == operatorType.CANCEL_JOB) { - postAll(API_CONSTANTS.CANCEL_JOB, { - clusterId: jobDetail?.clusterInstance?.id, - jobId: jobDetail?.instance?.jid - }); + cancelTask('', jobDetail?.instance?.taskId, false); } else if (key == operatorType.RESTART_JOB) { - getData(API_CONSTANTS.RESTART_TASK, { - id: jobDetail?.instance?.taskId, - isOnLine: jobDetail?.instance?.step == JOB_LIFE_CYCLE.PUBLISH - }); + restartTask('', jobDetail?.instance?.taskId, jobDetail?.instance?.step == JOB_LIFE_CYCLE.PUBLISH) } else if (key == operatorType.SAVEPOINT_CANCEL) { - getData(API_CONSTANTS.SAVEPOINT, { - taskId: jobDetail?.instance?.taskId, - savePointType: 'cancel' - }); + savePointTask('', jobDetail?.instance?.taskId, 'cancel') } else if (key == operatorType.SAVEPOINT_STOP) { - getData(API_CONSTANTS.SAVEPOINT, { - taskId: jobDetail?.instance?.taskId, - savePointType: 'stop' - }); + savePointTask('', jobDetail?.instance?.taskId, 'stop') } else if (key == operatorType.SAVEPOINT_TRIGGER) { - getData(API_CONSTANTS.SAVEPOINT, { - taskId: jobDetail?.instance?.taskId, - savePointType: 'trigger' - }); - } else { + savePointTask('', jobDetail?.instance?.taskId, 'trigger') + } else if (key == operatorType.AUTO_STOP) { cancelTask('', jobDetail?.instance?.taskId); } - message.success(l('devops.jobinfo.job.key.success', '', { key: key })); + message.success(l('devops.jobinfo.job.key.success', '', {key: key})); } }); }; return ( - diff --git a/dinky-web/src/services/endpoints.tsx b/dinky-web/src/services/endpoints.tsx index ee951f1d8b..838899fd38 100644 --- a/dinky-web/src/services/endpoints.tsx +++ b/dinky-web/src/services/endpoints.tsx @@ -239,7 +239,7 @@ export enum API_CONSTANTS { GET_TASKMANAGER_LIST = 'api/jobInstance/getTaskManagerList', GET_TASKMANAGER_LOG = 'api/jobInstance/getTaskManagerLog', GET_JOB_METRICS_ITEMS = 'api/jobInstance/getJobMetricsItems', - CANCEL_JOB = '/api/studio/cancel', + CANCEL_JOB = '/api/task/cancel', // /api/studio/getLineage STUDIO_GET_LINEAGE = '/api/studio/getLineage', // /api/jobInstance/getLineage