Skip to content

Commit

Permalink
[feature][flink] introduce RecordAttributesProcessor for write operators
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Jul 22, 2024
1 parent f457d35 commit 0317ff7
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class DynamicBucketCompactSink extends RowDynamicBucketSink {

public DynamicBucketCompactSink(
FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
super(table, overwritePartition);
super(table, overwritePartition, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,27 @@
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import javax.annotation.Nullable;

/**
* A {@link PrepareCommitOperator} to write {@link InternalRow} with bucket. Record schema is fixed.
*/
public class DynamicBucketRowWriteOperator
extends TableWriteOperator<Tuple2<InternalRow, Integer>> {

private static final long serialVersionUID = 1L;
@Nullable private final RecordAttributesProcessor recordAttributesProcessor;

public DynamicBucketRowWriteOperator(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
String initialCommitUser,
@Nullable RecordAttributesProcessor processor) {
super(table, storeSinkWriteProvider, initialCommitUser);
recordAttributesProcessor = processor;
}

@Override
Expand All @@ -49,4 +55,13 @@ public void processElement(StreamRecord<Tuple2<InternalRow, Integer>> element)
throws Exception {
write.write(element.getValue().f0, element.getValue().f1);
}

@Override
public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
super.processRecordAttributes(recordAttributes);
if (recordAttributesProcessor != null) {
recordAttributesProcessor.processRecordAttributes(
recordAttributes, new RecordAttributesProcessor.RecordAttributesContext(write));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,22 @@ public class FixedBucketSink extends FlinkWriteSink<InternalRow> {

@Nullable private final LogSinkFunction logSinkFunction;

@Nullable private final RecordAttributesProcessor recordAttributesProcessor;

public FixedBucketSink(
FileStoreTable table,
@Nullable Map<String, String> overwritePartition,
@Nullable LogSinkFunction logSinkFunction) {
@Nullable LogSinkFunction logSinkFunction,
@Nullable RecordAttributesProcessor recordAttributesProcessor) {
super(table, overwritePartition);
this.logSinkFunction = logSinkFunction;
this.recordAttributesProcessor = recordAttributesProcessor;
}

@Override
protected OneInputStreamOperator<InternalRow, Committable> createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new RowDataStoreWriteOperator(table, logSinkFunction, writeProvider, commitUser);
return new RowDataStoreWriteOperator(
table, logSinkFunction, recordAttributesProcessor, writeProvider, commitUser);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class FlinkSinkBuilder {

protected boolean compactSink = false;
@Nullable protected LogSinkFunction logSinkFunction;
@Nullable protected RecordAttributesProcessor recordAttributesProcessor;

public FlinkSinkBuilder(Table table) {
if (!(table instanceof FileStoreTable)) {
Expand Down Expand Up @@ -219,6 +220,11 @@ public FlinkSinkBuilder clusteringIfPossible(
return this;
}

public FlinkSinkBuilder withRecordAttributesProcessor(RecordAttributesProcessor processor) {
this.recordAttributesProcessor = processor;
return this;
}

/** Build {@link DataStreamSink}. */
public DataStreamSink<?> build() {
input = trySortInput(input);
Expand Down Expand Up @@ -262,9 +268,11 @@ protected DataStreamSink<?> buildDynamicBucketSink(
// todo support global index sort compact
? new DynamicBucketCompactSink(table, overwritePartition).build(input, parallelism)
: globalIndex
? new GlobalDynamicBucketSink(table, overwritePartition)
? new GlobalDynamicBucketSink(
table, overwritePartition, recordAttributesProcessor)
.build(input, parallelism)
: new RowDynamicBucketSink(table, overwritePartition)
: new RowDynamicBucketSink(
table, overwritePartition, recordAttributesProcessor)
.build(input, parallelism);
}

Expand All @@ -274,7 +282,9 @@ protected DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> input) {
input,
new RowDataChannelComputer(table.schema(), logSinkFunction != null),
parallelism);
FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, logSinkFunction);
FixedBucketSink sink =
new FixedBucketSink(
table, overwritePartition, logSinkFunction, recordAttributesProcessor);
return sink.sinkFrom(partitioned);
}

Expand All @@ -286,7 +296,12 @@ private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> input)
boundedInput = !FlinkSink.isStreaming(input);
}
return new RowUnawareBucketSink(
table, overwritePartition, logSinkFunction, parallelism, boundedInput)
table,
overwritePartition,
logSinkFunction,
recordAttributesProcessor,
parallelism,
boundedInput)
.sinkFrom(input);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink;

import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;

import java.io.Serializable;

/** Processor interface for {@link RecordAttributes}. */
public interface RecordAttributesProcessor extends Serializable {
void processRecordAttributes(
RecordAttributes recordAttributes, RecordAttributesContext context);

/**
* Context class for {@link RecordAttributesProcessor}, providing information to be used when
* processing record attributes.
*/
class RecordAttributesContext {
private final StoreSinkWrite write;

protected RecordAttributesContext(StoreSinkWrite write) {
this.write = write;
}

public StoreSinkWrite getWrite() {
return write;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
Expand All @@ -51,6 +52,7 @@ public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
private static final long serialVersionUID = 3L;

@Nullable private final LogSinkFunction logSinkFunction;
@Nullable private final RecordAttributesProcessor recordAttributesProcessor;
private transient SimpleContext sinkContext;
@Nullable private transient LogWriteCallback logCallback;

Expand All @@ -60,10 +62,12 @@ public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
public RowDataStoreWriteOperator(
FileStoreTable table,
@Nullable LogSinkFunction logSinkFunction,
@Nullable RecordAttributesProcessor recordAttributesProcessor,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
this.logSinkFunction = logSinkFunction;
this.recordAttributesProcessor = recordAttributesProcessor;
}

@Override
Expand Down Expand Up @@ -137,6 +141,15 @@ record = write.write(element.getValue());
}
}

@Override
public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
super.processRecordAttributes(recordAttributes);
if (recordAttributesProcessor != null) {
recordAttributesProcessor.processRecordAttributes(
recordAttributes, new RecordAttributesProcessor.RecordAttributesContext(write));
}
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@
public class RowDynamicBucketSink extends DynamicBucketSink<InternalRow> {

private static final long serialVersionUID = 1L;
@Nullable private final RecordAttributesProcessor recordAttributesProcessor;

public RowDynamicBucketSink(
FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
FileStoreTable table,
@Nullable Map<String, String> overwritePartition,
@Nullable RecordAttributesProcessor recordAttributesProcessor) {
super(table, overwritePartition);
this.recordAttributesProcessor = recordAttributesProcessor;
}

@Override
Expand All @@ -62,6 +66,7 @@ protected ChannelComputer<Tuple2<InternalRow, Integer>> channelComputer2() {
@Override
protected OneInputStreamOperator<Tuple2<InternalRow, Integer>, Committable> createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new DynamicBucketRowWriteOperator(table, writeProvider, commitUser);
return new DynamicBucketRowWriteOperator(
table, writeProvider, commitUser, recordAttributesProcessor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,30 @@

import org.apache.flink.streaming.api.operators.OneInputStreamOperator;

import javax.annotation.Nullable;

import java.util.Map;

/** An {@link UnawareBucketSink} which handles {@link InternalRow}. */
public class RowUnawareBucketSink extends UnawareBucketSink<InternalRow> {

@Nullable private final RecordAttributesProcessor recordAttributesProcessor;

public RowUnawareBucketSink(
FileStoreTable table,
Map<String, String> overwritePartitions,
LogSinkFunction logSinkFunction,
@Nullable RecordAttributesProcessor recordAttributesProcessor,
Integer parallelism,
boolean boundedInput) {
super(table, overwritePartitions, logSinkFunction, parallelism, boundedInput);
this.recordAttributesProcessor = recordAttributesProcessor;
}

@Override
protected OneInputStreamOperator<InternalRow, Committable> createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new RowDataStoreWriteOperator(table, logSinkFunction, writeProvider, commitUser);
return new RowDataStoreWriteOperator(
table, logSinkFunction, recordAttributesProcessor, writeProvider, commitUser);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.DynamicBucketRowWriteOperator;
import org.apache.paimon.flink.sink.FlinkWriteSink;
import org.apache.paimon.flink.sink.RecordAttributesProcessor;
import org.apache.paimon.flink.sink.RowWithBucketChannelComputer;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
Expand Down Expand Up @@ -55,16 +56,21 @@
public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<InternalRow, Integer>> {

private static final long serialVersionUID = 1L;
@Nullable private final RecordAttributesProcessor recordAttributesProcessor;

public GlobalDynamicBucketSink(
FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
FileStoreTable table,
@Nullable Map<String, String> overwritePartition,
@Nullable RecordAttributesProcessor processor) {
super(table, overwritePartition);
recordAttributesProcessor = processor;
}

@Override
protected OneInputStreamOperator<Tuple2<InternalRow, Integer>, Committable> createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new DynamicBucketRowWriteOperator(table, writeProvider, commitUser);
return new DynamicBucketRowWriteOperator(
table, writeProvider, commitUser, recordAttributesProcessor);
}

public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable Integer parallelism) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private boolean testSpillable(
DataStreamSource<InternalRow> source =
streamExecutionEnvironment.fromCollection(
Collections.singletonList(GenericRow.of(1, 1)));
FlinkSink<InternalRow> flinkSink = new FixedBucketSink(fileStoreTable, null, null);
FlinkSink<InternalRow> flinkSink = new FixedBucketSink(fileStoreTable, null, null, null);
DataStream<Committable> written = flinkSink.doWrite(source, "123", 1);
RowDataStoreWriteOperator operator =
((RowDataStoreWriteOperator)
Expand Down
Loading

0 comments on commit 0317ff7

Please sign in to comment.