diff --git a/dinky-admin/src/main/java/org/dinky/controller/CatalogueController.java b/dinky-admin/src/main/java/org/dinky/controller/CatalogueController.java index c0170eb1e3..b3c25c8ab7 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/CatalogueController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/CatalogueController.java @@ -147,6 +147,9 @@ public Result> getCatalogueTree() { dataType = "CatalogueTaskDTO", dataTypeClass = CatalogueTaskDTO.class) public Result createTask(@RequestBody CatalogueTaskDTO catalogueTaskDTO) { + if (catalogueService.checkCatalogueTaskNameIsExist(catalogueTaskDTO.getName())) { + return Result.failed(Status.TASK_IS_EXIST); + } Catalogue catalogue = catalogueService.saveOrUpdateCatalogueAndTask(catalogueTaskDTO); if (catalogue.getId() != null) { return Result.succeed(catalogue, Status.SAVE_SUCCESS); diff --git a/dinky-admin/src/main/java/org/dinky/data/dto/CatalogueTaskDTO.java b/dinky-admin/src/main/java/org/dinky/data/dto/CatalogueTaskDTO.java index e7b80a5eac..d58a63270f 100644 --- a/dinky-admin/src/main/java/org/dinky/data/dto/CatalogueTaskDTO.java +++ b/dinky-admin/src/main/java/org/dinky/data/dto/CatalogueTaskDTO.java @@ -80,4 +80,7 @@ public class CatalogueTaskDTO { dataType = "TaskExtConfig", notes = "The task's extended configuration in JSON format") private TaskExtConfig configJson; + + @ApiModelProperty(value = "Task", dataType = "TaskDTO", notes = "The task information") + private TaskDTO task; } diff --git a/dinky-admin/src/main/java/org/dinky/service/CatalogueService.java b/dinky-admin/src/main/java/org/dinky/service/CatalogueService.java index 7145e3b912..d60c5aa17f 100644 --- a/dinky-admin/src/main/java/org/dinky/service/CatalogueService.java +++ b/dinky-admin/src/main/java/org/dinky/service/CatalogueService.java @@ -131,4 +131,11 @@ public interface CatalogueService extends ISuperService { * @return A boolean value indicating whether the operation was successful. */ Boolean saveOrUpdateOrRename(Catalogue catalogue); + + /** + * Check if the catalogue task name is exist + * @param name catalogue task name + * @return true if the catalogue task name is exist + */ + boolean checkCatalogueTaskNameIsExist(String name); } diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/CatalogueServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/CatalogueServiceImpl.java index d31bea6e99..03993efcc1 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/CatalogueServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/CatalogueServiceImpl.java @@ -22,6 +22,7 @@ import static org.dinky.assertion.Asserts.isNull; import org.dinky.assertion.Asserts; +import org.dinky.config.Dialect; import org.dinky.data.dto.CatalogueTaskDTO; import org.dinky.data.enums.JobLifeCycle; import org.dinky.data.enums.Status; @@ -31,6 +32,7 @@ import org.dinky.data.model.job.JobHistory; import org.dinky.data.model.job.JobInstance; import org.dinky.data.result.Result; +import org.dinky.gateway.enums.GatewayType; import org.dinky.mapper.CatalogueMapper; import org.dinky.mybatis.service.impl.SuperServiceImpl; import org.dinky.service.CatalogueService; @@ -58,6 +60,7 @@ import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.lang.Opt; import cn.hutool.core.util.ObjectUtil; import lombok.RequiredArgsConstructor; @@ -86,7 +89,8 @@ public List getCatalogueTree() { } /** - * build catalogue tree + * build catalogue tree + * * @param catalogueList catalogue list * @return catalogue tree */ @@ -114,6 +118,7 @@ public List buildCatalogueTree(List catalogueList) { /** * recursion build catalogue and children + * * @param list * @param catalogues */ @@ -140,6 +145,7 @@ private void recursionBuildCatalogueAndChildren(List list, Catalogue /** * Determine whether there are child nodes + * * @param list * @param catalogue * @return @@ -150,6 +156,7 @@ private boolean hasChild(List list, Catalogue catalogue) { /** * get child list + * * @param list * @param catalogue * @return @@ -171,13 +178,46 @@ public Catalogue findByParentIdAndName(Integer parentId, String name) { .eq(Catalogue::getName, name)); } + /** + * check catalogue task name is exist + * @param name name + * @return true if exist , otherwise false + */ + @Override + public boolean checkCatalogueTaskNameIsExist(String name) { + return getBaseMapper().exists(new LambdaQueryWrapper().eq(Catalogue::getName, name)); + } + + /** + * init some value + * @param catalogueTask {@link CatalogueTaskDTO} + * @return {@link Task} + */ + private Task initTaskValue(CatalogueTaskDTO catalogueTask) { + Task task = new Task(); + if (Opt.ofNullable(catalogueTask.getTask()).isPresent()) { + task = catalogueTask.getTask().buildTask(); + } else { + task.setStep(JobLifeCycle.DEVELOP.getValue()); + task.setEnabled(true); + if (Dialect.isFlinkSql(catalogueTask.getType(), false)) { + task.setType(GatewayType.LOCAL.getLongValue()); + task.setParallelism(1); + task.setSavePointStrategy(0); // 0 is disabled + task.setEnvId(-1); // -1 is disabled + task.setAlertGroupId(-1); // -1 is disabled + } + } + return task; + } + @Transactional(rollbackFor = Exception.class) @Override public Catalogue saveOrUpdateCatalogueAndTask(CatalogueTaskDTO catalogueTaskDTO) { Task task = null; Catalogue catalogue = null; if (catalogueTaskDTO.getId() == null) { - task = new Task(); + task = initTaskValue(catalogueTaskDTO); catalogue = new Catalogue(); } else { catalogue = baseMapper.selectById(catalogueTaskDTO.getId()); @@ -439,10 +479,11 @@ public Result deleteCatalogueById(Integer catalogueId) { /** *

- * 1. save catalogue - * 2. save task - * 3. save statement - * 4. rename + * 1. save catalogue + * 2. save task + * 3. save statement + * 4. rename + * * @param catalogue * @return */ diff --git a/dinky-common/src/main/java/org/dinky/config/Dialect.java b/dinky-common/src/main/java/org/dinky/config/Dialect.java index 1ce5c28afa..fa79c1f56f 100644 --- a/dinky-common/src/main/java/org/dinky/config/Dialect.java +++ b/dinky-common/src/main/java/org/dinky/config/Dialect.java @@ -109,6 +109,19 @@ public static boolean isUDF(String value) { } } + public static boolean isFlinkSql(String value, boolean includeFlinksqlEnv) { + Dialect dialect = Dialect.get(value); + switch (dialect) { + case FLINK_SQL: + case FLINK_JAR: + return true; + case FLINK_SQL_ENV: + return includeFlinksqlEnv; + default: + return false; + } + } + public static boolean isJarDialect(String value) { Dialect dialect = Dialect.get(value); switch (dialect) { diff --git a/dinky-common/src/main/java/org/dinky/data/enums/Status.java b/dinky-common/src/main/java/org/dinky/data/enums/Status.java index 1e77d7f0b7..54485a0d34 100644 --- a/dinky-common/src/main/java/org/dinky/data/enums/Status.java +++ b/dinky-common/src/main/java/org/dinky/data/enums/Status.java @@ -183,6 +183,7 @@ public enum Status { TASK_SQL_EXPLAN_FAILED(12007, "task.sql.explain.failed"), TASK_UPDATE_FAILED(12008, "task.update.failed"), TASK_IS_ONLINE(12009, "task.is.online"), + TASK_IS_EXIST(12010, "task.is.existed"), /** * alert instance diff --git a/dinky-common/src/main/resources/i18n/messages_en_US.properties b/dinky-common/src/main/resources/i18n/messages_en_US.properties index f8fcd47744..c95644d7f3 100644 --- a/dinky-common/src/main/resources/i18n/messages_en_US.properties +++ b/dinky-common/src/main/resources/i18n/messages_en_US.properties @@ -103,6 +103,7 @@ failed=Failed added.failed=Added Failed task.not.exist=Task Not Exist task.is.online= Task is online, modification is prohibited +task.is.existed=Task is existed cluster.instance.deploy=Deploy Success clear.failed=Clear Failed rename.success=Rename Successfully diff --git a/dinky-common/src/main/resources/i18n/messages_zh_CN.properties b/dinky-common/src/main/resources/i18n/messages_zh_CN.properties index ff7459310f..765c8d02f2 100644 --- a/dinky-common/src/main/resources/i18n/messages_zh_CN.properties +++ b/dinky-common/src/main/resources/i18n/messages_zh_CN.properties @@ -103,6 +103,7 @@ failed=获取失败 added.failed=新增失败 task.not.exist=任务不存在 task.is.online=任务已上线,禁止修改 +task.is.existed=作业已存在 cluster.instance.deploy=部署完成 clear.failed=清除失败 rename.success=重命名成功 @@ -180,7 +181,7 @@ sys.env.settings.dinkyAddr.note=该地址必须与Dinky Application后台url中 sys.env.settings.maxRetainDays=作业历史最大保留天数 sys.env.settings.maxRetainDays.note=提交的作业历史与自动注册的集群记录最大保留天数,过期会被自动删除 sys.env.settings.maxRetainCount=作业历史最大保留数量 -sys.env.settings.maxRetainCount.note=提交的作业历史与自动注册的集群记录最大保留数量,如果不足该数量,则不会被删除,即使已经过了做大保留天数 +sys.env.settings.maxRetainCount.note=提交的作业历史与自动注册的集群记录最大保留数量,如果不足该数量,则不会被删除,即使已经过了最大保留天数 sys.dolphinscheduler.settings.enable=是否启用 DolphinScheduler sys.dolphinscheduler.settings.enable.note=是否启用 DolphinScheduler ,启用后才能使用 DolphinScheduler 的相关功能,请先填写下列配置项,完成后再开启此项配置, 另:请确保 DolphinScheduler 的相关配置正确 diff --git a/dinky-web/src/components/Flink/OptionsSelect/index.tsx b/dinky-web/src/components/Flink/OptionsSelect/index.tsx new file mode 100644 index 0000000000..1dc648e2a5 --- /dev/null +++ b/dinky-web/src/components/Flink/OptionsSelect/index.tsx @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { l } from '@/utils/intl'; +import { ProFormSelect } from '@ant-design/pro-components'; +import { ProFormSelectProps } from '@ant-design/pro-form/es/components/Select'; +import { Divider, Typography } from 'antd'; + +const { Link } = Typography; + +export type FlinkOptionsProps = ProFormSelectProps & {}; + +const FlinkOptionsSelect = (props: FlinkOptionsProps) => { + const renderTemplateDropDown = (item: any) => { + return ( + <> + + {l('rc.cc.addConfig')} + + {item} + + ); + }; + + return ( + renderTemplateDropDown(item) }} + /> + ); +}; + +export default FlinkOptionsSelect; diff --git a/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx b/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx index 53a4edb1a5..00274ce183 100644 --- a/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx +++ b/dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx @@ -32,7 +32,7 @@ import { transformTableDataToCsv } from '@/utils/function'; import { l } from '@/utils/intl'; import { SearchOutlined } from '@ant-design/icons'; import { Highlight } from '@ant-design/pro-layout/es/components/Help/Search'; -import {Button, Empty, Input, InputRef, Space, Table, Tabs} from 'antd'; +import { Button, Empty, Input, InputRef, Space, Table, Tabs } from 'antd'; import { ColumnsType, ColumnType } from 'antd/es/table'; import { FilterConfirmProps } from 'antd/es/table/interface'; import { DataIndex } from 'rc-table/es/interface'; @@ -44,7 +44,7 @@ type Data = { columns?: string[]; rowData?: object[]; }; -type DataList=Data[]; +type DataList = Data[]; const Result = (props: any) => { const { tabs: { panes, activeKey } @@ -132,11 +132,9 @@ const Result = (props: any) => { const consoleData = currentTabs.console; if (consoleData.result && !isRefresh) { setData(consoleData.result); - } - else if(consoleData.results && !isRefresh){ - setDataList(consoleData.results) - } - else { + } else if (consoleData.results && !isRefresh) { + setDataList(consoleData.results); + } else { if (current.dialect && current.dialect.toLowerCase() == DIALECT.FLINK_SQL) { // flink sql // to do: get job data by history id list, not flink jid @@ -173,7 +171,7 @@ const Result = (props: any) => { setData({}); setDataList([]); loadData(); - }, [currentTabs?.console?.result,currentTabs?.console?.results]); + }, [currentTabs?.console?.result, currentTabs?.console?.results]); const getColumns = (columns: string[]) => { return columns?.map((item) => { @@ -243,28 +241,26 @@ const Result = (props: any) => { })} loading={loading} /> + ) : dataList.length > 0 ? ( + + {dataList.map((data, index) => { + return ( + + { + return { ...item, key: index }; + })} + loading={loading} + /> + + ); + })} + ) : ( - dataList.length>0?( - - - {dataList.map((data, index) => { - return ( - -
{ - return { ...item, key: index }; - })} - loading={loading} - /> - - ); - })} - ): - - )} - + + )} ); }; diff --git a/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx b/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx index 78efa65add..64e7f4b4ee 100644 --- a/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx +++ b/dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx @@ -223,8 +223,7 @@ const HeaderContainer = (props: connect) => { if (isSql(currentData.dialect)) { currentData.status = JOB_STATUS.FINISHED; if (currentTab) currentTab.console.results = res.data.results; - } - else { + } else { if (currentTab) currentTab.console.result = res.data.result; } // Common sql task is synchronized, so it needs to automatically update the status to finished. diff --git a/dinky-web/src/pages/DataStudio/LeftContainer/Project/JobModal/index.tsx b/dinky-web/src/pages/DataStudio/LeftContainer/Project/JobModal/index.tsx index eac45fa97e..7717e801ee 100644 --- a/dinky-web/src/pages/DataStudio/LeftContainer/Project/JobModal/index.tsx +++ b/dinky-web/src/pages/DataStudio/LeftContainer/Project/JobModal/index.tsx @@ -19,8 +19,9 @@ import { FormContextValue } from '@/components/Context/FormContext'; import { JOB_TYPE } from '@/pages/DataStudio/LeftContainer/Project/constants'; -import { isUDF } from '@/pages/DataStudio/LeftContainer/Project/function'; +import { isFlinkJob, isUDF } from '@/pages/DataStudio/LeftContainer/Project/function'; import { queryDataByParams } from '@/services/BusinessCrud'; +import { RUN_MODE } from '@/services/constants'; import { API_CONSTANTS } from '@/services/endpoints'; import { Catalogue } from '@/types/Studio/data'; import { l } from '@/utils/intl'; @@ -109,7 +110,21 @@ const JobModal: React.FC = (props) => { const { selectKeys } = formData.configJson.udfConfig; formData.configJson.udfConfig.templateId = selectKeys[selectKeys.length - 1]; } - onSubmit({ ...values, ...formData } as Catalogue); + // if this type is flink job, init task value and submit + if (isFlinkJob(formData.type ?? '')) { + const initTaskValue = { + savePointStrategy: -1, // -1 is disabled + parallelism: 1, // default parallelism + envId: -1, // -1 is disabled + step: 1, // default step is develop + alertGroupId: -1, // -1 is disabled + type: RUN_MODE.LOCAL, // default run mode is local + dialect: formData.type + }; + onSubmit({ ...values, ...formData, task: initTaskValue } as Catalogue); + } else { + onSubmit({ ...values, ...formData } as Catalogue); + } }; /** diff --git a/dinky-web/src/pages/DataStudio/RightContainer/JobConfig/index.tsx b/dinky-web/src/pages/DataStudio/RightContainer/JobConfig/index.tsx index 5bb3d109f8..7e475fa28d 100644 --- a/dinky-web/src/pages/DataStudio/RightContainer/JobConfig/index.tsx +++ b/dinky-web/src/pages/DataStudio/RightContainer/JobConfig/index.tsx @@ -17,6 +17,7 @@ * */ +import FlinkOptionsSelect from '@/components/Flink/OptionsSelect'; import { SAVE_POINT_TYPE } from '@/pages/DataStudio/constants'; import { getCurrentData, @@ -305,7 +306,7 @@ const JobConfig = (props: any) => { > - > -