From a98288a8e311fa8042cb4ab9a0014ef6d4fba5dd Mon Sep 17 00:00:00 2001 From: Jingsong Date: Wed, 25 Sep 2024 21:18:00 +0800 Subject: [PATCH] [core] Push bucket filter into manifest entry reader --- .../apache/paimon/AppendOnlyFileStore.java | 4 +- .../org/apache/paimon/KeyValueFileStore.java | 2 - .../manifest/ManifestEntrySerializer.java | 4 + .../operation/AbstractFileStoreScan.java | 101 ++++-------------- .../operation/AbstractFileStoreWrite.java | 33 +++++- .../operation/AppendOnlyFileStoreScan.java | 4 - .../operation/AppendOnlyFileStoreWrite.java | 3 +- .../AppendOnlyFixedBucketFileStoreWrite.java | 2 + ...AppendOnlyUnawareBucketFileStoreWrite.java | 2 + .../operation/KeyValueFileStoreScan.java | 5 - .../operation/KeyValueFileStoreWrite.java | 1 + .../operation/MemoryFileStoreWrite.java | 4 + .../paimon/operation/metrics/ScanMetrics.java | 6 -- .../paimon/operation/metrics/ScanStats.java | 13 +-- .../operation/metrics/ScanMetricsTest.java | 16 +-- 15 files changed, 74 insertions(+), 126 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index b0e3ea5f428e..261691d7ce9a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -106,6 +106,7 @@ public AppendOnlyFileStoreWrite newWrite( newRead(), schema.id(), rowType, + partitionType, pathFactory(), snapshotManager(), newScan(true).withManifestCacheFilter(manifestFilter), @@ -119,6 +120,7 @@ public AppendOnlyFileStoreWrite newWrite( schema.id(), commitUser, rowType, + partitionType, pathFactory(), snapshotManager(), newScan(true).withManifestCacheFilter(manifestFilter), @@ -154,14 +156,12 @@ public void pushdown(Predicate predicate) { return new AppendOnlyFileStoreScan( newManifestsReader(forWrite), - partitionType, bucketFilter, snapshotManager(), schemaManager, schema, manifestFileFactory(forWrite), options.bucket(), - forWrite, options.scanManifestParallelism(), options.fileIndexReadEnabled()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index d2d1d9972fa1..009f35780246 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -230,7 +230,6 @@ public void pushdown(Predicate keyFilter) { }; return new KeyValueFileStoreScan( newManifestsReader(forWrite), - partitionType, bucketFilter, snapshotManager(), schemaManager, @@ -238,7 +237,6 @@ public void pushdown(Predicate keyFilter) { keyValueFieldsExtractor, manifestFileFactory(forWrite), options.bucket(), - forWrite, options.scanManifestParallelism(), options.deletionVectorsEnabled(), options.mergeEngine(), diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java index 2c3ba2aeaab3..b1030448a7e1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java @@ -96,4 +96,8 @@ public static Function totalBucketGetter() { public static Function fileNameGetter() { return row -> row.getRow(5, DataFileMeta.SCHEMA.getFieldCount()).getString(0).toString(); } + + public static Function levelGetter() { + return row -> row.getRow(5, DataFileMeta.SCHEMA.getFieldCount()).getInt(10); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index d043932810c3..ea9055eb5c2b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -18,7 +18,6 @@ package org.apache.paimon.operation; -import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; @@ -38,8 +37,6 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.ScanMode; -import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; @@ -68,11 +65,9 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private final ManifestsReader manifestsReader; - private final RowType partitionType; private final SnapshotManager snapshotManager; private final ManifestFile.Factory manifestFileFactory; private final int numOfBuckets; - private final boolean checkNumOfBuckets; private final Integer parallelism; private final ConcurrentMap tableSchemas; @@ -93,24 +88,20 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { public AbstractFileStoreScan( ManifestsReader manifestsReader, - RowType partitionType, ScanBucketFilter bucketKeyFilter, SnapshotManager snapshotManager, SchemaManager schemaManager, TableSchema schema, ManifestFile.Factory manifestFileFactory, int numOfBuckets, - boolean checkNumOfBuckets, @Nullable Integer parallelism) { this.manifestsReader = manifestsReader; - this.partitionType = partitionType; this.bucketKeyFilter = bucketKeyFilter; this.snapshotManager = snapshotManager; this.schemaManager = schemaManager; this.schema = schema; this.manifestFileFactory = manifestFileFactory; this.numOfBuckets = numOfBuckets; - this.checkNumOfBuckets = checkNumOfBuckets; this.tableSchemas = new ConcurrentHashMap<>(); this.parallelism = parallelism; } @@ -229,7 +220,6 @@ public ManifestsReader manifestsReader() { @Override public Plan plan() { - Pair> planResult = doPlan(); final Snapshot readSnapshot = planResult.getLeft(); @@ -301,45 +291,14 @@ private Pair> doPlan() { Collection mergedEntries = readAndMergeFileEntries(manifests, this::readManifest); - List files = new ArrayList<>(); long skippedByPartitionAndStats = startDataFiles - mergedEntries.size(); - for (ManifestEntry file : mergedEntries) { - if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) { - String partInfo = - partitionType.getFieldCount() > 0 - ? "partition " - + FileStorePathFactory.getPartitionComputer( - partitionType, - CoreOptions.PARTITION_DEFAULT_NAME - .defaultValue()) - .generatePartValues(file.partition()) - : "table"; - throw new RuntimeException( - String.format( - "Try to write %s with a new bucket num %d, but the previous bucket num is %d. " - + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.", - partInfo, numOfBuckets, file.totalBuckets())); - } - // bucket filter should not be applied along with partition filter - // because the specifiedBucket is computed against the current - // numOfBuckets - // however entry.bucket() was computed against the old numOfBuckets - // and thus the filtered manifest entries might be empty - // which renders the bucket check invalid - if (filterMergedManifestEntry(file)) { - files.add(file); - } - } - - long afterBucketFilter = files.size(); - long skippedByBucketAndLevelFilter = mergedEntries.size() - files.size(); // We group files by bucket here, and filter them by the whole bucket filter. // Why do this: because in primary key table, we can't just filter the value // by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`), // but we can do this by filter the whole bucket files - files = - files.stream() + List files = + mergedEntries.stream() .collect( Collectors.groupingBy( // we use LinkedHashMap to avoid disorder @@ -352,13 +311,10 @@ private Pair> doPlan() { .flatMap(Collection::stream) .collect(Collectors.toList()); - long skippedByWholeBucketFiles = afterBucketFilter - files.size(); + long skippedByWholeBucketFiles = mergedEntries.size() - files.size(); long scanDuration = (System.nanoTime() - started) / 1_000_000; checkState( - startDataFiles - - skippedByPartitionAndStats - - skippedByBucketAndLevelFilter - - skippedByWholeBucketFiles + startDataFiles - skippedByPartitionAndStats - skippedByWholeBucketFiles == files.size()); if (scanMetrics != null) { scanMetrics.reportScan( @@ -366,7 +322,6 @@ private Pair> doPlan() { scanDuration, manifests.size(), skippedByPartitionAndStats, - skippedByBucketAndLevelFilter, skippedByWholeBucketFiles, files.size())); } @@ -401,13 +356,6 @@ protected TableSchema scanTableSchema(long id) { /** Note: Keep this thread-safe. */ protected abstract boolean filterByStats(ManifestEntry entry); - /** Note: Keep this thread-safe. */ - private boolean filterMergedManifestEntry(ManifestEntry entry) { - return (bucketFilter == null || bucketFilter.test(entry.bucket())) - && bucketKeyFilter.select(entry.bucket(), entry.totalBuckets()) - && (levelFilter == null || levelFilter.test(entry.file().level())); - } - /** Note: Keep this thread-safe. */ protected abstract List filterWholeBucketByStats(List entries); @@ -420,12 +368,8 @@ public List readManifest(ManifestFileMeta manifest) { .read( manifest.fileName(), manifest.fileSize(), - createCacheRowFilter(manifestCacheFilter, numOfBuckets), - createEntryRowFilter( - manifestsReader.partitionFilter(), - bucketFilter, - fileNameFilter, - numOfBuckets)); + createCacheRowFilter(), + createEntryRowFilter()); List filteredEntries = new ArrayList<>(entries.size()); for (ManifestEntry entry : entries) { if ((manifestEntryFilter == null || manifestEntryFilter.test(entry)) @@ -446,12 +390,8 @@ private List readSimpleEntries(ManifestFileMeta manifest) { // use filter for ManifestEntry // currently, projection is not pushed down to file format // see SimpleFileEntrySerializer - createCacheRowFilter(manifestCacheFilter, numOfBuckets), - createEntryRowFilter( - manifestsReader.partitionFilter(), - bucketFilter, - fileNameFilter, - numOfBuckets)); + createCacheRowFilter(), + createEntryRowFilter()); } /** @@ -460,8 +400,7 @@ private List readSimpleEntries(ManifestFileMeta manifest) { * *

Implemented to {@link InternalRow} is for performance (No deserialization). */ - private static Filter createCacheRowFilter( - @Nullable ManifestCacheFilter manifestCacheFilter, int numOfBuckets) { + private Filter createCacheRowFilter() { if (manifestCacheFilter == null) { return Filter.alwaysTrue(); } @@ -485,25 +424,31 @@ private static Filter createCacheRowFilter( * *

Implemented to {@link InternalRow} is for performance (No deserialization). */ - private static Filter createEntryRowFilter( - @Nullable PartitionPredicate partitionFilter, - @Nullable Filter bucketFilter, - @Nullable Filter fileNameFilter, - int numOfBuckets) { + private Filter createEntryRowFilter() { Function partitionGetter = ManifestEntrySerializer.partitionGetter(); Function bucketGetter = ManifestEntrySerializer.bucketGetter(); Function totalBucketGetter = ManifestEntrySerializer.totalBucketGetter(); Function fileNameGetter = ManifestEntrySerializer.fileNameGetter(); + PartitionPredicate partitionFilter = manifestsReader.partitionFilter(); + Function levelGetter = ManifestEntrySerializer.levelGetter(); return row -> { if ((partitionFilter != null && !partitionFilter.test(partitionGetter.apply(row)))) { return false; } - if (bucketFilter != null - && numOfBuckets == totalBucketGetter.apply(row) - && !bucketFilter.test(bucketGetter.apply(row))) { + int bucket = bucketGetter.apply(row); + int totalBucket = totalBucketGetter.apply(row); + if (bucketFilter != null && numOfBuckets == totalBucket && !bucketFilter.test(bucket)) { + return false; + } + + if (!bucketKeyFilter.select(bucket, totalBucket)) { + return false; + } + + if (levelFilter != null && !levelFilter.test(levelGetter.apply(row))) { return false; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 2e502e96f328..dab20d642cb9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -34,6 +34,7 @@ import org.apache.paimon.operation.metrics.CompactionMetrics; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.ExecutorThreadFactory; import org.apache.paimon.utils.RecordWriter; @@ -54,7 +55,9 @@ import java.util.concurrent.Executors; import java.util.function.Function; +import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME; import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber; +import static org.apache.paimon.utils.FileStorePathFactory.getPartitionComputer; /** * Base {@link FileStoreWrite} implementation. @@ -70,6 +73,8 @@ public abstract class AbstractFileStoreWrite implements FileStoreWrite { private final int writerNumberMax; @Nullable private final IndexMaintainer.Factory indexFactory; @Nullable private final DeletionVectorsMaintainer.Factory dvMaintainerFactory; + private final int totalBuckets; + private final RowType partitionType; @Nullable protected IOManager ioManager; @@ -90,11 +95,15 @@ protected AbstractFileStoreWrite( @Nullable IndexMaintainer.Factory indexFactory, @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, String tableName, + int totalBuckets, + RowType partitionType, int writerNumberMax) { this.snapshotManager = snapshotManager; this.scan = scan; this.indexFactory = indexFactory; this.dvMaintainerFactory = dvMaintainerFactory; + this.totalBuckets = totalBuckets; + this.partitionType = partitionType; this.writers = new HashMap<>(); this.tableName = tableName; this.writerNumberMax = writerNumberMax; @@ -451,10 +460,26 @@ public FileStoreWrite withMetricRegistry(MetricRegistry metricRegistry) { private List scanExistingFileMetas( long snapshotId, BinaryRow partition, int bucket) { List existingFileMetas = new ArrayList<>(); - // Concat all the DataFileMeta of existing files into existingFileMetas. - scan.withSnapshot(snapshotId).withPartitionBucket(partition, bucket).plan().files().stream() - .map(ManifestEntry::file) - .forEach(existingFileMetas::add); + List files = + scan.withSnapshot(snapshotId).withPartitionBucket(partition, bucket).plan().files(); + for (ManifestEntry entry : files) { + if (entry.totalBuckets() != totalBuckets) { + String partInfo = + partitionType.getFieldCount() > 0 + ? "partition " + + getPartitionComputer( + partitionType, + PARTITION_DEFAULT_NAME.defaultValue()) + .generatePartValues(partition) + : "table"; + throw new RuntimeException( + String.format( + "Try to write %s with a new bucket num %d, but the previous bucket num is %d. " + + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.", + partInfo, totalBuckets, entry.totalBuckets())); + } + existingFileMetas.add(entry.file()); + } return existingFileMetas; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index b911d38841a0..00e33bef8df1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -52,26 +52,22 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan { public AppendOnlyFileStoreScan( ManifestsReader manifestsReader, - RowType partitionType, ScanBucketFilter bucketFilter, SnapshotManager snapshotManager, SchemaManager schemaManager, TableSchema schema, ManifestFile.Factory manifestFileFactory, int numOfBuckets, - boolean checkNumOfBuckets, Integer scanManifestParallelism, boolean fileIndexReadEnabled) { super( manifestsReader, - partitionType, bucketFilter, snapshotManager, schemaManager, schema, manifestFileFactory, numOfBuckets, - checkNumOfBuckets, scanManifestParallelism); this.simpleStatsConverters = new SimpleStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index 40fe5dbfafc5..0a4d5d56a13e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -80,13 +80,14 @@ public AppendOnlyFileStoreWrite( RawFileSplitRead read, long schemaId, RowType rowType, + RowType partitionType, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options, @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, String tableName) { - super(snapshotManager, scan, options, null, dvMaintainerFactory, tableName); + super(snapshotManager, scan, options, partitionType, null, dvMaintainerFactory, tableName); this.fileIO = fileIO; this.read = read; this.schemaId = schemaId; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java index e169c0165b4a..c58bad9a9796 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java @@ -49,6 +49,7 @@ public AppendOnlyFixedBucketFileStoreWrite( long schemaId, String commitUser, RowType rowType, + RowType partitionType, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, @@ -60,6 +61,7 @@ public AppendOnlyFixedBucketFileStoreWrite( read, schemaId, rowType, + partitionType, pathFactory, snapshotManager, scan, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java index f33b207bb09f..e509b589944d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java @@ -46,6 +46,7 @@ public AppendOnlyUnawareBucketFileStoreWrite( RawFileSplitRead read, long schemaId, RowType rowType, + RowType partitionType, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, @@ -57,6 +58,7 @@ public AppendOnlyUnawareBucketFileStoreWrite( read, schemaId, rowType, + partitionType, pathFactory, snapshotManager, scan, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 5b067de087cd..ea478df4253e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -31,7 +31,6 @@ import org.apache.paimon.stats.SimpleStatsConverter; import org.apache.paimon.stats.SimpleStatsConverters; import org.apache.paimon.table.source.ScanMode; -import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; import java.util.ArrayList; @@ -56,7 +55,6 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan { public KeyValueFileStoreScan( ManifestsReader manifestsReader, - RowType partitionType, ScanBucketFilter bucketFilter, SnapshotManager snapshotManager, SchemaManager schemaManager, @@ -64,21 +62,18 @@ public KeyValueFileStoreScan( KeyValueFieldsExtractor keyValueFieldsExtractor, ManifestFile.Factory manifestFileFactory, int numOfBuckets, - boolean checkNumOfBuckets, Integer scanManifestParallelism, boolean deletionVectorsEnabled, MergeEngine mergeEngine, ChangelogProducer changelogProducer) { super( manifestsReader, - partitionType, bucketFilter, snapshotManager, schemaManager, schema, manifestFileFactory, numOfBuckets, - checkNumOfBuckets, scanManifestParallelism); this.fieldKeyStatsConverters = new SimpleStatsConverters( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 51dd395a96e6..d061e181618b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -136,6 +136,7 @@ public KeyValueFileStoreWrite( snapshotManager, scan, options, + partitionType, indexFactory, deletionVectorsMaintainerFactory, tableName); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java index 386a72cf5920..b7feeead4bbb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java @@ -28,6 +28,7 @@ import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.metrics.WriterBufferMetric; import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; @@ -62,6 +63,7 @@ public MemoryFileStoreWrite( SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options, + RowType partitionType, @Nullable IndexMaintainer.Factory indexFactory, @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, String tableName) { @@ -71,6 +73,8 @@ public MemoryFileStoreWrite( indexFactory, dvMaintainerFactory, tableName, + options.bucket(), + partitionType, options.writeMaxWritersToSpill()); this.options = options; this.cacheManager = new CacheManager(options.lookupCacheMaxMemory()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java index 4bf48a78eebc..9fcbb8960fc5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java @@ -52,9 +52,6 @@ public MetricGroup getMetricGroup() { public static final String LAST_SKIPPED_BY_PARTITION_AND_STATS = "lastSkippedByPartitionAndStats"; - public static final String LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER = - "lastSkippedByBucketAndLevelFilter"; - public static final String LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER = "lastSkippedByWholeBucketFilesFilter"; @@ -72,9 +69,6 @@ private void registerGenericScanMetrics() { metricGroup.gauge( LAST_SKIPPED_BY_PARTITION_AND_STATS, () -> latestScan == null ? 0L : latestScan.getSkippedByPartitionAndStats()); - metricGroup.gauge( - LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER, - () -> latestScan == null ? 0L : latestScan.getSkippedByBucketAndLevelFilter()); metricGroup.gauge( LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER, () -> latestScan == null ? 0L : latestScan.getSkippedByWholeBucketFiles()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java index 21f905df392c..e760282e687a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java @@ -26,7 +26,6 @@ public class ScanStats { private final long duration; private final long scannedManifests; private final long skippedByPartitionAndStats; - private final long skippedByBucketAndLevelFilter; private final long skippedByWholeBucketFiles; private final long skippedTableFiles; @@ -36,18 +35,13 @@ public ScanStats( long duration, long scannedManifests, long skippedByPartitionAndStats, - long skippedByBucketAndLevelFilter, long skippedByWholeBucketFiles, long resultedTableFiles) { this.duration = duration; this.scannedManifests = scannedManifests; this.skippedByPartitionAndStats = skippedByPartitionAndStats; - this.skippedByBucketAndLevelFilter = skippedByBucketAndLevelFilter; this.skippedByWholeBucketFiles = skippedByWholeBucketFiles; - this.skippedTableFiles = - skippedByPartitionAndStats - + skippedByBucketAndLevelFilter - + skippedByWholeBucketFiles; + this.skippedTableFiles = skippedByPartitionAndStats + skippedByWholeBucketFiles; this.resultedTableFiles = resultedTableFiles; } @@ -71,11 +65,6 @@ protected long getSkippedByPartitionAndStats() { return skippedByPartitionAndStats; } - @VisibleForTesting - protected long getSkippedByBucketAndLevelFilter() { - return skippedByBucketAndLevelFilter; - } - @VisibleForTesting protected long getSkippedByWholeBucketFiles() { return skippedByWholeBucketFiles; diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java index 5b41d0258316..2b9d0e0cb728 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java @@ -50,7 +50,6 @@ public void testGenericMetricsRegistration() { ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES, ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES, ScanMetrics.LAST_SKIPPED_BY_PARTITION_AND_STATS, - ScanMetrics.LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER, ScanMetrics.LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER); } @@ -71,10 +70,6 @@ public void testMetricsAreUpdated() { (Gauge) registeredGenericMetrics.get( ScanMetrics.LAST_SKIPPED_BY_PARTITION_AND_STATS); - Gauge lastSkippedByBucketAndLevelFilter = - (Gauge) - registeredGenericMetrics.get( - ScanMetrics.LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER); Gauge lastSkippedByWholeBucketFilesFilter = (Gauge) registeredGenericMetrics.get( @@ -91,7 +86,6 @@ public void testMetricsAreUpdated() { assertThat(scanDuration.getStatistics().size()).isEqualTo(0); assertThat(lastScannedManifests.getValue()).isEqualTo(0); assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(0); - assertThat(lastSkippedByBucketAndLevelFilter.getValue()).isEqualTo(0); assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(0); assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(0); assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(0); @@ -111,9 +105,8 @@ public void testMetricsAreUpdated() { assertThat(scanDuration.getStatistics().getStdDev()).isEqualTo(0); assertThat(lastScannedManifests.getValue()).isEqualTo(20); assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(25); - assertThat(lastSkippedByBucketAndLevelFilter.getValue()).isEqualTo(40); assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(32); - assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(97); + assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(57); assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(10); // report again @@ -131,19 +124,18 @@ public void testMetricsAreUpdated() { assertThat(scanDuration.getStatistics().getStdDev()).isCloseTo(212.132, offset(0.001)); assertThat(lastScannedManifests.getValue()).isEqualTo(22); assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(30); - assertThat(lastSkippedByBucketAndLevelFilter.getValue()).isEqualTo(42); assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(33); - assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(105); + assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(63); assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(8); } private void reportOnce(ScanMetrics scanMetrics) { - ScanStats scanStats = new ScanStats(200, 20, 25, 40, 32, 10); + ScanStats scanStats = new ScanStats(200, 20, 25, 32, 10); scanMetrics.reportScan(scanStats); } private void reportAgain(ScanMetrics scanMetrics) { - ScanStats scanStats = new ScanStats(500, 22, 30, 42, 33, 8); + ScanStats scanStats = new ScanStats(500, 22, 30, 33, 8); scanMetrics.reportScan(scanStats); }