diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index 3664128a26ca..c75855aa3437 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -65,6 +65,14 @@
--type_mapping |
It is used to specify how to map MySQL data type to Paimon type.
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 6482a625f4c7..c107500eba86 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -61,6 +61,8 @@ public class CdcActionCommonUtils {
public static final String TABLE_MAPPING = "table_mapping";
public static final String INCLUDING_TABLES = "including_tables";
public static final String EXCLUDING_TABLES = "excluding_tables";
+ public static final String INCLUDING_DBS = "including_dbs";
+ public static final String EXCLUDING_DBS = "excluding_dbs";
public static final String TYPE_MAPPING = "type_mapping";
public static final String PARTITION_KEYS = "partition_keys";
public static final String PRIMARY_KEYS = "primary_keys";
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 56334c1e7bff..99dc1fd98dc5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -60,6 +60,10 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
protected List partitionKeys = new ArrayList<>();
protected List primaryKeys = new ArrayList<>();
@Nullable protected String excludingTables;
+
+ protected String includingDbs = ".*";
+ @Nullable protected String excludingDbs;
+
protected List tables = new ArrayList<>();
protected Map> partitionKeyMultiple = new HashMap<>();
@@ -144,6 +148,18 @@ public SyncDatabaseActionBase excludingTables(@Nullable String excludingTables)
return this;
}
+ public SyncDatabaseActionBase includingDbs(@Nullable String includingDbs) {
+ if (includingDbs != null) {
+ this.includingDbs = includingDbs;
+ }
+ return this;
+ }
+
+ public SyncDatabaseActionBase excludingDbs(@Nullable String excludingDbs) {
+ this.excludingDbs = excludingDbs;
+ return this;
+ }
+
public SyncDatabaseActionBase withPartitionKeys(String... partitionKeys) {
this.partitionKeys.addAll(Arrays.asList(partitionKeys));
return this;
@@ -186,9 +202,11 @@ protected EventParser.Factory buildEventParserFactory()
requirePrimaryKeys(),
partitionKeyMultiple,
metadataConverters);
- Pattern includingPattern = Pattern.compile(includingTables);
- Pattern excludingPattern =
+ Pattern tblIncludingPattern = Pattern.compile(includingTables);
+ Pattern tblExcludingPattern =
excludingTables == null ? null : Pattern.compile(excludingTables);
+ Pattern dbIncludingPattern = Pattern.compile(includingDbs);
+ Pattern dbExcludingPattern = excludingDbs == null ? null : Pattern.compile(excludingDbs);
TableNameConverter tableNameConverter =
new TableNameConverter(
caseSensitive,
@@ -207,8 +225,10 @@ protected EventParser.Factory buildEventParserFactory()
return () ->
new RichCdcMultiplexRecordEventParser(
schemaBuilder,
- includingPattern,
- excludingPattern,
+ tblIncludingPattern,
+ tblExcludingPattern,
+ dbIncludingPattern,
+ dbExcludingPattern,
tableNameConverter,
createdTables);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
index d497b588c2af..84cc6011fa3a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
@@ -24,7 +24,9 @@
import java.util.Optional;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_DBS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
@@ -59,6 +61,8 @@ protected void withParams(MultipleParameterToolAdapter params, T action) {
.withTableMapping(optionalConfigMap(params, TABLE_MAPPING))
.includingTables(params.get(INCLUDING_TABLES))
.excludingTables(params.get(EXCLUDING_TABLES))
+ .includingTables(params.get(INCLUDING_DBS))
+ .excludingTables(params.get(EXCLUDING_DBS))
.withPartitionKeyMultiple(
optionalConfigMapList(params, MULTIPLE_TABLE_PARTITION_KEYS))
.withPartitionKeys();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 939410bf46ce..31f6c521e9de 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -46,33 +46,44 @@ public class RichCdcMultiplexRecordEventParser implements EventParser createdTables;
private final Map parsers = new HashMap<>();
+
+ private final Set includedDbs = new HashSet<>();
+ private final Set excludedDbs = new HashSet<>();
+
private final Set includedTables = new HashSet<>();
private final Set excludedTables = new HashSet<>();
private RichCdcMultiplexRecord record;
private String currentTable;
+ private String currentDb;
private boolean shouldSynchronizeCurrentTable;
private RichEventParser currentParser;
public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
- this(null, null, null, new TableNameConverter(caseSensitive), new HashSet<>());
+ this(null, null, null, null, null, new TableNameConverter(caseSensitive), new HashSet<>());
}
public RichCdcMultiplexRecordEventParser(
@Nullable NewTableSchemaBuilder schemaBuilder,
- @Nullable Pattern includingPattern,
- @Nullable Pattern excludingPattern,
+ @Nullable Pattern tblIncludingPattern,
+ @Nullable Pattern tblExcludingPattern,
+ @Nullable Pattern dbIncludingPattern,
+ @Nullable Pattern dbExcludingPattern,
TableNameConverter tableNameConverter,
Set createdTables) {
this.schemaBuilder = schemaBuilder;
- this.includingPattern = includingPattern;
- this.excludingPattern = excludingPattern;
+ this.tblIncludingPattern = tblIncludingPattern;
+ this.tblExcludingPattern = tblExcludingPattern;
+ this.dbIncludingPattern = dbIncludingPattern;
+ this.dbExcludingPattern = dbExcludingPattern;
this.tableNameConverter = tableNameConverter;
this.createdTables = createdTables;
}
@@ -81,6 +92,7 @@ public RichCdcMultiplexRecordEventParser(
public void setRawEvent(RichCdcMultiplexRecord record) {
this.record = record;
this.currentTable = record.tableName();
+ this.currentDb = record.databaseName();
this.shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
if (shouldSynchronizeCurrentTable) {
this.currentParser = parsers.computeIfAbsent(currentTable, t -> new RichEventParser());
@@ -124,7 +136,46 @@ public Optional parseNewTable() {
return Optional.empty();
}
+ private boolean shouldSynchronizeCurrentDb() {
+ // In case the record is incomplete, we let the null value pass validation
+ // and handle the null value when we really need it
+ if (currentDb == null) {
+ return true;
+ }
+
+ if (includedDbs.contains(currentDb)) {
+ return true;
+ }
+ if (excludedDbs.contains(currentDb)) {
+ return false;
+ }
+
+ boolean shouldSynchronize = true;
+
+ if (dbIncludingPattern != null) {
+ shouldSynchronize = dbIncludingPattern.matcher(currentDb).matches();
+ }
+ if (dbExcludingPattern != null) {
+ shouldSynchronize =
+ shouldSynchronize && !dbExcludingPattern.matcher(currentDb).matches();
+ }
+ if (!shouldSynchronize) {
+ LOG.debug(
+ "Source database {} won't be synchronized because it was excluded. ",
+ currentDb);
+ excludedDbs.add(currentDb);
+ return false;
+ }
+
+ includedDbs.add(currentDb);
+ return true;
+ }
+
private boolean shouldSynchronizeCurrentTable() {
+ if (!shouldSynchronizeCurrentDb()) {
+ return false;
+ }
+
// In case the record is incomplete, we let the null value pass validation
// and handle the null value when we really need it
if (currentTable == null) {
@@ -139,12 +190,13 @@ private boolean shouldSynchronizeCurrentTable() {
}
boolean shouldSynchronize = true;
- if (includingPattern != null) {
- shouldSynchronize = includingPattern.matcher(currentTable).matches();
+
+ if (tblIncludingPattern != null) {
+ shouldSynchronize = tblIncludingPattern.matcher(currentTable).matches();
}
- if (excludingPattern != null) {
+ if (tblExcludingPattern != null) {
shouldSynchronize =
- shouldSynchronize && !excludingPattern.matcher(currentTable).matches();
+ shouldSynchronize && !tblExcludingPattern.matcher(currentTable).matches();
}
if (!shouldSynchronize) {
LOG.debug(
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
new file mode 100644
index 000000000000..a5307fa1f96d
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseAction;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.types.RowKind;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Tests for {@link SyncDatabaseActionBase}. */
+public class SyncDatabaseActionBaseTest {
+ private static final String ANY_DB = "any_db";
+ private static final String WHITE_DB = "white_db";
+ private static final String BLACK_DB = "black_db";
+ private static final String WHITE_TBL = "white_tbl";
+ private static final String BLACK_TBL = "black_tbl";
+
+ private SyncDatabaseActionBase kafkaSyncDbAction;
+ private RichCdcMultiplexRecord whiteAnyDbCdcRecord;
+ private RichCdcMultiplexRecord blackAnyDbCdcRecord;
+ private RichCdcMultiplexRecord whiteCdcRecord;
+ private RichCdcMultiplexRecord blackCdcRecord;
+ private RichCdcMultiplexRecord whiteDbBlackTblCdcRecord;
+ private RichCdcMultiplexRecord blackDbWhiteTblCdcRecord;
+
+ @TempDir private java.nio.file.Path tmp;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ LocalFileIO localFileIO = new LocalFileIO();
+ Path defaultDb = new Path(tmp.toString(), "default.db");
+ localFileIO.mkdirs(defaultDb);
+
+ kafkaSyncDbAction =
+ new KafkaSyncDatabaseAction(
+ tmp.toString(), "default", new HashMap<>(), new HashMap<>());
+
+ Map rawData = new HashMap<>();
+ rawData.put("field", "value");
+
+ CdcRecord cdcData = new CdcRecord(RowKind.INSERT, rawData);
+ whiteAnyDbCdcRecord =
+ new RichCdcMultiplexRecord(
+ ANY_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
+ blackAnyDbCdcRecord =
+ new RichCdcMultiplexRecord(
+ ANY_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(), cdcData);
+ whiteCdcRecord =
+ new RichCdcMultiplexRecord(
+ WHITE_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
+ blackCdcRecord =
+ new RichCdcMultiplexRecord(
+ BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
+
+ whiteDbBlackTblCdcRecord =
+ new RichCdcMultiplexRecord(
+ WHITE_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(), cdcData);
+ blackDbWhiteTblCdcRecord =
+ new RichCdcMultiplexRecord(
+ BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
+ }
+
+ @Test
+ public void testSyncTablesWithoutDbLists() throws NoSuchMethodException, IOException {
+
+ kafkaSyncDbAction.includingTables(WHITE_TBL);
+ kafkaSyncDbAction.excludingTables(BLACK_TBL);
+
+ RichCdcMultiplexRecordEventParser parser =
+ (RichCdcMultiplexRecordEventParser)
+ kafkaSyncDbAction.buildEventParserFactory().create();
+ List parsedRecords;
+
+ parser.setRawEvent(whiteAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(1, parsedRecords.size());
+
+ parser.setRawEvent(blackAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+ }
+
+ @Test
+ public void testSyncTablesWithDbList() {
+ kafkaSyncDbAction.includingDbs(WHITE_DB);
+ kafkaSyncDbAction.excludingDbs(BLACK_DB);
+ RichCdcMultiplexRecordEventParser parser =
+ (RichCdcMultiplexRecordEventParser)
+ kafkaSyncDbAction.buildEventParserFactory().create();
+ List parsedRecords;
+
+ parser.setRawEvent(whiteAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+
+ parser.setRawEvent(blackAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+
+ // white db and white table
+ parser.setRawEvent(whiteCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(1, parsedRecords.size());
+
+ parser.setRawEvent(blackAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+ }
+
+ @Test
+ public void testSycTablesCrossDB() {
+ kafkaSyncDbAction.includingDbs(WHITE_DB);
+ kafkaSyncDbAction.excludingDbs(BLACK_DB);
+ kafkaSyncDbAction.excludingTables(BLACK_TBL);
+ RichCdcMultiplexRecordEventParser parser =
+ (RichCdcMultiplexRecordEventParser)
+ kafkaSyncDbAction.buildEventParserFactory().create();
+ List parsedRecords;
+
+ parser.setRawEvent(whiteDbBlackTblCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+
+ parser.setRawEvent(blackDbWhiteTblCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+ }
+}
|