Commit aca3fd5

[flink][cdc] Fix that setting both table affix and including pattern causes tables not to be synchronized in Kafka CDC (apache#2047)

yuzelin authored Sep 25, 2023
1 parent b9bbc15 commit aca3fd5
Showing 62 changed files with 115 additions and 97 deletions.
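Why the bug happened, in brief: the record parsers applied the configured table prefix/suffix (via TableNameConverter) before the including/excluding patterns were evaluated, so a pattern written against the original source table names could never match the already-affixed names, and the affected tables were silently skipped. A minimal sketch of the failure mode (the prefix "ods_" and pattern "t.+" are illustrative values; the TableNameConverter constructor and convert call appear in the diffs below, but the meaning of its second boolean argument is assumed here):

import java.util.regex.Pattern;

import org.apache.paimon.flink.action.cdc.TableNameConverter;

public class AffixVersusPatternSketch {
    public static void main(String[] args) {
        // caseSensitive = true; second flag assumed to enable conversion;
        // prefix "ods_", no suffix -- mirroring a configured table affix.
        TableNameConverter converter = new TableNameConverter(true, true, "ods_", "");
        Pattern including = Pattern.compile("t.+"); // pattern written against source names

        String original = "t1";                         // name as it appears in the CDC record
        String converted = converter.convert(original); // "ods_t1"

        // Old behavior: the parser emitted the converted name, so the pattern
        // check failed and the table was never synchronized.
        System.out.println(including.matcher(converted).matches()); // false
        // Fixed behavior: the pattern is checked against the original name first.
        System.out.println(including.matcher(original).matches());  // true
    }
}

The diffs below implement exactly that reordering: conversion is removed from the record parsers and deferred to the event parser, which filters first.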
8 changes: 8 additions & 0 deletions paimon-flink/paimon-flink-cdc/pom.xml
@@ -202,6 +202,14 @@ under the License.
<scope>test</scope>
</dependency>

+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-streaming-java</artifactId>
+    <version>${flink.version}</version>
+    <scope>test</scope>
+    <type>test-jar</type>
+</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -18,7 +18,6 @@

package org.apache.paimon.flink.action.cdc.kafka;

-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
@@ -132,9 +131,7 @@ public static KafkaSchema getKafkaSchema(
int retryInterval = 1000;

DataFormat format = getDataFormat(kafkaConfig);
-RecordParser recordParser =
-        format.createParser(
-                true, new TableNameConverter(true), typeMapping, Collections.emptyList());
+RecordParser recordParser = format.createParser(true, typeMapping, Collections.emptyList());

while (true) {
ConsumerRecords<String, String> consumerRecords =
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -148,24 +148,26 @@ public void build(StreamExecutionEnvironment env) throws Exception {
}

catalog.createDatabase(database, true);
-TableNameConverter tableNameConverter =
-        new TableNameConverter(caseSensitive, true, tablePrefix, tableSuffix);

KafkaSource<String> source = KafkaActionUtils.buildKafkaSource(kafkaConfig);

DataFormat format = DataFormat.getDataFormat(kafkaConfig);
RecordParser recordParser =
-        format.createParser(
-                caseSensitive, tableNameConverter, typeMapping, Collections.emptyList());
+        format.createParser(caseSensitive, typeMapping, Collections.emptyList());
RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
new RichCdcMultiplexRecordSchemaBuilder(tableConfig, caseSensitive);
Pattern includingPattern = Pattern.compile(includingTables);
Pattern excludingPattern =
excludingTables == null ? null : Pattern.compile(excludingTables);
+TableNameConverter tableNameConverter =
+        new TableNameConverter(caseSensitive, true, tablePrefix, tableSuffix);
EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
() ->
new RichCdcMultiplexRecordEventParser(
-                        schemaBuilder, includingPattern, excludingPattern);
+                        schemaBuilder,
+                        includingPattern,
+                        excludingPattern,
+                        tableNameConverter);

new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
.withInput(
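In the database-sync actions, the TableNameConverter is now constructed alongside the including/excluding patterns and handed to RichCdcMultiplexRecordEventParser rather than to the record parser. A sketch of the ordering this enables inside the event parser (class and method names below are illustrative assumptions; the event parser's implementation is not part of this diff):

import java.util.regex.Pattern;

import org.apache.paimon.flink.action.cdc.TableNameConverter;

class TableFilterSketch {
    private final Pattern includingPattern;
    private final Pattern excludingPattern; // nullable, as in the actions above
    private final TableNameConverter tableNameConverter;

    TableFilterSketch(Pattern including, Pattern excluding, TableNameConverter converter) {
        this.includingPattern = including;
        this.excludingPattern = excluding;
        this.tableNameConverter = converter;
    }

    // Step 1: match the raw source table name against the user's patterns.
    boolean shouldSynchronize(String rawTableName) {
        if (!includingPattern.matcher(rawTableName).matches()) {
            return false;
        }
        return excludingPattern == null || !excludingPattern.matcher(rawTableName).matches();
    }

    // Step 2: only after a table passes the filter, derive the affixed Paimon name.
    String paimonTableName(String rawTableName) {
        return tableNameConverter.convert(rawTableName); // e.g. "t1" -> "ods_t1"
    }
}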
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -26,7 +26,6 @@
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
@@ -171,13 +170,9 @@ public void build(StreamExecutionEnvironment env) throws Exception {
}
DataFormat format = DataFormat.getDataFormat(kafkaConfig);
RecordParser recordParser =
-        format.createParser(
-                caseSensitive,
-                new TableNameConverter(caseSensitive),
-                typeMapping,
-                computedColumns);
+        format.createParser(caseSensitive, typeMapping, computedColumns);
EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
-        RichCdcMultiplexRecordEventParser::new;
+        () -> new RichCdcMultiplexRecordEventParser(caseSensitive);

CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
new CdcSinkBuilder<RichCdcMultiplexRecord>()
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.kafka.formats;

import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.canal.CanalRecordParser;
import org.apache.paimon.flink.action.cdc.kafka.formats.maxwell.MaxwellRecordParser;
@@ -54,16 +53,12 @@ public enum DataFormat {
* configurations.
*
* @param caseSensitive Indicates whether the parser should be case-sensitive.
-* @param tableNameConverter Converter to transform table names.
* @param computedColumns List of computed columns to be considered by the parser.
* @return A new instance of {@link RecordParser}.
*/
public RecordParser createParser(
-        boolean caseSensitive,
-        TableNameConverter tableNameConverter,
-        TypeMapping typeMapping,
-        List<ComputedColumn> computedColumns) {
-    return parser.createParser(caseSensitive, typeMapping, tableNameConverter, computedColumns);
+        boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
+    return parser.createParser(caseSensitive, typeMapping, computedColumns);
}

/**
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.kafka.formats;

import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
@@ -64,7 +63,6 @@ public abstract class RecordParser implements FlatMapFunction<String, RichCdcMultiplexRecord>
protected static final String FIELD_TABLE = "table";
protected static final String FIELD_DATABASE = "database";
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-protected final TableNameConverter tableNameConverter;
protected final boolean caseSensitive;
protected final TypeMapping typeMapping;
protected final List<ComputedColumn> computedColumns;
@@ -77,13 +75,9 @@ public abstract class RecordParser implements FlatMapFunction<String, RichCdcMultiplexRecord>
protected String tableName;

public RecordParser(
-        boolean caseSensitive,
-        TypeMapping typeMapping,
-        TableNameConverter tableNameConverter,
-        List<ComputedColumn> computedColumns) {
+        boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
this.caseSensitive = caseSensitive;
this.typeMapping = typeMapping;
-this.tableNameConverter = tableNameConverter;
this.computedColumns = computedColumns;
}

@@ -93,7 +87,7 @@ public KafkaSchema getKafkaSchema(String record) {
return null;
}
databaseName = extractStringFromRootJson(FIELD_DATABASE);
-tableName = tableNameConverter.convert(extractStringFromRootJson(FIELD_TABLE));
+tableName = extractStringFromRootJson(FIELD_TABLE);
this.setPrimaryField();
this.setDataField();
this.validateFormat();
@@ -148,7 +142,7 @@ public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) throws
validateFormat();

databaseName = extractStringFromRootJson(FIELD_DATABASE);
-tableName = tableNameConverter.convert(extractStringFromRootJson(FIELD_TABLE));
+tableName = extractStringFromRootJson(FIELD_TABLE);

extractRecords().forEach(out::collect);
}
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.kafka.formats;

import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;

import java.util.List;
Expand All @@ -40,13 +39,9 @@ public interface RecordParserFactory {
*
* @param caseSensitive Indicates whether the parser should be case-sensitive.
* @param typeMapping Data type mapping options.
-* @param tableNameConverter Converter to transform table names.
* @param computedColumns List of computed columns to be considered by the parser.
* @return A new instance of {@link RecordParser}.
*/
RecordParser createParser(
-        boolean caseSensitive,
-        TypeMapping typeMapping,
-        TableNameConverter tableNameConverter,
-        List<ComputedColumn> computedColumns);
+        boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns);
}
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.kafka.formats.canal;

import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
@@ -73,11 +72,8 @@ protected boolean isDDL() {
}

public CanalRecordParser(
-        boolean caseSensitive,
-        TypeMapping typeMapping,
-        TableNameConverter tableNameConverter,
-        List<ComputedColumn> computedColumns) {
-    super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
+        boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
+    super(caseSensitive, typeMapping, computedColumns);
}

@Override
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.kafka.formats.maxwell;

import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -54,11 +53,8 @@ public class MaxwellRecordParser extends RecordParser {
private static final String OP_DELETE = "delete";

public MaxwellRecordParser(
-        boolean caseSensitive,
-        TypeMapping typeMapping,
-        TableNameConverter tableNameConverter,
-        List<ComputedColumn> computedColumns) {
-    super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
+        boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
+    super(caseSensitive, typeMapping, computedColumns);
}

@Override
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.kafka.formats.ogg;

import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -57,11 +56,8 @@ public class OggRecordParser extends RecordParser {
private static final String OP_DELETE = "D";

public OggRecordParser(
-        boolean caseSensitive,
-        TypeMapping typeMapping,
-        TableNameConverter tableNameConverter,
-        List<ComputedColumn> computedColumns) {
-    super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
+        boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
+    super(caseSensitive, typeMapping, computedColumns);
}

@Override
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.mongodb;

import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.mongodb.strategy.Mongo4VersionStrategy;
import org.apache.paimon.flink.action.cdc.mongodb.strategy.MongoVersionStrategy;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -61,24 +60,18 @@ public class MongoDBRecordParser implements FlatMapFunction<String, RichCdcMultiplexRecord>
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final List<ComputedColumn> computedColumns;
private final boolean caseSensitive;
-private final TableNameConverter tableNameConverter;
private final Configuration mongodbConfig;
private JsonNode root;

-public MongoDBRecordParser(
-        boolean caseSensitive,
-        TableNameConverter tableNameConverter,
-        Configuration mongodbConfig) {
-    this(caseSensitive, tableNameConverter, Collections.emptyList(), mongodbConfig);
+public MongoDBRecordParser(boolean caseSensitive, Configuration mongodbConfig) {
+    this(caseSensitive, Collections.emptyList(), mongodbConfig);
}

public MongoDBRecordParser(
boolean caseSensitive,
-TableNameConverter tableNameConverter,
List<ComputedColumn> computedColumns,
Configuration mongodbConfig) {
this.caseSensitive = caseSensitive;
-this.tableNameConverter = tableNameConverter;
this.computedColumns = computedColumns;
this.mongodbConfig = mongodbConfig;
}
@@ -87,7 +80,7 @@ public MongoDBRecordParser(
public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) throws Exception {
root = OBJECT_MAPPER.readValue(value, JsonNode.class);
String databaseName = extractString(FIELD_DATABASE);
-String collection = tableNameConverter.convert(extractString(FIELD_TABLE));
+String collection = extractString(FIELD_TABLE);
MongoVersionStrategy versionStrategy =
VersionStrategyFactory.create(
databaseName, collection, caseSensitive, computedColumns, mongodbConfig);
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -125,10 +125,8 @@ public void build(StreamExecutionEnvironment env) throws Exception {
}

catalog.createDatabase(database, true);
-TableNameConverter tableNameConverter =
-        new TableNameConverter(caseSensitive, true, tablePrefix, tableSuffix);
List<Identifier> excludedTables = new ArrayList<>();
MongoDBSource<String> source =
MongoDBActionUtils.buildMongodbSource(
mongodbConfig,
@@ -143,16 +141,19 @@ public void build(StreamExecutionEnvironment env) throws Exception {
Pattern includingPattern = Pattern.compile(this.includingTables);
Pattern excludingPattern =
excludingTables == null ? null : Pattern.compile(excludingTables);
+TableNameConverter tableNameConverter =
+        new TableNameConverter(caseSensitive, true, tablePrefix, tableSuffix);
parserFactory =
() ->
new RichCdcMultiplexRecordEventParser(
-                        schemaBuilder, includingPattern, excludingPattern);
+                        schemaBuilder,
+                        includingPattern,
+                        excludingPattern,
+                        tableNameConverter);
new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
.withInput(
env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB Source")
-.flatMap(
-        new MongoDBRecordParser(
-                caseSensitive, tableNameConverter, mongodbConfig)))
+.flatMap(new MongoDBRecordParser(caseSensitive, mongodbConfig)))
.withParserFactory(parserFactory)
.withCatalogLoader(catalogLoader())
.withDatabase(database)
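The MongoDB changes mirror the Kafka ones: MongoDBRecordParser now emits raw collection names, the database-sync action hands the converter to the event parser, and the single-table action (next diff) supplies only the case-sensitivity flag, since an explicit target table needs neither pattern filtering nor an affix. The two factory shapes after this commit, as a compilable sketch (both constructor calls are taken from the diffs; the sink.cdc package for the RichCdcMultiplexRecord* classes is assumed):

import java.util.regex.Pattern;

import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordSchemaBuilder;

class ParserFactorySketch {

    // Database sync: patterns and the affix converter live in the event parser.
    static EventParser.Factory<RichCdcMultiplexRecord> databaseSync(
            RichCdcMultiplexRecordSchemaBuilder schemaBuilder,
            Pattern includingPattern,
            Pattern excludingPattern,
            TableNameConverter tableNameConverter) {
        return () ->
                new RichCdcMultiplexRecordEventParser(
                        schemaBuilder, includingPattern, excludingPattern, tableNameConverter);
    }

    // Table sync: one explicit target table, so no patterns or affix are needed.
    static EventParser.Factory<RichCdcMultiplexRecord> tableSync(boolean caseSensitive) {
        return () -> new RichCdcMultiplexRecordEventParser(caseSensitive);
    }
}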
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -23,7 +23,6 @@
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -150,7 +149,7 @@ public void build(StreamExecutionEnvironment env) throws Exception {
}

EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
-        RichCdcMultiplexRecordEventParser::new;
+        () -> new RichCdcMultiplexRecordEventParser(caseSensitive);

CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
new CdcSinkBuilder<RichCdcMultiplexRecord>()
@@ -162,7 +161,6 @@ public void build(StreamExecutionEnvironment env) throws Exception {
.flatMap(
new MongoDBRecordParser(
caseSensitive,
-new TableNameConverter(caseSensitive),
computedColumns,
mongodbConfig)))
.withParserFactory(parserFactory)
… (diff truncated; the remaining changed files were not loaded)
