diff --git a/docs/content/cdc-ingestion/kafka-cdc.md b/docs/content/cdc-ingestion/kafka-cdc.md index 26a5be340942..fc16c5b0fc1f 100644 --- a/docs/content/cdc-ingestion/kafka-cdc.md +++ b/docs/content/cdc-ingestion/kafka-cdc.md @@ -199,12 +199,14 @@ To use this feature through `flink run`, run the following shell command. --warehouse \ --database \ [--table_mapping =] \ - [--table_prefix_db ] \ [--table_prefix ] \ - [--table_suffix_db ] \ [--table_suffix ] \ + [--table_prefix_db ] \ + [--table_suffix_db ] \ [--including_tables ] \ [--excluding_tables ] \ + [--including_dbs ] \ + [--excluding_dbs ] \ [--type_mapping to-string] \ [--partition_keys ] \ [--primary_keys ] \ diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html index 3664128a26ca..9f0b817e6647 100644 --- a/docs/layouts/shortcodes/generated/kafka_sync_database.html +++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html @@ -41,22 +41,22 @@
--table_mapping
The table name mapping between source database and Paimon. For example, if you want to synchronize a source table named "test" to a Paimon table named "paimon_test", you can specify "--table_mapping test=paimon_test". Multiple mappings could be specified with multiple "--table_mapping" options. "--table_mapping" has higher priority than "--table_prefix" and "--table_suffix". - -
--table_prefix_db
- The prefix of the Paimon tables to be synchronized from the specified db. For example, if you want to prefix the tables from db1 with "ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". "--table_prefix_db" has higher priority than "--table_prefix". -
--table_prefix
The prefix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_prefix_db". For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_". - -
--table_suffix_db
- The suffix of the Paimon tables to be synchronized from the specified db. The usage is same as "--table_prefix_db". -
--table_suffix
The suffix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_suffix_db". The usage is same as "--table_prefix". + +
--table_prefix_db
+ The prefix of the Paimon tables to be synchronized from the specified db. For example, if you want to prefix the tables from db1 with "ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". "--table_prefix_db" has higher priority than "--table_prefix". + + +
--table_suffix_db
+ The suffix of the Paimon tables to be synchronized from the specified db. The usage is same as "--table_prefix_db". +
--including_tables
It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'a|b|c'.Regular expression is supported, for example, specifying "--including_tables test|paimon.*" means to synchronize table 'test' and all tables start with 'paimon'. @@ -65,6 +65,14 @@
--excluding_tables
It is used to specify which source tables are not to be synchronized. The usage is same as "--including_tables". "--excluding_tables" has higher priority than "--including_tables" if you specified both. + +
--including_dbs
+ It is used to specify the databases within which the tables are to be synchronized. The usage is same as "--including_tables". + + +
--excluding_dbs
+ It is used to specify the databases within which the tables are not to be synchronized. The usage is same as "--excluding_tables". "--excluding_dbs" has higher priority than "--including_dbs" if you specified both. +
--type_mapping
It is used to specify how to map MySQL data type to Paimon type.
@@ -114,4 +122,4 @@ The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See here for a complete list of table configurations. - \ No newline at end of file + diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index 6482a625f4c7..c107500eba86 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -61,6 +61,8 @@ public class CdcActionCommonUtils { public static final String TABLE_MAPPING = "table_mapping"; public static final String INCLUDING_TABLES = "including_tables"; public static final String EXCLUDING_TABLES = "excluding_tables"; + public static final String INCLUDING_DBS = "including_dbs"; + public static final String EXCLUDING_DBS = "excluding_dbs"; public static final String TYPE_MAPPING = "type_mapping"; public static final String PARTITION_KEYS = "partition_keys"; public static final String PRIMARY_KEYS = "primary_keys"; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index 56334c1e7bff..63e29d6a0ed8 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -60,6 +60,8 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase { protected List partitionKeys = new ArrayList<>(); protected List primaryKeys = new ArrayList<>(); @Nullable protected String excludingTables; + protected String includingDbs = ".*"; + @Nullable protected String excludingDbs; protected List tables = new ArrayList<>(); protected Map> partitionKeyMultiple = new HashMap<>(); @@ -144,6 +146,18 @@ public SyncDatabaseActionBase excludingTables(@Nullable String excludingTables) return this; } + public SyncDatabaseActionBase includingDbs(@Nullable String includingDbs) { + if (includingDbs != null) { + this.includingDbs = includingDbs; + } + return this; + } + + public SyncDatabaseActionBase excludingDbs(@Nullable String excludingDbs) { + this.excludingDbs = excludingDbs; + return this; + } + public SyncDatabaseActionBase withPartitionKeys(String... partitionKeys) { this.partitionKeys.addAll(Arrays.asList(partitionKeys)); return this; @@ -186,9 +200,11 @@ protected EventParser.Factory buildEventParserFactory() requirePrimaryKeys(), partitionKeyMultiple, metadataConverters); - Pattern includingPattern = Pattern.compile(includingTables); - Pattern excludingPattern = + Pattern tblIncludingPattern = Pattern.compile(includingTables); + Pattern tblExcludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables); + Pattern dbIncludingPattern = Pattern.compile(includingDbs); + Pattern dbExcludingPattern = excludingDbs == null ? null : Pattern.compile(excludingDbs); TableNameConverter tableNameConverter = new TableNameConverter( caseSensitive, @@ -207,8 +223,10 @@ protected EventParser.Factory buildEventParserFactory() return () -> new RichCdcMultiplexRecordEventParser( schemaBuilder, - includingPattern, - excludingPattern, + tblIncludingPattern, + tblExcludingPattern, + dbIncludingPattern, + dbExcludingPattern, tableNameConverter, createdTables); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java index d497b588c2af..c82039a9a021 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java @@ -24,7 +24,9 @@ import java.util.Optional; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_DBS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS; @@ -59,6 +61,8 @@ protected void withParams(MultipleParameterToolAdapter params, T action) { .withTableMapping(optionalConfigMap(params, TABLE_MAPPING)) .includingTables(params.get(INCLUDING_TABLES)) .excludingTables(params.get(EXCLUDING_TABLES)) + .includingDbs(params.get(INCLUDING_DBS)) + .excludingDbs(params.get(EXCLUDING_DBS)) .withPartitionKeyMultiple( optionalConfigMapList(params, MULTIPLE_TABLE_PARTITION_KEYS)) .withPartitionKeys(); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java index 939410bf46ce..47367c423406 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java @@ -46,8 +46,10 @@ public class RichCdcMultiplexRecordEventParser implements EventParser createdTables; @@ -55,24 +57,32 @@ public class RichCdcMultiplexRecordEventParser implements EventParser includedTables = new HashSet<>(); private final Set excludedTables = new HashSet<>(); + private final Set includedDbs = new HashSet<>(); + private final Set excludedDbs = new HashSet<>(); + private RichCdcMultiplexRecord record; private String currentTable; + private String currentDb; private boolean shouldSynchronizeCurrentTable; private RichEventParser currentParser; public RichCdcMultiplexRecordEventParser(boolean caseSensitive) { - this(null, null, null, new TableNameConverter(caseSensitive), new HashSet<>()); + this(null, null, null, null, null, new TableNameConverter(caseSensitive), new HashSet<>()); } public RichCdcMultiplexRecordEventParser( @Nullable NewTableSchemaBuilder schemaBuilder, - @Nullable Pattern includingPattern, - @Nullable Pattern excludingPattern, + @Nullable Pattern tblIncludingPattern, + @Nullable Pattern tblExcludingPattern, + @Nullable Pattern dbIncludingPattern, + @Nullable Pattern dbExcludingPattern, TableNameConverter tableNameConverter, Set createdTables) { this.schemaBuilder = schemaBuilder; - this.includingPattern = includingPattern; - this.excludingPattern = excludingPattern; + this.tblIncludingPattern = tblIncludingPattern; + this.tblExcludingPattern = tblExcludingPattern; + this.dbIncludingPattern = dbIncludingPattern; + this.dbExcludingPattern = dbExcludingPattern; this.tableNameConverter = tableNameConverter; this.createdTables = createdTables; } @@ -81,6 +91,7 @@ public RichCdcMultiplexRecordEventParser( public void setRawEvent(RichCdcMultiplexRecord record) { this.record = record; this.currentTable = record.tableName(); + this.currentDb = record.databaseName(); this.shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable(); if (shouldSynchronizeCurrentTable) { this.currentParser = parsers.computeIfAbsent(currentTable, t -> new RichEventParser()); @@ -124,7 +135,41 @@ public Optional parseNewTable() { return Optional.empty(); } + private boolean shouldSynchronizeCurrentDb() { + // In case the record is incomplete, we let the null value pass validation + // and handle the null value when we really need it + if (currentDb == null) { + return true; + } + if (includedDbs.contains(currentDb)) { + return true; + } + if (excludedDbs.contains(currentDb)) { + return false; + } + boolean shouldSynchronize = true; + if (dbIncludingPattern != null) { + shouldSynchronize = dbIncludingPattern.matcher(currentDb).matches(); + } + if (dbExcludingPattern != null) { + shouldSynchronize = + shouldSynchronize && !dbExcludingPattern.matcher(currentDb).matches(); + } + if (!shouldSynchronize) { + LOG.debug( + "Source database {} won't be synchronized because it was excluded. ", + currentDb); + excludedDbs.add(currentDb); + return false; + } + includedDbs.add(currentDb); + return true; + } + private boolean shouldSynchronizeCurrentTable() { + if (!shouldSynchronizeCurrentDb()) { + return false; + } // In case the record is incomplete, we let the null value pass validation // and handle the null value when we really need it if (currentTable == null) { @@ -139,12 +184,12 @@ private boolean shouldSynchronizeCurrentTable() { } boolean shouldSynchronize = true; - if (includingPattern != null) { - shouldSynchronize = includingPattern.matcher(currentTable).matches(); + if (tblIncludingPattern != null) { + shouldSynchronize = tblIncludingPattern.matcher(currentTable).matches(); } - if (excludingPattern != null) { + if (tblExcludingPattern != null) { shouldSynchronize = - shouldSynchronize && !excludingPattern.matcher(currentTable).matches(); + shouldSynchronize && !tblExcludingPattern.matcher(currentTable).matches(); } if (!shouldSynchronize) { LOG.debug( diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java new file mode 100644 index 000000000000..5247225caff4 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc; + +import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseAction; +import org.apache.paimon.flink.sink.cdc.CdcRecord; +import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; +import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.types.RowKind; + +import org.junit.Assert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Tests for {@link SyncDatabaseActionBase}. */ +public class SyncDatabaseActionBaseTest { + private static final String ANY_DB = "any_db"; + private static final String WHITE_DB = "white_db"; + private static final String BLACK_DB = "black_db"; + private static final String WHITE_TBL = "white_tbl"; + private static final String BLACK_TBL = "black_tbl"; + + private SyncDatabaseActionBase kafkaSyncDbAction; + private RichCdcMultiplexRecord whiteAnyDbCdcRecord; + private RichCdcMultiplexRecord blackAnyDbCdcRecord; + private RichCdcMultiplexRecord whiteCdcRecord; + private RichCdcMultiplexRecord blackCdcRecord; + private RichCdcMultiplexRecord whiteDbBlackTblCdcRecord; + private RichCdcMultiplexRecord blackDbWhiteTblCdcRecord; + + @TempDir private java.nio.file.Path tmp; + + @BeforeEach + public void setUp() throws Exception { + LocalFileIO localFileIO = new LocalFileIO(); + Path defaultDb = new Path(tmp.toString(), "default.db"); + localFileIO.mkdirs(defaultDb); + + kafkaSyncDbAction = + new KafkaSyncDatabaseAction( + tmp.toString(), "default", new HashMap<>(), new HashMap<>()); + + Map rawData = new HashMap<>(); + rawData.put("field", "value"); + + CdcRecord cdcData = new CdcRecord(RowKind.INSERT, rawData); + whiteAnyDbCdcRecord = + new RichCdcMultiplexRecord( + ANY_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData); + blackAnyDbCdcRecord = + new RichCdcMultiplexRecord( + ANY_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(), cdcData); + whiteCdcRecord = + new RichCdcMultiplexRecord( + WHITE_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData); + blackCdcRecord = + new RichCdcMultiplexRecord( + BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData); + + whiteDbBlackTblCdcRecord = + new RichCdcMultiplexRecord( + WHITE_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(), cdcData); + blackDbWhiteTblCdcRecord = + new RichCdcMultiplexRecord( + BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData); + } + + @Test + public void testSyncTablesWithoutDbLists() throws NoSuchMethodException, IOException { + + kafkaSyncDbAction.includingTables(WHITE_TBL); + kafkaSyncDbAction.excludingTables(BLACK_TBL); + + RichCdcMultiplexRecordEventParser parser = + (RichCdcMultiplexRecordEventParser) + kafkaSyncDbAction.buildEventParserFactory().create(); + List parsedRecords; + + parser.setRawEvent(whiteAnyDbCdcRecord); + parsedRecords = parser.parseRecords(); + Assert.assertEquals(1, parsedRecords.size()); + + parser.setRawEvent(blackAnyDbCdcRecord); + parsedRecords = parser.parseRecords(); + Assert.assertEquals(0, parsedRecords.size()); + } + + @Test + public void testSyncTablesWithDbList() { + kafkaSyncDbAction.includingDbs(WHITE_DB); + kafkaSyncDbAction.excludingDbs(BLACK_DB); + RichCdcMultiplexRecordEventParser parser = + (RichCdcMultiplexRecordEventParser) + kafkaSyncDbAction.buildEventParserFactory().create(); + List parsedRecords; + + parser.setRawEvent(whiteAnyDbCdcRecord); + parsedRecords = parser.parseRecords(); + Assert.assertEquals(0, parsedRecords.size()); + + parser.setRawEvent(blackAnyDbCdcRecord); + parsedRecords = parser.parseRecords(); + Assert.assertEquals(0, parsedRecords.size()); + + // white db and white table + parser.setRawEvent(whiteCdcRecord); + parsedRecords = parser.parseRecords(); + Assert.assertEquals(1, parsedRecords.size()); + + parser.setRawEvent(blackAnyDbCdcRecord); + parsedRecords = parser.parseRecords(); + Assert.assertEquals(0, parsedRecords.size()); + } + + @Test + public void testSycTablesCrossDB() { + kafkaSyncDbAction.includingDbs(WHITE_DB); + kafkaSyncDbAction.excludingDbs(BLACK_DB); + kafkaSyncDbAction.excludingTables(BLACK_TBL); + RichCdcMultiplexRecordEventParser parser = + (RichCdcMultiplexRecordEventParser) + kafkaSyncDbAction.buildEventParserFactory().create(); + List parsedRecords; + parser.setRawEvent(whiteDbBlackTblCdcRecord); + parsedRecords = parser.parseRecords(); + Assert.assertEquals(0, parsedRecords.size()); + parser.setRawEvent(blackDbWhiteTblCdcRecord); + parsedRecords = parser.parseRecords(); + Assert.assertEquals(0, parsedRecords.size()); + } +}