diff --git a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
index 577f28d0f5cf..5e43568aac3f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
@@ -380,6 +380,8 @@ public FilesIterator(
            if (filter != null) {
                snapshotReader.withFilter(filter);
            }
+            // drop stats to reduce memory
+            snapshotReader.dropStats();
            this.streamingMode = isStreaming;
        }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index d63887030090..43957de8d6c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -101,7 +101,8 @@ protected AbstractFileStoreWrite(
            int writerNumberMax,
            boolean legacyPartitionName) {
        this.snapshotManager = snapshotManager;
-        this.scan = scan;
+        // Statistic is useless in writer
+        this.scan = scan == null ? null : scan.dropStats();
        this.indexFactory = indexFactory;
        this.dvMaintainerFactory = dvMaintainerFactory;
        this.totalBuckets = totalBuckets;
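Both changes above rely on the same idea: per-file column statistics (min/max values and null counts) ride along on every planned file entry, and a caller that never evaluates stats-based filters, such as the compaction coordinator or the writer's restore scan, can ask the planner to strip them and save heap. A minimal caller-side sketch of the pattern against Paimon's public ReadBuilder API; the catalog wiring and table name are illustrative assumptions, while dropStats() itself is the method this patch uses throughout:

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;

public class DropStatsExample {

    // Plans splits without per-file column stats. Only appropriate when
    // the caller never filters on min/max/null-count metadata afterwards.
    public static TableScan.Plan planWithoutStats(Catalog catalog) throws Exception {
        // Hypothetical database/table names; any Paimon table works the same way.
        Table table = catalog.getTable(Identifier.create("my_db", "my_table"));
        ReadBuilder readBuilder = table.newReadBuilder().dropStats();
        return readBuilder.newScan().plan();
    }
}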
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
index d3f6d4cd62af..079300a89dd2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
@@ -22,6 +22,8 @@
 import org.apache.paimon.casting.CastedRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
@@ -33,9 +35,9 @@
 
 import javax.annotation.Nullable;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** Converter for array of {@link SimpleColStats}. */
 public class SimpleStatsEvolution {
@@ -46,6 +48,9 @@ public class SimpleStatsEvolution {
 
     private final Map<List<String>, int[]> indexMappings;
 
+    private final GenericRow emptyValues;
+    private final GenericArray emptyNullCounts;
+
     public SimpleStatsEvolution(
            RowType rowType,
            @Nullable int[] indexMapping,
@@ -53,7 +58,9 @@ public SimpleStatsEvolution(
        this.fieldNames = rowType.getFieldNames();
        this.indexMapping = indexMapping;
        this.castFieldGetters = castFieldGetters;
-        this.indexMappings = new HashMap<>();
+        this.indexMappings = new ConcurrentHashMap<>();
+        this.emptyValues = new GenericRow(fieldNames.size());
+        this.emptyNullCounts = new GenericArray(new Object[fieldNames.size()]);
    }
 
    public Result evolution(
@@ -62,7 +69,12 @@ public Result evolution(
        InternalRow maxValues = stats.maxValues();
        InternalArray nullCounts = stats.nullCounts();
 
-        if (denseFields != null) {
+        if (denseFields != null && denseFields.isEmpty()) {
+            // optimize for empty dense fields
+            minValues = emptyValues;
+            maxValues = emptyValues;
+            nullCounts = emptyNullCounts;
+        } else if (denseFields != null) {
            int[] denseIndexMapping =
                    indexMappings.computeIfAbsent(
                            denseFields,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
index 48cb64e70be1..f43d80321ecc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
@@ -59,6 +59,7 @@ public LookupDataTableScan(
                defaultValueAssigner);
        this.startupMode = options.startupMode();
        this.lookupScanMode = lookupScanMode;
+        dropStats();
    }
 
    @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index ef5543ac9b7c..7bd7a652b56e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -207,6 +207,7 @@ private LocalQueryExecutor(
 
            this.scan =
                    table.newReadBuilder()
+                            .dropStats()
                            .withFilter(filter)
                            .withBucketFilter(
                                    requireCachedBucketIds == null
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
index 02f8a654112e..b9776786fa57 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
@@ -83,7 +83,7 @@ public void open(OpenContext openContext) throws Exception {
     */
    public void open(Configuration parameters) throws Exception {
        FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable) table);
-        ReadBuilder readBuilder = monitorTable.newReadBuilder();
+        ReadBuilder readBuilder = monitorTable.newReadBuilder().dropStats();
        this.scan = readBuilder.newStreamScan();
        this.read = readBuilder.newRead();
    }
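On the SimpleStatsEvolution change above: when the dense fields list is empty there is nothing to evolve, so the method can hand back one shared all-null GenericRow for min/max values and one shared GenericArray for null counts instead of computing a mapping per call, and the index-mapping cache becomes a ConcurrentHashMap, presumably because computeIfAbsent can now be reached from concurrent readers. A standalone sketch (not Paimon's code) of why a freshly sized GenericRow/GenericArray already reads as "no stats recorded":

import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericRow;

public class EmptyStatsSketch {

    public static void main(String[] args) {
        int fieldCount = 3;
        // GenericRow(n) starts with every field null, i.e. no value recorded.
        GenericRow emptyValues = new GenericRow(fieldCount);
        // GenericArray over an Object[] of nulls plays the same role for
        // the per-column null counts.
        GenericArray emptyNullCounts = new GenericArray(new Object[fieldCount]);

        System.out.println(emptyValues.isNullAt(0)); // true
        System.out.println(emptyNullCounts.isNullAt(0)); // true
    }
}

Because both objects are never mutated after construction, a single instance per SimpleStatsEvolution is safe to return from every evolution() call.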
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 191d7a766b71..466643b15709 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -94,7 +94,8 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
      condition: Expression,
      output: Seq[Attribute]): Seq[DataSplit] = {
    // low level snapshot reader, it can not be affected by 'scan.mode'
-    val snapshotReader = table.newSnapshotReader()
+    // dropStats after filter push down
+    val snapshotReader = table.newSnapshotReader().dropStats()
    if (condition != TrueLiteral) {
      val filter =
        convertConditionToPaimonPredicate(condition, output, rowType, ignoreFailure = true)
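The Spark change mirrors the coordinator change in the first hunk: the snapshot reader is created with stats already dropped and the predicate is pushed down afterwards; per the "dropStats after filter push down" comment, dropping stats does not defeat the pushed-down filter during planning. A hedged Java sketch of the same call shape (the helper is hypothetical; newSnapshotReader(), dropStats() and withFilter() are the methods this patch composes):

import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.snapshot.SnapshotReader;

public class SnapshotPlanSketch {

    // Hypothetical helper mirroring the Scala change above: drop stats up
    // front, then push the filter down; the returned splits simply stop
    // carrying per-file stats that the caller would never read.
    public static SnapshotReader planWithoutStats(FileStoreTable table, Predicate filter) {
        SnapshotReader snapshotReader = table.newSnapshotReader().dropStats();
        if (filter != null) {
            snapshotReader.withFilter(filter);
        }
        return snapshotReader;
    }
}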