Skip to content

Commit

Permalink
[cdc] upgrade to flink cdc 3.1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
hadoopkandy committed Jul 24, 2024
1 parent 6742db8 commit ba229ad
Show file tree
Hide file tree
Showing 31 changed files with 79 additions and 79 deletions.
6 changes: 3 additions & 3 deletions paimon-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ under the License.

<properties>
<flink.shaded.hadoop.version>2.8.3-10.0</flink.shaded.hadoop.version>
<flink.cdc.version>2.4.2</flink.cdc.version>
<flink.cdc.version>3.1.1</flink.cdc.version>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
</properties>

Expand Down Expand Up @@ -77,7 +77,7 @@ under the License.
</dependency>

<dependency>
<groupId>com.ververica</groupId>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
<scope>test</scope>
Expand Down Expand Up @@ -197,7 +197,7 @@ under the License.
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>com.ververica</groupId>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
<destFileName>mysql-cdc.jar</destFileName>
Expand Down
17 changes: 9 additions & 8 deletions paimon-flink/paimon-flink-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ under the License.
<name>Paimon : Flink : CDC</name>

<properties>
<flink.version>1.17.2</flink.version>
<flink.cdc.version>2.4.2</flink.cdc.version>
<flink.mongodb.cdc.version>2.4.1</flink.mongodb.cdc.version>
<flink.version>1.18.1</flink.version>
<flink.cdc.version>3.1.1</flink.cdc.version>
<flink.mongodb.cdc.version>3.1.1</flink.mongodb.cdc.version>
<avro.version>1.11.1</avro.version>
<geometry.version>2.2.0</geometry.version>
<json-path.version>2.9.0</json-path.version>
<mongodb.testcontainers.version>1.19.1</mongodb.testcontainers.version>
<flink.connector.pulsar.version>4.0.0-1.17</flink.connector.pulsar.version>
<flink.connector.kafka.version>3.0.1-1.18</flink.connector.kafka.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -76,14 +77,14 @@ under the License.
<!-- CDC dependencies -->

<dependency>
<groupId>com.ververica</groupId>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-postgres-cdc</artifactId>
<version>${flink.cdc.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.ververica</groupId>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
<scope>provided</scope>
Expand All @@ -92,7 +93,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<version>${flink.connector.kafka.version}</version>
<scope>provided</scope>
</dependency>

Expand All @@ -105,7 +106,7 @@ under the License.


<dependency>
<groupId>com.ververica</groupId>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>${flink.mongodb.cdc.version}</version>
<scope>provided</scope>
Expand Down Expand Up @@ -282,7 +283,7 @@ under the License.
<!-- Same as flink-sql-connector-kafka. -->
<relocation>
<pattern>org.apache.kafka.connect</pattern>
<shadedPattern>com.ververica.cdc.connectors.shaded.org.apache.kafka.connect</shadedPattern>
<shadedPattern>org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.kafka</pattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;

import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema;

import com.ververica.cdc.connectors.base.options.SourceOptions;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.cdc.connectors.base.options.SourceOptions;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;

import java.util.Collections;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void printHelp() {
+ "It can't be a regular expression.");
System.out.println(
"For a complete list of supported configurations, "
+ "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options");
+ "see https://github.com/apache/flink-cdc/blob/master/docs/content/docs/connectors/flink-sources/mongodb-cdc.md#connector-options");
System.out.println();

System.out.println("Paimon catalog and table sink conf syntax:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
import org.apache.paimon.schema.Schema;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;

import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void printHelp() {
+ "This can be done by configuring 'field.name' to specify the synchronization fields and 'parser.path' to specify the JSON parsing path for those fields.");
System.out.println(
"For a complete list of supported configurations, "
+ "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options");
+ "see https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options");
System.out.println();

System.out.println("Paimon catalog and table sink conf syntax:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.configuration.Configuration;
import org.bson.Document;

Expand All @@ -40,7 +40,7 @@
import java.util.List;
import java.util.Objects;

import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
import static org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema;
import org.apache.paimon.schema.Schema;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import com.ververica.cdc.debezium.utils.JdbcUrlUtils;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceBuilder;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.cdc.debezium.utils.JdbcUrlUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -179,7 +179,7 @@ public static MySqlSource<CdcSourceRecord> buildMySqlSource(

String startupMode = mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_MODE);
// see
// https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java#L196
// https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java#L197
if ("initial".equalsIgnoreCase(startupMode)) {
sourceBuilder.startupOptions(StartupOptions.initial());
} else if ("earliest-offset".equalsIgnoreCase(startupMode)) {
Expand Down Expand Up @@ -234,7 +234,7 @@ public static MySqlSource<CdcSourceRecord> buildMySqlSource(
}

// see
// https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options
// https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options
// https://dev.mysql.com/doc/connectors/en/connector-j-reference-configuration-properties.html
private static Map<String, String> getJdbcProperties(
TypeMapping typeMapping, Configuration mySqlConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -58,7 +58,7 @@
* An {@link Action} which synchronize the whole MySQL database into one Paimon database.
*
* <p>You should specify MySQL source database in {@code mySqlConfig}. See <a
* href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document
* href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options">document
* of flink-cdc-connectors</a> for detailed keys and values.
*
* <p>For each MySQL table to be synchronized, if the corresponding Paimon table does not exist,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void printHelp() {
+ "It can't be a regular expression.");
System.out.println(
"For a complete list of supported configurations, "
+ "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options");
+ "see https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options");
System.out.println();

System.out.println("Paimon catalog and table sink conf syntax:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.paimon.flink.action.cdc.schema.JdbcTableInfo;
import org.apache.paimon.schema.Schema;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -43,7 +43,7 @@
* An {@link Action} which synchronize one or multiple MySQL tables into one Paimon table.
*
* <p>You should specify MySQL source table in {@code mySqlConfig}. See <a
* href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document
* href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options">document
* of flink-cdc-connectors</a> for detailed keys and values.
*
* <p>If the specified Paimon table does not exist, this action will automatically create the table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void printHelp() {
+ "are required configurations, others are optional.");
System.out.println(
"For a complete list of supported configurations, "
+ "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options");
+ "see https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options");
System.out.println();

System.out.println("Paimon catalog and table sink conf syntax:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;

/* This file is based on source code from MySqlTypeUtils in the flink-cdc-connectors Project
* (https://ververica.github.io/flink-cdc-connectors/), licensed by the Apache Software Foundation (ASF)
* (https://github.com/apache/flink-cdc/), licensed by the Apache Software Foundation (ASF)
* under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

package org.apache.paimon.flink.action.cdc.mysql.format;

import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import io.debezium.document.Array;
import io.debezium.document.DocumentReader;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;

import java.io.IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import org.apache.paimon.options.OptionsUtils;
import org.apache.paimon.schema.Schema;

import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.connect.json.JsonConverterConfig;

Expand Down Expand Up @@ -138,7 +138,7 @@ public static JdbcIncrementalSource<CdcSourceRecord> buildPostgresSource(

// Postgres CDC using increment snapshot, splitSize is used instead of fetchSize (as in JDBC
// connector). splitSize is the number of records in each snapshot split. see
// https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#incremental-snapshot-options
// https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#incremental-snapshot-options
postgresConfig
.getOptional(PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE)
.ifPresent(sourceBuilder::splitSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;

import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Bits;
import io.debezium.time.Date;
Expand All @@ -49,6 +48,7 @@
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.json.JsonConverterConfig;
Expand Down
Loading

0 comments on commit ba229ad

Please sign in to comment.