Skip to content

Commit

Permalink
[flink] Improve the parallelism of Flink database compaction commit o…
Browse files Browse the repository at this point in the history
…perator (#3789)
  • Loading branch information
zhourui999 authored Jul 24, 2024
1 parent 094e925 commit 6f4aa51
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableChannelComputer;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
import org.apache.paimon.flink.sink.StoreMultiCommitter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,15 @@ protected DataStreamSink<?> doCommit(
if (streamingCheckpointEnabled) {
assertStreamingConfiguration(env);
}

DataStream<MultiTableCommittable> partitioned =
FlinkStreamPartitioner.partition(
written,
new MultiTableCommittableChannelComputer(),
written.getParallelism());
SingleOutputStreamOperator<?> committed =
written.transform(
partitioned
.transform(
GLOBAL_COMMITTER_NAME,
new MultiTableCommittableTypeInfo(),
new CommitterOperator<>(
Expand All @@ -154,8 +161,7 @@ protected DataStreamSink<?> doCommit(
createCommitterFactory(),
createCommittableStateManager(),
options.get(END_INPUT_WATERMARK)))
.setParallelism(1)
.setMaxParallelism(1);
.setParallelism(written.getParallelism());
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
* limitations under the License.
*/

package org.apache.paimon.flink.sink.cdc;
package org.apache.paimon.flink.sink;

import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.table.sink.ChannelComputer;

import java.util.Objects;
Expand Down

0 comments on commit 6f4aa51

Please sign in to comment.