[core] Reduce small files while batch write append only table with a lot of partitions. (#3160)
leaves12138 authored Apr 7, 2024
1 parent a26b46f commit ca88ed9
Showing 5 changed files with 87 additions and 11 deletions.
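
What changed, in brief: when an append-only writer's in-memory buffer was not spillable, toBufferedWriter() used to flush the buffer straight to disk before swapping in a spillable BufferedSinkWriter, leaving one small file behind per flush; with many partitions in a batch write, that meant many small files. Now the writer flushes, swaps in the spillable writer, reads the freshly written small files back through a new BucketFileRead hook, re-buffers their rows, and deletes the originals, so each bucket ends up with fewer, larger files. Below is a minimal, hypothetical sketch of that flow; apart from the BucketFileRead name, all types here (String file handles, Consumer-based writers) are generic stand-ins, not Paimon APIs.

// Simplified sketch of the spill-conversion flow introduced by this commit.
// Only BucketFileRead echoes a real name from the diff; everything else is
// a stand-in for the Paimon types involved (DataFileMeta, FileIO, ...).
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

interface BucketFileRead<ROW> {
    // Reads the rows of already-written data files back in one pass.
    Iterator<ROW> read(List<String> files) throws Exception;
}

final class SpillConversionSketch<ROW> {
    void toBufferedWriter(
            List<String> flushedSmallFiles,      // files the old writer just flushed
            BucketFileRead<ROW> bucketFileRead,  // reads those files back as rows
            Consumer<ROW> spillableWriter,       // replacement writer that can spill
            Consumer<String> deleteQuietly)      // removes a file, ignoring failures
            throws Exception {
        try {
            // Re-route every row of the freshly flushed small files through
            // the spillable buffer instead of leaving them on disk as-is.
            Iterator<ROW> reader = bucketFileRead.read(flushedSmallFiles);
            while (reader.hasNext()) {
                spillableWriter.accept(reader.next());
            }
        } finally {
            // The rows now live in the spillable buffer, so the small files can
            // go; the eventual flush will produce fewer, larger files instead.
            for (String file : flushedSmallFiles) {
                deleteQuietly.accept(file);
            }
        }
    }
}
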
@@ -33,7 +33,9 @@
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
@@ -64,6 +66,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final RowType writeSchema;
private final DataFilePathFactory pathFactory;
private final CompactManager compactManager;
private final AppendOnlyFileStoreWrite.BucketFileRead bucketFileRead;
private final boolean forceCompact;
private final List<DataFileMeta> newFiles;
private final List<DataFileMeta> deletedFiles;
@@ -88,6 +91,7 @@ public AppendOnlyWriter(
RowType writeSchema,
long maxSequenceNumber,
CompactManager compactManager,
AppendOnlyFileStoreWrite.BucketFileRead bucketFileRead,
boolean forceCompact,
DataFilePathFactory pathFactory,
@Nullable CommitIncrement increment,
@@ -104,6 +108,7 @@ public AppendOnlyWriter(
this.writeSchema = writeSchema;
this.pathFactory = pathFactory;
this.compactManager = compactManager;
this.bucketFileRead = bucketFileRead;
this.forceCompact = forceCompact;
this.newFiles = new ArrayList<>();
this.deletedFiles = new ArrayList<>();
@@ -209,13 +214,25 @@ public void close() throws Exception {
}

public void toBufferedWriter() throws Exception {
if (sinkWriter != null && !sinkWriter.bufferSpillableWriter()) {
flush(false, false);
trySyncLatestCompaction(true);
if (sinkWriter != null && !sinkWriter.bufferSpillableWriter() && bucketFileRead != null) {
// fetch the written results
List<DataFileMeta> files = sinkWriter.flush();

sinkWriter.close();
sinkWriter = new BufferedSinkWriter(true, maxDiskSize, spillCompression);
sinkWriter.setMemoryPool(memorySegmentPool);

// rewrite small files
try (RecordReaderIterator<InternalRow> reader = bucketFileRead.read(files)) {
while (reader.hasNext()) {
sinkWriter.write(reader.next());
}
} finally {
// remove small files
for (DataFileMeta file : files) {
fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
}
}
}
}

@@ -47,6 +47,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -145,6 +146,7 @@ protected RecordWriter<InternalRow> createWriter(
rowType,
maxSequenceNumber,
compactManager,
bucketReader(partition, bucket),
commitForceCompact,
factory,
restoreIncrement,
@@ -174,21 +176,25 @@ public AppendOnlyCompactManager.CompactRewriter compactRewriter(
fileCompression,
statsCollectors);
try {
rewriter.write(
new RecordReaderIterator<>(
read.createReader(
DataSplit.builder()
.withPartition(partition)
.withBucket(bucket)
.withDataFiles(toCompact)
.build())));
rewriter.write(bucketReader(partition, bucket).read(toCompact));
} finally {
rewriter.close();
}
return rewriter.result();
};
}

public BucketFileRead bucketReader(BinaryRow partition, int bucket) {
return files ->
new RecordReaderIterator<>(
read.createReader(
DataSplit.builder()
.withPartition(partition)
.withBucket(bucket)
.withDataFiles(files)
.build()));
}

public AppendOnlyFileStoreWrite withBucketMode(BucketMode bucketMode) {
// AppendOnlyFileStoreWrite is sensitive to bucket mode. It acts differently in
// unaware-bucket mode (no compaction, and an empty writer is forced).
@@ -215,4 +221,9 @@ protected void forceBufferSpill() throws Exception {
}
}
}

/** Read for one bucket. */
public interface BucketFileRead {
RecordReaderIterator<InternalRow> read(List<DataFileMeta> files) throws IOException;
}
}
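
Design note: bucketReader(partition, bucket) captures the partition and bucket in a lambda, so the compaction rewriter above and the new spill-conversion path in AppendOnlyWriter share a single DataSplit-based read path. The writer itself only ever sees a BucketFileRead; it stays unaware of partitions and splits, and callers that do not need the rewrite (the unit tests below) simply pass null.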
@@ -594,6 +594,7 @@ private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
AppendOnlyWriterTest.SCHEMA,
getMaxSequenceNumber(toCompact),
compactManager,
null,
forceCompact,
pathFactory,
null,
@@ -79,6 +79,7 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
0,
new AppendOnlyCompactManager(
null, toCompact, 4, 10, 10, null, null), // not used
null,
false,
dataFilePathFactory,
null,
@@ -42,10 +42,13 @@

import java.util.List;
import java.util.Map;
import java.util.Random;

/** Tests for {@link AppendOnlyFileStoreWrite}. */
public class AppendOnlyFileStoreWriteTest {

private static final Random RANDOM = new Random();

@TempDir java.nio.file.Path tempDir;

@Test
@@ -101,6 +104,49 @@ public void testWritesInBatch() throws Exception {
Assertions.assertThat(records).isEqualTo(11);
}

@Test
public void testWritesInBatchWithNoExtraFiles() throws Exception {
FileStoreTable table = createFileStoreTable();

AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite) table.store().newWrite("ss");
write.withExecutionMode(false);

write.write(partition(0), 0, GenericRow.of(0, 0, 0));
write.write(partition(1), 1, GenericRow.of(1, 1, 0));
write.write(partition(2), 2, GenericRow.of(2, 2, 0));
write.write(partition(3), 3, GenericRow.of(3, 3, 0));
write.write(partition(4), 4, GenericRow.of(4, 4, 0));
write.write(partition(5), 5, GenericRow.of(5, 5, 0));
write.write(partition(6), 6, GenericRow.of(6, 6, 0));

for (int i = 0; i < 1000; i++) {
int number = RANDOM.nextInt(7);
write.write(partition(number), number, GenericRow.of(number, number, 0));
}

List<CommitMessage> commit = write.prepareCommit(true, Long.MAX_VALUE);

Assertions.assertThat(commit.size()).isEqualTo(7);

long files =
commit.stream()
.map(s -> (CommitMessageImpl) s)
.mapToLong(s -> s.newFilesIncrement().newFiles().size())
.sum();
Assertions.assertThat(files).isEqualTo(7);

long records =
commit.stream()
.map(s -> (CommitMessageImpl) s)
.mapToLong(
s ->
s.newFilesIncrement().newFiles().stream()
.mapToLong(DataFileMeta::rowCount)
.sum())
.sum();
Assertions.assertThat(records).isEqualTo(1007);
}

protected FileStoreTable createFileStoreTable() throws Exception {
Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString()));
Schema schema =
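The new test, testWritesInBatchWithNoExtraFiles, exercises the scenario from the commit title: 1,007 rows scattered across seven partitions in one batch write. Its assertions pin down the improvement: seven commit messages and seven data files (one per partition) holding all 1,007 rows, where the spill conversion could previously leave additional small files behind.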
