Skip to content

Commit

Permalink
[flink] Supports Kafka with canal CDC format database ingestion (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
Alibaba-HZY authored Jun 20, 2023
1 parent 9bd2a2d commit a5570d8
Show file tree
Hide file tree
Showing 27 changed files with 1,761 additions and 164 deletions.
75 changes: 73 additions & 2 deletions docs/content/how-to/cdc-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ under the License.
# CDC Ingestion

Paimon supports a variety of ways to ingest data into Paimon tables with schema evolution. This means that the added
columns are synchronized to the Paimon table in real time and the synchronization job will not restarted for this purpose.
columns are synchronized to the Paimon table in real time and the synchronization job will not be restarted for this purpose.

We currently support the following sync ways:

1. MySQL Synchronizing Table: synchronize one or multiple tables from MySQL into one Paimon table.
2. MySQL Synchronizing Database: synchronize the whole MySQL database into one Paimon database.
3. [API Synchronizing Table]({{< ref "/api/flink-api#cdc-ingestion-table" >}}): synchronize your custom DataStream input into one Paimon table.
4. Kafka Synchronizing Table: synchronize one Kafka topic's table into one Paimon table.
4. Kafka Synchronizing Table: synchronize one Kafka topic's table into one Paimon table.
5. Kafka Synchronizing Database: synchronize one Kafka topic containing multiple tables or multiple topics containing one table each into one Paimon database.

## MySQL

Expand Down Expand Up @@ -276,6 +277,76 @@ Example
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4
```
### Synchronizing Databases
By using [KafkaSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize the multi topic or one topic into one Paimon database.
To use this feature through `flink run`, run the following shell command.
```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
kafka-sync-database
--warehouse <warehouse-path> \
--database <database-name> \
[--schema-init-max-read <int>] \
[--ignore-incompatible <true/false>] \
[--table-prefix <paimon-table-prefix>] \
[--table-suffix <paimon-table-suffix>] \
[--including-tables <table-name|name-regular-expr>] \
[--excluding-tables <table-name|name-regular-expr>] \
[--kafka-conf <kafka-source-conf> [--kafka-conf <kafka-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]
```
{{< generated/kafka_sync_database >}}
Only tables with primary keys will be synchronized.
For each Kafka topic's table to be synchronized, if the corresponding Paimon table does not exist, this action will automatically create the table.
Its schema will be derived from all specified Kafka topic's tables,it gets the earliest non-DDL data parsing schema from topic. If the Paimon table already exists, its schema will be compared against the schema of all specified Kafka topic's tables.
Example
Synchronization from one Kafka topic to Paimon database.
```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
kafka-sync-database \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--schema-init-max-read 500 \
--kafka-conf properties.bootstrap.servers=127.0.0.1:9020 \
--kafka-conf topic=order \
--kafka-conf properties.group.id=123456 \
--kafka-conf value.format=canal-json \
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hive-metastore:9083 \
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4
```
Synchronization from multiple Kafka topics to Paimon database.
```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
kafka-sync-database \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--kafka-conf properties.bootstrap.servers=127.0.0.1:9020 \
--kafka-conf topic=order,logistic_order,user \
--kafka-conf properties.group.id=123456 \
--kafka-conf value.format=canal-json \
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hive-metastore:9083 \
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4
```
## Schema Change Evolution
Expand Down
73 changes: 73 additions & 0 deletions docs/layouts/shortcodes/generated/kafka_sync_database.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
{{/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/}}
{{ $ref := ref . "maintenance/configurations.md" }}
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 15%">Configuration</th>
<th class="text-left" style="width: 85%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>--warehouse</h5></td>
<td>The path to Paimon warehouse.</td>
</tr>
<tr>
<td><h5>--database</h5></td>
<td>The database name in Paimon catalog.</td>
</tr>
<tr>
<td><h5>--schema-init-max-read</h5></td>
<td>If your tables are all from a topic, you can set this parameter to initialize the number of tables to be synchronized. The default value is 1000.</td>
</tr>
<tr>
<td><h5>--ignore-incompatible</h5></td>
<td>It is default false, in this case, if MySQL table name exists in Paimon and their schema is incompatible,an exception will be thrown. You can specify it to true explicitly to ignore the incompatible tables and exception.</td>
</tr>
<tr>
<td><h5>--table-prefix</h5></td>
<td>The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table-prefix ods_".</td>
</tr>
<tr>
<td><h5>--table-suffix</h5></td>
<td>The suffix of all Paimon tables to be synchronized. The usage is same as "--table-prefix".</td>
</tr>
<tr>
<td><h5>--including-tables</h5></td>
<td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'a|b|c'.Regular expression is supported, for example, specifying "--including-tables test|paimon.*" means to synchronize table 'test' and all tables start with 'paimon'.</td>
</tr>
<tr>
<td><h5>--excluding-tables</h5></td>
<td>It is used to specify which source tables are not to be synchronized. The usage is same as "--including-tables". "--excluding-tables" has higher priority than "--including-tables" if you specified both.</td>
</tr>
<tr>
<td><h5>--kafka-conf</h5></td>
<td>The configuration for Flink Kafka sources. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, `topic`, `properties.group.id`, and `value.format` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kafka/#connector-options">document</a> for a complete list of configurations.</td>
</tr>
<tr>
<td><h5>--catalog-conf</h5></td>
<td>The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a complete list of catalog configurations.</td>
</tr>
<tr>
<td><h5>--table-conf</h5></td>
<td>The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a complete list of table configurations.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;

import org.apache.commons.compress.utils.Lists;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand All @@ -41,55 +42,30 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;

/** Utility class to load canal kafka schema. */
public class KafkaSchema {

private static final int MAX_RETRY = 100;
private static final int MAX_READ = 1000;

private final ObjectMapper objectMapper = new ObjectMapper();

private String databaseName;
private String tableName;
private final String databaseName;
private final String tableName;
private final Map<String, DataType> fields;
private final List<String> primaryKeys;

public KafkaSchema(Configuration kafkaConfig, String topic) throws Exception {

fields = new LinkedHashMap<>();
primaryKeys = new ArrayList<>();
KafkaConsumer<String, String> consumer = getKafkaEarliestConsumer(kafkaConfig);

consumer.subscribe(Collections.singletonList(topic));

int retry = 0;
boolean success = false;
while (!success) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
if ("canal-json".equals(format)) {
success = parseCanalJson(record.value());
if (success) {
break;
}
} else {
throw new UnsupportedOperationException(
"This format: " + format + " is not support.");
}
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
if (!success && retry == MAX_RETRY) {
throw new Exception("Could not get metadata from server,topic :" + topic);
}
Thread.sleep(100);
retry++;
}
public KafkaSchema(
String databaseName,
String tableName,
Map<String, DataType> fields,
List<String> primaryKeys) {
this.databaseName = databaseName;
this.tableName = tableName;
this.fields = fields;
this.primaryKeys = primaryKeys;
}

public String tableName() {
Expand Down Expand Up @@ -132,7 +108,12 @@ private static String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null ? record.get(key).asText() : null;
}

private boolean parseCanalJson(String record) throws JsonProcessingException {
private static KafkaSchema parseCanalJson(String record, ObjectMapper objectMapper)
throws JsonProcessingException {
String databaseName;
String tableName;
final Map<String, DataType> fields = new LinkedHashMap<>();
final List<String> primaryKeys = new ArrayList<>();
JsonNode root = objectMapper.readValue(record, JsonNode.class);
if (!extractIsDDL(root)) {
JsonNode mysqlType = root.get("mysqlType");
Expand All @@ -151,8 +132,102 @@ private boolean parseCanalJson(String record) throws JsonProcessingException {
}
databaseName = extractJsonNode(root, "database");
tableName = extractJsonNode(root, "table");
return new KafkaSchema(databaseName, tableName, fields, primaryKeys);
}
return null;
}

public static KafkaSchema getKafkaSchema(Configuration kafkaConfig, String topic)
throws Exception {
KafkaConsumer<String, String> consumer = getKafkaEarliestConsumer(kafkaConfig);

consumer.subscribe(Collections.singletonList(topic));
KafkaSchema kafkaSchema;
int retry = 0;
ObjectMapper objectMapper = new ObjectMapper();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
if ("canal-json".equals(format)) {
kafkaSchema = parseCanalJson(record.value(), objectMapper);
if (kafkaSchema != null) {
return kafkaSchema;
}
} else {
throw new UnsupportedOperationException(
"This format: " + format + " is not support.");
}
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
if (retry == MAX_RETRY) {
throw new Exception("Could not get metadata from server,topic :" + topic);
}
Thread.sleep(100);
retry++;
}
}

public static List<KafkaSchema> getListKafkaSchema(
Configuration kafkaConfig, String topic, int maxRead) throws Exception {
KafkaConsumer<String, String> consumer = getKafkaEarliestConsumer(kafkaConfig);

consumer.subscribe(Collections.singletonList(topic));
List<KafkaSchema> kafkaSchemaList = Lists.newArrayList();
int retry = 0;
int read = 0;
maxRead = maxRead == 0 ? MAX_READ : maxRead;
ObjectMapper objectMapper = new ObjectMapper();
while (maxRead > read) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
read++;
try {
String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
if ("canal-json".equals(format)) {
KafkaSchema kafkaSchema = parseCanalJson(record.value(), objectMapper);
if (kafkaSchema != null && !kafkaSchemaList.contains(kafkaSchema)) {
kafkaSchemaList.add(kafkaSchema);
}
} else {
throw new UnsupportedOperationException(
"This format: " + format + " is not support.");
}
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
if (kafkaSchemaList.isEmpty() && retry == MAX_RETRY) {
throw new Exception("Could not get metadata from server,topic :" + topic);
} else if (!kafkaSchemaList.isEmpty() && retry == MAX_RETRY) {
break;
}
Thread.sleep(100);
retry++;
}
return kafkaSchemaList;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
return false;
if (!(o instanceof KafkaSchema)) {
return false;
}
KafkaSchema that = (KafkaSchema) o;
return databaseName.equals(that.databaseName)
&& tableName.equals(that.tableName)
&& fields.equals(that.fields)
&& primaryKeys.equals(that.primaryKeys);
}

@Override
public int hashCode() {
return Objects.hash(databaseName, tableName, fields, primaryKeys);
}
}
Loading

0 comments on commit a5570d8

Please sign in to comment.