Skip to content

Commit

Permalink
Merge branch 'DataLinkDC:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 authored Oct 27, 2023
2 parents c7fa405 + 2bcc490 commit bb12e0c
Show file tree
Hide file tree
Showing 45 changed files with 583 additions and 291 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ logs.zip
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
.idea/*
!.idea/icon.png
!.idea/icon.svg
!.idea/vcs.xml
build
target/*
Expand Down
15 changes: 15 additions & 0 deletions dinky-admin/src/main/java/org/dinky/controller/TaskController.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.data.annotation.Log;
import org.dinky.data.annotations.ExecuteProcess;
import org.dinky.data.annotations.ProcessId;
import org.dinky.data.dto.DebugDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.enums.BusinessType;
Expand Down Expand Up @@ -81,6 +82,20 @@ public Result<JobResult> submitTask(@ProcessId @RequestParam Integer id) throws
}
}

@PostMapping("/debugTask")
@ApiOperation("Debug Task")
@Log(title = "Debug Task", businessType = BusinessType.DEBUG)
@ApiImplicitParam(
name = "debugTask",
value = "Debug Task",
required = true,
dataType = "DebugDTO",
paramType = "body")
public Result<JobResult> debugTask(@RequestBody DebugDTO debugDTO) throws Exception {
JobResult result = taskService.debugTask(debugDTO);
return Result.succeed(result, Status.EXECUTE_SUCCESS);
}

@GetMapping("/cancel")
@Log(title = "Cancel Flink Job", businessType = BusinessType.TRIGGER)
@ApiOperation("Cancel Flink Job")
Expand Down
69 changes: 69 additions & 0 deletions dinky-admin/src/main/java/org/dinky/data/dto/DebugDTO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data.dto;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;

/**
* Param for debug flink sql and common sql
*/
@Getter
@Setter
@ApiModel(value = "DebugDTO", description = "Param for debug flink sql and common sql")
public class DebugDTO {

@ApiModelProperty(
value = "Task ID",
dataType = "Integer",
example = "1",
notes = "The ID of Task which is debugged")
private Integer id;

@ApiModelProperty(
value = "Use Result",
dataType = "boolean",
example = "true",
notes = "Flag indicating whether to preview table result")
private boolean useResult = true;

@ApiModelProperty(
value = "Use ChangeLog",
dataType = "boolean",
example = "false",
notes = "Flag indicating whether to preview change log")
private boolean useChangeLog = false;

@ApiModelProperty(
value = "Use Auto Cancel",
dataType = "boolean",
example = "true",
notes = "Flag indicating whether to auto cancel after preview the maximum rows")
private boolean useAutoCancel = true;

@ApiModelProperty(
value = "Max Row Number",
dataType = "Integer",
example = "1000",
notes = "The maximum number of rows to preview")
private Integer maxRowNum = 1000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public enum BusinessType {
/** 提交 */
SUBMIT,

/** Debug */
DEBUG,

/** 执行 */
EXECUTE,

Expand Down
10 changes: 10 additions & 0 deletions dinky-admin/src/main/java/org/dinky/service/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.dinky.service;

import org.dinky.data.dto.AbstractStatementDTO;
import org.dinky.data.dto.DebugDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.enums.JobLifeCycle;
Expand Down Expand Up @@ -69,6 +70,15 @@ public interface TaskService extends ISuperService<Task> {
*/
JobResult submitTask(Integer id, String savePointPath) throws Exception;

/**
* Debug the given task and return the job result.
*
* @param debugDTO The param of preview task.
* @return A {@link JobResult} object representing the result of the submitted task.
* @throws ExcuteException If there is an error debugging the task.
*/
JobResult debugTask(DebugDTO debugDTO) throws Exception;

/**
* Restart the given task and return the job result.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.dinky.data.annotations.ProcessStep;
import org.dinky.data.constant.CommonConstant;
import org.dinky.data.dto.AbstractStatementDTO;
import org.dinky.data.dto.DebugDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.dto.TaskRollbackVersionDTO;
import org.dinky.data.enums.JobLifeCycle;
Expand Down Expand Up @@ -196,8 +197,7 @@ public void preCheckTask(TaskDTO task) throws TaskNotDoneException, SqlExplainEx

@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
public JobResult executeJob(TaskDTO task) throws Exception {
JobResult jobResult;
jobResult = BaseTask.getTask(task).execute();
JobResult jobResult = BaseTask.getTask(task).execute();
log.info("execute job finished,status is {}", jobResult.getStatus());
return jobResult;
}
Expand Down Expand Up @@ -238,7 +238,7 @@ public String buildEnvSql(AbstractStatementDTO task) {
task.setVariables(fragmentVariableService.listEnabledVariables());
}
int envId = Optional.ofNullable(task.getEnvId()).orElse(-1);
if (envId >= 0) {
if (envId > 0) {
TaskDTO envTask = this.getTaskInfoById(task.getEnvId());
if (Asserts.isNotNull(envTask) && Asserts.isNotNullString(envTask.getStatement())) {
sql += envTask.getStatement() + CommonConstant.LineSep;
Expand Down Expand Up @@ -279,6 +279,34 @@ public JobResult submitTask(Integer id, String savePointPath) throws Exception {
return jobResult;
}

@Override
@ProcessStep(type = ProcessStepType.SUBMIT_TASK)
public JobResult debugTask(DebugDTO debugDTO) throws Exception {
initTenantByTaskId(debugDTO.getId());

TaskDTO taskDTO = this.getTaskInfoById(debugDTO.getId());
taskDTO.setUseResult(debugDTO.isUseResult());
taskDTO.setUseChangeLog(debugDTO.isUseChangeLog());
taskDTO.setUseAutoCancel(debugDTO.isUseAutoCancel());
taskDTO.setMaxRowNum(debugDTO.getMaxRowNum());
// 注解自调用会失效,这里通过获取对象方法绕过此限制
TaskServiceImpl taskServiceBean = applicationContext.getBean(TaskServiceImpl.class);
taskServiceBean.preCheckTask(taskDTO);

JobResult jobResult = taskServiceBean.executeJob(taskDTO);

if (Job.JobStatus.SUCCESS == jobResult.getStatus()) {
log.info("Job debug success");
Task task = new Task(debugDTO.getId(), jobResult.getJobInstanceId());
if (!this.updateById(task)) {
throw new BusException(Status.TASK_UPDATE_FAILED.getMessage());
}
} else {
log.error("Job debug failed, error: " + jobResult.getError());
}
return jobResult;
}

@Override
public JobResult restartTask(Integer id, String savePointPath) throws Exception {
TaskDTO task = this.getTaskInfoById(id);
Expand Down
2 changes: 1 addition & 1 deletion dinky-admin/src/main/resources/db/db-h2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1642,7 +1642,7 @@ CREATE TABLE `dinky_metrics` (
`position` int(11) DEFAULT null,
`show_type` varchar(255) DEFAULT null,
`show_size` varchar(255) DEFAULT null,
`title` varchar(255) DEFAULT null,
`title` CLOB DEFAULT null,
`layout_name` varchar(255) DEFAULT null,
`create_time` datetime DEFAULT null,
`update_time` datetime DEFAULT null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

import javax.xml.bind.DatatypeConverter;

Expand Down Expand Up @@ -239,13 +240,16 @@ protected List<Operation> createInsertOperations(

List<Operation> operations = customTableEnvironment.getParser().parse(cdcSqlInsert);
logger.info("Create {} FlinkSQL insert into successful...", tableName);
if (operations.isEmpty()) {
return operations;
}

try {
if (!operations.isEmpty()) {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operation);
}
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation) operation);
}

} catch (Exception e) {
logger.error("Translate to plan occur exception: {}", e.toString());
throw e;
Expand Down Expand Up @@ -445,53 +449,38 @@ public interface ConvertType {

@Override
public String getSinkSchemaName(Table table) {
String schemaName = table.getSchema();
if (config.getSink().containsKey("sink.db")) {
schemaName = config.getSink().get("sink.db");
}
return schemaName;
return config.getSink().getOrDefault("sink.db", table.getSchema());
}

@Override
public String getSinkTableName(Table table) {
String tableName = table.getName();
if (config.getSink().containsKey("table.prefix.schema")
&& (Boolean.parseBoolean(config.getSink().get("table.prefix.schema")))) {
Map<String, String> sink = config.getSink();
if (Boolean.parseBoolean(sink.get("table.prefix.schema"))) {
tableName = table.getSchema() + "_" + tableName;
}

if (config.getSink().containsKey("table.prefix")) {
tableName = config.getSink().get("table.prefix") + tableName;
}

if (config.getSink().containsKey("table.suffix")) {
tableName = tableName + config.getSink().get("table.suffix");
}
tableName = sink.getOrDefault("table.prefix", "") + tableName + sink.getOrDefault("table.suffix", "");

if (config.getSink().containsKey("table.lower")
&& (Boolean.parseBoolean(config.getSink().get("table.lower")))) {
if (Boolean.parseBoolean(sink.get("table.lower"))) {
tableName = tableName.toLowerCase();
}

if (config.getSink().containsKey("table.upper")
&& (Boolean.parseBoolean(config.getSink().get("table.upper")))) {
if (Boolean.parseBoolean(sink.get("table.upper"))) {
tableName = tableName.toUpperCase();
}
return tableName;
}

protected List<String> getPKList(Table table) {
List<String> pks = new ArrayList<>();
if (Asserts.isNullCollection(table.getColumns())) {
return pks;
return new ArrayList<>();
}

for (Column column : table.getColumns()) {
if (column.isKeyFlag()) {
pks.add(column.getName());
}
}
return pks;
return table.getColumns().stream()
.filter(Column::isKeyFlag)
.map(Column::getName)
.collect(Collectors.toList());
}

protected ZoneId getSinkTimeZone() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
Expand Down Expand Up @@ -64,16 +63,8 @@ private static Map<String, Supplier<SinkBuilder>> getPlusSinkBuilder() {
final ServiceLoader<SinkBuilder> loader = ServiceLoader.load(SinkBuilder.class);

final List<SinkBuilder> sinkBuilders = new ArrayList<>();
final Iterator<SinkBuilder> factories = loader.iterator();
while (factories.hasNext()) {
try {
final SinkBuilder factory = factories.next();
if (factory != null) {
sinkBuilders.add(factory);
}
} catch (Throwable e) {
logger.warn("Could not load service provider class : {}", e.getMessage());
}
for (SinkBuilder factory : loader) {
sinkBuilders.add(factory);
}

Map<String, Supplier<SinkBuilder>> plusSinkBuilder = sinkBuilders.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
} else {
sourceBuilder.schemaList();
}

List<String> schemaTableNameList = config.getSchemaTableNameList();
if (Asserts.isNotNullCollection(schemaTableNameList)) {
sourceBuilder.tableList(schemaTableNameList.toArray(new String[0]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
@Getter
public enum ProcessStepType {
SUBMIT_TASK("SUBMIT_TASK", Status.PROCESS_SUBMIT_SUBMITTASK),
SUBMIT_DEBUG("DEBUG_TASK", Status.PROCESS_SUBMIT_SUBMITTASK),
SUBMIT_PRECHECK("SUBMIT_PRECHECK", Status.PROCESS_SUBMIT_CHECKSQL),
SUBMIT_EXECUTE("SUBMIT_EXECUTE", Status.PROCESS_SUBMIT_EXECUTE),
SUBMIT_BUILD_CONFIG("SUBMIT_BUILD_CONFIG", Status.PROCESS_SUBMIT_BUILDCONFIG),
Expand Down
3 changes: 2 additions & 1 deletion dinky-common/src/main/java/org/dinky/data/enums/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public enum Status {
MOVE_FAILED(9028, "move.failed"),
TEST_CONNECTION_SUCCESS(9029, "test.connection.success"),
TEST_CONNECTION_FAILED(9030, "test.connection.failed"),

DEBUG_SUCCESS(9031, "debug.success"),
DEBUG_FAILED(9032, "debug.failed"),
/**
* user,tenant,role
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ delete.failed=Delete Failed
role.binding.user=Role Already Binding User , Can Not Delete
not.token=Can Not Read Token
execute.success=Execute Successfully
debug.success=Debug Successfully
debug.failed=Debug Failed
token.freezed=token has been frozen
menu.has.assign=Menu Has Assign , Can Not Delete
datasource.status.refresh.success=DataSource Status Refresh Success
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ delete.failed=删除失败
role.binding.user=该角色已绑定用户,无法删除
not.token=未能读取到有效 Token
execute.success=执行成功
debug.success=调试成功
debug.failed=调试失败
token.freezed=token 已被冻结
menu.has.assign=菜单已分配,不允许删除
datasource.status.refresh.success=数据源状态刷新成功
Expand Down
Loading

0 comments on commit bb12e0c

Please sign in to comment.