diff --git a/dinky-admin/src/main/java/org/dinky/controller/APIController.java b/dinky-admin/src/main/java/org/dinky/controller/APIController.java index 1d58b27cf5..a3c8cc0821 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/APIController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/APIController.java @@ -22,6 +22,7 @@ import org.dinky.DinkyVersion; import org.dinky.data.annotations.Log; import org.dinky.data.dto.APISavePointTaskDTO; +import org.dinky.data.dto.CreatingCatalogueTaskDTO; import org.dinky.data.dto.TaskDTO; import org.dinky.data.dto.TaskSubmitDto; import org.dinky.data.enums.BusinessType; @@ -33,11 +34,13 @@ import org.dinky.gateway.enums.SavePointType; import org.dinky.gateway.result.SavePointResult; import org.dinky.job.JobResult; +import org.dinky.service.APIService; import org.dinky.service.JobInstanceService; import org.dinky.service.TaskService; import java.util.List; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @@ -65,8 +68,12 @@ public class APIController { private final TaskService taskService; + private final JobInstanceService jobInstanceService; + @Autowired + private final APIService apiService; + @GetMapping("/version") @ApiOperation(value = "Query Service Version", notes = "Query Dinky Service Version Number") public Result getVersionInfo() { @@ -202,4 +209,19 @@ public Result getTaskLineage(@RequestParam Integer id) { taskService.initTenantByTaskId(id); return Result.succeed(taskService.getTaskLineage(id), Status.QUERY_SUCCESS); } + + @PostMapping("/createTaskAndSend2Ds") + @ApiOperation("Create Catalogues & Task and Send to DolphinScheduler") + @Log(title = "Create Catalogues & Task and Send to DolphinScheduler", businessType = BusinessType.OTHER) + public Result createTaskAndSend2Ds(@RequestBody CreatingCatalogueTaskDTO dto) { + return Result.succeed(apiService.createTaskAndSend2Ds(dto)); + } + + @PostMapping("/saveTask") + @ApiOperation("Save a task") + @Log(title = "Save a task", businessType = BusinessType.OTHER) + public Result saveTask(@RequestBody TaskDTO dto) { + apiService.saveTask(dto); + return Result.succeed(); + } } diff --git a/dinky-admin/src/main/java/org/dinky/data/dto/CreatingCatalogueTaskDTO.java b/dinky-admin/src/main/java/org/dinky/data/dto/CreatingCatalogueTaskDTO.java new file mode 100644 index 0000000000..0445583a9b --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/data/dto/CreatingCatalogueTaskDTO.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.data.dto; + +import org.dinky.scheduler.model.DinkyTaskRequest; + +import java.util.List; + +import lombok.Data; + +@Data +public class CreatingCatalogueTaskDTO { + /** + * 目录名称列表 + * 例子:["catalogue1", "catalogue2"] + * 数组第 0 个元素为根目录,依次下一个元素是上一个元素的子目录 + * 目录不存在会新建,已存在就保持原来的目录 + */ + private List catalogueNames; + /** + * 作业类型:FlinkSql、Mysql 等,详见 dinky 创建作业时的作业类型下拉菜单 + */ + private String type; + /** + * 任务信息 + * 例子:{"name": "test", "note": "作业描述", "statement": "sql 语句", "type": "kubernetes-session", "clusterId": 36} + * 例子只列出了部分属性,其他属性请参考 TaskDTO 类 + */ + private TaskDTO task; + /** + * Dinky 推送时的作业配置 + * 例子: {"delayTime": 0, "taskPriority": "MEDIUM", "failRetryInterval": 2, "failRetryTimes": 3, "flag": "YES" } + * 例子只列出了部分属性,其他属性请参考 DinkyTaskRequest 类 + */ + private DinkyTaskRequest jobConfig; +} diff --git a/dinky-admin/src/main/java/org/dinky/service/APIService.java b/dinky-admin/src/main/java/org/dinky/service/APIService.java index 0c362b6b7e..377755626f 100644 --- a/dinky-admin/src/main/java/org/dinky/service/APIService.java +++ b/dinky-admin/src/main/java/org/dinky/service/APIService.java @@ -19,9 +19,16 @@ package org.dinky.service; +import org.dinky.data.dto.CreatingCatalogueTaskDTO; +import org.dinky.data.dto.TaskDTO; + /** * APIService * * @since 2021/12/11 21:45 */ -public interface APIService {} +public interface APIService { + Integer createTaskAndSend2Ds(CreatingCatalogueTaskDTO dto); + + void saveTask(TaskDTO dto); +} diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/APIServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/APIServiceImpl.java index 99fdeec261..a11c6055c8 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/APIServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/APIServiceImpl.java @@ -19,8 +19,19 @@ package org.dinky.service.impl; +import org.dinky.data.dto.CatalogueTaskDTO; +import org.dinky.data.dto.CreatingCatalogueTaskDTO; +import org.dinky.data.dto.TaskDTO; +import org.dinky.data.enums.JobLifeCycle; +import org.dinky.data.model.Catalogue; +import org.dinky.data.model.Task; +import org.dinky.scheduler.model.DinkyTaskRequest; import org.dinky.service.APIService; +import org.dinky.service.SchedulerService; +import org.dinky.service.TaskService; +import org.dinky.service.catalogue.CatalogueService; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import lombok.RequiredArgsConstructor; @@ -34,4 +45,73 @@ @Service @RequiredArgsConstructor @Slf4j -public class APIServiceImpl implements APIService {} +public class APIServiceImpl implements APIService { + @Autowired + private TaskService taskService; + + @Autowired + private SchedulerService schedulerService; + + @Autowired + private CatalogueService catalogueService; + + /** + * 创建目录、任务并推送到 DolphinScheduler + * + * @param dto CreateCatalogueTaskDTO + * @return Integer taskId + */ + @Override + public Integer createTaskAndSend2Ds(CreatingCatalogueTaskDTO dto) { + int parentId = 0; + for (String catalogueName : dto.getCatalogueNames()) { + Catalogue catalogue = catalogueService.findByParentIdAndName(parentId, catalogueName); + // 目录不存在则创建 + if (catalogue == null) { + catalogue = new Catalogue(); + catalogue.setName(catalogueName); + catalogue.setIsLeaf(false); + catalogue.setParentId(parentId); + catalogueService.save(catalogue); + } + parentId = catalogue.getId(); + } + TaskDTO taskDTO = dto.getTask(); + CatalogueTaskDTO catalogueTaskDTO = new CatalogueTaskDTO(); + catalogueTaskDTO.setLeaf(false); + catalogueTaskDTO.setName(taskDTO.getName()); + catalogueTaskDTO.setNote(taskDTO.getNote()); + catalogueTaskDTO.setParentId(parentId); + catalogueTaskDTO.setType(dto.getType()); + catalogueTaskDTO.setTask(taskDTO); + // 保存任务 + Catalogue catalogue = catalogueService.saveOrUpdateCatalogueAndTask(catalogueTaskDTO); + + // 发布任务 + try { + taskService.changeTaskLifeRecyle(catalogue.getTaskId(), JobLifeCycle.PUBLISH); + } catch (Exception e) { + log.error(e.getMessage()); + throw new RuntimeException(e); + } + // 推送任务 + DinkyTaskRequest dinkyTaskRequest = dto.getJobConfig(); + dinkyTaskRequest.setTaskId(catalogue.getTaskId() + ""); + schedulerService.pushAddTask(dinkyTaskRequest); + + return catalogue.getTaskId(); + } + + /** + * 更新任务的名称和 sql + * @param dto + */ + @Override + public void saveTask(TaskDTO dto) { + Task task = new Task(); + task.setId(dto.getId()); + task.setName(dto.getName()); + task.setStatement(dto.getStatement()); + taskService.save(task); + } +} diff --git a/dinky-admin/src/test/java/org/dinky/service/impl/TestServiceImplTest.java b/dinky-admin/src/test/java/org/dinky/service/impl/TestServiceImplTest.java new file mode 100644 index 0000000000..99b33fb722 --- /dev/null +++ b/dinky-admin/src/test/java/org/dinky/service/impl/TestServiceImplTest.java @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.service.impl; + +import org.dinky.Dinky; +import org.dinky.data.dto.CatalogueTaskDTO; +import org.dinky.data.dto.CreatingCatalogueTaskDTO; +import org.dinky.data.dto.TaskDTO; +import org.dinky.data.enums.JobLifeCycle; +import org.dinky.data.exception.SqlExplainExcepition; +import org.dinky.data.model.Catalogue; +import org.dinky.scheduler.model.DinkyTaskRequest; +import org.dinky.service.SchedulerService; +import org.dinky.service.TaskService; +import org.dinky.service.catalogue.CatalogueService; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.jetbrains.annotations.NotNull; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +@RunWith(SpringJUnit4ClassRunner.class) +@SpringBootTest(classes = Dinky.class) +public class TestServiceImplTest { + @Autowired + private CatalogueService catalogueService; + + @Autowired + private TaskService taskService; + + @Autowired + SchedulerService schedulerService; + + @Test + @Ignore + public void testCreateCatalogueAndTask() throws SqlExplainExcepition { + CreatingCatalogueTaskDTO dto = new CreatingCatalogueTaskDTO(); + List catalogueNames = new ArrayList<>(); + catalogueNames.add("test"); + catalogueNames.add("test2"); + catalogueNames.add("test3"); + dto.setCatalogueNames(catalogueNames); + int parentId = 0; + for (String catalogueName : catalogueNames) { + Catalogue catalogue = catalogueService.findByParentIdAndName(parentId, catalogueName); + // 目录不存在则创建 + if (catalogue == null) { + catalogue = new Catalogue(); + catalogue.setName(catalogueName); + catalogue.setIsLeaf(false); + catalogue.setParentId(parentId); + catalogueService.save(catalogue); + } + parentId = catalogue.getId(); + } + CatalogueTaskDTO catalogueTaskDTO = getCatalogueTaskDTO(parentId); + // 新建任务 + Catalogue catalogue = catalogueService.saveOrUpdateCatalogueAndTask(catalogueTaskDTO); + + // 发布任务 + taskService.changeTaskLifeRecyle(catalogue.getTaskId(), JobLifeCycle.PUBLISH); + DinkyTaskRequest dinkyTaskRequest = new DinkyTaskRequest(); + dinkyTaskRequest.setTaskId(catalogue.getTaskId() + ""); + dinkyTaskRequest.setDelayTime(0); + dinkyTaskRequest.setFailRetryTimes(3); + dinkyTaskRequest.setFailRetryInterval(2); + dinkyTaskRequest.setFlag("YES"); + dinkyTaskRequest.setTaskPriority("MEDIUM"); + // 推送任务 + schedulerService.pushAddTask(dinkyTaskRequest); + } + + private static @NotNull CatalogueTaskDTO getCatalogueTaskDTO(int parentId) { + TaskDTO taskDTO = new TaskDTO(); + taskDTO.setSavePointStrategy(0); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd_hh_mm_ss"); + taskDTO.setName("Flink 测试任务" + sdf.format(new Date())); + taskDTO.setNote("备注信息"); + taskDTO.setStatement("select now()"); + taskDTO.setParallelism(5); + taskDTO.setEnvId(-1); + taskDTO.setStep(1); + taskDTO.setAlertGroupId(-1); + taskDTO.setType("kubernetes-session"); + taskDTO.setClusterId(36); + CatalogueTaskDTO catalogueTaskDTO = new CatalogueTaskDTO(); + catalogueTaskDTO.setLeaf(false); + catalogueTaskDTO.setName(taskDTO.getName()); + catalogueTaskDTO.setNote(taskDTO.getNote()); + catalogueTaskDTO.setParentId(parentId); + catalogueTaskDTO.setType("FlinkSql"); + catalogueTaskDTO.setTask(taskDTO); + + return catalogueTaskDTO; + } +} diff --git a/dinky-common/src/main/java/org/dinky/data/model/Schema.java b/dinky-common/src/main/java/org/dinky/data/model/Schema.java index 241460a666..bd092d7d31 100644 --- a/dinky-common/src/main/java/org/dinky/data/model/Schema.java +++ b/dinky-common/src/main/java/org/dinky/data/model/Schema.java @@ -20,7 +20,7 @@ package org.dinky.data.model; import java.io.Serializable; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import lombok.Getter; @@ -38,11 +38,11 @@ public class Schema implements Serializable, Comparable { private static final long serialVersionUID = 4278304357661271040L; private String name; - private List tables = new LinkedList<>(); - private List views = new LinkedList<>(); - private List functions = new LinkedList<>(); - private List userFunctions = new LinkedList<>(); - private List modules = new LinkedList<>(); + private List
tables = new ArrayList<>(); + private List views = new ArrayList<>(); + private List functions = new ArrayList<>(); + private List userFunctions = new ArrayList<>(); + private List modules = new ArrayList<>(); /** 需要保留一个空构造方法,否则序列化有问题 */ public Schema() {} diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/PushDolphin/function.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/PushDolphin/function.tsx index 91b3b4c215..f7114b2515 100644 --- a/dinky-web/src/pages/DataStudio/HeaderContainer/PushDolphin/function.tsx +++ b/dinky-web/src/pages/DataStudio/HeaderContainer/PushDolphin/function.tsx @@ -23,7 +23,8 @@ export const transformPushDolphinParams = ( dolphinTaskDefinition: DolphinTaskDefinition, pushDolphinParams: PushDolphinParams, toFormValues: boolean -) => { +) => { + if (toFormValues && dolphinTaskDefinition) { const transformValue: PushDolphinParams = { ...pushDolphinParams,