diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java
new file mode 100644
index 000000000000..723c71dc565d
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
+public class RecordAttributes extends StreamElement {}
diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
new file mode 100644
index 000000000000..efe5e12b12d7
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+
+/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
+public class ProcessRecordAttributesUtil {
+ public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}
+
+ public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
+}
diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java
new file mode 100644
index 000000000000..723c71dc565d
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
+public class RecordAttributes extends StreamElement {}
diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
new file mode 100644
index 000000000000..efe5e12b12d7
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+
+/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
+public class ProcessRecordAttributesUtil {
+ public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}
+
+ public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
+}
diff --git a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java
new file mode 100644
index 000000000000..723c71dc565d
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
+public class RecordAttributes extends StreamElement {}
diff --git a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
new file mode 100644
index 000000000000..efe5e12b12d7
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+
+/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
+public class ProcessRecordAttributesUtil {
+ public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}
+
+ public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
+}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java
new file mode 100644
index 000000000000..723c71dc565d
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
+public class RecordAttributes extends StreamElement {}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
new file mode 100644
index 000000000000..efe5e12b12d7
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+
+/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
+public class ProcessRecordAttributesUtil {
+ public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}
+
+ public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
+}
diff --git a/paimon-flink/paimon-flink-1.19/pom.xml b/paimon-flink/paimon-flink-1.19/pom.xml
index 666756f877d9..4f1b5bbfe418 100644
--- a/paimon-flink/paimon-flink-1.19/pom.xml
+++ b/paimon-flink/paimon-flink-1.19/pom.xml
@@ -69,6 +69,13 @@ under the License.
provided
+
+ org.apache.flink
+ flink-streaming-java
+ ${flink.version}
+ provided
+
+
diff --git a/paimon-flink/paimon-flink-cdc/pom.xml b/paimon-flink/paimon-flink-cdc/pom.xml
index 82dd412bf1d2..f1c4edd23b8f 100644
--- a/paimon-flink/paimon-flink-cdc/pom.xml
+++ b/paimon-flink/paimon-flink-cdc/pom.xml
@@ -301,6 +301,11 @@ under the License.
+
+
+ org.apache.flink.streaming.runtime.streamrecord.RecordAttributes
+ org.apache.flink.shaded.org.apache.flink.streaming.runtime.streamrecord.RecordAttributes
+
org.apache.kafka.connect
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java
new file mode 100644
index 000000000000..723c71dc565d
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/flink/streaming/runtime/streamrecord/RecordAttributes.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
+public class RecordAttributes extends StreamElement {}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
new file mode 100644
index 000000000000..efe5e12b12d7
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+
+/** Placeholder class for new feature introduced since flink 1.19. Should never be used. */
+public class ProcessRecordAttributesUtil {
+ public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}
+
+ public static void processWithOutput(RecordAttributes recordAttributes, Output output) {}
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
new file mode 100644
index 000000000000..fae52e3faff0
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProcessRecordAttributesUtil.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.sink.StoreSinkWrite;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+
+/** Utils for {@link RecordAttributes}. */
+public class ProcessRecordAttributesUtil {
+ public static void processWithWrite(RecordAttributes recordAttributes, StoreSinkWrite write) {}
+
+ public static void processWithOutput(RecordAttributes recordAttributes, Output output) {
+ output.emitRecordAttributes(recordAttributes);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 546f82ec1f84..5bb2b6e835c5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -101,9 +101,12 @@ public FlinkSinkBuilder forRow(DataStream input, DataType rowDataType) {
DataFormatConverters.RowConverter converter =
new DataFormatConverters.RowConverter(fieldDataTypes);
this.input =
- input.map((MapFunction) converter::toInternal)
- .setParallelism(input.getParallelism())
- .returns(InternalTypeInfo.of(rowType));
+ input.transform(
+ "Map",
+ InternalTypeInfo.of(rowType),
+ new StreamMapWithForwardingRecordAttributes<>(
+ (MapFunction) converter::toInternal))
+ .setParallelism(input.getParallelism());
return this;
}
@@ -237,9 +240,12 @@ public DataStreamSink> build() {
protected DataStream mapToInternalRow(
DataStream input, org.apache.paimon.types.RowType rowType) {
- return input.map((MapFunction) FlinkRowWrapper::new)
- .setParallelism(input.getParallelism())
- .returns(org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(rowType));
+ return input.transform(
+ "Map",
+ org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(rowType),
+ new StreamMapWithForwardingRecordAttributes<>(
+ (MapFunction) FlinkRowWrapper::new))
+ .setParallelism(input.getParallelism());
}
protected DataStreamSink> buildDynamicBucketSink(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
index 9713421fdd6a..70fac7a83e93 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.flink.ProcessRecordAttributesUtil;
import org.apache.paimon.index.BucketAssigner;
import org.apache.paimon.index.HashBucketAssigner;
import org.apache.paimon.index.SimpleHashBucketAssigner;
@@ -32,6 +33,7 @@
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
/** Assign bucket for the input record, output record with bucket. */
@@ -100,6 +102,11 @@ public void processElement(StreamRecord streamRecord) throws Exception {
output.collect(new StreamRecord<>(new Tuple2<>(value, bucket)));
}
+ @Override
+ public void processRecordAttributes(RecordAttributes recordAttributes) {
+ ProcessRecordAttributesUtil.processWithOutput(recordAttributes, output);
+ }
+
@Override
public void prepareSnapshotPreBarrier(long checkpointId) {
assigner.prepareCommit(checkpointId);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StreamMapWithForwardingRecordAttributes.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StreamMapWithForwardingRecordAttributes.java
new file mode 100644
index 000000000000..03038a042d38
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StreamMapWithForwardingRecordAttributes.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.flink.ProcessRecordAttributesUtil;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+
+/** A {@link StreamMap} that forwards received {@link RecordAttributes} to downstream operators. */
+public class StreamMapWithForwardingRecordAttributes extends StreamMap {
+ public StreamMapWithForwardingRecordAttributes(MapFunction mapper) {
+ super(mapper);
+ }
+
+ @Override
+ public void processRecordAttributes(RecordAttributes recordAttributes) {
+ ProcessRecordAttributesUtil.processWithOutput(recordAttributes, output);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index f38e0ad6bfb5..f0d23f31dd25 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.flink.ProcessRecordAttributesUtil;
import org.apache.paimon.flink.sink.StoreSinkWriteState.StateValueFilter;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
@@ -27,6 +28,7 @@
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import java.io.IOException;
import java.util.List;
@@ -97,6 +99,12 @@ void initStateAndWriter(
table, commitUser, state, ioManager, memoryPool, getMetricGroup());
}
+ @Override
+ public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
+ ProcessRecordAttributesUtil.processWithWrite(recordAttributes, write);
+ super.processRecordAttributes(recordAttributes);
+ }
+
protected abstract boolean containLogSystem();
@Override