Merge branch 'DataLinkDC:dev' into dev
gaoyan1998 authored Jan 19, 2024
2 parents b35531c + 71f6bb9 commit 03b8c90
Showing 43 changed files with 609 additions and 376 deletions.
@@ -19,6 +19,7 @@

package org.dinky.data.dto;

import java.util.HashMap;
import java.util.Map;

import io.swagger.annotations.ApiModel;
@@ -41,12 +42,12 @@ public class AbstractStatementDTO {
private Integer envId;

@ApiModelProperty(value = "Fragment Flag", dataType = "boolean", example = "false", notes = "是否为片段")
private Boolean fragment;
private Boolean fragment = false;

@ApiModelProperty(
value = "Variables",
dataType = "Map<String, String>",
example = "{\"key\": \"value\"}",
notes = "变量集合")
private Map<String, String> variables;
private Map<String, String> variables = new HashMap<>();
}
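
The two new defaults let callers use the DTO without null checks. A standalone sketch of the behavior they guarantee; the stub class is hypothetical (the real DTO gets its getters from Lombok):

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for AbstractStatementDTO, reduced to the two defaulted fields.
public class DefaultsSketch {
    private Boolean fragment = false;
    private Map<String, String> variables = new HashMap<>();

    public static void main(String[] args) {
        DefaultsSketch dto = new DefaultsSketch();
        // Both uses below would throw a NullPointerException under the old null defaults:
        if (!dto.fragment) { // unboxing a null Boolean used to NPE here
            dto.variables.put("key", "value"); // put on a null map used to NPE here
        }
        System.out.println(dto.variables); // {key=value}
    }
}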
@@ -39,8 +39,10 @@
import org.dinky.data.options.JobAlertRuleOptions;
import org.dinky.service.AlertGroupService;
import org.dinky.service.AlertHistoryService;
import org.dinky.service.SysConfigService;
import org.dinky.service.TaskService;
import org.dinky.service.impl.AlertRuleServiceImpl;
import org.dinky.service.impl.SysConfigServiceImpl;
import org.dinky.utils.JsonUtils;

import java.io.IOException;
@@ -59,6 +61,10 @@
import org.jeasy.rules.spel.SpELCondition;
import org.springframework.context.annotation.DependsOn;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
@@ -73,6 +79,7 @@ public class JobAlertHandler {
private static final AlertHistoryService alertHistoryService;
private static final AlertGroupService alertGroupService;
private static final TaskService taskService;
private static final SysConfigService sysConfigService;
private static final AlertRuleServiceImpl alertRuleService;

/**
@@ -97,6 +104,7 @@ public class JobAlertHandler {
alertHistoryService = SpringContextUtils.getBean("alertHistoryServiceImpl", AlertHistoryService.class);
alertGroupService = SpringContextUtils.getBean("alertGroupServiceImpl", AlertGroupService.class);
alertRuleService = SpringContextUtils.getBean("alertRuleServiceImpl", AlertRuleServiceImpl.class);
sysConfigService = SpringContextUtils.getBean("sysConfigServiceImpl", SysConfigServiceImpl.class);
}

public static JobAlertHandler getInstance() {
@@ -201,17 +209,81 @@ private void executeAlertAction(Facts facts, AlertRuleDTO alertRuleDTO) {
if (alertInstance == null || !alertInstance.getEnabled()) {
continue;
}
sendAlert(
alertInstance,
facts.get(JobAlertRuleOptions.FIELD_JOB_INSTANCE_ID),
alertGroup.getId(),
alertRuleDTO.getName(),
alertContent);
// Throttle resends: send only if the alert count within the resend window is still
// below the configured per-window maximum.
// TODO: concurrent threads may send duplicate alerts; this needs optimization
if (isGTEMaxSendRecordCount(alertGroup, task) && timeIsInDiffMinute(alertGroup, task)) {
sendAlert(
alertInstance,
facts.get(JobAlertRuleOptions.FIELD_JOB_INSTANCE_ID),
alertGroup.getId(),
alertRuleDTO.getName(),
alertContent);
}
}
}
}
}

/**
* Checks whether the alert send count within the resend window is still below the configured maximum.
*
* @param alertGroup the alert group whose send history is counted
* @param task the task whose job instance triggered the alert
* @return true if the alert may still be sent; false once the per-window maximum is reached
*/
private boolean isGTEMaxSendRecordCount(AlertGroup alertGroup, TaskDTO task) {
// maximum number of alerts allowed within the resend window
int diffMinuteMaxSendCount = (int) sysConfigService
.getOneConfigByKey(Status.SYS_ENV_SETTINGS_DIFF_MINUTE_MAX_SEND_COUNT.getKey())
.getValue();
// resend window length, in seconds
int jobResendDiffSecond = (int) sysConfigService
.getOneConfigByKey(Status.SYS_ENV_SETTINGS_JOB_RESEND_DIFF_SECOND.getKey())
.getValue();

// window start = current time minus the resend window length
DateTime diffMinuteTime = DateUtil.offsetSecond(DateUtil.date(), -jobResendDiffSecond);
// count alerts sent between the window start and now
long jobInstanceAlertSendRecordCount = alertHistoryService.count(
new LambdaQueryWrapper<>(AlertHistory.class)
.eq(AlertHistory::getAlertGroupId, alertGroup.getId()) // alert group id
.eq(AlertHistory::getJobInstanceId, task.getJobInstanceId()) // job instance id
.ge(true, AlertHistory::getCreateTime, diffMinuteTime) // window start
.le(true, AlertHistory::getCreateTime, DateUtil.date()) // current time
);
// 1. If the send count within the window has reached the per-window maximum, do not send.
// 2. Otherwise, the alert may be sent.
if (jobInstanceAlertSendRecordCount >= diffMinuteMaxSendCount) {
log.warn(
Status.JOB_ALERT_MAX_SEND_COUNT.getMessage(),
jobResendDiffSecond,
diffMinuteMaxSendCount,
jobInstanceAlertSendRecordCount);
return false;
}
return true;
}

/**
* Checks whether the current time falls within the resend window (now minus the configured interval, up to now).
*
* @param alertGroup the alert group being evaluated
* @param task the task being evaluated
* @return true if the current time is not before the window start
*/
private boolean timeIsInDiffMinute(AlertGroup alertGroup, TaskDTO task) {
// resend window length, in seconds
int jobResendDiffSecond = (int) sysConfigService
.getOneConfigByKey(Status.SYS_ENV_SETTINGS_JOB_RESEND_DIFF_SECOND.getKey())
.getValue();
// window start = current time minus the resend window length
DateTime diffSecondTime = DateUtil.offsetSecond(DateUtil.date(), -jobResendDiffSecond);
// true if the current time is not before the window start
return !DateUtil.date().before(diffSecondTime);
}
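
For reference, the throttle decision the two helpers implement, as a standalone sketch in plain java.time; both config values are hypothetical stand-ins for the SysConfigService settings read above:

import java.time.Duration;
import java.time.Instant;

// Sketch of the resend throttle: an alert may be re-sent only while fewer than
// diffMinuteMaxSendCount alerts were recorded in the last jobResendDiffSecond seconds.
public class AlertThrottleSketch {
    public static void main(String[] args) {
        int jobResendDiffSecond = 60;   // window length in seconds (hypothetical)
        int diffMinuteMaxSendCount = 2; // max alerts per window (hypothetical)

        Instant now = Instant.now();
        Instant windowStart = now.minus(Duration.ofSeconds(jobResendDiffSecond));

        // In production this count comes from AlertHistory records with the same
        // alert group id and job instance id created in [windowStart, now].
        long sentInWindow = 1;

        boolean maySend = sentInWindow < diffMinuteMaxSendCount;
        System.out.println("window " + windowStart + " .. " + now + ", may send: " + maySend);
    }
}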

/**
* Sends an alert based on the alert instance's configuration.
*
@@ -40,6 +40,13 @@ public interface SysConfigService extends ISuperService<SysConfig> {
*/
Map<String, List<Configuration<?>>> getAll();

/**
* Get one configuration by key.
*
* @param key the configuration key to look up
* @return the matching {@link Configuration}
*/
Configuration<Object> getOneConfigByKey(String key);

/**
* Initialize system configurations.
*/
@@ -52,6 +52,23 @@ public Map<String, List<Configuration<?>>> getAll() {
return SystemConfiguration.getInstances().getAllConfiguration();
}

/**
* Get one configuration by key.
*
* @param key the configuration key to look up
* @return the matching {@link Configuration}
* @throws RuntimeException if no configuration with the given key exists
*/
@Override
public Configuration<Object> getOneConfigByKey(String key) {

List<Configuration<?>> configurationList =
getAll().entrySet().stream().flatMap(x -> x.getValue().stream()).collect(Collectors.toList());
return (Configuration<Object>) configurationList.stream()
.filter(x -> x.getKey().equals(key))
.findFirst()
.orElseThrow(() -> new RuntimeException("No such configuration: " + key));
}
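
A usage sketch mirroring the call sites added in JobAlertHandler above; the enclosing method is hypothetical:

// Hypothetical caller: fetch one typed setting by key, as JobAlertHandler does.
private int readResendDiffSecond(SysConfigService sysConfigService) {
    return (int) sysConfigService
            .getOneConfigByKey(Status.SYS_ENV_SETTINGS_JOB_RESEND_DIFF_SECOND.getKey())
            .getValue();
}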

@Override
public void initSysConfig() {
SystemConfiguration systemConfiguration = SystemConfiguration.getInstances();
@@ -124,6 +124,7 @@ public void write(String str) {
super.write(str);
}
}))
.setCredentialsProvider(new UsernamePasswordCredentialsProvider(username, password))
.call();
git.close();
} else {
@@ -48,10 +48,12 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.TableResult;

import java.io.File;
import java.io.FileOutputStream;
@@ -70,6 +72,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.slf4j.Logger;
@@ -129,15 +132,21 @@ public static void submit(AppParamConfig config) throws SQLException {

String[] statements =
SqlUtil.getStatements(sql, SystemConfiguration.getInstances().getSqlSeparator());
Optional<JobClient> jobClient = Optional.empty();
try {
if (Dialect.FLINK_JAR == appTask.getDialect()) {
executeJarJob(appTask.getType(), executor, statements);
jobClient = executeJarJob(appTask.getType(), executor, statements);
} else {
executeJob(executor, statements);
jobClient = executeJob(executor, statements);
}
} finally {
log.info("Start Monitor Job");
FlinkAppUtil.monitorFlinkTask(Submitter.executor, config.getTaskId());
if (jobClient.isPresent()) {
FlinkAppUtil.monitorFlinkTask(jobClient.get(), config.getTaskId());
} else {
log.error("jobClient is empty, can not monitor job");
// FlinkAppUtil.monitorFlinkTask(Submitter.executor, config.getTaskId());
}
}
}

@@ -251,7 +260,9 @@ public static boolean downloadFile(String url, String path) throws IOException {
}

@SneakyThrows
public static void executeJarJob(String type, Executor executor, String[] statements) {
public static Optional<JobClient> executeJarJob(String type, Executor executor, String[] statements) {
Optional<JobClient> jobClient = Optional.empty();

for (int i = 0; i < statements.length; i++) {
String sqlStatement = executor.pretreatStatement(statements[i]);
if (ExecuteJarParseStrategy.INSTANCE.match(sqlStatement)) {
@@ -269,7 +280,8 @@ public static void executeJarJob(String type, Executor executor, String[] statements) {
streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(
savePointPath, configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)));
}
executor.getStreamExecutionEnvironment().executeAsync(streamGraph);
JobClient client = executor.getStreamExecutionEnvironment().executeAsync(streamGraph);
jobClient = Optional.of(client);
break;
}
if (Operations.getOperationType(sqlStatement) == SqlType.ADD) {
@@ -280,9 +292,11 @@ public static void executeJob(Executor executor, String[] statements) {
}
}
}
return jobClient;
}

public static void executeJob(Executor executor, String[] statements) {
public static Optional<JobClient> executeJob(Executor executor, String[] statements) {
Optional<JobClient> jobClient = Optional.empty();

ExecutorConfig executorConfig = executor.getExecutorConfig();
List<StatementParam> ddl = new ArrayList<>();
@@ -326,12 +340,15 @@ public static void executeJob(Executor executor, String[] statements) {
}
}
log.info("Executing FlinkSQL statement set: {}", String.join(FlinkSQLConstant.SEPARATOR, inserts));
executor.executeStatementSet(inserts);
TableResult tableResult = executor.executeStatementSet(inserts);
jobClient = tableResult.getJobClient();
log.info("Execution succeeded.");
} else {
// useStatementSet defaults to true, so this branch is normally never executed
StatementParam item = trans.get(0);
log.info("Executing FlinkSQL: {}", item.getValue());
executor.executeSql(item.getValue());
TableResult tableResult = executor.executeSql(item.getValue());
jobClient = tableResult.getJobClient();
log.info("Execution succeeded.");
}
}
@@ -350,12 +367,14 @@
"The FlinkSQL statement set is being executed: {}",
String.join(FlinkSQLConstant.SEPARATOR, executes));
try {
executor.executeAsync(executorConfig.getJobName());
JobClient client = executor.executeAsync(executorConfig.getJobName());
jobClient = Optional.of(client);
log.info("The execution was successful");
} catch (Exception e) {
log.error("Execution failed, {}", e.getMessage(), e);
}
}
log.info("{} The task is successfully submitted", LocalDateTime.now());
return jobClient;
}
}
@@ -28,6 +28,7 @@
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.client.JobStatusMessage;

import java.util.Collection;
@@ -75,16 +76,38 @@ public static void monitorFlinkTask(Executor executor, int taskId) {
}
}

public static void monitorFlinkTask(JobClient jobClient, int taskId) {
boolean isRun = true;
String jobId = jobClient.getJobID().toHexString();

try {
while (isRun) {
String jobStatus = jobClient.getJobStatus().get().toString();
JobStatus status = JobStatus.get(jobStatus);
if (status.isDone()) {
sendHook(taskId, jobId, 0);
log.info("refesh job status finished, status is {}", status);
isRun = false;
}
Thread.sleep(5000);
}
} catch (Exception e) {
// If an exception is thrown, it will cause the k8s pod to trigger a restart,
// resulting in an inability to exit normally
log.error("refesh status failed:", e);
}
}
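
The new overload polls the JobClient directly instead of querying the cluster through the Executor. The same polling idea, sketched with Flink's own terminal-state check (the production code maps the status string into Dinky's JobStatus before calling sendHook):

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;

// Sketch: poll a JobClient until the job reaches a terminal state.
static void waitUntilDone(JobClient client) throws Exception {
    while (true) {
        JobStatus status = client.getJobStatus().get(); // CompletableFuture<JobStatus>
        if (status.isTerminalState()) {
            System.out.println("job " + client.getJobID() + " finished as " + status);
            return;
        }
        Thread.sleep(5000); // same 5-second poll interval as the monitor above
    }
}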

/**
* The sendHook method is used to send a Hook request.
* This method sends an HTTP request to notify a specific address about the completion status of a task.
* If the request is successful, the returned code in the result is 0; otherwise, an exception is thrown.
* If sending the request fails, it will be retried up to 30 times with a 1-second interval between each retry.
* If the retry limit is exceeded, the failure is logged.
*/
private static void sendHook(int taskId, String jobId, int reTryCount) throws InterruptedException {
private static void sendHook(int taskId, String jobId, int reTryCount) {
String dinkyAddr = SystemConfiguration.getInstances().getDinkyAddr().getValue();
try {
String dinkyAddr = SystemConfiguration.getInstances().getDinkyAddr().getValue();
String url = StrFormatter.format(
"http://{}/api/jobInstance/hookJobDone?taskId={}&jobId={}", dinkyAddr, taskId, jobId);
String resultStr = HttpUtil.get(url);
@@ -97,10 +120,14 @@ private static void sendHook(int taskId, String jobId, int reTryCount) throws InterruptedException {
} catch (Exception e) {
if (reTryCount < 30) {
log.error("send hook failed,retry later taskId:{},jobId:{},{}", taskId, jobId, e.getMessage());
Thread.sleep(1000);
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
sendHook(taskId, jobId, reTryCount + 1);
} else {
throw new RuntimeException("Hook Job Done failed, The retry limit is exceeded: " + e.getMessage());
log.error("Hook Job Done failed to {}, The retry limit is exceeded: {}", dinkyAddr, e.getMessage());
}
}
}
@@ -174,7 +174,7 @@ protected FlatMapFunction<Map, RowData> sinkRowDataFunction(
break;
case "u":
rowDataCollect(columnNameList, columnTypeList, out, RowKind.UPDATE_BEFORE, value);
rowDataCollect(columnNameList, columnTypeList, out, RowKind.UPDATE_BEFORE, value);
rowDataCollect(columnNameList, columnTypeList, out, RowKind.UPDATE_AFTER, value);
break;
default:
}
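
The one-line fix above in miniature: an update ("u") change event must fan out into a retraction (UPDATE_BEFORE) followed by the new image (UPDATE_AFTER); emitting UPDATE_BEFORE twice, as before the fix, meant the updated value never reached the sink. The "c"/"d" mappings below are assumed from the usual Debezium convention and are not shown in this hunk:

import org.apache.flink.types.RowKind;

// Row kinds emitted per change-event op code ("c"/"d" are assumptions here).
static RowKind[] rowKindsFor(String op) {
    switch (op) {
        case "c": return new RowKind[] {RowKind.INSERT};
        case "d": return new RowKind[] {RowKind.DELETE};
        case "u": return new RowKind[] {RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER};
        default:  return new RowKind[0];
    }
}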
@@ -211,7 +211,7 @@ protected Object buildRowDataValues(Map value, RowKind rowKind, String columnName
}

@SuppressWarnings("rawtypes")
public static Map getOriginRowData(RowKind rowKind, Map value) {
protected Map getOriginRowData(RowKind rowKind, Map value) {
switch (rowKind) {
case INSERT:
case UPDATE_AFTER: