From be5f218a7a9ea111d787d2b5e3bb5f154dae25e6 Mon Sep 17 00:00:00 2001
From: Xiduo You
Date: Wed, 9 Oct 2024 21:47:11 +0800
Subject: [PATCH] [core] Fix push down limit data correctness issue with
 deletion vector (#4296)

---
 .../paimon/table/source/DataTableBatchScan.java |  9 +++++++--
 .../paimon/spark/sql/PaimonPushDownTest.scala   | 16 ++++++++++++++++
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 93ab5ba1644d..29baac9a5f44 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -28,6 +28,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 
@@ -114,10 +115,14 @@ private StartingScanner.Result applyPushDownLimit(StartingScanner.Result result)
     }
 
     /**
-     * 0 represents that we can't compute the row count of this split, 'cause this split needs to be
-     * merged.
+     * 0 represents that we can't compute the row count of this split: 1. the split needs to be
+     * merged; 2. deletion vectors are enabled for the table and the split has deletion files.
      */
     private long getRowCountForSplit(DataSplit split) {
+        if (split.deletionFiles().isPresent()
+                && split.deletionFiles().get().stream().anyMatch(Objects::nonNull)) {
+            return 0L;
+        }
         return split.convertToRawFiles()
                 .map(files -> files.stream().map(RawFile::rowCount).reduce(Long::sum).orElse(0L))
                 .orElse(0L);
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
index c55ed876d6b1..8f62a5b3332e 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
@@ -86,6 +86,22 @@ class PaimonPushDownTest extends PaimonSparkTestBase {
     checkAnswer(spark.sql(q), Row(1, "a", "p1") :: Row(2, "b", "p1") :: Row(3, "c", "p2") :: Nil)
   }
 
+  test("Paimon pushDown: limit for append-only tables with deletion vector") {
+    withTable("dv_test") {
+      spark.sql(
+        """
+          |CREATE TABLE dv_test (c1 INT, c2 STRING)
+          |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'source.split.target-size' = '1')
+          |""".stripMargin)
+
+      spark.sql("insert into table dv_test values(1, 'a'),(2, 'b'),(3, 'c')")
+      assert(spark.sql("select * from dv_test limit 2").count() == 2)
+
+      spark.sql("delete from dv_test where c1 = 1")
+      assert(spark.sql("select * from dv_test limit 2").count() == 2)
+    }
+  }
+
   test("Paimon pushDown: limit for append-only tables") {
     spark.sql(s"""
                  |CREATE TABLE T (a INT, b STRING, c STRING)
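
Reviewer note (not part of the patch): the correctness issue is that raw-file
row counts over-state the rows a split actually produces once deletion vectors
exist, so the pushed-down limit could be "satisfied" by rows that were already
deleted. Below is a minimal, self-contained Java sketch of the over-count;
FileSlice and its fields are hypothetical stand-ins for a raw file's row count
plus its deletion vector, not Paimon APIs.

import java.util.BitSet;
import java.util.List;

public class LimitPushDownSketch {
    // Hypothetical stand-in: a raw file's total row count and its deletion vector.
    record FileSlice(long rawRowCount, BitSet deleted) {}

    // What summing RawFile::rowCount effectively computes (the pre-fix behavior).
    static long rawCount(List<FileSlice> split) {
        return split.stream().mapToLong(FileSlice::rawRowCount).sum();
    }

    // Rows that actually survive after applying the deletion vector.
    static long liveCount(List<FileSlice> split) {
        return split.stream()
                .mapToLong(s -> s.rawRowCount() - s.deleted().cardinality())
                .sum();
    }

    public static void main(String[] args) {
        BitSet dv = new BitSet();
        dv.set(0); // one row deleted, as in the test's "delete from dv_test where c1 = 1"
        List<FileSlice> split = List.of(new FileSlice(3, dv));
        System.out.println(rawCount(split));  // 3: limit pushdown would stop scanning too early
        System.out.println(liveCount(split)); // 2: the split's real contribution to the limit
    }
}

Per the updated javadoc, returning 0 from getRowCountForSplit marks the split's
row count as unknown, so applyPushDownLimit can no longer short-circuit the scan
based on raw-file counts when deletion files are present.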