[flink] Optimize the partitioning for multi-table CDC between write to commit (#2133)
yuzelin authored Oct 17, 2023
1 parent 2dff7e9 commit 5d4dbbe
Showing 6 changed files with 165 additions and 28 deletions.
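In short: the multi-table CDC sink previously forwarded committables from the writer to the global committer without any shuffle, and the shared committer-configuration helper pinned every committer to parallelism 1. This commit shuffles each MultiTableCommittable by its (database, table) pair so the multiplex global committer can run at the input parallelism. The stand-alone sketch below illustrates the routing idea with plain Flink APIs; the demo class, the String stand-in for MultiTableCommittable, and all names in it are illustrative assumptions, not Paimon code:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShuffleByTableDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Stand-in for MultiTableCommittable: "database.table" strings.
        DataStream<String> committables =
                env.fromElements("db1.orders", "db1.users", "db2.orders", "db1.orders");

        // Route every committable of the same table to the same downstream
        // subtask, analogous to the shuffle this commit inserts between the
        // CDC writer and the multiplex global committer.
        committables
                .partitionCustom(
                        new Partitioner<String>() {
                            @Override
                            public int partition(String key, int numPartitions) {
                                // Same wrap-around trick as the commit's channel computers.
                                return Math.floorMod(key.hashCode(), numPartitions);
                            }
                        },
                        new KeySelector<String, String>() {
                            @Override
                            public String getKey(String tableId) {
                                return tableId;
                            }
                        })
                .print();

        env.execute("shuffle-by-table-demo");
    }
}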
CdcMultiplexRecordChannelComputer.java (modified)
@@ -30,7 +30,7 @@
 import java.util.Map;
 import java.util.Objects;

-/** {@link ChannelComputer} for {@link CdcRecord}. */
+/** {@link ChannelComputer} for {@link CdcMultiplexRecord}. */
 public class CdcMultiplexRecordChannelComputer implements ChannelComputer<CdcMultiplexRecord> {

     private static final Logger LOG =
@@ -90,6 +90,6 @@ private ChannelComputer<CdcRecord> computeChannelComputer(CdcMultiplexRecord rec

     @Override
     public String toString() {
-        return "shuffle by table";
+        return "shuffle by bucket";
     }
 }
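The toString fix above reflects what this computer actually does: per the computeChannelComputer method in the hunk header, it resolves a per-table ChannelComputer and routes each CdcMultiplexRecord by its target bucket, not merely by table. As a rough, hypothetical sketch of the setup/channel contract that ChannelComputer implementations follow (the class name and the modulo routing below are illustrative assumptions, not the actual Paimon implementation):

/** Hypothetical bucket-based channel computer; illustrates the setup/channel contract only. */
public class BucketChannelComputerSketch {
    private int numChannels;

    // Called once per subtask with the downstream parallelism.
    public void setup(int numChannels) {
        this.numChannels = numChannels;
    }

    // Records in the same bucket always map to the same channel.
    public int channel(int bucket) {
        return Math.floorMod(bucket, numChannels);
    }

    public static void main(String[] args) {
        BucketChannelComputerSketch computer = new BucketChannelComputerSketch();
        computer.setup(4);
        System.out.println(computer.channel(0)); // 0
        System.out.println(computer.channel(5)); // 1
    }
}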
FlinkCdcMultiTableSink.java (modified)
@@ -25,6 +25,7 @@
 import org.apache.paimon.flink.sink.CommitterMetrics;
 import org.apache.paimon.flink.sink.CommitterOperator;
 import org.apache.paimon.flink.sink.FlinkSink;
+import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
 import org.apache.paimon.flink.sink.MultiTableCommittable;
 import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
 import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
@@ -116,15 +117,24 @@ public DataStreamSink<?> sinkFrom(
                                 createWriteOperator(sinkProvider, commitUser, dynamicOptions))
                         .setParallelism(input.getParallelism());

+        // shuffle committables by table
+        DataStream<MultiTableCommittable> partitioned =
+                FlinkStreamPartitioner.partition(
+                        written,
+                        new MultiTableCommittableChannelComputer(),
+                        input.getParallelism());
+
         SingleOutputStreamOperator<?> committed =
-                written.transform(
-                        GLOBAL_COMMITTER_NAME,
-                        typeInfo,
-                        new CommitterOperator<>(
-                                true,
-                                commitUser,
-                                createCommitterFactory(),
-                                createCommittableStateManager()));
+                partitioned
+                        .transform(
+                                GLOBAL_COMMITTER_NAME,
+                                typeInfo,
+                                new CommitterOperator<>(
+                                        true,
+                                        commitUser,
+                                        createCommitterFactory(),
+                                        createCommittableStateManager()))
+                        .setParallelism(input.getParallelism());
         configureGlobalCommitter(
                 committed,
                 commitCpuCores,
FlinkCdcSyncDatabaseSinkBuilder.java (modified)
@@ -22,7 +22,6 @@
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
-import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -33,7 +32,6 @@

 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;

 import javax.annotation.Nullable;

@@ -153,21 +151,15 @@ private void buildCombinedCdcSink() {
                         CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG)
                 .process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader));

-        FlinkStreamPartitioner<CdcMultiplexRecord> partitioner =
-                new FlinkStreamPartitioner<>(
-                        new CdcMultiplexRecordChannelComputer(catalogLoader, dynamicOptions));
-        PartitionTransformation<CdcMultiplexRecord> partitioned =
-                new PartitionTransformation<>(
-                        newlyAddedTableStream.getTransformation(), partitioner);
-
-        if (parallelism != null) {
-            partitioned.setParallelism(parallelism);
-        }
+        DataStream<CdcMultiplexRecord> partitioned =
+                partition(
+                        newlyAddedTableStream,
+                        new CdcMultiplexRecordChannelComputer(catalogLoader, dynamicOptions),
+                        parallelism);

         FlinkCdcMultiTableSink sink =
                 new FlinkCdcMultiTableSink(catalogLoader, committerCpu, committerMemory);
-        sink.sinkFrom(
-                new DataStream<>(input.getExecutionEnvironment(), partitioned), dynamicOptions);
+        sink.sinkFrom(partitioned, dynamicOptions);
     }

     private void buildForFixedBucket(FileStoreTable table, DataStream<CdcRecord> parsed) {
MultiTableCommittableChannelComputer.java (new file)
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.flink.sink.ChannelComputer;
+import org.apache.paimon.flink.sink.MultiTableCommittable;
+
+import java.util.Objects;
+
+/** {@link ChannelComputer} for {@link MultiTableCommittable}. */
+public class MultiTableCommittableChannelComputer
+        implements ChannelComputer<MultiTableCommittable> {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient int numChannels;
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+    }
+
+    @Override
+    public int channel(MultiTableCommittable multiTableCommittable) {
+        return Math.floorMod(
+                Objects.hash(multiTableCommittable.getDatabase(), multiTableCommittable.getTable()),
+                numChannels);
+    }
+
+    @Override
+    public String toString() {
+        return "shuffle by table";
+    }
+}
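The channel function above is deliberately simple: hash the (database, table) pair and wrap it into [0, numChannels). A minimal self-contained demonstration of that exact formula (plain Java; the demo class and the sample database/table names are illustrative):

import java.util.Objects;

public class ShuffleByTableHashDemo {
    // Same formula as MultiTableCommittableChannelComputer#channel:
    // floorMod keeps the result in [0, numChannels) even for negative hashes.
    static int channel(String database, String table, int numChannels) {
        return Math.floorMod(Objects.hash(database, table), numChannels);
    }

    public static void main(String[] args) {
        int numChannels = 4;
        // Committables from the same table always land on the same channel,
        // so each committer subtask commits a stable subset of tables.
        System.out.println(channel("db1", "orders", numChannels));
        System.out.println(channel("db1", "orders", numChannels)); // identical to the line above
        System.out.println(channel("db2", "users", numChannels));
    }
}

Note that two different tables may still hash onto the same channel; the guarantee is only that a single table never splits across committer subtasks.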
FlinkCdcMultiTableSinkTest.java (new file)
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.options.Options;
+
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FlinkCdcMultiTableSink}. */
+public class FlinkCdcMultiTableSinkTest {
+
+    @Test
+    public void testTransformationParallelism() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(8);
+        int inputParallelism = ThreadLocalRandom.current().nextInt(8) + 1;
+        DataStreamSource<CdcMultiplexRecord> input =
+                env.addSource(
+                                new ParallelSourceFunction<CdcMultiplexRecord>() {
+                                    @Override
+                                    public void run(SourceContext<CdcMultiplexRecord> ctx) {}
+
+                                    @Override
+                                    public void cancel() {}
+                                })
+                        .setParallelism(inputParallelism);
+
+        FlinkCdcMultiTableSink sink =
+                new FlinkCdcMultiTableSink(
+                        () -> FlinkCatalogFactory.createPaimonCatalog(new Options()),
+                        FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue(),
+                        null);
+        DataStreamSink<?> dataStreamSink = sink.sinkFrom(input, Collections.emptyMap());
+
+        // check the transformation graph
+        LegacySinkTransformation<?> end =
+                (LegacySinkTransformation<?>) dataStreamSink.getTransformation();
+        assertThat(end.getName()).isEqualTo("end");
+
+        OneInputTransformation<?, ?> committer =
+                (OneInputTransformation<?, ?>) end.getInputs().get(0);
+        assertThat(committer.getName()).isEqualTo("Multiplex Global Committer");
+        assertThat(committer.getParallelism()).isEqualTo(inputParallelism);
+
+        PartitionTransformation<?> partitioner =
+                (PartitionTransformation<?>) committer.getInputs().get(0);
+        assertThat(partitioner.getParallelism()).isEqualTo(inputParallelism);
+
+        OneInputTransformation<?, ?> writer =
+                (OneInputTransformation<?, ?>) partitioner.getInputs().get(0);
+        assertThat(writer.getName()).isEqualTo("CDC MultiplexWriter");
+        assertThat(writer.getParallelism()).isEqualTo(inputParallelism);
+    }
+}
FlinkSink.java (modified)
@@ -206,9 +206,11 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String com
         }
         SingleOutputStreamOperator<?> committed =
                 written.transform(
-                        GLOBAL_COMMITTER_NAME + " : " + table.name(),
-                        new CommittableTypeInfo(),
-                        committerOperator);
+                                GLOBAL_COMMITTER_NAME + " : " + table.name(),
+                                new CommittableTypeInfo(),
+                                committerOperator)
+                        .setParallelism(1)
+                        .setMaxParallelism(1);
         Options options = Options.fromMap(table.options());
         configureGlobalCommitter(
                 committed,
@@ -223,7 +225,6 @@ public static void configureGlobalCommitter(
             double cpuCores,
             @Nullable MemorySize heapMemory,
             ReadableConfig conf) {
-        committed.setParallelism(1).setMaxParallelism(1);
         if (heapMemory == null) {
             return;
         }
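This last hunk moves the 1/1 parallelism pinning out of the shared configureGlobalCommitter helper and onto the single-table committer's transform, since the multi-table committer now runs at the input parallelism (see FlinkCdcMultiTableSink above). A small hedged sketch of the chained setters involved; the job, operator, and names below are illustrative, not Paimon code:

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CommitterParallelismDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        // A stand-in "committer" operator: in the single-table path the commit
        // parallelism (and its max parallelism) is pinned to 1 at the transform
        // site, while the multi-table path keeps the input parallelism instead.
        SingleOutputStreamOperator<String> committed =
                env.fromElements("commit-1", "commit-2")
                        .map(String::toUpperCase)
                        .setParallelism(1)
                        .setMaxParallelism(1);

        committed.print();
        env.execute("committer-parallelism-demo");
    }
}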