[flink] kafka_sync_database supports db whitelist and blacklist #4732

Merged 3 commits on Dec 18, 2024
6 changes: 4 additions & 2 deletions docs/content/cdc-ingestion/kafka-cdc.md
@@ -199,12 +199,14 @@ To use this feature through `flink run`, run the following shell command.
     --warehouse <warehouse-path> \
     --database <database-name> \
     [--table_mapping <table-name>=<paimon-table-name>] \
-    [--table_prefix_db <paimon-table-prefix-by-db>] \
     [--table_prefix <paimon-table-prefix>] \
-    [--table_suffix_db <paimon-table-suffix-by-db>] \
     [--table_suffix <paimon-table-suffix>] \
+    [--table_prefix_db <paimon-table-prefix-by-db>] \
+    [--table_suffix_db <paimon-table-suffix-by-db>] \
     [--including_tables <table-name|name-regular-expr>] \
     [--excluding_tables <table-name|name-regular-expr>] \
+    [--including_dbs <database-name|name-regular-expr>] \
+    [--excluding_dbs <database-name|name-regular-expr>] \
     [--type_mapping to-string] \
     [--partition_keys <partition_keys>] \
     [--primary_keys <primary-keys>] \
26 changes: 17 additions & 9 deletions docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -41,22 +41,22 @@
  <td><h5>--table_mapping</h5></td>
  <td>The table name mapping between source database and Paimon. For example, if you want to synchronize a source table named "test" to a Paimon table named "paimon_test", you can specify "--table_mapping test=paimon_test". Multiple mappings could be specified with multiple "--table_mapping" options. "--table_mapping" has higher priority than "--table_prefix" and "--table_suffix".</td>
  </tr>
- <tr>
- <td><h5>--table_prefix_db</h5></td>
- <td>The prefix of the Paimon tables to be synchronized from the specified db. For example, if you want to prefix the tables from db1 with "ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". "--table_prefix_db" has higher priority than "--table_prefix".</td>
- </tr>
  <tr>
  <td><h5>--table_prefix</h5></td>
  <td>The prefix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_prefix_db". For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_".</td>
  </tr>
- <tr>
- <td><h5>--table_suffix_db</h5></td>
- <td>The suffix of the Paimon tables to be synchronized from the specified db. The usage is same as "--table_prefix_db".</td>
- </tr>
  <tr>
  <td><h5>--table_suffix</h5></td>
  <td>The suffix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_suffix_db". The usage is same as "--table_prefix".</td>
  </tr>
+ <tr>
+ <td><h5>--table_prefix_db</h5></td>
+ <td>The prefix of the Paimon tables to be synchronized from the specified db. For example, if you want to prefix the tables from db1 with "ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". "--table_prefix_db" has higher priority than "--table_prefix".</td>
+ </tr>
+ <tr>
+ <td><h5>--table_suffix_db</h5></td>
+ <td>The suffix of the Paimon tables to be synchronized from the specified db. The usage is same as "--table_prefix_db".</td>
+ </tr>
  <tr>
  <td><h5>--including_tables</h5></td>
  <td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'a|b|c'.Regular expression is supported, for example, specifying "--including_tables test|paimon.*" means to synchronize table 'test' and all tables start with 'paimon'.</td>
@@ -65,6 +65,14 @@
  <td><h5>--excluding_tables</h5></td>
  <td>It is used to specify which source tables are not to be synchronized. The usage is same as "--including_tables". "--excluding_tables" has higher priority than "--including_tables" if you specified both.</td>
  </tr>
+ <tr>
+ <td><h5>--including_dbs</h5></td>
+ <td>It is used to specify the databases within which the tables are to be synchronized. The usage is same as "--including_tables".</td>
+ </tr>
+ <tr>
+ <td><h5>--excluding_dbs</h5></td>
+ <td>It is used to specify the databases within which the tables are not to be synchronized. The usage is same as "--excluding_tables". "--excluding_dbs" has higher priority than "--including_dbs" if you specified both.</td>
+ </tr>
  <tr>
  <td><h5>--type_mapping</h5></td>
  <td>It is used to specify how to map MySQL data type to Paimon type.<br />
@@ -114,4 +122,4 @@
  <td>The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a complete list of table configurations.</td>
  </tr>
  </tbody>
- </table>
+ </table>
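
The option descriptions above imply two rules for `--including_dbs` and `--excluding_dbs`: a database name must match the whole regular expression, and exclusion wins when both options match. The following is a minimal sketch of those rules, not Paimon code. `DbFilterSketch` and `shouldSync` are hypothetical names, and the use of `Matcher.matches()` (full-string matching) is an assumption based on the parser change in this PR.

```java
import java.util.regex.Pattern;

/** Hypothetical helper illustrating the documented filter semantics; not part of Paimon. */
final class DbFilterSketch {

    /**
     * Returns true when {@code db} fully matches {@code including} and does not
     * fully match {@code excluding}. A null {@code excluding} disables exclusion.
     */
    static boolean shouldSync(String db, String including, String excluding) {
        boolean included = Pattern.compile(including).matcher(db).matches();
        boolean excluded =
                excluding != null && Pattern.compile(excluding).matcher(db).matches();
        return included && !excluded;
    }

    public static void main(String[] args) {
        // '|' separates alternatives: accept "db1" or any name starting with "ods_".
        String including = "db1|ods_.*";
        String excluding = "ods_tmp";
        for (String db : new String[] {"db1", "ods_orders", "ods_tmp", "db10"}) {
            System.out.println(db + " -> " + shouldSync(db, including, excluding));
        }
    }
}
```

Under these assumptions `db10` is rejected because `matches()` requires the entire name to match one alternative, and `ods_tmp` is rejected because the exclusion pattern takes priority over the inclusion pattern.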
@@ -61,6 +61,8 @@ public class CdcActionCommonUtils {
     public static final String TABLE_MAPPING = "table_mapping";
     public static final String INCLUDING_TABLES = "including_tables";
     public static final String EXCLUDING_TABLES = "excluding_tables";
+    public static final String INCLUDING_DBS = "including_dbs";
+    public static final String EXCLUDING_DBS = "excluding_dbs";
     public static final String TYPE_MAPPING = "type_mapping";
     public static final String PARTITION_KEYS = "partition_keys";
     public static final String PRIMARY_KEYS = "primary_keys";
@@ -60,6 +60,8 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
     protected List<String> partitionKeys = new ArrayList<>();
     protected List<String> primaryKeys = new ArrayList<>();
     @Nullable protected String excludingTables;
+    protected String includingDbs = ".*";
+    @Nullable protected String excludingDbs;
     protected List<FileStoreTable> tables = new ArrayList<>();
     protected Map<String, List<String>> partitionKeyMultiple = new HashMap<>();

@@ -144,6 +146,18 @@ public SyncDatabaseActionBase excludingTables(@Nullable String excludingTables)
         return this;
     }

+    public SyncDatabaseActionBase includingDbs(@Nullable String includingDbs) {
+        if (includingDbs != null) {
+            this.includingDbs = includingDbs;
+        }
+        return this;
+    }
+
+    public SyncDatabaseActionBase excludingDbs(@Nullable String excludingDbs) {
+        this.excludingDbs = excludingDbs;
+        return this;
+    }
+
     public SyncDatabaseActionBase withPartitionKeys(String... partitionKeys) {
         this.partitionKeys.addAll(Arrays.asList(partitionKeys));
         return this;
@@ -186,9 +200,11 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
                         requirePrimaryKeys(),
                         partitionKeyMultiple,
                         metadataConverters);
-        Pattern includingPattern = Pattern.compile(includingTables);
-        Pattern excludingPattern =
+        Pattern tblIncludingPattern = Pattern.compile(includingTables);
+        Pattern tblExcludingPattern =
                 excludingTables == null ? null : Pattern.compile(excludingTables);
+        Pattern dbIncludingPattern = Pattern.compile(includingDbs);
+        Pattern dbExcludingPattern = excludingDbs == null ? null : Pattern.compile(excludingDbs);
         TableNameConverter tableNameConverter =
                 new TableNameConverter(
                         caseSensitive,
@@ -207,8 +223,10 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
         return () ->
                 new RichCdcMultiplexRecordEventParser(
                         schemaBuilder,
-                        includingPattern,
-                        excludingPattern,
+                        tblIncludingPattern,
+                        tblExcludingPattern,
+                        dbIncludingPattern,
+                        dbExcludingPattern,
                         tableNameConverter,
                         createdTables);
     }
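
The setters and pattern compilation in `SyncDatabaseActionBase` combine so that an omitted `--including_dbs` keeps the `".*"` default, while an omitted `--excluding_dbs` yields a null pattern. The sketch below isolates that behaviour for illustration. `DbFilterOptionsSketch` and its methods are hypothetical stand-ins, not the Paimon classes, and it assumes that an absent option reaches the setter as `null`.

```java
import java.util.regex.Pattern;

/** Hypothetical stand-in for the builder-style setters in the diff; not the Paimon class. */
final class DbFilterOptionsSketch {
    private String includingDbs = ".*"; // default: every database is included
    private String excludingDbs; // default: null, meaning nothing is excluded

    DbFilterOptionsSketch includingDbs(String includingDbs) {
        // A null argument (option not passed) keeps the ".*" default.
        if (includingDbs != null) {
            this.includingDbs = includingDbs;
        }
        return this;
    }

    DbFilterOptionsSketch excludingDbs(String excludingDbs) {
        this.excludingDbs = excludingDbs;
        return this;
    }

    /** Never null, because includingDbs always holds a pattern string. */
    Pattern includingPattern() {
        return Pattern.compile(includingDbs);
    }

    /** Returns null when no exclusion was configured. */
    Pattern excludingPattern() {
        return excludingDbs == null ? null : Pattern.compile(excludingDbs);
    }

    public static void main(String[] args) {
        // Simulates a run where neither option was given on the command line.
        DbFilterOptionsSketch options =
                new DbFilterOptionsSketch().includingDbs(null).excludingDbs(null);
        System.out.println("including: " + options.includingPattern().pattern());
        System.out.println("excluding: " + options.excludingPattern());
    }
}
```

Guarding only the including setter against null means the inclusion pattern can always be compiled, and only the exclusion pattern needs a null check downstream.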
@@ -24,7 +24,9 @@

 import java.util.Optional;

+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_DBS;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
@@ -59,6 +61,8 @@ protected void withParams(MultipleParameterToolAdapter params, T action) {
                 .withTableMapping(optionalConfigMap(params, TABLE_MAPPING))
                 .includingTables(params.get(INCLUDING_TABLES))
                 .excludingTables(params.get(EXCLUDING_TABLES))
+                .includingDbs(params.get(INCLUDING_DBS))
+                .excludingDbs(params.get(EXCLUDING_DBS))
                 .withPartitionKeyMultiple(
                         optionalConfigMapList(params, MULTIPLE_TABLE_PARTITION_KEYS))
                 .withPartitionKeys();
@@ -46,33 +46,43 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
             LoggerFactory.getLogger(RichCdcMultiplexRecordEventParser.class);

     @Nullable private final NewTableSchemaBuilder schemaBuilder;
-    @Nullable private final Pattern includingPattern;
-    @Nullable private final Pattern excludingPattern;
+    @Nullable private final Pattern tblIncludingPattern;
+    @Nullable private final Pattern tblExcludingPattern;
+    @Nullable private final Pattern dbIncludingPattern;
+    @Nullable private final Pattern dbExcludingPattern;
     private final TableNameConverter tableNameConverter;
     private final Set<String> createdTables;

     private final Map<String, RichEventParser> parsers = new HashMap<>();
     private final Set<String> includedTables = new HashSet<>();
     private final Set<String> excludedTables = new HashSet<>();

+    private final Set<String> includedDbs = new HashSet<>();
+    private final Set<String> excludedDbs = new HashSet<>();
+
     private RichCdcMultiplexRecord record;
     private String currentTable;
+    private String currentDb;
     private boolean shouldSynchronizeCurrentTable;
     private RichEventParser currentParser;

     public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
-        this(null, null, null, new TableNameConverter(caseSensitive), new HashSet<>());
+        this(null, null, null, null, null, new TableNameConverter(caseSensitive), new HashSet<>());
     }

     public RichCdcMultiplexRecordEventParser(
             @Nullable NewTableSchemaBuilder schemaBuilder,
-            @Nullable Pattern includingPattern,
-            @Nullable Pattern excludingPattern,
+            @Nullable Pattern tblIncludingPattern,
+            @Nullable Pattern tblExcludingPattern,
+            @Nullable Pattern dbIncludingPattern,
+            @Nullable Pattern dbExcludingPattern,
             TableNameConverter tableNameConverter,
             Set<String> createdTables) {
         this.schemaBuilder = schemaBuilder;
-        this.includingPattern = includingPattern;
-        this.excludingPattern = excludingPattern;
+        this.tblIncludingPattern = tblIncludingPattern;
+        this.tblExcludingPattern = tblExcludingPattern;
+        this.dbIncludingPattern = dbIncludingPattern;
+        this.dbExcludingPattern = dbExcludingPattern;
         this.tableNameConverter = tableNameConverter;
         this.createdTables = createdTables;
     }
@@ -81,6 +91,7 @@ public RichCdcMultiplexRecordEventParser(
     public void setRawEvent(RichCdcMultiplexRecord record) {
         this.record = record;
         this.currentTable = record.tableName();
+        this.currentDb = record.databaseName();
         this.shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
         if (shouldSynchronizeCurrentTable) {
             this.currentParser = parsers.computeIfAbsent(currentTable, t -> new RichEventParser());
@@ -124,7 +135,41 @@ public Optional<Schema> parseNewTable() {
         return Optional.empty();
     }

+    private boolean shouldSynchronizeCurrentDb() {
+        // In case the record is incomplete, we let the null value pass validation
+        // and handle the null value when we really need it
+        if (currentDb == null) {
+            return true;
+        }
+        if (includedDbs.contains(currentDb)) {
+            return true;
+        }
+        if (excludedDbs.contains(currentDb)) {
+            return false;
+        }
+        boolean shouldSynchronize = true;
+        if (dbIncludingPattern != null) {
+            shouldSynchronize = dbIncludingPattern.matcher(currentDb).matches();
+        }
+        if (dbExcludingPattern != null) {
+            shouldSynchronize =
+                    shouldSynchronize && !dbExcludingPattern.matcher(currentDb).matches();
+        }
+        if (!shouldSynchronize) {
+            LOG.debug(
+                    "Source database {} won't be synchronized because it was excluded. ",
+                    currentDb);
+            excludedDbs.add(currentDb);
+            return false;
+        }
+        includedDbs.add(currentDb);
+        return true;
+    }
+
     private boolean shouldSynchronizeCurrentTable() {
+        if (!shouldSynchronizeCurrentDb()) {
+            return false;
+        }
         // In case the record is incomplete, we let the null value pass validation
         // and handle the null value when we really need it
         if (currentTable == null) {
@@ -139,12 +184,12 @@ private boolean shouldSynchronizeCurrentTable() {
         }

         boolean shouldSynchronize = true;
-        if (includingPattern != null) {
-            shouldSynchronize = includingPattern.matcher(currentTable).matches();
+        if (tblIncludingPattern != null) {
+            shouldSynchronize = tblIncludingPattern.matcher(currentTable).matches();
         }
-        if (excludingPattern != null) {
+        if (tblExcludingPattern != null) {
             shouldSynchronize =
-                    shouldSynchronize && !excludingPattern.matcher(currentTable).matches();
+                    shouldSynchronize && !tblExcludingPattern.matcher(currentTable).matches();
         }
         if (!shouldSynchronize) {
             LOG.debug(
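
The new `shouldSynchronizeCurrentDb()` check remembers each decision in `includedDbs` / `excludedDbs`, so the regular expressions run only once per distinct database name. Below is a simplified, standalone model of that memoization, offered as a sketch rather than the parser itself. `CachedDbFilterSketch` is a hypothetical class, and the `regexEvaluations` counter exists only to make the caching observable.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

/** Hypothetical, simplified model of the memoized database check; not the Paimon parser. */
final class CachedDbFilterSketch {
    private final Pattern including; // may be null: no include restriction
    private final Pattern excluding; // may be null: no exclusion
    private final Set<String> includedDbs = new HashSet<>();
    private final Set<String> excludedDbs = new HashSet<>();
    int regexEvaluations = 0; // illustration only: counts cache misses

    CachedDbFilterSketch(Pattern including, Pattern excluding) {
        this.including = including;
        this.excluding = excluding;
    }

    boolean shouldSynchronize(String db) {
        if (db == null) {
            return true; // incomplete record: let it pass and handle null later
        }
        if (includedDbs.contains(db)) {
            return true; // cached positive decision
        }
        if (excludedDbs.contains(db)) {
            return false; // cached negative decision
        }
        regexEvaluations++;
        boolean ok = including == null || including.matcher(db).matches();
        if (excluding != null) {
            ok = ok && !excluding.matcher(db).matches();
        }
        (ok ? includedDbs : excludedDbs).add(db);
        return ok;
    }

    public static void main(String[] args) {
        CachedDbFilterSketch filter =
                new CachedDbFilterSketch(Pattern.compile("db.*"), Pattern.compile("db_tmp"));
        for (String db : new String[] {"db1", "db1", "db_tmp", "db_tmp", "other"}) {
            System.out.println(db + " -> " + filter.shouldSynchronize(db));
        }
        System.out.println("regex evaluations: " + filter.regexEvaluations);
    }
}
```

In the PR, the table-level check calls the database-level check first, so a record from an excluded database is rejected before any table pattern is consulted.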