Skip to content

Commit

Permalink
Merge branch 'dev' into op_project_tree
Browse files Browse the repository at this point in the history
  • Loading branch information
Zzm0809 authored Oct 10, 2023
2 parents 158d927 + 16835aa commit 27d2d26
Show file tree
Hide file tree
Showing 43 changed files with 890 additions and 71 deletions.
6 changes: 3 additions & 3 deletions dinky-admin/src/main/java/org/dinky/data/dto/JobDataDto.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

package org.dinky.data.dto;

import org.dinky.data.flink.config.FlinkJobConfigInfo;
import org.dinky.data.flink.exceptions.FlinkJobExceptionsDetail;
import org.dinky.data.flink.job.FlinkJobDetailInfo;
import org.dinky.data.model.JobHistory;
import org.dinky.data.model.flink.config.FlinkJobConfigInfo;
import org.dinky.data.model.flink.exceptions.FlinkJobExceptionsDetail;
import org.dinky.data.model.flink.job.FlinkJobDetailInfo;
import org.dinky.utils.JsonUtils;

import java.time.LocalDateTime;
Expand Down
4 changes: 2 additions & 2 deletions dinky-admin/src/main/java/org/dinky/init/EnvInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public void run(ApplicationArguments args) throws Exception {
+ "Application 'Dinky' is running! Access URLs:\n\t"
+ "Local: \t\thttp://localhost:{}\n\t"
+ "External: \thttp://{}:{}\n\t"
+ "Doc: \thttp://{}:{}/doc.html\n"
+ "Druid Monitor: \thttp://{}:{}/druid/index.html\n"
+ "Doc: \thttp://{}:{}/doc.html\n\t"
+ "Druid Monitor: \thttp://{}:{}/druid/index.html\n\t"
+ "Actuator: \thttp://{}:{}/actuator\n"
+ "----------------------------------------------------------",
port,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import org.dinky.data.dto.ClusterConfigurationDTO;
import org.dinky.data.dto.JobDataDto;
import org.dinky.data.enums.JobStatus;
import org.dinky.data.flink.backpressure.FlinkJobNodeBackPressure;
import org.dinky.data.flink.config.FlinkJobConfigInfo;
import org.dinky.data.flink.exceptions.FlinkJobExceptionsDetail;
import org.dinky.data.flink.job.FlinkJobDetailInfo;
import org.dinky.data.flink.watermark.FlinkJobNodeWaterMark;
import org.dinky.data.model.JobInfoDetail;
import org.dinky.data.model.JobInstance;
import org.dinky.data.model.flink.backpressure.FlinkJobNodeBackPressure;
import org.dinky.data.model.flink.config.FlinkJobConfigInfo;
import org.dinky.data.model.flink.exceptions.FlinkJobExceptionsDetail;
import org.dinky.data.model.flink.job.FlinkJobDetailInfo;
import org.dinky.data.model.flink.watermark.FlinkJobNodeWaterMark;
import org.dinky.gateway.Gateway;
import org.dinky.gateway.config.GatewayConfig;
import org.dinky.gateway.enums.GatewayType;
Expand Down
2 changes: 1 addition & 1 deletion dinky-admin/src/main/resources/db/db-h2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,7 @@ CREATE TABLE `dinky_task` (
`alert_group_id` bigint(20) null DEFAULT null COMMENT 'alert group id',
`config_json` text null COMMENT 'configuration json',
`note` varchar(255) null DEFAULT null COMMENT 'Job Note',
`step` int(11) null DEFAULT null COMMENT 'Job lifecycle',
`step` int(11) null DEFAULT 1 COMMENT 'Job lifecycle',
`job_instance_id` bigint(20) null DEFAULT null COMMENT 'job instance id',
`enabled` tinyint(1) NOT null DEFAULT 1 COMMENT 'is enable',
`create_time` datetime(0) null DEFAULT null COMMENT 'create time',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

package org.dinky.alert.Rules;

import org.dinky.data.flink.exceptions.FlinkJobExceptionsDetail;

import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand All @@ -43,20 +44,20 @@ public ExceptionRule() {
* @param exceptions The exceptions object containing relevant data.
* @return True if the operation should be executed, false otherwise.
*/
public Boolean isException(Integer key, ObjectNode exceptions) {
public Boolean isException(Integer key, FlinkJobExceptionsDetail exceptions) {

// If the exception is the same as the previous one, it will not be reported again
if (exceptions.get("timestamp") == null) {
if (exceptions.getTimestamp() == null) {
return false;
}
long timestamp = exceptions.get("timestamp").asLong(0);
long timestamp = exceptions.getTimestamp();
Long hisTimeIfPresent = hisTime.getIfPresent(key);
if (hisTimeIfPresent != null && hisTimeIfPresent == timestamp) {
return false;
}
hisTime.put(key, timestamp);
if (exceptions.has("root-exception")) {
return !exceptions.get("root-exception").isNull();
if (exceptions.getRootException() != null) {
return !exceptions.getRootException().isEmpty();
} else {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.ddl.CreateTableASOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.table.planner.utils.ExecutorUtils;
Expand All @@ -83,6 +85,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ReflectUtil;

/**
Expand Down Expand Up @@ -382,6 +385,16 @@ public <T> void createTemporaryView(String s, DataStream<Row> dataStream, List<S
createTemporaryView(s, fromChangelogStream(dataStream));
}

@Override
public void executeCTAS(Operation operation) {
    // Only CREATE TABLE ... AS SELECT statements are handled; anything else is ignored.
    if (!(operation instanceof CreateTableASOperation)) {
        return;
    }
    CreateTableASOperation ctasOperation = (CreateTableASOperation) operation;
    // First materialize the target table definition, then translate the backing INSERT.
    executeInternal(ctasOperation.getCreateTableOperation());
    getPlanner().translate(CollUtil.newArrayList(ctasOperation.getInsertOperation()));
}

@Override
public <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields) {
this.createTemporaryView(path, this.fromDataStream(dataStream, fields));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.ddl.CreateTableASOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.delegation.DefaultExecutor;
import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.table.typeutils.FieldInfoUtils;
Expand All @@ -82,6 +84,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ReflectUtil;

/**
Expand Down Expand Up @@ -372,6 +375,16 @@ public <T> void createTemporaryView(String s, DataStream<Row> dataStream, List<S
createTemporaryView(s, fromChangelogStream(dataStream));
}

@Override
public void executeCTAS(Operation operation) {
    // Guard: this hook only applies to CTAS operations.
    if (!(operation instanceof CreateTableASOperation)) {
        return;
    }
    final CreateTableASOperation ctas = (CreateTableASOperation) operation;
    // Create the target table, then hand the underlying INSERT to the planner.
    executeInternal(ctas.getCreateTableOperation());
    getPlanner().translate(CollUtil.newArrayList(ctas.getInsertOperation()));
}

@Override
public <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields) {
createTemporaryView(path, fromDataStream(dataStream, fields));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.SinkModifyOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.ddl.CreateTableASOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.types.Row;

Expand All @@ -73,6 +76,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.collection.CollUtil;

/**
* CustomTableEnvironmentImpl
*
Expand Down Expand Up @@ -329,6 +334,17 @@ public <T> void createTemporaryView(String s, DataStream<Row> dataStream, List<S
createTemporaryView(s, fromChangelogStream(dataStream));
}

@Override
public void executeCTAS(Operation operation) {
    // Handles CREATE TABLE ... AS SELECT: create the table, then translate the sink insert.
    if (!(operation instanceof CreateTableASOperation)) {
        return;
    }
    CreateTableASOperation ctas = (CreateTableASOperation) operation;
    executeInternal(ctas.getCreateTableOperation());
    // Convert the CTAS into its sink-modify form before planner translation.
    getPlanner().translate(CollUtil.newArrayList(ctas.toSinkModifyOperation(getCatalogManager())));
}

@Override
public <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields) {
createTemporaryView(path, fromDataStream(dataStream, fields));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.operations.CreateTableASOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.SinkModifyOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.types.Row;

Expand All @@ -64,6 +67,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.collection.CollUtil;

/**
* CustomTableEnvironmentImpl
*
Expand Down Expand Up @@ -149,7 +154,14 @@ public StreamGraph getStreamGraphFromInserts(List<String> statements) {
throw new TableException("Only single statement is supported.");
}
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
if (operation instanceof CreateTableASOperation) {
CreateTableASOperation createTableAsOperation = (CreateTableASOperation) operation;
CreateTableOperation createTableOperation = createTableAsOperation.getCreateTableOperation();
executeInternal(createTableOperation);
SinkModifyOperation sinkModifyOperation =
createTableAsOperation.toSinkModifyOperation(getCatalogManager());
getPlanner().translate(CollUtil.newArrayList(sinkModifyOperation));
} else if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operation);
} else {
throw new TableException("Only insert statement is supported now.");
Expand Down Expand Up @@ -257,4 +269,15 @@ public List<LineageRel> getLineage(String statement) {
public <T> void createTemporaryView(String s, DataStream<Row> dataStream, List<String> columnNameList) {
createTemporaryView(s, fromChangelogStream(dataStream));
}

@Override
public void executeCTAS(Operation operation) {
    // No-op unless the operation is a CREATE TABLE ... AS SELECT.
    if (!(operation instanceof CreateTableASOperation)) {
        return;
    }
    final CreateTableASOperation ctasOp = (CreateTableASOperation) operation;
    // Step 1: register the new table in the catalog.
    executeInternal(ctasOp.getCreateTableOperation());
    // Step 2: translate the derived sink-modify (INSERT) operation.
    SinkModifyOperation sinkOp = ctasOp.toSinkModifyOperation(getCatalogManager());
    getPlanner().translate(CollUtil.newArrayList(sinkOp));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.operations.CreateTableASOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.SinkModifyOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.types.Row;

Expand All @@ -65,6 +68,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.collection.CollUtil;

/**
* CustomTableEnvironmentImpl
*
Expand Down Expand Up @@ -150,7 +155,9 @@ public StreamGraph getStreamGraphFromInserts(List<String> statements) {
throw new TableException("Only single statement is supported.");
}
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
if (operation instanceof CreateTableASOperation) {
executeCTAS(operation);
} else if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operation);
} else {
throw new TableException("Only insert statement is supported now.");
Expand Down Expand Up @@ -258,4 +265,15 @@ public List<LineageRel> getLineage(String statement) {
public <T> void createTemporaryView(String s, DataStream<Row> dataStream, List<String> columnNameList) {
createTemporaryView(s, fromChangelogStream(dataStream));
}

@Override
public void executeCTAS(Operation operation) {
    // Execute a CREATE TABLE ... AS SELECT; other operation kinds fall through untouched.
    if (!(operation instanceof CreateTableASOperation)) {
        return;
    }
    CreateTableASOperation ctasOp = (CreateTableASOperation) operation;
    // Create the target table first, then translate its sink-modify counterpart.
    executeInternal(ctasOp.getCreateTableOperation());
    SinkModifyOperation sinkModify = ctasOp.toSinkModifyOperation(getCatalogManager());
    getPlanner().translate(CollUtil.newArrayList(sinkModify));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.types.Row;

import java.util.Collections;
Expand Down Expand Up @@ -71,4 +72,6 @@ default List<LineageRel> getLineage(String statement) {
}

<T> void createTemporaryView(String s, DataStream<Row> dataStream, List<String> columnNameList);

void executeCTAS(Operation operation);
}
5 changes: 4 additions & 1 deletion dinky-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

package org.dinky.data.model.flink.backpressure;
package org.dinky.data.flink.backpressure;

import java.io.Serializable;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

package org.dinky.data.model.flink.config;
package org.dinky.data.flink.config;

import java.io.Serializable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

package org.dinky.data.model.flink.config;
package org.dinky.data.flink.config;

import java.io.Serializable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

package org.dinky.data.model.flink.exceptions;
package org.dinky.data.flink.exceptions;

import java.io.Serializable;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

package org.dinky.data.model.flink.job;
package org.dinky.data.flink.job;

import java.io.Serializable;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

package org.dinky.data.model.flink.job;
package org.dinky.data.flink.job;

import java.io.Serializable;
import java.util.List;
Expand Down
Loading

0 comments on commit 27d2d26

Please sign in to comment.