[flink] Support debezium avro format in cdc action. (#3323)
zhuangchong authored Aug 11, 2024
1 parent 2ec768f commit 0c0a083
Showing 31 changed files with 2,012 additions and 247 deletions.
25 changes: 24 additions & 1 deletion docs/content/flink/cdc-ingestion/kafka-cdc.md
@@ -33,7 +33,7 @@ flink-sql-connector-kafka-*.jar
```

## Supported Formats
Flink provides several Kafka CDC formats: Canal, Debezium, Ogg, Maxwell and Normal JSON.
Flink provides several Kafka CDC formats: Canal Json, Debezium Json, Debezium Avro, Ogg Json, Maxwell Json and Normal Json.
If a message in a Kafka topic is a change event captured from another database by a Change Data Capture (CDC) tool, you can use Paimon Kafka CDC to parse the INSERT, UPDATE and DELETE messages and write them into a Paimon table.
<table class="table table-bordered">
<thead>
@@ -252,3 +252,26 @@ Synchronization from multiple Kafka topics to Paimon database.
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
```
## Additional kafka_config
There are some useful options for building the Flink Kafka Source that are not covered by the flink-kafka-connector documentation. They are:
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left">Key</th>
<th class="text-left">Default</th>
<th class="text-left">Type</th>
<th class="text-left">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>schema.registry.url</td>
<td>(none)</td>
<td>String</td>
<td>When "value.format=debezium-avro" is configured, Apache Avro serialization uses the Confluent Schema Registry, so you need to provide the schema registry URL.</td>
</tr>
</tbody>
</table>
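For example (a hypothetical invocation, with placeholder jar path, warehouse, topic and addresses), a `kafka_sync_table` job that ingests Debezium Avro records would pass both options through `--kafka_conf`:
```
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-<version>.jar \
    kafka_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --kafka_conf properties.bootstrap.servers=127.0.0.1:9092 \
    --kafka_conf topic=order \
    --kafka_conf value.format=debezium-avro \
    --kafka_conf schema.registry.url=http://127.0.0.1:8081 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4
```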
9 changes: 7 additions & 2 deletions docs/content/flink/cdc-ingestion/pulsar-cdc.md
@@ -33,7 +33,7 @@ flink-connector-pulsar-*.jar
```

## Supported Formats
Flink provides several Pulsar CDC formats: Canal, Debezium, Ogg, Maxwell and Normal JSON.
Flink provides several Pulsar CDC formats: Canal Json, Debezium Json, Debezium Avro, Ogg Json, Maxwell Json and Normal Json.
If a message in a Pulsar topic is a change event captured from another database by a Change Data Capture (CDC) tool, you can use Paimon Pulsar CDC to parse the INSERT, UPDATE and DELETE messages and write them into a Paimon table.
<table class="table table-bordered">
<thead>
@@ -352,6 +352,11 @@ There are some useful options to build Flink Pulsar Source, but they are not pro
<td>Boolean</td>
<td>To specify the boundedness of a stream.</td>
</tr>
<tr>
<td>schema.registry.url</td>
<td>(none)</td>
<td>String</td>
<td>When "value.format=debezium-avro" is configured, Apache Avro serialization uses the Confluent Schema Registry, so you need to provide the schema registry URL.</td>
</tr>
</tbody>
</table>
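Likewise for Pulsar (again a hypothetical invocation with placeholder values), the format and registry URL go through `--pulsar_conf`:
```
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-<version>.jar \
    pulsar_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --pulsar_conf topic=order \
    --pulsar_conf value.format=debezium-avro \
    --pulsar_conf schema.registry.url=http://127.0.0.1:8081 \
    --pulsar_conf pulsar.client.serviceUrl=pulsar://127.0.0.1:6650 \
    --pulsar_conf pulsar.admin.adminUrl=http://127.0.0.1:8080 \
    --pulsar_conf pulsar.consumer.subscriptionName=paimon-cdc \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4
```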
21 changes: 21 additions & 0 deletions paimon-flink/paimon-flink-cdc/pom.xml
@@ -42,8 +42,16 @@ under the License.
<json-path.version>2.9.0</json-path.version>
<mongodb.testcontainers.version>1.19.1</mongodb.testcontainers.version>
<flink.connector.pulsar.version>4.0.0-1.17</flink.connector.pulsar.version>
<confluent.platform.version>7.5.0</confluent.platform.version>
</properties>

<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.apache.paimon</groupId>
@@ -124,6 +132,19 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.platform.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.esri.geometry</groupId>
<artifactId>esri-geometry-api</artifactId>
@@ -19,10 +19,13 @@
package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.flink.action.cdc.SyncTableActionBase.SchemaRetrievalException;
import org.apache.paimon.flink.action.cdc.format.AbstractRecordParser;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.format.RecordParser;
import org.apache.paimon.schema.Schema;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -31,6 +34,13 @@
/** Utility class to build schema by trying to read and parse records from message queue. */
public class MessageQueueSchemaUtils {

public static final ConfigOption<String> SCHEMA_REGISTRY_URL =
ConfigOptions.key("schema.registry.url")
.stringType()
.noDefaultValue()
.withDescription(
"To use the Confluence schema registry model for Apache Avro serialization, you need to provide the schema registry URL.");

private static final int MAX_RETRY = 5;
private static final int POLL_TIMEOUT_MILLIS = 1000;

@@ -49,7 +59,8 @@ public static Schema getSchema(
int retry = 0;
int retryInterval = 1000;

RecordParser recordParser = dataFormat.createParser(typeMapping, Collections.emptyList());
AbstractRecordParser recordParser =
dataFormat.createParser(typeMapping, Collections.emptyList());

while (true) {
Optional<Schema> schema =
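A minimal sketch of how a caller could consume this option (assumed usage for illustration; the helper below is not part of the commit):
```java
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;

import org.apache.flink.configuration.Configuration;

/** Hypothetical helper: fail fast when debezium-avro is used without a registry URL. */
public final class SchemaRegistryUrls {

    private SchemaRegistryUrls() {}

    public static String require(Configuration cdcSourceConfig) {
        // SCHEMA_REGISTRY_URL has no default, so get() returns null when it is absent.
        String url = cdcSourceConfig.get(MessageQueueSchemaUtils.SCHEMA_REGISTRY_URL);
        if (url == null) {
            throw new IllegalArgumentException(
                    "schema.registry.url must be set when value.format=debezium-avro");
        }
        return url;
    }
}
```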
@@ -183,9 +183,13 @@ public void checkRequiredOption() {
public Source<CdcSourceRecord, ?, ?> provideSource() {
switch (sourceType) {
case KAFKA:
return KafkaActionUtils.buildKafkaSource(cdcSourceConfig);
return KafkaActionUtils.buildKafkaSource(
cdcSourceConfig,
provideDataFormat().createKafkaDeserializer(cdcSourceConfig));
case PULSAR:
return PulsarActionUtils.buildPulsarSource(cdcSourceConfig);
return PulsarActionUtils.buildPulsarSource(
cdcSourceConfig,
provideDataFormat().createPulsarDeserializer(cdcSourceConfig));
default:
throw new UnsupportedOperationException(
"Cannot get source from source type" + sourceType);
@@ -229,9 +233,13 @@ public DataFormat provideDataFormat() {
public MessageQueueSchemaUtils.ConsumerWrapper provideConsumer() {
switch (sourceType) {
case KAFKA:
return KafkaActionUtils.getKafkaEarliestConsumer(cdcSourceConfig);
return KafkaActionUtils.getKafkaEarliestConsumer(
cdcSourceConfig,
provideDataFormat().createKafkaDeserializer(cdcSourceConfig));
case PULSAR:
return PulsarActionUtils.createPulsarConsumer(cdcSourceConfig);
return PulsarActionUtils.createPulsarConsumer(
cdcSourceConfig,
provideDataFormat().createPulsarDeserializer(cdcSourceConfig));
default:
throw new UnsupportedOperationException(
"Cannot get consumer from source type" + sourceType);
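Both branches above now hand a format-specific deserializer to the source builder and to the schema-probing consumer. For `debezium-avro`, value decoding ultimately relies on Confluent's registry-aware Avro deserializer; the sketch below illustrates that idea under this assumption and is not the deserializer class added by the commit:
```java
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

import org.apache.avro.generic.GenericRecord;

import java.util.Collections;
import java.util.Map;

/** Illustration only: decoding Confluent-framed Debezium Avro values via the schema registry. */
public class DebeziumAvroValueDecoder {

    private final KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();

    public DebeziumAvroValueDecoder(String schemaRegistryUrl) {
        // The URL comes from the new "schema.registry.url" option.
        Map<String, Object> props =
                Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
        // false = configure the deserializer for record values, not keys.
        deserializer.configure(props, false);
    }

    public GenericRecord decode(String topic, byte[] value) {
        // Confluent wire format: magic byte + 4-byte schema id + Avro payload;
        // the writer schema is fetched from the registry by that id.
        return (GenericRecord) deserializer.deserialize(topic, value);
    }
}
```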
@@ -21,10 +21,7 @@
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -36,8 +33,6 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -47,7 +42,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@@ -64,79 +58,29 @@
* Subclasses are expected to provide specific implementations for extracting records, validating
* message formats, and other format-specific operations.
*/
public abstract class RecordParser
implements FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> {

private static final Logger LOG = LoggerFactory.getLogger(RecordParser.class);

protected static final String FIELD_TABLE = "table";
protected static final String FIELD_DATABASE = "database";
protected final TypeMapping typeMapping;
protected final List<ComputedColumn> computedColumns;
public abstract class AbstractJsonRecordParser extends AbstractRecordParser {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJsonRecordParser.class);

protected JsonNode root;

public RecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
this.typeMapping = typeMapping;
this.computedColumns = computedColumns;
public AbstractJsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
super(typeMapping, computedColumns);
}

@Nullable
public Schema buildSchema(CdcSourceRecord record) {
try {
setRoot(record);
if (isDDL()) {
return null;
}

Optional<RichCdcMultiplexRecord> recordOpt = extractRecords().stream().findFirst();
if (!recordOpt.isPresent()) {
return null;
}

Schema.Builder builder = Schema.newBuilder();
recordOpt
.get()
.fields()
.forEach(
field ->
builder.column(
field.name(), field.type(), field.description()));
builder.primaryKey(extractPrimaryKeys());
return builder.build();
} catch (Exception e) {
logInvalidSourceRecord(record);
throw e;
}
protected void setRoot(CdcSourceRecord record) {
root = (JsonNode) record.getValue();
}

protected abstract List<RichCdcMultiplexRecord> extractRecords();

protected abstract String primaryField();

protected abstract String dataField();

protected boolean isDDL() {
return false;
}

// use STRING type in default when we cannot get origin data types (most cases)
protected void fillDefaultTypes(JsonNode record, RowType.Builder rowTypeBuilder) {
record.fieldNames()
.forEachRemaining(name -> rowTypeBuilder.field(name, DataTypes.STRING()));
}

@Override
public void flatMap(CdcSourceRecord value, Collector<RichCdcMultiplexRecord> out) {
try {
setRoot(value);
extractRecords().forEach(out::collect);
} catch (Exception e) {
logInvalidSourceRecord(value);
throw e;
}
}

protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
fillDefaultTypes(record, rowTypeBuilder);
Map<String, Object> recordMap =
@@ -163,19 +107,8 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
return rowData;
}

// generate values for computed columns
protected void evalComputedColumns(
Map<String, String> rowData, RowType.Builder rowTypeBuilder) {
computedColumns.forEach(
computedColumn -> {
rowData.put(
computedColumn.columnName(),
computedColumn.eval(rowData.get(computedColumn.fieldReference())));
rowTypeBuilder.field(computedColumn.columnName(), computedColumn.columnType());
});
}

private List<String> extractPrimaryKeys() {
@Override
protected List<String> extractPrimaryKeys() {
ArrayNode pkNames = getNodeAs(root, primaryField(), ArrayNode.class);
if (pkNames == null) {
return Collections.emptyList();
@@ -193,21 +126,6 @@ protected void processRecord(
records.add(createRecord(rowKind, rowData, rowTypeBuilder.build().getFields()));
}

/** Handle case sensitivity here. */
private RichCdcMultiplexRecord createRecord(
RowKind rowKind, Map<String, String> data, List<DataField> paimonFields) {
return new RichCdcMultiplexRecord(
getDatabaseName(),
getTableName(),
paimonFields,
extractPrimaryKeys(),
new CdcRecord(rowKind, data));
}

protected void setRoot(CdcSourceRecord record) {
root = (JsonNode) record.getValue();
}

protected JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) {
JsonNode oldFullRecordNode = data.deepCopy();
oldNode.fieldNames()
@@ -219,21 +137,19 @@ protected JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) {
}

@Nullable
@Override
protected String getTableName() {
JsonNode node = root.get(FIELD_TABLE);
return isNull(node) ? null : node.asText();
}

@Nullable
@Override
protected String getDatabaseName() {
JsonNode node = root.get(FIELD_DATABASE);
return isNull(node) ? null : node.asText();
}

private void logInvalidSourceRecord(CdcSourceRecord record) {
LOG.error("Invalid source record:\n{}", record.toString());
}

protected void checkNotNull(JsonNode node, String key) {
if (isNull(node)) {
throw new RuntimeException(
@@ -262,6 +178,4 @@ protected JsonNode getAndCheck(String key, String conditionKey, String condition
checkNotNull(node, key, conditionKey, conditionValue);
return node;
}

protected abstract String format();
}
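Concrete parsers built on these base classes emit change records tagged with a Paimon `RowKind`. As a rough illustration (not code from this commit), a Debezium `op` code is conventionally mapped as below; the actual parsers may additionally emit the `before` image for updates:
```java
import org.apache.paimon.types.RowKind;

/** Illustration only: conventional mapping from a Debezium "op" code to a RowKind. */
public final class DebeziumOpToRowKind {

    private DebeziumOpToRowKind() {}

    public static RowKind toRowKind(String op) {
        switch (op) {
            case "c": // create
            case "r": // snapshot read
                return RowKind.INSERT;
            case "u": // update, applied to the "after" image
                return RowKind.UPDATE_AFTER;
            case "d": // delete, applied to the "before" image
                return RowKind.DELETE;
            default:
                throw new IllegalArgumentException("Unknown Debezium op: " + op);
        }
    }
}
```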