--type_mapping |
It is used to specify how to map MySQL data type to Paimon type.
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index c8af6f91c420..8026aedbd380 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -61,6 +61,8 @@ public class CdcActionCommonUtils {
public static final String TABLE_MAPPING = "table_mapping";
public static final String INCLUDING_TABLES = "including_tables";
public static final String EXCLUDING_TABLES = "excluding_tables";
+ public static final String INCLUDING_DBS = "including_dbs";
+ public static final String EXCLUDING_DBS = "excluding_dbs";
public static final String TYPE_MAPPING = "type_mapping";
public static final String PARTITION_KEYS = "partition_keys";
public static final String PRIMARY_KEYS = "primary_keys";
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 4fb1339c5193..72f995991731 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -59,6 +59,10 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
protected List partitionKeys = new ArrayList<>();
protected List primaryKeys = new ArrayList<>();
@Nullable protected String excludingTables;
+
+ protected String includingDbs = ".*";
+ @Nullable protected String excludingDbs;
+
protected List tables = new ArrayList<>();
protected Map> partitionKeyMultiple = new HashMap<>();
@@ -143,6 +147,19 @@ public SyncDatabaseActionBase excludingTables(@Nullable String excludingTables)
return this;
}
+ public SyncDatabaseActionBase includingDbs(@Nullable String includingDbs) {
+ if (includingDbs != null) {
+ this.includingDbs = includingDbs;
+ }
+ return this;
+ }
+
+ public SyncDatabaseActionBase excludingDbs(@Nullable String excludingDbs) {
+ this.excludingDbs = excludingDbs;
+ return this;
+ }
+
+
public SyncDatabaseActionBase withPartitionKeys(String... partitionKeys) {
this.partitionKeys.addAll(Arrays.asList(partitionKeys));
return this;
@@ -185,9 +202,12 @@ protected EventParser.Factory buildEventParserFactory()
requirePrimaryKeys(),
partitionKeyMultiple,
metadataConverters);
- Pattern includingPattern = Pattern.compile(includingTables);
- Pattern excludingPattern =
+ Pattern tblIncludingPattern = Pattern.compile(includingTables);
+ Pattern tblExcludingPattern =
excludingTables == null ? null : Pattern.compile(excludingTables);
+ Pattern dbIncludingPattern = Pattern.compile(includingDbs);
+ Pattern dbExcludingPattern =
+ excludingDbs == null ? null : Pattern.compile(excludingDbs);
TableNameConverter tableNameConverter =
new TableNameConverter(
allowUpperCase,
@@ -206,8 +226,10 @@ protected EventParser.Factory buildEventParserFactory()
return () ->
new RichCdcMultiplexRecordEventParser(
schemaBuilder,
- includingPattern,
- excludingPattern,
+ tblIncludingPattern,
+ tblExcludingPattern,
+ dbIncludingPattern,
+ dbExcludingPattern,
tableNameConverter,
createdTables);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
index d497b588c2af..ec369771331a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
@@ -23,7 +23,8 @@
import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import java.util.Optional;
-
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_DBS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS;
@@ -59,6 +60,8 @@ protected void withParams(MultipleParameterToolAdapter params, T action) {
.withTableMapping(optionalConfigMap(params, TABLE_MAPPING))
.includingTables(params.get(INCLUDING_TABLES))
.excludingTables(params.get(EXCLUDING_TABLES))
+ .includingTables(params.get(INCLUDING_DBS))
+ .excludingTables(params.get(EXCLUDING_DBS))
.withPartitionKeyMultiple(
optionalConfigMapList(params, MULTIPLE_TABLE_PARTITION_KEYS))
.withPartitionKeys();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 939410bf46ce..8277f01b4cd6 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -46,33 +46,44 @@ public class RichCdcMultiplexRecordEventParser implements EventParser createdTables;
private final Map parsers = new HashMap<>();
+
+ private final Set includedDbs = new HashSet<>();
+ private final Set excludedDbs = new HashSet<>();
+
private final Set includedTables = new HashSet<>();
private final Set excludedTables = new HashSet<>();
private RichCdcMultiplexRecord record;
private String currentTable;
+ private String currentDb;
private boolean shouldSynchronizeCurrentTable;
private RichEventParser currentParser;
public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
- this(null, null, null, new TableNameConverter(caseSensitive), new HashSet<>());
+ this(null, null, null, null, null, new TableNameConverter(caseSensitive), new HashSet<>());
}
public RichCdcMultiplexRecordEventParser(
@Nullable NewTableSchemaBuilder schemaBuilder,
- @Nullable Pattern includingPattern,
- @Nullable Pattern excludingPattern,
+ @Nullable Pattern tblIncludingPattern,
+ @Nullable Pattern tblExcludingPattern,
+ @Nullable Pattern dbIncludingPattern,
+ @Nullable Pattern dbExcludingPattern,
TableNameConverter tableNameConverter,
Set createdTables) {
this.schemaBuilder = schemaBuilder;
- this.includingPattern = includingPattern;
- this.excludingPattern = excludingPattern;
+ this.tblIncludingPattern = tblIncludingPattern;
+ this.tblExcludingPattern = tblExcludingPattern;
+ this.dbIncludingPattern = dbIncludingPattern;
+ this.dbExcludingPattern = dbExcludingPattern;
this.tableNameConverter = tableNameConverter;
this.createdTables = createdTables;
}
@@ -81,6 +92,7 @@ public RichCdcMultiplexRecordEventParser(
public void setRawEvent(RichCdcMultiplexRecord record) {
this.record = record;
this.currentTable = record.tableName();
+ this.currentDb = record.databaseName();
this.shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
if (shouldSynchronizeCurrentTable) {
this.currentParser = parsers.computeIfAbsent(currentTable, t -> new RichEventParser());
@@ -124,7 +136,47 @@ public Optional parseNewTable() {
return Optional.empty();
}
+ private boolean shouldSynchronizeCurrentDb() {
+ // In case the record is incomplete, we let the null value pass validation
+ // and handle the null value when we really need it
+ if (currentDb == null || currentTable == null) {
+ return true;
+ }
+
+ if (includedDbs.contains(currentDb)) {
+ return true;
+ }
+ if (excludedDbs.contains(currentDb)) {
+ return false;
+ }
+
+ boolean shouldSynchronize = true;
+
+ if (dbIncludingPattern != null) {
+ shouldSynchronize = dbIncludingPattern.matcher(currentDb).matches();
+ }
+ if (dbExcludingPattern != null) {
+ shouldSynchronize =
+ shouldSynchronize && !dbExcludingPattern.matcher(currentDb).matches();
+ }
+ if (!shouldSynchronize) {
+ LOG.debug(
+ "Source database {} won't be synchronized because it was excluded. ",
+ currentDb);
+ excludedDbs.add(currentDb);
+ return false;
+ }
+
+ includedDbs.add(currentDb);
+ return true;
+ }
+
+
private boolean shouldSynchronizeCurrentTable() {
+ if (!shouldSynchronizeCurrentDb()) {
+ return false;
+ }
+
// In case the record is incomplete, we let the null value pass validation
// and handle the null value when we really need it
if (currentTable == null) {
@@ -139,12 +191,13 @@ private boolean shouldSynchronizeCurrentTable() {
}
boolean shouldSynchronize = true;
- if (includingPattern != null) {
- shouldSynchronize = includingPattern.matcher(currentTable).matches();
+
+ if (tblIncludingPattern != null) {
+ shouldSynchronize = tblIncludingPattern.matcher(currentTable).matches();
}
- if (excludingPattern != null) {
+ if (tblExcludingPattern != null) {
shouldSynchronize =
- shouldSynchronize && !excludingPattern.matcher(currentTable).matches();
+ shouldSynchronize && !tblExcludingPattern.matcher(currentTable).matches();
}
if (!shouldSynchronize) {
LOG.debug(
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
new file mode 100644
index 000000000000..5f6707efe161
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
@@ -0,0 +1,125 @@
+package org.apache.paimon.flink.action.cdc;
+
+import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseAction;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.types.RowKind;
+import org.junit.Assert;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SyncDatabaseActionBaseTest {
+ private static final String ANY_DB = "any_db";
+ private static final String WHITE_DB = "white_db";
+ private static final String BLACK_DB = "black_db";
+ private static final String WHITE_TBL = "white_tbl";
+ private static final String BLACK_TBL = "black_tbl";
+
+
+ private SyncDatabaseActionBase kafkaSyncDbAction;
+ private RichCdcMultiplexRecord whiteAnyDbCdcRecord;
+ private RichCdcMultiplexRecord blackAnyDbCdcRecord;
+ private RichCdcMultiplexRecord whiteCdcRecord;
+ private RichCdcMultiplexRecord blackCdcRecord;
+ private RichCdcMultiplexRecord whiteDbBlackTblCdcRecord;
+ private RichCdcMultiplexRecord blackDbWhiteTblCdcRecord;
+
+ @TempDir
+ private java.nio.file.Path tmp;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ LocalFileIO localFileIO = new LocalFileIO();
+ Path defaultDb = new Path(tmp.toString(), "default.db");
+ localFileIO.mkdirs(defaultDb);
+
+ kafkaSyncDbAction = new KafkaSyncDatabaseAction(tmp.toString(), "default", new HashMap<>(), new HashMap<>() );
+
+ Map rawData = new HashMap<>();
+ rawData.put("field", "value");
+
+ CdcRecord cdcData =new CdcRecord(RowKind.INSERT, rawData);
+ whiteAnyDbCdcRecord = new RichCdcMultiplexRecord(ANY_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(),cdcData);
+ blackAnyDbCdcRecord = new RichCdcMultiplexRecord(ANY_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(),cdcData);
+ whiteCdcRecord = new RichCdcMultiplexRecord(WHITE_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(),cdcData);
+ blackCdcRecord = new RichCdcMultiplexRecord(BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(),cdcData);
+
+ whiteDbBlackTblCdcRecord = new RichCdcMultiplexRecord(WHITE_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(),cdcData);
+ blackDbWhiteTblCdcRecord = new RichCdcMultiplexRecord(BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(),cdcData);
+ }
+
+ @Test
+ public void testSyncTablesWithoutDbLists() throws NoSuchMethodException, IOException {
+
+ kafkaSyncDbAction.includingTables(WHITE_TBL);
+ kafkaSyncDbAction.excludingTables(BLACK_TBL);
+
+ RichCdcMultiplexRecordEventParser parser = (RichCdcMultiplexRecordEventParser) kafkaSyncDbAction.buildEventParserFactory().create();
+ List parsedRecords;
+
+ parser.setRawEvent(whiteAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(1, parsedRecords.size());
+
+ parser.setRawEvent(blackAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+ }
+
+ @Test
+ public void testSyncTablesWithDbList(){
+ kafkaSyncDbAction.includingDbs(WHITE_DB);
+ kafkaSyncDbAction.excludingDbs(BLACK_DB);
+ RichCdcMultiplexRecordEventParser parser = (RichCdcMultiplexRecordEventParser) kafkaSyncDbAction.buildEventParserFactory().create();
+ List parsedRecords;
+
+ parser.setRawEvent(whiteAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+
+ parser.setRawEvent(blackAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+
+ // white db and white table
+ parser.setRawEvent(whiteCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(1, parsedRecords.size());
+
+ parser.setRawEvent(blackAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+
+ }
+
+ @Test
+ public void testSycTablesCrossDB(){
+ kafkaSyncDbAction.includingDbs(WHITE_DB);
+ kafkaSyncDbAction.excludingDbs(BLACK_DB);
+ kafkaSyncDbAction.excludingTables(BLACK_TBL);
+ RichCdcMultiplexRecordEventParser parser = (RichCdcMultiplexRecordEventParser) kafkaSyncDbAction.buildEventParserFactory().create();
+ List parsedRecords;
+
+ parser.setRawEvent(whiteDbBlackTblCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+
+ parser.setRawEvent(blackDbWhiteTblCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+ }
+
+
+}
From 4a047e68df97d10f83225498d3e40e32b32b3d35 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9D=8E=E9=B9=8F=E7=A8=8B?=
Date: Tue, 17 Dec 2024 11:59:24 +0800
Subject: [PATCH 2/4] add license for new file
---
.../action/cdc/SyncDatabaseActionBaseTest.java | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
index 5f6707efe161..da1f1ee37ad0 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.paimon.flink.action.cdc;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseAction;
From a65b8057882b81bccbc304d007124b042e61ef59 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9D=8E=E9=B9=8F=E7=A8=8B?=
Date: Tue, 17 Dec 2024 12:14:07 +0800
Subject: [PATCH 3/4] reformat files
---
.../action/cdc/SyncDatabaseActionBase.java | 4 +-
.../cdc/SyncDatabaseActionFactoryBase.java | 3 +-
.../RichCdcMultiplexRecordEventParser.java | 1 -
.../cdc/SyncDatabaseActionBaseTest.java | 59 ++++++++++++-------
4 files changed, 40 insertions(+), 27 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 72f995991731..b10650c7a3bc 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -159,7 +159,6 @@ public SyncDatabaseActionBase excludingDbs(@Nullable String excludingDbs) {
return this;
}
-
public SyncDatabaseActionBase withPartitionKeys(String... partitionKeys) {
this.partitionKeys.addAll(Arrays.asList(partitionKeys));
return this;
@@ -206,8 +205,7 @@ protected EventParser.Factory buildEventParserFactory()
Pattern tblExcludingPattern =
excludingTables == null ? null : Pattern.compile(excludingTables);
Pattern dbIncludingPattern = Pattern.compile(includingDbs);
- Pattern dbExcludingPattern =
- excludingDbs == null ? null : Pattern.compile(excludingDbs);
+ Pattern dbExcludingPattern = excludingDbs == null ? null : Pattern.compile(excludingDbs);
TableNameConverter tableNameConverter =
new TableNameConverter(
allowUpperCase,
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
index ec369771331a..84cc6011fa3a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
@@ -23,9 +23,10 @@
import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import java.util.Optional;
+
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS;
-import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_DBS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_DBS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 8277f01b4cd6..1b8325df21e8 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -171,7 +171,6 @@ private boolean shouldSynchronizeCurrentDb() {
return true;
}
-
private boolean shouldSynchronizeCurrentTable() {
if (!shouldSynchronizeCurrentDb()) {
return false;
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
index da1f1ee37ad0..a5307fa1f96d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
@@ -25,19 +25,19 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.types.RowKind;
-import org.junit.Assert;
+import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
-
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+/** Tests for {@link SyncDatabaseActionBase}. */
public class SyncDatabaseActionBaseTest {
private static final String ANY_DB = "any_db";
private static final String WHITE_DB = "white_db";
@@ -45,7 +45,6 @@ public class SyncDatabaseActionBaseTest {
private static final String WHITE_TBL = "white_tbl";
private static final String BLACK_TBL = "black_tbl";
-
private SyncDatabaseActionBase kafkaSyncDbAction;
private RichCdcMultiplexRecord whiteAnyDbCdcRecord;
private RichCdcMultiplexRecord blackAnyDbCdcRecord;
@@ -54,8 +53,7 @@ public class SyncDatabaseActionBaseTest {
private RichCdcMultiplexRecord whiteDbBlackTblCdcRecord;
private RichCdcMultiplexRecord blackDbWhiteTblCdcRecord;
- @TempDir
- private java.nio.file.Path tmp;
+ @TempDir private java.nio.file.Path tmp;
@BeforeEach
public void setUp() throws Exception {
@@ -63,19 +61,33 @@ public void setUp() throws Exception {
Path defaultDb = new Path(tmp.toString(), "default.db");
localFileIO.mkdirs(defaultDb);
- kafkaSyncDbAction = new KafkaSyncDatabaseAction(tmp.toString(), "default", new HashMap<>(), new HashMap<>() );
+ kafkaSyncDbAction =
+ new KafkaSyncDatabaseAction(
+ tmp.toString(), "default", new HashMap<>(), new HashMap<>());
Map rawData = new HashMap<>();
rawData.put("field", "value");
- CdcRecord cdcData =new CdcRecord(RowKind.INSERT, rawData);
- whiteAnyDbCdcRecord = new RichCdcMultiplexRecord(ANY_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(),cdcData);
- blackAnyDbCdcRecord = new RichCdcMultiplexRecord(ANY_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(),cdcData);
- whiteCdcRecord = new RichCdcMultiplexRecord(WHITE_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(),cdcData);
- blackCdcRecord = new RichCdcMultiplexRecord(BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(),cdcData);
-
- whiteDbBlackTblCdcRecord = new RichCdcMultiplexRecord(WHITE_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(),cdcData);
- blackDbWhiteTblCdcRecord = new RichCdcMultiplexRecord(BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(),cdcData);
+ CdcRecord cdcData = new CdcRecord(RowKind.INSERT, rawData);
+ whiteAnyDbCdcRecord =
+ new RichCdcMultiplexRecord(
+ ANY_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
+ blackAnyDbCdcRecord =
+ new RichCdcMultiplexRecord(
+ ANY_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(), cdcData);
+ whiteCdcRecord =
+ new RichCdcMultiplexRecord(
+ WHITE_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
+ blackCdcRecord =
+ new RichCdcMultiplexRecord(
+ BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
+
+ whiteDbBlackTblCdcRecord =
+ new RichCdcMultiplexRecord(
+ WHITE_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(), cdcData);
+ blackDbWhiteTblCdcRecord =
+ new RichCdcMultiplexRecord(
+ BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
}
@Test
@@ -84,7 +96,9 @@ public void testSyncTablesWithoutDbLists() throws NoSuchMethodException, IOExcep
kafkaSyncDbAction.includingTables(WHITE_TBL);
kafkaSyncDbAction.excludingTables(BLACK_TBL);
- RichCdcMultiplexRecordEventParser parser = (RichCdcMultiplexRecordEventParser) kafkaSyncDbAction.buildEventParserFactory().create();
+ RichCdcMultiplexRecordEventParser parser =
+ (RichCdcMultiplexRecordEventParser)
+ kafkaSyncDbAction.buildEventParserFactory().create();
List parsedRecords;
parser.setRawEvent(whiteAnyDbCdcRecord);
@@ -97,10 +111,12 @@ public void testSyncTablesWithoutDbLists() throws NoSuchMethodException, IOExcep
}
@Test
- public void testSyncTablesWithDbList(){
+ public void testSyncTablesWithDbList() {
kafkaSyncDbAction.includingDbs(WHITE_DB);
kafkaSyncDbAction.excludingDbs(BLACK_DB);
- RichCdcMultiplexRecordEventParser parser = (RichCdcMultiplexRecordEventParser) kafkaSyncDbAction.buildEventParserFactory().create();
+ RichCdcMultiplexRecordEventParser parser =
+ (RichCdcMultiplexRecordEventParser)
+ kafkaSyncDbAction.buildEventParserFactory().create();
List parsedRecords;
parser.setRawEvent(whiteAnyDbCdcRecord);
@@ -119,15 +135,16 @@ public void testSyncTablesWithDbList(){
parser.setRawEvent(blackAnyDbCdcRecord);
parsedRecords = parser.parseRecords();
Assert.assertEquals(0, parsedRecords.size());
-
}
@Test
- public void testSycTablesCrossDB(){
+ public void testSycTablesCrossDB() {
kafkaSyncDbAction.includingDbs(WHITE_DB);
kafkaSyncDbAction.excludingDbs(BLACK_DB);
kafkaSyncDbAction.excludingTables(BLACK_TBL);
- RichCdcMultiplexRecordEventParser parser = (RichCdcMultiplexRecordEventParser) kafkaSyncDbAction.buildEventParserFactory().create();
+ RichCdcMultiplexRecordEventParser parser =
+ (RichCdcMultiplexRecordEventParser)
+ kafkaSyncDbAction.buildEventParserFactory().create();
List parsedRecords;
parser.setRawEvent(whiteDbBlackTblCdcRecord);
@@ -138,6 +155,4 @@ public void testSycTablesCrossDB(){
parsedRecords = parser.parseRecords();
Assert.assertEquals(0, parsedRecords.size());
}
-
-
}
From a12d698c012efd089041beb9ef73cca865823883 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9D=8E=E9=B9=8F=E7=A8=8B?=
Date: Tue, 17 Dec 2024 13:39:36 +0800
Subject: [PATCH 4/4] remote redundant judement
---
.../flink/sink/cdc/RichCdcMultiplexRecordEventParser.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 1b8325df21e8..31f6c521e9de 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -139,7 +139,7 @@ public Optional parseNewTable() {
private boolean shouldSynchronizeCurrentDb() {
// In case the record is incomplete, we let the null value pass validation
// and handle the null value when we really need it
- if (currentDb == null || currentTable == null) {
+ if (currentDb == null) {
return true;
}
|