From 3a7e1bf995298b51564d0150afd74d025179e384 Mon Sep 17 00:00:00 2001 From: zixi0825 Date: Thu, 31 Oct 2024 22:04:03 +0800 Subject: [PATCH] [Improve][Server] Improve job scheduler --- .../common/enums/ExecutionStatus.java | 4 +- .../coordinator/cache/JobExecuteManager.java | 4 ++ .../dqc/coordinator/runner/JobScheduler.java | 50 ++++++++++++------- .../server/repository/entity/Command.java | 3 ++ .../repository/mapper/CommandMapper.java | 8 +-- .../repository/service/CommandService.java | 8 ++- .../service/impl/CommandServiceImpl.java | 28 ++++++++++- .../service/impl/JobExecutionServiceImpl.java | 24 ++++++--- .../service/impl/JobExternalService.java | 8 ++- .../service/impl/JobServiceImpl.java | 6 ++- .../Main/HomeDetail/Jobs/JobsInstance.tsx | 2 +- scripts/sql/datavines-mysql.sql | 1 + 12 files changed, 105 insertions(+), 41 deletions(-) diff --git a/datavines-common/src/main/java/io/datavines/common/enums/ExecutionStatus.java b/datavines-common/src/main/java/io/datavines/common/enums/ExecutionStatus.java index 4519f1168..f5b0533e2 100644 --- a/datavines-common/src/main/java/io/datavines/common/enums/ExecutionStatus.java +++ b/datavines-common/src/main/java/io/datavines/common/enums/ExecutionStatus.java @@ -53,7 +53,7 @@ public enum ExecutionStatus { NEED_FAULT_TOLERANCE(8, "need fault tolerance","需要容错"), KILL(9, "kill", "强制终止"), WAITING_THREAD(10, "waiting thread", "等待线程"), - WAITING_DEPEND(11, "waiting depend node complete",""); + WAITING_SUMMIT(11, "waiting_summit","待提交"); ExecutionStatus(int code, String description,String zhDescription){ this.code = code; @@ -132,7 +132,7 @@ public boolean typeIsStop() { * @return status */ public boolean typeIsRunning() { - return this == RUNNING_EXECUTION || this == WAITING_DEPEND; + return this == RUNNING_EXECUTION; } /** diff --git a/datavines-server/src/main/java/io/datavines/server/dqc/coordinator/cache/JobExecuteManager.java b/datavines-server/src/main/java/io/datavines/server/dqc/coordinator/cache/JobExecuteManager.java index eb738a0c6..60503d00a 100644 --- a/datavines-server/src/main/java/io/datavines/server/dqc/coordinator/cache/JobExecuteManager.java +++ b/datavines-server/src/main/java/io/datavines/server/dqc/coordinator/cache/JobExecuteManager.java @@ -431,6 +431,10 @@ private void doKillCommand(Long jobExecutionId) { JobRunner jobRunner = jobExecutionContext.getJobRunner(); jobRunner.kill(); } else { + if (unFinishedJobExecutionMap.get(jobExecutionId) == null) { + return; + } + unFinishedJobExecutionMap.remove(jobExecutionId); JobExecution jobExecution = jobExternalService.getJobExecutionById(jobExecutionId); if (jobExecution != null) { diff --git a/datavines-server/src/main/java/io/datavines/server/dqc/coordinator/runner/JobScheduler.java b/datavines-server/src/main/java/io/datavines/server/dqc/coordinator/runner/JobScheduler.java index 377921611..031aee1df 100644 --- a/datavines-server/src/main/java/io/datavines/server/dqc/coordinator/runner/JobScheduler.java +++ b/datavines-server/src/main/java/io/datavines/server/dqc/coordinator/runner/JobScheduler.java @@ -16,6 +16,8 @@ */ package io.datavines.server.dqc.coordinator.runner; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; +import io.datavines.common.enums.ExecutionStatus; import io.datavines.common.utils.*; import io.datavines.server.dqc.coordinator.cache.JobExecuteManager; import io.datavines.server.registry.Register; @@ -28,6 +30,7 @@ import io.datavines.server.utils.SpringApplicationContext; import io.datavines.server.repository.entity.Command; +import java.util.List; import java.util.Map; import static io.datavines.common.CommonConstants.*; @@ -59,16 +62,31 @@ public void run() { while (Stopper.isRunning()) { Command command = null; try { + + String executeHost = NetUtils.getAddr(CommonPropertyUtils.getInt(CommonPropertyUtils.SERVER_PORT, CommonPropertyUtils.SERVER_PORT_DEFAULT)); + + // 获取执行地址在本机的Kill命令 + List commandList = jobExternalService.listKillCommandByExecuteHost(executeHost); + if (CollectionUtils.isNotEmpty(commandList)) { + commandList.forEach(killCommand -> { + if (killCommand != null) { + jobExecuteManager.addKillCommand(killCommand.getJobExecutionId()); + logger.info(String.format("kill job execution %s in %s", killCommand.getJobExecutionId(), executeHost)); + jobExternalService.deleteCommandById(killCommand.getId()); + } + }); + } + boolean runCheckFlag = OSUtils.checkResource( CommonPropertyUtils.getDouble(MAX_CPU_LOAD_AVG, MAX_CPU_LOAD_AVG_DEFAULT), CommonPropertyUtils.getDouble(RESERVED_MEMORY, RESERVED_MEMORY_DEFAULT)); if (!runCheckFlag) { - ThreadUtils.sleep(SLEEP_TIME_MILLIS*10); + ThreadUtils.sleep(SLEEP_TIME_MILLIS * 10); continue; } - command = jobExternalService.getCommand(register.getTotalSlot(), register.getSlot()); + command = jobExternalService.getStartCommand(register.getTotalSlot(), register.getSlot()); if (command != null) { String parameter = command.getParameter(); String engineType = LOCAL; @@ -79,24 +97,20 @@ public void run() { } } - if (CommandType.START == command.getType()) { - JobExecution jobExecution = jobExternalService.executeCommand(command); - if (jobExecution == null) { - logger.warn(String.format("job execution not found , command : %s", JSONUtils.toJsonString(command))); - jobExternalService.deleteCommandById(command.getId()); - continue; - } + JobExecution jobExecution = jobExternalService.executeCommand(command); + if (jobExecution == null) { + logger.warn(String.format("job execution not found , command : %s", JSONUtils.toJsonString(command))); + jobExternalService.deleteCommandById(command.getId()); + continue; + } - if (!executionOutOfThreshold(engineType)) { - logger.info("start submit job execution : {} ", JSONUtils.toJsonString(jobExecution)); - jobExecuteManager.addExecuteCommand(jobExecution); - logger.info(String.format("submit success, job execution : %s", jobExecution.getName()) ); - jobExternalService.deleteCommandById(command.getId()); - } - } else if (CommandType.STOP == command.getType()) { - jobExecuteManager.addKillCommand(command.getJobExecutionId()); - logger.info(String.format("kill job execution : %s", command.getJobExecutionId()) ); + if (!executionOutOfThreshold(engineType)) { + logger.info("start submit job execution : {} ", JSONUtils.toJsonString(jobExecution)); + jobExecuteManager.addExecuteCommand(jobExecution); + logger.info(String.format("submit success, job execution : %s", jobExecution.getName()) ); jobExternalService.deleteCommandById(command.getId()); + jobExecution.setStatus(ExecutionStatus.SUBMITTED_SUCCESS); + jobExternalService.updateJobExecution(jobExecution); } ThreadUtils.sleep(SLEEP_TIME_MILLIS); diff --git a/datavines-server/src/main/java/io/datavines/server/repository/entity/Command.java b/datavines-server/src/main/java/io/datavines/server/repository/entity/Command.java index 5d0849b2b..b6575fa7c 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/entity/Command.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/entity/Command.java @@ -48,6 +48,9 @@ public class Command implements Serializable { @TableField(value = "job_execution_id") private Long jobExecutionId; + @TableField(value = "execute_host") + private String executeHost; + @TableField(value = "priority") private Priority priority; diff --git a/datavines-server/src/main/java/io/datavines/server/repository/mapper/CommandMapper.java b/datavines-server/src/main/java/io/datavines/server/repository/mapper/CommandMapper.java index 1ce732f03..af3d567bd 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/mapper/CommandMapper.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/mapper/CommandMapper.java @@ -25,10 +25,6 @@ @Mapper public interface CommandMapper extends BaseMapper { - /** - * SELECT BY ID - * @return - */ - @Select("SELECT * from dv_command where type in (0,1) and id % #{totalSlot} = #{currentSlot} order by update_time limit 1 ") - Command getOne(@Param("totalSlot") int totalSlot, @Param("currentSlot") int currentSlot); + @Select("SELECT * from dv_command where type in (0,1) and id % #{totalSlot} = #{currentSlot} and type = 0 order by update_time limit 1 ") + Command getStartCommand(@Param("totalSlot") int totalSlot, @Param("currentSlot") int currentSlot); } diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/CommandService.java b/datavines-server/src/main/java/io/datavines/server/repository/service/CommandService.java index 68468550c..be291551a 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/CommandService.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/CommandService.java @@ -19,6 +19,8 @@ import com.baomidou.mybatisplus.extension.service.IService; import io.datavines.server.repository.entity.Command; +import java.util.List; + public interface CommandService extends IService { long insert(Command command); @@ -27,7 +29,11 @@ public interface CommandService extends IService { Command getById(long id); - Command getOne(int totalSlot, int currentSlot); + Command getStartCommand(int totalSlot, int currentSlot); int deleteById(long id); + + boolean deleteByJobExecutionId(Long jobExecutionId); + + List listKillCommandByExecuteHost(String executeHost); } diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CommandServiceImpl.java b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CommandServiceImpl.java index e68ebe7cd..d45ff8d09 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CommandServiceImpl.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/CommandServiceImpl.java @@ -16,6 +16,9 @@ */ package io.datavines.server.repository.service.impl; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; +import io.datavines.server.enums.CommandType; import io.datavines.server.repository.entity.Command; import io.datavines.server.repository.mapper.CommandMapper; import io.datavines.server.repository.service.CommandService; @@ -23,6 +26,9 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import java.util.ArrayList; +import java.util.List; + @Service("commandService") public class CommandServiceImpl extends ServiceImpl implements CommandService { @@ -43,12 +49,30 @@ public Command getById(long id) { } @Override - public Command getOne(int totalSlot, int currentSlot) { - return baseMapper.getOne(totalSlot, currentSlot); + public Command getStartCommand(int totalSlot, int currentSlot) { + return baseMapper.getStartCommand(totalSlot, currentSlot); } @Override public int deleteById(long id) { return baseMapper.deleteById(id); } + + @Override + public boolean deleteByJobExecutionId(Long jobExecutionId) { + return remove(new LambdaQueryWrapper().eq(Command::getJobExecutionId, jobExecutionId)); + } + + @Override + public List listKillCommandByExecuteHost(String executeHost) { + List commands = list(new LambdaQueryWrapper() + .eq(Command::getType, CommandType.STOP) + .eq(Command::getExecuteHost, executeHost).last("limit 20")); + + if (CollectionUtils.isEmpty(commands)) { + return new ArrayList<>(); + } + + return commands; + } } diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExecutionServiceImpl.java b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExecutionServiceImpl.java index 07568a376..e4f781c2b 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExecutionServiceImpl.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExecutionServiceImpl.java @@ -204,15 +204,23 @@ public Long killJob(Long jobExecutionId) { return jobExecutionId; } - Command command = new Command(); - Map parameter = new HashMap<>(); - parameter.put("engine", jobExecution.getEngineType()); + boolean deleteCommandResult = false; + if (ExecutionStatus.WAITING_SUMMIT == jobExecution.getStatus()) { + deleteCommandResult = commandService.deleteByJobExecutionId(jobExecutionId); + } - command.setType(CommandType.STOP); - command.setPriority(Priority.MEDIUM); - command.setParameter(JSONUtils.toJsonString(parameter)); - command.setJobExecutionId(jobExecutionId); - commandService.insert(command); + if (!deleteCommandResult) { + Command command = new Command(); + Map parameter = new HashMap<>(); + parameter.put("engine", jobExecution.getEngineType()); + + command.setType(CommandType.STOP); + command.setPriority(Priority.MEDIUM); + command.setParameter(JSONUtils.toJsonString(parameter)); + command.setJobExecutionId(jobExecutionId); + command.setExecuteHost(jobExecution.getExecuteHost()); + commandService.insert(command); + } return jobExecutionId; } diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExternalService.java b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExternalService.java index 4fc43b973..733a83838 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExternalService.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExternalService.java @@ -81,8 +81,12 @@ public JobExecution getJobExecutionById(Long id){ return jobExecutionService.getById(id); } - public Command getCommand(int totalSlot, int currentSlot){ - return commandService.getOne(totalSlot, currentSlot); + public Command getStartCommand(int totalSlot, int currentSlot) { + return commandService.getStartCommand(totalSlot, currentSlot); + } + + public List listKillCommandByExecuteHost(String executeHost) { + return commandService.listKillCommandByExecuteHost(executeHost); } public CommonTaskCommand getCatalogCommand(int totalSlot, int currentSlot){ diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobServiceImpl.java b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobServiceImpl.java index e099187b3..b7145750a 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobServiceImpl.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobServiceImpl.java @@ -529,11 +529,15 @@ private Long executeJob(Job job, LocalDateTime scheduleTime) { jobExecutionService.save(jobExecution); + Map parameter = new HashMap<>(); + parameter.put("engine", jobExecution.getEngineType()); + // add a command Command command = new Command(); command.setType(CommandType.START); command.setPriority(Priority.MEDIUM); command.setJobExecutionId(jobExecution.getId()); + command.setParameter(JSONUtils.toJsonString(parameter)); commandService.insert(command); return jobExecution.getId(); @@ -585,7 +589,7 @@ private JobExecution getJobExecution(Job job, LocalDateTime scheduleTime) { jobExecution.setErrorDataStorageType(errorDataStorageType); jobExecution.setErrorDataStorageParameter(errorDataStorageParameter); jobExecution.setErrorDataFileName(getErrorDataFileName(job.getParameter())); - jobExecution.setStatus(ExecutionStatus.SUBMITTED_SUCCESS); + jobExecution.setStatus(ExecutionStatus.WAITING_SUMMIT); jobExecution.setTenantCode(tenantStr); jobExecution.setEnv(envStr); jobExecution.setSubmitTime(LocalDateTime.now()); diff --git a/datavines-ui/src/view/Main/HomeDetail/Jobs/JobsInstance.tsx b/datavines-ui/src/view/Main/HomeDetail/Jobs/JobsInstance.tsx index b2775aa79..30a1edc17 100644 --- a/datavines-ui/src/view/Main/HomeDetail/Jobs/JobsInstance.tsx +++ b/datavines-ui/src/view/Main/HomeDetail/Jobs/JobsInstance.tsx @@ -160,7 +160,7 @@ const JobsInstance = () => { width: 300, render: (text: string, record: TJobsInstanceTableItem) => ( <> - + { onStop(record); }}>{intl.formatMessage({ id: 'jobs_task_stop_btn' })} { onLog(record); }}>{intl.formatMessage({ id: 'jobs_task_log_btn' })} diff --git a/scripts/sql/datavines-mysql.sql b/scripts/sql/datavines-mysql.sql index 7ffbe67e7..baeb8c59a 100644 --- a/scripts/sql/datavines-mysql.sql +++ b/scripts/sql/datavines-mysql.sql @@ -403,6 +403,7 @@ CREATE TABLE `dv_command` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `type` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'Command type: 0 start task, 1 stop task', `parameter` text COMMENT 'json command parameters', + `execute_host` varchar(255) COMMENT 'job execute host', `job_execution_id` bigint(20) NOT NULL COMMENT 'task id', `priority` int(11) DEFAULT NULL COMMENT 'process instance priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',