diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java index 31c21d0fc9..a1e5de2ef7 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java @@ -146,8 +146,7 @@ protected SingleOutputStreamOperator shunt( } protected DataStream shunt(SingleOutputStreamOperator processOperator, Table table, OutputTag tag) { - - return processOperator.getSideOutput(tag); + return processOperator.getSideOutput(tag).forward(); } @SuppressWarnings("rawtypes") diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java index 97d940c8be..fd3b88c98c 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java @@ -30,6 +30,7 @@ import org.dinky.utils.LogUtil; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; @@ -55,6 +56,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; public abstract class AbstractSqlSinkBuilder extends AbstractSinkBuilder implements Serializable { protected ZoneId sinkTimeZone = ZoneId.of("UTC"); @@ -147,7 +149,7 @@ protected void addTableSinkForTags( buildColumn(columnNameList, columnTypeList, table.getColumns()); DataStream rowDataDataStream = buildRow( filterOperator, columnNameList, columnTypeList, schemaTableName) - .rebalance(); + .forward(); logger.info("Build {} 
flatMap successful...", schemaTableName); logger.info("Start build {} sink...", schemaTableName); @@ -166,6 +168,7 @@ protected SingleOutputStreamOperator createMapSingleOutputStreamOperator( SingleOutputStreamOperator mapOperator = dataStreamSource.map(x -> objectMapper.readValue(x, Map.class)).returns(Map.class); Map split = config.getSplit(); + partitionByTableAndPrimarykey(mapOperator, tableMap); return mapOperator.process(new ProcessFunction() { @Override public void processElement(Map map, ProcessFunction.Context ctx, Collector out) { @@ -238,6 +241,30 @@ public DataStreamSource build( return dataStreamSource; } + protected void partitionByTableAndPrimarykey( + SingleOutputStreamOperator mapOperator, Map tableMap) { + // TODO(review): partitionCustom returns a new DataStream that is discarded here, so this partitioning never takes effect; return the partitioned stream and use it at the call site. + mapOperator.partitionCustom( + new Partitioner() { + @Override + public int partition(String key, int numPartitions) { + return Math.floorMod(key.hashCode(), numPartitions); + } + }, + map -> { + LinkedHashMap source = (LinkedHashMap) map.get("source"); + String tableName = createTableName(source, config.getSchemaFieldName(), config.getSplit()); + Table table = tableMap.get(tableName); + List primaryKeys = table.getColumns().stream() + .filter(column -> column.isKeyFlag()) + .map(column -> column.getName()) + .collect(Collectors.toList()); + + return tableName + String.join("_", primaryKeys); + }); + mapOperator.name("PartitionByPrimarykey"); + } + protected void executeCatalogStatement(CustomTableEnvironment customTableEnvironment) {} ; } diff --git a/dinky-core/src/main/java/org/dinky/trans/ddl/CDCSource.java b/dinky-core/src/main/java/org/dinky/trans/ddl/CDCSource.java index d8518cbca0..21535381a3 100644 --- a/dinky-core/src/main/java/org/dinky/trans/ddl/CDCSource.java +++ b/dinky-core/src/main/java/org/dinky/trans/ddl/CDCSource.java @@ -20,6 +20,7 @@ package org.dinky.trans.ddl; import org.dinky.assertion.Asserts; +import org.dinky.data.model.FlinkCDCConfig; import org.dinky.parser.SingleSqlParserFactory; import 
java.util.ArrayList; @@ -222,6 +223,27 @@ private static Map getKeyValue(List list) { return map; } + public FlinkCDCConfig buildFlinkCDCConfig() { + return new FlinkCDCConfig( + connector, + hostname, + port, + username, + password, + checkpoint, + parallelism, + database, + schema, + table, + startupMode, + split, + debezium, + source, + sink, + sinks, + jdbc); + } + public String getConnector() { return connector; } diff --git a/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java b/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java index 8829d926b4..d2c7602400 100644 --- a/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java +++ b/dinky-core/src/main/java/org/dinky/trans/ddl/CreateCDCSourceOperation.java @@ -75,24 +75,7 @@ public Operation create(String statement) { public TableResult execute(Executor executor) { logger.info("Start build CDCSOURCE Task..."); CDCSource cdcSource = CDCSource.build(statement); - FlinkCDCConfig config = new FlinkCDCConfig( - cdcSource.getConnector(), - cdcSource.getHostname(), - cdcSource.getPort(), - cdcSource.getUsername(), - cdcSource.getPassword(), - cdcSource.getCheckpoint(), - cdcSource.getParallelism(), - cdcSource.getDatabase(), - cdcSource.getSchema(), - cdcSource.getTable(), - cdcSource.getStartupMode(), - cdcSource.getSplit(), - cdcSource.getDebezium(), - cdcSource.getSource(), - cdcSource.getSink(), - cdcSource.getSinks(), - cdcSource.getJdbc()); + FlinkCDCConfig config = cdcSource.buildFlinkCDCConfig(); try { CDCBuilder cdcBuilder = CDCBuilderFactory.buildCDCBuilder(config); Map> allConfigMap = cdcBuilder.parseMetaDataConfigs();