Skip to content

Commit

Permalink
Merge branch 'DataLinkDC:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Zzm0809 authored Dec 21, 2023
2 parents 13ab4b5 + 90c8b61 commit e3c2433
Show file tree
Hide file tree
Showing 46 changed files with 818 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.dinky.data.enums.SseTopic;
import org.dinky.data.enums.Status;
import org.dinky.data.exception.BusException;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.ProcessEntity;
import org.dinky.data.model.ProcessStepEntity;
import org.dinky.utils.LogUtil;
Expand Down Expand Up @@ -94,7 +93,7 @@ public ProcessEntity getProcess(String processName) {
*
* @param processName process name
* @param stepPid process step type
* @param logLine messages
* @param logLine messages
* @throws BusException Throws an exception if the process does not exist
*/
public void appendLog(String processName, String stepPid, String logLine, boolean recordGlobal) {
Expand All @@ -108,7 +107,12 @@ public void appendLog(String processName, String stepPid, String logLine, boolea
}
if (stepPid != null) {
ProcessStepEntity stepNode = getStepNode(stepPid, getStepsMap(processName));
stepNode.appendLog(logLine);
if (stepNode != null) {
stepNode.appendLog(logLine);
process.setLastUpdateStep(stepNode);
} else {
log.error("process step not found {},{}", processName, stepPid);
}
process.setLastUpdateStep(stepNode);
}
// /TOPIC/PROCESS_CONSOLE/FlinkSubmit/12
Expand Down Expand Up @@ -172,7 +176,11 @@ public ProcessStepEntity registerProcessStep(ProcessStepType type, String proces
process.getChildren().add(processStepEntity);
} else {
ProcessStepEntity stepNode = getStepNode(parentStepPid, process.getChildren());
stepNode.getChildren().add(processStepEntity);
if (stepNode == null) {
log.error("registerProcessStep {} failed in {}", type.getDesc(), processName);
} else {
stepNode.getChildren().add(processStepEntity);
}
}
return processStepEntity;
}
Expand Down Expand Up @@ -241,7 +249,8 @@ private ProcessStepEntity getStepNode(String stepPid, CopyOnWriteArrayList<Proce
JSONObject.toJSONString(logPross),
stepPid,
MDC.get(ProcessAspect.PROCESS_NAME));
throw new DinkyException(errorStr);
log.debug(errorStr);
return null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public static void sendTopic(String topic, Object content) {
SseDataVo data = new SseDataVo(sessionKey, topic, content);
sendSse(sessionKey, data);
} catch (Exception e) {
log.error("Error sending sse data", e);
log.error("Error sending sse data:{}", e.getMessage());
onError(sessionKey, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.dinky.data.annotations.ExecuteProcess;
import org.dinky.data.annotations.Log;
import org.dinky.data.annotations.ProcessId;
import org.dinky.data.dto.DebugDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.dto.TaskSaveDTO;
Expand Down Expand Up @@ -96,8 +95,8 @@ public Result<JobResult> submitTask(@ProcessId @RequestParam Integer id) throws
dataType = "DebugDTO",
paramType = "body")
@ExecuteProcess(type = ProcessType.FLINK_SUBMIT)
public Result<JobResult> debugTask(@RequestBody DebugDTO debugDTO) throws Exception {
JobResult result = taskService.debugTask(debugDTO);
public Result<JobResult> debugTask(@RequestBody TaskDTO task) throws Exception {
JobResult result = taskService.debugTask(task);
if (result.isSuccess()) {
return Result.succeed(result, Status.DEBUG_SUCCESS);
}
Expand Down
8 changes: 4 additions & 4 deletions dinky-admin/src/main/java/org/dinky/data/dto/TaskDTO.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class TaskDTO extends AbstractStatementDTO {
@ApiModelProperty(
value = "Run Mode",
dataType = "String",
example = "BATCH",
example = "Local",
notes = "The execution mode for the SQL query")
private String type;

Expand Down Expand Up @@ -199,14 +199,14 @@ public class TaskDTO extends AbstractStatementDTO {
dataType = "boolean",
example = "false",
notes = "Flagindicatingwhethertousechangelogs")
private boolean useChangeLog;
private boolean useChangeLog = false;

@ApiModelProperty(
value = "Use Auto Cancel",
dataType = "boolean",
example = "false",
notes = "Flag indicating whether to use auto-canceling")
private boolean useAutoCancel;
private boolean useAutoCancel = true;

@ApiModelProperty(value = "Session", dataType = "String", example = "session_id", notes = "The session identifier")
private String session;
Expand All @@ -219,7 +219,7 @@ public class TaskDTO extends AbstractStatementDTO {
dataType = "Integer",
example = "100",
notes = "The maximum number of rows to return")
private Integer maxRowNum;
private Integer maxRowNum = 100;

public JobConfig getJobConfig() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.dinky.data.model.ext;

import org.dinky.assertion.Asserts;
import org.dinky.data.ext.ConfigItem;

import java.io.Serializable;
import java.util.ArrayList;
Expand Down
5 changes: 2 additions & 3 deletions dinky-admin/src/main/java/org/dinky/service/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.dinky.service;

import org.dinky.data.dto.AbstractStatementDTO;
import org.dinky.data.dto.DebugDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.dto.TaskSubmitDto;
Expand Down Expand Up @@ -75,11 +74,11 @@ public interface TaskService extends ISuperService<Task> {
/**
* Debug the given task and return the job result.
*
* @param debugDTO The param of preview task.
* @param task The param of preview task.
* @return A {@link JobResult} object representing the result of the submitted task.
* @throws ExcuteException If there is an error debugging the task.
*/
JobResult debugTask(DebugDTO debugDTO) throws Exception;
JobResult debugTask(TaskDTO task) throws Exception;

/**
* Restart the given task and return the job result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,10 @@ public JobInfoDetail getJobInfoDetail(Integer id) {

@Override
public JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance) {
Asserts.checkNull(jobInstance, Status.JOB_INSTANCE_NOT_EXIST.getMessage());

JobInfoDetail jobInfoDetail = new JobInfoDetail(jobInstance.getId());

Asserts.checkNull(jobInstance, Status.JOB_INSTANCE_NOT_EXIST.getMessage());
jobInfoDetail.setInstance(jobInstance);

ClusterInstance clusterInstance = clusterInstanceService.getById(jobInstance.getClusterId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.dinky.data.app.AppParamConfig;
import org.dinky.data.constant.CommonConstant;
import org.dinky.data.dto.AbstractStatementDTO;
import org.dinky.data.dto.DebugDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.dto.TaskSubmitDto;
Expand Down Expand Up @@ -204,6 +203,9 @@ public JobResult executeJob(TaskDTO task) throws Exception {
// Submit and export task
@ProcessStep(type = ProcessStepType.SUBMIT_BUILD_CONFIG)
public JobConfig buildJobSubmitConfig(TaskDTO task) {
if (Asserts.isNull(task.getType())) {
task.setType(GatewayType.LOCAL.getLongValue());
}
task.setStatement(buildEnvSql(task) + task.getStatement());
JobConfig config = task.getJobConfig();
Savepoints savepoints = savepointsService.getSavePointWithStrategy(task);
Expand Down Expand Up @@ -238,6 +240,9 @@ public JobConfig buildJobSubmitConfig(TaskDTO task) {
// Savepoint and cancel task
@ProcessStep(type = ProcessStepType.SUBMIT_BUILD_CONFIG)
public JobConfig buildJobConfig(TaskDTO task) {
if (Asserts.isNull(task.getType())) {
task.setType(GatewayType.LOCAL.getLongValue());
}
JobConfig config = task.getJobConfig();
if (GatewayType.get(task.getType()).isDeployCluster()) {
log.info("Init gateway config, type:{}", task.getType());
Expand Down Expand Up @@ -317,24 +322,18 @@ public JobResult submitTask(TaskSubmitDto submitDto) throws Exception {

@Override
@ProcessStep(type = ProcessStepType.SUBMIT_TASK)
public JobResult debugTask(DebugDTO debugDTO) throws Exception {
initTenantByTaskId(debugDTO.getId());

TaskDTO taskDTO = this.getTaskInfoById(debugDTO.getId());
public JobResult debugTask(TaskDTO task) throws Exception {
// Debug mode need return result
taskDTO.setUseResult(true);
taskDTO.setUseChangeLog(debugDTO.isUseChangeLog());
taskDTO.setUseAutoCancel(debugDTO.isUseAutoCancel());
taskDTO.setMaxRowNum(debugDTO.getMaxRowNum());
task.setUseResult(true);
// Debug mode need execute
taskDTO.setStatementSet(false);
task.setStatementSet(false);
// 注解自调用会失效,这里通过获取对象方法绕过此限制
TaskServiceImpl taskServiceBean = applicationContext.getBean(TaskServiceImpl.class);
JobResult jobResult = taskServiceBean.executeJob(taskDTO);
JobResult jobResult = taskServiceBean.executeJob(task);
if (Job.JobStatus.SUCCESS == jobResult.getStatus()) {
log.info("Job debug success");
Task task = new Task(debugDTO.getId(), jobResult.getJobInstanceId());
if (!this.updateById(task)) {
Task newTask = new Task(task.getId(), jobResult.getJobInstanceId());
if (!this.updateById(newTask)) {
throw new BusException(Status.TASK_UPDATE_FAILED.getMessage());
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

package org.dinky.service.task;

import org.dinky.assertion.Asserts;
import org.dinky.config.Dialect;
import org.dinky.data.annotations.SupportDialect;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.gateway.enums.GatewayType;
import org.dinky.job.JobManager;
import org.dinky.job.JobResult;
import org.dinky.service.impl.TaskServiceImpl;
Expand All @@ -42,6 +44,10 @@ public class FlinkSqlTask extends BaseTask {

public FlinkSqlTask(TaskDTO task) {
super(task);
// Default run mode is local.
if (Asserts.isNull(task.getType())) {
task.setType(GatewayType.LOCAL.getLongValue());
}
this.jobManager = getJobManager();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testSend() {
FeiShuAlert feiShuAlert = new FeiShuAlert();
AlertConfig alertConfig = new AlertConfig();

alertConfig.setType("FeiShu");
alertConfig.setType(FeiShuConstants.TYPE);
alertConfig.setParam(feiShuConfig);
feiShuAlert.setConfig(alertConfig);

Expand Down
33 changes: 33 additions & 0 deletions dinky-alert/dinky-alert-http/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>dinky-alert-http</artifactId>

<packaging>jar</packaging>

<name>Dinky : Alter : Http</name>

<dependencies>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-alert-base</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.alert.http;

import org.dinky.alert.AbstractAlert;
import org.dinky.alert.AlertResult;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* DingTalkAlert
*
* @since 2022/2/23 19:28
*/
public class HttpAlert extends AbstractAlert {
private static final Logger log = LoggerFactory.getLogger(HttpAlert.class);

@Override
public String getType() {
return HttpConstants.TYPE;
}

@Override
public AlertResult send(String title, String content) {
HttpSender sender = new HttpSender(getConfig().getParam());
Map<String, Object> templateParams = sender.buildTemplateParams(title, content);
return sender.send(templateParams);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.alert.http;

/** DingTalkConstants */
public class HttpConstants {
public static final String TYPE = "Http";
public static final String ALERT_TEMPLATE_TITLE = "title";
public static final String ALERT_TEMPLATE_CONTENT = "content";

public static final String REQUEST_TYPE_POST = "POST";
public static final String REQUEST_TYPE_GET = "GET";
public static final String DEFAULT_CHARSET = "UTF-8";
}
Loading

0 comments on commit e3c2433

Please sign in to comment.