Skip to content

Commit

Permalink
fix dinky app not throw exception
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 committed Dec 26, 2023
1 parent a4c4b09 commit a4a4b54
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 50 deletions.
15 changes: 5 additions & 10 deletions dinky-app/dinky-app-1.14/src/main/java/org/dinky/app/MainApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,10 @@ public static void main(String[] args) throws Exception {
String config = parameters.get(AppParamConstant.config);
config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config;
AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class);
try {
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed : ", e);
} finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
}
15 changes: 5 additions & 10 deletions dinky-app/dinky-app-1.15/src/main/java/org/dinky/app/MainApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,10 @@ public static void main(String[] args) throws Exception {
String config = parameters.get(AppParamConstant.config);
config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config;
AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class);
try {
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed : ", e);
} finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
}
15 changes: 5 additions & 10 deletions dinky-app/dinky-app-1.16/src/main/java/org/dinky/app/MainApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,10 @@ public static void main(String[] args) throws Exception {
String config = parameters.get(AppParamConstant.config);
config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config;
AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class);
try {
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed : ", e);
} finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
}
15 changes: 5 additions & 10 deletions dinky-app/dinky-app-1.17/src/main/java/org/dinky/app/MainApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,10 @@ public static void main(String[] args) throws Exception {
String config = parameters.get(AppParamConstant.config);
config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config;
AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class);
try {
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed : ", e);
} finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
}
15 changes: 5 additions & 10 deletions dinky-app/dinky-app-1.18/src/main/java/org/dinky/app/MainApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,10 @@ public static void main(String[] args) throws Exception {
String config = parameters.get(AppParamConstant.config);
config = isEncrypt ? new String(Base64.getDecoder().decode(config)) : config;
AppParamConfig appConfig = JsonUtils.toJavaBean(config, AppParamConfig.class);
try {
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed : ", e);
} finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
log.info("dinky app is Ready to run, config is {}", appConfig);
DBUtil.init(appConfig);
Submitter.submit(appConfig);
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, appConfig.getTaskId());
}
}
1 change: 1 addition & 0 deletions dinky-app/dinky-app-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.dinky</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;

import cn.hutool.core.text.StrFormatter;

/**
* KubernetesSessionGateway
Expand All @@ -46,6 +49,8 @@ public GatewayType getType() {
@Override
public GatewayResult deployCluster(FlinkUdfPathContextHolder udfPathContextHolder) {
if (Asserts.isNull(client)) {
String clusterId = StrFormatter.format("dinky-flink-session-{}", System.currentTimeMillis());
addConfigParas(KubernetesConfigOptions.CLUSTER_ID, clusterId);
init();
}

Expand Down

0 comments on commit a4a4b54

Please sign in to comment.