[flink][kafka-cdc] Support table debezium json format (apache#2251)
MonsterChenzhuo authored Nov 7, 2023
1 parent c74987b commit 4db2e9a
Showing 11 changed files with 543 additions and 13 deletions.
@@ -35,11 +35,14 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -219,6 +222,39 @@ public static <T> JsonNode toTree(T value) {
return OBJECT_MAPPER_INSTANCE.valueToTree(value);
}

/**
* Adds an array of values to a JSON string under the specified key.
*
* @param origin The original JSON string.
* @param key The key under which the values will be added as an array.
* @param values A list of values to be added to the JSON string.
* @return The JSON string with the added array. If the JSON string is not a valid JSON object,
* or if the list of values is empty or null, the original JSON string will be returned.
* @throws RuntimeException If an error occurs while parsing the JSON string or adding the
* values.
*/
public static String putArrayToJsonString(String origin, String key, List<String> values) {
if (values == null || values.isEmpty()) {
return origin;
}

try {
JsonNode jsonNode = OBJECT_MAPPER_INSTANCE.readTree(origin);
if (jsonNode.isObject()) {
ObjectNode objectNode = (ObjectNode) jsonNode;
ArrayNode arrayNode = objectNode.putArray(key);
for (String value : values) {
arrayNode.add(value);
}
return OBJECT_MAPPER_INSTANCE.writeValueAsString(objectNode);
} else {
return origin;
}
} catch (Exception e) {
throw new RuntimeException("Failed to add array to JSON", e);
}
}
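
For illustration, a minimal, hypothetical usage sketch of the helper above; the input JSON, key, and values are made up and not taken from the commit:

import java.util.Arrays;

import org.apache.paimon.utils.JsonSerdeUtil;

public class PutArrayToJsonStringExample {
    public static void main(String[] args) {
        // Illustrative payload; any JSON object is handled the same way.
        String origin = "{\"id\":1,\"name\":\"apple\"}";

        // Appends the list under the given key as a JSON array.
        String withPkNames =
                JsonSerdeUtil.putArrayToJsonString(origin, "pkNames", Arrays.asList("id"));
        System.out.println(withPkNames);
        // -> {"id":1,"name":"apple","pkNames":["id"]}

        // A payload that is valid JSON but not an object (e.g. an array) is returned unchanged.
        System.out.println(
                JsonSerdeUtil.putArrayToJsonString("[1,2,3]", "pkNames", Arrays.asList("id")));
    }
}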

public static boolean isNull(JsonNode jsonNode) {
return jsonNode == null || jsonNode.isNull();
}
@@ -21,6 +21,7 @@
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.canal.CanalRecordParser;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser;
import org.apache.paimon.flink.action.cdc.format.maxwell.MaxwellRecordParser;
import org.apache.paimon.flink.action.cdc.format.ogg.OggRecordParser;

@@ -36,7 +37,8 @@
public enum DataFormat {
CANAL_JSON(CanalRecordParser::new),
OGG_JSON(OggRecordParser::new),
MAXWELL_JSON(MaxwellRecordParser::new);
MAXWELL_JSON(MaxwellRecordParser::new),
DEBEZIUM_JSON(DebeziumRecordParser::new);
// Add more data formats here if needed

private final RecordParserFactory parser;
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.format.debezium;

import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.RecordParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.RowKind;

import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument;
import static org.apache.paimon.utils.JsonSerdeUtil.isNull;

/**
* The {@code DebeziumRecordParser} class extends the abstract {@link RecordParser} and is designed
* to parse records from Debezium's JSON change data capture (CDC) format. Debezium is a CDC
 * solution for databases such as MySQL that captures row-level changes to database tables and outputs them
* in JSON format. This parser extracts relevant information from the Debezium-JSON format and
* converts it into a list of {@link RichCdcMultiplexRecord} objects.
*
* <p>The class supports various database operations such as INSERT, UPDATE, DELETE, and READ
* (snapshot reads), and creates corresponding {@link RichCdcMultiplexRecord} objects to represent
* these changes.
*
* <p>Validation is performed to ensure that the JSON records contain all necessary fields,
* including the 'before' and 'after' states for UPDATE operations, and the class also supports
* schema extraction for the Kafka topic. Debezium's specific fields such as 'source', 'op' for
* operation type, and primary key field names are used to construct the details of each record
* event.
*/
public class DebeziumRecordParser extends RecordParser {

private static final String FIELD_BEFORE = "before";
private static final String FIELD_AFTER = "after";
private static final String FIELD_SOURCE = "source";
private static final String FIELD_PRIMARY = "pkNames";
private static final String FIELD_DB = "db";
private static final String FIELD_TYPE = "op";
private static final String OP_INSERT = "c";
private static final String OP_UPDATE = "u";
private static final String OP_DELETE = "d";
private static final String OP_READE = "r";

public DebeziumRecordParser(
boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
super(caseSensitive, typeMapping, computedColumns);
}

@Override
public List<RichCdcMultiplexRecord> extractRecords() {
String operation = extractStringFromRootJson(FIELD_TYPE);
List<RichCdcMultiplexRecord> records = new ArrayList<>();
switch (operation) {
case OP_INSERT:
case OP_READE:
processRecord(root.get(dataField()), RowKind.INSERT, records);
break;
case OP_UPDATE:
processRecord(
mergeOldRecord(root.get(dataField()), root.get(FIELD_BEFORE)),
RowKind.DELETE,
records);
processRecord(root.get(dataField()), RowKind.INSERT, records);
break;
case OP_DELETE:
processRecord(root.get(FIELD_BEFORE), RowKind.DELETE, records);
break;
default:
throw new UnsupportedOperationException("Unknown record operation: " + operation);
}
return records;
}

@Override
protected void validateFormat() {
String errorMessageTemplate =
"Didn't find '%s' node in json. Please make sure your topic's format is correct.";
checkArgument(
!isNull(root.get(FIELD_SOURCE).get(FIELD_TABLE)),
errorMessageTemplate,
FIELD_TABLE);
checkArgument(
!isNull(root.get(FIELD_SOURCE).get(FIELD_DB)),
errorMessageTemplate,
FIELD_DATABASE);
checkArgument(!isNull(root.get(FIELD_TYPE)), errorMessageTemplate, FIELD_TYPE);
String operation = root.get(FIELD_TYPE).asText();
switch (operation) {
case OP_INSERT:
case OP_READE:
checkArgument(!isNull(root.get(dataField())), errorMessageTemplate, dataField());
break;
case OP_UPDATE:
case OP_DELETE:
checkArgument(!isNull(root.get(FIELD_BEFORE)), errorMessageTemplate, FIELD_BEFORE);
break;
default:
throw new IllegalArgumentException("Unsupported operation type: " + operation);
}
checkArgument(!isNull(root.get(primaryField())), errorMessageTemplate, primaryField());
}

@Override
protected String primaryField() {
return FIELD_PRIMARY;
}

@Override
protected String dataField() {
return FIELD_AFTER;
}

@Override
protected String extractStringFromRootJson(String key) {
if (key.equals(FIELD_TABLE)) {
tableName = root.get(FIELD_SOURCE).get(FIELD_TABLE).asText();
return tableName;
} else if (key.equals(FIELD_DATABASE)) {
databaseName = root.get(FIELD_SOURCE).get(FIELD_DB).asText();
return databaseName;
}
return root.get(key) != null ? root.get(key).asText() : null;
}
}
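
For context, a simplified sketch of the Debezium-JSON change event this parser reads (an insert, op "c"). Field values are illustrative, a real Debezium envelope carries additional metadata, and the top-level "pkNames" array is not emitted by Debezium itself but injected by the JsonPrimaryKeyDeserializationSchema introduced below:

{
  "before": null,
  "after": {"id": 1, "name": "apple"},
  "source": {"db": "inventory", "table": "products"},
  "op": "c",
  "pkNames": ["id"]
}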
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.format.debezium;

import org.apache.paimon.utils.JsonSerdeUtil;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.nio.charset.StandardCharsets;
import java.util.List;

import static org.apache.paimon.utils.Preconditions.checkNotNull;

/**
* This class is used to deserialize byte[] messages into String format, and then add primary key
* fields to the JSON string.
*/
public class JsonPrimaryKeyDeserializationSchema implements DeserializationSchema<String> {

public static final String PRIMARY_KEY_NAMES = "pkNames";
private final List<String> primaryKeyNames;

public JsonPrimaryKeyDeserializationSchema(List<String> primaryKeyNames) {
this.primaryKeyNames = checkNotNull(primaryKeyNames);
if (this.primaryKeyNames.isEmpty()) {
throw new IllegalArgumentException("primary key must not be empty");
}
}

@Override
public String deserialize(byte[] message) {
try {
String value = new String(message, StandardCharsets.UTF_8);
return JsonSerdeUtil.putArrayToJsonString(value, PRIMARY_KEY_NAMES, primaryKeyNames);
} catch (Exception e) {
throw new RuntimeException("Failed to deserialize message", e);
}
}

@Override
public boolean isEndOfStream(String nextElement) {
return false;
}

@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
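
A minimal, hypothetical sketch of how the pieces fit together for a debezium-json topic: the target table's primary key names are handed to the Kafka source builder, which swaps in JsonPrimaryKeyDeserializationSchema so every record value reaches DebeziumRecordParser with a "pkNames" array. Option keys follow the Flink Kafka connector; the topic, server, group id, and key name are illustrative:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils;

public class DebeziumKafkaSourceExample {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("properties.bootstrap.servers", "localhost:9092"); // illustrative
        conf.put("topic", "inventory.products");                    // illustrative
        conf.put("properties.group.id", "paimon-cdc-demo");         // illustrative
        conf.put("value.format", "debezium-json");

        // With value.format = debezium-json, the builder picks
        // JsonPrimaryKeyDeserializationSchema and injects these key names as "pkNames".
        KafkaSource<String> source =
                KafkaActionUtils.buildKafkaSource(Configuration.fromMap(conf), Arrays.asList("id"));
        // The source would then be consumed via env.fromSource(...).
    }
}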
@@ -20,6 +20,8 @@

import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.format.debezium.JsonPrimaryKeyDeserializationSchema;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
@@ -34,7 +36,6 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.CollectionUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -54,6 +55,7 @@
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -65,8 +67,14 @@ public class KafkaActionUtils {

private static final String PARTITION = "partition";
private static final String OFFSET = "offset";
private static final String DEBEZIUM_JSON = "debezium-json";

public static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
return buildKafkaSource(kafkaConfig, new ArrayList<>());
}

public static KafkaSource<String> buildKafkaSource(
Configuration kafkaConfig, List<String> primaryKeys) {
validateKafkaConfig(kafkaConfig);
KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();

@@ -77,8 +85,11 @@ public static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {

kafkaSourceBuilder
.setTopics(topics)
.setValueOnlyDeserializer(new SimpleStringSchema())
.setGroupId(kafkaPropertiesGroupId(kafkaConfig));
.setGroupId(kafkaPropertiesGroupId(kafkaConfig))
.setValueOnlyDeserializer(
DEBEZIUM_JSON.equals(kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT))
? new JsonPrimaryKeyDeserializationSchema(primaryKeys)
: new SimpleStringSchema());
Properties properties = new Properties();
for (Map.Entry<String, String> entry : kafkaConfig.toMap().entrySet()) {
String key = entry.getKey();
@@ -262,6 +273,11 @@ static DataFormat getDataFormat(Configuration kafkaConfig) {

static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer(
Configuration kafkaConfig, String topic) {
return getKafkaEarliestConsumer(kafkaConfig, topic, new ArrayList<>());
}

static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer(
Configuration kafkaConfig, String topic, List<String> primaryKeys) {
Properties props = new Properties();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
@@ -286,26 +302,33 @@ static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer(
Collections.singletonList(new TopicPartition(topic, firstPartition));
consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);

return new KafkaConsumerWrapper(consumer);
return new KafkaConsumerWrapper(
consumer,
DEBEZIUM_JSON.equals(kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT))
? primaryKeys
: new ArrayList<>());
}

private static class KafkaConsumerWrapper implements MessageQueueSchemaUtils.ConsumerWrapper {

private static final String PK_NAMES_KEY = "pkNames";

private final KafkaConsumer<String, String> consumer;

KafkaConsumerWrapper(KafkaConsumer<String, String> kafkaConsumer) {
private final List<String> pkNames;

KafkaConsumerWrapper(KafkaConsumer<String, String> kafkaConsumer, List<String> pkNames) {
this.consumer = kafkaConsumer;
this.pkNames = pkNames;
}

@Override
public List<String> getRecords(String topic, int pollTimeOutMills) {
ConsumerRecords<String, String> consumerRecords =
consumer.poll(Duration.ofMillis(pollTimeOutMills));
Iterable<ConsumerRecord<String, String>> records = consumerRecords.records(topic);
List<String> result = new ArrayList<>();
records.forEach(r -> result.add(r.value()));
return result;
return StreamSupport.stream(consumerRecords.records(topic).spliterator(), false)
.map(r -> JsonSerdeUtil.putArrayToJsonString(r.value(), PK_NAMES_KEY, pkNames))
.collect(Collectors.toList());
}

@Override