Skip to content

Commit

Permalink
[Optimization-3906][web] Optimize debug task to preview data
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo committed Nov 11, 2024
1 parent 40e32e0 commit bf7735b
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,13 @@ public boolean success() {
history.setClusterId(clusterId);
historyService.updateById(history);

if (!job.getJobConfig().isStatementSet()) {
if (!job.isPipeline()) {
return true;
}

if (Asserts.isNullCollection(job.getJids()) || Asserts.isNullString(job.getJobManagerAddress())) {
throw new BusException("The JobID or JobManagerAddress is null. ");
if (Asserts.isNullCollection(job.getJids())) {
throw new BusException("Job ID retrieval failed, possibly due to timeout of job deployment. "
+ "Please modify the system configuration to increase the waiting time for job submission.");
}

JobInstance jobInstance = history.buildJobInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,6 @@ public JobResult submitTask(TaskSubmitDto submitDto) throws Exception {
public JobResult debugTask(TaskDTO task) throws Exception {
// Debug mode need return result
task.setUseResult(true);
// Debug mode need execute
task.setStatementSet(task.isMockSinkFunction());
// mode check
if (GatewayType.get(task.getType()).isDeployCluster()) {
throw new BusException(Status.MODE_IS_NOT_ALLOW_SELECT.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ public enum SqlType {

INSERT("INSERT", "^INSERT.*", SqlCategory.DML),

MOCKED_INSERT("MOCKED_INSERT", "^MOCKED_INSERT.*", SqlCategory.DML),

DESC("DESC", "^DESC.*", SqlCategory.DDL),

DESCRIBE("DESCRIBE", "^DESCRIBE.*", SqlCategory.DDL),
Expand Down Expand Up @@ -81,6 +79,8 @@ public enum SqlType {
private static final List<SqlType> TRANS_SQL_TYPES =
Lists.newArrayList(INSERT, SELECT, WITH, SHOW, DESCRIBE, DESC, CTAS);

private static final List<SqlType> PIPELINE_SQL_TYPES = Lists.newArrayList(INSERT, SELECT, WITH, CTAS);

SqlType(String type, String regrex, SqlCategory category) {
this.type = type;
this.pattern = Pattern.compile(regrex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
Expand All @@ -106,4 +106,8 @@ public boolean match(String statement) {
public static List<SqlType> getTransSqlTypes() {
return TRANS_SQL_TYPES;
}

public boolean isPipeline() {
return PIPELINE_SQL_TYPES.contains(this);
}
}
17 changes: 14 additions & 3 deletions dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ static ResultBuilder build(
boolean isChangeLog,
boolean isAutoCancel,
String timeZone) {
return build(operationType, id, maxRowNum, isChangeLog, isAutoCancel, timeZone, false);
}

static ResultBuilder build(
SqlType operationType,
String id,
Integer maxRowNum,
boolean isChangeLog,
boolean isAutoCancel,
String timeZone,
boolean isMockSinkFunction) {
switch (operationType) {
case SELECT:
case WITH:
Expand All @@ -47,9 +58,9 @@ static ResultBuilder build(
case DESCRIBE:
return new ShowResultBuilder(id);
case INSERT:
return new InsertResultBuilder();
case MOCKED_INSERT:
return new MockResultBuilder(id, maxRowNum, isAutoCancel);
return isMockSinkFunction
? new MockResultBuilder(id, maxRowNum, isAutoCancel)
: new InsertResultBuilder();
default:
return new DDLResultBuilder();
}
Expand Down
3 changes: 0 additions & 3 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,6 @@ public JobParam pretreatStatements(String[] statements) {
} else if (transSqlTypeSet.contains(operationType)) {
trans.add(new StatementParam(statement, operationType));
statementList.add(statement);
if (!useStatementSet) {
break;
}
} else if (operationType.equals(SqlType.EXECUTE)) {
execute.add(new StatementParam(statement, operationType));
} else if (operationType.equals(SqlType.PRINT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.operations.Operation;
Expand All @@ -42,7 +43,6 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -84,25 +84,19 @@ public void jobParamMock(JobParam jobParam) {
* @param jobParam job param
*/
private void mockSink(JobParam jobParam) {
// Based on insert statements, get table names need to be mocked, and modify insert statements' target table
// Get table names that need to be mocked, and modify insert statement.
Set<String> tablesNeedMock = getTableNamesNeedMockAndModifyTrans(jobParam);
// mock insert table ddl
List<StatementParam> mockedDdl = new ArrayList<>();

for (StatementParam ddl : jobParam.getDdl()) {
List<Operation> parseOperationList = tableEnv.getParser().parse(ddl.getValue());

for (Operation operation : parseOperationList) {
if (operation instanceof CreateTableOperation) {
CreateTableOperation createOperation = (CreateTableOperation) operation;
CatalogTable catalogTable = createOperation.getCatalogTable();
// get table name and check if it should be mocked
String tableName = createOperation.getTableIdentifier().getObjectName();
if (tablesNeedMock.contains(tableName)) {
// drop table first
mockedDdl.add(new StatementParam(
MessageFormat.format(DROP_TABLE_SQL_TEMPLATE, generateMockedTableName(tableName)),
SqlType.DROP));
// generate mock statement
mockedDdl.add(
new StatementParam(getSinkMockDdlStatement(tableName, catalogTable), SqlType.CREATE));
Expand All @@ -113,7 +107,7 @@ private void mockSink(JobParam jobParam) {
}
}
jobParam.setDdl(mockedDdl);
log.info("Mock sink succeed: {}", JsonUtils.toJsonString(jobParam));
log.debug("Mock sink succeed: {}", JsonUtils.toJsonString(jobParam));
}

/**
Expand All @@ -136,7 +130,7 @@ private Set<String> getTableNamesNeedMockAndModifyTrans(JobParam jobParam) {
sqlInsert.getParserPosition(),
SqlNodeList.EMPTY,
new SqlIdentifier(
generateMockedTableName(
generateMockedTableIdentifier(
sqlInsert.getTargetTable().toString()),
SqlParserPos.ZERO),
sqlInsert.getSource(),
Expand All @@ -149,6 +143,8 @@ private Set<String> getTableNamesNeedMockAndModifyTrans(JobParam jobParam) {
} catch (Exception e) {
log.error("Statement parse error, statement: {}", statement.getValue());
}
} else {
mockedTransStatements.add(statement);
}
}
jobParam.setTrans(mockedTransStatements);
Expand All @@ -163,14 +159,7 @@ private Set<String> getTableNamesNeedMockAndModifyTrans(JobParam jobParam) {
* @return ddl that connector is changed as well as other options not changed
*/
private String getSinkMockDdlStatement(String tableName, CatalogTable catalogTable) {
// options
Map<String, String> optionsMap = catalogTable.getOptions();
optionsMap.put("connector", MockDynamicTableSinkFactory.IDENTIFIER);
List<String> withOptionList = new ArrayList<>(optionsMap.size());
for (Map.Entry<String, String> entry : optionsMap.entrySet()) {
withOptionList.add("'" + entry.getKey() + "' = '" + entry.getValue() + "'");
}
String mockedWithOption = String.join(", ", withOptionList);
String mockedOption = "'connector'='" + MockDynamicTableSinkFactory.IDENTIFIER + "'";
// columns
Schema unresolvedSchema = catalogTable.getUnresolvedSchema();
String columns = unresolvedSchema.getColumns().stream()
Expand All @@ -179,15 +168,23 @@ private String getSinkMockDdlStatement(String tableName, CatalogTable catalogTab
return physicalColumn.getName() + " " + physicalColumn.getDataType();
})
.collect(Collectors.joining(", "));
return MessageFormat.format(MOCK_SQL_TEMPLATE, generateMockedTableName(tableName), columns, mockedWithOption);
return MessageFormat.format(
MOCK_SQL_TEMPLATE,
StringUtils.join(generateMockedTableIdentifier(tableName), "."),
columns,
mockedOption);
}

/**
* generate table name with mocked prefix
* generate table identifier with mocked prefix info
* @param tableName table name
* @return table name with mocked prefix
* @return table identifier with mocked prefix info
*/
private String generateMockedTableName(String tableName) {
return "mock_sink_" + tableName;
private List<String> generateMockedTableIdentifier(String tableName) {
List<String> names = new ArrayList<>();
names.add("default_catalog");
names.add("default_database");
names.add("mock_sink_" + tableName);
return names;
}
}
4 changes: 3 additions & 1 deletion dinky-core/src/main/java/org/dinky/job/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class Job {
private Executor executor;
private boolean useGateway;
private List<String> jids;
private boolean isPipeline = true;

@Getter
public enum JobStatus {
Expand Down Expand Up @@ -113,7 +114,8 @@ public JobResult getJobResult() {
error,
result,
startTime,
endTime);
endTime,
isPipeline);
}

public boolean isFailed() {
Expand Down
2 changes: 2 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class JobManager {
private boolean useGateway = false;
private boolean isPlanMode = false;
private boolean useStatementSet = false;
private boolean useMockSinkFunction = false;
private boolean useRestAPI = false;
private GatewayType runMode = GatewayType.LOCAL;
private JobParam jobParam = null;
Expand Down Expand Up @@ -214,6 +215,7 @@ public void init() {
handler = JobHandler.build();
}
useStatementSet = config.isStatementSet();
useMockSinkFunction = config.isMockSinkFunction();
useRestAPI = SystemConfiguration.getInstances().isUseRestAPI();
executorConfig = config.getExecutorSetting();
executorConfig.setPlan(isPlanMode);
Expand Down
11 changes: 10 additions & 1 deletion dinky-core/src/main/java/org/dinky/job/JobResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ public class JobResult {
notes = "End time of job execution")
private LocalDateTime endTime;

@ApiModelProperty(
value = "Flag indicating whether the job was pipeline",
dataType = "boolean",
example = "true",
notes = "Flag indicating whether the job was pipeline")
private boolean isPipeline;

public JobResult() {}

public JobResult(
Expand All @@ -127,7 +134,8 @@ public JobResult(
String error,
IResult result,
LocalDateTime startTime,
LocalDateTime endTime) {
LocalDateTime endTime,
boolean isPipeline) {
this.id = id;
this.jobInstanceId = jobInstanceId;
this.jobConfig = jobConfig;
Expand All @@ -140,6 +148,7 @@ public JobResult(
this.result = result;
this.startTime = startTime;
this.endTime = endTime;
this.isPipeline = isPipeline;
}

public void setStartTimeNow() {
Expand Down
48 changes: 23 additions & 25 deletions dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.table.api.TableResult;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

/**
* JobTransBuilder
Expand All @@ -66,17 +66,27 @@ public void run() throws Exception {
return;
}

if (useStatementSet) {
if (inferStatementSet()) {
handleStatementSet();
return;
} else {
handleNonStatementSet();
}
}

handleNonStatementSet();
private boolean inferStatementSet() {
boolean hasInsert = false;
for (StatementParam item : jobParam.getTrans()) {
if (item.getType().equals(SqlType.INSERT)) {
hasInsert = true;
break;
}
}
return hasInsert;
}

private void handleStatementSet() throws Exception {
List<String> inserts = collectInserts();

List<String> inserts =
jobParam.getTrans().stream().map(StatementParam::getValue).collect(Collectors.toList());
if (useGateway) {
processWithGateway(inserts);
return;
Expand All @@ -92,18 +102,6 @@ private void handleNonStatementSet() throws Exception {
processFirstStatement();
}

private List<String> collectInserts() {
List<String> inserts = new ArrayList<>();
List<StatementParam> statementParams = useStatementSet
? jobParam.getTrans()
: Collections.singletonList(jobParam.getTrans().get(0));
for (StatementParam item : statementParams) {

inserts.add(item.getValue());
}
return inserts;
}

private void processWithGateway(List<String> inserts) throws Exception {
jobManager.setCurrentSql(String.join(FlinkSQLConstant.SEPARATOR, inserts));
GatewayResult gatewayResult = submitByGateway(inserts);
Expand All @@ -114,16 +112,14 @@ private void processWithoutGateway(List<String> inserts) throws Exception {
if (!inserts.isEmpty()) {
jobManager.setCurrentSql(String.join(FlinkSQLConstant.SEPARATOR, inserts));
TableResult tableResult = executor.executeStatementSet(inserts);
if (jobManager.getConfig().isMockSinkFunction()) {
updateJobWithTableResult(tableResult, SqlType.MOCKED_INSERT);
} else {
updateJobWithTableResult(tableResult);
}
updateJobWithTableResult(tableResult);
}
}

private void processSingleInsertWithGateway() throws Exception {
List<String> singleInsert = collectInserts();
List<String> singleInsert =
Collections.singletonList(jobParam.getTrans().get(0).getValue());
job.setPipeline(jobParam.getTrans().get(0).getType().isPipeline());
processWithGateway(singleInsert);
}

Expand All @@ -133,6 +129,7 @@ private void processFirstStatement() throws Exception {
}
// Only process the first statement when not using statement set
StatementParam item = jobParam.getTrans().get(0);
job.setPipeline(item.getType().isPipeline());
jobManager.setCurrentSql(item.getValue());
processSingleStatement(item);
}
Expand Down Expand Up @@ -178,7 +175,8 @@ private void updateJobWithTableResult(TableResult tableResult, SqlType sqlType)
config.getMaxRowNum(),
config.isUseChangeLog(),
config.isUseAutoCancel(),
executor.getTimeZone())
executor.getTimeZone(),
jobManager.getConfig().isMockSinkFunction())
.getResultWithPersistence(tableResult, jobManager.getHandler());
job.setResult(result);
}
Expand Down
Loading

0 comments on commit bf7735b

Please sign in to comment.