From 0cb26903aad0fba7cd1f695e9e4718052c14e7da Mon Sep 17 00:00:00 2001 From: MactavishCui Date: Wed, 6 Nov 2024 22:47:07 +0800 Subject: [PATCH] MockStatementExplainer refactor --- .../java/org/dinky/explainer/Explainer.java | 8 +- .../mock/MockStatementExplainer.java | 140 +++++++++--------- 2 files changed, 73 insertions(+), 75 deletions(-) diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index febba23cb9..53f08b4e68 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -185,9 +185,11 @@ public JobParam pretreatStatements(String[] statements) { } JobParam jobParam = new JobParam(statementList, ddl, trans, execute, CollUtil.removeNull(udfList), parsedSql.toString()); - if (jobManager.getConfig().isMockSinkFunction()) { - MockStatementExplainer.jobParamMock(jobParam); - } + + MockStatementExplainer.build(executor.getCustomTableEnvironment()) + .isMockSink(jobManager.getConfig().isMockSinkFunction()) + .jobParamMock(jobParam); + return jobParam; } diff --git a/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java index 9b13be7542..aa04860e82 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java @@ -19,8 +19,8 @@ package org.dinky.explainer.mock; -import org.dinky.assertion.Asserts; import org.dinky.connector.mock.sink.MockDynamicTableSinkFactory; +import org.dinky.executor.CustomTableEnvironment; import org.dinky.job.JobParam; import org.dinky.job.StatementParam; import org.dinky.parser.SqlType; @@ -28,50 +28,80 @@ import org.apache.calcite.sql.SqlInsert; import org.apache.calcite.sql.parser.SqlParser; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ddl.CreateTableOperation; import java.text.MessageFormat; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @Slf4j public class MockStatementExplainer { - public static final String PATTERN_STR = - "CREATE\\s+TABLE\\s+(\\w+)\\s*\\(\\s*([\\s\\S]*?)\\s*\\)\\s*WITH\\s*\\(\\s*([\\s\\S]*?)\\s*\\)"; - public static final Pattern PATTERN = Pattern.compile(PATTERN_STR, Pattern.CASE_INSENSITIVE); + // Because calcite cannot parse flink sql ddl, a table environment is designed here for flink sql ddl pars + private final CustomTableEnvironment tableEnv; + private boolean isMockSink = false; public static final String MOCK_SQL_TEMPLATE = "CREATE TABLE {0} ({1}) WITH ({2})"; + public static MockStatementExplainer build(CustomTableEnvironment tableEnv) { + return new MockStatementExplainer(tableEnv); + } + + public MockStatementExplainer(CustomTableEnvironment tableEnv) { + this.tableEnv = tableEnv; + } + + public MockStatementExplainer isMockSink(boolean isMockSink) { + this.isMockSink = isMockSink; + return this; + } + + public void jobParamMock(JobParam jobParam) { + if (isMockSink) { + mockSink(jobParam); + } + } + /** * The connector of insert tables will be changed to {@link MockDynamicTableSinkFactory} * * @param jobParam job param */ - public static void jobParamMock(JobParam jobParam) { + private void mockSink(JobParam jobParam) { // Based on insert statements, get table names need to be mocked - Set tablesNeedMock = getMockedTableNames(jobParam.getTrans()); + Set tablesNeedMock = getSinkTableNamesNeedMock(jobParam.getTrans()); // mock insert table ddl List mockedDdl = new ArrayList<>(); + for (StatementParam ddl : jobParam.getDdl()) { - // table name check - String tableName = - getDdlTableName(ddl.getValue().replaceAll("\\n", " ").replaceAll(" +", " ")); - // mock connector - if (Asserts.isNotNull(tableName) && tablesNeedMock.contains(tableName.toUpperCase())) { - mockedDdl.add(new StatementParam(getSinkMockDdlStatement(ddl.getValue()), SqlType.CREATE)); - } else { - mockedDdl.add(ddl); + List parseOperationList = tableEnv.getParser().parse(ddl.getValue()); + + for (Operation operation : parseOperationList) { + if (operation instanceof CreateTableOperation) { + CreateTableOperation createOperation = (CreateTableOperation) operation; + CatalogTable catalogTable = createOperation.getCatalogTable(); + // get table name and check if it should be mocked + String tableName = createOperation.getTableIdentifier().getObjectName(); + if (tablesNeedMock.contains(tableName.toUpperCase())) { + // generate mock statement + mockedDdl.add( + new StatementParam(getSinkMockDdlStatement(tableName, catalogTable), SqlType.CREATE)); + } else { + mockedDdl.add(ddl); + } + } } } jobParam.setDdl(mockedDdl); - log.info("Mock succeed: {}", JsonUtils.toJsonString(jobParam)); + log.info("Mock sink succeed: {}", JsonUtils.toJsonString(jobParam)); } /** @@ -80,7 +110,7 @@ public static void jobParamMock(JobParam jobParam) { * @param transStatements trans statement that contains all insert statements * @return a hash set, which contains all insert table names */ - private static Set getMockedTableNames(List transStatements) { + private static Set getSinkTableNamesNeedMock(List transStatements) { Set insertTables = new HashSet<>(); for (StatementParam statement : transStatements) { if (statement.getType().equals(SqlType.INSERT)) { @@ -96,64 +126,30 @@ private static Set getMockedTableNames(List transStateme return insertTables; } - /** - * get table name from ddl - * - * @param ddl ddl statement - * @return table name - */ - private static String getDdlTableName(String ddl) { - Matcher matcher = PATTERN.matcher(ddl); - if (matcher.find()) { - // table name and columns - return matcher.group(1); - } - return ""; - } - /** * get mocked ddl statement * - * @param ddl ddl statement + * @param tableName table name + * @param catalogTable catalog table * @return ddl that connector is changed as well as other options not changed */ - private static String getSinkMockDdlStatement(String ddl) { - Matcher matcher = PATTERN.matcher(ddl); - if (matcher.find()) { - // table name and columns - String tableName = matcher.group(1); - String columns = matcher.group(2); - // with clause - Map withClauseMap = parseWithClause(matcher.group(3)); - // connector mock - withClauseMap.put("connector", MockDynamicTableSinkFactory.IDENTIFIER); - List withOptionList = new ArrayList<>(withClauseMap.size()); - for (Map.Entry entry : withClauseMap.entrySet()) { - withOptionList.add("'" + entry.getKey() + "' = '" + entry.getValue() + "'"); - } - String mockedWithOption = String.join(", ", withOptionList); - return MessageFormat.format(MOCK_SQL_TEMPLATE, tableName, columns, mockedWithOption); - } - return ddl; - } - - /** - * parse with clause - * - * @param withClause with clause string - * @return a hash map contains with clause information - */ - private static Map parseWithClause(String withClause) { - Map options = new HashMap<>(); - String[] keyValuePairs = withClause.split(","); - for (String pair : keyValuePairs) { - String[] keyValue = pair.split("="); - if (keyValue.length == 2) { - String key = keyValue[0].trim().replaceAll("['\"]", ""); - String value = keyValue[1].trim().replaceAll("['\"]", ""); - options.put(key, value); - } + private static String getSinkMockDdlStatement(String tableName, CatalogTable catalogTable) { + // options + Map optionsMap = catalogTable.getOptions(); + optionsMap.put("connector", MockDynamicTableSinkFactory.IDENTIFIER); + List withOptionList = new ArrayList<>(optionsMap.size()); + for (Map.Entry entry : optionsMap.entrySet()) { + withOptionList.add("'" + entry.getKey() + "' = '" + entry.getValue() + "'"); } - return options; + String mockedWithOption = String.join(", ", withOptionList); + // columns + Schema unresolvedSchema = catalogTable.getUnresolvedSchema(); + String columns = unresolvedSchema.getColumns().stream() + .map(column -> { + Schema.UnresolvedPhysicalColumn physicalColumn = (Schema.UnresolvedPhysicalColumn) column; + return physicalColumn.getName() + " " + physicalColumn.getDataType(); + }) + .collect(Collectors.joining(", ")); + return MessageFormat.format(MOCK_SQL_TEMPLATE, tableName, columns, mockedWithOption); } }