Skip to content

Commit

Permalink
[cdc] upgrade to flink cdc 3.1.1 (#3764)
Browse files Browse the repository at this point in the history
  • Loading branch information
hadoopkandy authored Aug 15, 2024
1 parent 24fedf7 commit 657bc99
Show file tree
Hide file tree
Showing 32 changed files with 100 additions and 73 deletions.
23 changes: 20 additions & 3 deletions paimon-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ under the License.

<properties>
<flink.shaded.hadoop.version>2.8.3-10.0</flink.shaded.hadoop.version>
<flink.cdc.version>2.4.2</flink.cdc.version>
<flink.cdc.version>3.1.1</flink.cdc.version>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.10_${scala.binary.version}</flink.sql.connector.hive>
</properties>

Expand Down Expand Up @@ -77,7 +77,7 @@ under the License.
</dependency>

<dependency>
<groupId>com.ververica</groupId>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
<scope>test</scope>
Expand Down Expand Up @@ -120,6 +120,13 @@ under the License.
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${test.mysql.connector.java.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -197,7 +204,7 @@ under the License.
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>com.ververica</groupId>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
<destFileName>mysql-cdc.jar</destFileName>
Expand All @@ -206,6 +213,16 @@ under the License.
<outputDirectory>/tmp/paimon-e2e-tests-jars
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${test.mysql.connector.java.version}</version>
<destFileName>mysql-connector-java.jar</destFileName>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>/tmp/paimon-e2e-tests-jars
</outputDirectory>
</artifactItem>
<!-- test paimon with kafka sql jar -->
<artifactItem>
<groupId>org.apache.flink</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ services:
- /tmp/paimon-e2e-tests-jars:/jars
entrypoint: >
/bin/bash -c "
cp /jars/paimon-flink.jar /jars/paimon-flink-action.jar /jars/bundled-hadoop.jar /jars/mysql-cdc.jar
cp /jars/paimon-flink.jar /jars/paimon-flink-action.jar /jars/bundled-hadoop.jar
/jars/mysql-cdc.jar /jars/mysql-connector-java.jar
/jars/flink-sql-connector-kafka.jar /jars/flink-sql-connector-hive.jar /opt/flink/lib ;
echo 'See FLINK-31659 for why we need the following two steps' ;
mv /opt/flink/opt/flink-table-planner*.jar /opt/flink/lib/ ;
Expand All @@ -54,7 +55,8 @@ services:
- /tmp/paimon-e2e-tests-jars:/jars
entrypoint: >
/bin/bash -c "
cp /jars/paimon-flink.jar /jars/paimon-flink-action.jar /jars/bundled-hadoop.jar /jars/mysql-cdc.jar
cp /jars/paimon-flink.jar /jars/paimon-flink-action.jar /jars/bundled-hadoop.jar
/jars/mysql-cdc.jar /jars/mysql-connector-java.jar
/jars/flink-sql-connector-kafka.jar /jars/flink-sql-connector-hive.jar /opt/flink/lib ;
echo 'See FLINK-31659 for why we need the following two steps' ;
mv /opt/flink/opt/flink-table-planner*.jar /opt/flink/lib/ ;
Expand Down
17 changes: 9 additions & 8 deletions paimon-flink/paimon-flink-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@ under the License.
<name>Paimon : Flink : CDC</name>

<properties>
<flink.version>1.17.2</flink.version>
<flink.cdc.version>2.4.2</flink.cdc.version>
<flink.mongodb.cdc.version>2.4.1</flink.mongodb.cdc.version>
<flink.version>1.18.1</flink.version>
<flink.cdc.version>3.1.1</flink.cdc.version>
<flink.mongodb.cdc.version>3.1.1</flink.mongodb.cdc.version>
<avro.version>1.11.1</avro.version>
<geometry.version>2.2.0</geometry.version>
<json-path.version>2.9.0</json-path.version>
<mongodb.testcontainers.version>1.19.1</mongodb.testcontainers.version>
<flink.connector.pulsar.version>4.0.0-1.17</flink.connector.pulsar.version>
<confluent.platform.version>7.5.0</confluent.platform.version>
<flink.connector.kafka.version>3.0.1-1.18</flink.connector.kafka.version>
</properties>

<repositories>
Expand Down Expand Up @@ -84,14 +85,14 @@ under the License.
<!-- CDC dependencies -->

<dependency>
<groupId>com.ververica</groupId>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-postgres-cdc</artifactId>
<version>${flink.cdc.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.ververica</groupId>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
<scope>provided</scope>
Expand All @@ -100,7 +101,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<version>${flink.connector.kafka.version}</version>
<scope>provided</scope>
</dependency>

Expand All @@ -113,7 +114,7 @@ under the License.


<dependency>
<groupId>com.ververica</groupId>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>${flink.mongodb.cdc.version}</version>
<scope>provided</scope>
Expand Down Expand Up @@ -303,7 +304,7 @@ under the License.
<!-- Same as flink-sql-connector-kafka. -->
<relocation>
<pattern>org.apache.kafka.connect</pattern>
<shadedPattern>com.ververica.cdc.connectors.shaded.org.apache.kafka.connect</shadedPattern>
<shadedPattern>org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.kafka</pattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;

import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema;

import com.ververica.cdc.connectors.base.options.SourceOptions;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.cdc.connectors.base.options.SourceOptions;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;

import java.util.Collections;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void printHelp() {
+ "It can't be a regular expression.");
System.out.println(
"For a complete list of supported configurations, "
+ "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options");
+ "see https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mongodb-cdc/#connector-options");
System.out.println();

System.out.println("Paimon catalog and table sink conf syntax:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
import org.apache.paimon.schema.Schema;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;

import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void printHelp() {
+ "This can be done by configuring 'field.name' to specify the synchronization fields and 'parser.path' to specify the JSON parsing path for those fields.");
System.out.println(
"For a complete list of supported configurations, "
+ "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options");
+ "see https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mongodb-cdc/#connector-options");
System.out.println();

System.out.println("Paimon catalog and table sink conf syntax:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.configuration.Configuration;
import org.bson.Document;

Expand All @@ -40,7 +40,7 @@
import java.util.List;
import java.util.Objects;

import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
import static org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema;
import org.apache.paimon.schema.Schema;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import com.ververica.cdc.debezium.utils.JdbcUrlUtils;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceBuilder;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.cdc.debezium.utils.JdbcUrlUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -179,7 +179,7 @@ public static MySqlSource<CdcSourceRecord> buildMySqlSource(

String startupMode = mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_MODE);
// see
// https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java#L196
// https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java#L197
if ("initial".equalsIgnoreCase(startupMode)) {
sourceBuilder.startupOptions(StartupOptions.initial());
} else if ("earliest-offset".equalsIgnoreCase(startupMode)) {
Expand Down Expand Up @@ -234,7 +234,7 @@ public static MySqlSource<CdcSourceRecord> buildMySqlSource(
}

// see
// https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options
// https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options
// https://dev.mysql.com/doc/connectors/en/connector-j-reference-configuration-properties.html
private static Map<String, String> getJdbcProperties(
TypeMapping typeMapping, Configuration mySqlConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -58,7 +58,7 @@
* An {@link Action} which synchronizes the whole MySQL database into one Paimon database.
*
* <p>You should specify MySQL source database in {@code mySqlConfig}. See <a
* href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document
* href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options">document
* of flink-cdc-connectors</a> for detailed keys and values.
*
* <p>For each MySQL table to be synchronized, if the corresponding Paimon table does not exist,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void printHelp() {
+ "It can't be a regular expression.");
System.out.println(
"For a complete list of supported configurations, "
+ "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options");
+ "see https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options");
System.out.println();

System.out.println("Paimon catalog and table sink conf syntax:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.paimon.flink.action.cdc.schema.JdbcTableInfo;
import org.apache.paimon.schema.Schema;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -43,7 +43,7 @@
* An {@link Action} which synchronizes one or multiple MySQL tables into one Paimon table.
*
* <p>You should specify MySQL source table in {@code mySqlConfig}. See <a
* href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document
* href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options">document
* of flink-cdc-connectors</a> for detailed keys and values.
*
* <p>If the specified Paimon table does not exist, this action will automatically create the table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void printHelp() {
+ "are required configurations, others are optional.");
System.out.println(
"For a complete list of supported configurations, "
+ "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options");
+ "see https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options");
System.out.println();

System.out.println("Paimon catalog and table sink conf syntax:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;

/* This file is based on source code from MySqlTypeUtils in the flink-cdc-connectors Project
* (https://ververica.github.io/flink-cdc-connectors/), licensed by the Apache Software Foundation (ASF)
* (https://github.com/apache/flink-cdc/), licensed by the Apache Software Foundation (ASF)
* under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

package org.apache.paimon.flink.action.cdc.mysql.format;

import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import io.debezium.document.Array;
import io.debezium.document.DocumentReader;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;

import java.io.IOException;

Expand Down
Loading

0 comments on commit 657bc99

Please sign in to comment.