[core] Fix limit push-down: sorting the splits violates the comparator's general contract.
Signed-off-by: LinMingQiang <[email protected]>
LinMingQiang committed Nov 11, 2024
1 parent b8a4def commit cba7d5c
Showing 2 changed files with 61 additions and 35 deletions.
Changed file 1 of 2 — core limit push-down (applyPushDownLimit):

@@ -100,40 +100,21 @@ private StartingScanner.Result applyPushDownLimit(StartingScanner.Result result)
                 return result;
             }
 
-            // We first add the rawConvertible split to avoid merging, and if the row count
-            // is still less than limit number, then add split which is not rawConvertible.
-            splits.sort(
-                    (x, y) -> {
-                        if (x.rawConvertible() && y.rawConvertible()) {
-                            return 0;
-                        } else if (x.rawConvertible()) {
-                            return -1;
-                        } else {
-                            return 1;
-                        }
-                    });
-
-            // fast return if there is no rawConvertible split
-            if (!splits.get(0).rawConvertible()) {
-                return result;
-            }
-
             List<Split> limitedSplits = new ArrayList<>();
             for (DataSplit dataSplit : splits) {
-                long splitRowCount = getRowCountForSplit(dataSplit);
-                limitedSplits.add(dataSplit);
-                scannedRowCount += splitRowCount;
-                if (scannedRowCount >= pushDownLimit) {
-                    break;
+                if (dataSplit.rawConvertible()) {
+                    long splitRowCount = getRowCountForSplit(dataSplit);
+                    limitedSplits.add(dataSplit);
+                    scannedRowCount += splitRowCount;
+                    if (scannedRowCount >= pushDownLimit) {
+                        SnapshotReader.Plan newPlan =
+                                new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits);
+                        return new ScannedResult(newPlan);
+                    }
                 }
             }
-
-            SnapshotReader.Plan newPlan =
-                    new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits);
-            return new ScannedResult(newPlan);
-        } else {
-            return result;
         }
+        return result;
     }
 
     /**
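For context on the commit title: the removed comparator is what breaks the java.util.Comparator general contract. When neither split is rawConvertible, it returns 1 for both compare(x, y) and compare(y, x), so sgn(compare(x, y)) != -sgn(compare(y, x)), and TimSort (used by List.sort / Collections.sort) can reject such a comparator at runtime with "Comparison method violates its general contract!". The sketch below is not part of the commit; it is a minimal self-contained Java illustration, with a plain Boolean standing in for DataSplit.rawConvertible().

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class ComparatorContractDemo {

        public static void main(String[] args) {
            // Same shape as the removed lambda: rawConvertible entries first, others last.
            // Flaw: when both arguments are non-rawConvertible it returns 1 instead of 0.
            Comparator<Boolean> byRawConvertible =
                    (x, y) -> {
                        if (x && y) {
                            return 0;
                        } else if (x) {
                            return -1;
                        } else {
                            return 1;
                        }
                    };

            Boolean x = false; // stand-in for a non-rawConvertible split
            Boolean y = false; // another non-rawConvertible split

            // Antisymmetry is broken: both argument orders claim "greater than".
            System.out.println(byRawConvertible.compare(x, y)); // 1
            System.out.println(byRawConvertible.compare(y, x)); // 1, but the contract requires the negation

            // On larger inputs TimSort may detect the inconsistency and throw
            // java.lang.IllegalArgumentException: Comparison method violates its general contract!
            List<Boolean> rawConvertibleFlags = new ArrayList<>();
            for (int i = 0; i < 1000; i++) {
                rawConvertibleFlags.add(i % 3 == 0);
            }
            rawConvertibleFlags.sort(byRawConvertible); // may throw, depending on how runs are merged
        }
    }

The fix sidesteps the comparator entirely: instead of sorting, the new loop simply skips non-rawConvertible splits, which also removes the need for the old "fast return" check.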
Changed file 2 of 2 — PaimonPushDownTest (Spark limit push-down test):

@@ -141,27 +141,72 @@ class PaimonPushDownTest extends PaimonSparkTestBase {
     val scanBuilder = getScanBuilder()
     Assertions.assertTrue(scanBuilder.isInstanceOf[SupportsPushDownLimit])
 
+    // Case 1: all dataSplits are rawConvertible.
     val dataSplitsWithoutLimit = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
     Assertions.assertEquals(4, dataSplitsWithoutLimit.length)
     // All dataSplits is rawConvertible.
     dataSplitsWithoutLimit.foreach(
       splits => {
         Assertions.assertTrue(splits.asInstanceOf[DataSplit].rawConvertible())
       })
 
-    // It still return false even it can push down limit.
+    // It still returns false even though it can push down the limit.
     Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
 
     val dataSplitsWithLimit = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
     Assertions.assertEquals(1, dataSplitsWithLimit.length)
 
     Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
 
-    spark.sql("UPDATE T SET b = 'x' WHERE a = 1")
-    Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(2))
-    val dataSplitsWithLimit1 = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
-    Assertions.assertEquals(2, dataSplitsWithLimit1.length)
-    Assertions.assertEquals(2, spark.sql("SELECT * FROM T LIMIT 2").count())
+    // Case 2: convert 2 rawConvertible dataSplits to nonRawConvertible.
+    spark.sql("INSERT INTO T VALUES (1, 'a2', '11'), (2, 'b2', '22')")
+    val scanBuilder2 = getScanBuilder()
+    val dataSplitsWithoutLimit2 = scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
+    Assertions.assertEquals(4, dataSplitsWithoutLimit2.length)
+
+    // Now there are 4 dataSplits: 2 are nonRawConvertible and 2 are rawConvertible.
+    Assertions.assertEquals(
+      2,
+      dataSplitsWithoutLimit2
+        .filter(
+          split => {
+            split.asInstanceOf[DataSplit].rawConvertible()
+          })
+        .length)
+
+    // Returns 2 dataSplits.
+    Assertions.assertFalse(scanBuilder2.asInstanceOf[SupportsPushDownLimit].pushLimit(2))
+    val dataSplitsWithLimit2 = scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
+    Assertions.assertEquals(2, dataSplitsWithLimit2.length)
+    Assertions.assertEquals(2, spark.sql("SELECT * FROM T LIMIT 2").count())
+
+    // 2 rawConvertible dataSplits cannot meet the limit requirement, so all dataSplits must be scanned.
+    Assertions.assertFalse(scanBuilder2.asInstanceOf[SupportsPushDownLimit].pushLimit(3))
+    val dataSplitsWithLimit22 = scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
+    // Need to scan all dataSplits.
+    Assertions.assertEquals(4, dataSplitsWithLimit22.length)
+    Assertions.assertEquals(3, spark.sql("SELECT * FROM T LIMIT 3").count())
+
+    // Case 3: convert the remaining 2 rawConvertible dataSplits so that all dataSplits are nonRawConvertible.
+    spark.sql("INSERT INTO T VALUES (3, 'c', '11'), (4, 'd', '22')")
+    val scanBuilder3 = getScanBuilder()
+    val dataSplitsWithoutLimit3 = scanBuilder3.build().asInstanceOf[PaimonScan].getOriginSplits
+    Assertions.assertEquals(4, dataSplitsWithoutLimit3.length)
+
+    // All dataSplits are nonRawConvertible.
+    dataSplitsWithoutLimit3.foreach(
+      splits => {
+        Assertions.assertFalse(splits.asInstanceOf[DataSplit].rawConvertible())
+      })
+
+    Assertions.assertFalse(scanBuilder3.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
+    val dataSplitsWithLimit3 = scanBuilder3.build().asInstanceOf[PaimonScan].getOriginSplits
+    // Need to scan all dataSplits.
+    Assertions.assertEquals(4, dataSplitsWithLimit3.length)
+    Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
 
   }
 
   test("Paimon pushDown: runtime filter") {
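To make the Case 2 / Case 3 expectations above easier to follow, here is a small standalone sketch of the selection rule introduced by this commit: only rawConvertible splits are counted toward the limit, and if their accumulated row count never reaches it, the original plan with every split is kept. The Split record and the one-row/two-row counts below are illustrative assumptions, not code from the repository.

    import java.util.ArrayList;
    import java.util.List;

    public class LimitPushDownSketch {

        // Hypothetical stand-in for DataSplit: a row count plus the rawConvertible flag.
        record Split(long rowCount, boolean rawConvertible) {}

        // Mirrors the fixed applyPushDownLimit: accumulate rawConvertible splits until the
        // limit is reached; otherwise fall back to scanning every split.
        static List<Split> applyLimit(List<Split> splits, long limit) {
            List<Split> limited = new ArrayList<>();
            long scanned = 0;
            for (Split split : splits) {
                if (split.rawConvertible()) {
                    limited.add(split);
                    scanned += split.rowCount();
                    if (scanned >= limit) {
                        return limited;
                    }
                }
            }
            return splits; // limit not satisfied by rawConvertible splits alone
        }

        public static void main(String[] args) {
            // Case 2 shape (assumed): 4 splits, the 2 rawConvertible ones holding 1 row each.
            List<Split> splits =
                    List.of(
                            new Split(1, true),
                            new Split(1, true),
                            new Split(2, false),
                            new Split(2, false));

            System.out.println(applyLimit(splits, 2).size()); // 2 -> limit satisfied by rawConvertible splits
            System.out.println(applyLimit(splits, 3).size()); // 4 -> fall back to all splits
        }
    }

With limit 2 the two rawConvertible splits are enough, matching the assertion that only 2 dataSplits are returned; with limit 3, and in Case 3 where no split is rawConvertible, the method falls back to every split, which is why pushLimit still returns false and the tests assert a full scan of 4 dataSplits.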
