Skip to content

Commit

Permalink
[feature][flink] introduce RecordAttributes processing logic for writ…
Browse files Browse the repository at this point in the history
…e operators
  • Loading branch information
yunfengzhou-hub committed Sep 23, 2024
1 parent 3ac06dd commit fd345bf
Show file tree
Hide file tree
Showing 17 changed files with 374 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.streamrecord;

/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
public class RecordAttributes extends StreamElement {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import org.apache.paimon.flink.sink.StoreSinkWrite;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;

/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
public class ProcessRecordAttributesUtil {
public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}

public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.streamrecord;

/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
public class RecordAttributes extends StreamElement {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import org.apache.paimon.flink.sink.StoreSinkWrite;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;

/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
public class ProcessRecordAttributesUtil {
public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}

public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.streamrecord;

/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
public class RecordAttributes extends StreamElement {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import org.apache.paimon.flink.sink.StoreSinkWrite;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;

/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
public class ProcessRecordAttributesUtil {
public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}

public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.streamrecord;

/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
public class RecordAttributes extends StreamElement {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import org.apache.paimon.flink.sink.StoreSinkWrite;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;

/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
public class ProcessRecordAttributesUtil {
public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}

public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
}
7 changes: 7 additions & 0 deletions paimon-flink/paimon-flink-1.19/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- test dependencies -->

<dependency>
Expand Down
5 changes: 5 additions & 0 deletions paimon-flink/paimon-flink-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,11 @@ under the License.
</goals>
<configuration>
<relocations>
<!-- Avoid overriding the equivalent class introduced in flink and paimon-flink -->
<relocation>
<pattern>org.apache.flink.streaming.runtime.streamrecord.RecordAttributes</pattern>
<shadedPattern>org.apache.flink.shaded.org.apache.flink.streaming.runtime.streamrecord.RecordAttributes</shadedPattern>
</relocation>
<!-- Same as flink-sql-connector-kafka. -->
<relocation>
<pattern>org.apache.kafka.connect</pattern>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.streamrecord;

/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
public class RecordAttributes extends StreamElement {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import org.apache.paimon.flink.sink.StoreSinkWrite;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;

/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
public class ProcessRecordAttributesUtil {
public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}

public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import org.apache.paimon.flink.sink.StoreSinkWrite;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;

/** Utils for {@link RecordAttributes}. */
public class ProcessRecordAttributesUtil {
public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}

public static void processWithOutput(RecordAttributes recordAttributes, Output output) {
output.emitRecordAttributes(recordAttributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,12 @@ public FlinkSinkBuilder forRow(DataStream<Row> input, DataType rowDataType) {
DataFormatConverters.RowConverter converter =
new DataFormatConverters.RowConverter(fieldDataTypes);
this.input =
input.map((MapFunction<Row, RowData>) converter::toInternal)
.setParallelism(input.getParallelism())
.returns(InternalTypeInfo.of(rowType));
input.transform(
"Map",
InternalTypeInfo.of(rowType),
new StreamMapWithForwardingRecordAttributes<>(
(MapFunction<Row, RowData>) converter::toInternal))
.setParallelism(input.getParallelism());
return this;
}

Expand Down Expand Up @@ -237,9 +240,12 @@ public DataStreamSink<?> build() {

protected DataStream<InternalRow> mapToInternalRow(
DataStream<RowData> input, org.apache.paimon.types.RowType rowType) {
return input.map((MapFunction<RowData, InternalRow>) FlinkRowWrapper::new)
.setParallelism(input.getParallelism())
.returns(org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(rowType));
return input.transform(
"Map",
org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(rowType),
new StreamMapWithForwardingRecordAttributes<>(
(MapFunction<RowData, InternalRow>) FlinkRowWrapper::new))
.setParallelism(input.getParallelism());
}

protected DataStreamSink<?> buildDynamicBucketSink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.flink.ProcessRecordAttributesUtil;
import org.apache.paimon.index.BucketAssigner;
import org.apache.paimon.index.HashBucketAssigner;
import org.apache.paimon.index.SimpleHashBucketAssigner;
Expand All @@ -32,6 +33,7 @@
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Assign bucket for the input record, output record with bucket. */
Expand Down Expand Up @@ -100,6 +102,11 @@ public void processElement(StreamRecord<T> streamRecord) throws Exception {
output.collect(new StreamRecord<>(new Tuple2<>(value, bucket)));
}

@Override
public void processRecordAttributes(RecordAttributes recordAttributes) {
ProcessRecordAttributesUtil.processWithOutput(recordAttributes, output);
}

@Override
public void prepareSnapshotPreBarrier(long checkpointId) {
assigner.prepareCommit(checkpointId);
Expand Down
Loading

0 comments on commit fd345bf

Please sign in to comment.