Skip to content

Commit

Permalink
bug fix 4 object not found
Browse files Browse the repository at this point in the history
  • Loading branch information
MactavishCui committed Nov 12, 2024
1 parent 237b45a commit 7df5f5e
Showing 1 changed file with 21 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.dinky.connector.mock.sink.MockDynamicTableSinkFactory;
import org.dinky.executor.CustomTableEnvironment;
import org.dinky.executor.ParserWrapper;
import org.dinky.job.JobParam;
import org.dinky.job.StatementParam;
import org.dinky.parser.SqlType;
Expand All @@ -29,22 +30,19 @@
import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -89,22 +87,24 @@ private void mockSink(JobParam jobParam) {
// mock insert table ddl
List<StatementParam> mockedDdl = new ArrayList<>();
for (StatementParam ddl : jobParam.getDdl()) {
List<Operation> parseOperationList = tableEnv.getParser().parse(ddl.getValue());
for (Operation operation : parseOperationList) {
if (operation instanceof CreateTableOperation) {
CreateTableOperation createOperation = (CreateTableOperation) operation;
CatalogTable catalogTable = createOperation.getCatalogTable();
// get table name and check if it should be mocked
String tableName = createOperation.getTableIdentifier().getObjectName();
if (tablesNeedMock.contains(tableName)) {
// generate mock statement
mockedDdl.add(
new StatementParam(getSinkMockDdlStatement(tableName, catalogTable), SqlType.CREATE));
} else {
mockedDdl.add(ddl);
}
ParserWrapper parser = (ParserWrapper) tableEnv.getParser();
SqlNode sqlNode = parser.parseSql(ddl.getValue());
boolean isDdlMocked = false;
if (sqlNode instanceof SqlCreateTable) {
SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode;
String tableName = sqlCreateTable.getTableName().toString();
if (tablesNeedMock.contains(tableName)) {
// generate mock statement
mockedDdl.add(new StatementParam(
getSinkMockDdlStatement(
tableName, sqlCreateTable.getColumnList().toString()),
SqlType.CREATE));
isDdlMocked = true;
}
}
if (!isDdlMocked) {
mockedDdl.add(ddl);
}
}
jobParam.setDdl(mockedDdl);
log.debug("Mock sink succeed: {}", JsonUtils.toJsonString(jobParam));
Expand Down Expand Up @@ -155,19 +155,11 @@ private Set<String> getTableNamesNeedMockAndModifyTrans(JobParam jobParam) {
* get mocked ddl statement
*
* @param tableName table name
* @param catalogTable catalog table
* @param columns columns
* @return ddl that connector is changed as well as other options not changed
*/
private String getSinkMockDdlStatement(String tableName, CatalogTable catalogTable) {
private String getSinkMockDdlStatement(String tableName, String columns) {
String mockedOption = "'connector'='" + MockDynamicTableSinkFactory.IDENTIFIER + "'";
// columns
Schema unresolvedSchema = catalogTable.getUnresolvedSchema();
String columns = unresolvedSchema.getColumns().stream()
.map(column -> {
Schema.UnresolvedPhysicalColumn physicalColumn = (Schema.UnresolvedPhysicalColumn) column;
return physicalColumn.getName() + " " + physicalColumn.getDataType();
})
.collect(Collectors.joining(", "));
return MessageFormat.format(
MOCK_SQL_TEMPLATE,
StringUtils.join(generateMockedTableIdentifier(tableName), "."),
Expand Down

0 comments on commit 7df5f5e

Please sign in to comment.