From 67ca9401ea8e991cd09cd98c9237c06c93305790 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Wed, 24 Jul 2024 10:10:23 +0800 Subject: [PATCH] [Improve] Update flink cdc version to 3.1.1 (#438) --- flink-doris-connector/pom.xml | 28 ++++++++++--- ...orisJsonDebeziumDeserializationSchema.java | 12 +++--- .../cdc/mongodb/MongoDBDatabaseSync.java | 16 +++---- .../cdc/mysql/DateToStringConverter.java | 3 +- .../tools/cdc/mysql/MysqlDatabaseSync.java | 20 ++++----- .../tools/cdc/oracle/OracleDatabaseSync.java | 34 +++++++-------- .../tools/cdc/oracle/OracleDateConverter.java | 3 +- .../cdc/postgres/PostgresDatabaseSync.java | 42 +++++++++---------- .../cdc/postgres/PostgresDateConverter.java | 3 +- .../cdc/sqlserver/SqlServerDatabaseSync.java | 38 ++++++++--------- .../cdc/sqlserver/SqlServerDateConverter.java | 3 +- .../doris/flink/CDCSchemaChangeExample.java | 6 +-- .../flink/utils/DateToStringConverter.java | 3 +- 13 files changed, 116 insertions(+), 95 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 052180c41..b6e90bea6 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -70,7 +70,7 @@ under the License. 1.6.2-SNAPSHOT 1.18.0 1.18 - 2.4.2 + 3.1.1 flink-python 0.16.0 13.0.0 @@ -93,6 +93,8 @@ under the License. 4.12 1.3 4.9 + 8.0.26 + 19.3.0.0 @@ -237,7 +239,7 @@ under the License. - com.ververica + org.apache.flink flink-sql-connector-mysql-cdc ${flink.sql.cdc.version} provided @@ -249,7 +251,7 @@ under the License. - com.ververica + org.apache.flink flink-sql-connector-oracle-cdc ${flink.sql.cdc.version} provided @@ -261,7 +263,7 @@ under the License. - com.ververica + org.apache.flink flink-sql-connector-postgres-cdc ${flink.sql.cdc.version} provided @@ -273,7 +275,7 @@ under the License. - com.ververica + org.apache.flink flink-sql-connector-sqlserver-cdc ${flink.sql.cdc.version} provided @@ -285,7 +287,7 @@ under the License. - com.ververica + org.apache.flink flink-sql-connector-mongodb-cdc ${flink.sql.cdc.version} @@ -297,6 +299,20 @@ under the License. + + + mysql + mysql-connector-java + ${mysql.driver.version} + test + + + com.oracle.ojdbc + ojdbc8 + ${ojdbc.version} + provided + + org.apache.flink flink-runtime-web diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java index d1d915453..b7e4575cb 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java @@ -19,6 +19,12 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Field; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.util.Collector; import com.fasterxml.jackson.databind.JsonNode; @@ -26,12 +32,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema; -import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field; -import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema; -import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct; -import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import org.apache.doris.flink.exception.DorisException; import java.math.BigDecimal; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java index fe7f33d00..17138c84c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java @@ -18,6 +18,13 @@ package org.apache.doris.flink.tools.cdc.mongodb; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.connectors.base.options.SourceOptions; +import org.apache.flink.cdc.connectors.base.options.StartupOptions; +import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource; +import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceBuilder; +import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -30,13 +37,6 @@ import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.MongoIterable; -import com.ververica.cdc.connectors.base.options.SourceOptions; -import com.ververica.cdc.connectors.base.options.StartupOptions; -import com.ververica.cdc.connectors.mongodb.source.MongoDBSource; -import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder; -import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; -import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; -import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.catalog.doris.DataModel; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -57,7 +57,7 @@ import java.util.List; import java.util.Map; -import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue; +import static org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue; import static org.apache.flink.util.Preconditions.checkNotNull; public class MongoDBDatabaseSync extends DatabaseSync { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/DateToStringConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/DateToStringConverter.java index 2080d0238..979ce903d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/DateToStringConverter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/DateToStringConverter.java @@ -17,7 +17,8 @@ package org.apache.doris.flink.tools.cdc.mysql; -import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; + import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import org.slf4j.Logger; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java index 635224721..c0a2f345c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -18,22 +18,22 @@ package org.apache.doris.flink.tools.cdc.mysql; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceBuilder; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions; +import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; +import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder; +import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.table.DebeziumOptions; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; -import com.ververica.cdc.connectors.mysql.source.MySqlSource; -import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder; -import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; -import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; -import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder; -import com.ververica.cdc.connectors.mysql.table.StartupOptions; -import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; -import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; -import com.ververica.cdc.debezium.table.DebeziumOptions; import org.apache.doris.flink.catalog.doris.DataModel; import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.SourceSchema; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index beb6a6778..784116812 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -18,20 +18,20 @@ package org.apache.doris.flink.tools.cdc.oracle; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.connectors.base.options.StartupOptions; +import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; +import org.apache.flink.cdc.connectors.oracle.OracleSource; +import org.apache.flink.cdc.connectors.oracle.source.OracleSourceBuilder; +import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceOptions; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.DebeziumSourceFunction; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.table.DebeziumOptions; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; -import com.ververica.cdc.connectors.base.options.StartupOptions; -import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; -import com.ververica.cdc.connectors.oracle.OracleSource; -import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder; -import com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; -import com.ververica.cdc.debezium.DebeziumSourceFunction; -import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; -import com.ververica.cdc.debezium.table.DebeziumOptions; import org.apache.doris.flink.catalog.doris.DataModel; import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.SourceSchema; @@ -51,14 +51,14 @@ import java.util.Map; import java.util.Properties; -import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE; -import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES; -import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT; -import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; -import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; -import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; -import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; -import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; public class OracleDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(OracleDatabaseSync.class); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java index b847476e7..dd2afc13c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java @@ -17,7 +17,8 @@ package org.apache.doris.flink.tools.cdc.oracle; -import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; + import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import oracle.sql.TIMESTAMP; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java index 74390325b..65981138a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java @@ -18,20 +18,20 @@ package org.apache.doris.flink.tools.cdc.postgres; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.connectors.base.options.SourceOptions; +import org.apache.flink.cdc.connectors.base.options.StartupOptions; +import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; +import org.apache.flink.cdc.connectors.postgres.PostgreSQLSource; +import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.DebeziumSourceFunction; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.table.DebeziumOptions; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Preconditions; -import com.ververica.cdc.connectors.base.options.SourceOptions; -import com.ververica.cdc.connectors.base.options.StartupOptions; -import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; -import com.ververica.cdc.connectors.postgres.PostgreSQLSource; -import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder; -import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; -import com.ververica.cdc.debezium.DebeziumSourceFunction; -import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; -import com.ververica.cdc.debezium.table.DebeziumOptions; import org.apache.doris.flink.catalog.doris.DataModel; import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.SourceSchema; @@ -50,17 +50,17 @@ import java.util.Map; import java.util.Properties; -import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE; -import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES; -import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT; -import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; -import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; -import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; -import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; -import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; -import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.DECODING_PLUGIN_NAME; -import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL; -import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; +import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.DECODING_PLUGIN_NAME; +import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL; +import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME; public class PostgresDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(PostgresDatabaseSync.class); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java index 68562384c..717ae88dd 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java @@ -17,7 +17,8 @@ package org.apache.doris.flink.tools.cdc.postgres; -import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; + import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import org.slf4j.Logger; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java index 3f674fb2d..577c8530d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java @@ -18,21 +18,21 @@ package org.apache.doris.flink.tools.cdc.sqlserver; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions; +import org.apache.flink.cdc.connectors.base.options.SourceOptions; +import org.apache.flink.cdc.connectors.base.options.StartupOptions; +import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.flink.cdc.connectors.sqlserver.SqlServerSource; +import org.apache.flink.cdc.connectors.sqlserver.source.SqlServerSourceBuilder; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.DebeziumSourceFunction; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.table.DebeziumOptions; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Preconditions; -import com.ververica.cdc.connectors.base.options.JdbcSourceOptions; -import com.ververica.cdc.connectors.base.options.SourceOptions; -import com.ververica.cdc.connectors.base.options.StartupOptions; -import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; -import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; -import com.ververica.cdc.connectors.sqlserver.SqlServerSource; -import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; -import com.ververica.cdc.debezium.DebeziumSourceFunction; -import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; -import com.ververica.cdc.debezium.table.DebeziumOptions; import org.apache.doris.flink.catalog.doris.DataModel; import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.SourceSchema; @@ -51,14 +51,14 @@ import java.util.Map; import java.util.Properties; -import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE; -import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES; -import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT; -import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; -import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; -import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; -import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; -import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; public class SqlServerDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(SqlServerDatabaseSync.class); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java index abab9f606..0f6043477 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java @@ -17,7 +17,8 @@ package org.apache.doris.flink.tools.cdc.sqlserver; -import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; + import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import org.slf4j.Logger; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java index 74e635ba8..01c33833f 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java @@ -18,11 +18,11 @@ package org.apache.doris.flink; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import com.ververica.cdc.connectors.mysql.source.MySqlSource; -import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; -import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java index 342e8d165..a63228935 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java @@ -17,7 +17,8 @@ package org.apache.doris.flink.utils; -import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; + import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import org.slf4j.Logger;