Skip to content

Commit

Permalink
Merge branch 'DataLinkDC:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 authored Dec 23, 2023
2 parents a36b6cd + 2d54ad7 commit 14773d7
Show file tree
Hide file tree
Showing 16 changed files with 173 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ public Result<List<Catalogue>> getCatalogueTree() {
dataType = "CatalogueTaskDTO",
dataTypeClass = CatalogueTaskDTO.class)
public Result<Catalogue> createTask(@RequestBody CatalogueTaskDTO catalogueTaskDTO) {
if (catalogueService.checkCatalogueTaskNameIsExist(catalogueTaskDTO.getName())) {
return Result.failed(Status.TASK_IS_EXIST);
}
Catalogue catalogue = catalogueService.saveOrUpdateCatalogueAndTask(catalogueTaskDTO);
if (catalogue.getId() != null) {
return Result.succeed(catalogue, Status.SAVE_SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,7 @@ public class CatalogueTaskDTO {
dataType = "TaskExtConfig",
notes = "The task's extended configuration in JSON format")
private TaskExtConfig configJson;

@ApiModelProperty(value = "Task", dataType = "TaskDTO", notes = "The task information")
private TaskDTO task;
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,11 @@ public interface CatalogueService extends ISuperService<Catalogue> {
* @return A boolean value indicating whether the operation was successful.
*/
Boolean saveOrUpdateOrRename(Catalogue catalogue);

/**
* Check if the catalogue task name is exist
* @param name catalogue task name
* @return true if the catalogue task name is exist
*/
boolean checkCatalogueTaskNameIsExist(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.dinky.assertion.Asserts.isNull;

import org.dinky.assertion.Asserts;
import org.dinky.config.Dialect;
import org.dinky.data.dto.CatalogueTaskDTO;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.enums.Status;
Expand All @@ -31,6 +32,7 @@
import org.dinky.data.model.job.JobHistory;
import org.dinky.data.model.job.JobInstance;
import org.dinky.data.result.Result;
import org.dinky.gateway.enums.GatewayType;
import org.dinky.mapper.CatalogueMapper;
import org.dinky.mybatis.service.impl.SuperServiceImpl;
import org.dinky.service.CatalogueService;
Expand Down Expand Up @@ -58,6 +60,7 @@

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.util.ObjectUtil;
import lombok.RequiredArgsConstructor;

Expand Down Expand Up @@ -86,7 +89,8 @@ public List<Catalogue> getCatalogueTree() {
}

/**
* build catalogue tree
* build catalogue tree
*
* @param catalogueList catalogue list
* @return catalogue tree
*/
Expand Down Expand Up @@ -114,6 +118,7 @@ public List<Catalogue> buildCatalogueTree(List<Catalogue> catalogueList) {

/**
* recursion build catalogue and children
*
* @param list
* @param catalogues
*/
Expand All @@ -140,6 +145,7 @@ private void recursionBuildCatalogueAndChildren(List<Catalogue> list, Catalogue

/**
* Determine whether there are child nodes
*
* @param list
* @param catalogue
* @return
Expand All @@ -150,6 +156,7 @@ private boolean hasChild(List<Catalogue> list, Catalogue catalogue) {

/**
* get child list
*
* @param list
* @param catalogue
* @return
Expand All @@ -171,13 +178,46 @@ public Catalogue findByParentIdAndName(Integer parentId, String name) {
.eq(Catalogue::getName, name));
}

/**
* check catalogue task name is exist
* @param name name
* @return true if exist , otherwise false
*/
@Override
public boolean checkCatalogueTaskNameIsExist(String name) {
return getBaseMapper().exists(new LambdaQueryWrapper<Catalogue>().eq(Catalogue::getName, name));
}

/**
* init some value
* @param catalogueTask {@link CatalogueTaskDTO}
* @return {@link Task}
*/
private Task initTaskValue(CatalogueTaskDTO catalogueTask) {
Task task = new Task();
if (Opt.ofNullable(catalogueTask.getTask()).isPresent()) {
task = catalogueTask.getTask().buildTask();
} else {
task.setStep(JobLifeCycle.DEVELOP.getValue());
task.setEnabled(true);
if (Dialect.isFlinkSql(catalogueTask.getType(), false)) {
task.setType(GatewayType.LOCAL.getLongValue());
task.setParallelism(1);
task.setSavePointStrategy(0); // 0 is disabled
task.setEnvId(-1); // -1 is disabled
task.setAlertGroupId(-1); // -1 is disabled
}
}
return task;
}

@Transactional(rollbackFor = Exception.class)
@Override
public Catalogue saveOrUpdateCatalogueAndTask(CatalogueTaskDTO catalogueTaskDTO) {
Task task = null;
Catalogue catalogue = null;
if (catalogueTaskDTO.getId() == null) {
task = new Task();
task = initTaskValue(catalogueTaskDTO);
catalogue = new Catalogue();
} else {
catalogue = baseMapper.selectById(catalogueTaskDTO.getId());
Expand Down Expand Up @@ -439,10 +479,11 @@ public Result<Void> deleteCatalogueById(Integer catalogueId) {

/**
* <p>
* 1. save catalogue
* 2. save task
* 3. save statement
* 4. rename
* 1. save catalogue
* 2. save task
* 3. save statement
* 4. rename
*
* @param catalogue
* @return
*/
Expand Down
13 changes: 13 additions & 0 deletions dinky-common/src/main/java/org/dinky/config/Dialect.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ public static boolean isUDF(String value) {
}
}

public static boolean isFlinkSql(String value, boolean includeFlinksqlEnv) {
Dialect dialect = Dialect.get(value);
switch (dialect) {
case FLINK_SQL:
case FLINK_JAR:
return true;
case FLINK_SQL_ENV:
return includeFlinksqlEnv;
default:
return false;
}
}

public static boolean isJarDialect(String value) {
Dialect dialect = Dialect.get(value);
switch (dialect) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public enum Status {
TASK_SQL_EXPLAN_FAILED(12007, "task.sql.explain.failed"),
TASK_UPDATE_FAILED(12008, "task.update.failed"),
TASK_IS_ONLINE(12009, "task.is.online"),
TASK_IS_EXIST(12010, "task.is.existed"),

/**
* alert instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ failed=Failed
added.failed=Added Failed
task.not.exist=Task Not Exist
task.is.online= Task is online, modification is prohibited
task.is.existed=Task is existed
cluster.instance.deploy=Deploy Success
clear.failed=Clear Failed
rename.success=Rename Successfully
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ failed=获取失败
added.failed=新增失败
task.not.exist=任务不存在
task.is.online=任务已上线,禁止修改
task.is.existed=作业已存在
cluster.instance.deploy=部署完成
clear.failed=清除失败
rename.success=重命名成功
Expand Down Expand Up @@ -180,7 +181,7 @@ sys.env.settings.dinkyAddr.note=该地址必须与Dinky Application后台url中
sys.env.settings.maxRetainDays=作业历史最大保留天数
sys.env.settings.maxRetainDays.note=提交的作业历史与自动注册的集群记录最大保留天数,过期会被自动删除
sys.env.settings.maxRetainCount=作业历史最大保留数量
sys.env.settings.maxRetainCount.note=提交的作业历史与自动注册的集群记录最大保留数量,如果不足该数量,则不会被删除,即使已经过了做大保留天数
sys.env.settings.maxRetainCount.note=提交的作业历史与自动注册的集群记录最大保留数量,如果不足该数量,则不会被删除,即使已经过了最大保留天数

sys.dolphinscheduler.settings.enable=是否启用 DolphinScheduler
sys.dolphinscheduler.settings.enable.note=是否启用 DolphinScheduler ,启用后才能使用 DolphinScheduler 的相关功能,请先填写下列配置项,完成后再开启此项配置, 另:请确保 DolphinScheduler 的相关配置正确
Expand Down
48 changes: 48 additions & 0 deletions dinky-web/src/components/Flink/OptionsSelect/index.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import { l } from '@/utils/intl';
import { ProFormSelect } from '@ant-design/pro-components';
import { ProFormSelectProps } from '@ant-design/pro-form/es/components/Select';
import { Divider, Typography } from 'antd';

const { Link } = Typography;

export type FlinkOptionsProps = ProFormSelectProps & {};

const FlinkOptionsSelect = (props: FlinkOptionsProps) => {
const renderTemplateDropDown = (item: any) => {
return (
<>
<Link href={'#/registration/document'}>+ {l('rc.cc.addConfig')}</Link>
<Divider style={{ margin: '8px 0' }} />
{item}
</>
);
};

return (
<ProFormSelect
{...props}
fieldProps={{ dropdownRender: (item) => renderTemplateDropDown(item) }}
/>
);
};

export default FlinkOptionsSelect;
54 changes: 25 additions & 29 deletions dinky-web/src/pages/DataStudio/BottomContainer/Result/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import { transformTableDataToCsv } from '@/utils/function';
import { l } from '@/utils/intl';
import { SearchOutlined } from '@ant-design/icons';
import { Highlight } from '@ant-design/pro-layout/es/components/Help/Search';
import {Button, Empty, Input, InputRef, Space, Table, Tabs} from 'antd';
import { Button, Empty, Input, InputRef, Space, Table, Tabs } from 'antd';
import { ColumnsType, ColumnType } from 'antd/es/table';
import { FilterConfirmProps } from 'antd/es/table/interface';
import { DataIndex } from 'rc-table/es/interface';
Expand All @@ -44,7 +44,7 @@ type Data = {
columns?: string[];
rowData?: object[];
};
type DataList=Data[];
type DataList = Data[];
const Result = (props: any) => {
const {
tabs: { panes, activeKey }
Expand Down Expand Up @@ -132,11 +132,9 @@ const Result = (props: any) => {
const consoleData = currentTabs.console;
if (consoleData.result && !isRefresh) {
setData(consoleData.result);
}
else if(consoleData.results && !isRefresh){
setDataList(consoleData.results)
}
else {
} else if (consoleData.results && !isRefresh) {
setDataList(consoleData.results);
} else {
if (current.dialect && current.dialect.toLowerCase() == DIALECT.FLINK_SQL) {
// flink sql
// to do: get job data by history id list, not flink jid
Expand Down Expand Up @@ -173,7 +171,7 @@ const Result = (props: any) => {
setData({});
setDataList([]);
loadData();
}, [currentTabs?.console?.result,currentTabs?.console?.results]);
}, [currentTabs?.console?.result, currentTabs?.console?.results]);

const getColumns = (columns: string[]) => {
return columns?.map((item) => {
Expand Down Expand Up @@ -243,28 +241,26 @@ const Result = (props: any) => {
})}
loading={loading}
/>
) : dataList.length > 0 ? (
<Tabs defaultActiveKey='0'>
{dataList.map((data, index) => {
return (
<Tabs.TabPane key={index} tab={`Table ${index + 1}`}>
<Table
columns={getColumns(data.columns)}
size='small'
dataSource={data.rowData?.map((item: any, index: number) => {
return { ...item, key: index };
})}
loading={loading}
/>
</Tabs.TabPane>
);
})}
</Tabs>
) : (
dataList.length>0?(

<Tabs defaultActiveKey="0">
{dataList.map((data, index) => {
return (
<Tabs.TabPane key={index} tab={`Table ${index + 1}`}>
<Table
columns={getColumns(data.columns)}
size='small'
dataSource={data.rowData?.map((item: any, index: number) => {
return { ...item, key: index };
})}
loading={loading}
/>
</Tabs.TabPane>
);
})}
</Tabs>):
<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
)}

<Empty image={Empty.PRESENTED_IMAGE_SIMPLE} />
)}
</div>
);
};
Expand Down
3 changes: 1 addition & 2 deletions dinky-web/src/pages/DataStudio/HeaderContainer/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,7 @@ const HeaderContainer = (props: connect) => {
if (isSql(currentData.dialect)) {
currentData.status = JOB_STATUS.FINISHED;
if (currentTab) currentTab.console.results = res.data.results;
}
else {
} else {
if (currentTab) currentTab.console.result = res.data.result;
}
// Common sql task is synchronized, so it needs to automatically update the status to finished.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

import { FormContextValue } from '@/components/Context/FormContext';
import { JOB_TYPE } from '@/pages/DataStudio/LeftContainer/Project/constants';
import { isUDF } from '@/pages/DataStudio/LeftContainer/Project/function';
import { isFlinkJob, isUDF } from '@/pages/DataStudio/LeftContainer/Project/function';
import { queryDataByParams } from '@/services/BusinessCrud';
import { RUN_MODE } from '@/services/constants';
import { API_CONSTANTS } from '@/services/endpoints';
import { Catalogue } from '@/types/Studio/data';
import { l } from '@/utils/intl';
Expand Down Expand Up @@ -109,7 +110,21 @@ const JobModal: React.FC<JobModalProps> = (props) => {
const { selectKeys } = formData.configJson.udfConfig;
formData.configJson.udfConfig.templateId = selectKeys[selectKeys.length - 1];
}
onSubmit({ ...values, ...formData } as Catalogue);
// if this type is flink job, init task value and submit
if (isFlinkJob(formData.type ?? '')) {
const initTaskValue = {
savePointStrategy: -1, // -1 is disabled
parallelism: 1, // default parallelism
envId: -1, // -1 is disabled
step: 1, // default step is develop
alertGroupId: -1, // -1 is disabled
type: RUN_MODE.LOCAL, // default run mode is local
dialect: formData.type
};
onSubmit({ ...values, ...formData, task: initTaskValue } as Catalogue);
} else {
onSubmit({ ...values, ...formData } as Catalogue);
}
};

/**
Expand Down
Loading

0 comments on commit 14773d7

Please sign in to comment.