Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
wudi committed Nov 30, 2023
1 parent d7e36a8 commit 081b968
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ private static void syncDatabase(MultipleParameterTool params, DatabaseSync data
.setTableConfig(tableMap)
.setCreateTableOnly(createTableOnly)
.setNewSchemaChange(useNewSchemaChange)
.setSingleSink(singleSink)
.create();
databaseSync.build();
if(StringUtils.isNullOrWhitespaceOnly(jobName)){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,10 @@ protected String getSyncTableList(List<String> syncTables){
}else{
// includingTablePattern and ^excludingPattern
String includingPattern = String.format("(%s)\\.(%s)", getTableListPrefix(), includingTables);
if (excludingTables.isEmpty()) {
if (StringUtils.isNullOrWhitespaceOnly(excludingTables)) {
return includingPattern;
}else{
String excludingPattern = String.format("?!(^%s$)", getTableListPrefix() + "\\." + excludingTables);
String excludingPattern = String.format("?!(%s\\.(%s))$", getTableListPrefix(), excludingTables);
return String.format("(%s)(%s)", includingPattern, excludingPattern);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package org.apache.doris.flink.tools.cdc;

import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
import org.apache.flink.configuration.Configuration;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;

/**
Expand All @@ -37,4 +40,17 @@ public void multiToOneRulesParserTest() throws Exception{
databaseSync.multiToOneRulesParser(arr[0], arr[1]);
});
}

@Test
public void getSyncTableListTest() throws Exception{
DatabaseSync databaseSync = new MysqlDatabaseSync();
databaseSync.setSingleSink(false);
databaseSync.setIncludingTables("tbl_1|tbl_2");
Configuration config = new Configuration();
config.setString("database-name", "db");
config.setString("table-name", "tbl.*");
databaseSync.setConfig(config);
String syncTableList = databaseSync.getSyncTableList(Arrays.asList("tbl_1", "tbl_2"));
Assert.assertEquals("db\\.tbl_1|db\\.tbl_2", syncTableList);
}
}

0 comments on commit 081b968

Please sign in to comment.