[core] Introduce SplitGroup for SplitGenerator #3059

Merged · 5 commits · Mar 26, 2024
@@ -260,6 +260,10 @@ public Optional<CoreOptions.FileFormatType> fileFormat() {
         }
     }
 
+    public boolean rawConvertible() {
+        return level != 0 && Objects.equals(deleteRowCount, 0L);
+    }
+
     public DataFileMeta upgrade(int newLevel) {
         checkArgument(newLevel > this.level);
         return new DataFileMeta(
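Reviewer note: the new rawConvertible flag encodes when a file's rows can be served verbatim, with no merge reader in front. A minimal, self-contained sketch of the predicate on plain values (this is not Paimon API; names and values are illustrative):

import java.util.Objects;

// Mirrors the new DataFileMeta#rawConvertible predicate on plain values,
// to make its two conditions explicit. Illustration only.
public class RawConvertibleSketch {

    static boolean rawConvertible(int level, Long deleteRowCount) {
        // Level-0 files may overlap in key range and still carry unmerged
        // duplicates, so only files already pushed to level >= 1 qualify;
        // a file with pending delete rows cannot be read back verbatim either.
        return level != 0 && Objects.equals(deleteRowCount, 0L);
    }

    public static void main(String[] args) {
        System.out.println(rawConvertible(0, 0L)); // false: still at level 0
        System.out.println(rawConvertible(5, 0L)); // true: compacted, no deletes
        System.out.println(rawConvertible(5, 3L)); // false: has delete rows
    }
}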
@@ -26,6 +26,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.append.AppendOnlyCompactManager.fileComparator;
 
@@ -44,21 +45,23 @@ public AppendOnlySplitGenerator(
     }
 
     @Override
-    public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> input) {
+    public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
         List<DataFileMeta> files = new ArrayList<>(input);
         files.sort(fileComparator(bucketMode == BucketMode.UNAWARE));
         Function<DataFileMeta, Long> weightFunc = file -> Math.max(file.fileSize(), openFileCost);
-        return BinPacking.packForOrdered(files, weightFunc, targetSplitSize);
+        return BinPacking.packForOrdered(files, weightFunc, targetSplitSize).stream()
+                .map(SplitGroup::rawConvertibleGroup)
+                .collect(Collectors.toList());
     }
 
     @Override
-    public List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta> files) {
+    public List<SplitGroup> splitForStreaming(List<DataFileMeta> files) {
         // When the bucket mode is unaware, we split the files as batch, because an
         // unaware-bucket table only contains one bucket (bucket 0).
         if (bucketMode == BucketMode.UNAWARE) {
             return splitForBatch(files);
         } else {
-            return Collections.singletonList(files);
+            return Collections.singletonList(SplitGroup.rawConvertibleGroup(files));
         }
     }
 }
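Reviewer note: for append-only tables every group can be marked raw convertible, since files never need merging; the remaining work is packing ordered files into splits. A hedged, self-contained sketch of the packing contract that splitForBatch relies on (an illustration of how BinPacking.packForOrdered behaves as used here, not its actual implementation):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Consecutive items are packed into one split until the accumulated weight
// (here: max(fileSize, openFileCost) in the caller) would exceed the target.
public class OrderedPackingSketch {

    static <T> List<List<T>> packForOrdered(
            List<T> items, Function<T, Long> weight, long target) {
        List<List<T>> packed = new ArrayList<>();
        List<T> bin = new ArrayList<>();
        long binWeight = 0;
        for (T item : items) {
            long w = weight.apply(item);
            if (!bin.isEmpty() && binWeight + w > target) {
                packed.add(bin); // close the current split
                bin = new ArrayList<>();
                binWeight = 0;
            }
            bin.add(item);
            binWeight += w;
        }
        if (!bin.isEmpty()) {
            packed.add(bin);
        }
        return packed;
    }

    public static void main(String[] args) {
        List<Long> sizes = List.of(60L, 50L, 10L, 90L);
        // target 100 with identity weights -> [[60], [50, 10], [90]]
        System.out.println(packForOrdered(sizes, s -> s, 100L));
    }
}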
@@ -61,11 +61,17 @@ public MergeTreeSplitGenerator(
     }
 
     @Override
-    public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files) {
-        if (deletionVectorsEnabled || mergeEngine == FIRST_ROW) {
+    public List<SplitGroup> splitForBatch(List<DataFileMeta> files) {
+        boolean rawConvertible = files.stream().allMatch(DataFileMeta::rawConvertible);
+        boolean oneLevel =
+                files.stream().map(DataFileMeta::level).collect(Collectors.toSet()).size() == 1;
+
+        if (rawConvertible && (deletionVectorsEnabled || mergeEngine == FIRST_ROW || oneLevel)) {
             Function<DataFileMeta, Long> weightFunc =
                     file -> Math.max(file.fileSize(), openFileCost);
-            return BinPacking.packForOrdered(files, weightFunc, targetSplitSize);
+            return BinPacking.packForOrdered(files, weightFunc, targetSplitSize).stream()
+                    .map(SplitGroup::rawConvertibleGroup)
+                    .collect(Collectors.toList());
         }
 
         /*
@@ -93,13 +99,19 @@ public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files) {
                 new IntervalPartition(files, keyComparator)
                         .partition().stream().map(this::flatRun).collect(Collectors.toList());
 
-        return packSplits(sections);
+        return packSplits(sections).stream()
+                .map(
+                        f ->
+                                f.size() == 1 && f.get(0).rawConvertible()
+                                        ? SplitGroup.rawConvertibleGroup(f)
+                                        : SplitGroup.nonRawConvertibleGroup(f))
+                .collect(Collectors.toList());
     }
 
     @Override
-    public List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta> files) {
+    public List<SplitGroup> splitForStreaming(List<DataFileMeta> files) {
         // We don't split streaming scan files
-        return Collections.singletonList(files);
+        return Collections.singletonList(SplitGroup.rawConvertibleGroup(files));
     }
 
     private List<List<DataFileMeta>> packSplits(List<List<DataFileMeta>> sections) {
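Reviewer note: the batch path now takes the fast path only when every file is raw convertible and one of three conditions removes the need to merge across levels (deletion vectors enabled, first-row merge engine, or all files on a single level); otherwise files are first partitioned into key-overlapping sections. A reduced, runnable sketch of that decision (the boolean parameters are stand-ins mirroring the diff, for illustration only):

// Not Paimon code; condenses the new branch in MergeTreeSplitGenerator.
public class MergeTreeSplitDecision {

    static String plan(boolean allFilesRawConvertible,
                       boolean deletionVectorsEnabled,
                       boolean firstRowMergeEngine,
                       boolean allFilesOnOneLevel) {
        if (allFilesRawConvertible
                && (deletionVectorsEnabled || firstRowMergeEngine || allFilesOnOneLevel)) {
            // No merging needed: bin-pack files; every group is raw convertible.
            return "pack into rawConvertibleGroups";
        }
        // Key ranges may overlap across levels: build sections first; only
        // single-file groups whose file is rawConvertible stay raw.
        return "interval-partition, then per-group check";
    }

    public static void main(String[] args) {
        System.out.println(plan(true, false, false, true));
        System.out.println(plan(true, false, false, false));
        System.out.println(plan(false, true, false, false));
    }
}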
@@ -25,7 +25,27 @@
 /** Generate splits from {@link DataFileMeta}s. */
 public interface SplitGenerator {
 
-    List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files);
+    List<SplitGroup> splitForBatch(List<DataFileMeta> files);
 
-    List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta> files);
+    List<SplitGroup> splitForStreaming(List<DataFileMeta> files);
+
+    /** Split group. */
+    class SplitGroup {
+
+        public final List<DataFileMeta> files;
+        public final boolean rawConvertible;
+
+        private SplitGroup(List<DataFileMeta> files, boolean rawConvertible) {
+            this.files = files;
+            this.rawConvertible = rawConvertible;
+        }
+
+        public static SplitGroup rawConvertibleGroup(List<DataFileMeta> files) {
+            return new SplitGroup(files, true);
+        }
+
+        public static SplitGroup nonRawConvertibleGroup(List<DataFileMeta> files) {
+            return new SplitGroup(files, false);
+        }
+    }
 }
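Reviewer note: SplitGroup is a small value type; the private constructor plus two named factories make the raw/non-raw decision explicit at every call site. A self-contained sketch of the consumer-side contract, with String standing in for DataFileMeta so the example compiles without Paimon on the classpath:

import java.util.Arrays;
import java.util.List;

// The flag decides whether a split also advertises raw files; illustration only.
public class SplitGroupContractSketch {

    static void buildSplit(List<String> files, boolean rawConvertible) {
        System.out.println("data files: " + files);
        if (rawConvertible) {
            // Only raw-convertible groups expose raw files to readers.
            System.out.println("raw files:  " + files);
        }
    }

    public static void main(String[] args) {
        buildSplit(Arrays.asList("f0.orc", "f1.orc"), true);
        buildSplit(Arrays.asList("f2.orc"), false);
    }
}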
@@ -25,6 +25,7 @@
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.PlanImpl;
 import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -65,15 +66,15 @@ public Result scan(SnapshotReader reader) {
         for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry : grouped.entrySet()) {
             BinaryRow partition = entry.getKey().getLeft();
             int bucket = entry.getKey().getRight();
-            for (List<DataFileMeta> files :
+            for (SplitGenerator.SplitGroup splitGroup :
                     reader.splitGenerator().splitForBatch(entry.getValue())) {
                 // TODO pass deletion files
                 result.add(
                         DataSplit.builder()
                                 .withSnapshot(endingSnapshotId)
                                 .withPartition(partition)
                                 .withBucket(bucket)
-                                .withDataFiles(files)
+                                .withDataFiles(splitGroup.files)
                                 .build());
             }
         }
@@ -64,7 +64,6 @@
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
 import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
@@ -278,18 +277,24 @@ private List<DataSplit> generateSplits(
                         .withPartition(partition)
                         .withBucket(bucket)
                         .isStreaming(isStreaming);
-        List<List<DataFileMeta>> splitGroups =
+        List<SplitGenerator.SplitGroup> splitGroups =
                 isStreaming
                         ? splitGenerator.splitForStreaming(bucketFiles)
                         : splitGenerator.splitForBatch(bucketFiles);
-        for (List<DataFileMeta> dataFiles : splitGroups) {
-            builder.withDataFiles(dataFiles)
-                    .rawFiles(convertToRawFiles(partition, bucket, dataFiles));
-            if (deletionVectors) {
-                IndexFileMeta deletionIndexFile =
-                        indexFileHandler
-                                .scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket)
-                                .orElse(null);
+
+        IndexFileMeta deletionIndexFile =
+                deletionVectors
+                        ? indexFileHandler
+                                .scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket)
+                                .orElse(null)
+                        : null;
+        for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
+            List<DataFileMeta> dataFiles = splitGroup.files;
+            builder.withDataFiles(dataFiles);
+            if (splitGroup.rawConvertible) {
+                builder.rawFiles(convertToRawFiles(partition, bucket, dataFiles));
+            }
+            if (deletionVectors) {
                 builder.withDataDeletionFiles(
                         getDeletionFiles(dataFiles, deletionIndexFile));
             }
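Reviewer note: two things change in this hunk: the deletion-vector index lookup is hoisted out of the per-group loop (it depends only on partition and bucket), and raw files are now attached per group, gated on splitGroup.rawConvertible instead of being re-derived inside convertToRawFiles. A simplified, runnable sketch of the reshaped loop (all types are stand-ins; the real code builds DataSplit objects):

import java.util.Arrays;
import java.util.List;

public class GenerateSplitsSketch {

    static class Group {
        final List<String> files;
        final boolean rawConvertible;
        Group(List<String> files, boolean rawConvertible) {
            this.files = files;
            this.rawConvertible = rawConvertible;
        }
    }

    static String lookupDeletionIndex() {
        return "deletion-vectors-index"; // stand-in for indexFileHandler.scan(...)
    }

    static void process(List<Group> groups, boolean deletionVectors) {
        // Hoisted: the index does not vary per group, so look it up once.
        String deletionIndex = deletionVectors ? lookupDeletionIndex() : null;
        for (Group g : groups) {
            System.out.println("withDataFiles " + g.files);
            if (g.rawConvertible) {
                // Per-group decision, replacing the old convertToRawFiles checks.
                System.out.println("rawFiles " + g.files);
            }
            if (deletionVectors) {
                System.out.println("withDataDeletionFiles via " + deletionIndex);
            }
        }
    }

    public static void main(String[] args) {
        process(Arrays.asList(
                        new Group(Arrays.asList("a", "b"), true),
                        new Group(Arrays.asList("c"), false)),
                true);
    }
}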
@@ -370,8 +375,7 @@ private Plan toChangesPlan(
                         .withBucket(bucket)
                         .withBeforeFiles(before)
                         .withDataFiles(data)
-                        .isStreaming(isStreaming)
-                        .rawFiles(convertToRawFiles(part, bucket, data));
+                        .isStreaming(isStreaming);
         if (deletionVectors) {
             IndexFileMeta beforeDeletionIndex =
                     indexFileHandler
@@ -437,21 +441,6 @@ private List<DeletionFile> getDeletionFiles(
     private List<RawFile> convertToRawFiles(
             BinaryRow partition, int bucket, List<DataFileMeta> dataFiles) {
         String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
-
-        // append only or deletionVectors files can be returned
-        if (tableSchema.primaryKeys().isEmpty() || deletionVectors || mergeEngine == FIRST_ROW) {
-            return makeRawTableFiles(bucketPath, dataFiles);
-        }
-
-        int maxLevel = options.numLevels() - 1;
-        if (dataFiles.stream().map(DataFileMeta::level).allMatch(l -> l == maxLevel)) {
-            return makeRawTableFiles(bucketPath, dataFiles);
-        }
-
-        return Collections.emptyList();
-    }
-
-    private List<RawFile> makeRawTableFiles(String bucketPath, List<DataFileMeta> dataFiles) {
         return dataFiles.stream()
                 .map(f -> makeRawTableFile(bucketPath, f))
                 .collect(Collectors.toList());
@@ -124,11 +124,11 @@ public void testMergeTree() {
                 Collections.singletonList("6"));
     }
 
-    private List<List<String>> toNames(List<List<DataFileMeta>> splits) {
-        return splits.stream()
+    private List<List<String>> toNames(List<SplitGenerator.SplitGroup> splitGroups) {
+        return splitGroups.stream()
                 .map(
-                        files ->
-                                files.stream()
+                        splitGroup ->
+                                splitGroup.files.stream()
                                         .map(DataFileMeta::fileName)
                                         .collect(Collectors.toList()))
                 .collect(Collectors.toList());