
Commit

[core] Fix push down limit data correctness issue with deletion vector (
ulysses-you authored Oct 9, 2024
1 parent 79dae8e commit be5f218
Showing 2 changed files with 23 additions and 2 deletions.
@@ -28,6 +28,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;

@@ -114,10 +115,14 @@ private StartingScanner.Result applyPushDownLimit(StartingScanner.Result result)
     }
 
     /**
-     * 0 represents that we can't compute the row count of this split, 'cause this split needs to be
-     * merged.
+     * 0 represents that we can't compute the row count of this split: 1. the split needs to be
+     * merged; 2. the table enabled deletion vector and there are some deletion files.
      */
     private long getRowCountForSplit(DataSplit split) {
+        if (split.deletionFiles().isPresent()
+                && split.deletionFiles().get().stream().anyMatch(Objects::nonNull)) {
+            return 0L;
+        }
         return split.convertToRawFiles()
                 .map(files -> files.stream().map(RawFile::rowCount).reduce(Long::sum).orElse(0L))
                 .orElse(0L);
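Why the new check returns 0: the raw-file row count used by getRowCountForSplit comes from file metadata written at commit time, before any deletion vectors are applied, so for a split that carries deletion files it still counts rows that have already been deleted. Below is a minimal, hypothetical sketch (ToySplit and rowCountForLimit are illustration-only names, not Paimon APIs) of the rule the fix enforces.

// Hypothetical, simplified sketch; ToySplit and rowCountForLimit are not Paimon APIs.
// It only illustrates the rule added above: a split with deletion files reports 0 ("unknown")
// because its raw-file row count still includes rows removed by deletion vectors.
public class DeletionVectorRowCountSketch {

    // rawRowCount comes from file metadata; deletedRows is what deletion vectors remove.
    record ToySplit(long rawRowCount, long deletedRows, boolean hasDeletionFiles) {}

    static long rowCountForLimit(ToySplit split) {
        if (split.hasDeletionFiles()) {
            return 0L; // cannot trust rawRowCount; fall back to the regular read path
        }
        return split.rawRowCount();
    }

    public static void main(String[] args) {
        // A split written with 3 rows, one of which was later deleted via a deletion vector.
        ToySplit split = new ToySplit(3, 1, true);
        System.out.println("reported for limit push-down: " + rowCountForLimit(split)); // 0
        System.out.println("actual live rows: " + (split.rawRowCount() - split.deletedRows())); // 2
    }
}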
@@ -86,6 +86,22 @@ class PaimonPushDownTest extends PaimonSparkTestBase {
     checkAnswer(spark.sql(q), Row(1, "a", "p1") :: Row(2, "b", "p1") :: Row(3, "c", "p2") :: Nil)
   }
 
+  test("Paimon pushDown: limit for append-only tables with deletion vector") {
+    withTable("dv_test") {
+      spark.sql(
+        """
+          |CREATE TABLE dv_test (c1 INT, c2 STRING)
+          |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'source.split.target-size' = '1')
+          |""".stripMargin)
+
+      spark.sql("insert into table dv_test values(1, 'a'),(2, 'b'),(3, 'c')")
+      assert(spark.sql("select * from dv_test limit 2").count() == 2)
+
+      spark.sql("delete from dv_test where c1 = 1")
+      assert(spark.sql("select * from dv_test limit 2").count() == 2)
+    }
+  }
+
   test("Paimon pushDown: limit for append-only tables") {
     spark.sql(s"""
                  |CREATE TABLE T (a INT, b STRING, c STRING)
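The test enables deletion vectors and sets 'source.split.target-size' = '1', presumably to keep splits small enough that the pushed-down limit is decided from per-split row counts; after "delete from dv_test where c1 = 1", at least one split carries a deletion file. The sketch below is hypothetical (splitsNeededForLimit is not the real applyPushDownLimit) and only illustrates why such a split must report 0: if it kept reporting its raw row count, a "limit 2" scan could prune splits based on stale counts and return fewer live rows than expected.

// Hypothetical sketch, not the actual applyPushDownLimit logic, showing how a stale
// per-split row count can let a pushed-down LIMIT keep too few splits.
import java.util.List;

public class PushDownLimitSketch {

    // reportedRowCount == 0 means "unknown, the split must be read and merged normally".
    record Split(long reportedRowCount) {}

    // Keep the shortest prefix of splits whose reported counts cover the limit;
    // any unknown (0) count disables pruning entirely.
    static int splitsNeededForLimit(List<Split> splits, long limit) {
        long covered = 0;
        for (int i = 0; i < splits.size(); i++) {
            long count = splits.get(i).reportedRowCount();
            if (count == 0) {
                return splits.size();
            }
            covered += count;
            if (covered >= limit) {
                return i + 1;
            }
        }
        return splits.size();
    }

    public static void main(String[] args) {
        // A simplified scenario: three single-row splits, the first of which had its only
        // row deleted. Before the fix that split still reports 1, so LIMIT 2 keeps only two
        // splits and can return a single live row. After the fix it reports 0, so no pruning
        // happens and the scan still produces two live rows.
        List<Split> beforeFix = List.of(new Split(1), new Split(1), new Split(1));
        List<Split> afterFix = List.of(new Split(0), new Split(1), new Split(1));
        System.out.println("splits kept before fix: " + splitsNeededForLimit(beforeFix, 2)); // 2
        System.out.println("splits kept after fix:  " + splitsNeededForLimit(afterFix, 2));  // 3
    }
}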
