Skip to content

Commit

Permalink
[core] support drop stats in result of scan plan (#4506)
Browse files Browse the repository at this point in the history
  • Loading branch information
wwj6591812 authored Nov 18, 2024
1 parent 203db41 commit 220789d
Show file tree
Hide file tree
Showing 16 changed files with 111 additions and 4 deletions.
21 changes: 21 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,27 @@ public DataFileMeta rename(String newFileName) {
valueStatsCols);
}

/**
 * Creates a copy of this file meta with its stats dropped.
 *
 * <p>All fields are passed through unchanged except two: the stats argument that follows
 * {@code keyStats} is replaced by {@code EMPTY_STATS}, and the trailing list argument is
 * replaced by an empty list. Key stats are kept as-is.
 *
 * @return a new {@link DataFileMeta}; this instance is not modified
 */
public DataFileMeta copyWithoutStats() {
return new DataFileMeta(
fileName,
fileSize,
rowCount,
minKey,
maxKey,
keyStats,
// presumably the value stats slot (the test in this change checks valueStats()) - confirm
EMPTY_STATS,
minSequenceNumber,
maxSequenceNumber,
schemaId,
level,
extraFiles,
creationTime,
deleteRowCount,
embeddedIndex,
fileSource,
// rename() passes valueStatsCols in this position; no columns are listed once stats are gone
Collections.emptyList());
}

public List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
paths.add(pathFactory.toPath(fileName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ public Identifier identifier() {
file.embeddedIndex());
}

/**
 * Creates a copy of this entry whose data file meta has its stats dropped.
 *
 * <p>Kind, partition, bucket and total bucket count are carried over unchanged; only the file
 * meta is replaced by the result of {@code file.copyWithoutStats()}.
 *
 * @return a new {@link ManifestEntry}; this instance is not modified
 */
public ManifestEntry copyWithoutStats() {
    DataFileMeta fileWithoutStats = file.copyWithoutStats();
    return new ManifestEntry(kind, partition, bucket, totalBuckets, fileWithoutStats);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof ManifestEntry)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {

private ManifestCacheFilter manifestCacheFilter = null;
private ScanMetrics scanMetrics = null;
private boolean dropStats;

public AbstractFileStoreScan(
ManifestsReader manifestsReader,
Expand All @@ -105,6 +106,7 @@ public AbstractFileStoreScan(
this.manifestFileFactory = manifestFileFactory;
this.tableSchemas = new ConcurrentHashMap<>();
this.parallelism = parallelism;
this.dropStats = false;
}

@Override
Expand Down Expand Up @@ -215,6 +217,12 @@ public FileStoreScan withMetrics(ScanMetrics metrics) {
return this;
}

/**
 * Requests that stats be dropped from the planned manifest entries.
 *
 * <p>Only sets a flag here; the plan's {@code files()} consults it and returns stat-less copies
 * of the entries when it is set.
 *
 * @return this scan, for call chaining
 */
@Override
public FileStoreScan dropStats() {
this.dropStats = true;
return this;
}

@Nullable
@Override
public Integer parallelism() {
Expand Down Expand Up @@ -291,6 +299,11 @@ public Snapshot snapshot() {

/**
 * Returns the manifest entries of this plan.
 *
 * <p>When stats dropping was requested on the enclosing scan, every call builds a new list of
 * stat-less copies; otherwise the planned list itself is returned.
 */
@Override
public List<ManifestEntry> files() {
    if (!dropStats) {
        // Common path: hand back the planned entries untouched.
        return files;
    }
    return files.stream()
            .map(entry -> entry.copyWithoutStats())
            .collect(Collectors.toList());
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public interface FileStoreScan {

FileStoreScan withMetrics(ScanMetrics metrics);

/** Requests that stats be dropped from the manifest entries returned by the scan plan. */
FileStoreScan dropStats();

@Nullable
Integer parallelism();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ public AbstractDataTableScan withMetricsRegistry(MetricRegistry metricsRegistry)
return this;
}

/**
 * Forwards the drop-stats request to the underlying snapshot reader.
 *
 * @return this scan, for call chaining
 */
@Override
public AbstractDataTableScan dropStats() {
snapshotReader.dropStats();
return this;
}

public CoreOptions options() {
return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,9 @@ default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
// do nothing, should implement this if need
return this;
}

/**
 * Requests that stats be dropped from the scan plan result.
 *
 * <p>The default implementation ignores the request and returns this scan unchanged.
 *
 * @return this scan, for call chaining
 */
default InnerTableScan dropStats() {
// No-op by default; implementations should override this if they need it.
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ default ReadBuilder withProjection(int[][] projection) {
*/
ReadBuilder withShard(int indexOfThisSubtask, int numberOfParallelSubtasks);

/** Drop stats from the result of the scan plan. */
ReadBuilder dropStats();

/** Create a {@link TableScan} to perform batch planning. */
TableScan newScan();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class ReadBuilderImpl implements ReadBuilder {

private @Nullable RowType readType;

private boolean dropStats = false;

public ReadBuilderImpl(InnerTable table) {
this.table = table;
}
Expand Down Expand Up @@ -124,6 +126,12 @@ public ReadBuilder withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

/**
 * Records that scans created by this builder should drop stats.
 *
 * <p>The flag is applied in {@code configureScan}, which calls {@code dropStats()} on the scan
 * when it is set.
 *
 * @return this builder, for call chaining
 */
@Override
public ReadBuilder dropStats() {
this.dropStats = true;
return this;
}

@Override
public TableScan newScan() {
InnerTableScan tableScan = configureScan(table.newScan());
Expand Down Expand Up @@ -156,6 +164,9 @@ private InnerTableScan configureScan(InnerTableScan scan) {
if (bucketFilter != null) {
scan.withBucketFilter(bucketFilter);
}
if (dropStats) {
scan.dropStats();
}
return scan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public interface SnapshotReader {

SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter);

/** Requests that stats be dropped from the result of the scan plan. */
SnapshotReader dropStats();

SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks);

SnapshotReader withMetricRegistry(MetricRegistry registry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,12 @@ public SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter) {
return this;
}

/**
 * Forwards the drop-stats request to the underlying scan.
 *
 * @return this reader, for call chaining
 */
@Override
public SnapshotReader dropStats() {
scan.dropStats();
return this;
}

@Override
public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
if (splitGenerator.alwaysRawConvertible()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,12 @@ public SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter) {
return this;
}

/**
 * Forwards the drop-stats request to the wrapped reader.
 *
 * @return this reader (not the wrapped one), for call chaining
 */
@Override
public SnapshotReader dropStats() {
wrapped.dropStats();
return this;
}

@Override
public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
wrapped.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link KeyValueFileStoreScan}. */
Expand Down Expand Up @@ -274,6 +275,27 @@ public void testWithManifestList() throws Exception {
runTestExactMatch(scan, null, expected);
}

/**
 * Verifies that a scan configured with {@code dropStats()} returns manifest entries whose value
 * stats equal {@code EMPTY_STATS}.
 */
@Test
public void testDropStatsInPlan() throws Exception {
    ThreadLocalRandom random = ThreadLocalRandom.current();
    // nextInt(bound) is never negative, so no Math.abs is needed around it.
    List<KeyValue> data = generateData(100, 0, (long) random.nextInt(1000));
    writeData(data, 0);
    data = generateData(100, 1, (long) random.nextInt(1000) + 1000);
    writeData(data, 0);
    data = generateData(100, 2, (long) random.nextInt(1000) + 2000);
    writeData(data, 0);
    data = generateData(100, 3, (long) random.nextInt(1000) + 3000);
    // The scan below targets the snapshot produced by the last write.
    Snapshot snapshot = writeData(data, 0);

    KeyValueFileStoreScan scan = store.newScan();
    scan.withSnapshot(snapshot.id()).dropStats();
    List<ManifestEntry> files = scan.plan().files();

    // Guard against a vacuous pass: with no entries the loop below would assert nothing.
    assertThat(files).isNotEmpty();
    for (ManifestEntry manifestEntry : files) {
        assertThat(manifestEntry.file().valueStats()).isEqualTo(EMPTY_STATS);
    }
}

private void runTestExactMatch(
FileStoreScan scan, Long expectedSnapshotId, Map<BinaryRow, BinaryRow> expected)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
nextSnapshotId = checkpoint.currentSnapshotId();
splits = checkpoint.splits();
}
StreamTableScan scan = readBuilder.newStreamScan();
StreamTableScan scan = readBuilder.dropStats().newStreamScan();
if (metricGroup(context) != null) {
((StreamDataTableScan) scan)
.withMetricsRegistry(new FlinkMetricRegistry(context.metricGroup()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ protected void scanSplitsForInference() {
List<PartitionEntry> partitionEntries =
table.newReadBuilder()
.withFilter(predicate)
.dropStats()
.newScan()
.listPartitionEntries();
long totalSize = 0;
Expand All @@ -188,7 +189,12 @@ protected void scanSplitsForInference() {
new SplitStatistics((int) (totalSize / splitTargetSize + 1), rowCount);
} else {
List<Split> splits =
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
table.newReadBuilder()
.withFilter(predicate)
.dropStats()
.newScan()
.plan()
.splits();
splitStatistics =
new SplitStatistics(
splits.size(), splits.stream().mapToLong(Split::rowCount).sum());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu

private List<FileStoreSourceSplit> getSplits(SplitEnumeratorContext context) {
FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
TableScan scan = readBuilder.newScan();
TableScan scan = readBuilder.dropStats().newScan();
// register scan metrics
if (context.metricGroup() != null) {
((InnerTableScan) scan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public MonitorFunction(

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
this.scan = readBuilder.newStreamScan();
this.scan = readBuilder.dropStats().newStreamScan();

this.checkpointState =
context.getOperatorStateStore()
Expand Down

0 comments on commit 220789d

Please sign in to comment.