From a4b4bdfc92bb8fecefffb6f4f81b0a8f577d142e Mon Sep 17 00:00:00 2001 From: Antg <57290855+codeAntg@users.noreply.github.com> Date: Mon, 6 Nov 2023 16:43:46 +0800 Subject: [PATCH] [feature] multiple tables to one for DatabaseSync (#208) --- .../doris/flink/tools/cdc/CdcTools.java | 4 +- .../doris/flink/tools/cdc/DatabaseSync.java | 70 +++++++++++++++++-- .../tools/cdc/CdcMysqlSyncDatabaseCase.java | 10 ++- .../tools/cdc/CdcOraclelSyncDatabaseCase.java | 6 +- .../cdc/CdcPostgresSyncDatabaseCase.java | 6 +- .../cdc/CdcSqlServerSyncDatabaseCase.java | 10 +-- .../flink/tools/cdc/DatabaseSyncTest.java | 40 +++++++++++ 7 files changed, 128 insertions(+), 18 deletions(-) create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 6a390ea82..8a8b3db57 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -103,6 +103,8 @@ private static void syncDatabase(MultipleParameterTool params, DatabaseSync data String tableSuffix = params.get("table-suffix"); String includingTables = params.get("including-tables"); String excludingTables = params.get("excluding-tables"); + String multiToOneOrigin = params.get("multi-to-one-origin"); + String multiToOneTarget = params.get("multi-to-one-target"); boolean createTableOnly = params.has("create-table-only"); boolean ignoreDefaultValue = params.has("ignore-default-value"); boolean useNewSchemaChange = params.has("use-new-schema-change"); @@ -112,7 +114,7 @@ private static void syncDatabase(MultipleParameterTool params, DatabaseSync data Configuration sinkConfig = Configuration.fromMap(sinkMap); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange); + databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables,multiToOneOrigin,multiToOneTarget, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange); databaseSync.build(); if(StringUtils.isNullOrWhitespaceOnly(jobName)){ jobName = String.format("%s-Doris Sync Database: %s", type, config.getString("database-name","db")); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index fcd0f4c0e..99c45ebe5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +59,7 @@ public abstract class DatabaseSync { protected TableNameConverter converter; protected Pattern includingPattern; protected Pattern excludingPattern; + protected Map multiToOneRulesPattern; protected Map tableConfig; protected Configuration sinkConfig; protected boolean ignoreDefaultValue; @@ -67,6 +69,8 @@ public abstract class DatabaseSync { private boolean newSchemaChange; protected String includingTables; protected String excludingTables; + protected String multiToOneOrigin; + protected String multiToOneTarget; public abstract void registerDriver() throws SQLException; @@ -82,16 +86,19 @@ public DatabaseSync() throws SQLException { public void create(StreamExecutionEnvironment env, String database, Configuration config, String tablePrefix, String tableSuffix, String includingTables, - String excludingTables, boolean ignoreDefaultValue, Configuration sinkConfig, + String excludingTables,String multiToOneOrigin,String multiToOneTarget, boolean ignoreDefaultValue, Configuration sinkConfig, Map tableConfig, boolean createTableOnly, boolean useNewSchemaChange) { this.env = env; this.config = config; this.database = database; - this.converter = new TableNameConverter(tablePrefix, tableSuffix); this.includingTables = includingTables; this.excludingTables = excludingTables; + this.multiToOneOrigin = multiToOneOrigin; + this.multiToOneTarget = multiToOneTarget; this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables); this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables); + this.multiToOneRulesPattern = multiToOneRulesParser(multiToOneOrigin,multiToOneTarget); + this.converter = new TableNameConverter(tablePrefix, tableSuffix,multiToOneRulesPattern); this.ignoreDefaultValue = ignoreDefaultValue; this.sinkConfig = sinkConfig; this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig; @@ -118,7 +125,7 @@ public void build() throws Exception { List dorisTables = new ArrayList<>(); for (SourceSchema schema : schemaList) { syncTables.add(schema.getTableName()); - String dorisTable = converter.convert(schema.getTableName()); + String dorisTable=converter.convert(schema.getTableName()); if (!dorisSystem.tableExists(database, dorisTable)) { TableSchema dorisSchema = schema.convertTableSchema(tableConfig); //set doris target database @@ -126,7 +133,9 @@ public void build() throws Exception { dorisSchema.setTable(dorisTable); dorisSystem.createTable(dorisSchema); } - dorisTables.add(dorisTable); + if(!dorisTables.contains(dorisTable)){ + dorisTables.add(dorisTable); + } } if(createTableOnly){ System.out.println("Create table finished."); @@ -139,7 +148,6 @@ public void build() throws Exception { for (String table : dorisTables) { OutputTag recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table); DataStream sideOutput = parsedStream.getSideOutput(recordOutputTag); - int sinkParallel = sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism()); sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table).uid(table); } @@ -245,11 +253,36 @@ protected boolean isSyncNeeded(String tableName) { LOG.debug("table {} is synchronized? {}", tableName, sync); return sync; } + /** + * Filter table that many tables merge to one + */ + protected HashMap multiToOneRulesParser(String multiToOneOrigin,String multiToOneTarget){ + if(StringUtils.isNullOrWhitespaceOnly(multiToOneOrigin) || StringUtils.isNullOrWhitespaceOnly(multiToOneTarget)){ + return null; + } + HashMap multiToOneRulesPattern= new HashMap<>(); + String[] origins = multiToOneOrigin.split("\\|"); + String[] targets = multiToOneTarget.split("\\|"); + if(origins.length!=targets.length){ + System.out.println("param error : multi to one params length are not equal,please check your params."); + System.exit(1); + } + try { + for (int i = 0; i < origins.length; i++) { + multiToOneRulesPattern.put(Pattern.compile(origins[i]),targets[i]); + } + } catch (Exception e) { + System.out.println("param error : Your regular expression is incorrect,please check."); + System.exit(1); + } + return multiToOneRulesPattern; + } public static class TableNameConverter implements Serializable { private static final long serialVersionUID = 1L; private final String prefix; private final String suffix; + private Map multiToOneRulesPattern; TableNameConverter(){ this("",""); @@ -260,8 +293,33 @@ public static class TableNameConverter implements Serializable { this.suffix = suffix == null ? "" : suffix; } + TableNameConverter(String prefix, String suffix,Map multiToOneRulesPattern) { + this.prefix = prefix == null ? "" : prefix; + this.suffix = suffix == null ? "" : suffix; + this.multiToOneRulesPattern = multiToOneRulesPattern; + } + public String convert(String tableName) { - return prefix + tableName + suffix; + if(multiToOneRulesPattern==null){ + return prefix + tableName + suffix; + } + + String target=null; + + for (Map.Entry patternStringEntry : multiToOneRulesPattern.entrySet()) { + if(patternStringEntry.getKey().matcher(tableName).matches()){ + target=patternStringEntry.getValue(); + } + } + /** + * If multiToOneRulesPattern is not null and target is not assigned, + * then the synchronization task contains both multi to one and one to one , + * prefixes and suffixes are added to common one-to-one mapping tables + * */ + if(target==null){ + return prefix + tableName + suffix; + } + return target; } } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java index 1a205b163..875fb4c92 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java @@ -48,7 +48,8 @@ public static void main(String[] args) throws Exception{ mysqlConfig.put("hostname","127.0.0.1"); mysqlConfig.put("port","3306"); mysqlConfig.put("username","root"); - mysqlConfig.put("password",""); +// mysqlConfig.put("password",""); + mysqlConfig.put("password","12345678"); Configuration config = Configuration.fromMap(mysqlConfig); Map sinkConfig = new HashMap<>(); @@ -63,12 +64,15 @@ public static void main(String[] args) throws Exception{ Map tableConfig = new HashMap<>(); tableConfig.put("replication_num", "1"); - String includingTables = "tbl1|tbl2|tbl3"; +// String includingTables = "tbl1|tbl2|tbl3"; + String includingTables = "a_.*|b_.*|c"; String excludingTables = ""; + String multiToOneOrigin="a_.*|b_.*"; + String multiToOneTarget="a|b"; boolean ignoreDefaultValue = false; boolean useNewSchemaChange = false; DatabaseSync databaseSync = new MysqlDatabaseSync(); - databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); + databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); databaseSync.build(); env.execute(String.format("MySQL-Doris Database Sync: %s", database)); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java index 3a2a39efb..9b6277fcd 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java @@ -69,12 +69,14 @@ public static void main(String[] args) throws Exception{ Map tableConfig = new HashMap<>(); tableConfig.put("replication_num", "1"); - String includingTables = "test.*"; + String includingTables = "a_.*|b_.*|c"; String excludingTables = ""; + String multiToOneOrigin="a_.*|b_.*"; + String multiToOneTarget="a|b"; boolean ignoreDefaultValue = false; boolean useNewSchemaChange = false; DatabaseSync databaseSync = new OracleDatabaseSync(); - databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); + databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); databaseSync.build(); env.execute(String.format("Oracle-Doris Database Sync: %s", database)); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java index cf5e1d89a..87fa871c4 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java @@ -72,12 +72,14 @@ public static void main(String[] args) throws Exception{ Map tableConfig = new HashMap<>(); tableConfig.put("replication_num", "1"); - String includingTables = "testcdc"; + String includingTables = "a_.*|b_.*|c"; String excludingTables = ""; + String multiToOneOrigin="a_.*|b_.*"; + String multiToOneTarget="a|b"; boolean ignoreDefaultValue = false; boolean useNewSchemaChange = false; DatabaseSync databaseSync = new PostgresDatabaseSync(); - databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); + databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); databaseSync.build(); env.execute(String.format("Postgres-Doris Database Sync: %s", database)); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java index 7251a7fda..d247500ea 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java @@ -51,7 +51,7 @@ public static void main(String[] args) throws Exception{ sourceConfig.put("hostname","127.0.0.1"); sourceConfig.put("port","1433"); sourceConfig.put("username","sa"); - sourceConfig.put("password","123456"); + sourceConfig.put("password","Passw@rd"); // sourceConfig.put("debezium.database.tablename.case.insensitive","false"); // sourceConfig.put("scan.incremental.snapshot.enabled","true"); // sourceConfig.put("debezium.include.schema.changes","false"); @@ -70,14 +70,16 @@ public static void main(String[] args) throws Exception{ Map tableConfig = new HashMap<>(); tableConfig.put("replication_num", "1"); - String includingTables = "products_test"; + String includingTables = "a_.*|b_.*|c"; String excludingTables = ""; + String multiToOneOrigin="a_.*|b_.*"; + String multiToOneTarget="a|b"; boolean ignoreDefaultValue = false; boolean useNewSchemaChange = false; DatabaseSync databaseSync = new SqlServerDatabaseSync(); - databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); + databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); databaseSync.build(); - env.execute(String.format("Postgres-Doris Database Sync: %s", database)); + env.execute(String.format("SqlServer-Doris Database Sync: %s", database)); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java new file mode 100644 index 000000000..daab90b9e --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.tools.cdc; + +import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; +import org.junit.Test; +import java.util.Arrays; + +/** + * Unit tests for the {@link DatabaseSync}. + **/ +public class DatabaseSyncTest { + @Test + public void multiToOneRulesParserTest() throws Exception{ + String[][] testCase = { + {"a_.*|b_.*","a|b"} // Normal condition +// ,{"a_.*|b_.*","a|b|c"} // Unequal length +// ,{"",""} // Null value +// ,{"***....","a"} // Abnormal regular expression + }; + DatabaseSync databaseSync = new MysqlDatabaseSync(); + Arrays.stream(testCase).forEach(arr->{ + databaseSync.multiToOneRulesParser(arr[0], arr[1]); + }); + } +}