From 7abe338636d1525bf9bc50770d10cd3bfd459c0f Mon Sep 17 00:00:00 2001 From: ZackYoung Date: Thu, 26 Sep 2024 14:19:09 +0800 Subject: [PATCH] [Optimize]Add repeat import task (#3836) Co-authored-by: zackyoungh --- .../catalogue/impl/CatalogueServiceImpl.java | 62 +++- .../resources/dinky-loader/FlinkConfClass | 3 +- dinky-app/dinky-app-base/pom.xml | 8 + .../org/dinky/app/flinksql/Submitter.java | 6 +- .../dinky/trans/dml/ExecuteJarOperation.java | 2 +- dinky-web/package.json | 10 +- .../src/components/Flink/FlinkDag/index.tsx | 7 +- .../Project/FolderModal/FolderForm/index.tsx | 3 + .../Project/FolderModal/index.tsx | 1 + .../LeftContainer/Project/JobModal/index.tsx | 4 + .../pages/DataStudio/RightContainer/index.tsx | 2 - .../JobList/components/Overview/index.tsx | 4 +- dinky-web/src/pages/DevOps/JobList/index.tsx | 269 +++++++++--------- dinky-web/src/pages/DevOps/index.tsx | 6 +- 14 files changed, 217 insertions(+), 170 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/service/catalogue/impl/CatalogueServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/catalogue/impl/CatalogueServiceImpl.java index 1fc37cac67..8b78802696 100644 --- a/dinky-admin/src/main/java/org/dinky/service/catalogue/impl/CatalogueServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/catalogue/impl/CatalogueServiceImpl.java @@ -64,6 +64,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -85,6 +86,7 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.lang.Opt; +import cn.hutool.core.map.BiMap; import cn.hutool.core.util.ObjectUtil; import cn.hutool.json.JSONUtil; import lombok.RequiredArgsConstructor; @@ -632,7 +634,7 @@ public void importCatalogue(ImportCatalogueDTO importCatalogueDto) { } Catalogue parentCatalogue = this.getById(parentCatalogueId); // check param - checkImportCatalogueParam(parentCatalogue, exportCatalogue); + BiMap existsNameMap = checkImportCatalogueParam(parentCatalogue, exportCatalogue); // create catalogue and task List createTasks = Lists.newArrayList(); @@ -655,6 +657,9 @@ public void importCatalogue(ImportCatalogueDTO importCatalogueDto) { ExportTaskBO exportTaskBO = searchCatalogue.getTask(); if (Objects.nonNull(exportTaskBO)) { Task task = catalogueFactory.getTask(exportTaskBO, currentUserId); + if (existsNameMap.containsKey(task.getName())) { + task.setName(existsNameMap.get(task.getName())); + } createTasks.add(task); exportCatalogueTaskMap.put(searchCatalogue, task); } @@ -670,6 +675,9 @@ public void importCatalogue(ImportCatalogueDTO importCatalogueDto) { throw new BusException(Status.FAILED); } Catalogue catalogue = catalogueFactory.getCatalogue(searchCatalogue, parentId, taskId); + if (existsNameMap.containsKey(catalogue.getName())) { + catalogue.setName(existsNameMap.get(catalogue.getName())); + } createCatalogues.add(catalogue); exportCatalogueMap.put(searchCatalogue, catalogue); List children = searchCatalogue.getChildren(); @@ -701,20 +709,16 @@ protected Integer getCurrentUserId() { return StpUtil.getLoginIdAsInt(); } - private void checkImportCatalogueParam(Catalogue parentCatalogue, ExportCatalogueBO exportCatalogue) { + private BiMap checkImportCatalogueParam( + Catalogue parentCatalogue, ExportCatalogueBO exportCatalogue) { // verify that the parent directory exists if (Objects.isNull(parentCatalogue)) { throw new BusException(Status.CATALOGUE_NOT_EXIST); } - // check if a catalogue with the same name exists List catalogueNames = getCatalogueNames(exportCatalogue); - List existCatalogues = - this.list(new LambdaQueryWrapper().in(Catalogue::getName, catalogueNames)); - if (CollectionUtil.isNotEmpty(existCatalogues)) { - throw new BusException( - Status.CATALOGUE_IS_EXIST, - existCatalogues.stream().map(Catalogue::getName).collect(Collectors.joining(","))); - } + // check if a catalogue with the same name exists + BiMap existsNameMap = new BiMap<>(new HashMap<>()); + getNotExistsCatalogueName(catalogueNames, existsNameMap); // verify that the task name and parent catalogue name are consistent List searchExportCatalogues = Lists.newArrayList(exportCatalogue); while (CollectionUtil.isNotEmpty(searchExportCatalogues)) { @@ -736,6 +740,44 @@ private void checkImportCatalogueParam(Catalogue parentCatalogue, ExportCatalogu } searchExportCatalogues = nextSearchExportCatalogues; } + return existsNameMap; + } + + /** + * Modify the name of the Catalogue. Duplicate additions to copy data and increments + * + * @param catalogueNames catalogue names + * @return List 不重复的目录名称 + */ + private void getNotExistsCatalogueName(List catalogueNames, BiMap existsNameMap) { + List existCatalogues = + this.list(new LambdaQueryWrapper().in(Catalogue::getName, catalogueNames)); + if (CollectionUtil.isEmpty(existCatalogues)) { + return; + } + List existCataloguesList = existCatalogues.stream() + .map(Catalogue::getName) + .map(name -> { + String key = name; + if (existsNameMap.containsValue(name)) { + key = existsNameMap.getKey(name); + } + // Configure the suffix - copy\(\d+\), match \d+1 if it exists, add the suffix - copy(1) if it does + // not exist. + String regex = ".*-copy\\((\\d+)\\)"; + if (name.matches(regex)) { + String[] split = name.split("\\("); + String num = split[1].split("\\)")[0]; + int i = Integer.parseInt(num) + 1; + existsNameMap.put(key, name.replace(num, String.valueOf(i))); + } else { + existsNameMap.put(key, name + "-copy(1)"); + } + return existsNameMap.get(key); + }) + .collect(Collectors.toList()); + + getNotExistsCatalogueName(existCataloguesList, existsNameMap); } private List getCatalogueNames(ExportCatalogueBO exportCatalogue) { diff --git a/dinky-admin/src/main/resources/dinky-loader/FlinkConfClass b/dinky-admin/src/main/resources/dinky-loader/FlinkConfClass index 505fcc8769..724eacae82 100644 --- a/dinky-admin/src/main/resources/dinky-loader/FlinkConfClass +++ b/dinky-admin/src/main/resources/dinky-loader/FlinkConfClass @@ -8,6 +8,7 @@ org.apache.flink.configuration.JobManagerOptions org.apache.flink.configuration.TaskManagerOptions org.apache.flink.configuration.HighAvailabilityOptions org.apache.flink.configuration.KubernetesConfigOptions +org.apache.flink.client.cli.ArtifactFetchOptions org.apache.flink.configuration.ClusterOptions org.apache.flink.configuration.StateBackendOptions org.apache.flink.configuration.QueryableStateOptions @@ -41,4 +42,4 @@ org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions org.apache.flink.runtime.jobgraph.SavepointConfigOptions org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions -org.dinky.constant.CustomerConfigureOptions \ No newline at end of file +org.dinky.constant.CustomerConfigureOptions diff --git a/dinky-app/dinky-app-base/pom.xml b/dinky-app/dinky-app-base/pom.xml index de389aec7f..405114bfdd 100644 --- a/dinky-app/dinky-app-base/pom.xml +++ b/dinky-app/dinky-app-base/pom.xml @@ -82,6 +82,14 @@ org.dinky dinky-gateway + + org.dinky + dinky-alert-sms + + + org.dinky + dinky-alert-dingtalk + diff --git a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java index 5a5c599c1e..24f4e7c351 100644 --- a/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java +++ b/dinky-app/dinky-app-base/src/main/java/org/dinky/app/flinksql/Submitter.java @@ -53,6 +53,7 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.execution.JobClient; import org.apache.flink.python.PythonOptions; @@ -254,9 +255,12 @@ public static Optional executeJarJob(String type, Executor executor, for (String statement : statements) { if (ExecuteJarParseStrategy.INSTANCE.match(statement)) { ExecuteJarOperation executeJarOperation = new ExecuteJarOperation(statement); - Pipeline pipeline = executeJarOperation.getStreamGraph(executor.getCustomTableEnvironment()); + ReadableConfig configuration = executor.getStreamExecutionEnvironment().getConfiguration(); + List jars = configuration.get(PipelineOptions.JARS); + List jarsUrl = jars.stream().map(URLUtil::getURL).collect(Collectors.toList()); + Pipeline pipeline = executeJarOperation.getStreamGraph(executor.getCustomTableEnvironment(), jarsUrl); if (pipeline instanceof StreamGraph) { // stream job StreamGraph streamGraph = (StreamGraph) pipeline; diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java index 2cce7d1792..cd158e0c81 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java @@ -92,7 +92,7 @@ public static Pipeline getStreamGraph( submitParam.getSavepointPath(), submitParam.getAllowNonRestoredState()); PackagedProgram program; try { - Configuration configuration = tEnv.getConfig().getConfiguration(); + Configuration configuration = tEnv.getRootConfiguration(); File file = Opt.ofBlankAble(submitParam.getUri()).map(URLUtils::toFile).orElse(null); String submitArgs = Opt.ofBlankAble(submitParam.getArgs()).orElse(""); diff --git a/dinky-web/package.json b/dinky-web/package.json index dfe6b0c2c6..a717a5c0d4 100644 --- a/dinky-web/package.json +++ b/dinky-web/package.json @@ -38,8 +38,8 @@ "@andrewray/react-multi-split-pane": "^0.3.5", "@ant-design/charts": "^2.2.1", "@ant-design/icons": "^5.3.7", - "@ant-design/pro-components": "^2.7.18", - "@ant-design/pro-layout": "^7.20.1", + "@ant-design/pro-components": "^2.7.19", + "@ant-design/pro-layout": "^7.20.2", "@ant-design/pro-table": "^3.17.2", "@ant-design/use-emotion-css": "^1.0.4", "@antv/g2": "^5.2.5", @@ -52,8 +52,8 @@ "@umijs/route-utils": "^4.0.1", "@xterm/addon-fit": "^0.10.0", "@xterm/xterm": "^5.5.0", - "antd": "^5.20.6", - "antd-style": "^3.6.2", + "antd": "^5.21.0", + "antd-style": "^3.6.3", "butterfly-dag": "^4.3.29", "classnames": "^2.5.1", "dayjs": "^1.11.11", @@ -70,7 +70,7 @@ "path-to-regexp": "^6.2.2", "rc-menu": "^9.14.0", "rc-util": "^5.43.0", - "re-resizable": "^6.9.17", + "re-resizable": "^6.10.0", "react": "^18.3.1", "react-countup": "^6.5.3", "react-dom": "^18.3.1", diff --git a/dinky-web/src/components/Flink/FlinkDag/index.tsx b/dinky-web/src/components/Flink/FlinkDag/index.tsx index ad421c8479..641d6b9af7 100644 --- a/dinky-web/src/components/Flink/FlinkDag/index.tsx +++ b/dinky-web/src/components/Flink/FlinkDag/index.tsx @@ -210,6 +210,7 @@ const FlinkDag = (props: DagProps) => { }; const handleClose = () => { + graph?.unselect(currentSelect); setOpen(false); setCurrentSelect(undefined); graph?.zoomToFit(zoomOptions); @@ -410,11 +411,9 @@ const FlinkDag = (props: DagProps) => { mask={false} onClose={handleClose} destroyOnClose={true} - closable={false} + closable={true} > - {onlyPlan ? ( - <> - ) : ( + {!onlyPlan && ( { name='name' label={l('datastudio.project.create.folder.name')} placeholder={l('datastudio.project.create.folder.name.placeholder')} + fieldProps={{ + autoFocus: true + }} rules={[ { required: true, diff --git a/dinky-web/src/pages/DataStudio/LeftContainer/Project/FolderModal/index.tsx b/dinky-web/src/pages/DataStudio/LeftContainer/Project/FolderModal/index.tsx index 674083fe47..1bcd25ac0a 100644 --- a/dinky-web/src/pages/DataStudio/LeftContainer/Project/FolderModal/index.tsx +++ b/dinky-web/src/pages/DataStudio/LeftContainer/Project/FolderModal/index.tsx @@ -72,6 +72,7 @@ const FolderModal: React.FC = (props) => { return ( <> + isKeyPressSubmit title={title} form={form} width={'30%'} diff --git a/dinky-web/src/pages/DataStudio/LeftContainer/Project/JobModal/index.tsx b/dinky-web/src/pages/DataStudio/LeftContainer/Project/JobModal/index.tsx index 9f5886c26e..42d2a41fa8 100644 --- a/dinky-web/src/pages/DataStudio/LeftContainer/Project/JobModal/index.tsx +++ b/dinky-web/src/pages/DataStudio/LeftContainer/Project/JobModal/index.tsx @@ -198,6 +198,9 @@ const JobModal: React.FC = (props) => { placeholder={l('catalog.name.placeholder')} validateTrigger={['onBlur', 'onChange', 'onSubmit']} rules={[{ required: true, validator: validateName }]} + fieldProps={{ + autoFocus: true + }} width={'xl'} /> = (props) => { return ( + isKeyPressSubmit title={title} form={form} width={'60%'} diff --git a/dinky-web/src/pages/DataStudio/RightContainer/index.tsx b/dinky-web/src/pages/DataStudio/RightContainer/index.tsx index 5d8aa24863..268977f2ff 100644 --- a/dinky-web/src/pages/DataStudio/RightContainer/index.tsx +++ b/dinky-web/src/pages/DataStudio/RightContainer/index.tsx @@ -47,8 +47,6 @@ const RightContainer: React.FC = (prop: any) => { const leftContainerWidth = leftContainer.selectKey === '' ? 0 : leftContainer.width; const maxWidth = size.width - 2 * VIEW.leftToolWidth - leftContainerWidth - 50; - - console.log(leftContainer); return ( { - const { statusFilter, setStatusFilter } = useContext(DevopContext); + const { statusFilter, setStatusFilter } = useContext(DevopsContext); const { data } = useHookRequest(getData, { defaultParams: [API_CONSTANTS.GET_STATUS_COUNT] }); const statusCount = data as StatusCountOverView; diff --git a/dinky-web/src/pages/DevOps/JobList/index.tsx b/dinky-web/src/pages/DevOps/JobList/index.tsx index 8ddfcce720..e843d9f341 100644 --- a/dinky-web/src/pages/DevOps/JobList/index.tsx +++ b/dinky-web/src/pages/DevOps/JobList/index.tsx @@ -32,7 +32,7 @@ import { searchInTree } from '@/pages/DataStudio/LeftContainer/Project/function'; import { StateType } from '@/pages/DataStudio/model'; -import { DevopContext } from '@/pages/DevOps'; +import { DevopsContext } from '@/pages/DevOps'; import { JOB_LIFE_CYCLE } from '@/pages/DevOps/constants'; import { getJobDuration } from '@/pages/DevOps/function'; import JobHistoryList from '@/pages/DevOps/JobList/components/JobHistoryList/JobHistoryList'; @@ -44,8 +44,6 @@ import { API_CONSTANTS } from '@/services/endpoints'; import { Jobs } from '@/types/DevOps/data'; import { getTenantByLocalStorage } from '@/utils/function'; import { l } from '@/utils/intl'; -import { SplitPane } from '@andrewray/react-multi-split-pane'; -import { Pane } from '@andrewray/react-multi-split-pane/dist/lib/Pane'; import { ArrowsAltOutlined, ClearOutlined, @@ -57,7 +55,7 @@ import { import type { ActionType, ProColumns } from '@ant-design/pro-components'; import { ProCard, ProTable } from '@ant-design/pro-components'; import { connect, useModel } from '@umijs/max'; -import { Button, Empty, Radio, Table, Tree } from 'antd'; +import { Button, Col, Empty, Flex, Radio, Splitter, Table, Tree } from 'antd'; import Search from 'antd/es/input/Search'; import { Key, useContext, useEffect, useRef, useState } from 'react'; import { history } from 'umi'; @@ -73,9 +71,8 @@ const JobList = (props: connect) => { queryTaskOwnerLockingStrategy, projectData } = props; - const refObject = useRef(null); const tableRef = useRef(); - const { statusFilter, setStatusFilter } = useContext(DevopContext); + const { statusFilter, setStatusFilter } = useContext(DevopsContext); const [stepFilter, setStepFilter] = useState(); const [taskFilter, setTaskFilter] = useState(); const [taskId, setTaskId] = useState(); @@ -241,144 +238,134 @@ const JobList = (props: connect) => { width: '99vw' }} > - - -
- 0 && ( - - ) - } - /> - - } - onClick={() => setExpandedKeys(getLeafKeyList(data))} + + + + + 0 && ( + + ) + } /> - - - } - onClick={() => setExpandedKeys([])} - /> - -
- - {data.length ? ( - onNodeClick(info)} - // onRightClick={onRightClick} - expandedKeys={expandedKeys} - // expandAction={'doubleClick'} - selectedKeys={selectedKey} - onExpand={(expandedKeys: Key[]) => setExpandedKeys(expandedKeys)} - treeData={data} - /> - ) : ( - - )} -
- - - {...PROTABLE_OPTIONS_PUBLIC} - search={false} - tableStyle={{ height: parent.innerHeight - 245 }} - loading={{ delay: 1000 }} - rowKey={(record) => record.id} - columns={jobListColumns} - params={{ - isHistory: false, - status: statusFilter, - step: stepFilter, - name: taskFilter, - taskId: taskId - }} - actionRef={tableRef} - toolbar={{ - settings: false, - search: { onSearch: (value: string) => setTaskFilter(value) }, - filter: ( - <> - setStepFilter(e.target.value)} - > - - {l('global.table.lifecycle.all')} - - - {l('global.table.lifecycle.publish')} - - - {l('global.table.lifecycle.dev')} - - - - ), - actions: [ -