Skip to content

Commit

Permalink
[Optimize][Flink]optimize flink mission cancel (DataLinkDC#2389)
Browse files Browse the repository at this point in the history
* optimize-cancel

* optimize-cancel
  • Loading branch information
zackyoungh authored Oct 16, 2023
1 parent 28b7440 commit f8fb133
Show file tree
Hide file tree
Showing 12 changed files with 37 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.dinky.data.model.JobInstance;
import org.dinky.data.result.Result;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.job.JobResult;
import org.dinky.process.exception.ExcuteException;
Expand Down Expand Up @@ -89,8 +90,7 @@ public Result<Boolean> cancel(@RequestParam Integer id) {
@GetMapping(value = "/restartTask")
@ApiOperation("Restart Task")
// @Log(title = "Restart Task", businessType = BusinessType.REMOTE_OPERATION)
public Result<JobResult> restartTask(@RequestParam Integer id, @RequestParam String savePointPath)
throws ExcuteException {
public Result<JobResult> restartTask(@RequestParam Integer id, String savePointPath) throws ExcuteException {
return Result.succeed(taskService.restartTask(id, savePointPath));
}

Expand All @@ -99,7 +99,8 @@ public Result<JobResult> restartTask(@RequestParam Integer id, @RequestParam Str
@ApiOperation("Savepoint Trigger")
public Result<SavePointResult> savepoint(@RequestParam Integer taskId, @RequestParam String savePointType) {
return Result.succeed(
taskService.savepointTaskJob(taskService.getTaskInfoById(taskId), savePointType),
taskService.savepointTaskJob(
taskService.getTaskInfoById(taskId), SavePointType.valueOf(savePointType.toUpperCase())),
Status.EXECUTE_SUCCESS);
}

Expand Down
14 changes: 4 additions & 10 deletions dinky-admin/src/main/java/org/dinky/controller/TaskController.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.dinky.data.result.ProTableResult;
import org.dinky.data.result.Result;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.job.JobResult;
import org.dinky.process.exception.ExcuteException;
Expand Down Expand Up @@ -90,8 +91,7 @@ public Result<Boolean> cancel(@RequestParam Integer id) {
@GetMapping(value = "/restartTask")
@ApiOperation("Restart Task")
@Log(title = "Restart Task", businessType = BusinessType.REMOTE_OPERATION)
public Result<JobResult> restartTask(@RequestParam Integer id, @RequestParam String savePointPath)
throws ExcuteException {
public Result<JobResult> restartTask(@RequestParam Integer id, String savePointPath) throws ExcuteException {
return Result.succeed(taskService.restartTask(id, savePointPath));
}

Expand All @@ -100,7 +100,8 @@ public Result<JobResult> restartTask(@RequestParam Integer id, @RequestParam Str
@ApiOperation("Savepoint Trigger")
public Result<SavePointResult> savepoint(@RequestParam Integer taskId, @RequestParam String savePointType) {
return Result.succeed(
taskService.savepointTaskJob(taskService.getTaskInfoById(taskId), savePointType),
taskService.savepointTaskJob(
taskService.getTaskInfoById(taskId), SavePointType.valueOf(savePointType.toUpperCase())),
Status.EXECUTE_SUCCESS);
}

Expand All @@ -111,13 +112,6 @@ public Result<Boolean> onLineTask(@RequestParam Integer taskId) {
return Result.succeed(taskService.changeTaskLifeRecyle(taskId, JobLifeCycle.ONLINE));
}

@GetMapping("/offLineTask")
@Log(title = "offLineTask", businessType = BusinessType.TRIGGER)
@ApiOperation("offLineTask")
public Result<Boolean> offLineTask(@RequestParam Integer taskId) {
return Result.succeed(taskService.changeTaskLifeRecyle(taskId, JobLifeCycle.DEVELOP));
}

@PostMapping("/explainSql")
@ApiOperation("Explain Sql")
public Result<List<SqlExplainResult>> explainSql(@RequestBody TaskDTO taskDTO) throws NotSupportExplainExcepition {
Expand Down
3 changes: 2 additions & 1 deletion dinky-admin/src/main/java/org/dinky/service/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.dinky.data.model.Task;
import org.dinky.data.result.Result;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.job.JobResult;
import org.dinky.mybatis.service.ISuperService;
Expand Down Expand Up @@ -85,7 +86,7 @@ public interface TaskService extends ISuperService<Task> {
* @param savePointType The type of savepoint to create.
* @return A {@link SavePointResult} object representing the savepoint result.
*/
SavePointResult savepointTaskJob(TaskDTO task, String savePointType);
SavePointResult savepointTaskJob(TaskDTO task, SavePointType savePointType);

/**
* Explain the given task and return a list of SQL explain results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.dinky.data.model.ClusterConfiguration;
import org.dinky.data.model.DataBase;
import org.dinky.data.model.Jar;
import org.dinky.data.model.JobInfoDetail;
import org.dinky.data.model.JobInstance;
import org.dinky.data.model.JobModelOverview;
import org.dinky.data.model.JobTypeOverView;
Expand All @@ -59,6 +58,7 @@
import org.dinky.function.util.UDFUtil;
import org.dinky.gateway.enums.GatewayType;
import org.dinky.gateway.enums.SavePointStrategy;
import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.model.FlinkClusterConfig;
import org.dinky.gateway.model.JobInfo;
import org.dinky.gateway.result.SavePointResult;
Expand Down Expand Up @@ -312,13 +312,11 @@ public boolean cancelTaskJob(TaskDTO task) {
Assert.notNull(cluster, Status.CLUSTER_NOT_EXIST.getMessage());

JobManager jobManager = JobManager.build(buildJobConfig(task));
boolean cancelled = jobManager.cancel(jobInstance.getJid());
JobInfoDetail jobInfoDetail = jobInstanceService.refreshJobInfoDetail(jobInstance.getId());
return cancelled;
return jobManager.cancel(jobInstance.getJid());
}

@Override
public SavePointResult savepointTaskJob(TaskDTO task, String savePointType) {
public SavePointResult savepointTaskJob(TaskDTO task, SavePointType savePointType) {
JobInstance jobInstance = jobInstanceService.getById(task.getJobInstanceId());
Assert.notNull(jobInstance, Status.JOB_INSTANCE_NOT_EXIST.getMessage());

Expand All @@ -330,8 +328,8 @@ public SavePointResult savepointTaskJob(TaskDTO task, String savePointType) {
for (JobInfo item : savePointResult.getJobInfos()) {
if (Asserts.isEqualsIgnoreCase(jobId, item.getJobId()) && Asserts.isNotNull(jobInstance.getTaskId())) {
Savepoints savepoints = new Savepoints();
savepoints.setName(savePointType);
savepoints.setType(savePointType);
savepoints.setName(savePointType.getValue());
savepoints.setType(savePointType.getValue());
savepoints.setPath(item.getSavePoint());
savepoints.setTaskId(task.getId());
savepointsService.save(savepoints);
Expand Down
5 changes: 2 additions & 3 deletions dinky-core/src/main/java/org/dinky/api/FlinkAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,11 @@ public boolean stop(String jobId) {
}

@SuppressWarnings("checkstyle:Indentation")
public SavePointResult savepoints(String jobId, String savePointType, Map<String, String> taskConfig) {
SavePointType type = SavePointType.get(savePointType);
public SavePointResult savepoints(String jobId, SavePointType savePointType, Map<String, String> taskConfig) {
JobInfo jobInfo = new JobInfo(jobId);
Map<String, Object> paramMap = new HashMap<>(8);
String paramType = null;
switch (type) {
switch (savePointType) {
case CANCEL:
paramMap.put(CANCEL_JOB, true);
paramType = FlinkRestAPIConstant.SAVEPOINTS;
Expand Down
14 changes: 12 additions & 2 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.dinky.gateway.config.GatewayConfig;
import org.dinky.gateway.enums.ActionType;
import org.dinky.gateway.enums.GatewayType;
import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.gateway.result.SavePointResult;
import org.dinky.gateway.result.TestResult;
Expand Down Expand Up @@ -633,6 +634,14 @@ public boolean cancel(String jobId) {
.setFlinkConfig(FlinkConfig.build(jobId, ActionType.CANCEL.getValue(), null, null));
Gateway.build(config.getGatewayConfig()).savepointJob();
return true;
} else if (useRestAPI) {
try {
// Try to savepoint, if it fails, it will stop normally(尝试进行savepoint,如果失败,即普通停止)
savepoint(jobId, SavePointType.CANCEL, null);
return true;
} catch (Exception e) {
return FlinkAPI.build(config.getAddress()).stop(jobId);
}
} else {
try {
return FlinkAPI.build(config.getAddress()).stop(jobId);
Expand All @@ -643,10 +652,11 @@ public boolean cancel(String jobId) {
}
}

public SavePointResult savepoint(String jobId, String savePointType, String savePoint) {
public SavePointResult savepoint(String jobId, SavePointType savePointType, String savePoint) {
if (useGateway && !useRestAPI) {
config.getGatewayConfig()
.setFlinkConfig(FlinkConfig.build(jobId, ActionType.SAVEPOINT.getValue(), savePointType, null));
.setFlinkConfig(
FlinkConfig.build(jobId, ActionType.SAVEPOINT.getValue(), savePointType.getValue(), null));
return Gateway.build(config.getGatewayConfig()).savepointJob(savePoint);
} else {
return FlinkAPI.build(config.getAddress()).savepoints(jobId, savePointType, config.getConfigJson());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.dinky.core;

import org.dinky.api.FlinkAPI;
import org.dinky.gateway.enums.SavePointType;
import org.dinky.gateway.result.SavePointResult;

import java.util.List;
Expand Down Expand Up @@ -49,7 +50,7 @@ public void savepointTest() {
// JsonNode savepointInfo =
// FlinkAPI.build(address).getSavepointInfo("602ad9d03b872dba44267432d1a2a3b2","04044589477a973a32e7dd53e1eb20fd");
SavePointResult savepoints =
FlinkAPI.build(address).savepoints("243b97597448edbd2e635fc3d25b1064", "trigger", null);
FlinkAPI.build(address).savepoints("243b97597448edbd2e635fc3d25b1064", SavePointType.TRIGGER, null);
LOGGER.info(savepoints.toString());
}

Expand Down
2 changes: 1 addition & 1 deletion dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ const HeaderContainer = (props: any) => {
const handleChangeJobLife = async () => {
if (!currentData) return;
if (isOnline(currentData)) {
await offLinelTask(currentData.id);
await cancelTask("",currentData.id);
currentData.step = JOB_LIFE_CYCLE.DEVELOP;
} else {
const saved = await handleSave();
Expand Down
2 changes: 1 addition & 1 deletion dinky-web/src/pages/DataStudio/HeaderContainer/service.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export function onLineTask(id: number) {
}

export function offLinelTask(id: number) {
return handleGetOption('api/task/offLineTask', '', { taskId: id });
return handleGetOption('api/task/cancel', '', { taskId: id });
}

export const isSql = (dialect: string) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ const DiffModal: React.FC<DiffModalProps> = (props) => {
{l('pages.datastudio.sql.sqlChanged')}-{fileName}
</div>
}
onCancel={()=>onUse(false)}
open={open}
footer={null}
width={'60%'}
Expand All @@ -104,8 +105,8 @@ const DiffModal: React.FC<DiffModalProps> = (props) => {
<Tabs
tabBarExtraContent={
<Space>
<Link onClick={() => onUse(false)}>{l('pages.datastudio.sql.useCache')}</Link>
<Link onClick={() => onUse(true)}>{l('pages.datastudio.sql.useServer')}</Link>
<Link onClick={() => onUse(false)}>{l('pages.datastudio.sql.useCache')}</Link>
</Space>
}
items={[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { API_CONSTANTS } from '@/services/endpoints';
import { l } from '@/utils/intl';
import { EllipsisOutlined } from '@ant-design/icons';
import { Button, Dropdown, message, Modal, Space } from 'antd';
import {cancelTask} from "@/pages/DataStudio/HeaderContainer/service";

const operatorType = {
RESTART_JOB: 'restart',
Expand Down Expand Up @@ -36,7 +37,7 @@ const JobOperator = (props: JobProps) => {
isOnLine: jobDetail?.instance?.step == JOB_LIFE_CYCLE.ONLINE
});
} else {
getData(API_CONSTANTS.OFFLINE_TASK, { id: jobDetail?.instance?.taskId, type: key });
cancelTask("",jobDetail?.instance?.taskId)
}
message.success(l('devops.jobinfo.job.key.success', '', { key: key }));
}
Expand Down Expand Up @@ -90,10 +91,6 @@ const JobOperator = (props: JobProps) => {
{
key: operatorType.SAVEPOINT_CANCEL,
label: l('devops.jobinfo.savepoint.cancel')
},
{
key: operatorType.CANCEL_JOB,
label: l('devops.jobinfo.savepoint.canceljob')
}
]
}}
Expand Down
1 change: 0 additions & 1 deletion dinky-web/src/services/endpoints.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ export enum API_CONSTANTS {
STUDIO_GET_LINEAGE = '/api/studio/getLineage',
// /api/jobInstance/getLineage
JOB_INSTANCE_GET_LINEAGE = '/api/jobInstance/getLineage',
OFFLINE_TASK = '/api/task/offLineTask',
RESTART_TASK = '/api/task/restartTask',
RESTART_TASK_FROM_CHECKPOINT = '/api/task/selectSavePointRestartTask',
GET_SAVEPOINTS = '/api/savepoints',
Expand Down

0 comments on commit f8fb133

Please sign in to comment.