Skip to content

Commit

Permalink
kafka_sync_database supports db whitelist and blacklist
Browse files Browse the repository at this point in the history
  • Loading branch information
李鹏程 committed Dec 17, 2024
1 parent 0fb6076 commit 85f1deb
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 25 deletions.
6 changes: 4 additions & 2 deletions docs/content/cdc-ingestion/kafka-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,14 @@ To use this feature through `flink run`, run the following shell command.
--warehouse <warehouse-path> \
--database <database-name> \
[--table_mapping <table-name>=<paimon-table-name>] \
[--table_prefix_db <paimon-table-prefix-by-db>] \
[--table_prefix <paimon-table-prefix>] \
[--table_suffix_db <paimon-table-suffix-by-db>] \
[--table_suffix <paimon-table-suffix>] \
[--table_prefix_db <paimon-table-prefix-by-db>] \
[--table_suffix_db <paimon-table-suffix-by-db>] \
[--including_tables <table-name|name-regular-expr>] \
[--excluding_tables <table-name|name-regular-expr>] \
[--including_databases <database-name|name-regular-expr>] \
[--excluding_databases <database-name|name-regular-expr>] \
[--type_mapping to-string] \
[--partition_keys <partition_keys>] \
[--primary_keys <primary-keys>] \
Expand Down
24 changes: 16 additions & 8 deletions docs/layouts/shortcodes/generated/kafka_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,22 @@
<td><h5>--table_mapping</h5></td>
<td>The table name mapping between source database and Paimon. For example, if you want to synchronize a source table named "test" to a Paimon table named "paimon_test", you can specify "--table_mapping test=paimon_test". Multiple mappings could be specified with multiple "--table_mapping" options. "--table_mapping" has higher priority than "--table_prefix" and "--table_suffix".</td>
</tr>
<tr>
<td><h5>--table_prefix_db</h5></td>
<td>The prefix of the Paimon tables to be synchronized from the specified db. For example, if you want to prefix the tables from db1 with "ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". "--table_prefix_db" has higher priority than "--table_prefix".</td>
</tr>
<tr>
<td><h5>--table_prefix</h5></td>
<td>The prefix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_prefix_db". For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_".</td>
</tr>
<tr>
<td><h5>--table_suffix_db</h5></td>
<td>The suffix of the Paimon tables to be synchronized from the specified db. The usage is same as "--table_prefix_db".</td>
</tr>
<tr>
<td><h5>--table_suffix</h5></td>
<td>The suffix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_suffix_db". The usage is same as "--table_prefix".</td>
</tr>
<tr>
<td><h5>--table_prefix_db</h5></td>
<td>The prefix of the Paimon tables to be synchronized from the specified db. For example, if you want to prefix the tables from db1 with "ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". "--table_prefix_db" has higher priority than "--table_prefix".</td>
</tr>
<tr>
<td><h5>--table_suffix_db</h5></td>
<td>The suffix of the Paimon tables to be synchronized from the specified db. The usage is same as "--table_prefix_db".</td>
</tr>
<tr>
<td><h5>--including_tables</h5></td>
<td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'a|b|c'.Regular expression is supported, for example, specifying "--including_tables test|paimon.*" means to synchronize table 'test' and all tables start with 'paimon'.</td>
Expand All @@ -65,6 +65,14 @@
<td><h5>--excluding_tables</h5></td>
<td>It is used to specify which source tables are not to be synchronized. The usage is same as "--including_tables". "--excluding_tables" has higher priority than "--including_tables" if you specified both.</td>
</tr>
<tr>
<td><h5>--including_databases</h5></td>
<td>It is used to specify the databases within which the tables are to be synchronized. The usage is same as "--including_tables".</td>
</tr>
<tr>
<td><h5>--excluding_databases</h5></td>
<td>It is used to specify the databases within which the tables are not to be synchronized. The usage is same as "--excluding_tables". "--excluding_databases" has higher priority than "--including_databases" if you specified both.</td>
</tr>
<tr>
<td><h5>--type_mapping</h5></td>
<td>It is used to specify how to map MySQL data type to Paimon type.<br />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class CdcActionCommonUtils {
public static final String TABLE_MAPPING = "table_mapping";
public static final String INCLUDING_TABLES = "including_tables";
public static final String EXCLUDING_TABLES = "excluding_tables";
public static final String INCLUDING_DBS = "including_dbs";
public static final String EXCLUDING_DBS = "excluding_dbs";
public static final String TYPE_MAPPING = "type_mapping";
public static final String PARTITION_KEYS = "partition_keys";
public static final String PRIMARY_KEYS = "primary_keys";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
protected List<String> partitionKeys = new ArrayList<>();
protected List<String> primaryKeys = new ArrayList<>();
@Nullable protected String excludingTables;
protected String includingDbs = ".*";
@Nullable protected String excludingDbs;
protected List<FileStoreTable> tables = new ArrayList<>();
protected Map<String, List<String>> partitionKeyMultiple = new HashMap<>();

Expand Down Expand Up @@ -144,6 +146,18 @@ public SyncDatabaseActionBase excludingTables(@Nullable String excludingTables)
return this;
}

public SyncDatabaseActionBase includingDbs(@Nullable String includingDbs) {
if (includingDbs != null) {
this.includingDbs = includingDbs;
}
return this;
}

public SyncDatabaseActionBase excludingDbs(@Nullable String excludingDbs) {
this.excludingDbs = excludingDbs;
return this;
}

public SyncDatabaseActionBase withPartitionKeys(String... partitionKeys) {
this.partitionKeys.addAll(Arrays.asList(partitionKeys));
return this;
Expand Down Expand Up @@ -186,9 +200,11 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
requirePrimaryKeys(),
partitionKeyMultiple,
metadataConverters);
Pattern includingPattern = Pattern.compile(includingTables);
Pattern excludingPattern =
Pattern tblIncludingPattern = Pattern.compile(includingTables);
Pattern tblExcludingPattern =
excludingTables == null ? null : Pattern.compile(excludingTables);
Pattern dbIncludingPattern = Pattern.compile(includingDbs);
Pattern dbExcludingPattern = excludingDbs == null ? null : Pattern.compile(excludingDbs);
TableNameConverter tableNameConverter =
new TableNameConverter(
caseSensitive,
Expand All @@ -207,8 +223,10 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
return () ->
new RichCdcMultiplexRecordEventParser(
schemaBuilder,
includingPattern,
excludingPattern,
tblIncludingPattern,
tblExcludingPattern,
dbIncludingPattern,
dbExcludingPattern,
tableNameConverter,
createdTables);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

import java.util.Optional;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_DBS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
Expand Down Expand Up @@ -59,6 +61,8 @@ protected void withParams(MultipleParameterToolAdapter params, T action) {
.withTableMapping(optionalConfigMap(params, TABLE_MAPPING))
.includingTables(params.get(INCLUDING_TABLES))
.excludingTables(params.get(EXCLUDING_TABLES))
.includingDbs(params.get(INCLUDING_DBS))
.excludingDbs(params.get(EXCLUDING_DBS))
.withPartitionKeyMultiple(
optionalConfigMapList(params, MULTIPLE_TABLE_PARTITION_KEYS))
.withPartitionKeys();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,43 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
LoggerFactory.getLogger(RichCdcMultiplexRecordEventParser.class);

@Nullable private final NewTableSchemaBuilder schemaBuilder;
@Nullable private final Pattern includingPattern;
@Nullable private final Pattern excludingPattern;
@Nullable private final Pattern tblIncludingPattern;
@Nullable private final Pattern tblExcludingPattern;
@Nullable private final Pattern dbIncludingPattern;
@Nullable private final Pattern dbExcludingPattern;
private final TableNameConverter tableNameConverter;
private final Set<String> createdTables;

private final Map<String, RichEventParser> parsers = new HashMap<>();
private final Set<String> includedTables = new HashSet<>();
private final Set<String> excludedTables = new HashSet<>();

private final Set<String> includedDbs = new HashSet<>();
private final Set<String> excludedDbs = new HashSet<>();

private RichCdcMultiplexRecord record;
private String currentTable;
private String currentDb;
private boolean shouldSynchronizeCurrentTable;
private RichEventParser currentParser;

public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
this(null, null, null, new TableNameConverter(caseSensitive), new HashSet<>());
this(null, null, null, null, null, new TableNameConverter(caseSensitive), new HashSet<>());
}

public RichCdcMultiplexRecordEventParser(
@Nullable NewTableSchemaBuilder schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
@Nullable Pattern tblIncludingPattern,
@Nullable Pattern tblExcludingPattern,
@Nullable Pattern dbIncludingPattern,
@Nullable Pattern dbExcludingPattern,
TableNameConverter tableNameConverter,
Set<String> createdTables) {
this.schemaBuilder = schemaBuilder;
this.includingPattern = includingPattern;
this.excludingPattern = excludingPattern;
this.tblIncludingPattern = tblIncludingPattern;
this.tblExcludingPattern = tblExcludingPattern;
this.dbIncludingPattern = dbIncludingPattern;
this.dbExcludingPattern = dbExcludingPattern;
this.tableNameConverter = tableNameConverter;
this.createdTables = createdTables;
}
Expand All @@ -81,6 +91,7 @@ public RichCdcMultiplexRecordEventParser(
public void setRawEvent(RichCdcMultiplexRecord record) {
this.record = record;
this.currentTable = record.tableName();
this.currentDb = record.databaseName();
this.shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
if (shouldSynchronizeCurrentTable) {
this.currentParser = parsers.computeIfAbsent(currentTable, t -> new RichEventParser());
Expand Down Expand Up @@ -124,7 +135,41 @@ public Optional<Schema> parseNewTable() {
return Optional.empty();
}

private boolean shouldSynchronizeCurrentDb() {
// In case the record is incomplete, we let the null value pass validation
// and handle the null value when we really need it
if (currentDb == null) {
return true;
}
if (includedDbs.contains(currentDb)) {
return true;
}
if (excludedDbs.contains(currentDb)) {
return false;
}
boolean shouldSynchronize = true;
if (dbIncludingPattern != null) {
shouldSynchronize = dbIncludingPattern.matcher(currentDb).matches();
}
if (dbExcludingPattern != null) {
shouldSynchronize =
shouldSynchronize && !dbExcludingPattern.matcher(currentDb).matches();
}
if (!shouldSynchronize) {
LOG.debug(
"Source database {} won't be synchronized because it was excluded. ",
currentDb);
excludedDbs.add(currentDb);
return false;
}
includedDbs.add(currentDb);
return true;
}

private boolean shouldSynchronizeCurrentTable() {
if (!shouldSynchronizeCurrentDb()) {
return false;
}
// In case the record is incomplete, we let the null value pass validation
// and handle the null value when we really need it
if (currentTable == null) {
Expand All @@ -139,12 +184,12 @@ private boolean shouldSynchronizeCurrentTable() {
}

boolean shouldSynchronize = true;
if (includingPattern != null) {
shouldSynchronize = includingPattern.matcher(currentTable).matches();
if (tblIncludingPattern != null) {
shouldSynchronize = tblIncludingPattern.matcher(currentTable).matches();
}
if (excludingPattern != null) {
if (tblExcludingPattern != null) {
shouldSynchronize =
shouldSynchronize && !excludingPattern.matcher(currentTable).matches();
shouldSynchronize && !tblExcludingPattern.matcher(currentTable).matches();
}
if (!shouldSynchronize) {
LOG.debug(
Expand Down
Loading

0 comments on commit 85f1deb

Please sign in to comment.