diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 13a7ec471..54dc7baf1 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -70,7 +70,7 @@ under the License.
1.5.0-SNAPSHOT
1.18.0
1.18
- 2.4.1
+ 2.4.2
0.16.0
5.0.0
3.10.1
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 50ae7fcc9..1b6431202 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -33,12 +33,14 @@
import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
import org.apache.doris.flink.tools.cdc.oracle.OracleType;
import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +88,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer
+ private Map tableMapping;
public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
Pattern pattern,
@@ -93,9 +97,11 @@ public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
boolean newSchemaChange) {
this.dorisOptions = dorisOptions;
this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
- String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
- this.database = tableInfo[0];
- this.table = tableInfo[1];
+ if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
+ String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
+ this.database = tableInfo[0];
+ this.table = tableInfo[1];
+ }
this.sourceTableName = sourceTableName;
// Prevent loss of decimal data precision
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
@@ -120,10 +126,29 @@ public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
}
}
+ public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
+ Pattern pattern,
+ String sourceTableName,
+ boolean newSchemaChange,
+ DorisExecutionOptions executionOptions,
+ Map tableMapping) {
+ this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions);
+ this.tableMapping = tableMapping;
+ }
+
@Override
public Tuple2 serialize(String record) throws IOException {
LOG.debug("received debezium json data {} :", record);
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
+
+ //Filter out table records that are not in tableMapping
+ String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+ String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier);
+ if(StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)){
+ LOG.warn("filter table {}, because it is not listened, record detail is {}", cdcTableIdentifier, record);
+ return null;
+ }
+
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
// schema change ddl
@@ -155,7 +180,8 @@ public Tuple2 serialize(String record) throws IOException {
LOG.error("parse record fail, unknown op {} in {}", op, record);
return null;
}
- return Tuple2.of(null, objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
+
+ return Tuple2.of(dorisTableIdentifier, objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
}
/**
@@ -187,6 +213,13 @@ public boolean schemaChangeV2(JsonNode recordRoot) {
if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
return false;
}
+
+ // db,table
+ Tuple2 tuple = getDorisTableTuple(recordRoot);
+ if(tuple == null){
+ return false;
+ }
+
List ddlSqlList = extractDDLList(recordRoot);
if (CollectionUtils.isEmpty(ddlSqlList)) {
LOG.info("ddl can not do schema change:{}", recordRoot);
@@ -197,8 +230,8 @@ public boolean schemaChangeV2(JsonNode recordRoot) {
for (int i = 0; i < ddlSqlList.size(); i++) {
DDLSchema ddlSchema = ddlSchemas.get(i);
String ddlSql = ddlSqlList.get(i);
- boolean doSchemaChange = checkSchemaChange(ddlSchema);
- status = doSchemaChange && schemaChangeManager.execute(ddlSql, database);
+ boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
+ status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
}
} catch (Exception ex) {
@@ -209,6 +242,7 @@ public boolean schemaChangeV2(JsonNode recordRoot) {
@VisibleForTesting
public List extractDDLList(JsonNode record) throws JsonProcessingException {
+ String dorisTable = getDorisTableIdentifier(record);
JsonNode historyRecord = extractHistoryRecord(record);
JsonNode tableChanges = historyRecord.get("tableChanges");
String ddl = extractJsonNode(historyRecord, "ddl");
@@ -233,7 +267,7 @@ public List extractDDLList(JsonNode record) throws JsonProcessingExcepti
String oldColumnName = renameMatcher.group(2);
String newColumnName = renameMatcher.group(3);
return SchemaChangeHelper.generateRenameDDLSql(
- dorisOptions.getTableIdentifier(), oldColumnName, newColumnName, originFieldSchemaMap);
+ dorisTable, oldColumnName, newColumnName, originFieldSchemaMap);
}
// add/drop ddl
@@ -248,7 +282,7 @@ public List extractDDLList(JsonNode record) throws JsonProcessingExcepti
if (!matcher.find()) {
return null;
}
- return SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier());
+ return SchemaChangeHelper.generateDDLSql(dorisTable);
}
@VisibleForTesting
@@ -262,13 +296,20 @@ public boolean schemaChange(JsonNode recordRoot) {
if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
return false;
}
+ // db,table
+ Tuple2 tuple = getDorisTableTuple(recordRoot);
+ if(tuple == null){
+ return false;
+ }
+
String ddl = extractDDL(recordRoot);
if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
LOG.info("ddl can not do schema change:{}", recordRoot);
return false;
}
- boolean doSchemaChange = checkSchemaChange(ddl);
- status = doSchemaChange && schemaChangeManager.execute(ddl, database);
+
+ boolean doSchemaChange = checkSchemaChange(ddl, tuple.f0, tuple.f1);
+ status = doSchemaChange && schemaChangeManager.execute(ddl, tuple.f0);
LOG.info("schema change status:{}", status);
} catch (Exception ex) {
LOG.warn("schema change error :", ex);
@@ -286,12 +327,48 @@ protected boolean checkTable(JsonNode recordRoot) {
return sourceTableName.equals(dbTbl);
}
- private boolean checkSchemaChange(String ddl) throws IOException, IllegalArgumentException {
+ public String getCdcTableIdentifier(JsonNode record){
+ String db = extractJsonNode(record.get("source"), "db");
+ String schema = extractJsonNode(record.get("source"), "schema");
+ String table = extractJsonNode(record.get("source"), "table");
+ return SourceSchema.getString(db, schema, table);
+ }
+
+ public String getDorisTableIdentifier(String cdcTableIdentifier){
+ if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
+ return dorisOptions.getTableIdentifier();
+ }
+ if(!CollectionUtil.isNullOrEmpty(tableMapping)
+ && !StringUtils.isNullOrWhitespaceOnly(cdcTableIdentifier)
+ && tableMapping.get(cdcTableIdentifier) != null){
+ return tableMapping.get(cdcTableIdentifier);
+ }
+ return null;
+ }
+
+ protected String getDorisTableIdentifier(JsonNode record){
+ String identifier = getCdcTableIdentifier(record);
+ return getDorisTableIdentifier(identifier);
+ }
+
+ protected Tuple2 getDorisTableTuple(JsonNode record){
+ String identifier = getDorisTableIdentifier(record);
+ if(StringUtils.isNullOrWhitespaceOnly(identifier)){
+ return null;
+ }
+ String[] tableInfo = identifier.split("\\.");
+ if(tableInfo.length != 2){
+ return null;
+ }
+ return Tuple2.of(tableInfo[0], tableInfo[1]);
+ }
+
+ private boolean checkSchemaChange(String database, String table, String ddl) throws IOException, IllegalArgumentException {
Map param = buildRequestParam(ddl);
return schemaChangeManager.checkSchemaChange(database, table, param);
}
- private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
+ private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema) throws IOException, IllegalArgumentException {
Map param = SchemaChangeManager.buildRequestParam(ddlSchema.isDropColumn(), ddlSchema.getColumnName());
return schemaChangeManager.checkSchemaChange(database, table, param);
}
@@ -328,8 +405,6 @@ protected String extractTable(JsonNode record) {
return extractJsonNode(record.get("source"), "table");
}
-
-
private String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null &&
!(record.get(key) instanceof NullNode) ? record.get(key).asText() : null;
@@ -369,7 +444,7 @@ public String extractDDL(JsonNode record) throws JsonProcessingException {
String col = matcher.group(3);
String type = matcher.group(5);
type = handleType(type);
- ddl = String.format(EXECUTE_DDL, dorisOptions.getTableIdentifier(), op, col, type);
+ ddl = String.format(EXECUTE_DDL, getDorisTableIdentifier(record), op, col, type);
LOG.info("parse ddl:{}", ddl);
return ddl;
}
@@ -473,6 +548,11 @@ public void setSourceConnector(String sourceConnector) {
this.sourceConnector = SourceConnector.valueOf(sourceConnector.toUpperCase());
}
+ @VisibleForTesting
+ public void setTableMapping(Map tableMapping) {
+ this.tableMapping = tableMapping;
+ }
+
public static JsonDebeziumSchemaSerializer.Builder builder() {
return new JsonDebeziumSchemaSerializer.Builder();
}
@@ -486,6 +566,7 @@ public static class Builder {
private String sourceTableName;
private boolean newSchemaChange;
private DorisExecutionOptions executionOptions;
+ private Map tableMapping;
public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
@@ -512,9 +593,14 @@ public Builder setExecutionOptions(DorisExecutionOptions executionOptions) {
return this;
}
+ public Builder setTableMapping(Map tableMapping) {
+ this.tableMapping = tableMapping;
+ return this;
+ }
+
public JsonDebeziumSchemaSerializer build() {
return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange,
- executionOptions);
+ executionOptions, tableMapping);
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 8a8b3db57..d05fa15c5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -108,13 +108,28 @@ private static void syncDatabase(MultipleParameterTool params, DatabaseSync data
boolean createTableOnly = params.has("create-table-only");
boolean ignoreDefaultValue = params.has("ignore-default-value");
boolean useNewSchemaChange = params.has("use-new-schema-change");
+ boolean singleSink = params.has("single-sink");
Map sinkMap = getConfigMap(params, "sink-conf");
Map tableMap = getConfigMap(params, "table-conf");
Configuration sinkConfig = Configuration.fromMap(sinkMap);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables,multiToOneOrigin,multiToOneTarget, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange);
+ databaseSync.setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setTablePrefix(tablePrefix)
+ .setTableSuffix(tableSuffix)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setMultiToOneOrigin(multiToOneOrigin)
+ .setMultiToOneTarget(multiToOneTarget)
+ .setIgnoreDefaultValue(ignoreDefaultValue)
+ .setSinkConfig(sinkConfig)
+ .setTableConfig(tableMap)
+ .setCreateTableOnly(createTableOnly)
+ .setNewSchemaChange(useNewSchemaChange)
+ .create();
databaseSync.build();
if(StringUtils.isNullOrWhitespaceOnly(jobName)){
jobName = String.format("%s-Doris Sync Database: %s", type, config.getString("database-name","db"));
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index f1579b093..4dfa6d529 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -25,7 +25,6 @@
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.table.DorisConfigOptions;
-import org.apache.doris.flink.tools.cdc.mysql.ParsingProcessFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -60,7 +59,7 @@ public abstract class DatabaseSync {
protected Pattern includingPattern;
protected Pattern excludingPattern;
protected Map multiToOneRulesPattern;
- protected Map tableConfig;
+ protected Map tableConfig = new HashMap<>();
protected Configuration sinkConfig;
protected boolean ignoreDefaultValue;
@@ -71,6 +70,10 @@ public abstract class DatabaseSync {
protected String excludingTables;
protected String multiToOneOrigin;
protected String multiToOneTarget;
+ protected String tablePrefix;
+ protected String tableSuffix;
+ protected boolean singleSink;
+ private Map tableMapping = new HashMap<>();
public abstract void registerDriver() throws SQLException;
@@ -84,30 +87,15 @@ public DatabaseSync() throws SQLException {
registerDriver();
}
- public void create(StreamExecutionEnvironment env, String database, Configuration config,
- String tablePrefix, String tableSuffix, String includingTables,
- String excludingTables,String multiToOneOrigin,String multiToOneTarget, boolean ignoreDefaultValue, Configuration sinkConfig,
- Map tableConfig, boolean createTableOnly, boolean useNewSchemaChange) {
- this.env = env;
- this.config = config;
- this.database = database;
- this.includingTables = includingTables;
- this.excludingTables = excludingTables;
- this.multiToOneOrigin = multiToOneOrigin;
- this.multiToOneTarget = multiToOneTarget;
+ public void create() {
this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables);
this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables);
this.multiToOneRulesPattern = multiToOneRulesParser(multiToOneOrigin,multiToOneTarget);
this.converter = new TableNameConverter(tablePrefix, tableSuffix,multiToOneRulesPattern);
- this.ignoreDefaultValue = ignoreDefaultValue;
- this.sinkConfig = sinkConfig;
- this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;
//default enable light schema change
if(!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)){
this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true");
}
- this.createTableOnly = createTableOnly;
- this.newSchemaChange = useNewSchemaChange;
}
public void build() throws Exception {
@@ -125,7 +113,10 @@ public void build() throws Exception {
List dorisTables = new ArrayList<>();
for (SourceSchema schema : schemaList) {
syncTables.add(schema.getTableName());
- String dorisTable=converter.convert(schema.getTableName());
+ String dorisTable = converter.convert(schema.getTableName());
+
+ //Calculate the mapping relationship between upstream and downstream tables
+ tableMapping.put(schema.getTableIdentifier(), String.format("%s.%s", database, dorisTable));
if (!dorisSystem.tableExists(database, dorisTable)) {
TableSchema dorisSchema = schema.convertTableSchema(tableConfig);
//set doris target database
@@ -144,12 +135,16 @@ public void build() throws Exception {
config.setString(TABLE_NAME_OPTIONS, "(" + String.join("|", syncTables) + ")");
DataStreamSource streamSource = buildCdcSource(env);
- SingleOutputStreamOperator parsedStream = streamSource.process(new ParsingProcessFunction(converter));
- for (String table : dorisTables) {
- OutputTag recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table);
- DataStream sideOutput = parsedStream.getSideOutput(recordOutputTag);
- int sinkParallel = sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
- sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table).uid(table);
+ if(singleSink){
+ streamSource.sinkTo(buildDorisSink());
+ }else{
+ SingleOutputStreamOperator parsedStream = streamSource.process(new ParsingProcessFunction(converter));
+ for (String table : dorisTables) {
+ OutputTag recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table);
+ DataStream sideOutput = parsedStream.getSideOutput(recordOutputTag);
+ int sinkParallel = sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
+ sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table).uid(table);
+ }
}
}
@@ -171,6 +166,13 @@ private DorisConnectionOptions getDorisConnectionOptions() {
return builder.build();
}
+ /**
+ * create doris sink for multi table
+ */
+ public DorisSink buildDorisSink(){
+ return buildDorisSink(null);
+ }
+
/**
* create doris sink
*/
@@ -179,17 +181,20 @@ public DorisSink buildDorisSink(String table) {
String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
- String labelPrefix = sinkConfig.getString(DorisConfigOptions.SINK_LABEL_PREFIX);
DorisSink.Builder builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes(fenodes)
.setBenodes(benodes)
- .setTableIdentifier(database + "." + table)
.setUsername(user)
.setPassword(passwd);
sinkConfig.getOptional(DorisConfigOptions.AUTO_REDIRECT).ifPresent(dorisBuilder::setAutoRedirect);
+ //single sink not need table identifier
+ if(!singleSink && !StringUtils.isNullOrWhitespaceOnly(table)){
+ dorisBuilder.setTableIdentifier(database + "." + table);
+ }
+
Properties pro = new Properties();
//default json data format
pro.setProperty("format", "json");
@@ -198,9 +203,9 @@ public DorisSink buildDorisSink(String table) {
Properties streamLoadProp = DorisConfigOptions.getStreamLoadProp(sinkConfig.toMap());
pro.putAll(streamLoadProp);
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder()
- .setLabelPrefix(String.join("-", labelPrefix, database, table))
.setStreamLoadProp(pro);
+ sinkConfig.getOptional(DorisConfigOptions.SINK_LABEL_PREFIX).ifPresent(executionBuilder::setLabelPrefix);
sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_DELETE).ifPresent(executionBuilder::setDeletable);
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_COUNT).ifPresent(executionBuilder::setBufferCount);
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_SIZE).ifPresent(executionBuilder::setBufferSize);
@@ -234,6 +239,7 @@ public DorisSink buildDorisSink(String table) {
.setDorisOptions(dorisBuilder.build())
.setNewSchemaChange(newSchemaChange)
.setExecutionOptions(executionOptions)
+ .setTableMapping(tableMapping)
.build())
.setDorisOptions(dorisBuilder.build());
return builder.build();
@@ -278,6 +284,82 @@ protected HashMap multiToOneRulesParser(String multiToOneOrigin,
return multiToOneRulesPattern;
}
+
+ public DatabaseSync setEnv(StreamExecutionEnvironment env) {
+ this.env = env;
+ return this;
+ }
+
+ public DatabaseSync setConfig(Configuration config) {
+ this.config = config;
+ return this;
+ }
+
+ public DatabaseSync setDatabase(String database) {
+ this.database = database;
+ return this;
+ }
+
+ public DatabaseSync setIncludingTables(String includingTables) {
+ this.includingTables = includingTables;
+ return this;
+ }
+
+ public DatabaseSync setExcludingTables(String excludingTables) {
+ this.excludingTables = excludingTables;
+ return this;
+ }
+
+ public DatabaseSync setMultiToOneOrigin(String multiToOneOrigin) {
+ this.multiToOneOrigin = multiToOneOrigin;
+ return this;
+ }
+
+ public DatabaseSync setMultiToOneTarget(String multiToOneTarget) {
+ this.multiToOneTarget = multiToOneTarget;
+ return this;
+ }
+
+ public DatabaseSync setTableConfig(Map tableConfig) {
+ this.tableConfig = tableConfig;
+ return this;
+ }
+
+ public DatabaseSync setSinkConfig(Configuration sinkConfig) {
+ this.sinkConfig = sinkConfig;
+ return this;
+ }
+
+ public DatabaseSync setIgnoreDefaultValue(boolean ignoreDefaultValue) {
+ this.ignoreDefaultValue = ignoreDefaultValue;
+ return this;
+ }
+
+ public DatabaseSync setCreateTableOnly(boolean createTableOnly) {
+ this.createTableOnly = createTableOnly;
+ return this;
+ }
+
+ public DatabaseSync setNewSchemaChange(boolean newSchemaChange) {
+ this.newSchemaChange = newSchemaChange;
+ return this;
+ }
+
+ public DatabaseSync setSingleSink(boolean singleSink) {
+ this.singleSink = singleSink;
+ return this;
+ }
+
+ public DatabaseSync setTablePrefix(String tablePrefix) {
+ this.tablePrefix = tablePrefix;
+ return this;
+ }
+
+ public DatabaseSync setTableSuffix(String tableSuffix) {
+ this.tableSuffix = tableSuffix;
+ return this;
+ }
+
public static class TableNameConverter implements Serializable {
private static final long serialVersionUID = 1L;
private final String prefix;
@@ -322,4 +404,6 @@ public String convert(String tableName) {
return target;
}
}
+
+
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
similarity index 96%
rename from flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java
rename to flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
index 563c848d8..a1c17f646 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
@@ -14,11 +14,10 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.flink.tools.cdc.mysql;
+package org.apache.doris.flink.tools.cdc;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index 9168cb515..51d801355 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -19,6 +19,7 @@
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.flink.util.StringUtils;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
@@ -27,9 +28,11 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.StringJoiner;
public abstract class SourceSchema {
private final String databaseName;
+ private final String schemaName;
private final String tableName;
private final String tableComment;
private final LinkedHashMap fields;
@@ -40,6 +43,7 @@ public SourceSchema(
DatabaseMetaData metaData, String databaseName, String schemaName, String tableName, String tableComment)
throws Exception {
this.databaseName = databaseName;
+ this.schemaName = schemaName;
this.tableName = tableName;
this.tableComment = tableComment;
@@ -74,6 +78,26 @@ public SourceSchema(
public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);
+ public String getTableIdentifier(){
+ return getString(databaseName, schemaName, tableName);
+ }
+
+ public static String getString(String databaseName, String schemaName, String tableName) {
+ StringJoiner identifier = new StringJoiner(".");
+ if(!StringUtils.isNullOrWhitespaceOnly(databaseName)){
+ identifier.add(databaseName);
+ }
+ if(!StringUtils.isNullOrWhitespaceOnly(schemaName)){
+ identifier.add(schemaName);
+ }
+
+ if(!StringUtils.isNullOrWhitespaceOnly(tableName)){
+ identifier.add(tableName);
+ }
+
+ return identifier.toString();
+ }
+
public TableSchema convertTableSchema(Map tableProps) {
TableSchema tableSchema = new TableSchema();
tableSchema.setModel(this.model);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index f1e072e0a..416842f3d 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -43,6 +43,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -359,4 +360,61 @@ private static Field getField(String column) throws DorisException {
}
return targetField;
}
+
+ @Test
+ public void testGetCdcTableIdentifier() throws Exception {
+ String insert = "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
+ JsonNode recordRoot = objectMapper.readTree(insert);
+ String identifier = serializer.getCdcTableIdentifier(recordRoot);
+ Assert.assertEquals( "test.t1", identifier);
+
+ String insertSchema = "{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"schema\":\"dbo\",\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"ts_ms\":1663924328869,\"transaction\":null}";
+ String identifierSchema = serializer.getCdcTableIdentifier(objectMapper.readTree(insertSchema));
+ Assert.assertEquals( "test.dbo.t1", identifierSchema);
+
+ String ddl = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+ String ddlRes = serializer.getCdcTableIdentifier(objectMapper.readTree(ddl));
+ Assert.assertEquals( "test.t1", ddlRes);
+ }
+
+ @Test
+ public void testGetDorisTableIdentifier() throws Exception {
+ Map map = new HashMap<>();
+ map.put("test.dbo.t1", "test.t1");
+ serializer.setTableMapping(map);
+ String identifier = serializer.getDorisTableIdentifier("test.dbo.t1");
+ Assert.assertEquals( "test.t1", identifier);
+
+ identifier = serializer.getDorisTableIdentifier("test.t1");
+ Assert.assertEquals("test.t1", identifier);
+
+ String tmp = dorisOptions.getTableIdentifier();
+ dorisOptions.setTableIdentifier(null);
+ identifier = serializer.getDorisTableIdentifier("test.t1");
+ Assert.assertNull( identifier);
+ dorisOptions.setTableIdentifier(tmp);
+ }
+
+ @Test
+ public void testSchemaChangeMultiTable() throws Exception {
+ Map map = new HashMap<>();
+ map.put("mysql.t1", "doris.t1");
+ map.put("mysql.t2", "doris.t2");
+ serializer.setTableMapping(map);
+ String tmp = dorisOptions.getTableIdentifier();
+ dorisOptions.setTableIdentifier(null);
+ String ddl1 = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+ String ddl2 = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"mysql\",\"sequence\":null,\"table\":\"t2\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_sec\\\":1663924503,\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13221,\\\"server_id\\\":1},\\\"databaseName\\\":\\\"test\\\",\\\"ddl\\\":\\\"alter table t1 add \\\\n c_1 varchar(200)\\\",\\\"tableChanges\\\":[{\\\"type\\\":\\\"ALTER\\\",\\\"id\\\":\\\"\\\\\\\"test\\\\\\\".\\\\\\\"t1\\\\\\\"\\\",\\\"table\\\":{\\\"defaultCharsetName\\\":\\\"utf8mb4\\\",\\\"primaryKeyColumnNames\\\":[\\\"id\\\"],\\\"columns\\\":[{\\\"name\\\":\\\"id\\\",\\\"jdbcType\\\":4,\\\"typeName\\\":\\\"INT\\\",\\\"typeExpression\\\":\\\"INT\\\",\\\"charsetName\\\":null,\\\"position\\\":1,\\\"optional\\\":false,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"name\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":128,\\\"position\\\":2,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dt\\\",\\\"jdbcType\\\":91,\\\"typeName\\\":\\\"DATE\\\",\\\"typeExpression\\\":\\\"DATE\\\",\\\"charsetName\\\":null,\\\"position\\\":3,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"dtime\\\",\\\"jdbcType\\\":93,\\\"typeName\\\":\\\"DATETIME\\\",\\\"typeExpression\\\":\\\"DATETIME\\\",\\\"charsetName\\\":null,\\\"position\\\":4,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"ts\\\",\\\"jdbcType\\\":2014,\\\"typeName\\\":\\\"TIMESTAMP\\\",\\\"typeExpression\\\":\\\"TIMESTAMP\\\",\\\"charsetName\\\":null,\\\"position\\\":5,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false},{\\\"name\\\":\\\"c_1\\\",\\\"jdbcType\\\":12,\\\"typeName\\\":\\\"VARCHAR\\\",\\\"typeExpression\\\":\\\"VARCHAR\\\",\\\"charsetName\\\":\\\"utf8mb4\\\",\\\"length\\\":200,\\\"position\\\":6,\\\"optional\\\":true,\\\"autoIncremented\\\":false,\\\"generated\\\":false}]}}]}\"}";
+ String exceptDDL1 = "ALTER TABLE doris.t1 add COLUMN c_1 varchar(600)";
+ String exceptDDL2 = "ALTER TABLE doris.t2 add COLUMN c_1 varchar(600)";
+
+ Assert.assertEquals(exceptDDL1, serializer.extractDDL(objectMapper.readTree(ddl1)));
+ Assert.assertEquals(exceptDDL2, serializer.extractDDL(objectMapper.readTree(ddl2)));
+
+ //Assert.assertEquals(exceptDDL1, serializer.extractDDLList(objectMapper.readTree(ddl1)));
+ //Assert.assertEquals(exceptDDL2, serializer.extractDDLList(objectMapper.readTree(ddl2)));
+
+ dorisOptions.setTableIdentifier(tmp);
+ }
+
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 875fb4c92..f0493a9dd 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -72,7 +72,21 @@ public static void main(String[] args) throws Exception{
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
DatabaseSync databaseSync = new MysqlDatabaseSync();
- databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+ databaseSync.setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setTablePrefix(tablePrefix)
+ .setTableSuffix(tableSuffix)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setMultiToOneOrigin(multiToOneOrigin)
+ .setMultiToOneTarget(multiToOneTarget)
+ .setIgnoreDefaultValue(ignoreDefaultValue)
+ .setSinkConfig(sinkConf)
+ .setTableConfig(tableConfig)
+ .setCreateTableOnly(false)
+ .setNewSchemaChange(useNewSchemaChange)
+ .create();
databaseSync.build();
env.execute(String.format("MySQL-Doris Database Sync: %s", database));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index 9b6277fcd..61401386a 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -76,7 +76,21 @@ public static void main(String[] args) throws Exception{
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
DatabaseSync databaseSync = new OracleDatabaseSync();
- databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+ databaseSync.setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setTablePrefix(tablePrefix)
+ .setTableSuffix(tableSuffix)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setMultiToOneOrigin(multiToOneOrigin)
+ .setMultiToOneTarget(multiToOneTarget)
+ .setIgnoreDefaultValue(ignoreDefaultValue)
+ .setSinkConfig(sinkConf)
+ .setTableConfig(tableConfig)
+ .setCreateTableOnly(false)
+ .setNewSchemaChange(useNewSchemaChange)
+ .create();
databaseSync.build();
env.execute(String.format("Oracle-Doris Database Sync: %s", database));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index 87fa871c4..4887f360b 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -79,7 +79,21 @@ public static void main(String[] args) throws Exception{
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
DatabaseSync databaseSync = new PostgresDatabaseSync();
- databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+ databaseSync.setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setTablePrefix(tablePrefix)
+ .setTableSuffix(tableSuffix)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setMultiToOneOrigin(multiToOneOrigin)
+ .setMultiToOneTarget(multiToOneTarget)
+ .setIgnoreDefaultValue(ignoreDefaultValue)
+ .setSinkConfig(sinkConf)
+ .setTableConfig(tableConfig)
+ .setCreateTableOnly(false)
+ .setNewSchemaChange(useNewSchemaChange)
+ .create();
databaseSync.build();
env.execute(String.format("Postgres-Doris Database Sync: %s", database));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index d247500ea..7129e773f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -77,7 +77,21 @@ public static void main(String[] args) throws Exception{
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
DatabaseSync databaseSync = new SqlServerDatabaseSync();
- databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+ databaseSync.setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setTablePrefix(tablePrefix)
+ .setTableSuffix(tableSuffix)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setMultiToOneOrigin(multiToOneOrigin)
+ .setMultiToOneTarget(multiToOneTarget)
+ .setIgnoreDefaultValue(ignoreDefaultValue)
+ .setSinkConfig(sinkConf)
+ .setTableConfig(tableConfig)
+ .setCreateTableOnly(false)
+ .setNewSchemaChange(useNewSchemaChange)
+ .create();
databaseSync.build();
env.execute(String.format("SqlServer-Doris Database Sync: %s", database));