From 7df5f5eceba9c15a334d3b1c0da0af7b8326a0bc Mon Sep 17 00:00:00 2001 From: MactavishCui Date: Wed, 13 Nov 2024 00:01:04 +0800 Subject: [PATCH] bug fix 4 object not found --- .../mock/MockStatementExplainer.java | 50 ++++++++----------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java index 8f6f7873b9..9589a7cbf2 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java @@ -21,6 +21,7 @@ import org.dinky.connector.mock.sink.MockDynamicTableSinkFactory; import org.dinky.executor.CustomTableEnvironment; +import org.dinky.executor.ParserWrapper; import org.dinky.job.JobParam; import org.dinky.job.StatementParam; import org.dinky.parser.SqlType; @@ -29,22 +30,19 @@ import org.apache.calcite.config.Lex; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlInsert; +import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.dialect.AnsiSqlDialect; import org.apache.calcite.sql.parser.SqlParser; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.commons.lang3.StringUtils; -import org.apache.flink.table.api.Schema; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.sql.parser.ddl.SqlCreateTable; import java.text.MessageFormat; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -89,22 +87,24 @@ private void mockSink(JobParam jobParam) { // mock insert table ddl List mockedDdl = new ArrayList<>(); for (StatementParam ddl : jobParam.getDdl()) { - List parseOperationList = tableEnv.getParser().parse(ddl.getValue()); - for (Operation operation : parseOperationList) { - if (operation instanceof CreateTableOperation) { - CreateTableOperation createOperation = (CreateTableOperation) operation; - CatalogTable catalogTable = createOperation.getCatalogTable(); - // get table name and check if it should be mocked - String tableName = createOperation.getTableIdentifier().getObjectName(); - if (tablesNeedMock.contains(tableName)) { - // generate mock statement - mockedDdl.add( - new StatementParam(getSinkMockDdlStatement(tableName, catalogTable), SqlType.CREATE)); - } else { - mockedDdl.add(ddl); - } + ParserWrapper parser = (ParserWrapper) tableEnv.getParser(); + SqlNode sqlNode = parser.parseSql(ddl.getValue()); + boolean isDdlMocked = false; + if (sqlNode instanceof SqlCreateTable) { + SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode; + String tableName = sqlCreateTable.getTableName().toString(); + if (tablesNeedMock.contains(tableName)) { + // generate mock statement + mockedDdl.add(new StatementParam( + getSinkMockDdlStatement( + tableName, sqlCreateTable.getColumnList().toString()), + SqlType.CREATE)); + isDdlMocked = true; } } + if (!isDdlMocked) { + mockedDdl.add(ddl); + } } jobParam.setDdl(mockedDdl); log.debug("Mock sink succeed: {}", JsonUtils.toJsonString(jobParam)); @@ -155,19 +155,11 @@ private Set getTableNamesNeedMockAndModifyTrans(JobParam jobParam) { * get mocked ddl statement * * @param tableName table name - * @param catalogTable catalog table + * @param columns columns * @return ddl that connector is changed as well as other options not changed */ - private String getSinkMockDdlStatement(String tableName, CatalogTable catalogTable) { + private String getSinkMockDdlStatement(String tableName, String columns) { String mockedOption = "'connector'='" + MockDynamicTableSinkFactory.IDENTIFIER + "'"; - // columns - Schema unresolvedSchema = catalogTable.getUnresolvedSchema(); - String columns = unresolvedSchema.getColumns().stream() - .map(column -> { - Schema.UnresolvedPhysicalColumn physicalColumn = (Schema.UnresolvedPhysicalColumn) column; - return physicalColumn.getName() + " " + physicalColumn.getDataType(); - }) - .collect(Collectors.joining(", ")); return MessageFormat.format( MOCK_SQL_TEMPLATE, StringUtils.join(generateMockedTableIdentifier(tableName), "."),