Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support delete stats in result of scan plan. #4506

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,27 @@ public DataFileMeta rename(String newFileName) {
valueStatsCols);
}

public DataFileMeta copyWithoutStats() {
return new DataFileMeta(
fileName,
fileSize,
rowCount,
minKey,
maxKey,
keyStats,
EMPTY_STATS,
minSequenceNumber,
maxSequenceNumber,
schemaId,
level,
extraFiles,
creationTime,
deleteRowCount,
embeddedIndex,
fileSource,
Collections.emptyList());
}

public List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
paths.add(pathFactory.toPath(fileName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ public Identifier identifier() {
file.embeddedIndex());
}

public ManifestEntry copyWithoutStats() {
return new ManifestEntry(kind, partition, bucket, totalBuckets, file.copyWithoutStats());
}

@Override
public boolean equals(Object o) {
if (!(o instanceof ManifestEntry)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {

private ManifestCacheFilter manifestCacheFilter = null;
private ScanMetrics scanMetrics = null;
private boolean dropStats;

public AbstractFileStoreScan(
ManifestsReader manifestsReader,
Expand All @@ -105,6 +106,7 @@ public AbstractFileStoreScan(
this.manifestFileFactory = manifestFileFactory;
this.tableSchemas = new ConcurrentHashMap<>();
this.parallelism = parallelism;
this.dropStats = false;
}

@Override
Expand Down Expand Up @@ -215,6 +217,12 @@ public FileStoreScan withMetrics(ScanMetrics metrics) {
return this;
}

@Override
public FileStoreScan dropStats() {
this.dropStats = true;
return this;
}

@Nullable
@Override
public Integer parallelism() {
Expand Down Expand Up @@ -291,6 +299,11 @@ public Snapshot snapshot() {

@Override
public List<ManifestEntry> files() {
if (dropStats) {
return files.stream()
.map(ManifestEntry::copyWithoutStats)
.collect(Collectors.toList());
}
return files;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public interface FileStoreScan {

FileStoreScan withMetrics(ScanMetrics metrics);

FileStoreScan dropStats();

@Nullable
Integer parallelism();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ public AbstractDataTableScan withMetricsRegistry(MetricRegistry metricsRegistry)
return this;
}

@Override
public AbstractDataTableScan dropStats() {
snapshotReader.dropStats();
return this;
}

public CoreOptions options() {
return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,9 @@ default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
// do nothing, should implement this if need
return this;
}

default InnerTableScan dropStats() {
// do nothing, should implement this if need
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ default ReadBuilder withProjection(int[][] projection) {
*/
ReadBuilder withShard(int indexOfThisSubtask, int numberOfParallelSubtasks);

/** Delete stats in scan plan result. */
ReadBuilder dropStats();

/** Create a {@link TableScan} to perform batch planning. */
TableScan newScan();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class ReadBuilderImpl implements ReadBuilder {

private @Nullable RowType readType;

private boolean dropStats = false;

public ReadBuilderImpl(InnerTable table) {
this.table = table;
}
Expand Down Expand Up @@ -124,6 +126,12 @@ public ReadBuilder withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

@Override
public ReadBuilder dropStats() {
this.dropStats = true;
return this;
}

@Override
public TableScan newScan() {
InnerTableScan tableScan = configureScan(table.newScan());
Expand Down Expand Up @@ -156,6 +164,9 @@ private InnerTableScan configureScan(InnerTableScan scan) {
if (bucketFilter != null) {
scan.withBucketFilter(bucketFilter);
}
if (dropStats) {
scan.dropStats();
}
return scan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public interface SnapshotReader {

SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter);

SnapshotReader dropStats();

SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks);

SnapshotReader withMetricRegistry(MetricRegistry registry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,12 @@ public SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter) {
return this;
}

@Override
public SnapshotReader dropStats() {
scan.dropStats();
return this;
}

@Override
public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
if (splitGenerator.alwaysRawConvertible()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,12 @@ public SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter) {
return this;
}

@Override
public SnapshotReader dropStats() {
wrapped.dropStats();
return this;
}

@Override
public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
wrapped.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link KeyValueFileStoreScan}. */
Expand Down Expand Up @@ -274,6 +275,27 @@ public void testWithManifestList() throws Exception {
runTestExactMatch(scan, null, expected);
}

@Test
public void testDropStatsInPlan() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
List<KeyValue> data = generateData(100, 0, (long) Math.abs(random.nextInt(1000)));
writeData(data, 0);
data = generateData(100, 1, (long) Math.abs(random.nextInt(1000)) + 1000);
writeData(data, 0);
data = generateData(100, 2, (long) Math.abs(random.nextInt(1000)) + 2000);
writeData(data, 0);
data = generateData(100, 3, (long) Math.abs(random.nextInt(1000)) + 3000);
Snapshot snapshot = writeData(data, 0);

KeyValueFileStoreScan scan = store.newScan();
scan.withSnapshot(snapshot.id()).dropStats();
List<ManifestEntry> files = scan.plan().files();

for (ManifestEntry manifestEntry : files) {
assertThat(manifestEntry.file().valueStats()).isEqualTo(EMPTY_STATS);
}
}

private void runTestExactMatch(
FileStoreScan scan, Long expectedSnapshotId, Map<BinaryRow, BinaryRow> expected)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
nextSnapshotId = checkpoint.currentSnapshotId();
splits = checkpoint.splits();
}
StreamTableScan scan = readBuilder.newStreamScan();
StreamTableScan scan = readBuilder.dropStats().newStreamScan();
if (metricGroup(context) != null) {
((StreamDataTableScan) scan)
.withMetricsRegistry(new FlinkMetricRegistry(context.metricGroup()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ protected void scanSplitsForInference() {
List<PartitionEntry> partitionEntries =
table.newReadBuilder()
.withFilter(predicate)
.dropStats()
.newScan()
.listPartitionEntries();
long totalSize = 0;
Expand All @@ -188,7 +189,12 @@ protected void scanSplitsForInference() {
new SplitStatistics((int) (totalSize / splitTargetSize + 1), rowCount);
} else {
List<Split> splits =
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
table.newReadBuilder()
.withFilter(predicate)
.dropStats()
.newScan()
.plan()
.splits();
splitStatistics =
new SplitStatistics(
splits.size(), splits.stream().mapToLong(Split::rowCount).sum());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu

private List<FileStoreSourceSplit> getSplits(SplitEnumeratorContext context) {
FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
TableScan scan = readBuilder.newScan();
TableScan scan = readBuilder.dropStats().newScan();
// register scan metrics
if (context.metricGroup() != null) {
((InnerTableScan) scan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public MonitorFunction(

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
this.scan = readBuilder.newStreamScan();
this.scan = readBuilder.dropStats().newStreamScan();

this.checkpointState =
context.getOperatorStateStore()
Expand Down
Loading