Skip to content

Commit

Permalink
[Optimize]Add repeat import task (DataLinkDC#3836)
Browse files Browse the repository at this point in the history
Co-authored-by: zackyoungh <[email protected]>
  • Loading branch information
zackyoungh and zackyoungh authored Sep 26, 2024
1 parent cf7f9b2 commit 7abe338
Show file tree
Hide file tree
Showing 14 changed files with 217 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -85,6 +86,7 @@
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.map.BiMap;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -632,7 +634,7 @@ public void importCatalogue(ImportCatalogueDTO importCatalogueDto) {
}
Catalogue parentCatalogue = this.getById(parentCatalogueId);
// check param
checkImportCatalogueParam(parentCatalogue, exportCatalogue);
BiMap<String, String> existsNameMap = checkImportCatalogueParam(parentCatalogue, exportCatalogue);

// create catalogue and task
List<Task> createTasks = Lists.newArrayList();
Expand All @@ -655,6 +657,9 @@ public void importCatalogue(ImportCatalogueDTO importCatalogueDto) {
ExportTaskBO exportTaskBO = searchCatalogue.getTask();
if (Objects.nonNull(exportTaskBO)) {
Task task = catalogueFactory.getTask(exportTaskBO, currentUserId);
if (existsNameMap.containsKey(task.getName())) {
task.setName(existsNameMap.get(task.getName()));
}
createTasks.add(task);
exportCatalogueTaskMap.put(searchCatalogue, task);
}
Expand All @@ -670,6 +675,9 @@ public void importCatalogue(ImportCatalogueDTO importCatalogueDto) {
throw new BusException(Status.FAILED);
}
Catalogue catalogue = catalogueFactory.getCatalogue(searchCatalogue, parentId, taskId);
if (existsNameMap.containsKey(catalogue.getName())) {
catalogue.setName(existsNameMap.get(catalogue.getName()));
}
createCatalogues.add(catalogue);
exportCatalogueMap.put(searchCatalogue, catalogue);
List<ExportCatalogueBO> children = searchCatalogue.getChildren();
Expand Down Expand Up @@ -701,20 +709,16 @@ protected Integer getCurrentUserId() {
return StpUtil.getLoginIdAsInt();
}

private void checkImportCatalogueParam(Catalogue parentCatalogue, ExportCatalogueBO exportCatalogue) {
private BiMap<String, String> checkImportCatalogueParam(
Catalogue parentCatalogue, ExportCatalogueBO exportCatalogue) {
// verify that the parent directory exists
if (Objects.isNull(parentCatalogue)) {
throw new BusException(Status.CATALOGUE_NOT_EXIST);
}
// check if a catalogue with the same name exists
List<String> catalogueNames = getCatalogueNames(exportCatalogue);
List<Catalogue> existCatalogues =
this.list(new LambdaQueryWrapper<Catalogue>().in(Catalogue::getName, catalogueNames));
if (CollectionUtil.isNotEmpty(existCatalogues)) {
throw new BusException(
Status.CATALOGUE_IS_EXIST,
existCatalogues.stream().map(Catalogue::getName).collect(Collectors.joining(",")));
}
// check if a catalogue with the same name exists
BiMap<String, String> existsNameMap = new BiMap<>(new HashMap<>());
getNotExistsCatalogueName(catalogueNames, existsNameMap);
// verify that the task name and parent catalogue name are consistent
List<ExportCatalogueBO> searchExportCatalogues = Lists.newArrayList(exportCatalogue);
while (CollectionUtil.isNotEmpty(searchExportCatalogues)) {
Expand All @@ -736,6 +740,44 @@ private void checkImportCatalogueParam(Catalogue parentCatalogue, ExportCatalogu
}
searchExportCatalogues = nextSearchExportCatalogues;
}
return existsNameMap;
}

/**
* Modify the name of the Catalogue. Duplicate additions to copy data and increments
*
* @param catalogueNames catalogue names
* @return List<String> 不重复的目录名称
*/
private void getNotExistsCatalogueName(List<String> catalogueNames, BiMap<String, String> existsNameMap) {
List<Catalogue> existCatalogues =
this.list(new LambdaQueryWrapper<Catalogue>().in(Catalogue::getName, catalogueNames));
if (CollectionUtil.isEmpty(existCatalogues)) {
return;
}
List<String> existCataloguesList = existCatalogues.stream()
.map(Catalogue::getName)
.map(name -> {
String key = name;
if (existsNameMap.containsValue(name)) {
key = existsNameMap.getKey(name);
}
// Configure the suffix - copy\(\d+\), match \d+1 if it exists, add the suffix - copy(1) if it does
// not exist.
String regex = ".*-copy\\((\\d+)\\)";
if (name.matches(regex)) {
String[] split = name.split("\\(");
String num = split[1].split("\\)")[0];
int i = Integer.parseInt(num) + 1;
existsNameMap.put(key, name.replace(num, String.valueOf(i)));
} else {
existsNameMap.put(key, name + "-copy(1)");
}
return existsNameMap.get(key);
})
.collect(Collectors.toList());

getNotExistsCatalogueName(existCataloguesList, existsNameMap);
}

private List<String> getCatalogueNames(ExportCatalogueBO exportCatalogue) {
Expand Down
3 changes: 2 additions & 1 deletion dinky-admin/src/main/resources/dinky-loader/FlinkConfClass
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ org.apache.flink.configuration.JobManagerOptions
org.apache.flink.configuration.TaskManagerOptions
org.apache.flink.configuration.HighAvailabilityOptions
org.apache.flink.configuration.KubernetesConfigOptions
org.apache.flink.client.cli.ArtifactFetchOptions
org.apache.flink.configuration.ClusterOptions
org.apache.flink.configuration.StateBackendOptions
org.apache.flink.configuration.QueryableStateOptions
Expand Down Expand Up @@ -41,4 +42,4 @@ org.apache.flink.table.endpoint.hive.HiveServer2EndpointConfigOptions
org.apache.flink.runtime.jobgraph.SavepointConfigOptions
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions
org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions
org.dinky.constant.CustomerConfigureOptions
org.dinky.constant.CustomerConfigureOptions
8 changes: 8 additions & 0 deletions dinky-app/dinky-app-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@
<groupId>org.dinky</groupId>
<artifactId>dinky-gateway</artifactId>
</exclusion>
<exclusion>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-sms</artifactId>
</exclusion>
<exclusion>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-dingtalk</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.python.PythonOptions;
Expand Down Expand Up @@ -254,9 +255,12 @@ public static Optional<JobClient> executeJarJob(String type, Executor executor,
for (String statement : statements) {
if (ExecuteJarParseStrategy.INSTANCE.match(statement)) {
ExecuteJarOperation executeJarOperation = new ExecuteJarOperation(statement);
Pipeline pipeline = executeJarOperation.getStreamGraph(executor.getCustomTableEnvironment());

ReadableConfig configuration =
executor.getStreamExecutionEnvironment().getConfiguration();
List<String> jars = configuration.get(PipelineOptions.JARS);
List<URL> jarsUrl = jars.stream().map(URLUtil::getURL).collect(Collectors.toList());
Pipeline pipeline = executeJarOperation.getStreamGraph(executor.getCustomTableEnvironment(), jarsUrl);
if (pipeline instanceof StreamGraph) {
// stream job
StreamGraph streamGraph = (StreamGraph) pipeline;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public static Pipeline getStreamGraph(
submitParam.getSavepointPath(), submitParam.getAllowNonRestoredState());
PackagedProgram program;
try {
Configuration configuration = tEnv.getConfig().getConfiguration();
Configuration configuration = tEnv.getRootConfiguration();
File file =
Opt.ofBlankAble(submitParam.getUri()).map(URLUtils::toFile).orElse(null);
String submitArgs = Opt.ofBlankAble(submitParam.getArgs()).orElse("");
Expand Down
10 changes: 5 additions & 5 deletions dinky-web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
"@andrewray/react-multi-split-pane": "^0.3.5",
"@ant-design/charts": "^2.2.1",
"@ant-design/icons": "^5.3.7",
"@ant-design/pro-components": "^2.7.18",
"@ant-design/pro-layout": "^7.20.1",
"@ant-design/pro-components": "^2.7.19",
"@ant-design/pro-layout": "^7.20.2",
"@ant-design/pro-table": "^3.17.2",
"@ant-design/use-emotion-css": "^1.0.4",
"@antv/g2": "^5.2.5",
Expand All @@ -52,8 +52,8 @@
"@umijs/route-utils": "^4.0.1",
"@xterm/addon-fit": "^0.10.0",
"@xterm/xterm": "^5.5.0",
"antd": "^5.20.6",
"antd-style": "^3.6.2",
"antd": "^5.21.0",
"antd-style": "^3.6.3",
"butterfly-dag": "^4.3.29",
"classnames": "^2.5.1",
"dayjs": "^1.11.11",
Expand All @@ -70,7 +70,7 @@
"path-to-regexp": "^6.2.2",
"rc-menu": "^9.14.0",
"rc-util": "^5.43.0",
"re-resizable": "^6.9.17",
"re-resizable": "^6.10.0",
"react": "^18.3.1",
"react-countup": "^6.5.3",
"react-dom": "^18.3.1",
Expand Down
7 changes: 3 additions & 4 deletions dinky-web/src/components/Flink/FlinkDag/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ const FlinkDag = (props: DagProps) => {
};

const handleClose = () => {
graph?.unselect(currentSelect);
setOpen(false);
setCurrentSelect(undefined);
graph?.zoomToFit(zoomOptions);
Expand Down Expand Up @@ -410,11 +411,9 @@ const FlinkDag = (props: DagProps) => {
mask={false}
onClose={handleClose}
destroyOnClose={true}
closable={false}
closable={true}
>
{onlyPlan ? (
<></>
) : (
{!onlyPlan && (
<Tabs
defaultActiveKey='1'
items={[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const FolderForm = () => {
name='name'
label={l('datastudio.project.create.folder.name')}
placeholder={l('datastudio.project.create.folder.name.placeholder')}
fieldProps={{
autoFocus: true
}}
rules={[
{
required: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const FolderModal: React.FC<JobModalProps> = (props) => {
return (
<>
<ModalForm<Catalogue>
isKeyPressSubmit
title={title}
form={form}
width={'30%'}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ const JobModal: React.FC<JobModalProps> = (props) => {
placeholder={l('catalog.name.placeholder')}
validateTrigger={['onBlur', 'onChange', 'onSubmit']}
rules={[{ required: true, validator: validateName }]}
fieldProps={{
autoFocus: true
}}
width={'xl'}
/>
<ProFormSelect
Expand Down Expand Up @@ -294,6 +297,7 @@ const JobModal: React.FC<JobModalProps> = (props) => {

return (
<ModalForm<Catalogue>
isKeyPressSubmit
title={title}
form={form}
width={'60%'}
Expand Down
2 changes: 0 additions & 2 deletions dinky-web/src/pages/DataStudio/RightContainer/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ const RightContainer: React.FC<RightContainerProps> = (prop: any) => {

const leftContainerWidth = leftContainer.selectKey === '' ? 0 : leftContainer.width;
const maxWidth = size.width - 2 * VIEW.leftToolWidth - leftContainerWidth - 50;

console.log(leftContainer);
return (
<MovableSidebar
contentHeight={toolContentHeight}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
UnknownIcons
} from '@/components/Icons/DevopsIcons';
import useHookRequest from '@/hooks/useHookRequest';
import { DevopContext } from '@/pages/DevOps';
import { DevopsContext } from '@/pages/DevOps';
import { JOB_STATUS } from '@/pages/DevOps/constants';
import StatisticsCard from '@/pages/DevOps/JobList/components/Overview/StatisticsCard';
import { getData } from '@/services/api';
Expand All @@ -41,7 +41,7 @@ import { Button, Col, Row, Space } from 'antd';
import { useContext } from 'react';

const JobOverview = (props: any) => {
const { statusFilter, setStatusFilter } = useContext<any>(DevopContext);
const { statusFilter, setStatusFilter } = useContext<any>(DevopsContext);
const { data } = useHookRequest(getData, { defaultParams: [API_CONSTANTS.GET_STATUS_COUNT] });
const statusCount = data as StatusCountOverView;

Expand Down
Loading

0 comments on commit 7abe338

Please sign in to comment.