From 657bc99385f9ed4bd42c411c67201adfe4f9acd8 Mon Sep 17 00:00:00 2001 From: wangkang Date: Thu, 15 Aug 2024 17:19:18 +0800 Subject: [PATCH] [cdc] upgrade to flink cdc 3.1.1 (#3764) --- paimon-e2e-tests/pom.xml | 23 ++++++++++++++++--- .../resources-filtered/docker-compose.yaml | 6 +++-- paimon-flink/paimon-flink-cdc/pom.xml | 17 +++++++------- .../flink/action/cdc/SyncJobHandler.java | 6 ++--- .../cdc/mongodb/MongoDBActionUtils.java | 10 ++++---- .../mongodb/MongoDBSyncDatabaseAction.java | 4 ++-- .../MongoDBSyncDatabaseActionFactory.java | 2 +- .../cdc/mongodb/MongoDBSyncTableAction.java | 4 ++-- .../MongoDBSyncTableActionFactory.java | 2 +- .../cdc/mongodb/MongodbSchemaUtils.java | 4 ++-- .../action/cdc/mysql/MySqlActionUtils.java | 20 ++++++++-------- .../action/cdc/mysql/MySqlRecordParser.java | 4 ++-- .../cdc/mysql/MySqlSyncDatabaseAction.java | 6 ++--- .../mysql/MySqlSyncDatabaseActionFactory.java | 2 +- .../cdc/mysql/MySqlSyncTableAction.java | 6 ++--- .../mysql/MySqlSyncTableActionFactory.java | 2 +- .../action/cdc/mysql/MySqlTypeUtils.java | 2 +- .../cdc/mysql/format/DebeziumEventUtils.java | 2 +- .../cdc/postgres/PostgresActionUtils.java | 14 +++++------ .../cdc/postgres/PostgresRecordParser.java | 2 +- .../cdc/postgres/PostgresSyncTableAction.java | 6 ++--- .../PostgresSyncTableActionFactory.java | 2 +- .../CdcDebeziumDeserializationSchema.java | 2 +- .../CdcTimestampExtractorFactory.java | 4 ++-- .../cdc/mongodb/MongodbSchemaITCase.java | 2 +- .../cdc/mysql/MySqlCdcTypeMappingITCase.java | 2 +- .../action/cdc/mysql/MySqlContainer.java | 2 +- .../flink/action/cdc/mysql/MySqlVersion.java | 2 +- .../postgres/PostgresActionITCaseBase.java | 4 ++-- .../PostgresSyncTableActionITCase.java | 2 +- .../paimon/flink/kafka/KafkaLogTestUtils.java | 6 +++++ pom.xml | 1 + 32 files changed, 100 insertions(+), 73 deletions(-) diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml index 1b439bea20e1..fa590a6de1b7 100644 --- a/paimon-e2e-tests/pom.xml +++ b/paimon-e2e-tests/pom.xml @@ -33,7 +33,7 @@ under the License. 2.8.3-10.0 - 2.4.2 + 3.1.1 flink-sql-connector-hive-2.3.10_${scala.binary.version} @@ -77,7 +77,7 @@ under the License. - com.ververica + org.apache.flink flink-connector-mysql-cdc ${flink.cdc.version} test @@ -120,6 +120,13 @@ under the License. ${testcontainers.version} test + + + mysql + mysql-connector-java + ${test.mysql.connector.java.version} + test + @@ -197,7 +204,7 @@ under the License. - com.ververica + org.apache.flink flink-sql-connector-mysql-cdc ${flink.cdc.version} mysql-cdc.jar @@ -206,6 +213,16 @@ under the License. /tmp/paimon-e2e-tests-jars + + mysql + mysql-connector-java + ${test.mysql.connector.java.version} + mysql-connector-java.jar + jar + true + /tmp/paimon-e2e-tests-jars + + org.apache.flink diff --git a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml index 91869fbf8118..400f88c385c9 100644 --- a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml +++ b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml @@ -31,7 +31,8 @@ services: - /tmp/paimon-e2e-tests-jars:/jars entrypoint: > /bin/bash -c " - cp /jars/paimon-flink.jar /jars/paimon-flink-action.jar /jars/bundled-hadoop.jar /jars/mysql-cdc.jar + cp /jars/paimon-flink.jar /jars/paimon-flink-action.jar /jars/bundled-hadoop.jar + /jars/mysql-cdc.jar /jars/mysql-connector-java.jar /jars/flink-sql-connector-kafka.jar /jars/flink-sql-connector-hive.jar /opt/flink/lib ; echo 'See FLINK-31659 for why we need the following two steps' ; mv /opt/flink/opt/flink-table-planner*.jar /opt/flink/lib/ ; @@ -54,7 +55,8 @@ services: - /tmp/paimon-e2e-tests-jars:/jars entrypoint: > /bin/bash -c " - cp /jars/paimon-flink.jar /jars/paimon-flink-action.jar /jars/bundled-hadoop.jar /jars/mysql-cdc.jar + cp /jars/paimon-flink.jar /jars/paimon-flink-action.jar /jars/bundled-hadoop.jar + /jars/mysql-cdc.jar /jars/mysql-connector-java.jar /jars/flink-sql-connector-kafka.jar /jars/flink-sql-connector-hive.jar /opt/flink/lib ; echo 'See FLINK-31659 for why we need the following two steps' ; mv /opt/flink/opt/flink-table-planner*.jar /opt/flink/lib/ ; diff --git a/paimon-flink/paimon-flink-cdc/pom.xml b/paimon-flink/paimon-flink-cdc/pom.xml index 7ac9facc6348..4a1496e5c007 100644 --- a/paimon-flink/paimon-flink-cdc/pom.xml +++ b/paimon-flink/paimon-flink-cdc/pom.xml @@ -34,15 +34,16 @@ under the License. Paimon : Flink : CDC - 1.17.2 - 2.4.2 - 2.4.1 + 1.18.1 + 3.1.1 + 3.1.1 1.11.1 2.2.0 2.9.0 1.19.1 4.0.0-1.17 7.5.0 + 3.0.1-1.18 @@ -84,14 +85,14 @@ under the License. - com.ververica + org.apache.flink flink-connector-postgres-cdc ${flink.cdc.version} provided - com.ververica + org.apache.flink flink-connector-mysql-cdc ${flink.cdc.version} provided @@ -100,7 +101,7 @@ under the License. org.apache.flink flink-connector-kafka - ${flink.version} + ${flink.connector.kafka.version} provided @@ -113,7 +114,7 @@ under the License. - com.ververica + org.apache.flink flink-connector-mongodb-cdc ${flink.mongodb.cdc.version} provided @@ -303,7 +304,7 @@ under the License. org.apache.kafka.connect - com.ververica.cdc.connectors.shaded.org.apache.kafka.connect + org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect org.apache.kafka diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java index 280f6c17d599..147d19377b6c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java @@ -28,11 +28,11 @@ import org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; -import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; -import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; -import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.connector.source.Source; +import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.pulsar.common.config.PulsarOptions; import org.apache.flink.connector.pulsar.source.PulsarSourceOptions; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java index 65d22ba33fb2..c86307fe7015 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java @@ -21,11 +21,11 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord; import org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema; -import com.ververica.cdc.connectors.base.options.SourceOptions; -import com.ververica.cdc.connectors.base.options.StartupOptions; -import com.ververica.cdc.connectors.mongodb.source.MongoDBSource; -import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder; -import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; +import org.apache.flink.cdc.connectors.base.options.SourceOptions; +import org.apache.flink.cdc.connectors.base.options.StartupOptions; +import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource; +import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceBuilder; +import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java index 7881aa58b9a7..4120aa1ba06a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java @@ -23,8 +23,8 @@ import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase; import org.apache.paimon.flink.action.cdc.SyncJobHandler; -import com.ververica.cdc.connectors.mongodb.source.MongoDBSource; -import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; +import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource; +import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; import java.util.Collections; import java.util.Map; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java index 0468e1c11150..d768b6740ccf 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionFactory.java @@ -91,7 +91,7 @@ public void printHelp() { + "It can't be a regular expression."); System.out.println( "For a complete list of supported configurations, " - + "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options"); + + "see https://github.com/apache/flink-cdc/blob/master/docs/content/docs/connectors/flink-sources/mongodb-cdc.md#connector-options"); System.out.println(); System.out.println("Paimon catalog and table sink conf syntax:"); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java index 0c5b078867c8..c1bf21b5ba2a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java @@ -23,8 +23,8 @@ import org.apache.paimon.flink.action.cdc.SyncTableActionBase; import org.apache.paimon.schema.Schema; -import com.ververica.cdc.connectors.mongodb.source.MongoDBSource; -import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; +import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource; +import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; import java.util.Map; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java index f74171f29e0a..79063c606f8c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionFactory.java @@ -105,7 +105,7 @@ public void printHelp() { + "This can be done by configuring 'field.name' to specify the synchronization fields and 'parser.path' to specify the JSON parsing path for those fields."); System.out.println( "For a complete list of supported configurations, " - + "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#connector-options"); + + "see https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options"); System.out.println(); System.out.println("Paimon catalog and table sink conf syntax:"); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaUtils.java index 28ad91ba0893..76c936a32d49 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaUtils.java @@ -27,8 +27,8 @@ import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; -import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; import org.apache.commons.lang3.StringUtils; +import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; import org.apache.flink.configuration.Configuration; import org.bson.Document; @@ -40,7 +40,7 @@ import java.util.List; import java.util.Objects; -import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue; +import static org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME; import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java index cc9644b54d32..929529a637eb 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java @@ -26,14 +26,14 @@ import org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema; import org.apache.paimon.schema.Schema; -import com.ververica.cdc.connectors.mysql.source.MySqlSource; -import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder; -import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; -import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; -import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder; -import com.ververica.cdc.connectors.mysql.table.StartupOptions; -import com.ververica.cdc.debezium.table.DebeziumOptions; -import com.ververica.cdc.debezium.utils.JdbcUrlUtils; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceBuilder; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions; +import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; +import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; +import org.apache.flink.cdc.debezium.table.DebeziumOptions; +import org.apache.flink.cdc.debezium.utils.JdbcUrlUtils; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; @@ -179,7 +179,7 @@ public static MySqlSource buildMySqlSource( String startupMode = mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_MODE); // see - // https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java#L196 + // https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java#L197 if ("initial".equalsIgnoreCase(startupMode)) { sourceBuilder.startupOptions(StartupOptions.initial()); } else if ("earliest-offset".equalsIgnoreCase(startupMode)) { @@ -234,7 +234,7 @@ public static MySqlSource buildMySqlSource( } // see - // https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options + // https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options // https://dev.mysql.com/doc/connectors/en/connector-j-reference-configuration-properties.html private static Map getJdbcProperties( TypeMapping typeMapping, Configuration mySqlConfig) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java index b87a92806262..502e6237a477 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java @@ -37,14 +37,14 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; -import com.ververica.cdc.debezium.table.DebeziumOptions; import io.debezium.connector.AbstractSourceInfo; import io.debezium.relational.Column; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.Table; import io.debezium.relational.history.TableChanges; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions; +import org.apache.flink.cdc.debezium.table.DebeziumOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; import org.slf4j.Logger; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index d27fa32d0233..ecdb6c65f482 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -34,8 +34,8 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.Preconditions; -import com.ververica.cdc.connectors.mysql.source.MySqlSource; -import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +58,7 @@ * An {@link Action} which synchronize the whole MySQL database into one Paimon database. * *

You should specify MySQL source database in {@code mySqlConfig}. See document + * href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options">document * of flink-cdc-connectors for detailed keys and values. * *

For each MySQL table to be synchronized, if the corresponding Paimon table does not exist, diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java index f84bf979ab96..316e3f7822a5 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java @@ -147,7 +147,7 @@ public void printHelp() { + "It can't be a regular expression."); System.out.println( "For a complete list of supported configurations, " - + "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options"); + + "see https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options"); System.out.println(); System.out.println("Paimon catalog and table sink conf syntax:"); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java index de6c22a21b7d..eeb273265772 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java @@ -27,8 +27,8 @@ import org.apache.paimon.flink.action.cdc.schema.JdbcTableInfo; import org.apache.paimon.schema.Schema; -import com.ververica.cdc.connectors.mysql.source.MySqlSource; -import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions; import java.util.ArrayList; import java.util.List; @@ -43,7 +43,7 @@ * An {@link Action} which synchronize one or multiple MySQL tables into one Paimon table. * *

You should specify MySQL source table in {@code mySqlConfig}. See document + * href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options">document * of flink-cdc-connectors for detailed keys and values. * *

If the specified Paimon table does not exist, this action will automatically create the table. diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java index eab8f37eec62..141b7b73e1f7 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java @@ -98,7 +98,7 @@ public void printHelp() { + "are required configurations, others are optional."); System.out.println( "For a complete list of supported configurations, " - + "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options"); + + "see https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options"); System.out.println(); System.out.println("Paimon catalog and table sink conf syntax:"); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java index f81046962c39..d1540ed39b32 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java @@ -47,7 +47,7 @@ import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING; /* This file is based on source code from MySqlTypeUtils in the flink-cdc-connectors Project - * (https://ververica.github.io/flink-cdc-connectors/), licensed by the Apache Software Foundation (ASF) + * (https://github.com/apache/flink-cdc/), licensed by the Apache Software Foundation (ASF) * under the Apache License, Version 2.0. See the NOTICE file distributed with this work for * additional information regarding copyright ownership. */ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java index 562d138ad822..c03b050fa2a6 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java @@ -18,11 +18,11 @@ package org.apache.paimon.flink.action.cdc.mysql.format; -import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer; import io.debezium.document.Array; import io.debezium.document.DocumentReader; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.TableChanges; +import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer; import java.io.IOException; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java index 45d8f69ccbd1..165a77eb3f1c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java @@ -27,12 +27,12 @@ import org.apache.paimon.options.OptionsUtils; import org.apache.paimon.schema.Schema; -import com.ververica.cdc.connectors.base.options.StartupOptions; -import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; -import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder; -import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource; -import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions; -import com.ververica.cdc.debezium.table.DebeziumOptions; +import org.apache.flink.cdc.connectors.base.options.StartupOptions; +import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; +import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder; +import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions; +import org.apache.flink.cdc.debezium.table.DebeziumOptions; import org.apache.flink.configuration.Configuration; import org.apache.kafka.connect.json.JsonConverterConfig; @@ -138,7 +138,7 @@ public static JdbcIncrementalSource buildPostgresSource( // Postgres CDC using increment snapshot, splitSize is used instead of fetchSize (as in JDBC // connector). splitSize is the number of records in each snapshot split. see - // https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#incremental-snapshot-options + // https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#incremental-snapshot-options postgresConfig .getOptional(PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE) .ifPresent(sourceBuilder::splitSize); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java index 0d672e85d5f9..a050896d8d9f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java @@ -40,7 +40,6 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; -import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions; import io.debezium.connector.AbstractSourceInfo; import io.debezium.data.Bits; import io.debezium.time.Date; @@ -49,6 +48,7 @@ import io.debezium.time.Timestamp; import io.debezium.time.ZonedTimestamp; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; import org.apache.kafka.connect.json.JsonConverterConfig; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableAction.java index e7d692b91e42..f66c20dfa02a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableAction.java @@ -27,8 +27,8 @@ import org.apache.paimon.flink.action.cdc.schema.JdbcTableInfo; import org.apache.paimon.schema.Schema; -import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; -import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions; +import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions; import java.util.ArrayList; import java.util.HashSet; @@ -45,7 +45,7 @@ * An {@link Action} which synchronize one or multiple PostgreSQL tables into one Paimon table. * *

You should specify PostgreSQL source table in {@code postgresConfig}. See document + * href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#connector-options">document * of flink-cdc-connectors for detailed keys and values. * *

If the specified Paimon table does not exist, this action will automatically create the table. diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionFactory.java index afc8b726896b..b40a480f839a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionFactory.java @@ -99,7 +99,7 @@ public void printHelp() { + "are required configurations, others are optional."); System.out.println( "For a complete list of supported configurations, " - + "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#connector-options"); + + "see https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#connector-options"); System.out.println(); System.out.println("Paimon catalog and table sink conf syntax:"); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcDebeziumDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcDebeziumDeserializationSchema.java index 9ce39ad29e5a..e202e4eecabd 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcDebeziumDeserializationSchema.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcDebeziumDeserializationSchema.java @@ -20,8 +20,8 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.util.Collector; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.json.JsonConverterConfig; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java index e59d124a7b27..26733465a054 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java @@ -26,8 +26,8 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import com.ververica.cdc.connectors.mongodb.source.MongoDBSource; -import com.ververica.cdc.connectors.mysql.source.MySqlSource; +import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.pulsar.source.PulsarSource; diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java index eb207f8c0bda..394cdd1f149b 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java @@ -29,7 +29,7 @@ import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; -import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; +import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; import org.apache.flink.configuration.Configuration; import org.bson.Document; import org.junit.jupiter.api.BeforeAll; diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java index 7105335f37d5..de48d7046861 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java @@ -24,7 +24,7 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; -import com.ververica.cdc.debezium.utils.JdbcUrlUtils; +import org.apache.flink.cdc.debezium.utils.JdbcUrlUtils; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java index a5e06d86def6..5d20dd8ca0c5 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java @@ -31,7 +31,7 @@ * overriding mysql conf file, i.e. my.cnf. * *

Copied from ververica + * href="https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqlContainer.java">flink-cdc * / flink-cdc-connectors. */ @SuppressWarnings("rawtypes") diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlVersion.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlVersion.java index 087f4867715c..201446246de6 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlVersion.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlVersion.java @@ -22,7 +22,7 @@ * MySql version enum. * *

Copied from ververica + * href="https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqlVersion.java">flink-cdc * / flink-cdc-connectors. */ public enum MySqlVersion { diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java index 17010ed9a92e..52e2fee0ab47 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java @@ -20,8 +20,8 @@ import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase; -import com.ververica.cdc.connectors.postgres.source.PostgresConnectionPoolFactory; -import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions; +import org.apache.flink.cdc.connectors.postgres.source.PostgresConnectionPoolFactory; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions; import org.junit.jupiter.api.AfterAll; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java index b5b36888ebe0..10f14ca732d5 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java @@ -25,7 +25,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.JsonSerdeUtil; -import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions; import org.apache.flink.core.execution.JobClient; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java index a5077c44bbde..375cbb5c43a4 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java @@ -53,6 +53,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -120,6 +121,11 @@ public DynamicTableSink.DataStructureConverter createDataStructureConverter( return new SinkRuntimeProviderContext(isBounded()) .createDataStructureConverter(producedDataType); } + + @Override + public Optional getTargetColumns() { + return Optional.empty(); + } }; public static KafkaLogStoreFactory discoverKafkaLogFactory() { diff --git a/pom.xml b/pom.xml index d7d9c5430c0f..f8d79940a009 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,7 @@ under the License. 1.20.0 3.0.1-1.18 + 8.0.27 1.5.5-11 3.0.11