Skip to content

Commit

Permalink
MockStatementExplainer update: big fix for multi table error when usi…
Browse files Browse the repository at this point in the history
…ng catalog
  • Loading branch information
MactavishCui committed Nov 9, 2024
1 parent d0d2831 commit 9ff2bb3
Showing 1 changed file with 35 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@
import org.dinky.parser.SqlType;
import org.dinky.utils.JsonUtils;

import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.operations.Operation;
Expand All @@ -49,14 +54,17 @@ public class MockStatementExplainer {
// Because calcite cannot parse flink sql ddl, a table environment is designed here for flink sql ddl pars
private final CustomTableEnvironment tableEnv;
private boolean isMockSink = false;
public static final String MOCK_SQL_TEMPLATE = "CREATE TABLE {0} ({1}) WITH ({2})";
private final SqlParser.Config calciteConfig;
private final String MOCK_TABLE_PREFIX = "mock_sink_";
public static final String MOCK_SQL_TEMPLATE = "CREATE TABLE IF NOT EXISTS {0} ({1}) WITH ({2})";

public static MockStatementExplainer build(CustomTableEnvironment tableEnv) {
return new MockStatementExplainer(tableEnv);
}

public MockStatementExplainer(CustomTableEnvironment tableEnv) {
this.tableEnv = tableEnv;
this.calciteConfig = SqlParser.config().withLex(Lex.JAVA);
}

public MockStatementExplainer isMockSink(boolean isMockSink) {
Expand All @@ -76,8 +84,8 @@ public void jobParamMock(JobParam jobParam) {
* @param jobParam job param
*/
private void mockSink(JobParam jobParam) {
// Based on insert statements, get table names need to be mocked
Set<String> tablesNeedMock = getSinkTableNamesNeedMock(jobParam.getTrans());
// Based on insert statements, get table names need to be mocked, and modify insert statements' target table
Set<String> tablesNeedMock = getTableNamesNeedMockAndModifyTrans(jobParam);
// mock insert table ddl
List<StatementParam> mockedDdl = new ArrayList<>();

Expand All @@ -90,7 +98,7 @@ private void mockSink(JobParam jobParam) {
CatalogTable catalogTable = createOperation.getCatalogTable();
// get table name and check if it should be mocked
String tableName = createOperation.getTableIdentifier().getObjectName();
if (tablesNeedMock.contains(tableName.toUpperCase())) {
if (tablesNeedMock.contains(tableName)) {
// generate mock statement
mockedDdl.add(
new StatementParam(getSinkMockDdlStatement(tableName, catalogTable), SqlType.CREATE));
Expand All @@ -107,22 +115,39 @@ private void mockSink(JobParam jobParam) {
/**
* get tables names of insert statements, these tables will be mocked
*
* @param transStatements trans statement that contains all insert statements
* @param jobParam jobParam
* @return a hash set, which contains all insert table names
*/
private static Set<String> getSinkTableNamesNeedMock(List<StatementParam> transStatements) {
private Set<String> getTableNamesNeedMockAndModifyTrans(JobParam jobParam) {
List<StatementParam> transStatements = jobParam.getTrans();
List<StatementParam> mockedTransStatements = new ArrayList<>();
Set<String> insertTables = new HashSet<>();
for (StatementParam statement : transStatements) {
if (statement.getType().equals(SqlType.INSERT)) {
try {
SqlInsert sqlInsert =
(SqlInsert) SqlParser.create(statement.getValue()).parseQuery();
SqlInsert sqlInsert = (SqlInsert) SqlParser.create(statement.getValue(), calciteConfig)
.parseQuery();
insertTables.add(sqlInsert.getTargetTable().toString());
SqlInsert mockedInsertTrans = new SqlInsert(
sqlInsert.getParserPosition(),
SqlNodeList.EMPTY,
new SqlIdentifier(
MOCK_TABLE_PREFIX
+ sqlInsert.getTargetTable().toString(),
SqlParserPos.ZERO),
sqlInsert.getSource(),
sqlInsert.getTargetColumnList());
mockedTransStatements.add(new StatementParam(
mockedInsertTrans
.toSqlString(AnsiSqlDialect.DEFAULT)
.toString(),
SqlType.INSERT));
} catch (Exception e) {
log.error("Statement parse error, statement: {}", statement.getValue());
}
}
}
jobParam.setTrans(mockedTransStatements);
return insertTables;
}

Expand All @@ -133,7 +158,7 @@ private static Set<String> getSinkTableNamesNeedMock(List<StatementParam> transS
* @param catalogTable catalog table
* @return ddl that connector is changed as well as other options not changed
*/
private static String getSinkMockDdlStatement(String tableName, CatalogTable catalogTable) {
private String getSinkMockDdlStatement(String tableName, CatalogTable catalogTable) {
// options
Map<String, String> optionsMap = catalogTable.getOptions();
optionsMap.put("connector", MockDynamicTableSinkFactory.IDENTIFIER);
Expand All @@ -150,6 +175,6 @@ private static String getSinkMockDdlStatement(String tableName, CatalogTable cat
return physicalColumn.getName() + " " + physicalColumn.getDataType();
})
.collect(Collectors.joining(", "));
return MessageFormat.format(MOCK_SQL_TEMPLATE, tableName, columns, mockedWithOption);
return MessageFormat.format(MOCK_SQL_TEMPLATE, MOCK_TABLE_PREFIX + tableName, columns, mockedWithOption);
}
}

0 comments on commit 9ff2bb3

Please sign in to comment.