From 019d7ee68d2f42a4df6bf2e7d345a4ae777d9afe Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Tue, 18 Jun 2024 17:38:55 +0800 Subject: [PATCH] [core] Generate changelog by copying data when records are insert-only --- .../java/org/apache/paimon/fs/FileIO.java | 18 ++ .../apache/paimon/fs/local/LocalFileIO.java | 13 ++ .../java/org/apache/paimon/fs/FileIOTest.java | 188 +++++++++++++++++- .../paimon/fs/local/LocalFleIOTest.java | 66 ++++++ .../src/test/resources/test-data.orc | Bin 0 -> 369 bytes .../apache/paimon/AppendOnlyFileStore.java | 9 +- .../java/org/apache/paimon/FileStore.java | 6 +- .../org/apache/paimon/KeyValueFileStore.java | 13 +- .../org/apache/paimon/io/DataFileMeta.java | 20 ++ .../paimon/io/KeyValueFileWriterFactory.java | 16 ++ .../paimon/mergetree/MergeTreeWriter.java | 60 +++++- .../operation/KeyValueFileStoreWrite.java | 12 +- .../paimon/privilege/PrivilegedFileStore.java | 8 +- .../privilege/PrivilegedFileStoreTable.java | 10 +- .../table/AppendOnlyFileStoreTable.java | 12 +- .../apache/paimon/table/FileStoreTable.java | 8 +- .../table/PrimaryKeyFileStoreTable.java | 11 +- .../utils/BucketRecordAttributeManager.java | 62 ++++++ .../paimon/utils/RecordAttributeManager.java | 59 ++++++ .../paimon/mergetree/MergeTreeTestBase.java | 101 +++++++++- .../utils/RecordAttributeManagerTest.java | 72 +++++++ .../paimon/oss/HadoopCompliantFileIO.java | 2 +- .../java/org/apache/paimon/oss/OSSFileIO.java | 26 +++ .../org/apache/paimon/oss/OSSFileIOTest.java | 90 +++++++++ .../cdc/CdcRecordStoreWriteOperatorTest.java | 11 +- .../flink/sink/AsyncLookupSinkWrite.java | 3 +- .../flink/sink/DynamicBucketCompactSink.java | 2 +- .../paimon/flink/sink/DynamicBucketSink.java | 13 +- .../apache/paimon/flink/sink/FlinkSink.java | 27 ++- .../paimon/flink/sink/FlinkSinkBuilder.java | 20 +- .../sink/GlobalFullCompactionSinkWrite.java | 3 +- .../paimon/flink/sink/MapToInternalRow.java | 38 ---- .../sink/MultiTablesStoreCompactOperator.java | 30 ++- .../flink/sink/StoreCompactOperator.java | 2 +- .../paimon/flink/sink/StoreSinkWrite.java | 4 +- .../paimon/flink/sink/StoreSinkWriteImpl.java | 18 +- .../paimon/flink/sink/TableWriteOperator.java | 11 +- .../flink/sink/CompactorSinkITCase.java | 11 +- .../flink/sink/StoreCompactOperatorTest.java | 9 +- .../paimon/flink/sink/WriterOperatorTest.java | 19 +- .../source/TestChangelogDataReadWrite.java | 3 +- 41 files changed, 1005 insertions(+), 101 deletions(-) create mode 100644 paimon-common/src/test/java/org/apache/paimon/fs/local/LocalFleIOTest.java create mode 100644 paimon-common/src/test/resources/test-data.orc create mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/BucketRecordAttributeManager.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/RecordAttributeManager.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/utils/RecordAttributeManagerTest.java create mode 100644 paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OSSFileIOTest.java delete mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index f31b00af17261..20c93bc937696 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -23,6 +23,7 @@ import org.apache.paimon.fs.hadoop.HadoopFileIOLoader; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -267,6 +268,22 @@ default void overwriteFileUtf8(Path path, String content) throws IOException { } } + /** + * Copy content of one file into another. + * + * @return false if targetPath file exists + */ + default boolean copyFile(Path src, Path dst) throws IOException { + if (exists(dst)) { + return false; + } + try (SeekableInputStream is = newInputStream(src); + PositionOutputStream os = newOutputStream(dst, false)) { + IOUtils.copy(is, os); + } + return true; + } + /** * Read file to UTF_8 decoding, then write content to one file atomically, initially writes to * temp hidden file and only renames to the target file once temp file is closed. @@ -275,6 +292,7 @@ default void overwriteFileUtf8(Path path, String content) throws IOException { */ default boolean copyFileUtf8(Path sourcePath, Path targetPath) throws IOException { String content = readFileUtf8(sourcePath); + System.out.println("copyFileUtf8 " + content.length()); return writeFileUtf8(targetPath, content); } diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java index 82d8145a29278..faf363410399f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java @@ -217,6 +217,19 @@ public boolean rename(Path src, Path dst) throws IOException { } } + @Override + public boolean copyFile(Path src, Path dst) throws IOException { + if (exists(dst)) { + return false; + } + Files.copy(toPath(src), toPath(dst), StandardCopyOption.COPY_ATTRIBUTES); + return true; + } + + private java.nio.file.Path toPath(Path path) { + return toFile(path).toPath(); + } + /** * Converts the given Path to a File for this file system. If the path is empty, we will return * new File(".") instead of new File(""), since the latter returns diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java index c1b86a0b20ea4..1617fe90e9f0c 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java @@ -19,19 +19,31 @@ package org.apache.paimon.fs; import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; +import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.AccessDeniedException; +import java.nio.file.DirectoryNotEmptyException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.StandardCopyOption; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import static org.apache.paimon.utils.Preconditions.checkState; import static org.assertj.core.api.Assertions.assertThat; -/** Test {@link FileIO}. */ +/** Test static methods and methods with default implementations of {@link FileIO}. */ public class FileIOTest { @TempDir java.nio.file.Path tempDir; @@ -59,6 +71,36 @@ public void testRequireOptions() throws IOException { assertThat(fileIO).isInstanceOf(RequireOptionsFileIOLoader.MyFileIO.class); } + @Test + public void testCopy() throws Exception { + Path srcFile = new Path(tempDir.resolve("src.txt").toUri()); + Path dstFile = new Path(tempDir.resolve("dst.txt").toUri()); + + FileIO fileIO = new DummyFileIO(); + fileIO.writeFileUtf8(srcFile, "foobar"); + + assertThat(fileIO.copyFileUtf8(srcFile, dstFile)).isTrue(); + assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar"); + fileIO.deleteQuietly(dstFile); + + assertThat(fileIO.copyFile(srcFile, dstFile)).isTrue(); + assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar"); + fileIO.deleteQuietly(dstFile); + + fileIO.deleteQuietly(srcFile); + srcFile = new Path(this.getClass().getClassLoader().getResource("test-data.orc").toURI()); + + fileIO.copyFileUtf8(srcFile, dstFile); + assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri()))) + .isFalse(); + fileIO.deleteQuietly(dstFile); + + fileIO.copyFile(srcFile, dstFile); + assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri()))) + .isTrue(); + fileIO.deleteQuietly(dstFile); + } + public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws InterruptedException { AtomicReference exception = new AtomicReference<>(); final int max = 10; @@ -106,4 +148,148 @@ public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws Interr assertThat(exception.get()).isNull(); } + + /** A {@link FileIO} on local filesystem to test the default copy implementation. */ + private static class DummyFileIO implements FileIO { + private static final ReentrantLock RENAME_LOCK = new ReentrantLock(); + + @Override + public boolean isObjectStore() { + throw new UnsupportedOperationException(); + } + + @Override + public void configure(CatalogContext context) { + throw new UnsupportedOperationException(); + } + + @Override + public SeekableInputStream newInputStream(Path path) throws FileNotFoundException { + return new LocalFileIO.LocalSeekableInputStream(toFile(path)); + } + + @Override + public PositionOutputStream newOutputStream(Path path, boolean overwrite) + throws IOException { + if (exists(path) && !overwrite) { + throw new FileAlreadyExistsException("File already exists: " + path); + } + + Path parent = path.getParent(); + if (parent != null && !mkdirs(parent)) { + throw new IOException("Mkdirs failed to create " + parent); + } + + return new LocalFileIO.LocalPositionOutputStream(toFile(path)); + } + + @Override + public FileStatus getFileStatus(Path path) { + throw new UnsupportedOperationException(); + } + + @Override + public FileStatus[] listStatus(Path path) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean exists(Path path) { + return toFile(path).exists(); + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + File file = toFile(path); + if (file.isFile()) { + return file.delete(); + } else if ((!recursive) && file.isDirectory()) { + File[] containedFiles = file.listFiles(); + if (containedFiles == null) { + throw new IOException( + "Directory " + file + " does not exist or an I/O error occurred"); + } else if (containedFiles.length != 0) { + throw new IOException("Directory " + file + " is not empty"); + } + } + + return delete(file); + } + + private boolean delete(final File f) { + if (f.isDirectory()) { + final File[] files = f.listFiles(); + if (files != null) { + for (File file : files) { + final boolean del = delete(file); + if (!del) { + return false; + } + } + } + } else { + return f.delete(); + } + + // Now directory is empty + return f.delete(); + } + + @Override + public boolean mkdirs(Path path) throws IOException { + return mkdirsInternal(toFile(path)); + } + + private boolean mkdirsInternal(File file) throws IOException { + if (file.isDirectory()) { + return true; + } else if (file.exists() && !file.isDirectory()) { + // Important: The 'exists()' check above must come before the 'isDirectory()' check + // to + // be safe when multiple parallel instances try to create the directory + + // exists and is not a directory -> is a regular file + throw new FileAlreadyExistsException(file.getAbsolutePath()); + } else { + File parent = file.getParentFile(); + return (parent == null || mkdirsInternal(parent)) + && (file.mkdir() || file.isDirectory()); + } + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + File srcFile = toFile(src); + File dstFile = toFile(dst); + File dstParent = dstFile.getParentFile(); + dstParent.mkdirs(); + try { + RENAME_LOCK.lock(); + if (dstFile.exists()) { + return false; + } + Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.ATOMIC_MOVE); + return true; + } catch (NoSuchFileException + | AccessDeniedException + | DirectoryNotEmptyException + | SecurityException e) { + return false; + } finally { + RENAME_LOCK.unlock(); + } + } + + private File toFile(Path path) { + // remove scheme + String localPath = path.toUri().getPath(); + checkState(localPath != null, "Cannot convert a null path to File"); + + if (localPath.length() == 0) { + return new File("."); + } + + return new File(localPath); + } + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/local/LocalFleIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/local/LocalFleIOTest.java new file mode 100644 index 0000000000000..5fb4445468ef4 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/fs/local/LocalFleIOTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs.local; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; + +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link LocalFileIO}. */ +public class LocalFleIOTest { + + @TempDir java.nio.file.Path tempDir; + + @Test + public void testCopy() throws Exception { + Path srcFile = new Path(tempDir.resolve("src.txt").toUri()); + Path dstFile = new Path(tempDir.resolve("dst.txt").toUri()); + + FileIO fileIO = new LocalFileIO(); + fileIO.writeFileUtf8(srcFile, "foobar"); + + assertThat(fileIO.copyFileUtf8(srcFile, dstFile)).isTrue(); + assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar"); + fileIO.deleteQuietly(dstFile); + + assertThat(fileIO.copyFile(srcFile, dstFile)).isTrue(); + assertThat(fileIO.readFileUtf8(dstFile)).isEqualTo("foobar"); + fileIO.deleteQuietly(dstFile); + + fileIO.deleteQuietly(srcFile); + srcFile = new Path(this.getClass().getClassLoader().getResource("test-data.orc").toURI()); + + fileIO.copyFileUtf8(srcFile, dstFile); + assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri()))) + .isFalse(); + fileIO.deleteQuietly(dstFile); + + fileIO.copyFile(srcFile, dstFile); + assertThat(FileUtils.contentEquals(new File(srcFile.toUri()), new File(dstFile.toUri()))) + .isTrue(); + fileIO.deleteQuietly(dstFile); + } +} diff --git a/paimon-common/src/test/resources/test-data.orc b/paimon-common/src/test/resources/test-data.orc new file mode 100644 index 0000000000000000000000000000000000000000..8a8c39f4e9500e998dba3887656443407ae9b46f GIT binary patch literal 369 zcmeZI%3@>@ODrqO*DFrWNX<=L!xF8OSDKTfq*JX_Qdy9yWTjM;nw(#hqNJmgmzaye zFD^(-1_|aDrRyaE*%_&N1&Nut`FVO^L(^09Qi~ExQbF3&GE;L>ij}OQt6?T(14SX0 zz${hD%qvlf)lo`GO-n4zDN(Wlxuq897O36v!LEU!u71w0@qVGcPOd?41C(;|^U{@& zRfakGgu2Fid-}N`6oE`fQd0((!Eb+TZLA2`JIVPesmVfB#U+V(DTzfX496_MU|?cm01_A~0Q%m2i2wiq literal 0 HcmV?d00001 diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 3cd7bb3b6959b..6d358b15ce0d3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -32,6 +32,9 @@ import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.RecordAttributeManager; + +import javax.annotation.Nullable; import java.util.Comparator; import java.util.List; @@ -87,12 +90,14 @@ public RawFileSplitRead newRead() { @Override public AppendOnlyFileStoreWrite newWrite(String commitUser) { - return newWrite(commitUser, null); + return newWrite(commitUser, null, null); } @Override public AppendOnlyFileStoreWrite newWrite( - String commitUser, ManifestCacheFilter manifestFilter) { + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager) { return new AppendOnlyFileStoreWrite( fileIO, newRead(), diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index e943d38cf5e19..7af99cb3ba9e5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -37,6 +37,7 @@ import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -76,7 +77,10 @@ public interface FileStore extends Serializable { FileStoreWrite newWrite(String commitUser); - FileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter); + FileStoreWrite newWrite( + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager); FileStoreCommit newCommit(String commitUser); diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index b1b7fc211c1a9..fd9b5320a8906 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -42,9 +42,12 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.KeyComparatorSupplier; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.paimon.utils.UserDefinedSeqComparator; import org.apache.paimon.utils.ValueEqualiserSupplier; +import javax.annotation.Nullable; + import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -152,11 +155,14 @@ public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() { @Override public KeyValueFileStoreWrite newWrite(String commitUser) { - return newWrite(commitUser, null); + return newWrite(commitUser, null, null); } @Override - public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter) { + public KeyValueFileStoreWrite newWrite( + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager) { IndexMaintainer.Factory indexFactory = null; if (bucketMode() == BucketMode.HASH_DYNAMIC) { indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler()); @@ -185,7 +191,8 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma deletionVectorsMaintainerFactory, options, keyValueFieldsExtractor, - tableName); + tableName, + recordAttributeManager); } private Map format2PathFactory() { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 7b27852618877..9200ad29165ed 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -349,6 +349,26 @@ public DataFileMeta upgrade(int newLevel) { fileSource); } + public DataFileMeta rename(String newFileName) { + return new DataFileMeta( + newFileName, + fileSize, + rowCount, + minKey, + maxKey, + keyStats, + valueStats, + minSequenceNumber, + maxSequenceNumber, + schemaId, + level, + extraFiles, + creationTime, + deleteRowCount, + embeddedIndex, + fileSource); + } + public List collectFiles(DataFilePathFactory pathFactory) { List paths = new ArrayList<>(); paths.add(pathFactory.toPath(fileName)); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index 72f9b3f651537..78c37cf455cad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -36,6 +36,7 @@ import javax.annotation.Nullable; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -123,6 +124,21 @@ public void deleteFile(String filename, int level) { fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename)); } + public void copyFile(String sourceFileName, String targetFileName, int level) + throws IOException { + Path sourcePath = formatContext.pathFactory(level).toPath(sourceFileName); + Path targetPath = formatContext.pathFactory(level).toPath(targetFileName); + fileIO.copyFile(sourcePath, targetPath); + } + + public FileIO getFileIO() { + return fileIO; + } + + public Path newChangelogPath(int level) { + return formatContext.pathFactory(level).newChangelogPath(); + } + public static Builder builder( FileIO fileIO, long schemaId, diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index cf2e6c25159b7..7909168120742 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -26,6 +26,8 @@ import org.apache.paimon.compact.CompactResult; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; @@ -37,6 +39,7 @@ import org.apache.paimon.mergetree.compact.MergeFunction; import org.apache.paimon.options.MemorySize; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BucketRecordAttributeManager; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.RecordWriter; @@ -78,6 +81,9 @@ public class MergeTreeWriter implements RecordWriter, MemoryOwner { private final LinkedHashMap compactBefore; private final LinkedHashSet compactAfter; private final LinkedHashSet compactChangelog; + @Nullable private final BucketRecordAttributeManager recordAttributeManager; + + private final boolean canChangelogOptimizedToCopyPreconditions; @Nullable private CompactDeletionFile compactDeletionFile; @@ -98,7 +104,8 @@ public MergeTreeWriter( boolean commitForceCompact, ChangelogProducer changelogProducer, @Nullable CommitIncrement increment, - @Nullable FieldsComparator userDefinedSeqComparator) { + @Nullable FieldsComparator userDefinedSeqComparator, + @Nullable BucketRecordAttributeManager recordAttributeManager) { this.writeBufferSpillable = writeBufferSpillable; this.maxDiskSize = maxDiskSize; this.sortMaxFan = sortMaxFan; @@ -114,6 +121,7 @@ public MergeTreeWriter( this.commitForceCompact = commitForceCompact; this.changelogProducer = changelogProducer; this.userDefinedSeqComparator = userDefinedSeqComparator; + this.recordAttributeManager = recordAttributeManager; this.newFiles = new LinkedHashSet<>(); this.deletedFiles = new LinkedHashSet<>(); @@ -133,6 +141,25 @@ public MergeTreeWriter( compactChangelog.addAll(increment.compactIncrement().changelogFiles()); updateCompactDeletionFile(increment.compactDeletionFile()); } + + // TODO: Verify the performance of the default implementation of copy, + // and remove the following code block if proved to be of no regression. + boolean isCopyMethodOverridden; + try { + isCopyMethodOverridden = + !writerFactory + .getFileIO() + .getClass() + .getMethod("copyFile", Path.class, Path.class) + .getDeclaringClass() + .equals(FileIO.class); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + this.canChangelogOptimizedToCopyPreconditions = + isCopyMethodOverridden + && (changelogProducer == ChangelogProducer.INPUT) + && recordAttributeManager != null; } private long newSequenceNumber() { @@ -213,7 +240,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul } final RollingFileWriter changelogWriter = - changelogProducer == ChangelogProducer.INPUT + changelogProducer == ChangelogProducer.INPUT & !canChangelogOptimizedToCopy() ? writerFactory.createRollingChangelogFileWriter(0) : null; final RollingFileWriter dataWriter = @@ -232,22 +259,49 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul dataWriter.close(); } + List fileMetas = dataWriter.result(); if (changelogWriter != null) { newFilesChangelog.addAll(changelogWriter.result()); + } else if (canChangelogOptimizedToCopy()) { + List changelogMetas = getChangelogFileMetaFromDataFile(fileMetas); + for (int i = 0; i < changelogMetas.size(); i++) { + writerFactory.copyFile( + fileMetas.get(i).fileName(), changelogMetas.get(i).fileName(), 0); + } + newFilesChangelog.addAll(changelogMetas); } - for (DataFileMeta fileMeta : dataWriter.result()) { + for (DataFileMeta fileMeta : fileMetas) { newFiles.add(fileMeta); compactManager.addNewFile(fileMeta); } writeBuffer.clear(); + if (recordAttributeManager != null) { + recordAttributeManager.onFlush(); + } } trySyncLatestCompaction(waitForLatestCompaction); compactManager.triggerCompaction(forcedFullCompaction); } + private List getChangelogFileMetaFromDataFile( + List dataFileMetaList) { + List changelogFileMetaList = new ArrayList<>(); + for (DataFileMeta dataFileMeta : dataFileMetaList) { + DataFileMeta changelogFileMeta = + dataFileMeta.rename(writerFactory.newChangelogPath(0).getName()); + changelogFileMetaList.add(changelogFileMeta); + } + return changelogFileMetaList; + } + + private boolean canChangelogOptimizedToCopy() { + return canChangelogOptimizedToCopyPreconditions + && recordAttributeManager.areAllRecordsInsertOnlySinceLastFlush(); + } + @Override public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception { flushWriteBuffer(waitCompaction, false); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index d693c431b1021..d64df137f8e66 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -67,6 +67,7 @@ import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.UserDefinedSeqComparator; @@ -101,6 +102,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite { private final RowType keyType; private final RowType valueType; @Nullable private final RecordLevelExpire recordLevelExpire; + @Nullable private final RecordAttributeManager recordAttributeManager; public KeyValueFileStoreWrite( FileIO fileIO, @@ -121,7 +123,8 @@ public KeyValueFileStoreWrite( @Nullable DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory, CoreOptions options, KeyValueFieldsExtractor extractor, - String tableName) { + String tableName, + @Nullable RecordAttributeManager recordAttributeManager) { super( commitUser, snapshotManager, @@ -134,6 +137,7 @@ public KeyValueFileStoreWrite( this.keyType = keyType; this.valueType = valueType; this.udsComparatorSupplier = udsComparatorSupplier; + this.recordAttributeManager = recordAttributeManager; this.readerFactoryBuilder = KeyValueFileReaderFactory.builder( fileIO, @@ -210,7 +214,11 @@ protected MergeTreeWriter createWriter( options.commitForceCompact(), options.changelogProducer(), restoreIncrement, - UserDefinedSeqComparator.create(valueType, options)); + UserDefinedSeqComparator.create(valueType, options), + recordAttributeManager == null + ? null + : recordAttributeManager.getBucketRecordAttributeManager( + partition, bucket)); } @VisibleForTesting diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java index ca2ad04a232d3..d23e8393bf779 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java @@ -40,6 +40,7 @@ import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -128,9 +129,12 @@ public FileStoreWrite newWrite(String commitUser) { } @Override - public FileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter) { + public FileStoreWrite newWrite( + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager) { privilegeChecker.assertCanInsert(identifier); - return wrapped.newWrite(commitUser, manifestFilter); + return wrapped.newWrite(commitUser, manifestFilter, recordAttributeManager); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index 548ae69ee5cce..0fad65b20f51b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -39,9 +39,12 @@ import org.apache.paimon.table.source.StreamDataTableScan; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import javax.annotation.Nullable; + import java.time.Duration; import java.util.Map; import java.util.Objects; @@ -265,9 +268,12 @@ public TableWriteImpl newWrite(String commitUser) { } @Override - public TableWriteImpl newWrite(String commitUser, ManifestCacheFilter manifestFilter) { + public TableWriteImpl newWrite( + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager) { privilegeChecker.assertCanInsert(identifier); - return wrapped.newWrite(commitUser, manifestFilter); + return wrapped.newWrite(commitUser, manifestFilter, recordAttributeManager); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index f45865d018c8f..55be5651f5930 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -42,6 +42,9 @@ import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.RecordAttributeManager; + +import javax.annotation.Nullable; import java.io.IOException; import java.util.function.BiConsumer; @@ -135,15 +138,18 @@ public RecordReader reader(Split split) throws IOException { @Override public TableWriteImpl newWrite(String commitUser) { - return newWrite(commitUser, null); + return newWrite(commitUser, null, null); } @Override public TableWriteImpl newWrite( - String commitUser, ManifestCacheFilter manifestFilter) { + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager) { // if this table is unaware-bucket table, we skip compaction and restored files searching AppendOnlyFileStoreWrite writer = - store().newWrite(commitUser, manifestFilter).withBucketMode(bucketMode()); + store().newWrite(commitUser, manifestFilter, recordAttributeManager) + .withBucketMode(bucketMode()); return new TableWriteImpl<>( writer, createRowKeyExtractor(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index 212555d7b1d6f..ef25200d0969e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -29,6 +29,9 @@ import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.RecordAttributeManager; + +import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -92,7 +95,10 @@ default Optional comment() { @Override TableWriteImpl newWrite(String commitUser); - TableWriteImpl newWrite(String commitUser, ManifestCacheFilter manifestFilter); + TableWriteImpl newWrite( + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager); @Override TableCommitImpl newCommit(String commitUser); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index 8c6925d73a51c..ab65fffe83098 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -40,6 +40,9 @@ import org.apache.paimon.table.source.MergeTreeSplitGenerator; import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.RecordAttributeManager; + +import javax.annotation.Nullable; import java.util.List; import java.util.function.BiConsumer; @@ -158,15 +161,17 @@ public InnerTableRead newRead() { @Override public TableWriteImpl newWrite(String commitUser) { - return newWrite(commitUser, null); + return newWrite(commitUser, null, null); } @Override public TableWriteImpl newWrite( - String commitUser, ManifestCacheFilter manifestFilter) { + String commitUser, + ManifestCacheFilter manifestFilter, + @Nullable RecordAttributeManager recordAttributeManager) { KeyValue kv = new KeyValue(); return new TableWriteImpl<>( - store().newWrite(commitUser, manifestFilter), + store().newWrite(commitUser, manifestFilter, recordAttributeManager), createRowKeyExtractor(), (record, rowKind) -> kv.replace( diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BucketRecordAttributeManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BucketRecordAttributeManager.java new file mode 100644 index 0000000000000..bcdeca7396701 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BucketRecordAttributeManager.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +/** + * Manager of the attributes of the internal records during runtime. + * + *

Different from {@link RecordAttributeManager}, this class manages the attributes need to be + * set or acquired at bucket's granularity. + */ +public class BucketRecordAttributeManager { + private boolean isInsertOnly; + private boolean areAllRecordsInsertOnlySinceLastFlush; + + public BucketRecordAttributeManager() { + this.isInsertOnly = false; + this.areAllRecordsInsertOnlySinceLastFlush = false; + } + + /** + * This method is called when the insert only status of the records changes. + * + * @param isInsertOnly If true, all the following records would be of {@link + * org.apache.paimon.types.RowKind#INSERT}, and no two records would have the same primary + * key. + */ + void onInsertOnlyChanged(boolean isInsertOnly) { + this.isInsertOnly = isInsertOnly; + if (!isInsertOnly) { + areAllRecordsInsertOnlySinceLastFlush = false; + } + } + + /** + * This method is called when the internal records are flushed to disk. It denotes that the + * attributes of previously added records are no longer needed. + */ + public void onFlush() { + areAllRecordsInsertOnlySinceLastFlush = isInsertOnly; + } + + /** @return whether all records added since last flush are insert-only. */ + public boolean areAllRecordsInsertOnlySinceLastFlush() { + return areAllRecordsInsertOnlySinceLastFlush; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/RecordAttributeManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/RecordAttributeManager.java new file mode 100644 index 0000000000000..266d68fe46489 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/RecordAttributeManager.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.data.BinaryRow; + +import java.util.HashMap; +import java.util.Map; + +/** Manager of the attributes of the internal records during runtime. */ +public class RecordAttributeManager { + private final Map, BucketRecordAttributeManager> bucketManagerMap = + new HashMap<>(); + + private boolean isInsertOnly = false; + + /** + * This method is called when the insert only status of the records changes. + * + * @param isInsertOnly If true, all the following records would be of {@link + * org.apache.paimon.types.RowKind#INSERT}, and no two records would have the same primary + * key. + */ + public void onInsertOnlyChanged(boolean isInsertOnly) { + this.isInsertOnly = isInsertOnly; + bucketManagerMap.values().forEach(x -> x.onInsertOnlyChanged(isInsertOnly)); + } + + /** + * @return A {@link BucketRecordAttributeManager} that manages the internal record attributes of + * a specific bucket. + */ + public BucketRecordAttributeManager getBucketRecordAttributeManager( + BinaryRow partition, int bucket) { + return bucketManagerMap.computeIfAbsent( + Pair.of(partition, bucket), + pair -> { + BucketRecordAttributeManager manager = new BucketRecordAttributeManager(); + manager.onInsertOnlyChanged(isInsertOnly); + return manager; + }); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 92f14b3ed41cf..54360eb99a698 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -58,6 +58,7 @@ import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BucketRecordAttributeManager; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.ExceptionUtils; import org.apache.paimon.utils.FileStorePathFactory; @@ -367,6 +368,35 @@ public void testWriteMany() throws Exception { doTestWriteRead(3, 20_000); } + @Test + public void testChangelog() throws Exception { + writer = + createMergeTreeWriter( + Collections.emptyList(), + createCompactManager(service, Collections.emptyList()), + ChangelogProducer.INPUT, + null); + + doTestWriteReadWithChangelog(8, 200, false); + } + + @Test + public void testChangelogFromCopyingData() throws Exception { + writer = + createMergeTreeWriter( + Collections.emptyList(), + createCompactManager(service, Collections.emptyList()), + ChangelogProducer.INPUT, + new BucketRecordAttributeManager() { + @Override + public boolean areAllRecordsInsertOnlySinceLastFlush() { + return true; + } + }); + + doTestWriteReadWithChangelog(8, 200, true); + } + private void doTestWriteRead(int batchNumber) throws Exception { doTestWriteRead(batchNumber, 200); } @@ -413,12 +443,78 @@ private void doTestWriteRead(int batchNumber, int perBatch) throws Exception { assertThat(files).isEqualTo(Collections.emptySet()); } + private void doTestWriteReadWithChangelog( + int batchNumber, int perBatch, boolean isChangelogEqualToData) throws Exception { + List expected = new ArrayList<>(); + List newFiles = new ArrayList<>(); + List changelogFiles = new ArrayList<>(); + Set newFileNames = new HashSet<>(); + List compactedFiles = new ArrayList<>(); + + // write batch and commit + for (int i = 0; i <= batchNumber; i++) { + if (i < batchNumber) { + expected.addAll(writeBatch(perBatch)); + } else { + writer.sync(); + } + + CommitIncrement increment = writer.prepareCommit(true); + newFiles.addAll(increment.newFilesIncrement().newFiles()); + changelogFiles.addAll(increment.newFilesIncrement().changelogFiles()); + mergeCompacted(newFileNames, compactedFiles, increment); + } + + // assert records from writer + assertRecords(expected); + + // assert records from increment new files + assertRecords(expected, newFiles, false); + assertRecords(expected, newFiles, true); + + // assert records from changelog files + if (isChangelogEqualToData) { + assertRecords(expected, changelogFiles, false); + assertRecords(expected, changelogFiles, true); + } else { + List actual = new ArrayList<>(); + for (DataFileMeta changelogFile : changelogFiles) { + actual.addAll(readAll(Collections.singletonList(changelogFile), false)); + } + assertThat(actual).containsExactlyInAnyOrder(expected.toArray(new TestRecord[0])); + } + + // assert records from increment compacted files + assertRecords(expected, compactedFiles, true); + + writer.close(); + + Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent(); + Set files = + Arrays.stream(LocalFileIO.create().listStatus(bucketDir)) + .map(FileStatus::getPath) + .map(Path::getName) + .collect(Collectors.toSet()); + newFiles.stream().map(DataFileMeta::fileName).forEach(files::remove); + changelogFiles.stream().map(DataFileMeta::fileName).forEach(files::remove); + compactedFiles.stream().map(DataFileMeta::fileName).forEach(files::remove); + assertThat(files).isEqualTo(Collections.emptySet()); + } + private MergeTreeWriter createMergeTreeWriter(List files) { return createMergeTreeWriter(files, createCompactManager(service, files)); } private MergeTreeWriter createMergeTreeWriter( List files, MergeTreeCompactManager compactManager) { + return createMergeTreeWriter(files, compactManager, ChangelogProducer.NONE, null); + } + + private MergeTreeWriter createMergeTreeWriter( + List files, + MergeTreeCompactManager compactManager, + ChangelogProducer changelogProducer, + BucketRecordAttributeManager bucketRecordAttributeManager) { long maxSequenceNumber = files.stream().map(DataFileMeta::maxSequenceNumber).max(Long::compare).orElse(-1L); MergeTreeWriter writer = @@ -434,9 +530,10 @@ private MergeTreeWriter createMergeTreeWriter( DeduplicateMergeFunction.factory().create(), writerFactory, options.commitForceCompact(), - ChangelogProducer.NONE, + changelogProducer, null, - null); + null, + bucketRecordAttributeManager); writer.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); return writer; diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/RecordAttributeManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/RecordAttributeManagerTest.java new file mode 100644 index 0000000000000..dd58e695de3e3 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/RecordAttributeManagerTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link RecordAttributeManager}. */ +public class RecordAttributeManagerTest { + @Test + public void test() { + BinaryRow partition = new BinaryRow(1); + { + BinaryRowWriter writer = new BinaryRowWriter(partition); + writer.writeInt(0, 0); + writer.complete(); + } + + RecordAttributeManager manager = new RecordAttributeManager(); + BucketRecordAttributeManager bucketManager0 = + manager.getBucketRecordAttributeManager(partition, 0); + BucketRecordAttributeManager bucketManager0Reclaimed = + manager.getBucketRecordAttributeManager(partition, 0); + BucketRecordAttributeManager bucketManager1 = + manager.getBucketRecordAttributeManager(partition, 1); + + assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + + manager.onInsertOnlyChanged(true); + assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + + bucketManager0.onFlush(); + assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isTrue(); + assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isTrue(); + assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + + bucketManager1.onFlush(); + assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isTrue(); + assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isTrue(); + assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isTrue(); + + manager.onInsertOnlyChanged(false); + bucketManager0.onFlush(); + assertThat(bucketManager0.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + assertThat(bucketManager0Reclaimed.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + assertThat(bucketManager1.areAllRecordsInsertOnlySinceLastFlush()).isFalse(); + } +} diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java index 4d86c12a6e524..2f09a9249f0a7 100644 --- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java @@ -102,7 +102,7 @@ public boolean rename(Path src, Path dst) throws IOException { return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst); } - private org.apache.hadoop.fs.Path path(Path path) { + protected org.apache.hadoop.fs.Path path(Path path) { return new org.apache.hadoop.fs.Path(path.toUri()); } diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java index 6ecc76da3f76b..cdd2f24a74e2b 100644 --- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java +++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java @@ -20,9 +20,11 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem; import org.slf4j.Logger; @@ -30,6 +32,8 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -70,6 +74,28 @@ public class OSSFileIO extends HadoopCompliantFileIO { private Options hadoopOptions; + @Override + public boolean copyFile(Path sourcePath, Path targetPath) throws IOException { + org.apache.hadoop.fs.Path srcPath = path(sourcePath); + org.apache.hadoop.fs.Path dstPath = path(targetPath); + + try (FileSystem fs = createFileSystem(srcPath)) { + AliyunOSSFileSystem ossFs = (AliyunOSSFileSystem) fs; + FileStatus sourceStatus = ossFs.getFileStatus(srcPath); + + Method method = + AliyunOSSFileSystem.class.getDeclaredMethod( + "copyFile", + org.apache.hadoop.fs.Path.class, + long.class, + org.apache.hadoop.fs.Path.class); + method.setAccessible(true); + return (boolean) method.invoke(ossFs, srcPath, sourceStatus.getLen(), dstPath); + } catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + @Override public boolean isObjectStore() { return true; diff --git a/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OSSFileIOTest.java b/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OSSFileIOTest.java new file mode 100644 index 0000000000000..72746a97f6629 --- /dev/null +++ b/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OSSFileIOTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.oss; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem; +import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link OSSFileIO}. */ +public class OSSFileIOTest { + @Test + public void testCopy() throws Exception { + TestAliyunOssFileSystemStore store = new TestAliyunOssFileSystemStore(); + TestAliyunOssFileSystem fileSystem = new TestAliyunOssFileSystem(store); + TestOSSFileIO fileIO = new TestOSSFileIO(fileSystem); + fileIO.copyFile( + new org.apache.paimon.fs.Path("sourceFoo"), + new org.apache.paimon.fs.Path("targetBar")); + assertThat(store.isCopyInvokedAndVerified).isTrue(); + } + + private static class TestOSSFileIO extends OSSFileIO { + private final TestAliyunOssFileSystem fileSystem; + + private TestOSSFileIO(TestAliyunOssFileSystem fileSystem) { + this.fileSystem = fileSystem; + } + + @Override + protected FileSystem createFileSystem(Path path) { + return fileSystem; + } + } + + private static class TestAliyunOssFileSystem extends AliyunOSSFileSystem { + public TestAliyunOssFileSystem(AliyunOSSFileSystemStore store) throws Exception { + Field storeField = AliyunOSSFileSystem.class.getDeclaredField("store"); + storeField.setAccessible(true); + storeField.set(this, store); + + Field workingDirField = AliyunOSSFileSystem.class.getDeclaredField("workingDir"); + workingDirField.setAccessible(true); + workingDirField.set(this, new Path("/")); + } + + @Override + public FileStatus getFileStatus(Path path) { + return new FileStatus(); + } + + @Override + public void close() {} + } + + private static class TestAliyunOssFileSystemStore extends AliyunOSSFileSystemStore { + private boolean isCopyInvokedAndVerified = false; + + @Override + public boolean copyFile(String srcKey, long srcLen, String dstKey) { + assertThat(srcKey).isEqualTo("sourceFoo"); + assertThat(srcLen).isZero(); + assertThat(dstKey).isEqualTo("targetBar"); + isCopyInvokedAndVerified = true; + return true; + } + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java index 9af7eabdaaadb..581156930b9c2 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java @@ -256,7 +256,13 @@ private OneInputStreamOperatorTestHarness createTestHarn CdcRecordStoreWriteOperator operator = new CdcRecordStoreWriteOperator( table, - (t, commitUser, state, ioManager, memoryPool, metricGroup) -> + (t, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> new StoreSinkWriteImpl( t, commitUser, @@ -266,7 +272,8 @@ private OneInputStreamOperatorTestHarness createTestHarn false, true, memoryPool, - metricGroup), + metricGroup, + recordAttributeManager), commitUser); TypeSerializer inputSerializer = new JavaSerializer<>(); TypeSerializer outputSerializer = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java index b4cf7aa78de22..023ab49831454 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java @@ -61,7 +61,8 @@ public AsyncLookupSinkWrite( waitCompaction, isStreaming, memoryPool, - metricGroup); + metricGroup, + null); this.tableName = table.name(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java index 147e7527ff897..f9dbd19741cec 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java @@ -50,7 +50,7 @@ public DataStreamSink build(DataStream input, @Nullable Integer // bucket-assigner HashBucketAssignerOperator assignerOperator = - new HashBucketAssignerOperator<>( + createHashBucketAssignerOperator( initialCommitUser, table, null, extractorFunction(), true); TupleTypeInfo> rowWithBucketType = new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java index f04043ce41bc5..cf697108fd326 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java @@ -20,6 +20,7 @@ import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.ChannelComputer; import org.apache.paimon.table.sink.PartitionKeyExtractor; import org.apache.paimon.utils.SerializableFunction; @@ -54,6 +55,16 @@ public DynamicBucketSink( protected abstract SerializableFunction> extractorFunction(); + protected HashBucketAssignerOperator createHashBucketAssignerOperator( + String commitUser, + Table table, + Integer numAssigners, + SerializableFunction> extractorFunction, + boolean overwrite) { + return new HashBucketAssignerOperator<>( + commitUser, table, numAssigners, extractorFunction, overwrite); + } + public DataStreamSink build(DataStream input, @Nullable Integer parallelism) { String initialCommitUser = createCommitUser(table.coreOptions().toConfiguration()); @@ -73,7 +84,7 @@ public DataStreamSink build(DataStream input, @Nullable Integer parallelis // 2. bucket-assigner HashBucketAssignerOperator assignerOperator = - new HashBucketAssignerOperator<>( + createHashBucketAssignerOperator( initialCommitUser, table, numAssigners, extractorFunction(), false); TupleTypeInfo> rowWithBucketType = new TupleTypeInfo<>(partitionByKeyHash.getType(), BasicTypeInfo.INT_TYPE_INFO); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index f369ec31c3d58..f5d207893ba8c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -117,7 +117,13 @@ private StoreSinkWrite.Provider createWriteProvider( if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) { int finalDeltaCommits = Math.max(deltaCommits, 1); - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { + return (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> { assertNoSinkMaterializer.run(); return new GlobalFullCompactionSinkWrite( table, @@ -136,7 +142,13 @@ private StoreSinkWrite.Provider createWriteProvider( if (changelogProducer == ChangelogProducer.LOOKUP && !coreOptions.prepareCommitWaitCompaction()) { - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { + return (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> { assertNoSinkMaterializer.run(); return new AsyncLookupSinkWrite( table, @@ -151,7 +163,13 @@ private StoreSinkWrite.Provider createWriteProvider( }; } - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { + return (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> { assertNoSinkMaterializer.run(); return new StoreSinkWriteImpl( table, @@ -162,7 +180,8 @@ private StoreSinkWrite.Provider createWriteProvider( waitCompaction, isStreaming, memoryPool, - metricGroup); + metricGroup, + recordAttributeManager); }; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 8baee5ac1b91d..dd73bd5590b60 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -21,6 +21,7 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.FlinkRowWrapper; import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink; import org.apache.paimon.flink.sorter.TableSortInfo; import org.apache.paimon.flink.sorter.TableSorter; @@ -70,11 +71,11 @@ public class FlinkSinkBuilder { private static final Logger LOG = LoggerFactory.getLogger(FlinkSinkBuilder.class); - private final FileStoreTable table; + protected final FileStoreTable table; private DataStream input; - @Nullable private Map overwritePartition; - @Nullable private Integer parallelism; + @Nullable protected Map overwritePartition; + @Nullable protected Integer parallelism; private Boolean boundedInput = null; @Nullable private TableSortInfo tableSortInfo; @@ -221,7 +222,7 @@ public FlinkSinkBuilder clusteringIfPossible( /** Build {@link DataStreamSink}. */ public DataStreamSink build() { input = trySortInput(input); - DataStream input = MapToInternalRow.map(this.input, table.rowType()); + DataStream input = mapToInternalRow(this.input, table.rowType()); if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) { input = input.forward() @@ -247,7 +248,14 @@ public DataStreamSink build() { } } - private DataStreamSink buildDynamicBucketSink( + protected DataStream mapToInternalRow( + DataStream input, org.apache.paimon.types.RowType rowType) { + return input.map((MapFunction) FlinkRowWrapper::new) + .setParallelism(input.getParallelism()) + .returns(org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(rowType)); + } + + protected DataStreamSink buildDynamicBucketSink( DataStream input, boolean globalIndex) { checkArgument(logSinkFunction == null, "Dynamic bucket mode can not work with log system."); return compactSink && !globalIndex @@ -260,7 +268,7 @@ private DataStreamSink buildDynamicBucketSink( .build(input, parallelism); } - private DataStreamSink buildForFixedBucket(DataStream input) { + protected DataStreamSink buildForFixedBucket(DataStream input) { DataStream partitioned = partition( input, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java index 62341a180dab9..e6958cd5d5414 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java @@ -85,7 +85,8 @@ public GlobalFullCompactionSinkWrite( waitCompaction, isStreaming, memoryPool, - metricGroup); + metricGroup, + null); this.deltaCommits = deltaCommits; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java deleted file mode 100644 index a9bf744010992..0000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MapToInternalRow.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.sink; - -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.flink.FlinkRowWrapper; -import org.apache.paimon.flink.utils.InternalTypeInfo; -import org.apache.paimon.types.RowType; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.table.data.RowData; - -/** An util to convert {@link RowData} stream to {@link InternalRow} stream. */ -public class MapToInternalRow { - - public static DataStream map(DataStream input, RowType rowType) { - return input.map((MapFunction) FlinkRowWrapper::new) - .setParallelism(input.getParallelism()) - .returns(InternalTypeInfo.fromRowType(rowType)); - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index 52e494b5a9dbf..eda787f17a582 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -156,7 +156,8 @@ public void processElement(StreamRecord element) throws Exception { state, getContainingTask().getEnvironment().getIOManager(), memoryPool, - getMetricGroup())); + getMetricGroup(), + null)); if (write.streamingMode()) { write.notifyNewFiles(snapshotId, partition, bucket, files); @@ -256,7 +257,13 @@ private StoreSinkWrite.Provider createWriteProvider( if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) { int finalDeltaCommits = Math.max(deltaCommits, 1); - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + return (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> new GlobalFullCompactionSinkWrite( table, commitUser, @@ -273,7 +280,13 @@ private StoreSinkWrite.Provider createWriteProvider( if (changelogProducer == CoreOptions.ChangelogProducer.LOOKUP && !coreOptions.prepareCommitWaitCompaction()) { - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + return (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> new AsyncLookupSinkWrite( table, commitUser, @@ -286,7 +299,13 @@ private StoreSinkWrite.Provider createWriteProvider( metricGroup); } - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + return (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> new StoreSinkWriteImpl( table, commitUser, @@ -296,6 +315,7 @@ private StoreSinkWrite.Provider createWriteProvider( waitCompaction, isStreaming, memoryPool, - metricGroup); + metricGroup, + recordAttributeManager); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java index 1842884907728..dac0a29ca1491 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java @@ -109,7 +109,7 @@ void initStateAndWriter( write = storeSinkWriteProvider.provide( - table, commitUser, state, ioManager, memoryPool, getMetricGroup()); + table, commitUser, state, ioManager, memoryPool, getMetricGroup(), null); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java index 6001721b71f39..703f4cf0bfb40 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java @@ -26,6 +26,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.SinkRecord; import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -81,7 +82,8 @@ StoreSinkWrite provide( StoreSinkWriteState state, IOManager ioManager, @Nullable MemorySegmentPool memoryPool, - @Nullable MetricGroup metricGroup); + @Nullable MetricGroup metricGroup, + @Nullable RecordAttributeManager recordAttributeManager); } /** Provider of {@link StoreSinkWrite} that uses given write buffer. */ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java index 3ecc80bb6f13c..2cd70cb32eca7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java @@ -32,6 +32,7 @@ import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.SinkRecord; import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -64,6 +65,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite { protected TableWriteImpl write; @Nullable private final MetricGroup metricGroup; + @Nullable private final RecordAttributeManager recordAttributeManager; public StoreSinkWriteImpl( FileStoreTable table, @@ -74,7 +76,8 @@ public StoreSinkWriteImpl( boolean waitCompaction, boolean isStreamingMode, @Nullable MemorySegmentPool memoryPool, - @Nullable MetricGroup metricGroup) { + @Nullable MetricGroup metricGroup, + @Nullable RecordAttributeManager recordAttributeManager) { this( table, commitUser, @@ -85,7 +88,8 @@ public StoreSinkWriteImpl( isStreamingMode, memoryPool, null, - metricGroup); + metricGroup, + recordAttributeManager); } public StoreSinkWriteImpl( @@ -108,7 +112,8 @@ public StoreSinkWriteImpl( isStreamingMode, null, memoryPoolFactory, - metricGroup); + metricGroup, + null); } private StoreSinkWriteImpl( @@ -121,7 +126,8 @@ private StoreSinkWriteImpl( boolean isStreamingMode, @Nullable MemorySegmentPool memoryPool, @Nullable MemoryPoolFactory memoryPoolFactory, - @Nullable MetricGroup metricGroup) { + @Nullable MetricGroup metricGroup, + @Nullable RecordAttributeManager recordAttributeManager) { this.commitUser = commitUser; this.state = state; this.paimonIOManager = new IOManagerImpl(ioManager.getSpillingDirectoriesPaths()); @@ -131,6 +137,7 @@ private StoreSinkWriteImpl( this.memoryPool = memoryPool; this.memoryPoolFactory = memoryPoolFactory; this.metricGroup = metricGroup; + this.recordAttributeManager = recordAttributeManager; this.write = newTableWrite(table); } @@ -143,7 +150,8 @@ private TableWriteImpl newTableWrite(FileStoreTable table) { table.newWrite( commitUser, (part, bucket) -> - state.stateValueFilter().filter(table.name(), part, bucket)) + state.stateValueFilter().filter(table.name(), part, bucket), + recordAttributeManager) .withIOManager(paimonIOManager) .withIgnorePreviousFiles(ignorePreviousFiles) .withExecutionMode(isStreamingMode) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java index f38e0ad6bfb53..cf2f9e15c564f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java @@ -23,6 +23,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.ChannelComputer; +import org.apache.paimon.utils.RecordAttributeManager; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.state.StateInitializationContext; @@ -40,6 +41,7 @@ public abstract class TableWriteOperator extends PrepareCommitOperator createTestHarnes protected StoreCompactOperator createCompactOperator(FileStoreTable table) { return new StoreCompactOperator( table, - (t, commitUser, state, ioManager, memoryPool, metricGroup) -> + (t, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> new StoreSinkWriteImpl( t, commitUser, @@ -254,7 +260,8 @@ protected StoreCompactOperator createCompactOperator(FileStoreTable table) { false, false, memoryPool, - metricGroup), + metricGroup, + recordAttributeManager), "test"); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java index 9cd2a73920fb6..744982c0857fc 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java @@ -53,8 +53,13 @@ public void testCompactExactlyOnce(boolean streamingMode) throws Exception { StoreCompactOperator storeCompactOperator = new StoreCompactOperator( (FileStoreTable) getTableDefault(), - (table, commitUser, state, ioManager, memoryPool, metricGroup) -> - compactRememberStoreWrite, + (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> compactRememberStoreWrite, "10086"); storeCompactOperator.open(); StateInitializationContextImpl context = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java index a6a4d3e5088b2..33fc310403c5d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java @@ -113,7 +113,13 @@ private void testMetricsImpl(FileStoreTable fileStoreTable) throws Exception { new RowDataStoreWriteOperator( fileStoreTable, null, - (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> new StoreSinkWriteImpl( table, commitUser, @@ -123,7 +129,8 @@ private void testMetricsImpl(FileStoreTable fileStoreTable) throws Exception { false, true, memoryPool, - metricGroup), + metricGroup, + recordAttributeManager), "test"); OneInputStreamOperatorTestHarness harness = createHarness(operator); @@ -254,7 +261,13 @@ private RowDataStoreWriteOperator getAsyncLookupWriteOperator( return new RowDataStoreWriteOperator( fileStoreTable, null, - (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + (table, + commitUser, + state, + ioManager, + memoryPool, + metricGroup, + recordAttributeManager) -> new AsyncLookupSinkWrite( table, commitUser, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index edd6688da66dd..f9ceff8ca3297 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -185,7 +185,8 @@ public RecordWriter createMergeTreeWriter(BinaryRow partition, int buc null, options, EXTRACTOR, - tablePath.getName()) + tablePath.getName(), + null) .createWriterContainer(partition, bucket, true) .writer; ((MemoryOwner) writer)