Skip to content

Commit

Permalink
[Fix][Flink]Fix local mode bug and FlinkSql task type selected by de…
Browse files Browse the repository at this point in the history
…fault (DataLinkDC#2403)

* refactor: build pattern

* feat: the first task type selected by default.

* feat: detect remote by run mode

* fix: use GatewayType to detect remote

---------

Co-authored-by: leechor <[email protected]>
  • Loading branch information
leechor and leechor authored Oct 19, 2023
1 parent 0d2d3d5 commit 4fb4303
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private static String getFlinkSQLStatement(Integer id, DBConfig config) {
"{} --> 获取 FlinkSQL 配置异常,ID 为 {}, 连接信息为:{} ,异常信息为:{} ",
LocalDateTime.now(),
id,
config.toString(),
config,
e.getMessage(),
e);
}
Expand All @@ -115,7 +115,7 @@ public static Map<String, String> getTaskConfig(Integer id, DBConfig config) {
"{} --> 获取 FlinkSQL 配置异常,ID 为 {}, 连接信息为:{} ,异常信息为:{} ",
LocalDateTime.now(),
id,
config.toString(),
config,
e.getMessage(),
e);
}
Expand All @@ -134,25 +134,20 @@ public static String getDbSourceSqlStatements(DBConfig dbConfig, Integer id) {
String fragment = DBUtil.getOneByID(sqlCheck, dbConfig);
if ("1".equals(fragment)) {
return DBUtil.getDbSourceSQLStatement(sql, dbConfig);
} else {
// 全局变量未开启,返回空字符串
logger.info("任务 {} 未开启全局变量,不进行变量加载。");
return "";
}

// 全局变量未开启,返回空字符串
logger.info("任务 {} 未开启全局变量,不进行变量加载。", id);
} catch (IOException | SQLException e) {
logger.error(
"{} --> 获取 数据源信息异常,请检查数据库连接,连接信息为:{} ,异常信息为:{}",
LocalDateTime.now(),
dbConfig.toString(),
e.getMessage(),
e);
"{} --> 获取 数据源信息异常,请检查数据库连接,连接信息为:{} ,异常信息为:{}", LocalDateTime.now(), dbConfig, e.getMessage(), e);
}

return "";
}

public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
logger.info(LocalDateTime.now() + "开始提交作业 -- " + id);
logger.info("{}开始提交作业 -- {}", LocalDateTime.now(), id);
if (NULL.equals(dinkyAddr)) {
dinkyAddr = "";
}
Expand All @@ -166,13 +161,13 @@ public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
}
sb.append("\n");
}

// 添加数据源全局变量
sb.append(getDbSourceSqlStatements(dbConfig, id));
// 添加自定义全局变量信息
sb.append(getFlinkSQLStatement(id, dbConfig));
List<String> statements = Submitter.getStatements(sb.toString());
ExecutorConfig executorConfig = ExecutorConfig.buildFromMap(taskConfig);

ExecutorConfig executorConfig = ExecutorConfig.buildFromMap(taskConfig);
// 加载第三方jar
loadDep(taskConfig.get("type"), id, dinkyAddr, executorConfig);

Expand All @@ -181,11 +176,14 @@ public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
List<StatementParam> ddl = new ArrayList<>();
List<StatementParam> trans = new ArrayList<>();
List<StatementParam> execute = new ArrayList<>();

List<String> statements = Submitter.getStatements(sb.toString());
for (String item : statements) {
String statement = FlinkInterceptor.pretreatStatement(executor, item);
if (statement.isEmpty()) {
continue;
}

SqlType operationType = Operations.getOperationType(statement);
if (operationType.equals(SqlType.INSERT) || operationType.equals(SqlType.SELECT)) {
trans.add(new StatementParam(statement, operationType));
Expand All @@ -201,32 +199,33 @@ public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
ddl.add(new StatementParam(statement, operationType));
}
}

for (StatementParam item : ddl) {
logger.info("Executing FlinkSQL: " + item.getValue());
logger.info("Executing FlinkSQL: {}", item.getValue());
executor.executeSql(item.getValue());
logger.info("Execution succeeded.");
}
if (trans.size() > 0) {

if (!trans.isEmpty()) {
if (executorConfig.isUseStatementSet()) {
List<String> inserts = new ArrayList<>();
for (StatementParam item : trans) {
if (item.getType().equals(SqlType.INSERT)) {
inserts.add(item.getValue());
}
}
logger.info("Executing FlinkSQL statement set: " + String.join(FlinkSQLConstant.SEPARATOR, inserts));
logger.info("Executing FlinkSQL statement set: {}", String.join(FlinkSQLConstant.SEPARATOR, inserts));
executor.executeStatementSet(inserts);
logger.info("Execution succeeded.");
} else {
for (StatementParam item : trans) {
logger.info("Executing FlinkSQL: " + item.getValue());
executor.executeSql(item.getValue());
logger.info("Execution succeeded.");
break;
}
StatementParam item = trans.get(0);
logger.info("Executing FlinkSQL: {}", item.getValue());
executor.executeSql(item.getValue());
logger.info("Execution succeeded.");
}
}
if (execute.size() > 0) {

if (!execute.isEmpty()) {
List<String> executes = new ArrayList<>();
for (StatementParam item : execute) {
executes.add(item.getValue());
Expand All @@ -235,7 +234,8 @@ public static void submit(Integer id, DBConfig dbConfig, String dinkyAddr) {
break;
}
}
logger.info("正在执行 FlinkSQL 语句集: " + String.join(FlinkSQLConstant.SEPARATOR, executes));

logger.info("正在执行 FlinkSQL 语句集: {}", String.join(FlinkSQLConstant.SEPARATOR, executes));
try {
executor.execute(executorConfig.getJobName());
logger.info("执行成功");
Expand All @@ -250,6 +250,7 @@ private static void loadDep(String type, Integer taskId, String dinkyAddr, Execu
if (StringUtils.isBlank(dinkyAddr)) {
return;
}

if ("kubernetes-application".equals(type)) {
try {
String httpJar = "http://" + dinkyAddr + "/download/downloadDepJar/" + taskId;
Expand Down Expand Up @@ -301,9 +302,8 @@ private static void loadDep(String type, Integer taskId, String dinkyAddr, Execu

private static void addURLs(URL[] jarUrls) {
URLClassLoader urlClassLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
Method add = null;
try {
add = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
Method add = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
add.setAccessible(true);
for (URL jarUrl : jarUrls) {
add.invoke(urlClassLoader, jarUrl);
Expand All @@ -319,18 +319,18 @@ public static boolean downloadFile(String url, String path) throws IOException {
// 设置超时间为3秒
conn.setConnectTimeout(3 * 1000);
// 获取输入流
InputStream inputStream = conn.getInputStream();
// 获取输出流
FileOutputStream outputStream = new FileOutputStream(path);
// 每次下载1024位
byte[] b = new byte[1024];
int len = -1;
while ((len = inputStream.read(b)) != -1) {
outputStream.write(b, 0, len);
try (InputStream inputStream = conn.getInputStream()) {
// 获取输出流
try (FileOutputStream outputStream = new FileOutputStream(path)) {
// 每次下载1024位
byte[] b = new byte[1024];
int len = -1;
while ((len = inputStream.read(b)) != -1) {
outputStream.write(b, 0, len);
}
return true;
}
}
inputStream.close();
outputStream.close();
return true;
} catch (Exception e) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
public class LineageBuilder {

public static LineageResult getColumnLineageByLogicalPlan(String statement) {
Explainer explainer = new Explainer(ExecutorFactory.getExecutor(), false);
Explainer explainer = new Explainer(ExecutorFactory.getDefaultExecutor(), false);
List<LineageRel> lineageRelList = explainer.getLineage(statement);
List<LineageRelation> relations = new ArrayList<>();
Map<String, LineageTable> tableMap = new HashMap<>();
Expand Down
7 changes: 6 additions & 1 deletion dinky-core/src/main/java/org/dinky/job/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@
@ApiModel(value = "JobConfig", description = "Configuration details of a job")
public class JobConfig {

@ApiModelProperty(value = "Flink run mode", dataType = "String", example = "batch", notes = "Flink run mode")
@ApiModelProperty(
value = "Flink run mode",
dataType = "String",
example = "local standalone",
notes = "Flink run mode")
private String type;

@ApiModelProperty(value = "Check Point", dataType = "Integer", example = "1", notes = "Check point for the task")
Expand Down Expand Up @@ -217,6 +221,7 @@ public void setAddress(String address) {

public ExecutorConfig getExecutorSetting() {
return ExecutorConfig.build(
type,
address,
checkpoint,
parallelism,
Expand Down
4 changes: 4 additions & 0 deletions dinky-executor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@
<groupId>org.dinky</groupId>
<artifactId>dinky-cdc-core</artifactId>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-gateway</artifactId>
</dependency>
</dependencies>

<profiles>
Expand Down
Loading

0 comments on commit 4fb4303

Please sign in to comment.