diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 4b00eae1f1b5..9d9f418979a1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -33,7 +33,9 @@ import org.apache.paimon.io.RowDataRollingFileWriter; import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.operation.AppendOnlyFileStoreWrite; import org.apache.paimon.options.MemorySize; +import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.statistics.FieldStatsCollector; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; @@ -64,6 +66,7 @@ public class AppendOnlyWriter implements RecordWriter, MemoryOwner private final RowType writeSchema; private final DataFilePathFactory pathFactory; private final CompactManager compactManager; + private final AppendOnlyFileStoreWrite.BucketFileRead bucketFileRead; private final boolean forceCompact; private final List newFiles; private final List deletedFiles; @@ -88,6 +91,7 @@ public AppendOnlyWriter( RowType writeSchema, long maxSequenceNumber, CompactManager compactManager, + AppendOnlyFileStoreWrite.BucketFileRead bucketFileRead, boolean forceCompact, DataFilePathFactory pathFactory, @Nullable CommitIncrement increment, @@ -104,6 +108,7 @@ public AppendOnlyWriter( this.writeSchema = writeSchema; this.pathFactory = pathFactory; this.compactManager = compactManager; + this.bucketFileRead = bucketFileRead; this.forceCompact = forceCompact; this.newFiles = new ArrayList<>(); this.deletedFiles = new ArrayList<>(); @@ -209,13 +214,25 @@ public void close() throws Exception { } public void toBufferedWriter() throws Exception { - if (sinkWriter != null && !sinkWriter.bufferSpillableWriter()) { - flush(false, false); - trySyncLatestCompaction(true); + if (sinkWriter != null && !sinkWriter.bufferSpillableWriter() && bucketFileRead != null) { + // fetch the written results + List files = sinkWriter.flush(); sinkWriter.close(); sinkWriter = new BufferedSinkWriter(true, maxDiskSize, spillCompression); sinkWriter.setMemoryPool(memorySegmentPool); + + // rewrite small files + try (RecordReaderIterator reader = bucketFileRead.read(files)) { + while (reader.hasNext()) { + sinkWriter.write(reader.next()); + } + } finally { + // remove small files + for (DataFileMeta file : files) { + fileIO.deleteQuietly(pathFactory.toPath(file.fileName())); + } + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index f794b160cddd..a0d86337139e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -47,6 +47,7 @@ import javax.annotation.Nullable; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -145,6 +146,7 @@ protected RecordWriter createWriter( rowType, maxSequenceNumber, compactManager, + bucketReader(partition, bucket), commitForceCompact, factory, restoreIncrement, @@ -174,14 +176,7 @@ public AppendOnlyCompactManager.CompactRewriter compactRewriter( fileCompression, statsCollectors); try { - rewriter.write( - new RecordReaderIterator<>( - read.createReader( - DataSplit.builder() - .withPartition(partition) - .withBucket(bucket) - .withDataFiles(toCompact) - .build()))); + rewriter.write(bucketReader(partition, bucket).read(toCompact)); } finally { rewriter.close(); } @@ -189,6 +184,17 @@ public AppendOnlyCompactManager.CompactRewriter compactRewriter( }; } + public BucketFileRead bucketReader(BinaryRow partition, int bucket) { + return files -> + new RecordReaderIterator<>( + read.createReader( + DataSplit.builder() + .withPartition(partition) + .withBucket(bucket) + .withDataFiles(files) + .build())); + } + public AppendOnlyFileStoreWrite withBucketMode(BucketMode bucketMode) { // AppendOnlyFileStoreWrite is sensitive with bucket mode. It will act difference in // unaware-bucket mode (no compaction and force empty-writer). @@ -215,4 +221,9 @@ protected void forceBufferSpill() throws Exception { } } } + + /** Read for one bucket. */ + public interface BucketFileRead { + RecordReaderIterator read(List files) throws IOException; + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 87bb14745c61..565be9f90f2e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -594,6 +594,7 @@ private Pair> createWriter( AppendOnlyWriterTest.SCHEMA, getMaxSequenceNumber(toCompact), compactManager, + null, forceCompact, pathFactory, null, diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index 22826d5bcd2b..ffa290e0a53b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -79,6 +79,7 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception 0, new AppendOnlyCompactManager( null, toCompact, 4, 10, 10, null, null), // not used + null, false, dataFilePathFactory, null, diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java index 9281a7b35a9c..c38fb4fba6bf 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/AppendOnlyFileStoreWriteTest.java @@ -42,10 +42,13 @@ import java.util.List; import java.util.Map; +import java.util.Random; /** Tests for {@link AppendOnlyFileStoreWrite}. */ public class AppendOnlyFileStoreWriteTest { + private static final Random RANDOM = new Random(); + @TempDir java.nio.file.Path tempDir; @Test @@ -101,6 +104,49 @@ public void testWritesInBatch() throws Exception { Assertions.assertThat(records).isEqualTo(11); } + @Test + public void testWritesInBatchWithNoExtraFiles() throws Exception { + FileStoreTable table = createFileStoreTable(); + + AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite) table.store().newWrite("ss"); + write.withExecutionMode(false); + + write.write(partition(0), 0, GenericRow.of(0, 0, 0)); + write.write(partition(1), 1, GenericRow.of(1, 1, 0)); + write.write(partition(2), 2, GenericRow.of(2, 2, 0)); + write.write(partition(3), 3, GenericRow.of(3, 3, 0)); + write.write(partition(4), 4, GenericRow.of(4, 4, 0)); + write.write(partition(5), 5, GenericRow.of(5, 5, 0)); + write.write(partition(6), 6, GenericRow.of(6, 6, 0)); + + for (int i = 0; i < 1000; i++) { + int number = RANDOM.nextInt(7); + write.write(partition(number), number, GenericRow.of(number, number, 0)); + } + + List commit = write.prepareCommit(true, Long.MAX_VALUE); + + Assertions.assertThat(commit.size()).isEqualTo(7); + + long files = + commit.stream() + .map(s -> (CommitMessageImpl) s) + .mapToLong(s -> s.newFilesIncrement().newFiles().size()) + .sum(); + Assertions.assertThat(files).isEqualTo(7); + + long records = + commit.stream() + .map(s -> (CommitMessageImpl) s) + .mapToLong( + s -> + s.newFilesIncrement().newFiles().stream() + .mapToLong(DataFileMeta::rowCount) + .sum()) + .sum(); + Assertions.assertThat(records).isEqualTo(1007); + } + protected FileStoreTable createFileStoreTable() throws Exception { Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString())); Schema schema =