Skip to content

Commit

Permalink
[core] support delete stats in result of scan plan
Browse files Browse the repository at this point in the history
  • Loading branch information
hongli.wwj committed Nov 12, 2024
1 parent 9b281bc commit a0d5f66
Show file tree
Hide file tree
Showing 12 changed files with 91 additions and 0 deletions.
21 changes: 21 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,27 @@ public DataFileMeta rename(String newFileName) {
valueStatsCols);
}

public DataFileMeta withoutStats() {
return new DataFileMeta(
fileName,
fileSize,
rowCount,
minKey,
maxKey,
EMPTY_STATS,
EMPTY_STATS,
minSequenceNumber,
maxSequenceNumber,
schemaId,
level,
extraFiles,
creationTime,
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols);
}

public List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
paths.add(pathFactory.toPath(fileName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ public Identifier identifier() {
file.embeddedIndex());
}

public ManifestEntry withoutStats() {
return new ManifestEntry(
kind, partition, bucket, totalBuckets, file.withoutStats()
);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof ManifestEntry)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {

private ManifestCacheFilter manifestCacheFilter = null;
private ScanMetrics scanMetrics = null;
private boolean withoutStatsInPlan;

public AbstractFileStoreScan(
ManifestsReader manifestsReader,
Expand All @@ -105,6 +106,7 @@ public AbstractFileStoreScan(
this.manifestFileFactory = manifestFileFactory;
this.tableSchemas = new ConcurrentHashMap<>();
this.parallelism = parallelism;
this.withoutStatsInPlan = true;
}

@Override
Expand Down Expand Up @@ -215,6 +217,12 @@ public FileStoreScan withMetrics(ScanMetrics metrics) {
return this;
}

@Override
public FileStoreScan withoutStatsInPlan(boolean withoutStatsInPlan) {
this.withoutStatsInPlan = withoutStatsInPlan;
return this;
}

@Nullable
@Override
public Integer parallelism() {
Expand Down Expand Up @@ -291,6 +299,9 @@ public Snapshot snapshot() {

@Override
public List<ManifestEntry> files() {
if (withoutStatsInPlan) {
return files.stream().map(ManifestEntry::withoutStats).collect(Collectors.toList());
}
return files;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public interface FileStoreScan {

FileStoreScan withMetrics(ScanMetrics metrics);

FileStoreScan withoutStatsInPlan(boolean withoutStatsInPlan);

@Nullable
Integer parallelism();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ public AbstractDataTableScan withMetricsRegistry(MetricRegistry metricsRegistry)
return this;
}

@Override
public AbstractDataTableScan withoutStatsInPlan(boolean withoutStatsInPlan) {
snapshotReader.withoutStatsInPlan(withoutStatsInPlan);
return this;
}

public CoreOptions options() {
return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,9 @@ default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
// do nothing, should implement this if need
return this;
}

default InnerTableScan withoutStatsInPlan(boolean withoutStatsInPlan) {
// do nothing, should implement this if need
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public interface SnapshotReader {

SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter);

SnapshotReader withoutStatsInPlan(boolean withoutStatsInPlan);

SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks);

SnapshotReader withMetricRegistry(MetricRegistry registry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,12 @@ public SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter) {
return this;
}

@Override
public SnapshotReader withoutStatsInPlan(boolean withoutStatsInPlan) {
scan.withoutStatsInPlan(withoutStatsInPlan);
return this;
}

@Override
public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
if (splitGenerator.alwaysRawConvertible()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,12 @@ public SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter) {
return this;
}

@Override
public SnapshotReader withoutStatsInPlan(boolean withoutStatsInPlan) {
wrapped.withoutStatsInPlan(withoutStatsInPlan);
return this;
}

@Override
public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
wrapped.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ private TableScan.Plan tablePlan(FileStoreTable storeTable) {
GenericRow.of(null, null, null, null, null, level));
});
}

scan.withoutStatsInPlan(false);
return scan.plan();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link KeyValueFileStoreScan}. */
Expand Down Expand Up @@ -274,6 +275,28 @@ public void testWithManifestList() throws Exception {
runTestExactMatch(scan, null, expected);
}

@Test
public void testWithoutStatsInPlan() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
List<KeyValue> data = generateData(100, 0, (long) Math.abs(random.nextInt(1000)));
writeData(data, 0);
data = generateData(100, 1, (long) Math.abs(random.nextInt(1000)) + 1000);
writeData(data, 0);
data = generateData(100, 2, (long) Math.abs(random.nextInt(1000)) + 2000);
writeData(data, 0);
data = generateData(100, 3, (long) Math.abs(random.nextInt(1000)) + 3000);
Snapshot snapshot = writeData(data, 0);

KeyValueFileStoreScan scan = store.newScan();
scan.withSnapshot(snapshot.id());
List<ManifestEntry> files = scan.plan().files();

for (ManifestEntry manifestEntry : files) {
assertThat(manifestEntry.file().keyStats()).isEqualTo(EMPTY_STATS);
assertThat(manifestEntry.file().valueStats()).isEqualTo(EMPTY_STATS);
}
}

private void runTestExactMatch(
FileStoreScan scan, Long expectedSnapshotId, Map<BinaryRow, BinaryRow> expected)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void before() throws Exception {
SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), schema);
table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
scan = table.store().newScan();
scan.withoutStatsInPlan(false);

Identifier filesTableId =
identifier(tableName + Catalog.SYSTEM_TABLE_SPLITTER + FilesTable.FILES);
Expand Down

0 comments on commit a0d5f66

Please sign in to comment.