[flink] Support debezium avro format in cdc action. #3323

Merged: 17 commits, Aug 11, 2024
25 changes: 24 additions & 1 deletion docs/content/flink/cdc-ingestion/kafka-cdc.md
@@ -33,7 +33,7 @@ flink-sql-connector-kafka-*.jar
```

## Supported Formats
Flink provides several Kafka CDC formats: Canal, Debezium, Ogg, Maxwell and Normal JSON.
Flink provides several Kafka CDC formats: Canal Json, Debezium Json, Debezium Avro, Ogg Json, Maxwell Json and Normal Json.
If a message in a Kafka topic is a change event captured from another database by a Change Data Capture (CDC) tool, you can use Paimon Kafka CDC to parse the INSERT, UPDATE and DELETE messages and write them into Paimon tables.
<table class="table table-bordered">
<thead>
@@ -252,3 +252,26 @@ Synchronization from multiple Kafka topics to Paimon database.
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
```

## Additional kafka_config

There are some useful options for building the Flink Kafka Source that are not covered by the Flink Kafka connector documentation. They are:

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left">Key</th>
<th class="text-left">Default</th>
<th class="text-left">Type</th>
<th class="text-left">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>schema.registry.url</td>
<td>(none)</td>
<td>String</td>
<td>When configuring "value.format=debezium-avro", which uses the Confluent Schema Registry for Apache Avro serialization, you need to provide the schema registry URL (see the example after this table).</td>
</tr>
</tbody>
</table>
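
For illustration, a `kafka_sync_table` invocation that ingests a Debezium Avro topic might pass this option through `--kafka_conf` as sketched below; the action jar path, warehouse, database, table, topic and addresses are placeholders:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action.jar \
    kafka_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --kafka_conf properties.bootstrap.servers=127.0.0.1:9092 \
    --kafka_conf topic=order \
    --kafka_conf value.format=debezium-avro \
    --kafka_conf schema.registry.url=http://127.0.0.1:8081 \
    --table_conf changelog-producer=input
```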
9 changes: 7 additions & 2 deletions docs/content/flink/cdc-ingestion/pulsar-cdc.md
@@ -33,7 +33,7 @@ flink-connector-pulsar-*.jar
```

## Supported Formats
Flink provides several Pulsar CDC formats: Canal, Debezium, Ogg, Maxwell and Normal JSON.
Flink provides several Pulsar CDC formats: Canal Json, Debezium Json, Debezium Avro, Ogg Json, Maxwell Json and Normal Json.
If a message in a Pulsar topic is a change event captured from another database by a Change Data Capture (CDC) tool, you can use Paimon Pulsar CDC to parse the INSERT, UPDATE and DELETE messages and write them into Paimon tables.
<table class="table table-bordered">
<thead>
@@ -352,6 +352,11 @@ There are some useful options to build Flink Pulsar Source, but they are not pro
<td>Boolean</td>
<td>To specify the boundedness of a stream.</td>
</tr>
<tr>
<td>schema.registry.url</td>
<td>(none)</td>
<td>String</td>
<td>When configuring "value.format=debezium-avro", which uses the Confluent Schema Registry for Apache Avro serialization, you need to provide the schema registry URL (see the example after this table).</td>
</tr>
</tbody>
</table>
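
For illustration, a `pulsar_sync_table` invocation using Debezium Avro might look like the sketch below; option keys other than `value.format` and `schema.registry.url` follow the existing Pulsar examples on this page, and all values are placeholders:

```bash
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action.jar \
    pulsar_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --pulsar_conf topic=order \
    --pulsar_conf value.format=debezium-avro \
    --pulsar_conf pulsar.client.serviceUrl=pulsar://127.0.0.1:6650 \
    --pulsar_conf pulsar.admin.adminUrl=http://127.0.0.1:8080 \
    --pulsar_conf pulsar.consumer.subscriptionName=paimon_cdc \
    --pulsar_conf schema.registry.url=http://127.0.0.1:8081
```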

21 changes: 21 additions & 0 deletions paimon-flink/paimon-flink-cdc/pom.xml
@@ -42,8 +42,16 @@ under the License.
<json-path.version>2.9.0</json-path.version>
<mongodb.testcontainers.version>1.19.1</mongodb.testcontainers.version>
<flink.connector.pulsar.version>4.0.0-1.17</flink.connector.pulsar.version>
<confluent.platform.version>7.5.0</confluent.platform.version>
</properties>

<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.apache.paimon</groupId>
@@ -124,6 +132,19 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.platform.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.esri.geometry</groupId>
<artifactId>esri-geometry-api</artifactId>
@@ -19,10 +19,13 @@
package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.flink.action.cdc.SyncTableActionBase.SchemaRetrievalException;
import org.apache.paimon.flink.action.cdc.format.AbstractRecordParser;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.format.RecordParser;
import org.apache.paimon.schema.Schema;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -31,6 +34,13 @@
/** Utility class to build schema by trying to read and parse records from message queue. */
public class MessageQueueSchemaUtils {

public static final ConfigOption<String> SCHEMA_REGISTRY_URL =
ConfigOptions.key("schema.registry.url")
.stringType()
.noDefaultValue()
.withDescription(
"To use the Confluence schema registry model for Apache Avro serialization, you need to provide the schema registry URL.");

private static final int MAX_RETRY = 5;
private static final int POLL_TIMEOUT_MILLIS = 1000;

@@ -49,7 +59,8 @@ public static Schema getSchema(
int retry = 0;
int retryInterval = 1000;

RecordParser recordParser = dataFormat.createParser(typeMapping, Collections.emptyList());
AbstractRecordParser recordParser =
dataFormat.createParser(typeMapping, Collections.emptyList());

while (true) {
Optional<Schema> schema =
@@ -183,9 +183,13 @@ public void checkRequiredOption() {
public Source<CdcSourceRecord, ?, ?> provideSource() {
switch (sourceType) {
case KAFKA:
return KafkaActionUtils.buildKafkaSource(cdcSourceConfig);
return KafkaActionUtils.buildKafkaSource(
cdcSourceConfig,
provideDataFormat().createKafkaDeserializer(cdcSourceConfig));
case PULSAR:
return PulsarActionUtils.buildPulsarSource(cdcSourceConfig);
return PulsarActionUtils.buildPulsarSource(
cdcSourceConfig,
provideDataFormat().createPulsarDeserializer(cdcSourceConfig));
default:
throw new UnsupportedOperationException(
"Cannot get source from source type" + sourceType);
@@ -229,9 +233,13 @@ public DataFormat provideDataFormat() {
public MessageQueueSchemaUtils.ConsumerWrapper provideConsumer() {
switch (sourceType) {
case KAFKA:
return KafkaActionUtils.getKafkaEarliestConsumer(cdcSourceConfig);
return KafkaActionUtils.getKafkaEarliestConsumer(
cdcSourceConfig,
provideDataFormat().createKafkaDeserializer(cdcSourceConfig));
case PULSAR:
return PulsarActionUtils.createPulsarConsumer(cdcSourceConfig);
return PulsarActionUtils.createPulsarConsumer(
cdcSourceConfig,
provideDataFormat().createPulsarDeserializer(cdcSourceConfig));
default:
throw new UnsupportedOperationException(
"Cannot get consumer from source type" + sourceType);
@@ -21,10 +21,7 @@
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -36,8 +33,6 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -47,7 +42,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@@ -64,79 +58,29 @@
* Subclasses are expected to provide specific implementations for extracting records, validating
* message formats, and other format-specific operations.
*/
public abstract class RecordParser
implements FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> {

private static final Logger LOG = LoggerFactory.getLogger(RecordParser.class);

protected static final String FIELD_TABLE = "table";
protected static final String FIELD_DATABASE = "database";
protected final TypeMapping typeMapping;
protected final List<ComputedColumn> computedColumns;
public abstract class AbstractJsonRecordParser extends AbstractRecordParser {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJsonRecordParser.class);

protected JsonNode root;

public RecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
this.typeMapping = typeMapping;
this.computedColumns = computedColumns;
public AbstractJsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
super(typeMapping, computedColumns);
}

@Nullable
public Schema buildSchema(CdcSourceRecord record) {
try {
setRoot(record);
if (isDDL()) {
return null;
}

Optional<RichCdcMultiplexRecord> recordOpt = extractRecords().stream().findFirst();
if (!recordOpt.isPresent()) {
return null;
}

Schema.Builder builder = Schema.newBuilder();
recordOpt
.get()
.fields()
.forEach(
field ->
builder.column(
field.name(), field.type(), field.description()));
builder.primaryKey(extractPrimaryKeys());
return builder.build();
} catch (Exception e) {
logInvalidSourceRecord(record);
throw e;
}
protected void setRoot(CdcSourceRecord record) {
root = (JsonNode) record.getValue();
}

protected abstract List<RichCdcMultiplexRecord> extractRecords();

protected abstract String primaryField();

protected abstract String dataField();

protected boolean isDDL() {
return false;
}

// use STRING type by default when we cannot get the original data types (most cases)
protected void fillDefaultTypes(JsonNode record, RowType.Builder rowTypeBuilder) {
record.fieldNames()
.forEachRemaining(name -> rowTypeBuilder.field(name, DataTypes.STRING()));
}

@Override
public void flatMap(CdcSourceRecord value, Collector<RichCdcMultiplexRecord> out) {
try {
setRoot(value);
extractRecords().forEach(out::collect);
} catch (Exception e) {
logInvalidSourceRecord(value);
throw e;
}
}

protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
fillDefaultTypes(record, rowTypeBuilder);
Map<String, Object> recordMap =
@@ -163,19 +107,8 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
return rowData;
}

// generate values for computed columns
protected void evalComputedColumns(
Map<String, String> rowData, RowType.Builder rowTypeBuilder) {
computedColumns.forEach(
computedColumn -> {
rowData.put(
computedColumn.columnName(),
computedColumn.eval(rowData.get(computedColumn.fieldReference())));
rowTypeBuilder.field(computedColumn.columnName(), computedColumn.columnType());
});
}

private List<String> extractPrimaryKeys() {
@Override
protected List<String> extractPrimaryKeys() {
ArrayNode pkNames = getNodeAs(root, primaryField(), ArrayNode.class);
if (pkNames == null) {
return Collections.emptyList();
@@ -193,21 +126,6 @@ protected void processRecord(
records.add(createRecord(rowKind, rowData, rowTypeBuilder.build().getFields()));
}

/** Handle case sensitivity here. */
private RichCdcMultiplexRecord createRecord(
RowKind rowKind, Map<String, String> data, List<DataField> paimonFields) {
return new RichCdcMultiplexRecord(
getDatabaseName(),
getTableName(),
paimonFields,
extractPrimaryKeys(),
new CdcRecord(rowKind, data));
}

protected void setRoot(CdcSourceRecord record) {
root = (JsonNode) record.getValue();
}

protected JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) {
JsonNode oldFullRecordNode = data.deepCopy();
oldNode.fieldNames()
@@ -219,21 +137,19 @@ protected JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) {
}

@Nullable
@Override
protected String getTableName() {
JsonNode node = root.get(FIELD_TABLE);
return isNull(node) ? null : node.asText();
}

@Nullable
@Override
protected String getDatabaseName() {
JsonNode node = root.get(FIELD_DATABASE);
return isNull(node) ? null : node.asText();
}

private void logInvalidSourceRecord(CdcSourceRecord record) {
LOG.error("Invalid source record:\n{}", record.toString());
}

protected void checkNotNull(JsonNode node, String key) {
if (isNull(node)) {
throw new RuntimeException(
@@ -262,6 +178,4 @@ protected JsonNode getAndCheck(String key, String conditionKey, String condition
checkNotNull(node, key, conditionKey, conditionValue);
return node;
}

protected abstract String format();
}