Skip to content

Commit

Permalink
MockStatementExplainer refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
MactavishCui committed Nov 6, 2024
1 parent defa77d commit 0cb2690
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 75 deletions.
8 changes: 5 additions & 3 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,11 @@ public JobParam pretreatStatements(String[] statements) {
}
JobParam jobParam =
new JobParam(statementList, ddl, trans, execute, CollUtil.removeNull(udfList), parsedSql.toString());
if (jobManager.getConfig().isMockSinkFunction()) {
MockStatementExplainer.jobParamMock(jobParam);
}

MockStatementExplainer.build(executor.getCustomTableEnvironment())
.isMockSink(jobManager.getConfig().isMockSinkFunction())
.jobParamMock(jobParam);

return jobParam;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,59 +19,89 @@

package org.dinky.explainer.mock;

import org.dinky.assertion.Asserts;
import org.dinky.connector.mock.sink.MockDynamicTableSinkFactory;
import org.dinky.executor.CustomTableEnvironment;
import org.dinky.job.JobParam;
import org.dinky.job.StatementParam;
import org.dinky.parser.SqlType;
import org.dinky.utils.JsonUtils;

import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MockStatementExplainer {

public static final String PATTERN_STR =
"CREATE\\s+TABLE\\s+(\\w+)\\s*\\(\\s*([\\s\\S]*?)\\s*\\)\\s*WITH\\s*\\(\\s*([\\s\\S]*?)\\s*\\)";
public static final Pattern PATTERN = Pattern.compile(PATTERN_STR, Pattern.CASE_INSENSITIVE);
// Because calcite cannot parse flink sql ddl, a table environment is designed here for flink sql ddl parsing
private final CustomTableEnvironment tableEnv;
private boolean isMockSink = false;
public static final String MOCK_SQL_TEMPLATE = "CREATE TABLE {0} ({1}) WITH ({2})";

/**
 * Static factory for a fluent-style explainer bound to the given table environment.
 *
 * @param tableEnv table environment used to parse flink sql ddl
 * @return a new MockStatementExplainer instance
 */
public static MockStatementExplainer build(CustomTableEnvironment tableEnv) {
    return new MockStatementExplainer(tableEnv);
}

/**
 * Create an explainer bound to the given table environment; sink mocking is
 * disabled until enabled via {@link #isMockSink(boolean)}.
 *
 * @param tableEnv table environment used for flink sql ddl parsing
 */
public MockStatementExplainer(CustomTableEnvironment tableEnv) {
    this.tableEnv = tableEnv;
}

/**
 * Fluent setter enabling or disabling sink mocking.
 *
 * @param isMockSink whether insert target tables should be replaced by mock sinks
 * @return this explainer, for chaining
 */
public MockStatementExplainer isMockSink(boolean isMockSink) {
    this.isMockSink = isMockSink;
    return this;
}

/**
 * Apply the configured mock transformations to the given job param.
 * Currently only sink mocking is supported; nothing happens unless it was
 * enabled via {@link #isMockSink(boolean)}.
 *
 * @param jobParam job param to transform in place
 */
public void jobParamMock(JobParam jobParam) {
    // guard clause: skip entirely when sink mocking is disabled
    if (!isMockSink) {
        return;
    }
    mockSink(jobParam);
}

/**
* The connector of insert tables will be changed to {@link MockDynamicTableSinkFactory}
*
* @param jobParam job param
*/
public static void jobParamMock(JobParam jobParam) {
private void mockSink(JobParam jobParam) {
// Based on insert statements, get table names need to be mocked
Set<String> tablesNeedMock = getMockedTableNames(jobParam.getTrans());
Set<String> tablesNeedMock = getSinkTableNamesNeedMock(jobParam.getTrans());
// mock insert table ddl
List<StatementParam> mockedDdl = new ArrayList<>();

for (StatementParam ddl : jobParam.getDdl()) {
// table name check
String tableName =
getDdlTableName(ddl.getValue().replaceAll("\\n", " ").replaceAll(" +", " "));
// mock connector
if (Asserts.isNotNull(tableName) && tablesNeedMock.contains(tableName.toUpperCase())) {
mockedDdl.add(new StatementParam(getSinkMockDdlStatement(ddl.getValue()), SqlType.CREATE));
} else {
mockedDdl.add(ddl);
List<Operation> parseOperationList = tableEnv.getParser().parse(ddl.getValue());

for (Operation operation : parseOperationList) {
if (operation instanceof CreateTableOperation) {
CreateTableOperation createOperation = (CreateTableOperation) operation;
CatalogTable catalogTable = createOperation.getCatalogTable();
// get table name and check if it should be mocked
String tableName = createOperation.getTableIdentifier().getObjectName();
if (tablesNeedMock.contains(tableName.toUpperCase())) {
// generate mock statement
mockedDdl.add(
new StatementParam(getSinkMockDdlStatement(tableName, catalogTable), SqlType.CREATE));
} else {
mockedDdl.add(ddl);
}
}
}
}
jobParam.setDdl(mockedDdl);
log.info("Mock succeed: {}", JsonUtils.toJsonString(jobParam));
log.info("Mock sink succeed: {}", JsonUtils.toJsonString(jobParam));
}

/**
Expand All @@ -80,7 +110,7 @@ public static void jobParamMock(JobParam jobParam) {
* @param transStatements trans statement that contains all insert statements
* @return a hash set, which contains all insert table names
*/
private static Set<String> getMockedTableNames(List<StatementParam> transStatements) {
private static Set<String> getSinkTableNamesNeedMock(List<StatementParam> transStatements) {
Set<String> insertTables = new HashSet<>();
for (StatementParam statement : transStatements) {
if (statement.getType().equals(SqlType.INSERT)) {
Expand All @@ -96,64 +126,30 @@ private static Set<String> getMockedTableNames(List<StatementParam> transStateme
return insertTables;
}

/**
* get table name from ddl
*
* @param ddl ddl statement
* @return table name
*/
/**
 * Extract the table name from a CREATE TABLE ddl statement.
 *
 * @param ddl ddl statement
 * @return the table name, or an empty string when the statement does not match
 */
private static String getDdlTableName(String ddl) {
    Matcher ddlMatcher = PATTERN.matcher(ddl);
    // group 1 of PATTERN captures the table name
    return ddlMatcher.find() ? ddlMatcher.group(1) : "";
}

/**
* get mocked ddl statement
*
* @param ddl ddl statement
* @param tableName table name
* @param catalogTable catalog table
* @return ddl that connector is changed as well as other options not changed
*/
private static String getSinkMockDdlStatement(String ddl) {
Matcher matcher = PATTERN.matcher(ddl);
if (matcher.find()) {
// table name and columns
String tableName = matcher.group(1);
String columns = matcher.group(2);
// with clause
Map<String, String> withClauseMap = parseWithClause(matcher.group(3));
// connector mock
withClauseMap.put("connector", MockDynamicTableSinkFactory.IDENTIFIER);
List<String> withOptionList = new ArrayList<>(withClauseMap.size());
for (Map.Entry<String, String> entry : withClauseMap.entrySet()) {
withOptionList.add("'" + entry.getKey() + "' = '" + entry.getValue() + "'");
}
String mockedWithOption = String.join(", ", withOptionList);
return MessageFormat.format(MOCK_SQL_TEMPLATE, tableName, columns, mockedWithOption);
}
return ddl;
}

/**
* parse with clause
*
* @param withClause with clause string
* @return a hash map contains with clause information
*/
private static Map<String, String> parseWithClause(String withClause) {
Map<String, String> options = new HashMap<>();
String[] keyValuePairs = withClause.split(",");
for (String pair : keyValuePairs) {
String[] keyValue = pair.split("=");
if (keyValue.length == 2) {
String key = keyValue[0].trim().replaceAll("['\"]", "");
String value = keyValue[1].trim().replaceAll("['\"]", "");
options.put(key, value);
}
/**
 * Build a mocked CREATE TABLE statement for a sink table: the original columns and
 * options are kept, but the connector option is replaced with
 * {@link MockDynamicTableSinkFactory#IDENTIFIER}.
 *
 * @param tableName table name
 * @param catalogTable catalog table parsed from the original ddl
 * @return ddl whose connector is changed while other options stay unchanged
 */
private static String getSinkMockDdlStatement(String tableName, CatalogTable catalogTable) {
    // options: copy before mutating — getOptions() may expose the catalog table's
    // internal (possibly unmodifiable) map, which must not be modified in place
    Map<String, String> optionsMap = new HashMap<>(catalogTable.getOptions());
    optionsMap.put("connector", MockDynamicTableSinkFactory.IDENTIFIER);
    List<String> withOptionList = new ArrayList<>(optionsMap.size());
    for (Map.Entry<String, String> entry : optionsMap.entrySet()) {
        withOptionList.add("'" + entry.getKey() + "' = '" + entry.getValue() + "'");
    }
    String mockedWithOption = String.join(", ", withOptionList);
    // columns: NOTE(review) assumes every column is a physical column; a computed or
    // metadata column would fail this cast — confirm upstream guarantees this
    Schema unresolvedSchema = catalogTable.getUnresolvedSchema();
    String columns = unresolvedSchema.getColumns().stream()
            .map(column -> {
                Schema.UnresolvedPhysicalColumn physicalColumn = (Schema.UnresolvedPhysicalColumn) column;
                return physicalColumn.getName() + " " + physicalColumn.getDataType();
            })
            .collect(Collectors.joining(", "));
    return MessageFormat.format(MOCK_SQL_TEMPLATE, tableName, columns, mockedWithOption);
}
}

0 comments on commit 0cb2690

Please sign in to comment.