From d61f3d2659572d44fc028e5e16f957504ee1a7f8 Mon Sep 17 00:00:00 2001 From: JackeyLee007 Date: Sun, 15 Dec 2024 13:44:47 +0800 Subject: [PATCH] [flink] kafka_sync_database supports different prefix and suffix for different db (#4704) --- docs/content/cdc-ingestion/kafka-cdc.md | 2 + .../generated/kafka_sync_database.html | 12 ++++- .../action/cdc/CdcActionCommonUtils.java | 2 + .../action/cdc/SyncDatabaseActionBase.java | 34 +++++++++++++- .../cdc/SyncDatabaseActionFactoryBase.java | 4 ++ .../flink/action/cdc/TableNameConverter.java | 47 ++++++++++++++++--- .../cdc/mysql/MySqlSyncDatabaseAction.java | 3 +- .../action/cdc/TableNameConverterTest.java | 42 +++++++++++++++-- pom.xml | 1 + 9 files changed, 133 insertions(+), 14 deletions(-) diff --git a/docs/content/cdc-ingestion/kafka-cdc.md b/docs/content/cdc-ingestion/kafka-cdc.md index b037937c554f..26a5be340942 100644 --- a/docs/content/cdc-ingestion/kafka-cdc.md +++ b/docs/content/cdc-ingestion/kafka-cdc.md @@ -199,7 +199,9 @@ To use this feature through `flink run`, run the following shell command. --warehouse \ --database \ [--table_mapping =] \ + [--table_prefix_db ] \ [--table_prefix ] \ + [--table_suffix_db ] \ [--table_suffix ] \ [--including_tables ] \ [--excluding_tables ] \ diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html index 6c90f1d7f7d8..3664128a26ca 100644 --- a/docs/layouts/shortcodes/generated/kafka_sync_database.html +++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html @@ -41,13 +41,21 @@
--table_mapping
The table name mapping between source database and Paimon. For example, if you want to synchronize a source table named "test" to a Paimon table named "paimon_test", you can specify "--table_mapping test=paimon_test". Multiple mappings could be specified with multiple "--table_mapping" options. "--table_mapping" has higher priority than "--table_prefix" and "--table_suffix". + +
--table_prefix_db
+ The prefix of the Paimon tables to be synchronized from the specified db. For example, if you want to prefix the tables from db1 with "ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". "--table_prefix_db" has higher priority than "--table_prefix". +
--table_prefix
- The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_". + The prefix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_prefix_db". For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_". + + +
--table_suffix_db
+ The suffix of the Paimon tables to be synchronized from the specified db. The usage is same as "--table_prefix_db".
--table_suffix
- The suffix of all Paimon tables to be synchronized. The usage is same as "--table_prefix". + The suffix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_suffix_db". The usage is same as "--table_prefix".
--including_tables
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index 83891c90b8e1..c8af6f91c420 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -56,6 +56,8 @@ public class CdcActionCommonUtils { public static final String PULSAR_CONF = "pulsar_conf"; public static final String TABLE_PREFIX = "table_prefix"; public static final String TABLE_SUFFIX = "table_suffix"; + public static final String TABLE_PREFIX_DB = "table_prefix_db"; + public static final String TABLE_SUFFIX_DB = "table_suffix_db"; public static final String TABLE_MAPPING = "table_mapping"; public static final String INCLUDING_TABLES = "including_tables"; public static final String EXCLUDING_TABLES = "excluding_tables"; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index ac3483ac23bf..4fb1339c5193 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -53,6 +53,8 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase { protected String tablePrefix = ""; protected String tableSuffix = ""; protected Map tableMapping = new HashMap<>(); + protected Map dbPrefix = new HashMap<>(); + protected Map dbSuffix = new HashMap<>(); protected String includingTables = ".*"; protected List partitionKeys = new ArrayList<>(); protected List primaryKeys = new ArrayList<>(); @@ -98,6 +100,30 @@ public SyncDatabaseActionBase withTableSuffix(@Nullable String tableSuffix) { return this; } + public SyncDatabaseActionBase withDbPrefix(Map dbPrefix) { + if (dbPrefix != null) { + this.dbPrefix = + dbPrefix.entrySet().stream() + .collect( + HashMap::new, + (m, e) -> m.put(e.getKey().toLowerCase(), e.getValue()), + HashMap::putAll); + } + return this; + } + + public SyncDatabaseActionBase withDbSuffix(Map dbSuffix) { + if (dbSuffix != null) { + this.dbSuffix = + dbSuffix.entrySet().stream() + .collect( + HashMap::new, + (m, e) -> m.put(e.getKey().toLowerCase(), e.getValue()), + HashMap::putAll); + } + return this; + } + public SyncDatabaseActionBase withTableMapping(Map tableMapping) { if (tableMapping != null) { this.tableMapping = tableMapping; @@ -164,7 +190,13 @@ protected EventParser.Factory buildEventParserFactory() excludingTables == null ? null : Pattern.compile(excludingTables); TableNameConverter tableNameConverter = new TableNameConverter( - allowUpperCase, mergeShards, tablePrefix, tableSuffix, tableMapping); + allowUpperCase, + mergeShards, + dbPrefix, + dbSuffix, + tablePrefix, + tableSuffix, + tableMapping); Set createdTables; try { createdTables = new HashSet<>(catalog.listTables(database)); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java index 2135f2a28112..d497b588c2af 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java @@ -31,7 +31,9 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_MAPPING; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX_DB; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX_DB; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING; /** Base {@link ActionFactory} for synchronizing into database. */ @@ -52,6 +54,8 @@ public Optional create(MultipleParameterToolAdapter params) { protected void withParams(MultipleParameterToolAdapter params, T action) { action.withTablePrefix(params.get(TABLE_PREFIX)) .withTableSuffix(params.get(TABLE_SUFFIX)) + .withDbPrefix(optionalConfigMap(params, TABLE_PREFIX_DB)) + .withDbSuffix(optionalConfigMap(params, TABLE_SUFFIX_DB)) .withTableMapping(optionalConfigMap(params, TABLE_MAPPING)) .includingTables(params.get(INCLUDING_TABLES)) .excludingTables(params.get(EXCLUDING_TABLES)) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java index 4eca8b903ed1..15fc3507ce2d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java @@ -31,6 +31,8 @@ public class TableNameConverter implements Serializable { private final boolean caseSensitive; private final boolean mergeShards; + private final Map dbPrefix; + private final Map dbSuffix; private final String prefix; private final String suffix; private final Map tableMapping; @@ -45,21 +47,54 @@ public TableNameConverter( String prefix, String suffix, Map tableMapping) { + this( + caseSensitive, + mergeShards, + new HashMap<>(), + new HashMap<>(), + prefix, + suffix, + tableMapping); + } + + public TableNameConverter( + boolean caseSensitive, + boolean mergeShards, + Map dbPrefix, + Map dbSuffix, + String prefix, + String suffix, + Map tableMapping) { this.caseSensitive = caseSensitive; this.mergeShards = mergeShards; + this.dbPrefix = dbPrefix; + this.dbSuffix = dbSuffix; this.prefix = prefix; this.suffix = suffix; this.tableMapping = lowerMapKey(tableMapping); } - public String convert(String originName) { - if (tableMapping.containsKey(originName.toLowerCase())) { - String mappedName = tableMapping.get(originName.toLowerCase()); + public String convert(String originDbName, String originTblName) { + // top priority: table mapping + if (tableMapping.containsKey(originTblName.toLowerCase())) { + String mappedName = tableMapping.get(originTblName.toLowerCase()); return caseSensitive ? mappedName : mappedName.toLowerCase(); } - String tableName = caseSensitive ? originName : originName.toLowerCase(); - return prefix + tableName + suffix; + String tblPrefix = prefix; + String tblSuffix = suffix; + + // second priority: prefix and postfix specified by db + if (dbPrefix.containsKey(originDbName.toLowerCase())) { + tblPrefix = dbPrefix.get(originDbName.toLowerCase()); + } + if (dbSuffix.containsKey(originDbName.toLowerCase())) { + tblSuffix = dbSuffix.get(originDbName.toLowerCase()); + } + + // third priority: normal prefix and suffix + String tableName = caseSensitive ? originTblName : originTblName.toLowerCase(); + return tblPrefix + tableName + tblSuffix; } public String convert(Identifier originIdentifier) { @@ -69,7 +104,7 @@ public String convert(Identifier originIdentifier) { : originIdentifier.getDatabaseName() + "_" + originIdentifier.getObjectName(); - return convert(rawName); + return convert(originIdentifier.getDatabaseName(), rawName); } private Map lowerMapKey(Map map) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index 235b3f9a3235..ce2e9124a664 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -143,7 +143,8 @@ protected void beforeBuildingSourceSink() throws Exception { for (JdbcTableInfo tableInfo : jdbcTableInfos) { Identifier identifier = Identifier.create( - database, tableNameConverter.convert(tableInfo.toPaimonTableName())); + database, + tableNameConverter.convert("", tableInfo.toPaimonTableName())); FileStoreTable table; Schema fromMySql = CdcActionCommonUtils.buildPaimonSchema( diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java index dfbe32e3d398..89bbadfeb8c8 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java @@ -33,13 +33,47 @@ public void testConvertTableName() { tableMapping.put("mapped_src", "mapped_TGT"); TableNameConverter caseConverter = new TableNameConverter(true, true, "pre_", "_pos", tableMapping); - Assert.assertEquals(caseConverter.convert("mapped_SRC"), "mapped_TGT"); + Assert.assertEquals(caseConverter.convert("", "mapped_SRC"), "mapped_TGT"); - Assert.assertEquals(caseConverter.convert("unmapped_src"), "pre_unmapped_src_pos"); + Assert.assertEquals(caseConverter.convert("", "unmapped_src"), "pre_unmapped_src_pos"); TableNameConverter noCaseConverter = new TableNameConverter(false, true, "pre_", "_pos", tableMapping); - Assert.assertEquals(noCaseConverter.convert("mapped_src"), "mapped_tgt"); - Assert.assertEquals(noCaseConverter.convert("unmapped_src"), "pre_unmapped_src_pos"); + Assert.assertEquals(noCaseConverter.convert("", "mapped_src"), "mapped_tgt"); + Assert.assertEquals(noCaseConverter.convert("", "unmapped_src"), "pre_unmapped_src_pos"); + } + + @Test + public void testConvertTableNameByDBPrefix_Suffix() { + Map dbPrefix = new HashMap<>(2); + dbPrefix.put("db_with_prefix", "db_pref_"); + dbPrefix.put("db_with_prefix_suffix", "db_pref_"); + + Map dbSuffix = new HashMap<>(2); + dbSuffix.put("db_with_suffix", "_db_suff"); + dbSuffix.put("db_with_prefix_suffix", "_db_suff"); + + TableNameConverter tblNameConverter = + new TableNameConverter(false, true, dbPrefix, dbSuffix, "pre_", "_suf", null); + + // Tables in the specified db should have the specified prefix and suffix. + + // db prefix + normal suffix + Assert.assertEquals( + "db_pref_table_name_suf", tblNameConverter.convert("db_with_prefix", "table_name")); + + // normal prefix + db suffix + Assert.assertEquals( + "pre_table_name_db_suff", tblNameConverter.convert("db_with_suffix", "table_name")); + + // db prefix + db suffix + Assert.assertEquals( + "db_pref_table_name_db_suff", + tblNameConverter.convert("db_with_prefix_suffix", "table_name")); + + // only normal prefix and suffix + Assert.assertEquals( + "pre_table_name_suf", + tblNameConverter.convert("db_without_prefix_suffix", "table_name")); } } diff --git a/pom.xml b/pom.xml index 904b1c73c741..dbef98af06b2 100644 --- a/pom.xml +++ b/pom.xml @@ -529,6 +529,7 @@ under the License. release/** paimon-common/src/main/antlr4/** + paimon-core/src/test/resources/compatibility/**