From d4a0786220f086e5af561bf64fd47fbed094b95c Mon Sep 17 00:00:00 2001
From: zhangjun
Date: Fri, 21 Jul 2023 11:34:55 +0800
Subject: [PATCH] parse primary key and schema by historyRecord

---
 .../mysql/MySqlDebeziumJsonEventParser.java   | 37 ++++-----
 .../cdc/mysql/MySqlTableSchemaBuilder.java    | 79 ++++++++-----------
 .../mysql/MySqlSyncDatabaseActionITCase.java  | 17 +++-
 3 files changed, 64 insertions(+), 69 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index f5a3d8b538c5b..67a28988029e5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -35,15 +35,11 @@
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.Preconditions;
 
-import org.apache.paimon.shade.guava30.com.google.common.base.Strings;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import com.alibaba.druid.sql.SQLUtils;
-import com.alibaba.druid.sql.ast.SQLStatement;
-import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
-import com.alibaba.druid.util.JdbcConstants;
+import io.debezium.relational.history.TableChanges;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,7 +74,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
     private final boolean caseSensitive;
     private final TableNameConverter tableNameConverter;
     private final List<ComputedColumn> computedColumns;
-    private final NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder;
+    private final NewTableSchemaBuilder<JsonNode> schemaBuilder;
     @Nullable private final Pattern includingPattern;
     @Nullable private final Pattern excludingPattern;
     private final Set<String> includedTables = new HashSet<>();
@@ -111,7 +107,7 @@ public MySqlDebeziumJsonEventParser(
             ZoneId serverTimeZone,
             boolean caseSensitive,
             TableNameConverter tableNameConverter,
-            NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder,
+            NewTableSchemaBuilder<JsonNode> schemaBuilder,
             @Nullable Pattern includingPattern,
             @Nullable Pattern excludingPattern,
             boolean convertTinyint1ToBool) {
@@ -131,7 +127,7 @@ public MySqlDebeziumJsonEventParser(
             boolean caseSensitive,
             List<ComputedColumn> computedColumns,
             TableNameConverter tableNameConverter,
-            NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder,
+            NewTableSchemaBuilder<JsonNode> schemaBuilder,
             @Nullable Pattern includingPattern,
             @Nullable Pattern excludingPattern,
             boolean convertTinyint1ToBool) {
@@ -231,20 +227,25 @@ public Optional<Schema> parseNewTable() {
 
         try {
             String historyRecordString = historyRecord.asText();
-            String ddl = objectMapper.readTree(historyRecordString).get("ddl").asText();
-            if (Strings.isNullOrEmpty(ddl)) {
-                return Optional.empty();
+            JsonNode tableChanges = objectMapper.readTree(historyRecordString).get("tableChanges");
+            if (tableChanges.size() != 1) {
+                throw new IllegalArgumentException(
+                        "Invalid historyRecord, because tableChanges should contain exactly 1 item.\n"
+                                + historyRecord.asText());
             }
 
-            SQLStatement statement = SQLUtils.parseSingleStatement(ddl, JdbcConstants.MYSQL);
-            if (!(statement instanceof MySqlCreateTableStatement)) {
+            JsonNode tableChange = tableChanges.get(0);
+            if (!tableChange
+                    .get("type")
+                    .asText()
+                    .equals(TableChanges.TableChangeType.CREATE.name())) {
                 return Optional.empty();
             }
 
-            MySqlCreateTableStatement createTableStatement = (MySqlCreateTableStatement) statement;
-            List<String> primaryKeys = createTableStatement.getPrimaryKeyNames();
-            String tableName = createTableStatement.getTableName();
-            if (primaryKeys.isEmpty()) {
+            JsonNode primaryKeyColumnNames = tableChange.get("table").get("primaryKeyColumnNames");
+            if (primaryKeyColumnNames.size() == 0) {
+                String id = tableChange.get("id").asText();
+                String tableName = id.replaceAll("\"", "").split("\\.")[1];
                 LOG.debug(
                         "Didn't find primary keys from MySQL DDL for table '{}'. "
                                 + "This table won't be synchronized.",
@@ -254,7 +255,7 @@ public Optional<Schema> parseNewTable() {
                 return Optional.empty();
             }
 
-            return schemaBuilder.build(createTableStatement);
+            return schemaBuilder.build(tableChange);
         } catch (Exception e) {
             LOG.info("Failed to parse history record for schema changes", e);
             return Optional.empty();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index 0d9561fef9573..e7176c6fa0c15 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -22,16 +22,10 @@
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataType;
 
-import com.alibaba.druid.sql.ast.SQLDataType;
-import com.alibaba.druid.sql.ast.SQLExpr;
-import com.alibaba.druid.sql.ast.SQLName;
-import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
-import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
-import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
-import com.alibaba.druid.sql.ast.statement.SQLTableElement;
-import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
 
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,7 +36,7 @@
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Schema builder for MySQL cdc. */
-public class MySqlTableSchemaBuilder implements NewTableSchemaBuilder<MySqlCreateTableStatement> {
+public class MySqlTableSchemaBuilder implements NewTableSchemaBuilder<JsonNode> {
 
     private final Map<String, String> tableConfig;
     private final boolean caseSensitive;
@@ -53,50 +47,41 @@ public MySqlTableSchemaBuilder(Map<String, String> tableConfig, boolean caseSens
     }
 
     @Override
-    public Optional<Schema> build(MySqlCreateTableStatement statement) {
-        List<SQLTableElement> columns = statement.getTableElementList();
-        LinkedHashMap<String, Tuple2<DataType, String>> fields = new LinkedHashMap<>();
-
-        for (SQLTableElement element : columns) {
-            if (element instanceof SQLColumnDefinition) {
-                SQLColumnDefinition column = (SQLColumnDefinition) element;
-                SQLName name = column.getName();
-                SQLDataType dataType = column.getDataType();
-                List<SQLExpr> arguments = dataType.getArguments();
-                Integer precision = null;
-                Integer scale = null;
-                if (arguments.size() >= 1) {
-                    precision = (int) (((SQLIntegerExpr) arguments.get(0)).getValue());
-                }
-
-                if (arguments.size() >= 2) {
-                    scale = (int) (((SQLIntegerExpr) arguments.get(1)).getValue());
-                }
-
-                SQLCharExpr comment = (SQLCharExpr) column.getComment();
-                fields.put(
-                        name.getSimpleName(),
-                        Tuple2.of(
-                                MySqlTypeUtils.toDataType(
-                                        column.getDataType().getName(),
-                                        precision,
-                                        scale,
-                                        MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue()),
-                                comment == null ? null : String.valueOf(comment.getValue())));
-            }
+    public Optional<Schema> build(JsonNode tableChange) {
+        JsonNode jsonTable = tableChange.get("table");
+        String tableName = tableChange.get("id").asText();
+        ArrayNode columns = (ArrayNode) jsonTable.get("columns");
+        LinkedHashMap<String, DataType> fields = new LinkedHashMap<>();
+
+        for (JsonNode element : columns) {
+            Integer precision = element.has("length") ? element.get("length").asInt() : null;
+            Integer scale = element.has("scale") ? element.get("scale").asInt() : null;
+            fields.put(
+                    element.get("name").asText(),
+                    // TODO : add table comment and column comment when we upgrade flink cdc to 2.4
+                    MySqlTypeUtils.toDataType(
+                                    element.get("typeExpression").asText(),
+                                    precision,
+                                    scale,
+                                    MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue())
+                            .copy(element.get("optional").asBoolean()));
         }
 
-        List<String> primaryKeys = statement.getPrimaryKeyNames();
+        ArrayNode arrayNode = (ArrayNode) jsonTable.get("primaryKeyColumnNames");
+        List<String> primaryKeys = new ArrayList<>();
+        for (JsonNode primary : arrayNode) {
+            primaryKeys.add(primary.asText());
+        }
 
         if (!caseSensitive) {
-            LinkedHashMap<String, Tuple2<DataType, String>> tmp = new LinkedHashMap<>();
-            for (Map.Entry<String, Tuple2<DataType, String>> entry : fields.entrySet()) {
+            LinkedHashMap<String, DataType> tmp = new LinkedHashMap<>();
+            for (Map.Entry<String, DataType> entry : fields.entrySet()) {
                 String fieldName = entry.getKey();
                 checkArgument(
                         !tmp.containsKey(fieldName.toLowerCase()),
                         "Duplicate key '%s' in table '%s' appears when converting fields map keys to case-insensitive form.",
                         fieldName,
-                        statement.getTableName());
+                        tableName);
                 tmp.put(fieldName.toLowerCase(), entry.getValue());
             }
             fields = tmp;
@@ -107,8 +92,8 @@ public Optional<Schema> build(MySqlCreateTableStatement statement) {
         Schema.Builder builder = Schema.newBuilder();
         builder.options(tableConfig);
 
-        for (Map.Entry<String, Tuple2<DataType, String>> entry : fields.entrySet()) {
-            builder.column(entry.getKey(), entry.getValue().f0, entry.getValue().f1);
+        for (Map.Entry<String, DataType> entry : fields.entrySet()) {
+            builder.column(entry.getKey(), entry.getValue());
         }
 
         Schema schema = builder.primaryKey(primaryKeys).build();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 1606cab2a28c1..0595d6fa04da3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -823,24 +823,33 @@ public void testAddIgnoredTable() throws Exception {
         waitForResult(Collections.singletonList("+I[1, one]"), table1, rowType, primaryKeys);
 
         // create new tables at runtime
-        // synchronized table: t2
+        // synchronized table: t2, t22
         statement.executeUpdate("CREATE TABLE t2 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
         statement.executeUpdate("INSERT INTO t2 VALUES (1, 'Hi')");
-        // not synchronized tables: ta, t3
+
+        statement.executeUpdate("CREATE TABLE t22 LIKE t2");
+        statement.executeUpdate("INSERT INTO t22 VALUES (1, 'Hello')");
+
+        // not synchronized tables: ta, t3, t4
         statement.executeUpdate("CREATE TABLE ta (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
         statement.executeUpdate("INSERT INTO ta VALUES (1, 'Apache')");
         statement.executeUpdate("CREATE TABLE t3 (k INT, v1 VARCHAR(10))");
         statement.executeUpdate("INSERT INTO t3 VALUES (1, 'Paimon')");
+        statement.executeUpdate("CREATE TABLE t4 SELECT * FROM t2");
 
         statement.executeUpdate("INSERT INTO t1 VALUES (2, 'two')");
         waitForResult(Arrays.asList("+I[1, one]", "+I[2, two]"), table1, rowType, primaryKeys);
 
         // check tables
-        assertTableExists(Arrays.asList("t1", "t2"));
-        assertTableNotExists(Arrays.asList("a", "ta", "t3"));
+        assertTableExists(Arrays.asList("t1", "t2", "t22"));
+        assertTableNotExists(Arrays.asList("a", "ta", "t3", "t4"));
 
         FileStoreTable newTable = getFileStoreTable("t2");
         waitForResult(Collections.singletonList("+I[1, Hi]"), newTable, rowType, primaryKeys);
+
+        newTable = getFileStoreTable("t22");
+        waitForResult(
+                Collections.singletonList("+I[1, Hello]"), newTable, rowType, primaryKeys);
     }
 }
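
For reference, below is a minimal sketch (not part of the patch) of the Debezium schema-history record shape that the new parseNewTable() / MySqlTableSchemaBuilder.build(JsonNode) path consumes. The JSON payload is hand-written to mirror only the fields the new code reads (tableChanges[0].type, id, table.primaryKeyColumnNames, and per-column name, typeExpression, length, scale, optional); the database/table names and length values are illustrative, and the plain com.fasterxml.jackson imports stand in for the shaded org.apache.paimon.shade.jackson2 classes used in production.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class HistoryRecordSketch {

    // Hand-written example of a Debezium schema-history payload; field names mirror
    // what the patched parseNewTable() and MySqlTableSchemaBuilder.build(JsonNode) access.
    private static final String HISTORY_RECORD =
            "{"
                    + "\"ddl\":\"CREATE TABLE t2 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))\","
                    + "\"tableChanges\":[{"
                    + "\"type\":\"CREATE\","
                    + "\"id\":\"\\\"paimon_sync_database\\\".\\\"t2\\\"\","
                    + "\"table\":{"
                    + "\"primaryKeyColumnNames\":[\"k\"],"
                    + "\"columns\":["
                    + "{\"name\":\"k\",\"typeExpression\":\"INT\",\"length\":10,\"optional\":false},"
                    + "{\"name\":\"v1\",\"typeExpression\":\"VARCHAR\",\"length\":10,\"optional\":true}"
                    + "]}}]}";

    public static void main(String[] args) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();
        JsonNode tableChanges = objectMapper.readTree(HISTORY_RECORD).get("tableChanges");
        JsonNode tableChange = tableChanges.get(0);

        // Same fields the patched code checks: change type, primary keys, column metadata.
        System.out.println("type = " + tableChange.get("type").asText());
        System.out.println(
                "primary keys = " + tableChange.get("table").get("primaryKeyColumnNames"));
        for (JsonNode column : tableChange.get("table").get("columns")) {
            System.out.println(
                    column.get("name").asText()
                            + " -> "
                            + column.get("typeExpression").asText()
                            + (column.get("optional").asBoolean() ? " NULL" : " NOT NULL"));
        }
    }
}

On a CREATE change like this the table is kept because primaryKeyColumnNames is non-empty; an empty array would hit the "won't be synchronized" branch, and a non-CREATE type would make parseNewTable() return Optional.empty().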