Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] parse primary key and schema by historyRecord #1621

Merged
merged 1 commit into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,11 @@
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.guava30.com.google.common.base.Strings;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
import com.alibaba.druid.util.JdbcConstants;
import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -78,7 +74,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
private final boolean caseSensitive;
private final TableNameConverter tableNameConverter;
private final List<ComputedColumn> computedColumns;
private final NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder;
private final NewTableSchemaBuilder<JsonNode> schemaBuilder;
@Nullable private final Pattern includingPattern;
@Nullable private final Pattern excludingPattern;
private final Set<String> includedTables = new HashSet<>();
Expand Down Expand Up @@ -111,7 +107,7 @@ public MySqlDebeziumJsonEventParser(
ZoneId serverTimeZone,
boolean caseSensitive,
TableNameConverter tableNameConverter,
NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder,
NewTableSchemaBuilder<JsonNode> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
boolean convertTinyint1ToBool) {
Expand All @@ -131,7 +127,7 @@ public MySqlDebeziumJsonEventParser(
boolean caseSensitive,
List<ComputedColumn> computedColumns,
TableNameConverter tableNameConverter,
NewTableSchemaBuilder<MySqlCreateTableStatement> schemaBuilder,
NewTableSchemaBuilder<JsonNode> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
boolean convertTinyint1ToBool) {
Expand Down Expand Up @@ -231,20 +227,25 @@ public Optional<Schema> parseNewTable() {

try {
String historyRecordString = historyRecord.asText();
String ddl = objectMapper.readTree(historyRecordString).get("ddl").asText();
if (Strings.isNullOrEmpty(ddl)) {
return Optional.empty();
JsonNode tableChanges = objectMapper.readTree(historyRecordString).get("tableChanges");
if (tableChanges.size() != 1) {
throw new IllegalArgumentException(
"Invalid historyRecord, because tableChanges should contain exactly 1 item.\n"
+ historyRecord.asText());
}

SQLStatement statement = SQLUtils.parseSingleStatement(ddl, JdbcConstants.MYSQL);
if (!(statement instanceof MySqlCreateTableStatement)) {
JsonNode tableChange = tableChanges.get(0);
if (!tableChange
.get("type")
.asText()
.equals(TableChanges.TableChangeType.CREATE.name())) {
return Optional.empty();
}

MySqlCreateTableStatement createTableStatement = (MySqlCreateTableStatement) statement;
List<String> primaryKeys = createTableStatement.getPrimaryKeyNames();
String tableName = createTableStatement.getTableName();
if (primaryKeys.isEmpty()) {
JsonNode primaryKeyColumnNames = tableChange.get("table").get("primaryKeyColumnNames");
if (primaryKeyColumnNames.size() == 0) {
String id = tableChange.get("id").asText();
String tableName = id.replaceAll("\"", "").split("\\.")[1];
LOG.debug(
"Didn't find primary keys from MySQL DDL for table '{}'. "
+ "This table won't be synchronized.",
Expand All @@ -254,7 +255,7 @@ public Optional<Schema> parseNewTable() {
return Optional.empty();
}

return schemaBuilder.build(createTableStatement);
return schemaBuilder.build(tableChange);
} catch (Exception e) {
LOG.info("Failed to parse history record for schema changes", e);
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,10 @@
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;

import com.alibaba.druid.sql.ast.SQLDataType;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLName;
import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
import com.alibaba.druid.sql.ast.statement.SQLTableElement;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -42,7 +36,7 @@
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Schema builder for MySQL cdc. */
public class MySqlTableSchemaBuilder implements NewTableSchemaBuilder<MySqlCreateTableStatement> {
public class MySqlTableSchemaBuilder implements NewTableSchemaBuilder<JsonNode> {

private final Map<String, String> tableConfig;
private final boolean caseSensitive;
Expand All @@ -53,50 +47,41 @@ public MySqlTableSchemaBuilder(Map<String, String> tableConfig, boolean caseSens
}

@Override
public Optional<Schema> build(MySqlCreateTableStatement statement) {
List<SQLTableElement> columns = statement.getTableElementList();
LinkedHashMap<String, Tuple2<DataType, String>> fields = new LinkedHashMap<>();

for (SQLTableElement element : columns) {
if (element instanceof SQLColumnDefinition) {
SQLColumnDefinition column = (SQLColumnDefinition) element;
SQLName name = column.getName();
SQLDataType dataType = column.getDataType();
List<SQLExpr> arguments = dataType.getArguments();
Integer precision = null;
Integer scale = null;
if (arguments.size() >= 1) {
precision = (int) (((SQLIntegerExpr) arguments.get(0)).getValue());
}

if (arguments.size() >= 2) {
scale = (int) (((SQLIntegerExpr) arguments.get(1)).getValue());
}

SQLCharExpr comment = (SQLCharExpr) column.getComment();
fields.put(
name.getSimpleName(),
Tuple2.of(
MySqlTypeUtils.toDataType(
column.getDataType().getName(),
precision,
scale,
MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue()),
comment == null ? null : String.valueOf(comment.getValue())));
}
public Optional<Schema> build(JsonNode tableChange) {
JsonNode jsonTable = tableChange.get("table");
String tableName = tableChange.get("id").asText();
ArrayNode columns = (ArrayNode) jsonTable.get("columns");
LinkedHashMap<String, DataType> fields = new LinkedHashMap<>();

for (JsonNode element : columns) {
Integer precision = element.has("length") ? element.get("length").asInt() : null;
Integer scale = element.has("scale") ? element.get("scale").asInt() : null;
fields.put(
element.get("name").asText(),
// TODO : add table comment and column comment when we upgrade flink cdc to 2.4
MySqlTypeUtils.toDataType(
element.get("typeExpression").asText(),
precision,
scale,
MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue())
.copy(element.get("optional").asBoolean()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with Debezium JSON — can we find the field comments here? If we can, we should also fetch the comments.

Copy link
Contributor Author

@zhangjun0x01 zhangjun0x01 Jul 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new-version Debezium history record contains the table comment and the column comments. I added a TODO; when we upgrade to Flink CDC 2.4, we can add the comments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool~

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new-version Debezium history record also contains the column default values, so we can sync those as well.

}

List<String> primaryKeys = statement.getPrimaryKeyNames();
ArrayNode arrayNode = (ArrayNode) jsonTable.get("primaryKeyColumnNames");
List<String> primaryKeys = new ArrayList<>();
for (JsonNode primary : arrayNode) {
primaryKeys.add(primary.asText());
}

if (!caseSensitive) {
LinkedHashMap<String, Tuple2<DataType, String>> tmp = new LinkedHashMap<>();
for (Map.Entry<String, Tuple2<DataType, String>> entry : fields.entrySet()) {
LinkedHashMap<String, DataType> tmp = new LinkedHashMap<>();
for (Map.Entry<String, DataType> entry : fields.entrySet()) {
String fieldName = entry.getKey();
checkArgument(
!tmp.containsKey(fieldName.toLowerCase()),
"Duplicate key '%s' in table '%s' appears when converting fields map keys to case-insensitive form.",
fieldName,
statement.getTableName());
tableName);
tmp.put(fieldName.toLowerCase(), entry.getValue());
}
fields = tmp;
Expand All @@ -107,8 +92,8 @@ public Optional<Schema> build(MySqlCreateTableStatement statement) {

Schema.Builder builder = Schema.newBuilder();
builder.options(tableConfig);
for (Map.Entry<String, Tuple2<DataType, String>> entry : fields.entrySet()) {
builder.column(entry.getKey(), entry.getValue().f0, entry.getValue().f1);
for (Map.Entry<String, DataType> entry : fields.entrySet()) {
builder.column(entry.getKey(), entry.getValue());
}
Schema schema = builder.primaryKey(primaryKeys).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,24 +823,33 @@ public void testAddIgnoredTable() throws Exception {
waitForResult(Collections.singletonList("+I[1, one]"), table1, rowType, primaryKeys);

// create new tables at runtime
// synchronized table: t2
// synchronized table: t2, t22
statement.executeUpdate("CREATE TABLE t2 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
statement.executeUpdate("INSERT INTO t2 VALUES (1, 'Hi')");
// not synchronized tables: ta, t3

statement.executeUpdate("CREATE TABLE t22 LIKE t2");
statement.executeUpdate("INSERT INTO t22 VALUES (1, 'Hello')");

// not synchronized tables: ta, t3, t4
statement.executeUpdate("CREATE TABLE ta (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
statement.executeUpdate("INSERT INTO ta VALUES (1, 'Apache')");
statement.executeUpdate("CREATE TABLE t3 (k INT, v1 VARCHAR(10))");
statement.executeUpdate("INSERT INTO t3 VALUES (1, 'Paimon')");
statement.executeUpdate("CREATE TABLE t4 SELECT * FROM t2");

statement.executeUpdate("INSERT INTO t1 VALUES (2, 'two')");
waitForResult(Arrays.asList("+I[1, one]", "+I[2, two]"), table1, rowType, primaryKeys);

// check tables
assertTableExists(Arrays.asList("t1", "t2"));
assertTableNotExists(Arrays.asList("a", "ta", "t3"));
assertTableExists(Arrays.asList("t1", "t2", "t22"));
assertTableNotExists(Arrays.asList("a", "ta", "t3", "t4"));

FileStoreTable newTable = getFileStoreTable("t2");
waitForResult(Collections.singletonList("+I[1, Hi]"), newTable, rowType, primaryKeys);

newTable = getFileStoreTable("t22");
waitForResult(
Collections.singletonList("+I[1, Hello]"), newTable, rowType, primaryKeys);
}
}

Expand Down