Skip to content

Commit

Permalink
Add a normal stop
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 committed Dec 23, 2023
1 parent 14773d7 commit 9e4e35e
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ public Result<JobResult> submitTask(@RequestBody TaskSubmitDto submitDto) throws
@GetMapping("/cancel")
// @Log(title = "Cancel Flink Job", businessType = BusinessType.TRIGGER)
@ApiOperation("Cancel Flink Job")
public Result<Boolean> cancel(@RequestParam Integer id) {
return Result.succeed(taskService.cancelTaskJob(taskService.getTaskInfoById(id)), Status.EXECUTE_SUCCESS);
public Result<Boolean> cancel(
@RequestParam Integer id, @RequestParam(defaultValue = "false") boolean withSavePoint) {
return Result.succeed(
taskService.cancelTaskJob(taskService.getTaskInfoById(id), withSavePoint), Status.EXECUTE_SUCCESS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ public Result<JobResult> debugTask(@RequestBody TaskDTO task) throws Exception {
@GetMapping("/cancel")
@Log(title = "Cancel Flink Job", businessType = BusinessType.TRIGGER)
@ApiOperation("Cancel Flink Job")
public Result<Void> cancel(@RequestParam Integer id) {
if (taskService.cancelTaskJob(taskService.getTaskInfoById(id))) {
public Result<Void> cancel(@RequestParam Integer id, @RequestParam(defaultValue = "false") boolean withSavePoint) {
if (taskService.cancelTaskJob(taskService.getTaskInfoById(id), withSavePoint)) {
return Result.succeed(Status.EXECUTE_SUCCESS);
} else {
return Result.failed(Status.EXECUTE_FAILED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public interface TaskService extends ISuperService<Task> {
* @param task The {@link TaskDTO} object representing the task to cancel.
* @return true if the task job is successfully cancelled, false otherwise.
*/
boolean cancelTaskJob(TaskDTO task);
boolean cancelTaskJob(TaskDTO task, boolean withSavePoint);

/**
* Get the stream graph of the given task job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,15 +367,15 @@ public JobResult restartTask(Integer id, String savePointPath) throws Exception
if (!Dialect.isCommonSql(task.getDialect()) && Asserts.isNotNull(task.getJobInstanceId())) {
String status = jobInstanceService.getById(task.getJobInstanceId()).getStatus();
if (!JobStatus.isDone(status)) {
cancelTaskJob(task);
cancelTaskJob(task, true);
}
}
return submitTask(
TaskSubmitDto.builder().id(id).savePointPath(savePointPath).build());
}

@Override
public boolean cancelTaskJob(TaskDTO task) {
public boolean cancelTaskJob(TaskDTO task, boolean withSavePoint) {
if (Dialect.isCommonSql(task.getDialect())) {
return true;
}
Expand All @@ -385,7 +385,7 @@ public boolean cancelTaskJob(TaskDTO task) {
Assert.notNull(clusterInstance, Status.CLUSTER_NOT_EXIST.getMessage());

JobManager jobManager = JobManager.build(buildJobConfig(task));
return jobManager.cancel(jobInstance.getJid());
return jobManager.cancel(jobInstance.getJid(), withSavePoint);
}

@Override
Expand Down
24 changes: 15 additions & 9 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.dinky.context.RowLevelPermissionsContext;
import org.dinky.data.annotations.ProcessStep;
import org.dinky.data.enums.ProcessStepType;
import org.dinky.data.exception.BusException;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.data.result.ErrorResult;
import org.dinky.data.result.ExplainResult;
Expand Down Expand Up @@ -378,27 +379,32 @@ public String getJobPlanJson(String statement) {
.getJsonPlan();
}

public boolean cancel(String jobId) {
public boolean cancel(String jobId, boolean withSavePoint) {
if (useGateway && !useRestAPI) {
config.getGatewayConfig()
.setFlinkConfig(FlinkConfig.build(jobId, ActionType.CANCEL.getValue(), null, null));
Gateway.build(config.getGatewayConfig()).savepointJob();
return true;
} else if (useRestAPI) {
} else if (useRestAPI && withSavePoint) {
try {
// Try to savepoint, if it fails, it will stop normally(尝试进行savepoint,如果失败,即普通停止)
savepoint(jobId, SavePointType.CANCEL, null);
return true;
} catch (Exception e) {
return FlinkAPI.build(config.getAddress()).stop(jobId);
log.warn("Stop with savcePoint failed: {}, will try normal rest api stop", e.getMessage());
return cancelNormal(jobId);
}
} else {
try {
return FlinkAPI.build(config.getAddress()).stop(jobId);
} catch (Exception e) {
log.error("停止作业时集群不存在: " + e);
}
return false;
return cancelNormal(jobId);
}
}

public boolean cancelNormal(String jobId) {
try {
return FlinkAPI.build(config.getAddress()).stop(jobId);
} catch (Exception e) {
log.error("stop flink job failed:", e);
throw new BusException(e.getMessage());
}
}

Expand Down
20 changes: 15 additions & 5 deletions dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
*
*/

import { handleGetOption, handleOption } from '@/services/BusinessCrud';
import {handleGetOption, handleOption} from '@/services/BusinessCrud';
import {API_CONSTANTS} from "@/services/endpoints";

export async function explainSql(title: string, params: any) {
return handleOption('/api/task/explainSql', title, params);
Expand All @@ -32,13 +33,22 @@ export async function debugTask(title: string, params: any) {
}

export async function executeSql(title: string, id: number) {
return handleGetOption('/api/task/submitTask', title, { id });
return handleGetOption('/api/task/submitTask', title, {id});
}

export function cancelTask(title: string, id: number) {
return handleGetOption('api/task/cancel', title, { id });
export function cancelTask(title: string, id: number, withSavePoint: boolean = true) {
return handleGetOption(API_CONSTANTS.CANCEL_JOB, title, {id, withSavePoint});
}

export function restartTask(title: string, id: number, isOnLine:boolean) {
return handleGetOption(API_CONSTANTS.RESTART_TASK, title, {id, isOnLine});
}
export function savePointTask(title: string, taskId: number, savePointType:string) {
return handleGetOption(API_CONSTANTS.SAVEPOINT, title, {taskId, savePointType});
}



export function changeTaskLife(title = '', id: number, life: number) {
return handleGetOption('api/task/changeTaskLife', title, { taskId: id, lifeCycle: life });
return handleGetOption('api/task/changeTaskLife', title, {taskId: id, lifeCycle: life});
}
63 changes: 26 additions & 37 deletions dinky-web/src/pages/DevOps/JobDetail/JobOperator/JobOperator.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
*
*/

import { cancelTask } from '@/pages/DataStudio/HeaderContainer/service';
import { JOB_LIFE_CYCLE } from '@/pages/DevOps/constants';
import { isStatusDone } from '@/pages/DevOps/function';
import { getData, postAll } from '@/services/api';
import { API_CONSTANTS } from '@/services/endpoints';
import { Jobs } from '@/types/DevOps/data';
import { l } from '@/utils/intl';
import { EllipsisOutlined, RedoOutlined } from '@ant-design/icons';
import { Button, Dropdown, message, Modal, Space } from 'antd';
import {cancelTask, restartTask, savePointTask} from '@/pages/DataStudio/HeaderContainer/service';
import {JOB_LIFE_CYCLE} from '@/pages/DevOps/constants';
import {isStatusDone} from '@/pages/DevOps/function';
import {getData, postAll} from '@/services/api';
import {API_CONSTANTS} from '@/services/endpoints';
import {Jobs} from '@/types/DevOps/data';
import {l} from '@/utils/intl';
import {EllipsisOutlined, RedoOutlined} from '@ant-design/icons';
import {Button, Dropdown, message, Modal, Space} from 'antd';

const operatorType = {
RESTART_JOB: 'restart',
Expand All @@ -40,52 +40,37 @@ export type OperatorType = {
refesh: (isForce: boolean) => void;
};
const JobOperator = (props: OperatorType) => {
const { jobDetail, refesh } = props;
const {jobDetail, refesh} = props;
const webUri = `/api/flink/${jobDetail?.history?.jobManagerAddress}/#/job/running/${jobDetail?.instance?.jid}/overview`;

const handleJobOperator = (key: string) => {
Modal.confirm({
title: l('devops.jobinfo.job.key', '', { key: key }),
content: l('devops.jobinfo.job.keyConfirm', '', { key: key }),
title: l('devops.jobinfo.job.key', '', {key: key}),
content: l('devops.jobinfo.job.keyConfirm', '', {key: key}),
okText: l('button.confirm'),
cancelText: l('button.cancel'),
onOk: async () => {
if (key == operatorType.CANCEL_JOB) {
postAll(API_CONSTANTS.CANCEL_JOB, {
clusterId: jobDetail?.clusterInstance?.id,
jobId: jobDetail?.instance?.jid
});
cancelTask('', jobDetail?.instance?.taskId, false);
} else if (key == operatorType.RESTART_JOB) {
getData(API_CONSTANTS.RESTART_TASK, {
id: jobDetail?.instance?.taskId,
isOnLine: jobDetail?.instance?.step == JOB_LIFE_CYCLE.PUBLISH
});
restartTask('', jobDetail?.instance?.taskId, jobDetail?.instance?.step == JOB_LIFE_CYCLE.PUBLISH)
} else if (key == operatorType.SAVEPOINT_CANCEL) {
getData(API_CONSTANTS.SAVEPOINT, {
taskId: jobDetail?.instance?.taskId,
savePointType: 'cancel'
});
savePointTask('', jobDetail?.instance?.taskId, 'cancel')
} else if (key == operatorType.SAVEPOINT_STOP) {
getData(API_CONSTANTS.SAVEPOINT, {
taskId: jobDetail?.instance?.taskId,
savePointType: 'stop'
});
savePointTask('', jobDetail?.instance?.taskId, 'stop')
} else if (key == operatorType.SAVEPOINT_TRIGGER) {
getData(API_CONSTANTS.SAVEPOINT, {
taskId: jobDetail?.instance?.taskId,
savePointType: 'trigger'
});
} else {
savePointTask('', jobDetail?.instance?.taskId, 'trigger')
} else if (key == operatorType.AUTO_STOP) {
cancelTask('', jobDetail?.instance?.taskId);
}
message.success(l('devops.jobinfo.job.key.success', '', { key: key }));
message.success(l('devops.jobinfo.job.key.success', '', {key: key}));
}
});
};

return (
<Space>
<Button icon={<RedoOutlined />} onClick={() => refesh(true)} />
<Button icon={<RedoOutlined/>} onClick={() => refesh(true)}/>

<Button key='flinkwebui' href={webUri} target={'_blank'}>
FlinkWebUI
Expand Down Expand Up @@ -131,12 +116,16 @@ const JobOperator = (props: OperatorType) => {
{
key: operatorType.SAVEPOINT_CANCEL,
label: l('devops.jobinfo.savepoint.cancel')
},
{
key: operatorType.CANCEL_JOB,
label: l('devops.jobinfo.savepoint.canceljob')
}
]
}}
>
<Button key='4' style={{ padding: '0 8px' }}>
<EllipsisOutlined />
<Button key='4' style={{padding: '0 8px'}}>
<EllipsisOutlined/>
</Button>
</Dropdown>
</>
Expand Down
2 changes: 1 addition & 1 deletion dinky-web/src/services/endpoints.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ export enum API_CONSTANTS {
GET_TASKMANAGER_LIST = 'api/jobInstance/getTaskManagerList',
GET_TASKMANAGER_LOG = 'api/jobInstance/getTaskManagerLog',
GET_JOB_METRICS_ITEMS = 'api/jobInstance/getJobMetricsItems',
CANCEL_JOB = '/api/studio/cancel',
CANCEL_JOB = '/api/task/cancel',
// /api/studio/getLineage
STUDIO_GET_LINEAGE = '/api/studio/getLineage',
// /api/jobInstance/getLineage
Expand Down

0 comments on commit 9e4e35e

Please sign in to comment.