Skip to content

Commit

Permalink
Merge branch 'DataLinkDC:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 authored Jan 22, 2024
2 parents 1c0a35c + 2ae3943 commit ceb060e
Show file tree
Hide file tree
Showing 21 changed files with 311 additions and 48 deletions.
20 changes: 20 additions & 0 deletions dinky-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token-jwt</artifactId>
<exclusions>
<exclusion>
<groupId>cn.hutool</groupId>
<artifactId>hutool-jwt</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
Expand Down Expand Up @@ -170,6 +176,20 @@
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-core</artifactId>
<exclusions>
<exclusion>
<groupId>cn.hutool</groupId>
<artifactId>hutool-crypto</artifactId>
</exclusion>
<exclusion>
<groupId>cn.hutool</groupId>
<artifactId>hutool-http</artifactId>
</exclusion>
<exclusion>
<groupId>cn.hutool</groupId>
<artifactId>hutool-json</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
Expand Down
15 changes: 15 additions & 0 deletions dinky-admin/src/test/java/org/dinky/utils/SqlUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

package org.dinky.utils;

import static org.dinky.data.enums.Status.SYS_FLINK_SETTINGS_SQLSEPARATOR;

import org.dinky.data.model.SystemConfiguration;

import org.assertj.core.api.Assertions;
import org.junit.Ignore;
import org.junit.Test;
Expand All @@ -45,4 +49,15 @@ public void testRemoveNote() {
Assertions.assertThat(removedNoteSql).isNotNull();
Assertions.assertThat(removedNoteSql).isNotEqualTo(testSql);
}

@Test
public void getStatements() {
    // Three statements: two SET statements each terminated by ";" plus a trailing
    // line comment, and one CREATE TABLE terminated by ";" and a newline.
    String flinkSql = "set 'state.savepoints.dir' = 'hdfs://namenode:9000/tmp/checkpoint'; --ddd\n"
            + "set 'state.checkpoints.dir' = 'hdfs://namenode:9000/tmp/checkpoint'; --dd \n"
            + "create table abc ;\n";
    // Configure the separator as a regex: a semicolon followed by either a newline
    // or an inline "--" comment, so trailing comments do not produce extra splits.
    SystemConfiguration.getInstances()
            .setConfiguration(SYS_FLINK_SETTINGS_SQLSEPARATOR.getKey(), ";\\s*(?:\\n|--.*)");
    String[] parsed = SqlUtil.getStatements(flinkSql);
    Assertions.assertThat(parsed.length).isEqualTo(3);
}
}
135 changes: 135 additions & 0 deletions dinky-app/dinky-app-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,143 @@
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>

<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-metadata-base</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-metadata-clickhouse</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-metadata-doris</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-metadata-hive</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-metadata-mysql</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-metadata-oracle</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-metadata-phoenix</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-metadata-postgresql</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-metadata-presto</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-metadata-sqlserver</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-metadata-starrocks</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>flink-single-version</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public static void monitorFlinkTask(Executor executor, int taskId) {
public static void monitorFlinkTask(JobClient jobClient, int taskId) {
boolean isRun = true;
String jobId = jobClient.getJobID().toHexString();

try {
while (isRun) {
String jobStatus = jobClient.getJobStatus().get().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private FlinkStatementUtil() {}

public static String getCDCInsertSql(Table table, String targetName, String sourceName) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(targetName);
sb.append("`").append(targetName).append("`");
sb.append(" SELECT\n");
for (int i = 0; i < table.getColumns().size(); i++) {
sb.append(" ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.dinky.sql;

import org.dinky.data.model.SystemConfiguration;

/**
* FlinkQuery
*
Expand All @@ -27,7 +29,7 @@
public class FlinkQuery {

public static String separator() {
return ";\n";
return SystemConfiguration.getInstances().getSqlSeparator();
}

public static String defaultCatalog() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static Configuration.OptionBuilder key(Status status) {
.note(Status.SYS_FLINK_SETTINGS_USERESTAPI_NOTE);
private final Configuration<String> sqlSeparator = key(Status.SYS_FLINK_SETTINGS_SQLSEPARATOR)
.stringType()
.defaultValue(";\\n")
.defaultValue(";\\s*(?:\\n|--.*)")
.note(Status.SYS_FLINK_SETTINGS_SQLSEPARATOR_NOTE);
private final Configuration<Integer> jobIdWait = key(Status.SYS_FLINK_SETTINGS_JOBIDWAIT)
.intType()
Expand Down
2 changes: 1 addition & 1 deletion dinky-common/src/main/java/org/dinky/utils/SqlUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static String[] getStatements(String sql, String sqlSeparator) {
return new String[0];
}

String[] splits = sql.replace(";\r\n", ";\n").split(sqlSeparator);
String[] splits = sql.replace("\r\n", "\n").split(sqlSeparator);
String lastStatement = splits[splits.length - 1].trim();
if (lastStatement.endsWith(SEMICOLON)) {
splits[splits.length - 1] = lastStatement.substring(0, lastStatement.length() - 1);
Expand Down
16 changes: 10 additions & 6 deletions dinky-core/src/main/java/org/dinky/constant/FlinkSQLConstant.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,25 @@

package org.dinky.constant;

import org.dinky.sql.FlinkQuery;

/**
* FlinkSQLConstant
*
* @since 2021/5/25 15:51
*/
public interface FlinkSQLConstant {
public class FlinkSQLConstant {
private FlinkSQLConstant() {}

/** 分隔符 */
String SEPARATOR = ";\n";
public static final String SEPARATOR = FlinkQuery.separator();
/** DDL 类型 */
String DDL = "DDL";
public static final String DDL = "DDL";
/** DML 类型 */
String DML = "DML";
public static final String DML = "DML";
/** DATASTREAM 类型 */
String DATASTREAM = "DATASTREAM";
public static final String DATASTREAM = "DATASTREAM";

/** The define identifier of FlinkSQL Variable */
String VARIABLES = ":=";
public static final String VARIABLES = ":=";
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ public TableResult execute(Executor executor) {
// 分库分表所有表结构都是一样的,取出列表中第一个表名即可
String schemaTableName = table.getSchemaTableNameList().get(0);
// 真实的表名
String realSchemaName = schemaTableName.split("\\.")[0];
String tableName = schemaTableName.split("\\.")[1];
table.setColumns(driver.listColumnsSortByPK(schemaName, tableName));
table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
table.setColumns(driver.listColumnsSortByPK(realSchemaName, tableName));
schemaList.add(schema);

if (null != sinkDriver) {
Expand Down
47 changes: 29 additions & 18 deletions dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.dinky.gateway.result.TestResult;
import org.dinky.gateway.result.YarnResult;
import org.dinky.utils.FlinkJsonUtil;
import org.dinky.utils.ThreadUtil;

import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.program.ClusterClient;
Expand Down Expand Up @@ -340,32 +339,33 @@ protected String getWebUrl(ClusterClient<ApplicationId> clusterClient, YarnResul
&& counts-- > 0) {
Thread.sleep(1000);
}
// 睡眠2秒,防止application快速识别抛出的错误
ThreadUtil.sleep(2000);
ApplicationReport applicationReport = yarnClient.getApplicationReport(clusterClient.getClusterId());
if (applicationReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
String logUrl = yarnClient
.getContainers(applicationReport.getCurrentApplicationAttemptId())
.get(0)
.getLogUrl();
String log = ReUtil.getGroup1(HTML_TAG_REGEX, HttpUtil.get(logUrl + "/jobmanager.log?start=-10000"));
logger.error("\n\nHistory log url is: {}\n\n ", logUrl);
throw new RuntimeException(
"Yarn application state is not running, please check yarn cluster status. Log content:\n" + log);
}
webUrl = applicationReport.getOriginalTrackingUrl();
webUrl = clusterClient.getWebInterfaceURL();
final List<JobDetails> jobDetailsList = new ArrayList<>();
while (jobDetailsList.isEmpty() && counts-- > 0) {
ApplicationReport applicationReport = yarnClient.getApplicationReport(clusterClient.getClusterId());
if (applicationReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
String log = getYarnContainerLog(applicationReport);
throw new RuntimeException(
"Yarn application state is not running, please check yarn cluster status. Log content:\n"
+ log);
}
// 睡眠1秒,防止flink因为依赖或其他问题导致任务秒挂
Thread.sleep(1000);

String url = yarnClient
.getApplicationReport(clusterClient.getClusterId())
.getTrackingUrl()
+ JobsOverviewHeaders.URL.substring(1);

String json = HttpUtil.get(url);
MultipleJobsDetails jobsDetails = FlinkJsonUtil.toBean(json, JobsOverviewHeaders.getInstance());
jobDetailsList.addAll(jobsDetails.getJobs());
try {
MultipleJobsDetails jobsDetails = FlinkJsonUtil.toBean(json, JobsOverviewHeaders.getInstance());
jobDetailsList.addAll(jobsDetails.getJobs());
} catch (Exception e) {
Thread.sleep(1000);
String log = getYarnContainerLog(applicationReport);
logger.error("Yarn application state is not running, please check yarn cluster status. Log content:\n"
+ log);
}
if (!jobDetailsList.isEmpty()) {
break;
}
Expand All @@ -380,4 +380,15 @@ protected String getWebUrl(ClusterClient<ApplicationId> clusterClient, YarnResul
}
return webUrl;
}

protected String getYarnContainerLog(ApplicationReport applicationReport) throws YarnException, IOException {
    // Resolve the log URL of the first container belonging to the current application attempt.
    final String containerLogUrl = yarnClient
            .getContainers(applicationReport.getCurrentApplicationAttemptId())
            .get(0)
            .getLogUrl();
    // Fetch the tail (last ~10000 bytes) of the JobManager log page; the useful text
    // is wrapped in HTML, so extract it with the pre-compiled tag regex.
    final String rawPage = HttpUtil.get(containerLogUrl + "/jobmanager.log?start=-10000");
    logger.info("\n\nHistory log url is: {}\n\n ", containerLogUrl);
    return ReUtil.getGroup1(HTML_TAG_REGEX, rawPage);
}
}
10 changes: 8 additions & 2 deletions dinky-web/config/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,14 @@ export default [
]
},
{
path: '/',
redirect: '/datastudio'
path: '/',
redirect: '/redirect',
},
{
path: '/redirect',
component: './Other/Redirect',
layout: false,
hideInMenu: true,
},
// {
// path: '/home',
Expand Down
Loading

0 comments on commit ceb060e

Please sign in to comment.