[core] Reduce small files when batch writing an append-only table with many partitions. #3160

Merged 4 commits on Apr 7, 2024 (the diff below shows changes from 2 commits).
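In short, this change threads a new AppendOnlyFileStoreWrite.BucketFileRead into AppendOnlyWriter: when the writer is forced to switch from an in-memory buffer to a spillable buffered writer, it reads back the small data files it has already flushed for its partition and bucket, re-buffers their rows, and deletes those files, so a batch write that touches many partitions commits fewer, larger files. The following is a condensed, hypothetical sketch of that flow using simplified stand-in types (SmallFileRewriteSketch, its BucketFileRead, rewriteOnSpill, and the Consumer parameters are illustrative, not the actual Paimon classes); the real change is in the diff below.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical, condensed sketch of the spill path added in this PR (simplified
 * types, not the actual Paimon code): when the writer is forced onto a spillable
 * buffer, it reads the rows of the small files it has already written back into
 * the new buffer and deletes those files.
 */
final class SmallFileRewriteSketch {

    /** Stand-in for AppendOnlyFileStoreWrite.BucketFileRead: files -> rows. */
    interface BucketFileRead<ROW, FILE> {
        Iterator<ROW> read(List<FILE> files);
    }

    static <ROW, FILE> void rewriteOnSpill(
            List<FILE> writtenSmallFiles,             // result of flushing the in-memory writer
            BucketFileRead<ROW, FILE> bucketFileRead, // reads this bucket's files back as rows
            Consumer<ROW> spillableBuffer,            // the new spillable sink writer
            Consumer<FILE> deleteQuietly) {           // e.g. delete via the table's file IO
        Iterator<ROW> rows = bucketFileRead.read(writtenSmallFiles);
        while (rows.hasNext()) {
            spillableBuffer.accept(rows.next());      // rows stay in the commit, just re-buffered
        }
        for (FILE f : writtenSmallFiles) {
            deleteQuietly.accept(f);                  // the small files never reach the commit
        }
    }

    public static void main(String[] args) {
        // Toy run: three one-row "files" collapse into a single buffer.
        List<String> buffer = new ArrayList<>();
        SmallFileRewriteSketch.<String, String>rewriteOnSpill(
                Arrays.asList("f1", "f2", "f3"),
                files -> files.stream().map(f -> "row-of-" + f).iterator(),
                buffer::add,
                f -> System.out.println("deleted " + f));
        System.out.println(buffer); // [row-of-f1, row-of-f2, row-of-f3]
    }
}

Re-buffering instead of committing the intermediate files lets the spillable writer produce consolidated output per bucket; the new testWritesInBatchWithNoExtraFiles below asserts exactly 7 files for 1007 records across 7 partitions.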
AppendOnlyWriter.java
@@ -33,7 +33,9 @@
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
@@ -64,6 +66,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final RowType writeSchema;
private final DataFilePathFactory pathFactory;
private final CompactManager compactManager;
private final AppendOnlyFileStoreWrite.BucketFileRead bucketFileRead;
private final boolean forceCompact;
private final List<DataFileMeta> newFiles;
private final List<DataFileMeta> deletedFiles;
@@ -88,6 +91,7 @@ public AppendOnlyWriter(
RowType writeSchema,
long maxSequenceNumber,
CompactManager compactManager,
AppendOnlyFileStoreWrite.BucketFileRead bucketFileRead,
boolean forceCompact,
DataFilePathFactory pathFactory,
@Nullable CommitIncrement increment,
@@ -104,6 +108,7 @@ public AppendOnlyWriter(
this.writeSchema = writeSchema;
this.pathFactory = pathFactory;
this.compactManager = compactManager;
this.bucketFileRead = bucketFileRead;
this.forceCompact = forceCompact;
this.newFiles = new ArrayList<>();
this.deletedFiles = new ArrayList<>();
@@ -209,13 +214,25 @@ public void close() throws Exception {
}

public void toBufferedWriter() throws Exception {
if (sinkWriter != null && !sinkWriter.bufferSpillableWriter()) {
flush(false, false);
trySyncLatestCompaction(true);
if (sinkWriter != null && !sinkWriter.bufferSpillableWriter() && bucketFileRead != null) {
// fetch the written results
List<DataFileMeta> files = sinkWriter.flush();

sinkWriter.close();
sinkWriter = new BufferedSinkWriter(true, maxDiskSize, spillCompression);
sinkWriter.setMemoryPool(memorySegmentPool);

// rewrite small files
try (RecordReaderIterator<InternalRow> reader = bucketFileRead.read(files)) {
while (reader.hasNext()) {
sinkWriter.write(reader.next());
}
}

// remove small files
for (DataFileMeta file : files) {
fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
}
}
}

AppendOnlyFileStoreWrite.java
@@ -47,6 +47,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -145,6 +146,7 @@ protected RecordWriter<InternalRow> createWriter(
rowType,
maxSequenceNumber,
compactManager,
bucketReader(partition, bucket),
commitForceCompact,
factory,
restoreIncrement,
@@ -174,21 +176,30 @@ public AppendOnlyCompactManager.CompactRewriter compactRewriter(
fileCompression,
statsCollectors);
try {
rewriter.write(
new RecordReaderIterator<>(
read.createReader(
DataSplit.builder()
.withPartition(partition)
.withBucket(bucket)
.withDataFiles(toCompact)
.build())));
rewriter.write(bucketReader(partition, bucket).read(toCompact));
} finally {
rewriter.close();
}
return rewriter.result();
};
}

public BucketFileRead bucketReader(BinaryRow partition, int bucket) {
return files -> {
try {
return new RecordReaderIterator<>(
read.createReader(
DataSplit.builder()
.withPartition(partition)
.withBucket(bucket)
.withDataFiles(files)
.build()));
} catch (IOException e) {
throw new RuntimeException(e);
}
};
}

public AppendOnlyFileStoreWrite withBucketMode(BucketMode bucketMode) {
// AppendOnlyFileStoreWrite is sensitive to bucket mode. It behaves differently in
// unaware-bucket mode (no compaction; an empty writer is forced).
@@ -215,4 +226,9 @@ protected void forceBufferSpill() throws Exception {
}
}
}

/** Read for one bucket. */
public interface BucketFileRead {
RecordReaderIterator<InternalRow> read(List<DataFileMeta> files);
}
}
AppendOnlyWriterTest.java
@@ -594,6 +594,7 @@ private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
AppendOnlyWriterTest.SCHEMA,
getMaxSequenceNumber(toCompact),
compactManager,
null,
forceCompact,
pathFactory,
null,
FileFormatSuffixTest.java
@@ -79,6 +79,7 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
0,
new AppendOnlyCompactManager(
null, toCompact, 4, 10, 10, null, null), // not used
null,
false,
dataFilePathFactory,
null,
AppendOnlyFileStoreWriteTest.java
@@ -42,10 +42,13 @@

import java.util.List;
import java.util.Map;
import java.util.Random;

/** Tests for {@link AppendOnlyFileStoreWrite}. */
public class AppendOnlyFileStoreWriteTest {

private static final Random RANDOM = new Random();

@TempDir java.nio.file.Path tempDir;

@Test
@@ -101,6 +104,49 @@ public void testWritesInBatch() throws Exception {
Assertions.assertThat(records).isEqualTo(11);
}

@Test
public void testWritesInBatchWithNoExtraFiles() throws Exception {
FileStoreTable table = createFileStoreTable();

AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite) table.store().newWrite("ss");
write.withExecutionMode(false);

write.write(partition(0), 0, GenericRow.of(0, 0, 0));
write.write(partition(1), 1, GenericRow.of(1, 1, 0));
write.write(partition(2), 2, GenericRow.of(2, 2, 0));
write.write(partition(3), 3, GenericRow.of(3, 3, 0));
write.write(partition(4), 4, GenericRow.of(4, 4, 0));
write.write(partition(5), 5, GenericRow.of(5, 5, 0));
write.write(partition(6), 6, GenericRow.of(6, 6, 0));

for (int i = 0; i < 1000; i++) {
int number = RANDOM.nextInt(7);
write.write(partition(number), number, GenericRow.of(number, number, 0));
}

List<CommitMessage> commit = write.prepareCommit(true, Long.MAX_VALUE);

Assertions.assertThat(commit.size()).isEqualTo(7);

long files =
commit.stream()
.map(s -> (CommitMessageImpl) s)
.mapToLong(s -> s.newFilesIncrement().newFiles().size())
.sum();
Assertions.assertThat(files).isEqualTo(7);

long records =
commit.stream()
.map(s -> (CommitMessageImpl) s)
.mapToLong(
s ->
s.newFilesIncrement().newFiles().stream()
.mapToLong(DataFileMeta::rowCount)
.sum())
.sum();
Assertions.assertThat(records).isEqualTo(1007);
}

protected FileStoreTable createFileStoreTable() throws Exception {
Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString()));
Schema schema =