You should specify MySQL source database in {@code mySqlConfig}. See document + * href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options">document * of flink-cdc-connectors for detailed keys and values. * *
For each MySQL table to be synchronized, if the corresponding Paimon table does not exist, diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java index f84bf979ab966..316e3f7822a57 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java @@ -147,7 +147,7 @@ public void printHelp() { + "It can't be a regular expression."); System.out.println( "For a complete list of supported configurations, " - + "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options"); + + "see https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options"); System.out.println(); System.out.println("Paimon catalog and table sink conf syntax:"); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java index de6c22a21b7d0..eeb273265772b 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java @@ -27,8 +27,8 @@ import org.apache.paimon.flink.action.cdc.schema.JdbcTableInfo; import org.apache.paimon.schema.Schema; -import com.ververica.cdc.connectors.mysql.source.MySqlSource; -import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions; import java.util.ArrayList; import java.util.List; @@ -43,7 +43,7 @@ * An {@link Action} which synchronize one or multiple MySQL tables into one Paimon table. * *
You should specify MySQL source table in {@code mySqlConfig}. See document + * href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options">document * of flink-cdc-connectors for detailed keys and values. * *
If the specified Paimon table does not exist, this action will automatically create the table.
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
index eab8f37eec622..141b7b73e1f7e 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java
@@ -98,7 +98,7 @@ public void printHelp() {
+ "are required configurations, others are optional.");
System.out.println(
"For a complete list of supported configurations, "
- + "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options");
+ + "see https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#connector-options");
System.out.println();
System.out.println("Paimon catalog and table sink conf syntax:");
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index ed5a3afd4b6e1..2d6e1b0a8ada7 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -47,7 +47,7 @@
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
/* This file is based on source code from MySqlTypeUtils in the flink-cdc-connectors Project
- * (https://ververica.github.io/flink-cdc-connectors/), licensed by the Apache Software Foundation (ASF)
+ * (https://github.com/apache/flink-cdc/), licensed by the Apache Software Foundation (ASF)
* under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
index 562d138ad822c..c03b050fa2a64 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
@@ -18,11 +18,11 @@
package org.apache.paimon.flink.action.cdc.mysql.format;
-import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import io.debezium.document.Array;
import io.debezium.document.DocumentReader;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
+import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import java.io.IOException;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java
index 45d8f69ccbd12..165a77eb3f1c7 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java
@@ -27,12 +27,12 @@
import org.apache.paimon.options.OptionsUtils;
import org.apache.paimon.schema.Schema;
-import com.ververica.cdc.connectors.base.options.StartupOptions;
-import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
-import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
-import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
-import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
-import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions;
+import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.connect.json.JsonConverterConfig;
@@ -138,7 +138,7 @@ public static JdbcIncrementalSource You should specify PostgreSQL source table in {@code postgresConfig}. See document
+ * href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#connector-options">document
* of flink-cdc-connectors for detailed keys and values.
*
* If the specified Paimon table does not exist, this action will automatically create the table.
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionFactory.java
index afc8b726896bd..b40a480f839ae 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionFactory.java
@@ -99,7 +99,7 @@ public void printHelp() {
+ "are required configurations, others are optional.");
System.out.println(
"For a complete list of supported configurations, "
- + "see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#connector-options");
+ + "see https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#connector-options");
System.out.println();
System.out.println("Paimon catalog and table sink conf syntax:");
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcDebeziumDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcDebeziumDeserializationSchema.java
index 9ce39ad29e5ae..e202e4eecabd3 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcDebeziumDeserializationSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcDebeziumDeserializationSchema.java
@@ -20,8 +20,8 @@
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
index e59d124a7b276..26733465a0546 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
@@ -26,8 +26,8 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
-import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.pulsar.source.PulsarSource;
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
index eb207f8c0bdac..394cdd1f149bc 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
@@ -29,7 +29,7 @@
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
-import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
+import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.configuration.Configuration;
import org.bson.Document;
import org.junit.jupiter.api.BeforeAll;
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
index 7105335f37d57..de48d7046861a 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
@@ -24,7 +24,7 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
-import com.ververica.cdc.debezium.utils.JdbcUrlUtils;
+import org.apache.flink.cdc.debezium.utils.JdbcUrlUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java
index a5e06d86def61..5d20dd8ca0c53 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java
@@ -31,7 +31,7 @@
* overriding mysql conf file, i.e. my.cnf.
*
* Copied from ververica
+ * href="https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqlContainer.java">flink-cdc
* / flink-cdc-connectors.
*/
@SuppressWarnings("rawtypes")
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlVersion.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlVersion.java
index 087f4867715cd..201446246de60 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlVersion.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlVersion.java
@@ -22,7 +22,7 @@
* MySql version enum.
*
* Copied from ververica
+ * href="https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqlVersion.java">flink-cdc
* / flink-cdc-connectors.
*/
public enum MySqlVersion {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java
index 17010ed9a92e2..52e2fee0ab47d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java
@@ -20,8 +20,8 @@
import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
-import com.ververica.cdc.connectors.postgres.source.PostgresConnectionPoolFactory;
-import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresConnectionPoolFactory;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import org.junit.jupiter.api.AfterAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
index b5b36888ebe06..10f14ca732d5d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
@@ -25,7 +25,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
-import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import org.apache.flink.core.execution.JobClient;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
index a5077c44bbdea..f1a531d324165 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
@@ -47,13 +47,7 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -120,6 +114,11 @@ public DynamicTableSink.DataStructureConverter createDataStructureConverter(
return new SinkRuntimeProviderContext(isBounded())
.createDataStructureConverter(producedDataType);
}
+
+ @Override
+ public Optional