Skip to content

Commit

Permalink
[flink] Introduce record parser for debezium with schema (apache#2730)
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo authored Jan 23, 2024
1 parent 5749a73 commit 05c17b1
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.canal.CanalRecordParser;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaIncludeRecordParser;
import org.apache.paimon.flink.action.cdc.format.maxwell.MaxwellRecordParser;
import org.apache.paimon.flink.action.cdc.format.ogg.OggRecordParser;

Expand All @@ -38,7 +39,8 @@ public enum DataFormat {
CANAL_JSON(CanalRecordParser::new),
OGG_JSON(OggRecordParser::new),
MAXWELL_JSON(MaxwellRecordParser::new),
DEBEZIUM_JSON(DebeziumRecordParser::new);
DEBEZIUM_JSON(DebeziumRecordParser::new),
DEBEZIUM_JSON_SCHEMA_INCLUDE(DebeziumSchemaIncludeRecordParser::new);
// Add more data formats here if needed

private final RecordParserFactory parser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ protected boolean isDDL() {
}

// use STRING type in default when we cannot get origin data types (most cases)
protected LinkedHashMap<String, DataType> fillDefaultStringTypes(JsonNode record) {
protected LinkedHashMap<String, DataType> fillDefaultTypes(JsonNode record) {
LinkedHashMap<String, DataType> fieldTypes = new LinkedHashMap<>();
record.fieldNames().forEachRemaining(name -> fieldTypes.put(name, DataTypes.STRING()));
return fieldTypes;
Expand All @@ -138,7 +138,7 @@ public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) {

protected Map<String, String> extractRowData(
JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes) {
paimonFieldTypes.putAll(fillDefaultStringTypes(record));
paimonFieldTypes.putAll(fillDefaultTypes(record));
Map<String, Object> recordMap =
JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, Object>>() {});
Map<String, String> rowData =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ protected Map<String, String> extractRowData(
rowData.put(fieldName, newValue);
}
} else {
paimonFieldTypes.putAll(fillDefaultStringTypes(record));
paimonFieldTypes.putAll(fillDefaultTypes(record));
for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
rowData.put(entry.getKey(), Objects.toString(entry.getValue(), null));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.format.debezium;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;

/** Cdc Debezium Schema Entity. */
@JsonIgnoreProperties(ignoreUnknown = true)
public class CdcDebeziumSchema {

private static final String FIELD_FIELDS = "fields";
private static final String FIELD_NAME = "name";
private static final String FIELD_TYPE = "type";
private static final String FIELD_FIELD = "field";
private static final String FIELD_OPTIONAL = "optional";

@JsonProperty(FIELD_FIELD)
private final String field;

@JsonProperty(FIELD_TYPE)
private final String type;

@JsonProperty(FIELD_OPTIONAL)
private final Boolean optional;

@JsonProperty(FIELD_FIELDS)
private final List<CdcDebeziumSchema> fields;

@JsonProperty(FIELD_NAME)
private final String name;

@JsonCreator
public CdcDebeziumSchema(
@JsonProperty(FIELD_FIELD) String field,
@JsonProperty(FIELD_TYPE) String type,
@JsonProperty(FIELD_OPTIONAL) Boolean optional,
@JsonProperty(FIELD_FIELDS) List<CdcDebeziumSchema> fields,
@JsonProperty(FIELD_NAME) String name) {
this.field = field;
this.type = type;
this.optional = optional;
this.fields = fields;
this.name = name;
}

@JsonGetter(FIELD_FIELD)
public String field() {
return field;
}

@JsonGetter(FIELD_TYPE)
public String type() {
return type;
}

@JsonGetter(FIELD_OPTIONAL)
public Boolean optional() {
return optional;
}

@JsonGetter(FIELD_FIELDS)
public List<CdcDebeziumSchema> fields() {
return fields;
}

@JsonGetter(FIELD_NAME)
public String name() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
public class DebeziumRecordParser extends RecordParser {

private static final String FIELD_SCHEMA = "schema";
private static final String FIELD_PAYLOAD = "payload";
protected static final String FIELD_PAYLOAD = "payload";
private static final String FIELD_BEFORE = "before";
private static final String FIELD_AFTER = "after";
private static final String FIELD_SOURCE = "source";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.format.debezium;

import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.RecordParser;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.JsonSerdeUtil;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;

import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.List;

import static io.debezium.pipeline.signal.SchemaChanges.FIELD_SCHEMA;

/**
* The {@code DebeziumSchemaIncludeRecordParser} class extends the {@link RecordParser} to parse
* Debezium's change data capture (CDC) records, specifically focusing on parsing records that
* include the complete Debezium schema. This inclusion of the schema allows for a more detailed and
* accurate interpretation of the CDC data.
*/
public class DebeziumSchemaIncludeRecordParser extends DebeziumRecordParser {

private JsonNode schema;

public DebeziumSchemaIncludeRecordParser(
boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
super(caseSensitive, typeMapping, computedColumns);
}

@Override
protected LinkedHashMap<String, DataType> fillDefaultTypes(JsonNode record) {
CdcDebeziumSchema cdcDebeziumSchema;
try {
cdcDebeziumSchema = JsonSerdeUtil.fromJson(schema.toString(), CdcDebeziumSchema.class);
} catch (UncheckedIOException e) {
throw new RuntimeException(e);
}
LinkedHashMap<String, DataType> fieldTypes = new LinkedHashMap<>();
CdcDebeziumSchema field = cdcDebeziumSchema.fields().get(1);
for (CdcDebeziumSchema f : field.fields()) {
DataType dataType = toDataType(f.type());
fieldTypes.put(f.field(), dataType);
}
return fieldTypes;
}

public static DataType toDataType(String type) {
switch (type) {
case "int32":
return DataTypes.INT();
case "int64":
return DataTypes.BIGINT();
case "string":
return DataTypes.STRING();
case "float32":
case "float64":
return DataTypes.FLOAT();
case "double":
return DataTypes.DOUBLE();
case "bytes":
return DataTypes.BYTES();
default:
throw new UnsupportedOperationException(
String.format("Don't support type '%s' yet.", type));
}
}

@Override
protected void setRoot(String record) {
JsonNode node = JsonSerdeUtil.fromJson(record, JsonNode.class);
root = node.get(FIELD_PAYLOAD);
schema = node.get(FIELD_SCHEMA);
}

@Override
protected String format() {
return "debezium_json_schema_include";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public class KafkaDebeziumSyncTableActionITCase extends KafkaSyncTableActionITCase {

private static final String DEBEZIUM = "debezium";
private static final String DEBEZIUM_JSON_SCHEMA_INCLUDE = "debezium_json_schema_include";

@Test
@Timeout(60)
Expand Down Expand Up @@ -91,4 +92,10 @@ public void testWaterMarkSyncTable() throws Exception {
public void testKafkaBuildSchemaWithDelete() throws Exception {
testKafkaBuildSchemaWithDelete(DEBEZIUM);
}

@Test
@Timeout(60)
public void testSchemaIncludeRecord1() throws Exception {
testSchemaIncludeRecord(DEBEZIUM_JSON_SCHEMA_INCLUDE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -663,4 +663,43 @@ public void testWaterMarkSyncTable(String format) throws Exception {
Thread.sleep(1000);
}
}

public void testSchemaIncludeRecord(String format) throws Exception {
String topic = "schema_include";
createTestTopic(topic, 1, 1);

List<String> lines = readLines("kafka/debezium/table/schema/include/debezium-data-1.txt");
try {
writeRecordsToKafka(topic, lines);
} catch (Exception e) {
throw new Exception("Failed to write debezium data to Kafka.", e);
}

Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format);
kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
.withTableConfig(getBasicTableConfig())
.build();
runActionWithDefaultEnv(action);

FileStoreTable table = getFileStoreTable(tableName);

RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT().notNull(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.DOUBLE()
},
new String[] {"id", "name", "description", "weight"});
List<String> primaryKeys = Collections.singletonList("id");
List<String> expected =
Collections.singletonList(
"+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]");
waitForResult(expected, table, rowType, primaryKeys);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}}

0 comments on commit 05c17b1

Please sign in to comment.