Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Drop stats for deleted data files to reduce memory #4629

Merged
merged 5 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ public FilesIterator(
if (filter != null) {
snapshotReader.withFilter(filter);
}
// drop stats to reduce memory
snapshotReader.dropStats();
this.streamingMode = isStreaming;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ protected AbstractFileStoreWrite(
int writerNumberMax,
boolean legacyPartitionName) {
this.snapshotManager = snapshotManager;
this.scan = scan;
// Statistic is useless in writer
this.scan = scan == null ? null : scan.dropStats();
this.indexFactory = indexFactory;
this.dvMaintainerFactory = dvMaintainerFactory;
this.totalBuckets = totalBuckets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.paimon.casting.CastedRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
Expand All @@ -33,9 +35,9 @@

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Converter for array of {@link SimpleColStats}. */
public class SimpleStatsEvolution {
Expand All @@ -46,14 +48,19 @@ public class SimpleStatsEvolution {

private final Map<List<String>, int[]> indexMappings;

private final GenericRow emptyValues;
private final GenericArray emptyNullCounts;

public SimpleStatsEvolution(
RowType rowType,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castFieldGetters) {
this.fieldNames = rowType.getFieldNames();
this.indexMapping = indexMapping;
this.castFieldGetters = castFieldGetters;
this.indexMappings = new HashMap<>();
this.indexMappings = new ConcurrentHashMap<>();
this.emptyValues = new GenericRow(fieldNames.size());
this.emptyNullCounts = new GenericArray(new Object[fieldNames.size()]);
}

public Result evolution(
Expand All @@ -62,7 +69,12 @@ public Result evolution(
InternalRow maxValues = stats.maxValues();
InternalArray nullCounts = stats.nullCounts();

if (denseFields != null) {
if (denseFields != null && denseFields.isEmpty()) {
// optimize for empty dense fields
minValues = emptyValues;
maxValues = emptyValues;
nullCounts = emptyNullCounts;
} else if (denseFields != null) {
int[] denseIndexMapping =
indexMappings.computeIfAbsent(
denseFields,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public LookupDataTableScan(
defaultValueAssigner);
this.startupMode = options.startupMode();
this.lookupScanMode = lookupScanMode;
dropStats();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ private LocalQueryExecutor(

this.scan =
table.newReadBuilder()
.dropStats()
.withFilter(filter)
.withBucketFilter(
requireCachedBucketIds == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void open(OpenContext openContext) throws Exception {
*/
public void open(Configuration parameters) throws Exception {
FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable) table);
ReadBuilder readBuilder = monitorTable.newReadBuilder();
ReadBuilder readBuilder = monitorTable.newReadBuilder().dropStats();
this.scan = readBuilder.newStreamScan();
this.read = readBuilder.newRead();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
condition: Expression,
output: Seq[Attribute]): Seq[DataSplit] = {
// low level snapshot reader, it can not be affected by 'scan.mode'
val snapshotReader = table.newSnapshotReader()
// dropStats after filter push down
val snapshotReader = table.newSnapshotReader().dropStats()
if (condition != TrueLiteral) {
val filter =
convertConditionToPaimonPredicate(condition, output, rowType, ignoreFailure = true)
Expand Down
Loading