From 9d2a63a0cf31e66083c0374d7a38c7b3f4777cd7 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Wed, 4 Dec 2024 08:53:52 +0700 Subject: [PATCH 1/5] [core] Drop stats for deleted data files to reduce memory --- .../paimon/append/UnawareAppendTableCompactionCoordinator.java | 2 ++ .../org/apache/paimon/operation/AbstractFileStoreWrite.java | 3 ++- .../scala/org/apache/paimon/spark/commands/PaimonCommand.scala | 3 ++- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java index 577f28d0f5cf..5e43568aac3f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java @@ -380,6 +380,8 @@ public FilesIterator( if (filter != null) { snapshotReader.withFilter(filter); } + // drop stats to reduce memory + snapshotReader.dropStats(); this.streamingMode = isStreaming; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index d63887030090..79ee3e1f4706 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -101,7 +101,8 @@ protected AbstractFileStoreWrite( int writerNumberMax, boolean legacyPartitionName) { this.snapshotManager = snapshotManager; - this.scan = scan; + // Statistic is useless in writer + this.scan = scan.dropStats(); this.indexFactory = indexFactory; this.dvMaintainerFactory = dvMaintainerFactory; this.totalBuckets = totalBuckets; diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala index 191d7a766b71..466643b15709 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala @@ -94,7 +94,8 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon condition: Expression, output: Seq[Attribute]): Seq[DataSplit] = { // low level snapshot reader, it can not be affected by 'scan.mode' - val snapshotReader = table.newSnapshotReader() + // dropStats after filter push down + val snapshotReader = table.newSnapshotReader().dropStats() if (condition != TrueLiteral) { val filter = convertConditionToPaimonPredicate(condition, output, rowType, ignoreFailure = true) From 49a9f70aead8dfa18bc010c7865896e0da1dabf2 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Wed, 4 Dec 2024 08:58:15 +0700 Subject: [PATCH 2/5] fix --- .../org/apache/paimon/flink/lookup/LookupDataTableScan.java | 1 + .../paimon/flink/lookup/PrimaryKeyPartialLookupTable.java | 1 + .../java/org/apache/paimon/flink/service/QueryFileMonitor.java | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java index 48cb64e70be1..f43d80321ecc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java @@ -59,6 +59,7 @@ public LookupDataTableScan( defaultValueAssigner); this.startupMode = options.startupMode(); this.lookupScanMode = lookupScanMode; + dropStats(); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java index ef5543ac9b7c..7bd7a652b56e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java @@ -207,6 +207,7 @@ private LocalQueryExecutor( this.scan = table.newReadBuilder() + .dropStats() .withFilter(filter) .withBucketFilter( requireCachedBucketIds == null diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java index 02f8a654112e..b9776786fa57 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java @@ -83,7 +83,7 @@ public void open(OpenContext openContext) throws Exception { */ public void open(Configuration parameters) throws Exception { FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable) table); - ReadBuilder readBuilder = monitorTable.newReadBuilder(); + ReadBuilder readBuilder = monitorTable.newReadBuilder().dropStats(); this.scan = readBuilder.newStreamScan(); this.read = readBuilder.newRead(); } From 2882ff4416bd18023e241b200e0ab42a96b7a12d Mon Sep 17 00:00:00 2001 From: Jingsong Date: Wed, 4 Dec 2024 10:06:32 +0700 Subject: [PATCH 3/5] fix --- .../org/apache/paimon/operation/AbstractFileStoreWrite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 79ee3e1f4706..43957de8d6c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -102,7 +102,7 @@ protected AbstractFileStoreWrite( boolean legacyPartitionName) { this.snapshotManager = snapshotManager; // Statistic is useless in writer - this.scan = scan.dropStats(); + this.scan = scan == null ? null : scan.dropStats(); this.indexFactory = indexFactory; this.dvMaintainerFactory = dvMaintainerFactory; this.totalBuckets = totalBuckets; From b09950e9f2ab676f26062ad5ae3b65d259e6b531 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Wed, 4 Dec 2024 10:35:17 +0700 Subject: [PATCH 4/5] fix thread safe in SimpleStatsEvolution --- .../java/org/apache/paimon/stats/SimpleStatsEvolution.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java index d3f6d4cd62af..f3dadd121666 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java @@ -33,9 +33,9 @@ import javax.annotation.Nullable; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** Converter for array of {@link SimpleColStats}. */ public class SimpleStatsEvolution { @@ -53,7 +53,7 @@ public SimpleStatsEvolution( this.fieldNames = rowType.getFieldNames(); this.indexMapping = indexMapping; this.castFieldGetters = castFieldGetters; - this.indexMappings = new HashMap<>(); + this.indexMappings = new ConcurrentHashMap<>(); } public Result evolution( From 03cea8691b83b0a5910ee35ac6dc4032fde5e0d1 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Wed, 4 Dec 2024 11:16:02 +0700 Subject: [PATCH 5/5] fix --- .../apache/paimon/stats/SimpleStatsEvolution.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java index f3dadd121666..079300a89dd2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java @@ -22,6 +22,8 @@ import org.apache.paimon.casting.CastedRow; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; @@ -46,6 +48,9 @@ public class SimpleStatsEvolution { private final Map, int[]> indexMappings; + private final GenericRow emptyValues; + private final GenericArray emptyNullCounts; + public SimpleStatsEvolution( RowType rowType, @Nullable int[] indexMapping, @@ -54,6 +59,8 @@ public SimpleStatsEvolution( this.indexMapping = indexMapping; this.castFieldGetters = castFieldGetters; this.indexMappings = new ConcurrentHashMap<>(); + this.emptyValues = new GenericRow(fieldNames.size()); + this.emptyNullCounts = new GenericArray(new Object[fieldNames.size()]); } public Result evolution( @@ -62,7 +69,12 @@ public Result evolution( InternalRow maxValues = stats.maxValues(); InternalArray nullCounts = stats.nullCounts(); - if (denseFields != null) { + if (denseFields != null && denseFields.isEmpty()) { + // optimize for empty dense fields + minValues = emptyValues; + maxValues = emptyValues; + nullCounts = emptyNullCounts; + } else if (denseFields != null) { int[] denseIndexMapping = indexMappings.computeIfAbsent( denseFields,