Merge branch 'dev' of https://github.com/DataLinkDC/dinky into dev
actions-user committed Nov 28, 2024
2 parents f619a41 + 5c775d7 commit d315dc3
Showing 36 changed files with 569 additions and 5,119 deletions.
16 changes: 16 additions & 0 deletions dinky-admin/pom.xml
@@ -359,10 +359,26 @@
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-dingtalk</artifactId>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-feishu</artifactId>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-wechat</artifactId>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-sms</artifactId>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-email</artifactId>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-http</artifactId>
</dependency>
</dependencies>

<build>
@@ -121,7 +121,7 @@ public boolean clearProcessLog(String processName) {
if (FileUtil.exist(filePath)) {
return FileUtil.del(filePath);
}
return false;
return true;
}

/**
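
Note on the clearProcessLog change above: when the log file no longer exists, the method now reports success instead of failure. A minimal standalone sketch of the patched behaviour, assuming Hutool's FileUtil as used in the hunk (class name and path below are illustrative only):

    import cn.hutool.core.io.FileUtil;

    public class ClearLogDemo {
        // Mirrors the patched check: a missing file is treated as "already cleared".
        public static boolean clearProcessLog(String filePath) {
            if (FileUtil.exist(filePath)) {
                return FileUtil.del(filePath); // real deletion result
            }
            return true; // nothing left to delete now counts as success
        }

        public static void main(String[] args) {
            System.out.println(clearProcessLog("/tmp/dinky/process/absent.log")); // prints true even if the file is absent
        }
    }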
@@ -348,6 +348,6 @@ public Result<String> flinkJarFormConvertSql(@RequestBody FlinkJarSqlConvertVO d
.orElse(false)
.toString());
String executeJarSql = ENGINE.getTemplate("executeJar.sql").render(objectMap);
return Result.succeed(dto.getInitSqlStatement() + "\n" + executeJarSql, "");
return Result.succeed(Opt.ofNullable(dto.getInitSqlStatement()).orElse("") + "\n" + executeJarSql, "");
}
}
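
Why the Opt.ofNullable guard above matters: concatenating a null init statement would render the literal text "null" into the generated statement. A hedged sketch, assuming the Opt in use is Hutool's cn.hutool.core.lang.Opt and with a placeholder string standing in for the rendered executeJar.sql template:

    import cn.hutool.core.lang.Opt;

    public class InitSqlGuardDemo {
        public static void main(String[] args) {
            String initSqlStatement = null;                  // e.g. no init SQL was supplied in the form
            String executeJarSql = "EXECUTE JAR WITH (...)"; // stand-in for the rendered template

            String before = initSqlStatement + "\n" + executeJarSql;                           // "null\nEXECUTE JAR WITH (...)"
            String after = Opt.ofNullable(initSqlStatement).orElse("") + "\n" + executeJarSql; // "\nEXECUTE JAR WITH (...)"

            System.out.println(before);
            System.out.println(after);
        }
    }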
6 changes: 6 additions & 0 deletions dinky-admin/src/main/java/org/dinky/init/SystemInit.java
@@ -31,6 +31,7 @@
import org.dinky.data.model.Task;
import org.dinky.data.model.job.JobInstance;
import org.dinky.data.model.rbac.Tenant;
import org.dinky.function.FlinkUDFDiscover;
import org.dinky.function.constant.PathConstant;
import org.dinky.function.pool.UdfCodePool;
import org.dinky.job.ClearJobHistoryTask;
@@ -106,6 +107,7 @@ public void run(ApplicationArguments args) {
initDaemon();
initDolphinScheduler();
registerUDF();
discoverUDF();
updateGitBuildState();
registerURL();
} catch (NoClassDefFoundError e) {
@@ -220,6 +222,10 @@ public void registerUDF() {
UdfCodePool.updateGitPool(gitProjectService.getGitPool());
}

public void discoverUDF() {
FlinkUDFDiscover.getCustomStaticUDFs();
}

public void updateGitBuildState() {
String path = PathConstant.TMP_PATH + "/build.list";
if (FileUtil.exist(path)) {
@@ -55,7 +55,6 @@ public CatalogueTreeSortStrategy getStrategy(String strategyName) {
CatalogueTreeSortStrategy catalogueTreeSortStrategy =
Safes.of(catalogueTreeSortStrategyMap).get(strategyName);
if (Objects.isNull(catalogueTreeSortStrategy)) {
log.warn("Strategy {} is not defined. Use DefaultStrategy", strategyName);
catalogueTreeSortStrategy =
Safes.of(catalogueTreeSortStrategyMap).get(CatalogueSortConstant.STRATEGY_DEFAULT);
}
@@ -121,7 +121,6 @@ public class CatalogueServiceImpl extends SuperServiceImpl<CatalogueMapper, Cata
*/
@Override
public List<Catalogue> getCatalogueTree(CatalogueTreeQueryDTO catalogueTreeQueryDto) {
log.info("getCatalogueTree, catalogueTreeQueryDto: {}", catalogueTreeQueryDto);
List<Catalogue> catalogueTree = buildCatalogueTree(this.list());
// sort
CatalogueTreeSortStrategy strategy = catalogueTreeSortFactory.getStrategy(catalogueTreeQueryDto.getSortValue());
@@ -24,11 +24,11 @@
import org.dinky.data.model.udf.UDFManage;
import org.dinky.data.vo.CascaderVO;
import org.dinky.data.vo.UDFManageVO;
import org.dinky.function.FlinkUDFDiscover;
import org.dinky.function.data.model.UDF;
import org.dinky.mapper.UDFManageMapper;
import org.dinky.service.UDFService;
import org.dinky.service.resource.ResourcesService;
import org.dinky.trans.Operations;
import org.dinky.utils.UDFUtils;

import org.apache.flink.table.catalog.FunctionLanguage;
@@ -182,7 +182,7 @@ public List<UDFManage> getUDFFromUdfManage() {
@Override
public List<CascaderVO> getAllUdfsToCascader(List<UDF> userDefinedReleaseUdfs) {
// Get all UDFs of static UDFs and dynamic UDFs
List<UDF> staticUdfs = Operations.getCustomStaticUdfs();
List<UDF> staticUdfs = FlinkUDFDiscover.getCustomStaticUDFs();

// get all UDFs of UDFManage table
List<UDF> udfManageDynamic = getUDFFromUdfManage().stream()
1 change: 1 addition & 0 deletions dinky-app/dinky-app-1.20/pom.xml
@@ -60,6 +60,7 @@
</manifest>
</archive>
<outputDirectory>${project.parent.parent.basedir}/build/extends</outputDirectory>

</configuration>
<executions>
<execution>
12 changes: 0 additions & 12 deletions dinky-app/dinky-app-base/pom.xml
@@ -46,10 +46,6 @@
<groupId>org.dinky</groupId>
<artifactId>dinky-core</artifactId>
<exclusions>
<exclusion>
<groupId>com.github.xiaoymin</groupId>
<artifactId>knife4j-openapi2-spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-core</artifactId>
@@ -82,14 +78,6 @@
<groupId>org.dinky</groupId>
<artifactId>dinky-gateway</artifactId>
</exclusion>
<exclusion>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-sms</artifactId>
</exclusion>
<exclusion>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-dingtalk</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
8 changes: 8 additions & 0 deletions dinky-assembly/src/main/assembly/package.xml
@@ -122,6 +122,14 @@
<include>dinky-client-1.19-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/build/extends/</directory>
<outputDirectory>extends/flink1.20/dinky</outputDirectory>
<includes>
<include>dinky-catalog-mysql-1.20-${project.version}.jar</include>
<include>dinky-client-1.20-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/build/extends/</directory>
<outputDirectory>extends</outputDirectory>
@@ -171,7 +171,8 @@ public ClusterClientProvider<String> deployApplicationCluster(
throws ClusterDeploymentException {
if (client.getService(ExternalServiceDecorator.getExternalServiceName(clusterId))
.isPresent()) {
throw new ClusterDeploymentException("The Flink cluster " + clusterId + " already exists.");
client.stopAndCleanupCluster(clusterId);
LOG.warn("The Flink cluster {} already exists, automatically stopAndCleanupCluster.", clusterId);
}

checkNotNull(clusterSpecification);
39 changes: 0 additions & 39 deletions dinky-core/pom.xml
@@ -32,11 +32,6 @@
<hadoop.version>3.1.0</hadoop.version>
</properties>
<dependencies>
<!--<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
<version>3.0.9</version>
</dependency>-->
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-common</artifactId>
@@ -54,36 +49,6 @@
<artifactId>dinky-metadata-base</artifactId>
<scope>${scope.runtime}</scope>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-dingtalk</artifactId>
<scope>${scope.runtime}</scope>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-wechat</artifactId>
<scope>${scope.runtime}</scope>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-feishu</artifactId>
<scope>${scope.runtime}</scope>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-sms</artifactId>
<scope>${scope.runtime}</scope>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-email</artifactId>
<scope>${scope.runtime}</scope>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-http</artifactId>
<scope>${scope.runtime}</scope>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-metadata-paimon</artifactId>
@@ -203,10 +168,6 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
@@ -122,7 +122,7 @@ private void catchChangLog(SelectResult selectResult) {
for (String tableIdentifier : tableIdentifierList) {
if (!tableIdentifierIndexMap.containsKey(tableIdentifier)) {
tableIdentifierIndexMap.put(tableIdentifier, 0);
} else if (tableIdentifierIndexMap.get(tableIdentifier) >= maxRowNum) {
} else if (tableIdentifierIndexMap.get(tableIdentifier) >= maxRowNum - 1) {
allSinkFinished = true;
continue;
}
@@ -145,7 +145,7 @@ private void catchChangLog(SelectResult selectResult) {
rows.add(rowDataWithTableIdentifier);
tableIdentifierIndexMap.put(
tableIdentifier, tableIdentifierIndexMap.get(tableIdentifier) + 1);
if (tableIdentifierIndexMap.get(tableIdentifier) > maxRowNum) {
if (tableIdentifierIndexMap.get(tableIdentifier) >= maxRowNum) {
break;
}
}
@@ -190,7 +190,7 @@ private void catchData(SelectResult selectResult) {
for (String tableIdentifier : tableIdentifierList) {
if (!tableIdentifierIndexMap.containsKey(tableIdentifier)) {
tableIdentifierIndexMap.put(tableIdentifier, 0);
} else if (tableIdentifierIndexMap.get(tableIdentifier) >= maxRowNum) {
} else if (tableIdentifierIndexMap.get(tableIdentifier) >= maxRowNum - 1) {
allSinkFinished = true;
continue;
}
@@ -219,7 +219,7 @@
}
tableIdentifierIndexMap.put(
tableIdentifier, tableIdentifierIndexMap.get(tableIdentifier) + 1);
if (tableIdentifierIndexMap.get(tableIdentifier) > maxRowNum) {
if (tableIdentifierIndexMap.get(tableIdentifier) >= maxRowNum) {
break;
}
}
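
The two pairs of changes above tighten the per-table row cap in catchChangLog and catchData. A self-contained illustration of just the break-condition part (simplified counting only, not the full buffering logic): breaking on count > maxRowNum lets one extra row through, while >= maxRowNum caps the buffer at exactly maxRowNum rows.

    public class RowCapDemo {
        static int collect(int maxRowNum, boolean oldCheck) {
            int count = 0;
            while (true) {
                count++; // a row has just been buffered
                if (oldCheck ? count > maxRowNum : count >= maxRowNum) {
                    break;
                }
            }
            return count;
        }

        public static void main(String[] args) {
            System.out.println(collect(500, true));  // 501 rows with the old "> maxRowNum" check
            System.out.println(collect(500, false)); // 500 rows with the patched ">= maxRowNum" check
        }
    }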
2 changes: 1 addition & 1 deletion dinky-core/src/main/java/org/dinky/executor/Executor.java
@@ -275,7 +275,7 @@ public void initPyUDF(String executable, String... udfPyFilePath) {
private void addJar(String... jarPath) {
Configuration configuration = tableEnvironment.getRootConfiguration();
List<String> jars = configuration.get(PipelineOptions.JARS);
if (jars == null) {
if (CollUtil.isEmpty(jars)) {
tableEnvironment.addConfiguration(PipelineOptions.JARS, CollUtil.newArrayList(jarPath));
} else {
CollUtil.addAll(jars, jarPath);
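
On the addJar change above: CollUtil.isEmpty treats both a missing (null) and an empty pipeline.jars list as "nothing configured yet", so an empty list now also takes the addConfiguration path rather than the append path. A small standalone check, assuming Hutool's CollUtil as already imported by Executor:

    import java.util.List;

    import cn.hutool.core.collection.CollUtil;

    public class JarsGuardDemo {
        public static void main(String[] args) {
            List<String> missing = null;
            List<String> empty = CollUtil.newArrayList();

            System.out.println(CollUtil.isEmpty(missing)); // true
            System.out.println(CollUtil.isEmpty(empty));   // true -- the old "jars == null" check missed this case
        }
    }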
4 changes: 3 additions & 1 deletion dinky-core/src/main/java/org/dinky/explainer/Explainer.java
@@ -41,6 +41,7 @@
import org.dinky.job.builder.JobUDFBuilder;
import org.dinky.trans.Operations;
import org.dinky.utils.DinkyClassLoaderUtil;
import org.dinky.utils.LogUtil;
import org.dinky.utils.SqlUtil;

import org.apache.flink.runtime.rest.messages.JobPlanInfo;
@@ -141,8 +142,9 @@ public ExplainResult explainSql(String statement) {
jobStatementPlan.buildFinalStatement();
jobManager.setJobStatementPlan(jobStatementPlan);
} catch (Exception e) {
String error = LogUtil.getError("Exception in parsing FlinkSQL:\n" + SqlUtil.addLineNumber(statement), e);
SqlExplainResult.Builder resultBuilder = SqlExplainResult.Builder.newBuilder();
resultBuilder.error(e.getMessage()).parseTrue(false);
resultBuilder.error(error).parseTrue(false);
sqlExplainRecords.add(resultBuilder.build());
log.error("Failed parseStatements:", e);
return new ExplainResult(false, sqlExplainRecords.size(), sqlExplainRecords);
13 changes: 5 additions & 8 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
@@ -92,7 +92,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.text.StrFormatter;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@@ -315,16 +314,14 @@ public JobResult executeSql(String statement) throws Exception {
}
} catch (Exception e) {
String errorMessage = e.getMessage();
if (errorMessage != null && errorMessage.contains("Only insert statement is supported now")) {
throw new BusException(Status.OPERATE_NOT_SUPPORT_QUERY.getMessage());
}
String error = StrFormatter.format(
"Exception in executing FlinkSQL:\n{}\n{}", SqlUtil.addLineNumber(currentSql), errorMessage);
job.setEndTime(LocalDateTime.now());
job.setStatus(Job.JobStatus.FAILED);
job.setError(error);
job.setError(errorMessage);
failed();
throw new Exception(error, e);
if (errorMessage != null && errorMessage.contains("Only insert statement is supported now")) {
throw new BusException(Status.OPERATE_NOT_SUPPORT_QUERY.getMessage());
}
throw new Exception(errorMessage, e);
} finally {
close();
}
18 changes: 9 additions & 9 deletions dinky-core/src/main/java/org/dinky/job/runner/JobDDLRunner.java
@@ -47,7 +47,6 @@
import java.util.Set;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
@@ -125,10 +124,8 @@ public SqlExplainResult explain(JobStatement jobStatement) {
.sql(jobStatement.getStatement())
.index(jobStatement.getIndex());
} catch (Exception e) {
String error = StrFormatter.format(
"Exception in explaining FlinkSQL:\n{}\n{}",
SqlUtil.addLineNumber(jobStatement.getStatement()),
LogUtil.getError(e));
String error = LogUtil.getError(
"Exception in explaining FlinkSQL:\n" + SqlUtil.addLineNumber(jobStatement.getStatement()), e);
resultBuilder
.error(error)
.explainTrue(false)
@@ -143,22 +140,25 @@
}

private void executeAdd(String statement) {
AddJarSqlParseStrategy.getAllFilePath(statement)
.forEach(t -> jobManager.getUdfPathContextHolder().addOtherPlugins(t));
Set<File> allFilePath = AddJarSqlParseStrategy.getAllFilePath(statement);
allFilePath.forEach(t -> jobManager.getUdfPathContextHolder().addOtherPlugins(t));
(jobManager.getExecutor().getDinkyClassLoader())
.addURLs(URLUtils.getURLs(jobManager.getUdfPathContextHolder().getOtherPluginsFiles()));
}

private void executeAddFile(String statement) {
AddFileSqlParseStrategy.getAllFilePath(statement)
.forEach(t -> jobManager.getUdfPathContextHolder().addFile(t));
Set<File> allFilePath = AddFileSqlParseStrategy.getAllFilePath(statement);
allFilePath.forEach(t -> jobManager.getUdfPathContextHolder().addFile(t));
(jobManager.getExecutor().getDinkyClassLoader())
.addURLs(URLUtils.getURLs(jobManager.getUdfPathContextHolder().getFiles()));
jobManager.getExecutor().addJar(ArrayUtil.toArray(allFilePath, File.class));
}

private void executeAddJar(String statement) {
Set<File> allFilePath = AddFileSqlParseStrategy.getAllFilePath(statement);
Configuration combinationConfig = getCombinationConfig();
FileSystem.initialize(combinationConfig, null);
jobManager.getExecutor().addJar(ArrayUtil.toArray(allFilePath, File.class));
jobManager.getExecutor().executeSql(statement);
}

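
On the executeAddFile/executeAddJar changes above: the file set parsed from the statement is now also converted to a File[] and handed to Executor#addJar, not only registered with the class loader and context holder. A standalone sketch of just that conversion step, using the same Hutool helpers as the patch (the jar path and class name are illustrative):

    import java.io.File;
    import java.util.Set;

    import cn.hutool.core.collection.CollUtil;
    import cn.hutool.core.util.ArrayUtil;

    public class AddJarConversionDemo {
        public static void main(String[] args) {
            // stand-in for AddFileSqlParseStrategy.getAllFilePath(statement)
            Set<File> allFilePath = CollUtil.newHashSet(new File("/tmp/udfs/my-udf.jar"));
            File[] jars = ArrayUtil.toArray(allFilePath, File.class);
            System.out.println(jars.length); // 1 -- ready to pass to executor.addJar(jars)
        }
    }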
@@ -51,7 +51,6 @@
import java.util.ArrayList;
import java.util.List;

import cn.hutool.core.text.StrFormatter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@@ -100,10 +99,8 @@ public SqlExplainResult explain(JobStatement jobStatement) {
.explainTime(LocalDateTime.now())
.index(jobStatement.getIndex());
} catch (Exception e) {
String error = StrFormatter.format(
"Exception in explaining FlinkSQL:\n{}\n{}",
SqlUtil.addLineNumber(jobStatement.getStatement()),
LogUtil.getError(e));
String error = LogUtil.getError(
"Exception in explaining FlinkSQL:\n" + SqlUtil.addLineNumber(jobStatement.getStatement()), e);
resultBuilder
.parseTrue(false)
.error(error)