
Commit de458dc

[flink] MySql CDC can now deal with multiple changes in one ALTER TABLE statement (apache#912)
tsreaper authored Apr 17, 2023
1 parent fda5ddf commit de458dc
Showing 16 changed files with 324 additions and 145 deletions.
@@ -18,7 +18,7 @@

package org.apache.paimon.flink.action.cdc.mysql;

import org.apache.paimon.flink.sink.cdc.SchemaChangeProcessFunction;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataType;
@@ -72,7 +72,8 @@ static boolean schemaCompatible(TableSchema tableSchema, MySqlSchema mySqlSchema
return false;
}
DataType type = tableSchema.fields().get(idx).type();
if (!SchemaChangeProcessFunction.canConvert(entry.getValue(), type)) {
if (UpdatedDataFieldsProcessFunction.canConvert(entry.getValue(), type)
!= UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
return false;
}
}
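For context on the new return type: the old SchemaChangeProcessFunction.canConvert answered a plain boolean, while UpdatedDataFieldsProcessFunction.canConvert answers an enum. Only the CONVERT and EXCEPTION constants appear in this diff; the sketch below assumes a third, lenient constant (named IGNORE here) for the remaining branch seen later in MySqlSchema.merge, and it uses a toy widening rule over integer type names rather than Paimon's real conversion logic.

import java.util.Arrays;
import java.util.List;

class ConvertActionSketch {

    // CONVERT and EXCEPTION are taken from the diff; IGNORE is an assumed name.
    enum ConvertAction {
        CONVERT, // the old type can widen to the new type: apply the change
        IGNORE, // the new type already fits the old type: keep the old type
        EXCEPTION // the types are incompatible: the caller should fail
    }

    // Illustrative widening chain for integer types only.
    private static final List<String> INT_CHAIN =
            Arrays.asList("TINYINT", "SMALLINT", "INT", "BIGINT");

    static ConvertAction canConvert(String oldType, String newType) {
        int oldIdx = INT_CHAIN.indexOf(oldType);
        int newIdx = INT_CHAIN.indexOf(newType);
        if (oldIdx < 0 || newIdx < 0) {
            return ConvertAction.EXCEPTION; // unrelated types
        }
        return oldIdx < newIdx
                ? ConvertAction.CONVERT // e.g. INT -> BIGINT widens
                : ConvertAction.IGNORE; // same or narrower: nothing to do
    }
}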
@@ -20,10 +20,11 @@

import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Preconditions;
@@ -38,12 +39,10 @@
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Optional;

/**
* {@link EventParser} for MySQL Debezium JSON.
@@ -55,13 +54,8 @@
public class MySqlDebeziumJsonEventParser implements EventParser<String> {

private static final Logger LOG = LoggerFactory.getLogger(MySqlDebeziumJsonEventParser.class);
private static final String SCHEMA_CHANGE_REGEX =
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP|MODIFY)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s\\(]+))?\\s*(\\((.*?)\\))?.*";

private final ObjectMapper objectMapper = new ObjectMapper();
private final Pattern schemaChangePattern =
Pattern.compile(SCHEMA_CHANGE_REGEX, Pattern.CASE_INSENSITIVE);

private final ZoneId serverTimeZone;

private JsonNode payload;
@@ -88,7 +82,7 @@ public void setRawEvent(String rawEvent) {
+ "in the JsonDebeziumDeserializationSchema you created");
payload = root.get("payload");

if (!isSchemaChange()) {
if (!isUpdatedDataFields()) {
updateFieldTypes(schema);
}
} catch (Exception e) {
@@ -125,45 +119,54 @@ private void updateFieldTypes(JsonNode schema) {
}

@Override
public boolean isSchemaChange() {
public boolean isUpdatedDataFields() {
return payload.get("op") == null;
}

@Override
public List<SchemaChange> getSchemaChanges() {
public Optional<List<DataField>> getUpdatedDataFields() {
JsonNode historyRecord = payload.get("historyRecord");
if (historyRecord == null) {
return Collections.emptyList();
return Optional.empty();
}

JsonNode ddlNode;
JsonNode columns;
try {
ddlNode = objectMapper.readTree(historyRecord.asText()).get("ddl");
String historyRecordString = historyRecord.asText();
JsonNode tableChanges = objectMapper.readTree(historyRecordString).get("tableChanges");
if (tableChanges.size() != 1) {
throw new IllegalArgumentException(
"Invalid historyRecord, because tableChanges should contain exactly 1 item.\n"
+ historyRecordString);
}
columns = tableChanges.get(0).get("table").get("columns");
} catch (Exception e) {
LOG.debug("Failed to parse history record for schema changes", e);
return Collections.emptyList();
LOG.info("Failed to parse history record for schema changes", e);
return Optional.empty();
}
if (ddlNode == null) {
return Collections.emptyList();
if (columns == null) {
return Optional.empty();
}
String ddl = ddlNode.asText();

Matcher matcher = schemaChangePattern.matcher(ddl);
if (matcher.find()) {
String op = matcher.group(1);
String column = matcher.group(3);
String type = matcher.group(5);
String len = matcher.group(7);
if ("add".equalsIgnoreCase(op)) {
return Collections.singletonList(
SchemaChange.addColumn(column, MySqlTypeUtils.toDataType(type, len)));
} else if ("modify".equalsIgnoreCase(op)) {
return Collections.singletonList(
SchemaChange.updateColumnType(
column, MySqlTypeUtils.toDataType(type, len)));
List<DataField> result = new ArrayList<>();
for (int i = 0; i < columns.size(); i++) {
JsonNode column = columns.get(i);
JsonNode length = column.get("length");
JsonNode scale = column.get("scale");
DataType type =
MySqlTypeUtils.toDataType(
column.get("typeName").asText(),
length == null ? null : length.asInt(),
scale == null ? null : scale.asInt());
if (column.get("optional").asBoolean()) {
type = type.nullable();
} else {
type = type.notNull();
}

result.add(new DataField(i, column.get("name").asText(), type));
}
return Collections.emptyList();
return Optional.of(result);
}

@Override
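The removed regex could only match a single ADD/DROP/MODIFY clause per ALTER TABLE statement; the new code ignores the DDL text entirely and reads the complete post-change column list from Debezium's schema-history record. Below is a minimal sketch of such a record, abbreviated and hypothetical: real records carry more fields (source, position, jdbcType, typeExpression, ...) and the exact payload varies by Debezium version.

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

public class HistoryRecordSketch {

    public static void main(String[] args) throws Exception {
        // Two changes in a single ALTER TABLE statement; tableChanges still
        // lists every column of the table after the statement was applied.
        String historyRecord =
                "{\"ddl\": \"ALTER TABLE t ADD v INT, MODIFY c VARCHAR(20)\","
                        + " \"tableChanges\": [{\"type\": \"ALTER\", \"table\": {\"columns\": ["
                        + "{\"name\": \"k\", \"typeName\": \"INT\", \"optional\": false},"
                        + "{\"name\": \"c\", \"typeName\": \"VARCHAR\", \"length\": 20, \"optional\": true},"
                        + "{\"name\": \"v\", \"typeName\": \"INT\", \"optional\": true}]}}]}";

        JsonNode tableChanges = new ObjectMapper().readTree(historyRecord).get("tableChanges");
        JsonNode columns = tableChanges.get(0).get("table").get("columns");
        for (JsonNode column : columns) {
            // Both the ADD and the MODIFY are picked up in one pass.
            System.out.println(
                    column.get("name").asText() + ": " + column.get("typeName").asText());
        }
    }
}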
@@ -18,7 +18,7 @@

package org.apache.paimon.flink.action.cdc.mysql;

import org.apache.paimon.flink.sink.cdc.SchemaChangeProcessFunction;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.types.DataType;

import java.sql.DatabaseMetaData;
@@ -86,19 +86,19 @@ public MySqlSchema merge(MySqlSchema other) {
DataType newType = entry.getValue();
if (fields.containsKey(fieldName)) {
DataType oldType = fields.get(fieldName);
if (SchemaChangeProcessFunction.canConvert(oldType, newType)) {
fields.put(fieldName, newType);
} else if (SchemaChangeProcessFunction.canConvert(newType, oldType)) {
// nothing to do
} else {
throw new IllegalArgumentException(
String.format(
"Column %s have different types in table %s.%s and table %s.%s",
fieldName,
databaseName,
tableName,
other.databaseName,
other.tableName));
switch (UpdatedDataFieldsProcessFunction.canConvert(oldType, newType)) {
case CONVERT:
fields.put(fieldName, newType);
break;
case EXCEPTION:
throw new IllegalArgumentException(
String.format(
"Column %s have different types in table %s.%s and table %s.%s",
fieldName,
databaseName,
tableName,
other.databaseName,
other.tableName));
}
} else {
fields.put(fieldName, newType);
@@ -25,9 +25,6 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;

/**
* Converts from MySQL type to {@link DataType}.
*
@@ -106,19 +103,6 @@ public class MySqlTypeUtils {
// The base length of a timestamp is 19, for example "2023-03-23 17:20:00".
private static final int JDBC_TIMESTAMP_BASE_LENGTH = 19;

public static DataType toDataType(String type, String params) {
List<Integer> paramList = new ArrayList<>();
if (params != null) {
for (String s : params.split(",")) {
paramList.add(Integer.parseInt(s.trim()));
}
}
return toDataType(
type,
paramList.size() > 0 ? paramList.get(0) : null,
paramList.size() > 1 ? paramList.get(1) : null);
}

public static DataType toDataType(
String type, @Nullable Integer length, @Nullable Integer scale) {
switch (type.toUpperCase()) {
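With the string-splitting overload removed, callers pass length and scale straight from the history record's column metadata. A short usage sketch; the resulting types reflect the usual MySQL-to-Paimon mappings and are stated here as an assumption, not read from this diff.

import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.types.DataType;

class TypeMappingSketch {
    static void examples() {
        // null means the length/scale was not specified in the column metadata.
        DataType decimal = MySqlTypeUtils.toDataType("DECIMAL", 10, 2); // expected DECIMAL(10, 2)
        DataType varchar = MySqlTypeUtils.toDataType("VARCHAR", 20, null); // expected VARCHAR(20)
        DataType integer = MySqlTypeUtils.toDataType("INT", null, null); // expected INT
    }
}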
@@ -18,20 +18,22 @@

package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataField;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* A {@link ProcessFunction} to parse CDC change event to either {@link SchemaChange} or {@link
* CdcRecord} and send them to different side outputs according to table name.
* A {@link ProcessFunction} to parse a CDC change event into either a list of {@link DataField}s
* or a {@link CdcRecord} and send them to different side outputs according to table name.
*
* <p>This {@link ProcessFunction} can handle records for different tables at the same time.
*
@@ -42,7 +44,7 @@ public class CdcMultiTableParsingProcessFunction<T> extends ProcessFunction<T, V
private final EventParser.Factory<T> parserFactory;

private transient EventParser<T> parser;
private transient Map<String, OutputTag<SchemaChange>> schemaChangeOutputTags;
private transient Map<String, OutputTag<List<DataField>>> updatedDataFieldsOutputTags;
private transient Map<String, OutputTag<CdcRecord>> recordOutputTags;

public CdcMultiTableParsingProcessFunction(EventParser.Factory<T> parserFactory) {
@@ -52,7 +54,7 @@
@Override
public void open(Configuration parameters) throws Exception {
parser = parserFactory.create();
schemaChangeOutputTags = new HashMap<>();
updatedDataFieldsOutputTags = new HashMap<>();
recordOutputTags = new HashMap<>();
}

@@ -61,25 +63,24 @@ public void processElement(T raw, Context context, Collector<Void> collector) th
parser.setRawEvent(raw);
String tableName = parser.tableName();

if (parser.isSchemaChange()) {
for (SchemaChange schemaChange : parser.getSchemaChanges()) {
context.output(getSchemaChangeOutputTag(tableName), schemaChange);
}
if (parser.isUpdatedDataFields()) {
parser.getUpdatedDataFields()
.ifPresent(t -> context.output(getUpdatedDataFieldsOutputTag(tableName), t));
} else {
for (CdcRecord record : parser.getRecords()) {
context.output(getRecordOutputTag(tableName), record);
}
}
}

private OutputTag<SchemaChange> getSchemaChangeOutputTag(String tableName) {
return schemaChangeOutputTags.computeIfAbsent(
tableName, CdcMultiTableParsingProcessFunction::createSchemaChangeOutputTag);
private OutputTag<List<DataField>> getUpdatedDataFieldsOutputTag(String tableName) {
return updatedDataFieldsOutputTags.computeIfAbsent(
tableName, CdcMultiTableParsingProcessFunction::createUpdatedDataFieldsOutputTag);
}

public static OutputTag<SchemaChange> createSchemaChangeOutputTag(String tableName) {
public static OutputTag<List<DataField>> createUpdatedDataFieldsOutputTag(String tableName) {
return new OutputTag<>(
"schema-change-" + tableName, TypeInformation.of(SchemaChange.class));
"new-data-field-list-" + tableName, new ListTypeInfo<>(DataField.class));
}

private OutputTag<CdcRecord> getRecordOutputTag(String tableName) {
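The switch from TypeInformation.of(SchemaChange.class) to new ListTypeInfo<>(DataField.class) matters because the element type of a List is erased at runtime; ListTypeInfo records it explicitly so Flink can pick a serializer for the DataField elements. A small sketch of building and reading such a tag; getSideOutput is standard Flink API, and the stream parameter is a placeholder.

import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;
import org.apache.paimon.types.DataField;

import java.util.List;

class SideOutputSketch {

    static OutputTag<List<DataField>> tagFor(String tableName) {
        // TypeInformation.of(List.class) would lose the DataField element
        // type to erasure; ListTypeInfo carries it explicitly.
        return new OutputTag<>(
                "new-data-field-list-" + tableName, new ListTypeInfo<>(DataField.class));
    }

    static DataStream<List<DataField>> schemaChanges(
            SingleOutputStreamOperator<Void> parsed, String tableName) {
        // Tags with the same id and type information match the side output.
        return parsed.getSideOutput(tagFor(tableName));
    }
}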
@@ -18,17 +18,19 @@

package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataField;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;

/**
* A {@link ProcessFunction} to parse CDC change event to either {@link SchemaChange} or {@link
* CdcRecord} and send them to different downstreams.
* A {@link ProcessFunction} to parse a CDC change event into either a list of {@link DataField}s
* or a {@link CdcRecord} and send them to different downstreams.
*
* <p>This {@link ProcessFunction} can only handle records for a single constant table. To handle
* records for different tables, see {@link CdcMultiTableParsingProcessFunction}.
@@ -37,8 +39,8 @@
*/
public class CdcParsingProcessFunction<T> extends ProcessFunction<T, CdcRecord> {

public static final OutputTag<SchemaChange> SCHEMA_CHANGE_OUTPUT_TAG =
new OutputTag<>("schema-change", TypeInformation.of(SchemaChange.class));
public static final OutputTag<List<DataField>> NEW_DATA_FIELD_LIST_OUTPUT_TAG =
new OutputTag<>("new-data-field-list", new ListTypeInfo<>(DataField.class));

private final EventParser.Factory<T> parserFactory;

@@ -57,10 +59,9 @@ public void open(Configuration parameters) throws Exception {
public void processElement(T raw, Context context, Collector<CdcRecord> collector)
throws Exception {
parser.setRawEvent(raw);
if (parser.isSchemaChange()) {
for (SchemaChange schemaChange : parser.getSchemaChanges()) {
context.output(SCHEMA_CHANGE_OUTPUT_TAG, schemaChange);
}
if (parser.isUpdatedDataFields()) {
parser.getUpdatedDataFields()
.ifPresent(t -> context.output(NEW_DATA_FIELD_LIST_OUTPUT_TAG, t));
} else {
for (CdcRecord record : parser.getRecords()) {
collector.collect(record);
@@ -18,13 +18,14 @@

package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataField;

import java.io.Serializable;
import java.util.List;
import java.util.Optional;

/**
* Parse a CDC change event to a list of {@link SchemaChange} or {@link CdcRecord}.
* Parse a CDC change event into either a list of {@link DataField}s or {@link CdcRecord}s.
*
* @param <T> CDC change event type
*/
@@ -34,9 +35,9 @@ public interface EventParser<T> {

String tableName();

boolean isSchemaChange();
boolean isUpdatedDataFields();

List<SchemaChange> getSchemaChanges();
Optional<List<DataField>> getUpdatedDataFields();

List<CdcRecord> getRecords();

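Taken together, the reshaped contract reads as below. This is a usage sketch only; the parser instance and raw event are placeholders, and the sketch assumes it sits in the same org.apache.paimon.flink.sink.cdc package as EventParser and CdcRecord.

class EventDispatchSketch {

    static void dispatch(EventParser<String> parser, String rawEvent) {
        parser.setRawEvent(rawEvent);
        if (parser.isUpdatedDataFields()) {
            // Schema event: the full new field list, if the history record parsed.
            parser.getUpdatedDataFields()
                    .ifPresent(fields -> fields.forEach(f -> System.out.println(f.name())));
        } else {
            // Data event: zero or more row changes.
            for (CdcRecord record : parser.getRecords()) {
                System.out.println(record);
            }
        }
    }
}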
@@ -94,10 +94,10 @@ public void build() {
DataStream<Void> schemaChangeProcessFunction =
SingleOutputStreamOperatorUtils.getSideOutput(
parsed,
CdcMultiTableParsingProcessFunction.createSchemaChangeOutputTag(
table.name()))
CdcMultiTableParsingProcessFunction
.createUpdatedDataFieldsOutputTag(table.name()))
.process(
new SchemaChangeProcessFunction(
new UpdatedDataFieldsProcessFunction(
new SchemaManager(table.fileIO(), table.location())));
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
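UpdatedDataFieldsProcessFunction itself is among the 16 changed files but outside this excerpt. Below is a heavily hedged sketch of the shape such a function plausibly takes: the diffing logic and the commitChanges call are assumptions, not the class as committed. Note the builder above pins the operator's parallelism and max parallelism to 1, so schema commits are applied serially.

import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataField;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.List;

// Sketch only; the real UpdatedDataFieldsProcessFunction is not shown above.
public class UpdatedDataFieldsSketch extends ProcessFunction<List<DataField>, Void> {

    private final SchemaManager schemaManager;

    public UpdatedDataFieldsSketch(SchemaManager schemaManager) {
        this.schemaManager = schemaManager;
    }

    @Override
    public void processElement(List<DataField> updatedFields, Context context, Collector<Void> out)
            throws Exception {
        // One ALTER TABLE statement may carry several changes, so every
        // field in the list is compared against the current table schema.
        TableSchema schema =
                schemaManager.latest().orElseThrow(() -> new IllegalStateException("No schema"));
        for (DataField field : updatedFields) {
            if (!schema.fieldNames().contains(field.name())) {
                // commitChanges(List) is assumed here; type updates would go
                // through canConvert and SchemaChange.updateColumnType alike.
                schemaManager.commitChanges(
                        Collections.singletonList(
                                SchemaChange.addColumn(field.name(), field.type())));
            }
        }
    }
}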