diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 052180c41..b6e90bea6 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -70,7 +70,7 @@ under the License.
1.6.2-SNAPSHOT
1.18.0
1.18
- 2.4.2
+ 3.1.1
flink-python
0.16.0
13.0.0
@@ -93,6 +93,8 @@ under the License.
4.12
1.3
4.9
+ 8.0.26
+ 19.3.0.0
@@ -237,7 +239,7 @@ under the License.
- com.ververica
+ org.apache.flink
flink-sql-connector-mysql-cdc
${flink.sql.cdc.version}
provided
@@ -249,7 +251,7 @@ under the License.
- com.ververica
+ org.apache.flink
flink-sql-connector-oracle-cdc
${flink.sql.cdc.version}
provided
@@ -261,7 +263,7 @@ under the License.
- com.ververica
+ org.apache.flink
flink-sql-connector-postgres-cdc
${flink.sql.cdc.version}
provided
@@ -273,7 +275,7 @@ under the License.
- com.ververica
+ org.apache.flink
flink-sql-connector-sqlserver-cdc
${flink.sql.cdc.version}
provided
@@ -285,7 +287,7 @@ under the License.
- com.ververica
+ org.apache.flink
flink-sql-connector-mongodb-cdc
${flink.sql.cdc.version}
@@ -297,6 +299,20 @@ under the License.
+
+
+ mysql
+ mysql-connector-java
+ ${mysql.driver.version}
+ test
+
+
+ com.oracle.ojdbc
+ ojdbc8
+ ${ojdbc.version}
+ provided
+
+
org.apache.flink
flink-runtime-web
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java
index d1d915453..b7e4575cb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java
@@ -19,6 +19,12 @@
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema;
+import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
+import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
+import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
+import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.util.Collector;
import com.fasterxml.jackson.databind.JsonNode;
@@ -26,12 +32,6 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.doris.flink.exception.DorisException;
import java.math.BigDecimal;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
index fe7f33d00..17138c84c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
@@ -18,6 +18,13 @@
package org.apache.doris.flink.tools.cdc.mongodb;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.base.options.SourceOptions;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
+import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
+import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
+import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -30,13 +37,6 @@
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
-import com.ververica.cdc.connectors.base.options.SourceOptions;
-import com.ververica.cdc.connectors.base.options.StartupOptions;
-import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
-import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
-import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -57,7 +57,7 @@
import java.util.List;
import java.util.Map;
-import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
+import static org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
import static org.apache.flink.util.Preconditions.checkNotNull;
public class MongoDBDatabaseSync extends DatabaseSync {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/DateToStringConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/DateToStringConverter.java
index 2080d0238..979ce903d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/DateToStringConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/DateToStringConverter.java
@@ -17,7 +17,8 @@
package org.apache.doris.flink.tools.cdc.mysql;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.slf4j.Logger;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 635224721..c0a2f345c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -18,22 +18,22 @@
package org.apache.doris.flink.tools.cdc.mysql;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceBuilder;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
-import com.ververica.cdc.connectors.mysql.source.MySqlSource;
-import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
-import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
-import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
-import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
-import com.ververica.cdc.connectors.mysql.table.StartupOptions;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index beb6a6778..784116812 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -18,20 +18,20 @@
package org.apache.doris.flink.tools.cdc.oracle;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import org.apache.flink.cdc.connectors.oracle.OracleSource;
+import org.apache.flink.cdc.connectors.oracle.source.OracleSourceBuilder;
+import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceOptions;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
-import com.ververica.cdc.connectors.base.options.StartupOptions;
-import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
-import com.ververica.cdc.connectors.oracle.OracleSource;
-import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
-import com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.DebeziumSourceFunction;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
@@ -51,14 +51,14 @@
import java.util.Map;
import java.util.Properties;
-import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
-import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
-import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
public class OracleDatabaseSync extends DatabaseSync {
private static final Logger LOG = LoggerFactory.getLogger(OracleDatabaseSync.class);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java
index b847476e7..dd2afc13c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java
@@ -17,7 +17,8 @@
package org.apache.doris.flink.tools.cdc.oracle;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import oracle.sql.TIMESTAMP;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
index 74390325b..65981138a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
@@ -18,20 +18,20 @@
package org.apache.doris.flink.tools.cdc.postgres;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.base.options.SourceOptions;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import org.apache.flink.cdc.connectors.postgres.PostgreSQLSource;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
-import com.ververica.cdc.connectors.base.options.SourceOptions;
-import com.ververica.cdc.connectors.base.options.StartupOptions;
-import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
-import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
-import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
-import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.DebeziumSourceFunction;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
@@ -50,17 +50,17 @@
import java.util.Map;
import java.util.Properties;
-import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
-import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
-import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
-import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.DECODING_PLUGIN_NAME;
-import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
-import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.DECODING_PLUGIN_NAME;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME;
public class PostgresDatabaseSync extends DatabaseSync {
private static final Logger LOG = LoggerFactory.getLogger(PostgresDatabaseSync.class);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java
index 68562384c..717ae88dd 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java
@@ -17,7 +17,8 @@
package org.apache.doris.flink.tools.cdc.postgres;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.slf4j.Logger;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index 3f674fb2d..577c8530d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -18,21 +18,21 @@
package org.apache.doris.flink.tools.cdc.sqlserver;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions;
+import org.apache.flink.cdc.connectors.base.options.SourceOptions;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.flink.cdc.connectors.sqlserver.SqlServerSource;
+import org.apache.flink.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
-import com.ververica.cdc.connectors.base.options.JdbcSourceOptions;
-import com.ververica.cdc.connectors.base.options.SourceOptions;
-import com.ververica.cdc.connectors.base.options.StartupOptions;
-import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
-import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
-import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.DebeziumSourceFunction;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
@@ -51,14 +51,14 @@
import java.util.Map;
import java.util.Properties;
-import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
-import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
-import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
-import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
public class SqlServerDatabaseSync extends DatabaseSync {
private static final Logger LOG = LoggerFactory.getLogger(SqlServerDatabaseSync.class);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java
index abab9f606..0f6043477 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java
@@ -17,7 +17,8 @@
package org.apache.doris.flink.tools.cdc.sqlserver;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.slf4j.Logger;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
index 74e635ba8..01c33833f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
@@ -18,11 +18,11 @@
package org.apache.doris.flink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
+import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import com.ververica.cdc.connectors.mysql.source.MySqlSource;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
index 342e8d165..a63228935 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
@@ -17,7 +17,8 @@
package org.apache.doris.flink.utils;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.slf4j.Logger;