[core] Push bucket filter into manifest entry reader
JingsongLi committed Sep 25, 2024
1 parent 22e6bd3 commit a98288a
Showing 15 changed files with 74 additions and 126 deletions.
@@ -106,6 +106,7 @@ public AppendOnlyFileStoreWrite newWrite(
newRead(),
schema.id(),
rowType,
partitionType,
pathFactory(),
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
@@ -119,6 +120,7 @@ public AppendOnlyFileStoreWrite newWrite(
schema.id(),
commitUser,
rowType,
partitionType,
pathFactory(),
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
@@ -154,14 +156,12 @@ public void pushdown(Predicate predicate) {

return new AppendOnlyFileStoreScan(
newManifestsReader(forWrite),
partitionType,
bucketFilter,
snapshotManager(),
schemaManager,
schema,
manifestFileFactory(forWrite),
options.bucket(),
forWrite,
options.scanManifestParallelism(),
options.fileIndexReadEnabled());
}
@@ -230,15 +230,13 @@ public void pushdown(Predicate keyFilter) {
};
return new KeyValueFileStoreScan(
newManifestsReader(forWrite),
partitionType,
bucketFilter,
snapshotManager(),
schemaManager,
schema,
keyValueFieldsExtractor,
manifestFileFactory(forWrite),
options.bucket(),
forWrite,
options.scanManifestParallelism(),
options.deletionVectorsEnabled(),
options.mergeEngine(),
@@ -96,4 +96,8 @@ public static Function<InternalRow, Integer> totalBucketGetter() {
public static Function<InternalRow, String> fileNameGetter() {
return row -> row.getRow(5, DataFileMeta.SCHEMA.getFieldCount()).getString(0).toString();
}

public static Function<InternalRow, Integer> levelGetter() {
return row -> row.getRow(5, DataFileMeta.SCHEMA.getFieldCount()).getInt(10);
}
}
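Note: a minimal usage sketch (an assumption for illustration, not code from this commit) of how the new levelGetter can be composed into a row-level Filter without deserializing the whole ManifestEntry; the class name and the ManifestEntrySerializer import path are assumed.

// Illustrative sketch only. Reads the level field straight from the serialized
// manifest entry row, mirroring how createEntryRowFilter uses the other getters.
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.utils.Filter;

import java.util.function.Function;

public class LevelRowFilterSketch {

    // Keeps only manifest entry rows whose data file level is at least minLevel.
    public static Filter<InternalRow> minLevelFilter(int minLevel) {
        Function<InternalRow, Integer> levelGetter = ManifestEntrySerializer.levelGetter();
        return row -> levelGetter.apply(row) >= minLevel;
    }
}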
@@ -18,7 +18,6 @@

package org.apache.paimon.operation;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
@@ -38,8 +37,6 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
@@ -68,11 +65,9 @@
public abstract class AbstractFileStoreScan implements FileStoreScan {

private final ManifestsReader manifestsReader;
private final RowType partitionType;
private final SnapshotManager snapshotManager;
private final ManifestFile.Factory manifestFileFactory;
private final int numOfBuckets;
private final boolean checkNumOfBuckets;
private final Integer parallelism;

private final ConcurrentMap<Long, TableSchema> tableSchemas;
@@ -93,24 +88,20 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {

public AbstractFileStoreScan(
ManifestsReader manifestsReader,
RowType partitionType,
ScanBucketFilter bucketKeyFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
TableSchema schema,
ManifestFile.Factory manifestFileFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
@Nullable Integer parallelism) {
this.manifestsReader = manifestsReader;
this.partitionType = partitionType;
this.bucketKeyFilter = bucketKeyFilter;
this.snapshotManager = snapshotManager;
this.schemaManager = schemaManager;
this.schema = schema;
this.manifestFileFactory = manifestFileFactory;
this.numOfBuckets = numOfBuckets;
this.checkNumOfBuckets = checkNumOfBuckets;
this.tableSchemas = new ConcurrentHashMap<>();
this.parallelism = parallelism;
}
@@ -229,7 +220,6 @@ public ManifestsReader manifestsReader() {

@Override
public Plan plan() {

Pair<Snapshot, List<ManifestEntry>> planResult = doPlan();

final Snapshot readSnapshot = planResult.getLeft();
@@ -301,45 +291,14 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan() {
Collection<ManifestEntry> mergedEntries =
readAndMergeFileEntries(manifests, this::readManifest);

List<ManifestEntry> files = new ArrayList<>();
long skippedByPartitionAndStats = startDataFiles - mergedEntries.size();
for (ManifestEntry file : mergedEntries) {
if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
String partInfo =
partitionType.getFieldCount() > 0
? "partition "
+ FileStorePathFactory.getPartitionComputer(
partitionType,
CoreOptions.PARTITION_DEFAULT_NAME
.defaultValue())
.generatePartValues(file.partition())
: "table";
throw new RuntimeException(
String.format(
"Try to write %s with a new bucket num %d, but the previous bucket num is %d. "
+ "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.",
partInfo, numOfBuckets, file.totalBuckets()));
}

// bucket filter should not be applied along with partition filter
// because the specifiedBucket is computed against the current
// numOfBuckets
// however entry.bucket() was computed against the old numOfBuckets
// and thus the filtered manifest entries might be empty
// which renders the bucket check invalid
if (filterMergedManifestEntry(file)) {
files.add(file);
}
}

long afterBucketFilter = files.size();
long skippedByBucketAndLevelFilter = mergedEntries.size() - files.size();
// We group files by bucket here, and filter them by the whole bucket filter.
// Why do this: because in primary key table, we can't just filter the value
// by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
// but we can do this by filter the whole bucket files
files =
files.stream()
List<ManifestEntry> files =
mergedEntries.stream()
.collect(
Collectors.groupingBy(
// we use LinkedHashMap to avoid disorder
@@ -352,21 +311,17 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan() {
.flatMap(Collection::stream)
.collect(Collectors.toList());

long skippedByWholeBucketFiles = afterBucketFilter - files.size();
long skippedByWholeBucketFiles = mergedEntries.size() - files.size();
long scanDuration = (System.nanoTime() - started) / 1_000_000;
checkState(
startDataFiles
- skippedByPartitionAndStats
- skippedByBucketAndLevelFilter
- skippedByWholeBucketFiles
startDataFiles - skippedByPartitionAndStats - skippedByWholeBucketFiles
== files.size());
if (scanMetrics != null) {
scanMetrics.reportScan(
new ScanStats(
scanDuration,
manifests.size(),
skippedByPartitionAndStats,
skippedByBucketAndLevelFilter,
skippedByWholeBucketFiles,
files.size()));
}
@@ -401,13 +356,6 @@ protected TableSchema scanTableSchema(long id) {
/** Note: Keep this thread-safe. */
protected abstract boolean filterByStats(ManifestEntry entry);

/** Note: Keep this thread-safe. */
private boolean filterMergedManifestEntry(ManifestEntry entry) {
return (bucketFilter == null || bucketFilter.test(entry.bucket()))
&& bucketKeyFilter.select(entry.bucket(), entry.totalBuckets())
&& (levelFilter == null || levelFilter.test(entry.file().level()));
}

/** Note: Keep this thread-safe. */
protected abstract List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry> entries);

@@ -420,12 +368,8 @@ public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
.read(
manifest.fileName(),
manifest.fileSize(),
createCacheRowFilter(manifestCacheFilter, numOfBuckets),
createEntryRowFilter(
manifestsReader.partitionFilter(),
bucketFilter,
fileNameFilter,
numOfBuckets));
createCacheRowFilter(),
createEntryRowFilter());
List<ManifestEntry> filteredEntries = new ArrayList<>(entries.size());
for (ManifestEntry entry : entries) {
if ((manifestEntryFilter == null || manifestEntryFilter.test(entry))
@@ -446,12 +390,8 @@ private List<SimpleFileEntry> readSimpleEntries(ManifestFileMeta manifest) {
// use filter for ManifestEntry
// currently, projection is not pushed down to file format
// see SimpleFileEntrySerializer
createCacheRowFilter(manifestCacheFilter, numOfBuckets),
createEntryRowFilter(
manifestsReader.partitionFilter(),
bucketFilter,
fileNameFilter,
numOfBuckets));
createCacheRowFilter(),
createEntryRowFilter());
}

/**
@@ -460,8 +400,7 @@ private List<SimpleFileEntry> readSimpleEntries(ManifestFileMeta manifest) {
*
* <p>Implemented to {@link InternalRow} is for performance (No deserialization).
*/
private static Filter<InternalRow> createCacheRowFilter(
@Nullable ManifestCacheFilter manifestCacheFilter, int numOfBuckets) {
private Filter<InternalRow> createCacheRowFilter() {
if (manifestCacheFilter == null) {
return Filter.alwaysTrue();
}
@@ -485,25 +424,31 @@ private static Filter<InternalRow> createCacheRowFilter(
*
* <p>Implemented to {@link InternalRow} is for performance (No deserialization).
*/
private static Filter<InternalRow> createEntryRowFilter(
@Nullable PartitionPredicate partitionFilter,
@Nullable Filter<Integer> bucketFilter,
@Nullable Filter<String> fileNameFilter,
int numOfBuckets) {
private Filter<InternalRow> createEntryRowFilter() {
Function<InternalRow, BinaryRow> partitionGetter =
ManifestEntrySerializer.partitionGetter();
Function<InternalRow, Integer> bucketGetter = ManifestEntrySerializer.bucketGetter();
Function<InternalRow, Integer> totalBucketGetter =
ManifestEntrySerializer.totalBucketGetter();
Function<InternalRow, String> fileNameGetter = ManifestEntrySerializer.fileNameGetter();
PartitionPredicate partitionFilter = manifestsReader.partitionFilter();
Function<InternalRow, Integer> levelGetter = ManifestEntrySerializer.levelGetter();
return row -> {
if ((partitionFilter != null && !partitionFilter.test(partitionGetter.apply(row)))) {
return false;
}

if (bucketFilter != null
&& numOfBuckets == totalBucketGetter.apply(row)
&& !bucketFilter.test(bucketGetter.apply(row))) {
int bucket = bucketGetter.apply(row);
int totalBucket = totalBucketGetter.apply(row);
if (bucketFilter != null && numOfBuckets == totalBucket && !bucketFilter.test(bucket)) {
return false;
}

if (!bucketKeyFilter.select(bucket, totalBucket)) {
return false;
}

if (levelFilter != null && !levelFilter.test(levelGetter.apply(row))) {
return false;
}

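Note: a small sketch (assumed, not from this commit) of the short-circuiting pattern used by createEntryRowFilter above: row-level filters are evaluated in order and a row is dropped at the first rejection, so the partition check runs before the bucket, bucket-key and level checks.

// Illustrative sketch only. Chains row-level filters so evaluation stops at the
// first rejecting filter; the class name is an assumption.
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.utils.Filter;

import java.util.List;

public class ChainedEntryRowFilterSketch {

    // Returns a filter that accepts a row only if every given filter accepts it.
    public static Filter<InternalRow> allOf(List<Filter<InternalRow>> filters) {
        return row -> {
            for (Filter<InternalRow> filter : filters) {
                if (!filter.test(row)) {
                    return false; // short-circuit on the first rejection
                }
            }
            return true;
        };
    }
}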
@@ -34,6 +34,7 @@
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.RecordWriter;
@@ -54,7 +55,9 @@
import java.util.concurrent.Executors;
import java.util.function.Function;

import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
import static org.apache.paimon.utils.FileStorePathFactory.getPartitionComputer;

/**
* Base {@link FileStoreWrite} implementation.
@@ -70,6 +73,8 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
private final int writerNumberMax;
@Nullable private final IndexMaintainer.Factory<T> indexFactory;
@Nullable private final DeletionVectorsMaintainer.Factory dvMaintainerFactory;
private final int totalBuckets;
private final RowType partitionType;

@Nullable protected IOManager ioManager;

@@ -90,11 +95,15 @@ protected AbstractFileStoreWrite(
@Nullable IndexMaintainer.Factory<T> indexFactory,
@Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
String tableName,
int totalBuckets,
RowType partitionType,
int writerNumberMax) {
this.snapshotManager = snapshotManager;
this.scan = scan;
this.indexFactory = indexFactory;
this.dvMaintainerFactory = dvMaintainerFactory;
this.totalBuckets = totalBuckets;
this.partitionType = partitionType;
this.writers = new HashMap<>();
this.tableName = tableName;
this.writerNumberMax = writerNumberMax;
@@ -451,10 +460,26 @@ public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry) {
private List<DataFileMeta> scanExistingFileMetas(
long snapshotId, BinaryRow partition, int bucket) {
List<DataFileMeta> existingFileMetas = new ArrayList<>();
// Concat all the DataFileMeta of existing files into existingFileMetas.
scan.withSnapshot(snapshotId).withPartitionBucket(partition, bucket).plan().files().stream()
.map(ManifestEntry::file)
.forEach(existingFileMetas::add);
List<ManifestEntry> files =
scan.withSnapshot(snapshotId).withPartitionBucket(partition, bucket).plan().files();
for (ManifestEntry entry : files) {
if (entry.totalBuckets() != totalBuckets) {
String partInfo =
partitionType.getFieldCount() > 0
? "partition "
+ getPartitionComputer(
partitionType,
PARTITION_DEFAULT_NAME.defaultValue())
.generatePartValues(partition)
: "table";
throw new RuntimeException(
String.format(
"Try to write %s with a new bucket num %d, but the previous bucket num is %d. "
+ "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.",
partInfo, totalBuckets, entry.totalBuckets()));
}
existingFileMetas.add(entry.file());
}
return existingFileMetas;
}

@@ -52,26 +52,22 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {

public AppendOnlyFileStoreScan(
ManifestsReader manifestsReader,
RowType partitionType,
ScanBucketFilter bucketFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
TableSchema schema,
ManifestFile.Factory manifestFileFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
boolean fileIndexReadEnabled) {
super(
manifestsReader,
partitionType,
bucketFilter,
snapshotManager,
schemaManager,
schema,
manifestFileFactory,
numOfBuckets,
checkNumOfBuckets,
scanManifestParallelism);
this.simpleStatsConverters =
new SimpleStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id());