newWrite(
- String commitUser, ManifestCacheFilter manifestFilter) {
+ String commitUser,
+ ManifestCacheFilter manifestFilter,
+ @Nullable RecordAttributeManager recordAttributeManager) {
KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
- store().newWrite(commitUser, manifestFilter),
+ store().newWrite(commitUser, manifestFilter, recordAttributeManager),
createRowKeyExtractor(),
(record, rowKind) ->
kv.replace(
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BucketRecordAttributeManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BucketRecordAttributeManager.java
new file mode 100644
index 0000000000000..bcdeca7396701
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BucketRecordAttributeManager.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+/**
+ * Manager of the attributes of the internal records during runtime.
+ *
+ * <p>Different from {@link RecordAttributeManager}, this class manages the attributes that
+ * need to be set or acquired at the granularity of a single bucket.
+ */
+public class BucketRecordAttributeManager {
+ private boolean isInsertOnly;
+ private boolean areAllRecordsInsertOnlySinceLastFlush;
+
+ public BucketRecordAttributeManager() {
+ this.isInsertOnly = false;
+ this.areAllRecordsInsertOnlySinceLastFlush = false;
+ }
+
+ /**
+ * This method is called when the insert-only status of the records changes.
+ *
+ * @param isInsertOnly If true, all following records will be of {@link
+ * org.apache.paimon.types.RowKind#INSERT}, and no two records will share the same primary
+ * key.
+ */
+ void onInsertOnlyChanged(boolean isInsertOnly) {
+ this.isInsertOnly = isInsertOnly;
+ if (!isInsertOnly) {
+ areAllRecordsInsertOnlySinceLastFlush = false;
+ }
+ }
+
+ /**
+ * This method is called when the internal records are flushed to disk, indicating that the
+ * attributes of previously added records are no longer needed.
+ */
+ public void onFlush() {
+ areAllRecordsInsertOnlySinceLastFlush = isInsertOnly;
+ }
+
+ /** @return whether all records added since the last flush are insert-only. */
+ public boolean areAllRecordsInsertOnlySinceLastFlush() {
+ return areAllRecordsInsertOnlySinceLastFlush;
+ }
+}
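For context, a minimal lifecycle sketch of the bucket-level manager (illustrative only, not part of the patch; onInsertOnlyChanged is package-private and is normally driven by RecordAttributeManager):

    BucketRecordAttributeManager bucket = new BucketRecordAttributeManager();
    bucket.areAllRecordsInsertOnlySinceLastFlush(); // false: nothing has been flushed yet
    bucket.onInsertOnlyChanged(true);               // all following records are INSERT-only
    bucket.onFlush();                               // snapshot the status at flush time
    bucket.areAllRecordsInsertOnlySinceLastFlush(); // true
    bucket.onInsertOnlyChanged(false);              // mixed row kinds may arrive again
    bucket.areAllRecordsInsertOnlySinceLastFlush(); // false: reset immediately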
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/RecordAttributeManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/RecordAttributeManager.java
new file mode 100644
index 0000000000000..266d68fe46489
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/RecordAttributeManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryRow;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Manager of the attributes of the internal records during runtime. */
+public class RecordAttributeManager {
+ private final Map<Pair<BinaryRow, Integer>, BucketRecordAttributeManager> bucketManagerMap =
+ new HashMap<>();
+
+ private boolean isInsertOnly = false;
+
+ /**
+ * This method is called when the insert-only status of the records changes.
+ *
+ * @param isInsertOnly If true, all following records will be of {@link
+ * org.apache.paimon.types.RowKind#INSERT}, and no two records will share the same primary
+ * key.
+ */
+ public void onInsertOnlyChanged(boolean isInsertOnly) {
+ this.isInsertOnly = isInsertOnly;
+ bucketManagerMap.values().forEach(x -> x.onInsertOnlyChanged(isInsertOnly));
+ }
+
+ /**
+ * @return A {@link BucketRecordAttributeManager} that manages the internal record attributes of
+ * a specific bucket.
+ */
+ public BucketRecordAttributeManager getBucketRecordAttributeManager(
+ BinaryRow partition, int bucket) {
+ return bucketManagerMap.computeIfAbsent(
+ Pair.of(partition, bucket),
+ pair -> {
+ BucketRecordAttributeManager manager = new BucketRecordAttributeManager();
+ manager.onInsertOnlyChanged(isInsertOnly);
+ return manager;
+ });
+ }
+}
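A rough wiring sketch of the table-level manager (hypothetical variables; the three-argument newWrite matches the change at the top of this patch):

    RecordAttributeManager attrs = new RecordAttributeManager();
    TableWriteImpl<?> write = table.newWrite(commitUser, manifestFilter, attrs);
    attrs.onInsertOnlyChanged(true); // e.g. the upstream source is known to be insert-only
    // per-bucket managers are created lazily, inherit the current insert-only status,
    // and repeated lookups for the same (partition, bucket) return the same instance
    BucketRecordAttributeManager bucket0 = attrs.getBucketRecordAttributeManager(partition, 0);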
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 92f14b3ed41cf..54360eb99a698 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -58,6 +58,7 @@
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BucketRecordAttributeManager;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExceptionUtils;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -367,6 +368,35 @@ public void testWriteMany() throws Exception {
doTestWriteRead(3, 20_000);
}
+ @Test
+ public void testChangelog() throws Exception {
+ writer =
+ createMergeTreeWriter(
+ Collections.emptyList(),
+ createCompactManager(service, Collections.emptyList()),
+ ChangelogProducer.INPUT,
+ null);
+
+ doTestWriteReadWithChangelog(8, 200, false);
+ }
+
+ @Test
+ public void testChangelogFromCopyingData() throws Exception {
+ writer =
+ createMergeTreeWriter(
+ Collections.emptyList(),
+ createCompactManager(service, Collections.emptyList()),
+ ChangelogProducer.INPUT,
+ new BucketRecordAttributeManager() {
+ @Override
+ public boolean areAllRecordsInsertOnlySinceLastFlush() {
+ return true;
+ }
+ });
+
+ doTestWriteReadWithChangelog(8, 200, true);
+ }
+
private void doTestWriteRead(int batchNumber) throws Exception {
doTestWriteRead(batchNumber, 200);
}
@@ -413,12 +443,78 @@ private void doTestWriteRead(int batchNumber, int perBatch) throws Exception {
assertThat(files).isEqualTo(Collections.emptySet());
}
+ private void doTestWriteReadWithChangelog(
+ int batchNumber, int perBatch, boolean isChangelogEqualToData) throws Exception {
+ List<TestRecord> expected = new ArrayList<>();
+ List<DataFileMeta> newFiles = new ArrayList<>();
+ List<DataFileMeta> changelogFiles = new ArrayList<>();
+ Set<String> newFileNames = new HashSet<>();
+ List<DataFileMeta> compactedFiles = new ArrayList<>();
+
+ // write batch and commit
+ for (int i = 0; i <= batchNumber; i++) {
+ if (i < batchNumber) {
+ expected.addAll(writeBatch(perBatch));
+ } else {
+ writer.sync();
+ }
+
+ CommitIncrement increment = writer.prepareCommit(true);
+ newFiles.addAll(increment.newFilesIncrement().newFiles());
+ changelogFiles.addAll(increment.newFilesIncrement().changelogFiles());
+ mergeCompacted(newFileNames, compactedFiles, increment);
+ }
+
+ // assert records from writer
+ assertRecords(expected);
+
+ // assert records from increment new files
+ assertRecords(expected, newFiles, false);
+ assertRecords(expected, newFiles, true);
+
+ // assert records from changelog files
+ if (isChangelogEqualToData) {
+ assertRecords(expected, changelogFiles, false);
+ assertRecords(expected, changelogFiles, true);
+ } else {
+ List<TestRecord> actual = new ArrayList<>();
+ for (DataFileMeta changelogFile : changelogFiles) {
+ actual.addAll(readAll(Collections.singletonList(changelogFile), false));
+ }
+ assertThat(actual).containsExactlyInAnyOrder(expected.toArray(new TestRecord[0]));
+ }
+
+ // assert records from increment compacted files
+ assertRecords(expected, compactedFiles, true);
+
+ writer.close();
+
+ Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent();
+ Set<String> files =
+ Arrays.stream(LocalFileIO.create().listStatus(bucketDir))
+ .map(FileStatus::getPath)
+ .map(Path::getName)
+ .collect(Collectors.toSet());
+ newFiles.stream().map(DataFileMeta::fileName).forEach(files::remove);
+ changelogFiles.stream().map(DataFileMeta::fileName).forEach(files::remove);
+ compactedFiles.stream().map(DataFileMeta::fileName).forEach(files::remove);
+ assertThat(files).isEqualTo(Collections.emptySet());
+ }
+
private MergeTreeWriter createMergeTreeWriter(List<DataFileMeta> files) {
return createMergeTreeWriter(files, createCompactManager(service, files));
}
private MergeTreeWriter createMergeTreeWriter(
List<DataFileMeta> files, MergeTreeCompactManager compactManager) {
+ return createMergeTreeWriter(files, compactManager, ChangelogProducer.NONE, null);
+ }
+
+ private MergeTreeWriter createMergeTreeWriter(
+ List<DataFileMeta> files,
+ MergeTreeCompactManager compactManager,
+ ChangelogProducer changelogProducer,
+ BucketRecordAttributeManager bucketRecordAttributeManager) {
long maxSequenceNumber =
files.stream().map(DataFileMeta::maxSequenceNumber).max(Long::compare).orElse(-1L);
MergeTreeWriter writer =
@@ -434,9 +530,10 @@ private MergeTreeWriter createMergeTreeWriter(
DeduplicateMergeFunction.factory().create(),
writerFactory,
options.commitForceCompact(),
- ChangelogProducer.NONE,
+ changelogProducer,
null,
- null);
+ null,
+ bucketRecordAttributeManager);
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
return writer;
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/RecordAttributeManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/RecordAttributeManagerTest.java
new file mode 100644
index 0000000000000..dd58e695de3e3
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/RecordAttributeManagerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RecordAttributeManager}. */
+public class RecordAttributeManagerTest {
+ @Test
+ public void test() {
+ BinaryRow partition = new BinaryRow(1);
+ {
+ BinaryRowWriter writer = new BinaryRowWriter(partition);
+ writer.writeInt(0, 0);
+ writer.complete();
+ }
+
+ RecordAttributeManager manager = new RecordAttributeManager();
+ BucketRecordAttributeManager bucketManager0 =
+ manager.getBucketRecordAttributeManager(partition, 0);
+ BucketRecordAttributeManager bucketManager0Reclaimed =
+ manager.getBucketRecordAttributeManager(partition, 0);
+ BucketRecordAttributeManager bucketManager1 =
+ manager.getBucketRecordAttributeManager(partition, 1);
+
+ assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isFalse();
+ assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isFalse();
+ assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isFalse();
+
+ manager.onInsertOnlyChanged(true);
+ assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isFalse();
+ assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isFalse();
+ assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isFalse();
+
+ bucketManager0.onFlush();
+ assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isTrue();
+ assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isTrue();
+ assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isFalse();
+
+ bucketManager1.onFlush();
+ assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isTrue();
+ assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isTrue();
+ assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isTrue();
+
+ manager.onInsertOnlyChanged(false);
+ bucketManager0.onFlush();
+ assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isFalse();
+ assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isFalse();
+ assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isFalse();
+ }
+}
diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
index 4d86c12a6e524..2f09a9249f0a7 100644
--- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
+++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
@@ -102,7 +102,7 @@ public boolean rename(Path src, Path dst) throws IOException {
return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
}
- private org.apache.hadoop.fs.Path path(Path path) {
+ protected org.apache.hadoop.fs.Path path(Path path) {
return new org.apache.hadoop.fs.Path(path.toUri());
}
diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
index 6ecc76da3f76b..cdd2f24a74e2b 100644
--- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
+++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
@@ -20,9 +20,11 @@
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
import org.slf4j.Logger;
@@ -30,6 +32,8 @@
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@@ -70,6 +74,28 @@ public class OSSFileIO extends HadoopCompliantFileIO {
private Options hadoopOptions;
+ @Override
+ public boolean copyFile(Path sourcePath, Path targetPath) throws IOException {
+ org.apache.hadoop.fs.Path srcPath = path(sourcePath);
+ org.apache.hadoop.fs.Path dstPath = path(targetPath);
+
+ try (FileSystem fs = createFileSystem(srcPath)) {
+ AliyunOSSFileSystem ossFs = (AliyunOSSFileSystem) fs;
+ FileStatus sourceStatus = ossFs.getFileStatus(srcPath);
+
+ Method method =
+ AliyunOSSFileSystem.class.getDeclaredMethod(
+ "copyFile",
+ org.apache.hadoop.fs.Path.class,
+ long.class,
+ org.apache.hadoop.fs.Path.class);
+ method.setAccessible(true);
+ return (boolean) method.invoke(ossFs, srcPath, sourceStatus.getLen(), dstPath);
+ } catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public boolean isObjectStore() {
return true;
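A hedged usage sketch of the new copy path (the OSS paths are placeholders, and fileIO is assumed to be an OSSFileIO already configured for the bucket). The reflective call delegates to AliyunOSSFileSystem#copyFile(Path, long, Path), so the object is copied inside OSS rather than being read and rewritten by the client:

    boolean copied = fileIO.copyFile(
            new org.apache.paimon.fs.Path("oss://my-bucket/warehouse/t/data-0.orc"),
            new org.apache.paimon.fs.Path("oss://my-bucket/warehouse/t/data-0-copy.orc"));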
diff --git a/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OSSFileIOTest.java b/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OSSFileIOTest.java
new file mode 100644
index 0000000000000..72746a97f6629
--- /dev/null
+++ b/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OSSFileIOTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.oss;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
+import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link OSSFileIO}. */
+public class OSSFileIOTest {
+ @Test
+ public void testCopy() throws Exception {
+ TestAliyunOssFileSystemStore store = new TestAliyunOssFileSystemStore();
+ TestAliyunOssFileSystem fileSystem = new TestAliyunOssFileSystem(store);
+ TestOSSFileIO fileIO = new TestOSSFileIO(fileSystem);
+ fileIO.copyFile(
+ new org.apache.paimon.fs.Path("sourceFoo"),
+ new org.apache.paimon.fs.Path("targetBar"));
+ assertThat(store.isCopyInvokedAndVerified).isTrue();
+ }
+
+ private static class TestOSSFileIO extends OSSFileIO {
+ private final TestAliyunOssFileSystem fileSystem;
+
+ private TestOSSFileIO(TestAliyunOssFileSystem fileSystem) {
+ this.fileSystem = fileSystem;
+ }
+
+ @Override
+ protected FileSystem createFileSystem(Path path) {
+ return fileSystem;
+ }
+ }
+
+ private static class TestAliyunOssFileSystem extends AliyunOSSFileSystem {
+ public TestAliyunOssFileSystem(AliyunOSSFileSystemStore store) throws Exception {
+ Field storeField = AliyunOSSFileSystem.class.getDeclaredField("store");
+ storeField.setAccessible(true);
+ storeField.set(this, store);
+
+ Field workingDirField = AliyunOSSFileSystem.class.getDeclaredField("workingDir");
+ workingDirField.setAccessible(true);
+ workingDirField.set(this, new Path("/"));
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) {
+ return new FileStatus();
+ }
+
+ @Override
+ public void close() {}
+ }
+
+ private static class TestAliyunOssFileSystemStore extends AliyunOSSFileSystemStore {
+ private boolean isCopyInvokedAndVerified = false;
+
+ @Override
+ public boolean copyFile(String srcKey, long srcLen, String dstKey) {
+ assertThat(srcKey).isEqualTo("sourceFoo");
+ assertThat(srcLen).isZero();
+ assertThat(dstKey).isEqualTo("targetBar");
+ isCopyInvokedAndVerified = true;
+ return true;
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index 9af7eabdaaadb..581156930b9c2 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -256,7 +256,13 @@ private OneInputStreamOperatorTestHarness createTestHarn
CdcRecordStoreWriteOperator operator =
new CdcRecordStoreWriteOperator(
table,
- (t, commitUser, state, ioManager, memoryPool, metricGroup) ->
+ (t,
+ commitUser,
+ state,
+ ioManager,
+ memoryPool,
+ metricGroup,
+ recordAttributeManager) ->
new StoreSinkWriteImpl(
t,
commitUser,
@@ -266,7 +272,8 @@ private OneInputStreamOperatorTestHarness createTestHarn
false,
true,
memoryPool,
- metricGroup),
+ metricGroup,
+ recordAttributeManager),
commitUser);
TypeSerializer<CdcRecord> inputSerializer = new JavaSerializer<>();
TypeSerializer<Committable> outputSerializer =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
index b4cf7aa78de22..023ab49831454 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
@@ -61,7 +61,8 @@ public AsyncLookupSinkWrite(
waitCompaction,
isStreaming,
memoryPool,
- metricGroup);
+ metricGroup,
+ null);
this.tableName = table.name();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
index 147e7527ff897..f9dbd19741cec 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
@@ -50,7 +50,7 @@ public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable Integer
// bucket-assigner
HashBucketAssignerOperator<InternalRow> assignerOperator =
- new HashBucketAssignerOperator<>(
+ createHashBucketAssignerOperator(
initialCommitUser, table, null, extractorFunction(), true);
TupleTypeInfo<Tuple2<InternalRow, Integer>> rowWithBucketType =
new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
index f04043ce41bc5..cf697108fd326 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
@@ -20,6 +20,7 @@
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
import org.apache.paimon.utils.SerializableFunction;
@@ -54,6 +55,16 @@ public DynamicBucketSink(
protected abstract SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
extractorFunction();
+ protected HashBucketAssignerOperator<T> createHashBucketAssignerOperator(
+ String commitUser,
+ Table table,
+ Integer numAssigners,
+ SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction,
+ boolean overwrite) {
+ return new HashBucketAssignerOperator<>(
+ commitUser, table, numAssigners, extractorFunction, overwrite);
+ }
+
public DataStreamSink<?> build(DataStream<T> input, @Nullable Integer parallelism) {
String initialCommitUser = createCommitUser(table.coreOptions().toConfiguration());
@@ -73,7 +84,7 @@ public DataStreamSink<?> build(DataStream<T> input, @Nullable Integer parallelis
// 2. bucket-assigner
HashBucketAssignerOperator<T> assignerOperator =
- new HashBucketAssignerOperator<>(
+ createHashBucketAssignerOperator(
initialCommitUser, table, numAssigners, extractorFunction(), false);
TupleTypeInfo<Tuple2<T, Integer>> rowWithBucketType =
new TupleTypeInfo<>(partitionByKeyHash.getType(), BasicTypeInfo.INT_TYPE_INFO);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index f369ec31c3d58..f5d207893ba8c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -117,7 +117,13 @@ private StoreSinkWrite.Provider createWriteProvider(
if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) {
int finalDeltaCommits = Math.max(deltaCommits, 1);
- return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
+ return (table,
+ commitUser,
+ state,
+ ioManager,
+ memoryPool,
+ metricGroup,
+ recordAttributeManager) -> {
assertNoSinkMaterializer.run();
return new GlobalFullCompactionSinkWrite(
table,
@@ -136,7 +142,13 @@ private StoreSinkWrite.Provider createWriteProvider(
if (changelogProducer == ChangelogProducer.LOOKUP
&& !coreOptions.prepareCommitWaitCompaction()) {
- return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
+ return (table,
+ commitUser,
+ state,
+ ioManager,
+ memoryPool,
+ metricGroup,
+ recordAttributeManager) -> {
assertNoSinkMaterializer.run();
return new AsyncLookupSinkWrite(
table,
@@ -151,7 +163,13 @@ private StoreSinkWrite.Provider createWriteProvider(
};
}
- return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
+ return (table,
+ commitUser,
+ state,
+ ioManager,
+ memoryPool,
+ metricGroup,
+ recordAttributeManager) -> {
assertNoSinkMaterializer.run();
return new StoreSinkWriteImpl(
table,
@@ -162,7 +180,8 @@ private StoreSinkWrite.Provider createWriteProvider(
waitCompaction,
isStreaming,
memoryPool,
- metricGroup);
+ metricGroup,
+ recordAttributeManager);
};
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 8baee5ac1b91d..dd73bd5590b60 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -21,6 +21,7 @@
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
@@ -70,11 +71,11 @@ public class FlinkSinkBuilder {
private static final Logger LOG = LoggerFactory.getLogger(FlinkSinkBuilder.class);
- private final FileStoreTable table;
+ protected final FileStoreTable table;
private DataStream<RowData> input;
- @Nullable private Map<String, String> overwritePartition;
- @Nullable private Integer parallelism;
+ @Nullable protected Map<String, String> overwritePartition;
+ @Nullable protected Integer parallelism;
private Boolean boundedInput = null;
@Nullable private TableSortInfo tableSortInfo;
@@ -221,7 +222,7 @@ public FlinkSinkBuilder clusteringIfPossible(
/** Build {@link DataStreamSink}. */
public DataStreamSink<?> build() {
input = trySortInput(input);
- DataStream<InternalRow> input = MapToInternalRow.map(this.input, table.rowType());
+ DataStream<InternalRow> input = mapToInternalRow(this.input, table.rowType());
if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
input =
input.forward()
@@ -247,7 +248,14 @@ public DataStreamSink<?> build() {
}
}
- private DataStreamSink<?> buildDynamicBucketSink(
+ protected DataStream<InternalRow> mapToInternalRow(
+ DataStream<RowData> input, org.apache.paimon.types.RowType rowType) {
+ return input.map((MapFunction<RowData, InternalRow>) FlinkRowWrapper::new)
+ .setParallelism(input.getParallelism())
+ .returns(org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(rowType));
+ }
+
+ protected DataStreamSink<?> buildDynamicBucketSink(
DataStream<InternalRow> input, boolean globalIndex) {
checkArgument(logSinkFunction == null, "Dynamic bucket mode can not work with log system.");
return compactSink && !globalIndex
@@ -260,7 +268,7 @@ private DataStreamSink<?> buildDynamicBucketSink(
.build(input, parallelism);
}
- private DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> input) {
+ protected DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> input) {
DataStream<InternalRow> partitioned =
partition(
input,
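A hedged sketch of what the widened visibility in FlinkSinkBuilder enables: a custom builder (hypothetical class name, not part of this patch) can override the RowData-to-InternalRow conversion or reuse the bucket-specific build steps:

    // hypothetical subclass, for illustration only
    public class CustomFlinkSinkBuilder extends FlinkSinkBuilder {
        public CustomFlinkSinkBuilder(FileStoreTable table) {
            super(table);
        }

        @Override
        protected DataStream<InternalRow> mapToInternalRow(
                DataStream<RowData> input, org.apache.paimon.types.RowType rowType) {
            // e.g. wrap rows differently or attach extra logic before writing
            return super.mapToInternalRow(input, rowType);
        }
    }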
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index 4202717c8646f..ebe1c4c51cd99 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -129,7 +129,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
return new PaimonDataStreamSinkProvider(
(dataStream) -> {
- LogFlinkSinkBuilder builder = new LogFlinkSinkBuilder(table);
+ LogFlinkSinkBuilder builder = createSinkBuilder();
builder.logSinkFunction(logSinkFunction)
.forRowData(
new DataStream<>(
@@ -149,6 +149,10 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
});
}
+ protected LogFlinkSinkBuilder createSinkBuilder() {
+ return new LogFlinkSinkBuilder(table);
+ }
+
@Override
public DynamicTableSink copy() {
FlinkTableSink copied =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index 62341a180dab9..e6958cd5d5414 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -85,7 +85,8 @@ public GlobalFullCompactionSinkWrite(
waitCompaction,
isStreaming,
memoryPool,
- metricGroup);
+ metricGroup,
+ null);
this.deltaCommits = deltaCommits;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java
deleted file mode 100644
index a9bf744010992..0000000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.flink.utils.InternalTypeInfo;
-import org.apache.paimon.types.RowType;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.data.RowData;
-
-/** An util to convert {@link RowData} stream to {@link InternalRow} stream. */
-public class MapToInternalRow {
-
- public static DataStream<InternalRow> map(DataStream<RowData> input, RowType rowType) {
- return input.map((MapFunction<RowData, InternalRow>) FlinkRowWrapper::new)
- .setParallelism(input.getParallelism())
- .returns(InternalTypeInfo.fromRowType(rowType));
- }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 52e494b5a9dbf..eda787f17a582 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -156,7 +156,8 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
state,
getContainingTask().getEnvironment().getIOManager(),
memoryPool,
- getMetricGroup()));
+ getMetricGroup(),
+ null));
if (write.streamingMode()) {
write.notifyNewFiles(snapshotId, partition, bucket, files);
@@ -256,7 +257,13 @@ private StoreSinkWrite.Provider createWriteProvider(
if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION
|| deltaCommits >= 0) {
int finalDeltaCommits = Math.max(deltaCommits, 1);
- return (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
+ return (table,
+ commitUser,
+ state,
+ ioManager,
+ memoryPool,
+ metricGroup,
+ recordAttributeManager) ->
new GlobalFullCompactionSinkWrite(
table,
commitUser,
@@ -273,7 +280,13 @@ private StoreSinkWrite.Provider createWriteProvider(
if (changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
&& !coreOptions.prepareCommitWaitCompaction()) {
- return (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
+ return (table,
+ commitUser,
+ state,
+ ioManager,
+ memoryPool,
+ metricGroup,
+ recordAttributeManager) ->
new AsyncLookupSinkWrite(
table,
commitUser,
@@ -286,7 +299,13 @@ private StoreSinkWrite.Provider createWriteProvider(
metricGroup);
}
- return (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
+ return (table,
+ commitUser,
+ state,
+ ioManager,
+ memoryPool,
+ metricGroup,
+ recordAttributeManager) ->
new StoreSinkWriteImpl(
table,
commitUser,
@@ -296,6 +315,7 @@ private StoreSinkWrite.Provider createWriteProvider(
waitCompaction,
isStreaming,
memoryPool,
- metricGroup);
+ metricGroup,
+ recordAttributeManager);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 1842884907728..dac0a29ca1491 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -109,7 +109,7 @@ void initStateAndWriter(
write =
storeSinkWriteProvider.provide(
- table, commitUser, state, ioManager, memoryPool, getMetricGroup());
+ table, commitUser, state, ioManager, memoryPool, getMetricGroup(), null);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index 6001721b71f39..703f4cf0bfb40 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -26,6 +26,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.utils.RecordAttributeManager;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -81,7 +82,8 @@ StoreSinkWrite provide(
StoreSinkWriteState state,
IOManager ioManager,
@Nullable MemorySegmentPool memoryPool,
- @Nullable MetricGroup metricGroup);
+ @Nullable MetricGroup metricGroup,
+ @Nullable RecordAttributeManager recordAttributeManager);
}
/** Provider of {@link StoreSinkWrite} that uses given write buffer. */
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 3ecc80bb6f13c..2cd70cb32eca7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -32,6 +32,7 @@
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.utils.RecordAttributeManager;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -64,6 +65,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
protected TableWriteImpl<?> write;
@Nullable private final MetricGroup metricGroup;
+ @Nullable private final RecordAttributeManager recordAttributeManager;
public StoreSinkWriteImpl(
FileStoreTable table,
@@ -74,7 +76,8 @@ public StoreSinkWriteImpl(
boolean waitCompaction,
boolean isStreamingMode,
@Nullable MemorySegmentPool memoryPool,
- @Nullable MetricGroup metricGroup) {
+ @Nullable MetricGroup metricGroup,
+ @Nullable RecordAttributeManager recordAttributeManager) {
this(
table,
commitUser,
@@ -85,7 +88,8 @@ public StoreSinkWriteImpl(
isStreamingMode,
memoryPool,
null,
- metricGroup);
+ metricGroup,
+ recordAttributeManager);
}
public StoreSinkWriteImpl(
@@ -108,7 +112,8 @@ public StoreSinkWriteImpl(
isStreamingMode,
null,
memoryPoolFactory,
- metricGroup);
+ metricGroup,
+ null);
}
private StoreSinkWriteImpl(
@@ -121,7 +126,8 @@ private StoreSinkWriteImpl(
boolean isStreamingMode,
@Nullable MemorySegmentPool memoryPool,
@Nullable MemoryPoolFactory memoryPoolFactory,
- @Nullable MetricGroup metricGroup) {
+ @Nullable MetricGroup metricGroup,
+ @Nullable RecordAttributeManager recordAttributeManager) {
this.commitUser = commitUser;
this.state = state;
this.paimonIOManager = new IOManagerImpl(ioManager.getSpillingDirectoriesPaths());
@@ -131,6 +137,7 @@ private StoreSinkWriteImpl(
this.memoryPool = memoryPool;
this.memoryPoolFactory = memoryPoolFactory;
this.metricGroup = metricGroup;
+ this.recordAttributeManager = recordAttributeManager;
this.write = newTableWrite(table);
}
@@ -143,7 +150,8 @@ private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
table.newWrite(
commitUser,
(part, bucket) ->
- state.stateValueFilter().filter(table.name(), part, bucket))
+ state.stateValueFilter().filter(table.name(), part, bucket),
+ recordAttributeManager)
.withIOManager(paimonIOManager)
.withIgnorePreviousFiles(ignorePreviousFiles)
.withExecutionMode(isStreamingMode)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index f38e0ad6bfb53..cf2f9e15c564f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -23,6 +23,7 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.utils.RecordAttributeManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;
@@ -40,6 +41,7 @@ public abstract class TableWriteOperator extends PrepareCommitOperator createTestHarnes
protected StoreCompactOperator createCompactOperator(FileStoreTable table) {
return new StoreCompactOperator(
table,
- (t, commitUser, state, ioManager, memoryPool, metricGroup) ->
+ (t,
+ commitUser,
+ state,
+ ioManager,
+ memoryPool,
+ metricGroup,
+ recordAttributeManager) ->
new StoreSinkWriteImpl(
t,
commitUser,
@@ -254,7 +260,8 @@ protected StoreCompactOperator createCompactOperator(FileStoreTable table) {
false,
false,
memoryPool,
- metricGroup),
+ metricGroup,
+ recordAttributeManager),
"test");
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index 9cd2a73920fb6..744982c0857fc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -53,8 +53,13 @@ public void testCompactExactlyOnce(boolean streamingMode) throws Exception {
StoreCompactOperator storeCompactOperator =
new StoreCompactOperator(
(FileStoreTable) getTableDefault(),
- (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
- compactRememberStoreWrite,
+ (table,
+ commitUser,
+ state,
+ ioManager,
+ memoryPool,
+ metricGroup,
+ recordAttributeManager) -> compactRememberStoreWrite,
"10086");
storeCompactOperator.open();
StateInitializationContextImpl context =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index a6a4d3e5088b2..33fc310403c5d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -113,7 +113,13 @@ private void testMetricsImpl(FileStoreTable fileStoreTable) throws Exception {
new RowDataStoreWriteOperator(
fileStoreTable,
null,
- (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
+ (table,
+ commitUser,
+ state,
+ ioManager,
+ memoryPool,
+ metricGroup,
+ recordAttributeManager) ->
new StoreSinkWriteImpl(
table,
commitUser,
@@ -123,7 +129,8 @@ private void testMetricsImpl(FileStoreTable fileStoreTable) throws Exception {
false,
true,
memoryPool,
- metricGroup),
+ metricGroup,
+ recordAttributeManager),
"test");
OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
createHarness(operator);
@@ -254,7 +261,13 @@ private RowDataStoreWriteOperator getAsyncLookupWriteOperator(
return new RowDataStoreWriteOperator(
fileStoreTable,
null,
- (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
+ (table,
+ commitUser,
+ state,
+ ioManager,
+ memoryPool,
+ metricGroup,
+ recordAttributeManager) ->
new AsyncLookupSinkWrite(
table,
commitUser,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index edd6688da66dd..f9ceff8ca3297 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -185,7 +185,8 @@ public RecordWriter createMergeTreeWriter(BinaryRow partition, int buc
null,
options,
EXTRACTOR,
- tablePath.getName())
+ tablePath.getName(),
+ null)
.createWriterContainer(partition, bucket, true)
.writer;
((MemoryOwner) writer)