Skip to content

Commit

Permalink
[cdc] Database sync initializes the set of existed tables (apache#3621)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Jul 4, 2024
1 parent 2c66208 commit 2acb511
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.sink.cdc.EventParser;
Expand All @@ -36,8 +37,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
Expand Down Expand Up @@ -138,10 +141,19 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
excludingTables == null ? null : Pattern.compile(excludingTables);
TableNameConverter tableNameConverter =
new TableNameConverter(caseSensitive, mergeShards, tablePrefix, tableSuffix);

Set<String> createdTables;
try {
createdTables = new HashSet<>(catalog.listTables(database));
} catch (Catalog.DatabaseNotExistException e) {
throw new RuntimeException(e);
}
return () ->
new RichCdcMultiplexRecordEventParser(
schemaBuilder, includingPattern, excludingPattern, tableNameConverter);
schemaBuilder,
includingPattern,
excludingPattern,
tableNameConverter,
createdTables);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,32 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
@Nullable private final Pattern includingPattern;
@Nullable private final Pattern excludingPattern;
private final TableNameConverter tableNameConverter;
private final Set<String> createdTables;

private final Map<String, RichEventParser> parsers = new HashMap<>();
private final Set<String> includedTables = new HashSet<>();
private final Set<String> excludedTables = new HashSet<>();
private final Set<String> createdTables = new HashSet<>();

private RichCdcMultiplexRecord record;
private String currentTable;
private boolean shouldSynchronizeCurrentTable;
private RichEventParser currentParser;

public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
this(null, null, null, new TableNameConverter(caseSensitive));
this(null, null, null, new TableNameConverter(caseSensitive), new HashSet<>());
}

public RichCdcMultiplexRecordEventParser(
@Nullable NewTableSchemaBuilder schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
TableNameConverter tableNameConverter) {
TableNameConverter tableNameConverter,
Set<String> createdTables) {
this.schemaBuilder = schemaBuilder;
this.includingPattern = includingPattern;
this.excludingPattern = excludingPattern;
this.tableNameConverter = tableNameConverter;
this.createdTables = createdTables;
}

@Override
Expand Down

0 comments on commit 2acb511

Please sign in to comment.