Skip to content

Commit

Permalink
[flink] kafka_sync_database supports different prefix and suffix for …
Browse files Browse the repository at this point in the history
…different db (#4704)
  • Loading branch information
JackeyLee007 authored Dec 15, 2024
1 parent 47a8871 commit d61f3d2
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 14 deletions.
2 changes: 2 additions & 0 deletions docs/content/cdc-ingestion/kafka-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ To use this feature through `flink run`, run the following shell command.
--warehouse <warehouse-path> \
--database <database-name> \
[--table_mapping <table-name>=<paimon-table-name>] \
[--table_prefix_db <paimon-table-prefix-by-db>] \
[--table_prefix <paimon-table-prefix>] \
[--table_suffix_db <paimon-table-suffix-by-db>] \
[--table_suffix <paimon-table-suffix>] \
[--including_tables <table-name|name-regular-expr>] \
[--excluding_tables <table-name|name-regular-expr>] \
Expand Down
12 changes: 10 additions & 2 deletions docs/layouts/shortcodes/generated/kafka_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,21 @@
<td><h5>--table_mapping</h5></td>
<td>The table name mapping between source database and Paimon. For example, if you want to synchronize a source table named "test" to a Paimon table named "paimon_test", you can specify "--table_mapping test=paimon_test". Multiple mappings could be specified with multiple "--table_mapping" options. "--table_mapping" has higher priority than "--table_prefix" and "--table_suffix".</td>
</tr>
<tr>
<td><h5>--table_prefix_db</h5></td>
<td>The prefix of the Paimon tables to be synchronized from the specified db. For example, if you want to prefix the tables from db1 with "ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". "--table_prefix_db" has higher priority than "--table_prefix".</td>
</tr>
<tr>
<td><h5>--table_prefix</h5></td>
<td>The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_".</td>
<td>The prefix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_prefix_db". For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_".</td>
</tr>
<tr>
<td><h5>--table_suffix_db</h5></td>
<td>The suffix of the Paimon tables to be synchronized from the specified db. The usage is same as "--table_prefix_db".</td>
</tr>
<tr>
<td><h5>--table_suffix</h5></td>
<td>The suffix of all Paimon tables to be synchronized. The usage is same as "--table_prefix".</td>
<td>The suffix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_suffix_db". The usage is same as "--table_prefix".</td>
</tr>
<tr>
<td><h5>--including_tables</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class CdcActionCommonUtils {
public static final String PULSAR_CONF = "pulsar_conf";
public static final String TABLE_PREFIX = "table_prefix";
public static final String TABLE_SUFFIX = "table_suffix";
public static final String TABLE_PREFIX_DB = "table_prefix_db";
public static final String TABLE_SUFFIX_DB = "table_suffix_db";
public static final String TABLE_MAPPING = "table_mapping";
public static final String INCLUDING_TABLES = "including_tables";
public static final String EXCLUDING_TABLES = "excluding_tables";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
protected String tablePrefix = "";
protected String tableSuffix = "";
protected Map<String, String> tableMapping = new HashMap<>();
protected Map<String, String> dbPrefix = new HashMap<>();
protected Map<String, String> dbSuffix = new HashMap<>();
protected String includingTables = ".*";
protected List<String> partitionKeys = new ArrayList<>();
protected List<String> primaryKeys = new ArrayList<>();
Expand Down Expand Up @@ -98,6 +100,30 @@ public SyncDatabaseActionBase withTableSuffix(@Nullable String tableSuffix) {
return this;
}

public SyncDatabaseActionBase withDbPrefix(Map<String, String> dbPrefix) {
if (dbPrefix != null) {
this.dbPrefix =
dbPrefix.entrySet().stream()
.collect(
HashMap::new,
(m, e) -> m.put(e.getKey().toLowerCase(), e.getValue()),
HashMap::putAll);
}
return this;
}

public SyncDatabaseActionBase withDbSuffix(Map<String, String> dbSuffix) {
if (dbSuffix != null) {
this.dbSuffix =
dbSuffix.entrySet().stream()
.collect(
HashMap::new,
(m, e) -> m.put(e.getKey().toLowerCase(), e.getValue()),
HashMap::putAll);
}
return this;
}

public SyncDatabaseActionBase withTableMapping(Map<String, String> tableMapping) {
if (tableMapping != null) {
this.tableMapping = tableMapping;
Expand Down Expand Up @@ -164,7 +190,13 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
excludingTables == null ? null : Pattern.compile(excludingTables);
TableNameConverter tableNameConverter =
new TableNameConverter(
allowUpperCase, mergeShards, tablePrefix, tableSuffix, tableMapping);
allowUpperCase,
mergeShards,
dbPrefix,
dbSuffix,
tablePrefix,
tableSuffix,
tableMapping);
Set<String> createdTables;
try {
createdTables = new HashSet<>(catalog.listTables(database));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_MAPPING;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX_DB;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX_DB;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;

/** Base {@link ActionFactory} for synchronizing into database. */
Expand All @@ -52,6 +54,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
protected void withParams(MultipleParameterToolAdapter params, T action) {
action.withTablePrefix(params.get(TABLE_PREFIX))
.withTableSuffix(params.get(TABLE_SUFFIX))
.withDbPrefix(optionalConfigMap(params, TABLE_PREFIX_DB))
.withDbSuffix(optionalConfigMap(params, TABLE_SUFFIX_DB))
.withTableMapping(optionalConfigMap(params, TABLE_MAPPING))
.includingTables(params.get(INCLUDING_TABLES))
.excludingTables(params.get(EXCLUDING_TABLES))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class TableNameConverter implements Serializable {

private final boolean caseSensitive;
private final boolean mergeShards;
private final Map<String, String> dbPrefix;
private final Map<String, String> dbSuffix;
private final String prefix;
private final String suffix;
private final Map<String, String> tableMapping;
Expand All @@ -45,21 +47,54 @@ public TableNameConverter(
String prefix,
String suffix,
Map<String, String> tableMapping) {
this(
caseSensitive,
mergeShards,
new HashMap<>(),
new HashMap<>(),
prefix,
suffix,
tableMapping);
}

public TableNameConverter(
boolean caseSensitive,
boolean mergeShards,
Map<String, String> dbPrefix,
Map<String, String> dbSuffix,
String prefix,
String suffix,
Map<String, String> tableMapping) {
this.caseSensitive = caseSensitive;
this.mergeShards = mergeShards;
this.dbPrefix = dbPrefix;
this.dbSuffix = dbSuffix;
this.prefix = prefix;
this.suffix = suffix;
this.tableMapping = lowerMapKey(tableMapping);
}

public String convert(String originName) {
if (tableMapping.containsKey(originName.toLowerCase())) {
String mappedName = tableMapping.get(originName.toLowerCase());
public String convert(String originDbName, String originTblName) {
// top priority: table mapping
if (tableMapping.containsKey(originTblName.toLowerCase())) {
String mappedName = tableMapping.get(originTblName.toLowerCase());
return caseSensitive ? mappedName : mappedName.toLowerCase();
}

String tableName = caseSensitive ? originName : originName.toLowerCase();
return prefix + tableName + suffix;
String tblPrefix = prefix;
String tblSuffix = suffix;

// second priority: prefix and postfix specified by db
if (dbPrefix.containsKey(originDbName.toLowerCase())) {
tblPrefix = dbPrefix.get(originDbName.toLowerCase());
}
if (dbSuffix.containsKey(originDbName.toLowerCase())) {
tblSuffix = dbSuffix.get(originDbName.toLowerCase());
}

// third priority: normal prefix and suffix
String tableName = caseSensitive ? originTblName : originTblName.toLowerCase();
return tblPrefix + tableName + tblSuffix;
}

public String convert(Identifier originIdentifier) {
Expand All @@ -69,7 +104,7 @@ public String convert(Identifier originIdentifier) {
: originIdentifier.getDatabaseName()
+ "_"
+ originIdentifier.getObjectName();
return convert(rawName);
return convert(originIdentifier.getDatabaseName(), rawName);
}

private Map<String, String> lowerMapKey(Map<String, String> map) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ protected void beforeBuildingSourceSink() throws Exception {
for (JdbcTableInfo tableInfo : jdbcTableInfos) {
Identifier identifier =
Identifier.create(
database, tableNameConverter.convert(tableInfo.toPaimonTableName()));
database,
tableNameConverter.convert("", tableInfo.toPaimonTableName()));
FileStoreTable table;
Schema fromMySql =
CdcActionCommonUtils.buildPaimonSchema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,47 @@ public void testConvertTableName() {
tableMapping.put("mapped_src", "mapped_TGT");
TableNameConverter caseConverter =
new TableNameConverter(true, true, "pre_", "_pos", tableMapping);
Assert.assertEquals(caseConverter.convert("mapped_SRC"), "mapped_TGT");
Assert.assertEquals(caseConverter.convert("", "mapped_SRC"), "mapped_TGT");

Assert.assertEquals(caseConverter.convert("unmapped_src"), "pre_unmapped_src_pos");
Assert.assertEquals(caseConverter.convert("", "unmapped_src"), "pre_unmapped_src_pos");

TableNameConverter noCaseConverter =
new TableNameConverter(false, true, "pre_", "_pos", tableMapping);
Assert.assertEquals(noCaseConverter.convert("mapped_src"), "mapped_tgt");
Assert.assertEquals(noCaseConverter.convert("unmapped_src"), "pre_unmapped_src_pos");
Assert.assertEquals(noCaseConverter.convert("", "mapped_src"), "mapped_tgt");
Assert.assertEquals(noCaseConverter.convert("", "unmapped_src"), "pre_unmapped_src_pos");
}

@Test
public void testConvertTableNameByDBPrefix_Suffix() {
Map<String, String> dbPrefix = new HashMap<>(2);
dbPrefix.put("db_with_prefix", "db_pref_");
dbPrefix.put("db_with_prefix_suffix", "db_pref_");

Map<String, String> dbSuffix = new HashMap<>(2);
dbSuffix.put("db_with_suffix", "_db_suff");
dbSuffix.put("db_with_prefix_suffix", "_db_suff");

TableNameConverter tblNameConverter =
new TableNameConverter(false, true, dbPrefix, dbSuffix, "pre_", "_suf", null);

// Tables in the specified db should have the specified prefix and suffix.

// db prefix + normal suffix
Assert.assertEquals(
"db_pref_table_name_suf", tblNameConverter.convert("db_with_prefix", "table_name"));

// normal prefix + db suffix
Assert.assertEquals(
"pre_table_name_db_suff", tblNameConverter.convert("db_with_suffix", "table_name"));

// db prefix + db suffix
Assert.assertEquals(
"db_pref_table_name_db_suff",
tblNameConverter.convert("db_with_prefix_suffix", "table_name"));

// only normal prefix and suffix
Assert.assertEquals(
"pre_table_name_suf",
tblNameConverter.convert("db_without_prefix_suffix", "table_name"));
}
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ under the License.
<exclude>release/**</exclude>
<!-- antlr grammar files -->
<exclude>paimon-common/src/main/antlr4/**</exclude>
<exclude>paimon-core/src/test/resources/compatibility/**</exclude>
</excludes>
</configuration>
</plugin>
Expand Down

0 comments on commit d61f3d2

Please sign in to comment.