Skip to content

Commit

Permalink
Fallback app hook implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 committed Dec 4, 2023
1 parent 17ed721 commit 85e2fe9
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.mybatis.handler;

import lombok.extern.slf4j.Slf4j;
import org.dinky.mybatis.properties.MybatisPlusFillProperties;

import org.apache.ibatis.reflection.MetaObject;
Expand All @@ -35,6 +36,7 @@
*
* @since 2021/5/25
*/
@Slf4j
public class DateMetaObjectHandler implements MetaObjectHandler {

private final MybatisPlusFillProperties mybatisPlusFillProperties;
Expand Down Expand Up @@ -68,14 +70,11 @@ public void insertFill(MetaObject metaObject) {
if (name == null) {
setFieldValByName(mybatisPlusFillProperties.getUpdateTimeField(), name, metaObject);
}
// Determine if currently in a web context
boolean webContext = SpringMVCUtil.isWeb();
if (!webContext) {
int id = Integer.parseInt(Thread.currentThread().getId() + "");
setFillFieldValue(metaObject, id);
} else {
try {
int loginIdAsInt = StpUtil.getLoginIdAsInt();
setFillFieldValue(metaObject, loginIdAsInt);
}catch (Exception e){
log.debug("Ignore set creater filed, because userId cant't get",e);
}
}

Expand All @@ -97,18 +96,13 @@ private void setFillFieldValue(MetaObject metaObject, int userId) {

@Override
public void updateFill(MetaObject metaObject) {
// Determine if currently in a web context
boolean webContext = SpringMVCUtil.isWeb();
if (!webContext) {
long id = Thread.currentThread().getId();
setFieldValByName(mybatisPlusFillProperties.getUpdaterField(), Integer.valueOf(id + ""), metaObject);
setFieldValByName(mybatisPlusFillProperties.getOperatorField(), Integer.valueOf(id + ""), metaObject);
} else {
try {
int loginIdAsInt = StpUtil.getLoginIdAsInt();
setFieldValByName(mybatisPlusFillProperties.getUpdaterField(), loginIdAsInt, metaObject);
setFieldValByName(mybatisPlusFillProperties.getOperatorField(), loginIdAsInt, metaObject);
setFieldValByName(mybatisPlusFillProperties.getUpdateTimeField(), LocalDateTime.now(), metaObject);
}catch (Exception e){
log.debug("Ignore set update,operator filed, because userId cant't get",e);
}

setFieldValByName(mybatisPlusFillProperties.getUpdateTimeField(), LocalDateTime.now(), metaObject);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.app.constant.AppParamConstant;
import org.dinky.app.db.DBUtil;
import org.dinky.app.flinksql.Submitter;
import org.dinky.app.util.FlinkAppUtil;
import org.dinky.data.app.AppParamConfig;
import org.dinky.utils.JsonUtils;

Expand Down Expand Up @@ -53,8 +54,10 @@ public static void main(String[] args) throws Exception {
DBUtil.init(appConfig);
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed with config: {}", appConfig);
throw e;
log.error("exectue app failed : ", e);
}finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(appConfig.getTaskId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.app.constant.AppParamConstant;
import org.dinky.app.db.DBUtil;
import org.dinky.app.flinksql.Submitter;
import org.dinky.app.util.FlinkAppUtil;
import org.dinky.data.app.AppParamConfig;
import org.dinky.utils.JsonUtils;

Expand Down Expand Up @@ -53,8 +54,10 @@ public static void main(String[] args) throws Exception {
DBUtil.init(appConfig);
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed with config: {}", appConfig);
throw e;
log.error("exectue app failed : ", e);
}finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(appConfig.getTaskId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.app.constant.AppParamConstant;
import org.dinky.app.db.DBUtil;
import org.dinky.app.flinksql.Submitter;
import org.dinky.app.util.FlinkAppUtil;
import org.dinky.data.app.AppParamConfig;
import org.dinky.utils.JsonUtils;

Expand Down Expand Up @@ -54,6 +55,9 @@ public static void main(String[] args) throws Exception {
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed : ", e);
}finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(appConfig.getTaskId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.app.constant.AppParamConstant;
import org.dinky.app.db.DBUtil;
import org.dinky.app.flinksql.Submitter;
import org.dinky.app.util.FlinkAppUtil;
import org.dinky.data.app.AppParamConfig;
import org.dinky.utils.JsonUtils;

Expand Down Expand Up @@ -53,8 +54,10 @@ public static void main(String[] args) throws Exception {
DBUtil.init(appConfig);
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed with config: {}", appConfig);
throw e;
log.error("exectue app failed : ", e);
}finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(appConfig.getTaskId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.app.constant.AppParamConstant;
import org.dinky.app.db.DBUtil;
import org.dinky.app.flinksql.Submitter;
import org.dinky.app.util.FlinkAppUtil;
import org.dinky.data.app.AppParamConfig;
import org.dinky.utils.JsonUtils;

Expand Down Expand Up @@ -53,8 +54,10 @@ public static void main(String[] args) throws Exception {
DBUtil.init(appConfig);
Submitter.submit(appConfig);
} catch (Exception e) {
log.error("exectue app failed with config: {}", appConfig);
throw e;
log.error("exectue app failed : ", e);
}finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(appConfig.getTaskId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,6 @@ public static void submit(AppParamConfig config) throws SQLException {

Executor executor = ExecutorFactory.buildAppStreamExecutor(executorConfig);

log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(config.getTaskId());

// 加载第三方jar //TODO 这里有问题,需要修一修
// loadDep(appTask.getType(),
// config.getTaskId(),DBUtil.getSysConfig(Status.SYS_ENV_SETTINGS_DINKYADDR.getKey()), executorConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@

package org.dinky.app.util;

import org.apache.flink.configuration.ReadableConfig;
import org.dinky.app.db.DBUtil;
import org.dinky.constant.FlinkConstant;
import org.dinky.context.CustomTableEnvironmentContext;
import org.dinky.data.enums.JobStatus;
import org.dinky.data.enums.Status;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.utils.JsonUtils;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.client.JobStatusMessage;

import java.util.Collection;

import cn.hutool.core.text.StrFormatter;
import cn.hutool.http.HttpUtil;
Expand All @@ -47,36 +49,27 @@ public class FlinkAppUtil {
* If the task is completed, it sends a hook notification and stops monitoring.
*/
public static void monitorFlinkTask(int taskId) {
StreamExecutionEnvironment streamExecutionEnvironment =
CustomTableEnvironmentContext.get().getStreamExecutionEnvironment();
streamExecutionEnvironment.registerJobListener(new JobListener() {
@Override
public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
jobClient.getJobExecutionResult().thenAccept(jobExecutionResult -> finshedHook(jobClient, taskId));
jobClient.getJobStatus().thenAccept(job -> {
if (job == JobStatus.FINISHED) {
finshedHook(jobClient, taskId);
boolean isRun = true;
try (RestClusterClient<StandaloneClusterId> client = createClient()) {
while (isRun) {
Collection<JobStatusMessage> jobs = client.listJobs().get();
if (jobs.isEmpty()) {
log.error("No Flink task found, try again in 2 seconds.....");
}
for (JobStatusMessage job : jobs) {
if (JobStatus.isDone(job.getJobState().toString())) {
sendHook(taskId, job.getJobId().toHexString(), 0);
log.info("hook {} finished.", job.getJobName());
// There should be only one in application mode, so stop monitoring here
isRun = false;
}
});
}

@Override
public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
if (throwable instanceof JobCancellationException) {
// todo cancel task
} else {
// other exception
}
Thread.sleep(5000);
}
});
}

private static void finshedHook(JobClient jobClient, int taskId) {
try {
sendHook(taskId, jobClient.getJobID().toHexString(), 0);
log.info("hook finished.");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (Exception e) {
// If an exception is thrown, it will cause the k8s pod to trigger a restart,
// resulting in an inability to exit normally
log.error("hook failed:", e);
}
}

Expand All @@ -90,13 +83,13 @@ private static void finshedHook(JobClient jobClient, int taskId) {
private static void sendHook(int taskId, String jobId, int reTryCount) throws InterruptedException {
try {
String dinkyAddr = SystemConfiguration.getInstances().getDinkyAddr().getValue();
String url =
StrFormatter.format("{}/api/jobInstance/hookJobDone?taskId={}&jobId={}", dinkyAddr, taskId, jobId);
String url = StrFormatter.format(
"http://{}/api/jobInstance/hookJobDone?taskId={}&jobId={}", dinkyAddr, taskId, jobId);
String resultStr = HttpUtil.get(url);
// TODO 这里应该使用Result实体类,但是Result.class不在comm里,迁移改动太大,暂时不搞
String code = JsonUtils.parseObject(resultStr).get("code").toString();
if (!"0".equals(code)) {
throw new RuntimeException("Hook Job Done result failed: " + resultStr);
throw new RuntimeException(StrFormatter.format("Send Hook Job Done result failed,url:{},err:{} ",url,resultStr));
}
} catch (Exception e) {
if (reTryCount < 30) {
Expand All @@ -111,7 +104,6 @@ private static void sendHook(int taskId, String jobId, int reTryCount) throws In

/**
* Create a REST cluster client for Flink.
*
* @return
* @throws Exception
*/
Expand Down

0 comments on commit 85e2fe9

Please sign in to comment.