Skip to content

Commit

Permalink
[Feature-3071][CDCSOURCE] CDCSOURCE supports ensuring data order in multiple degrees of parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo committed Jan 25, 2024
1 parent b797a1a commit 12f6c53
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ protected SingleOutputStreamOperator<Map> shunt(
}

/**
 * Extracts the side output of {@code tag} from the shared process operator,
 * keeping forward partitioning so records stay on the same subtask and their
 * arrival order is preserved downstream.
 *
 * @param processOperator the shared shunting operator emitting per-table side outputs
 * @param table           the table this side output belongs to (kept for the
 *                        subclass contract; not read here)
 * @param tag             the side-output tag registered for {@code table}
 * @return the side-output stream with forward (order-preserving) partitioning
 */
protected DataStream<Map> shunt(SingleOutputStreamOperator<Map> processOperator, Table table, OutputTag<Map> tag) {
    // NOTE: DataStream.forward() returns a new stream and does not mutate its
    // receiver, so the former bare `processOperator.forward();` call was a no-op
    // and has been removed.
    return processOperator.getSideOutput(tag).forward();
}

@SuppressWarnings("rawtypes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.dinky.utils.LogUtil;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
Expand All @@ -55,6 +56,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public abstract class AbstractSqlSinkBuilder extends AbstractSinkBuilder implements Serializable {
protected ZoneId sinkTimeZone = ZoneId.of("UTC");
Expand Down Expand Up @@ -147,7 +149,7 @@ protected void addTableSinkForTags(
buildColumn(columnNameList, columnTypeList, table.getColumns());
DataStream<Row> rowDataDataStream = buildRow(
filterOperator, columnNameList, columnTypeList, schemaTableName)
.rebalance();
.forward();
logger.info("Build {} flatMap successful...", schemaTableName);
logger.info("Start build {} sink...", schemaTableName);

Expand All @@ -166,6 +168,7 @@ protected SingleOutputStreamOperator<Map> createMapSingleOutputStreamOperator(
SingleOutputStreamOperator<Map> mapOperator =
dataStreamSource.map(x -> objectMapper.readValue(x, Map.class)).returns(Map.class);
Map<String, String> split = config.getSplit();
partitionByTableAndPrimarykey(mapOperator, tableMap);
return mapOperator.process(new ProcessFunction<Map, Map>() {
@Override
public void processElement(Map map, ProcessFunction<Map, Map>.Context ctx, Collector<Map> out) {
Expand Down Expand Up @@ -238,6 +241,33 @@ public DataStreamSource<String> build(
return dataStreamSource;
}

/**
 * Repartitions the deserialized CDC record stream so that every change event
 * for the same (table, primary key) combination is routed to the same parallel
 * subtask, preserving per-key event order when parallelism &gt; 1.
 *
 * @param mapOperator deserialized CDC records; each record carries a "source"
 *                    entry identifying the originating table
 * @param tableMap    lookup from qualified table name to its {@link Table} schema
 * @return the custom-partitioned stream. Callers MUST continue with the returned
 *         stream: {@code partitionCustom} returns a new {@code DataStream} and
 *         does not mutate its receiver, so discarding the result (as the previous
 *         {@code void} version did) silently skips the repartitioning.
 */
protected DataStream<Map> partitionByTableAndPrimarykey(
        SingleOutputStreamOperator<Map> mapOperator, Map<String, Table> tableMap) {
    mapOperator.name("PartitionByPrimarykey");
    return mapOperator.partitionCustom(
            new Partitioner<String>() {
                @Override
                public int partition(String key, int numPartitions) {
                    // floorMod always yields [0, numPartitions): Math.abs(hashCode())
                    // is negative for Integer.MIN_VALUE and would produce an
                    // out-of-range partition index.
                    return Math.floorMod(key.hashCode(), numPartitions);
                }
            },
            map -> {
                LinkedHashMap source = (LinkedHashMap) map.get("source");
                String tableName = createTableName(source, config.getSchemaFieldName(), config.getSplit());
                // NOTE(review): assumes every routed record's table is present in
                // tableMap — a miss would NPE below; confirm upstream filtering.
                Table table = tableMap.get(tableName);
                // Only key columns participate in the routing key; non-key columns
                // previously contributed empty strings, padding the key with "_".
                List<String> primaryKeys = table.getColumns().stream()
                        .filter(column -> column.isKeyFlag())
                        .map(column -> column.getName())
                        .collect(Collectors.toList());
                return tableName + String.join("_", primaryKeys);
            });
}

/**
 * Hook for subclasses that need to run catalog statements against the table
 * environment before sinks are built; intentionally a no-op by default.
 *
 * @param customTableEnvironment the table environment to execute statements in
 */
protected void executeCatalogStatement(CustomTableEnvironment customTableEnvironment) {}
}
22 changes: 22 additions & 0 deletions dinky-core/src/main/java/org/dinky/trans/ddl/CDCSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.dinky.trans.ddl;

import org.dinky.assertion.Asserts;
import org.dinky.data.model.FlinkCDCConfig;
import org.dinky.parser.SingleSqlParserFactory;

import java.util.ArrayList;
Expand Down Expand Up @@ -222,6 +223,27 @@ private static Map<String, String> getKeyValue(List<String> list) {
return map;
}

/**
 * Assembles a {@link FlinkCDCConfig} from this parsed CDCSOURCE statement's
 * fields, so call sites do not have to repeat the long constructor argument
 * list. The argument order must match the FlinkCDCConfig constructor exactly.
 *
 * @return a new config carrying the connection, checkpoint/parallelism,
 *         database/schema/table selection, startup, split, debezium and
 *         source/sink settings of this statement
 */
public FlinkCDCConfig buildFlinkCDCConfig() {
    return new FlinkCDCConfig(
            connector,
            hostname,
            port,
            username,
            password,
            checkpoint,
            parallelism,
            database,
            schema,
            table,
            startupMode,
            split,
            debezium,
            source,
            sink,
            sinks,
            jdbc);
}

public String getConnector() {
return connector;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,7 @@ public Operation create(String statement) {
public TableResult execute(Executor executor) {
logger.info("Start build CDCSOURCE Task...");
CDCSource cdcSource = CDCSource.build(statement);
FlinkCDCConfig config = new FlinkCDCConfig(
cdcSource.getConnector(),
cdcSource.getHostname(),
cdcSource.getPort(),
cdcSource.getUsername(),
cdcSource.getPassword(),
cdcSource.getCheckpoint(),
cdcSource.getParallelism(),
cdcSource.getDatabase(),
cdcSource.getSchema(),
cdcSource.getTable(),
cdcSource.getStartupMode(),
cdcSource.getSplit(),
cdcSource.getDebezium(),
cdcSource.getSource(),
cdcSource.getSink(),
cdcSource.getSinks(),
cdcSource.getJdbc());
FlinkCDCConfig config = cdcSource.buildFlinkCDCConfig();
try {
CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config);
Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs();
Expand Down

0 comments on commit 12f6c53

Please sign in to comment.