Skip to content

Commit

Permalink
[cdc] Refactor DebeziumRecordParser (apache#2790)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Jan 25, 2024
1 parent 43ddab5 commit b88938d
Show file tree
Hide file tree
Showing 8 changed files with 611 additions and 109 deletions.
8 changes: 5 additions & 3 deletions docs/content/cdc-ingestion/kafka-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ If a message in a Kafka topic is a change event captured from another database u
The JSON sources possibly missing some information. For example, Ogg and Maxwell format standards don't contain field
types; When you write JSON sources into Flink Kafka sink, it will only reserve data and row type and drop other information.
The synchronization job will try best to handle the problem as follows:
1. If missing field types, Paimon will use 'STRING' type as default.
2. If missing database name or table name, you cannot do database synchronization, but you can still do table synchronization.
3. If missing primary keys, the job might create non primary key table. You can set primary keys when submit job in table
1. Usually, debezium-json contains 'schema' field, from which Paimon will retrieve data types. Make sure your debezium
json has this field, or Paimon will use 'STRING' type.
2. If missing field types, Paimon will use 'STRING' type as default.
3. If missing database name or table name, you cannot do database synchronization, but you can still do table synchronization.
4. If missing primary keys, the job might create non primary key table. You can set primary keys when submit job in table
synchronization.
{{< /hint >}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.canal.CanalRecordParser;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaIncludeRecordParser;
import org.apache.paimon.flink.action.cdc.format.maxwell.MaxwellRecordParser;
import org.apache.paimon.flink.action.cdc.format.ogg.OggRecordParser;

Expand All @@ -39,8 +38,7 @@ public enum DataFormat {
CANAL_JSON(CanalRecordParser::new),
OGG_JSON(OggRecordParser::new),
MAXWELL_JSON(MaxwellRecordParser::new),
DEBEZIUM_JSON(DebeziumRecordParser::new),
DEBEZIUM_JSON_SCHEMA_INCLUDE(DebeziumSchemaIncludeRecordParser::new);
DEBEZIUM_JSON(DebeziumRecordParser::new);
// Add more data formats here if needed

private final RecordParserFactory parser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,26 @@
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.RecordParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.apache.paimon.utils.JsonSerdeUtil.getNodeAs;
import static org.apache.paimon.utils.JsonSerdeUtil.isNull;

/**
Expand Down Expand Up @@ -66,6 +76,11 @@ public class DebeziumRecordParser extends RecordParser {
private static final String OP_DELETE = "d";
private static final String OP_READE = "r";

private boolean hasSchema;
private final Map<String, String> debeziumTypes = new HashMap<>();
private final Map<String, String> classNames = new HashMap<>();
private final Map<String, Map<String, String>> parameters = new HashMap<>();

public DebeziumRecordParser(
boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
super(caseSensitive, typeMapping, computedColumns);
Expand Down Expand Up @@ -105,13 +120,98 @@ private JsonNode getBefore(String op) {
@Override
protected void setRoot(String record) {
JsonNode node = JsonSerdeUtil.fromJson(record, JsonNode.class);

hasSchema = false;
if (node.has(FIELD_SCHEMA)) {
root = node.get(FIELD_PAYLOAD);
JsonNode schema = node.get(FIELD_SCHEMA);
if (!isNull(schema)) {
parseSchema(schema);
hasSchema = true;
}
} else {
root = node;
}
}

private void parseSchema(JsonNode schema) {
debeziumTypes.clear();
classNames.clear();
parameters.clear();

ArrayNode schemaFields = getNodeAs(schema, "fields", ArrayNode.class);
Preconditions.checkNotNull(schemaFields);

ArrayNode fields = null;
for (int i = 0; i < schemaFields.size(); i++) {
JsonNode node = schemaFields.get(i);
if (getString(node, "field").equals("after")) {
fields = getNodeAs(node, "fields", ArrayNode.class);
break;
} else if (getString(node, "field").equals("before")) {
if (fields == null) {
fields = getNodeAs(node, "fields", ArrayNode.class);
}
}
}
Preconditions.checkNotNull(fields);

for (JsonNode node : fields) {
String field = getString(node, "field");

debeziumTypes.put(field, getString(node, "type"));
classNames.put(field, getString(node, "name"));

JsonNode parametersNode = node.get("parameters");
Map<String, String> parametersMap =
isNull(parametersNode)
? Collections.emptyMap()
: JsonSerdeUtil.convertValue(
parametersNode,
new TypeReference<HashMap<String, String>>() {});

parameters.put(field, parametersMap);
}
}

@Nullable
private String getString(JsonNode node, String fieldName) {
JsonNode fieldValue = node.get(fieldName);
return isNull(fieldValue) ? null : fieldValue.asText();
}

@Override
protected Map<String, String> extractRowData(
JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes) {
if (!hasSchema) {
return super.extractRowData(record, paimonFieldTypes);
}

Map<String, Object> recordMap =
JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, Object>>() {});
LinkedHashMap<String, String> resultMap = new LinkedHashMap<>();
for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
String fieldName = entry.getKey();
String rawValue = Objects.toString(entry.getValue(), null);
String debeziumType = debeziumTypes.get(fieldName);
String className = classNames.get(fieldName);

String transformed =
DebeziumSchemaUtils.transformRawValue(
rawValue, debeziumType, className, typeMapping, record.get(fieldName));
resultMap.put(fieldName, transformed);

paimonFieldTypes.put(
fieldName,
DebeziumSchemaUtils.toDataType(
debeziumType, className, parameters.get(fieldName)));
}

evalComputedColumns(resultMap, paimonFieldTypes);

return resultMap;
}

@Override
protected String primaryField() {
return FIELD_PRIMARY;
Expand Down

This file was deleted.

Loading

0 comments on commit b88938d

Please sign in to comment.