Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[spark] Support push down aggregate #4259

Merged
merged 3 commits into from
Sep 26, 2024
Merged

Conversation

ulysses-you
Copy link
Contributor

Purpose

Make PaimonScanBuilder support SupportsPushDownAggregates. This pr implements a local aggregator framework to eval aggregate function. For now, only CountStar is supported.

If the aggregation can be pushed down, build a PaimonLocalScan to avoid execute with RDD.

There are some limitations:

  • only support append table
  • only support without deletion vector
  • only support with no filter or partition filter
  • only support without group by

Tests

add test

API and Format

no

Documentation

@ulysses-you
Copy link
Contributor Author

cc @JingsongLi thank you

}
}

def supportAggregation(aggregation: Aggregation): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an initialize in this supportAggregation. It looks like not a support. Can you rename this to pushAggregation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, changed to pushAggregation

@@ -38,12 +38,12 @@ abstract class PaimonBaseScanBuilder(table: Table)

protected var pushed: Array[(Filter, Predicate)] = Array.empty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename this to pushedFilters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pushedFilters conflicts with Spark method name, rename it to pushedPredicates

}

// Only support with push down partition filter
if (pushed.length != partitionFilter.length) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also save postScanFilters in pushFilters, and just check is empty here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, added postScanFilters

}

val readBuilder = table.newReadBuilder
if (pushed.nonEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just use partitionFilter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partitionFilters is the Spark filter, but we need Paimon's predicate..

Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ulysses-you Thanks for your contribution!

Left some comments about naming.

@JingsongLi
Copy link
Contributor

+1

@JingsongLi JingsongLi merged commit 3120722 into apache:master Sep 26, 2024
10 checks passed
@ulysses-you ulysses-you deleted the agg branch September 26, 2024 06:49
@YannByron
Copy link
Contributor

Link to #2404.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants