Skip to content

Commit

Permalink
[core] Optimize full compaction for manifest entries (#2963)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Mar 7, 2024
1 parent 0b3509d commit a51f580
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 45 deletions.
45 changes: 27 additions & 18 deletions paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Preconditions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

/** Entry representing a file. */
public interface FileEntry {
Expand Down Expand Up @@ -117,23 +119,10 @@ static void mergeEntries(
ManifestFile manifestFile,
List<ManifestFileMeta> manifestFiles,
Map<Identifier, ManifestEntry> map) {
List<CompletableFuture<List<ManifestEntry>>> manifestReadFutures =
manifestFiles.stream()
.map(
manifestFileMeta ->
CompletableFuture.supplyAsync(
() ->
manifestFile.read(
manifestFileMeta.fileName()),
FileUtils.COMMON_IO_FORK_JOIN_POOL))
.collect(Collectors.toList());

try {
for (CompletableFuture<List<ManifestEntry>> taskResult : manifestReadFutures) {
mergeEntries(taskResult.get(), map);
}
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Failed to read manifest file.", e);
List<Supplier<List<ManifestEntry>>> manifestReadFutures =
readManifestEntries(manifestFile, manifestFiles);
for (Supplier<List<ManifestEntry>> taskResult : manifestReadFutures) {
mergeEntries(taskResult.get(), map);
}
}

Expand Down Expand Up @@ -167,6 +156,26 @@ static <T extends FileEntry> void mergeEntries(Iterable<T> entries, Map<Identifi
}
}

static List<Supplier<List<ManifestEntry>>> readManifestEntries(
ManifestFile manifestFile, List<ManifestFileMeta> manifestFiles) {
List<Supplier<List<ManifestEntry>>> result = new ArrayList<>();
for (ManifestFileMeta file : manifestFiles) {
Future<List<ManifestEntry>> future =
CompletableFuture.supplyAsync(
() -> manifestFile.read(file.fileName()),
FileUtils.COMMON_IO_FORK_JOIN_POOL);
result.add(
() -> {
try {
return future.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Failed to read manifest file.", e);
}
});
}
return result;
}

static <T extends FileEntry> void assertNoDelete(Collection<T> entries) {
for (T entry : entries) {
Preconditions.checkState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,7 @@ public long suggestedFileSize() {
* <p>NOTE: This method is atomic.
*/
public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
new RollingFileWriter<>(
() ->
new ManifestEntryWriter(
writerFactory,
pathFactory.newPath(),
CoreOptions.FILE_COMPRESSION.defaultValue()),
suggestedFileSize);
RollingFileWriter<ManifestEntry, ManifestFileMeta> writer = createRollingWriter();
try {
writer.write(entries);
writer.close();
Expand All @@ -98,6 +91,16 @@ public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
return writer.result();
}

public RollingFileWriter<ManifestEntry, ManifestFileMeta> createRollingWriter() {
return new RollingFileWriter<>(
() ->
new ManifestEntryWriter(
writerFactory,
pathFactory.newPath(),
CoreOptions.FILE_COMPRESSION.defaultValue()),
suggestedFileSize);
}

private class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, ManifestFileMeta> {

private final TableStatsCollector partitionStatsCollector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.manifest;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.manifest.FileEntry.Identifier;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
Expand All @@ -28,6 +29,7 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import org.slf4j.Logger;
Expand All @@ -41,8 +43,11 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Metadata of a manifest file. */
public class ManifestFileMeta {

Expand Down Expand Up @@ -170,7 +175,7 @@ public static List<ManifestFileMeta> merge(
for (ManifestFileMeta manifest : newMetas) {
manifestFile.delete(manifest.fileName);
}
throw e;
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -229,7 +234,8 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
ManifestFile manifestFile,
long suggestedMetaSize,
long sizeTrigger,
RowType partitionType) {
RowType partitionType)
throws Exception {
// 1. should trigger full compaction

List<ManifestFileMeta> base = new ArrayList<>();
Expand Down Expand Up @@ -311,41 +317,70 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
}
});

Map<Identifier, ManifestEntry> fullMerged = new LinkedHashMap<>();
List<ManifestEntry> mergedEntries = new ArrayList<>();
for (; j < base.size(); j++) {
ManifestFileMeta file = base.get(j);
FileEntry.mergeEntries(manifestFile.read(file.fileName), fullMerged);
boolean contains = false;
for (Identifier identifier : deleteEntries) {
if (fullMerged.containsKey(identifier)) {
for (ManifestEntry entry : manifestFile.read(file.fileName)) {
checkArgument(entry.kind() == FileKind.ADD);
if (deleteEntries.contains(entry.identifier())) {
contains = true;
break;
} else {
mergedEntries.add(entry);
}
}
if (contains) {
// already read this file into fullMerged
j++;
break;
} else {
fullMerged.clear();
mergedEntries.clear();
result.add(file);
}
}

// 2.3. merge base files
// 2.3. merge

FileEntry.mergeEntries(manifestFile, base.subList(j, base.size()), fullMerged);
FileEntry.mergeEntries(deltaMerged.values(), fullMerged);
RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
manifestFile.createRollingWriter();
Exception exception = null;
try {

// 2.4. write new manifest files
// 2.3.1 merge mergedEntries
for (ManifestEntry entry : mergedEntries) {
writer.write(entry);
}

if (!fullMerged.isEmpty()) {
List<ManifestFileMeta> merged =
manifestFile.write(new ArrayList<>(fullMerged.values()));
result.addAll(merged);
newMetas.addAll(merged);
// 2.3.2 merge base files
for (Supplier<List<ManifestEntry>> reader :
FileEntry.readManifestEntries(manifestFile, base.subList(j, base.size()))) {
for (ManifestEntry entry : reader.get()) {
checkArgument(entry.kind() == FileKind.ADD);
if (!deleteEntries.contains(entry.identifier())) {
writer.write(entry);
}
}
}

// 2.3.3 merge deltaMerged
for (ManifestEntry entry : deltaMerged.values()) {
if (entry.kind() == FileKind.ADD) {
writer.write(entry);
}
}
} catch (Exception e) {
exception = e;
} finally {
if (exception != null) {
IOUtils.closeQuietly(writer);
throw exception;
}
writer.close();
}

List<ManifestFileMeta> merged = writer.result();
result.addAll(merged);
newMetas.addAll(merged);
return Optional.of(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public void testMergeWithoutDeleteFile() {
}

@Test
public void testTriggerFullCompaction() {
public void testTriggerFullCompaction() throws Exception {
List<ManifestEntry> entries = new ArrayList<>();
for (int i = 0; i < 16; i++) {
entries.add(makeEntry(true, String.valueOf(i)));
Expand Down Expand Up @@ -260,7 +260,7 @@ public void testTriggerFullCompaction() {
}

@Test
public void testMultiPartitionsFullCompaction() {
public void testMultiPartitionsFullCompaction() throws Exception {

List<ManifestFileMeta> input = createBaseManifestFileMetas(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ protected void assertEquivalentEntries(
.collect(Collectors.toList());
List<String> entryBeforeMerge =
FileEntry.mergeEntries(inputEntry).stream()
.filter(entry -> entry.kind() == FileKind.ADD)
.map(entry -> entry.kind() + "-" + entry.file().fileName())
.collect(Collectors.toList());

Expand Down

0 comments on commit a51f580

Please sign in to comment.