Skip to content

Commit

Permalink
[cdc] Parse primary key and schema by historyRecord (#1621)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjun0x01 authored Jul 24, 2023
1 parent df55364 commit ba02730
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,11 @@
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.guava30.com.google.common.base.Strings;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
import com.alibaba.druid.util.JdbcConstants;
import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -78,7 +74,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
private final boolean caseSensitive;
private final TableNameConverter tableNameConverter;
private final List<ComputedColumn> computedColumns;
private final NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder;
private final NewTableSchemaBuilder<JsonNode> schemaBuilder;
@Nullable private final Pattern includingPattern;
@Nullable private final Pattern excludingPattern;
private final Set<String> includedTables = new HashSet<>();
Expand Down Expand Up @@ -111,7 +107,7 @@ public MySqlDebeziumJsonEventParser(
ZoneId serverTimeZone,
boolean caseSensitive,
TableNameConverter tableNameConverter,
NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder,
NewTableSchemaBuilder<JsonNode> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
boolean convertTinyint1ToBool) {
Expand All @@ -131,7 +127,7 @@ public MySqlDebeziumJsonEventParser(
boolean caseSensitive,
List<ComputedColumn> computedColumns,
TableNameConverter tableNameConverter,
NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder,
NewTableSchemaBuilder<JsonNode> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
boolean convertTinyint1ToBool) {
Expand Down Expand Up @@ -231,20 +227,25 @@ public Optional<Schema> parseNewTable() {

try {
String historyRecordString = historyRecord.asText();
String ddl = objectMapper.readTree(historyRecordString).get("ddl").asText();
if (Strings.isNullOrEmpty(ddl)) {
return Optional.empty();
JsonNode tableChanges = objectMapper.readTree(historyRecordString).get("tableChanges");
if (tableChanges.size() != 1) {
throw new IllegalArgumentException(
"Invalid historyRecord, because tableChanges should contain exactly 1 item.\n"
+ historyRecord.asText());
}

SQLStatement statement = SQLUtils.parseSingleStatement(ddl, JdbcConstants.MYSQL);
if (!(statement instanceof MySqlCreateTableStatement)) {
JsonNode tableChange = tableChanges.get(0);
if (!tableChange
.get("type")
.asText()
.equals(TableChanges.TableChangeType.CREATE.name())) {
return Optional.empty();
}

MySqlCreateTableStatement createTableStatement = (MySqlCreateTableStatement) statement;
List<String> primaryKeys = createTableStatement.getPrimaryKeyNames();
String tableName = createTableStatement.getTableName();
if (primaryKeys.isEmpty()) {
JsonNode primaryKeyColumnNames = tableChange.get("table").get("primaryKeyColumnNames");
if (primaryKeyColumnNames.size() == 0) {
String id = tableChange.get("id").asText();
String tableName = id.replaceAll("\"", "").split("\\.")[1];
LOG.debug(
"Didn't find primary keys from MySQL DDL for table '{}'. "
+ "This table won't be synchronized.",
Expand All @@ -254,7 +255,7 @@ public Optional<Schema> parseNewTable() {
return Optional.empty();
}

return schemaBuilder.build(createTableStatement);
return schemaBuilder.build(tableChange);
} catch (Exception e) {
LOG.info("Failed to parse history record for schema changes", e);
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,10 @@
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;

import com.alibaba.druid.sql.ast.SQLDataType;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLName;
import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
import com.alibaba.druid.sql.ast.statement.SQLTableElement;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -42,7 +36,7 @@
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Schema builder for MySQL cdc. */
public class MySqlTableSchemaBuilder implements NewTableSchemaBuilder<MySqlCreateTableStatement> {
public class MySqlTableSchemaBuilder implements NewTableSchemaBuilder<JsonNode> {

private final Map<String, String> tableConfig;
private final boolean caseSensitive;
Expand All @@ -53,50 +47,41 @@ public MySqlTableSchemaBuilder(Map<String, String> tableConfig, boolean caseSens
}

@Override
public Optional<Schema> build(MySqlCreateTableStatement statement) {
List<SQLTableElement> columns = statement.getTableElementList();
LinkedHashMap<String, Tuple2<DataType, String>> fields = new LinkedHashMap<>();

for (SQLTableElement element : columns) {
if (element instanceof SQLColumnDefinition) {
SQLColumnDefinition column = (SQLColumnDefinition) element;
SQLName name = column.getName();
SQLDataType dataType = column.getDataType();
List<SQLExpr> arguments = dataType.getArguments();
Integer precision = null;
Integer scale = null;
if (arguments.size() >= 1) {
precision = (int) (((SQLIntegerExpr) arguments.get(0)).getValue());
}

if (arguments.size() >= 2) {
scale = (int) (((SQLIntegerExpr) arguments.get(1)).getValue());
}

SQLCharExpr comment = (SQLCharExpr) column.getComment();
fields.put(
name.getSimpleName(),
Tuple2.of(
MySqlTypeUtils.toDataType(
column.getDataType().getName(),
precision,
scale,
MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue()),
comment == null ? null : String.valueOf(comment.getValue())));
}
public Optional<Schema> build(JsonNode tableChange) {
JsonNode jsonTable = tableChange.get("table");
String tableName = tableChange.get("id").asText();
ArrayNode columns = (ArrayNode) jsonTable.get("columns");
LinkedHashMap<String, DataType> fields = new LinkedHashMap<>();

for (JsonNode element : columns) {
Integer precision = element.has("length") ? element.get("length").asInt() : null;
Integer scale = element.has("scale") ? element.get("scale").asInt() : null;
fields.put(
element.get("name").asText(),
// TODO : add table comment and column comment when we upgrade flink cdc to 2.4
MySqlTypeUtils.toDataType(
element.get("typeExpression").asText(),
precision,
scale,
MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue())
.copy(element.get("optional").asBoolean()));
}

List<String> primaryKeys = statement.getPrimaryKeyNames();
ArrayNode arrayNode = (ArrayNode) jsonTable.get("primaryKeyColumnNames");
List<String> primaryKeys = new ArrayList<>();
for (JsonNode primary : arrayNode) {
primaryKeys.add(primary.asText());
}

if (!caseSensitive) {
LinkedHashMap<String, Tuple2<DataType, String>> tmp = new LinkedHashMap<>();
for (Map.Entry<String, Tuple2<DataType, String>> entry : fields.entrySet()) {
LinkedHashMap<String, DataType> tmp = new LinkedHashMap<>();
for (Map.Entry<String, DataType> entry : fields.entrySet()) {
String fieldName = entry.getKey();
checkArgument(
!tmp.containsKey(fieldName.toLowerCase()),
"Duplicate key '%s' in table '%s' appears when converting fields map keys to case-insensitive form.",
fieldName,
statement.getTableName());
tableName);
tmp.put(fieldName.toLowerCase(), entry.getValue());
}
fields = tmp;
Expand All @@ -107,8 +92,8 @@ public Optional<Schema> build(MySqlCreateTableStatement statement) {

Schema.Builder builder = Schema.newBuilder();
builder.options(tableConfig);
for (Map.Entry<String, Tuple2<DataType, String>> entry : fields.entrySet()) {
builder.column(entry.getKey(), entry.getValue().f0, entry.getValue().f1);
for (Map.Entry<String, DataType> entry : fields.entrySet()) {
builder.column(entry.getKey(), entry.getValue());
}
Schema schema = builder.primaryKey(primaryKeys).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,24 +823,33 @@ public void testAddIgnoredTable() throws Exception {
waitForResult(Collections.singletonList("+I[1, one]"), table1, rowType, primaryKeys);

// create new tables at runtime
// synchronized table: t2
// synchronized table: t2, t22
statement.executeUpdate("CREATE TABLE t2 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
statement.executeUpdate("INSERT INTO t2 VALUES (1, 'Hi')");
// not synchronized tables: ta, t3

statement.executeUpdate("CREATE TABLE t22 LIKE t2");
statement.executeUpdate("INSERT INTO t22 VALUES (1, 'Hello')");

// not synchronized tables: ta, t3, t4
statement.executeUpdate("CREATE TABLE ta (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
statement.executeUpdate("INSERT INTO ta VALUES (1, 'Apache')");
statement.executeUpdate("CREATE TABLE t3 (k INT, v1 VARCHAR(10))");
statement.executeUpdate("INSERT INTO t3 VALUES (1, 'Paimon')");
statement.executeUpdate("CREATE TABLE t4 SELECT * FROM t2");

statement.executeUpdate("INSERT INTO t1 VALUES (2, 'two')");
waitForResult(Arrays.asList("+I[1, one]", "+I[2, two]"), table1, rowType, primaryKeys);

// check tables
assertTableExists(Arrays.asList("t1", "t2"));
assertTableNotExists(Arrays.asList("a", "ta", "t3"));
assertTableExists(Arrays.asList("t1", "t2", "t22"));
assertTableNotExists(Arrays.asList("a", "ta", "t3", "t4"));

FileStoreTable newTable = getFileStoreTable("t2");
waitForResult(Collections.singletonList("+I[1, Hi]"), newTable, rowType, primaryKeys);

newTable = getFileStoreTable("t22");
waitForResult(
Collections.singletonList("+I[1, Hello]"), newTable, rowType, primaryKeys);
}
}

Expand Down

0 comments on commit ba02730

Please sign in to comment.