Skip to content

Commit

Permalink
[core] fix the issue of incorrect filtering in filterWholeBucketAllFi…
Browse files Browse the repository at this point in the history
…les. (apache#4066)
  • Loading branch information
liming30 authored Aug 27, 2024
1 parent 8f0a5f3 commit 797fa7d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import java.util.Collections;
import java.util.List;

import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;

/** {@link FileStoreScan} for {@link KeyValueFileStore}. */
public class KeyValueFileStoreScan extends AbstractFileStoreScan {
Expand Down Expand Up @@ -173,6 +175,11 @@ private List<ManifestEntry> filterWholeBucketPerFile(List<ManifestEntry> entries
}

private List<ManifestEntry> filterWholeBucketAllFiles(List<ManifestEntry> entries) {
if (!deletionVectorsEnabled
&& (mergeEngine == PARTIAL_UPDATE || mergeEngine == AGGREGATE)) {
return entries;
}

// entries come from the same bucket, if any of it doesn't meet the request, we could
// filter the bucket.
for (ManifestEntry entry : entries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public void testMergeRead() {

// projection
assertThat(batchSql("SELECT a FROM T")).containsExactlyInAnyOrder(Row.of(4));

// filter
assertThat(batchSql("SELECT * FROM T where b = 5 and c = '6'"))
.containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1161,9 +1161,9 @@ public void testAggregatorResetWhenIgnoringRetract() {
/** IT Test for aggregation merge engine. */
public static class BasicAggregateITCase extends CatalogITCaseBase {

@Test
public void testLocalMerge() {
sql(
@Override
protected List<String> ddl() {
return Collections.singletonList(
"CREATE TABLE T ("
+ "k INT,"
+ "v INT,"
Expand All @@ -1173,11 +1173,27 @@ public void testLocalMerge() {
+ "'fields.v.aggregate-function'='sum',"
+ "'local-merge-buffer-size'='1m'"
+ ");");
}

@Test
public void testLocalMerge() {
sql("INSERT INTO T VALUES(1, 1, 1), (2, 1, 1), (1, 2, 1)");
assertThat(batchSql("SELECT * FROM T"))
.containsExactlyInAnyOrder(Row.of(1, 3, 1), Row.of(2, 1, 1));
}

@Test
public void testMergeRead() {
sql("INSERT INTO T VALUES(1, 1, 1), (2, 1, 1)");
sql("INSERT INTO T VALUES(1, 2, 1)");
assertThat(batchSql("SELECT * FROM T"))
.containsExactlyInAnyOrder(Row.of(1, 3, 1), Row.of(2, 1, 1));
// filter
assertThat(batchSql("SELECT * FROM T where v = 3"))
.containsExactlyInAnyOrder(Row.of(1, 3, 1));
assertThat(batchSql("SELECT * FROM T where v = 1"))
.containsExactlyInAnyOrder(Row.of(2, 1, 1));
}
}

/** ITCase for {@link FieldNestedUpdateAgg}. */
Expand Down

0 comments on commit 797fa7d

Please sign in to comment.