Skip to content

Commit

Permalink
Merge branch 'DataLinkDC:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Zzm0809 authored Dec 27, 2023
2 parents 6ba0014 + cdec3e0 commit 75a870d
Show file tree
Hide file tree
Showing 51 changed files with 411 additions and 3,918 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,8 @@ Thanks to [JetBrains](https://www.jetbrains.com/?from=dlink) for providing a fre

Please refer to the [LICENSE](https://github.com/DataLinkDC/dinky/blob/dev/LICENSE) document.

# Contributors

<a href="https://github.com/DataLinkDC/dinky/graphs/contributors">
<img src="https://contrib.rocks/image?repo=DataLinkDC/dinky" />
</a>
8 changes: 7 additions & 1 deletion README_zh_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,10 @@ Dinky 是一个 `开箱即用` 、`易扩展` ,以 `Apache Flink` 为基础,

## 版权

请参考 [LICENSE](https://github.com/DataLinkDC/dinky/blob/dev/LICENSE) 文件。
请参考 [LICENSE](https://github.com/DataLinkDC/dinky/blob/dev/LICENSE) 文件。

# 贡献者

<a href="https://github.com/DataLinkDC/dinky/graphs/contributors">
<img src="https://contrib.rocks/image?repo=DataLinkDC/dinky" />
</a>
2 changes: 1 addition & 1 deletion dinky-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@
<dependency>
<groupId>io.prometheus.jmx</groupId>
<artifactId>jmx_prometheus_javaagent</artifactId>
<version>0.16.1</version>
<version>0.20.0</version>
<exclusions>
<exclusion>
<groupId>org.yaml</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,14 +511,21 @@ public boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle) thro
boolean saved = saveOrUpdate(task.buildTask());
if (saved && Asserts.isNotNull(task.getJobInstanceId())) {
JobInstance jobInstance = jobInstanceService.getById(task.getJobInstanceId());
jobInstance.setStep(lifeCycle.getValue());
jobInstanceService.updateById(jobInstance);
log.info("jobInstance [{}] step change to {}", jobInstance.getJid(), lifeCycle.getValue());
if (Asserts.isNotNull(jobInstance)) {
jobInstance.setStep(lifeCycle.getValue());
boolean updatedJobInstance = jobInstanceService.updateById(jobInstance);
if (updatedJobInstance) jobInstanceService.refreshJobInfoDetail(jobInstance.getId(), true);
log.warn(
"JobInstance [{}] step change to [{}] ,Trigger Force Refresh",
jobInstance.getName(),
lifeCycle.getValue());
}
}
return saved;
}

@Override
@Transactional(rollbackFor = Exception.class)
public boolean saveOrUpdateTask(Task task) {
Task byId = getById(task.getId());
if (byId != null && JobLifeCycle.PUBLISH.equalsValue(byId.getStep())) {
Expand Down
6 changes: 6 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.dinky.job.builder.JobUDFBuilder;
import org.dinky.parser.SqlType;
import org.dinky.trans.Operations;
import org.dinky.trans.parse.AddJarSqlParseStrategy;
import org.dinky.utils.DinkyClassLoaderUtil;
import org.dinky.utils.JsonUtils;
import org.dinky.utils.LogUtil;
Expand All @@ -76,10 +77,12 @@
import org.apache.flink.table.api.TableResult;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

import java.io.File;
import java.lang.ref.WeakReference;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import com.fasterxml.jackson.databind.node.ObjectNode;

Expand Down Expand Up @@ -341,6 +344,9 @@ public IResult executeDDL(String statement) {
SqlType operationType = Operations.getOperationType(newStatement);
if (SqlType.INSERT == operationType || SqlType.SELECT == operationType) {
continue;
} else if (operationType.equals(SqlType.ADD) || operationType.equals(SqlType.ADD_JAR)) {
Set<File> allFilePath = AddJarSqlParseStrategy.getAllFilePath(item);
getExecutor().getDinkyClassLoader().addURLs(allFilePath);
}
LocalDateTime startTime = LocalDateTime.now();
TableResult tableResult = executor.executeSql(newStatement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,19 @@ export const buildStepValue = (step: number) => {
return {
title: l('global.table.lifecycle.dev'),
status: 'processing',
color: '#1890ff'
color: 'cyan'
};
case 2:
return {
title: l('global.table.lifecycle.online'),
status: 'success',
color: '#52c41a'
color: 'purple'
};
default:
return {
title: l('global.table.lifecycle.dev'),
status: 'default',
color: '#1890ff'
color: 'cyan'
};
}
};
Expand Down Expand Up @@ -144,7 +144,8 @@ export const buildProjectTree = (
<>
<Badge
title={stepValue.title}
status={(stepValue.status as PresetStatusColorType) ?? 'default'}
color={stepValue.color}
// status={(stepValue.status as PresetStatusColorType) ?? 'default'}
/>
</>
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@

import StatusTag from '@/components/JobTags/StatusTag';
import { JobProps } from '@/pages/DevOps/JobDetail/data';
import { parseByteStr, parseMilliSecondStr, parseNumStr } from '@/utils/function';
import {
formatTimestampToYYYYMMDDHHMMSS,
parseByteStr,
parseMilliSecondStr,
parseNumStr
} from '@/utils/function';
import { l } from '@/utils/intl';
import { ProCard, ProColumns, ProTable } from '@ant-design/pro-components';
import { Typography } from 'antd';
Expand All @@ -32,8 +37,10 @@ export type VerticesTableListItem = {
metrics: any;
parallelism: number;
startTime?: number;
'start-time'?: number;
duration?: number;
endTime?: number;
'end-time'?: number;
tasks: any;
};

Expand Down Expand Up @@ -101,15 +108,20 @@ const FlinkTable = (props: JobProps): JSX.Element => {
},
{
title: l('global.table.startTime'),
dataIndex: 'startTime',
valueType: 'dateTime'
render: (dom, entity) => {
return entity.startTime === -1 || entity['start-time'] === -1
? '-'
: formatTimestampToYYYYMMDDHHMMSS(entity['start-time'] as number) ||
formatTimestampToYYYYMMDDHHMMSS(entity.startTime as number);
}
},
{
title: l('global.table.endTime'),
dataIndex: 'endTime',
valueType: 'dateTime',
render: (dom, entity) => {
return entity.endTime === -1 ? '-' : entity.endTime;
return entity.endTime === -1 || entity['end-time'] === -1
? '-'
: formatTimestampToYYYYMMDDHHMMSS(entity['end-time'] as number) ||
formatTimestampToYYYYMMDDHHMMSS(entity.endTime as number);
}
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ const JobDesc = (props: JobProps) => {

<Descriptions.Item label={l('devops.jobinfo.config.RestartStrategy')}>
<Tag color='blue' title={'Restart Strategy'}>
{jobDetail?.jobDataDto?.config?.executionConfig?.restartStrategy}
{jobDetail?.jobDataDto?.config['execution-config']['restart-strategy']}
</Tag>
</Descriptions.Item>

Expand All @@ -109,13 +109,11 @@ const JobDesc = (props: JobProps) => {
</Descriptions.Item>

<Descriptions.Item label={l('devops.jobinfo.config.useSqlFragment')}>
{jobDetail?.history?.configJson?.useSqlFragment
? l('button.enable')
: l('button.disable')}
{jobDetail?.history?.configJson?.fragment ? l('button.enable') : l('button.disable')}
</Descriptions.Item>

<Descriptions.Item label={l('devops.jobinfo.config.execmode')}>
{jobDetail?.history?.configJson?.useBatchModel
{jobDetail?.history?.configJson?.batchModel
? l('global.table.execmode.batch')
: l('global.table.execmode.streaming')}
</Descriptions.Item>
Expand All @@ -125,7 +123,7 @@ const JobDesc = (props: JobProps) => {
</Descriptions.Item>

<Descriptions.Item label={l('devops.jobinfo.config.JobParallelism')}>
{jobDetail?.jobDataDto?.config?.executionConfig?.jobParallelism}
{jobDetail?.jobDataDto?.config['execution-config']['job-parallelism']}
</Descriptions.Item>

<Descriptions.Item label={l('global.table.useTime')}>
Expand All @@ -137,7 +135,7 @@ const JobDesc = (props: JobProps) => {
</Descriptions.Item>

<Descriptions.Item label={l('devops.jobinfo.config.savePointPath')} span={2}>
{jobDetail?.history?.configJson.savePointPath}
{jobDetail?.history?.configJson?.configJson['state.savepoints.dir'] ?? '-'}
</Descriptions.Item>
</Descriptions>
</ProCard>
Expand Down
2 changes: 1 addition & 1 deletion dinky-web/src/pages/DevOps/JobDetail/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ const JobDetail = (props: any) => {
pollingInterval: 3000
});

const jobInfoDetail: Jobs.JobInfoDetail = data;
const jobInfoDetail = data as Jobs.JobInfoDetail;

const [tabKey, setTabKey] = useState<string>(OperatorEnum.JOB_BASE_INFO);

Expand Down
71 changes: 66 additions & 5 deletions dinky-web/src/types/DevOps/data.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

import { BaseBeanColumns } from '@/types/Public/data';
import { Alert } from '@/types/RegCenter/data.d';
import { Alert, Cluster } from '@/types/RegCenter/data.d';

/**
* about flink job
Expand Down Expand Up @@ -48,6 +48,62 @@ declare namespace Jobs {
useBatchModel: string;
};

export type ExecutorSetting = {
type: string;
host: string;
port: number;
useBatchModel: boolean;
checkpoint: string;
parallelism: number;
useSqlFragment: boolean;
useStatementSet: boolean;
savePointPath: string;
jobName: string;
config: Map<string, string>;
variables: Map<string, string>;
jarFiles: [];
jobManagerAddress: string;
plan: boolean;
remote: boolean;
validParallelism: boolean;
validJobName: boolean;
validHost: boolean;
validPort: boolean;
validConfig: boolean;
validVariables: boolean;
validJarFiles: boolean;
};

export type JobConfigJsonInfo = {
type: string;
checkpoint: string;
savePointStrategy: string;
savePointPath: string;
parallelism: number;
clusterId: number;
clusterConfigurationId: number;
step: number;
configJson: {
'state.savepoints.dir': string;
};
useResult: boolean;
useChangeLog: boolean;
useAutoCancel: boolean;
useRemote: boolean;
address: string;
taskId: number;
jarFiles: [];
pyFiles: [];
jobName: string;
fragment: boolean;
statementSet: boolean;
batchModel: boolean;
maxRowNum: number;
gatewayConfig: any;
variables: Map<string, string>;
executorSetting: ExecutorSetting;
};

export type History = {
id: number;
tenantId: number;
Expand All @@ -61,7 +117,7 @@ declare namespace Jobs {
type: string;
error: string;
result: string;
configJson: JobConfig;
configJson: JobConfigJsonInfo;
startTime: string;
endTime: string;
taskId: number;
Expand Down Expand Up @@ -166,13 +222,18 @@ declare namespace Jobs {
jid: string;
name: string;
executionConfig: ExecutionConfig;
'execution-config': ExecutionConfig;
};
export type ExecutionConfig = {
executionMode: string;
'execution-mode': string;
restartStrategy: string;
'restart-strategy': string;
jobParallelism: number;
objectReuse: boolean;
'job-parallelism': number;
'object-reuse': boolean;
userConfig: any;
'user-config': any;
};

export type JobDataDtoItem = {
Expand All @@ -191,8 +252,8 @@ declare namespace Jobs {
export type JobInfoDetail = {
id: number;
instance: JobInstance;
clusterInstance: any;
clusterConfiguration: any;
clusterInstance: Cluster.Instance;
clusterConfiguration: Cluster.Config;
history: History;
jobDataDto: JobDataDtoItem;
jobManagerConfiguration: any;
Expand Down
8 changes: 7 additions & 1 deletion dinky-web/src/utils/function.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import { l } from '@/utils/intl';
import { Monaco } from '@monaco-editor/react';
import dayjs from 'dayjs';
import cookies from 'js-cookie';
import { trim } from 'lodash';
import { editor, KeyCode, KeyMod } from 'monaco-editor';
import path from 'path';
import { format } from 'sql-formatter';
Expand Down Expand Up @@ -619,6 +618,13 @@ export const formatDateToYYYYMMDDHHMMSS = (date: Date) => {
return dayjs(date).format(DATETIME_FORMAT);
};

export const formatTimestampToYYYYMMDDHHMMSS = (timestamp: number) => {
if (timestamp == null) {
return '-';
}
return dayjs(timestamp).format(DATETIME_FORMAT);
};

export const parseDateStringToDate = (dateString: Date) => {
return dayjs(dateString).toDate();
};
Loading

0 comments on commit 75a870d

Please sign in to comment.