Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] kafka_sync_database supports db whitelist and blacklist #4727

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/layouts/shortcodes/generated/kafka_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@
<td><h5>--excluding_tables</h5></td>
<td>It is used to specify which source tables are not to be synchronized. The usage is same as "--including_tables". "--excluding_tables" has higher priority than "--including_tables" if you specified both.</td>
</tr>
<tr>
<td><h5>--including_dbs</h5></td>
<td>It is used to specify the databases within which the tables are to be synchronized. The usage is same as "--including_tables".</td>
</tr>
<tr>
<td><h5>--excluding_dbs</h5></td>
<td>It is used to specify the databases within which the tables are not to be synchronized. The usage is same as "--excluding_tables". "--excluding_dbs" has higher priority than "--including_dbs" if you specified both.</td>
</tr>
<tr>
<td><h5>--type_mapping</h5></td>
<td>It is used to specify how to map MySQL data type to Paimon type.<br />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class CdcActionCommonUtils {
public static final String TABLE_MAPPING = "table_mapping";
public static final String INCLUDING_TABLES = "including_tables";
public static final String EXCLUDING_TABLES = "excluding_tables";
public static final String INCLUDING_DBS = "including_dbs";
public static final String EXCLUDING_DBS = "excluding_dbs";
public static final String TYPE_MAPPING = "type_mapping";
public static final String PARTITION_KEYS = "partition_keys";
public static final String PRIMARY_KEYS = "primary_keys";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
protected List<String> partitionKeys = new ArrayList<>();
protected List<String> primaryKeys = new ArrayList<>();
@Nullable protected String excludingTables;

protected String includingDbs = ".*";
@Nullable protected String excludingDbs;

protected List<FileStoreTable> tables = new ArrayList<>();
protected Map<String, List<String>> partitionKeyMultiple = new HashMap<>();

Expand Down Expand Up @@ -144,6 +148,18 @@ public SyncDatabaseActionBase excludingTables(@Nullable String excludingTables)
return this;
}

/**
 * Sets the regex pattern selecting which source databases should be synchronized.
 *
 * @param includingDbs database-name pattern; {@code null} leaves the default ({@code ".*"},
 *     i.e. every database) in place
 * @return this action for method chaining
 */
public SyncDatabaseActionBase includingDbs(@Nullable String includingDbs) {
    // Guard clause: a null argument keeps the match-everything default.
    if (includingDbs == null) {
        return this;
    }
    this.includingDbs = includingDbs;
    return this;
}

/**
 * Sets the regex pattern selecting which source databases must NOT be synchronized. The
 * exclusion pattern takes priority over {@link #includingDbs} when both match.
 *
 * @param excludingDbs database-name pattern, or {@code null} to exclude nothing
 * @return this action for method chaining
 */
public SyncDatabaseActionBase excludingDbs(@Nullable String excludingDbs) {
    this.excludingDbs = excludingDbs;
    return this;
}

public SyncDatabaseActionBase withPartitionKeys(String... partitionKeys) {
this.partitionKeys.addAll(Arrays.asList(partitionKeys));
return this;
Expand Down Expand Up @@ -186,9 +202,11 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
requirePrimaryKeys(),
partitionKeyMultiple,
metadataConverters);
Pattern includingPattern = Pattern.compile(includingTables);
Pattern excludingPattern =
Pattern tblIncludingPattern = Pattern.compile(includingTables);
Pattern tblExcludingPattern =
excludingTables == null ? null : Pattern.compile(excludingTables);
Pattern dbIncludingPattern = Pattern.compile(includingDbs);
Pattern dbExcludingPattern = excludingDbs == null ? null : Pattern.compile(excludingDbs);
TableNameConverter tableNameConverter =
new TableNameConverter(
caseSensitive,
Expand All @@ -207,8 +225,10 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
return () ->
new RichCdcMultiplexRecordEventParser(
schemaBuilder,
includingPattern,
excludingPattern,
tblIncludingPattern,
tblExcludingPattern,
dbIncludingPattern,
dbExcludingPattern,
tableNameConverter,
createdTables);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

import java.util.Optional;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_DBS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
Expand Down Expand Up @@ -59,6 +61,8 @@ protected void withParams(MultipleParameterToolAdapter params, T action) {
.withTableMapping(optionalConfigMap(params, TABLE_MAPPING))
.includingTables(params.get(INCLUDING_TABLES))
.excludingTables(params.get(EXCLUDING_TABLES))
.includingDbs(params.get(INCLUDING_DBS))
.excludingDbs(params.get(EXCLUDING_DBS))
.withPartitionKeyMultiple(
optionalConfigMapList(params, MULTIPLE_TABLE_PARTITION_KEYS))
.withPartitionKeys();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,44 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
LoggerFactory.getLogger(RichCdcMultiplexRecordEventParser.class);

@Nullable private final NewTableSchemaBuilder schemaBuilder;
@Nullable private final Pattern includingPattern;
@Nullable private final Pattern excludingPattern;
@Nullable private final Pattern tblIncludingPattern;
@Nullable private final Pattern tblExcludingPattern;
@Nullable private final Pattern dbIncludingPattern;
@Nullable private final Pattern dbExcludingPattern;
private final TableNameConverter tableNameConverter;
private final Set<String> createdTables;

private final Map<String, RichEventParser> parsers = new HashMap<>();

private final Set<String> includedDbs = new HashSet<>();
private final Set<String> excludedDbs = new HashSet<>();

private final Set<String> includedTables = new HashSet<>();
private final Set<String> excludedTables = new HashSet<>();

private RichCdcMultiplexRecord record;
private String currentTable;
private String currentDb;
private boolean shouldSynchronizeCurrentTable;
private RichEventParser currentParser;

public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
this(null, null, null, new TableNameConverter(caseSensitive), new HashSet<>());
this(null, null, null, null, null, new TableNameConverter(caseSensitive), new HashSet<>());
}

public RichCdcMultiplexRecordEventParser(
@Nullable NewTableSchemaBuilder schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
@Nullable Pattern tblIncludingPattern,
@Nullable Pattern tblExcludingPattern,
@Nullable Pattern dbIncludingPattern,
@Nullable Pattern dbExcludingPattern,
TableNameConverter tableNameConverter,
Set<String> createdTables) {
this.schemaBuilder = schemaBuilder;
this.includingPattern = includingPattern;
this.excludingPattern = excludingPattern;
this.tblIncludingPattern = tblIncludingPattern;
this.tblExcludingPattern = tblExcludingPattern;
this.dbIncludingPattern = dbIncludingPattern;
this.dbExcludingPattern = dbExcludingPattern;
this.tableNameConverter = tableNameConverter;
this.createdTables = createdTables;
}
Expand All @@ -81,6 +92,7 @@ public RichCdcMultiplexRecordEventParser(
public void setRawEvent(RichCdcMultiplexRecord record) {
this.record = record;
this.currentTable = record.tableName();
this.currentDb = record.databaseName();
this.shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
if (shouldSynchronizeCurrentTable) {
this.currentParser = parsers.computeIfAbsent(currentTable, t -> new RichEventParser());
Expand Down Expand Up @@ -124,7 +136,46 @@ public Optional<Schema> parseNewTable() {
return Optional.empty();
}

/**
 * Decides whether the database of the current record should be synchronized, based on the
 * database including/excluding patterns. Decisions are memoized per database name in
 * {@code includedDbs}/{@code excludedDbs} so each name is matched against the regexes once.
 */
private boolean shouldSynchronizeCurrentDb() {
    // In case the record is incomplete, we let the null value pass validation
    // and handle the null value when we really need it
    if (currentDb == null) {
        return true;
    }

    // Fast path: reuse a previously computed decision for this database.
    if (includedDbs.contains(currentDb)) {
        return true;
    }
    if (excludedDbs.contains(currentDb)) {
        return false;
    }

    // A null including pattern means "include everything"; the excluding pattern wins
    // over the including pattern when both match.
    boolean included =
            dbIncludingPattern == null || dbIncludingPattern.matcher(currentDb).matches();
    boolean excluded =
            dbExcludingPattern != null && dbExcludingPattern.matcher(currentDb).matches();

    if (included && !excluded) {
        includedDbs.add(currentDb);
        return true;
    }

    LOG.debug(
            "Source database {} won't be synchronized because it was excluded. ",
            currentDb);
    excludedDbs.add(currentDb);
    return false;
}

private boolean shouldSynchronizeCurrentTable() {
if (!shouldSynchronizeCurrentDb()) {
return false;
}

// In case the record is incomplete, we let the null value pass validation
// and handle the null value when we really need it
if (currentTable == null) {
Expand All @@ -139,12 +190,13 @@ private boolean shouldSynchronizeCurrentTable() {
}

boolean shouldSynchronize = true;
if (includingPattern != null) {
shouldSynchronize = includingPattern.matcher(currentTable).matches();

if (tblIncludingPattern != null) {
shouldSynchronize = tblIncludingPattern.matcher(currentTable).matches();
}
if (excludingPattern != null) {
if (tblExcludingPattern != null) {
shouldSynchronize =
shouldSynchronize && !excludingPattern.matcher(currentTable).matches();
shouldSynchronize && !tblExcludingPattern.matcher(currentTable).matches();
}
if (!shouldSynchronize) {
LOG.debug(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseAction;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.types.RowKind;

import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Tests for {@link SyncDatabaseActionBase}. */
public class SyncDatabaseActionBaseTest {
private static final String ANY_DB = "any_db";
private static final String WHITE_DB = "white_db";
private static final String BLACK_DB = "black_db";
private static final String WHITE_TBL = "white_tbl";
private static final String BLACK_TBL = "black_tbl";

private SyncDatabaseActionBase kafkaSyncDbAction;
private RichCdcMultiplexRecord whiteAnyDbCdcRecord;
private RichCdcMultiplexRecord blackAnyDbCdcRecord;
private RichCdcMultiplexRecord whiteCdcRecord;
private RichCdcMultiplexRecord blackCdcRecord;
private RichCdcMultiplexRecord whiteDbBlackTblCdcRecord;
private RichCdcMultiplexRecord blackDbWhiteTblCdcRecord;

@TempDir private java.nio.file.Path tmp;

@BeforeEach
public void setUp() throws Exception {
LocalFileIO localFileIO = new LocalFileIO();
Path defaultDb = new Path(tmp.toString(), "default.db");
localFileIO.mkdirs(defaultDb);

kafkaSyncDbAction =
new KafkaSyncDatabaseAction(
tmp.toString(), "default", new HashMap<>(), new HashMap<>());

Map<String, String> rawData = new HashMap<>();
rawData.put("field", "value");

CdcRecord cdcData = new CdcRecord(RowKind.INSERT, rawData);
whiteAnyDbCdcRecord =
new RichCdcMultiplexRecord(
ANY_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
blackAnyDbCdcRecord =
new RichCdcMultiplexRecord(
ANY_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(), cdcData);
whiteCdcRecord =
new RichCdcMultiplexRecord(
WHITE_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
blackCdcRecord =
new RichCdcMultiplexRecord(
BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);

whiteDbBlackTblCdcRecord =
new RichCdcMultiplexRecord(
WHITE_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(), cdcData);
blackDbWhiteTblCdcRecord =
new RichCdcMultiplexRecord(
BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
}

@Test
public void testSyncTablesWithoutDbLists() throws NoSuchMethodException, IOException {

kafkaSyncDbAction.includingTables(WHITE_TBL);
kafkaSyncDbAction.excludingTables(BLACK_TBL);

RichCdcMultiplexRecordEventParser parser =
(RichCdcMultiplexRecordEventParser)
kafkaSyncDbAction.buildEventParserFactory().create();
List<CdcRecord> parsedRecords;

parser.setRawEvent(whiteAnyDbCdcRecord);
parsedRecords = parser.parseRecords();
Assert.assertEquals(1, parsedRecords.size());

parser.setRawEvent(blackAnyDbCdcRecord);
parsedRecords = parser.parseRecords();
Assert.assertEquals(0, parsedRecords.size());
}

@Test
public void testSyncTablesWithDbList() {
kafkaSyncDbAction.includingDbs(WHITE_DB);
kafkaSyncDbAction.excludingDbs(BLACK_DB);
RichCdcMultiplexRecordEventParser parser =
(RichCdcMultiplexRecordEventParser)
kafkaSyncDbAction.buildEventParserFactory().create();
List<CdcRecord> parsedRecords;

parser.setRawEvent(whiteAnyDbCdcRecord);
parsedRecords = parser.parseRecords();
Assert.assertEquals(0, parsedRecords.size());

parser.setRawEvent(blackAnyDbCdcRecord);
parsedRecords = parser.parseRecords();
Assert.assertEquals(0, parsedRecords.size());

// white db and white table
parser.setRawEvent(whiteCdcRecord);
parsedRecords = parser.parseRecords();
Assert.assertEquals(1, parsedRecords.size());

parser.setRawEvent(blackAnyDbCdcRecord);
parsedRecords = parser.parseRecords();
Assert.assertEquals(0, parsedRecords.size());
}

@Test
public void testSycTablesCrossDB() {
kafkaSyncDbAction.includingDbs(WHITE_DB);
kafkaSyncDbAction.excludingDbs(BLACK_DB);
kafkaSyncDbAction.excludingTables(BLACK_TBL);
RichCdcMultiplexRecordEventParser parser =
(RichCdcMultiplexRecordEventParser)
kafkaSyncDbAction.buildEventParserFactory().create();
List<CdcRecord> parsedRecords;

parser.setRawEvent(whiteDbBlackTblCdcRecord);
parsedRecords = parser.parseRecords();
Assert.assertEquals(0, parsedRecords.size());

parser.setRawEvent(blackDbWhiteTblCdcRecord);
parsedRecords = parser.parseRecords();
Assert.assertEquals(0, parsedRecords.size());
}
}
Loading