
[spark] Support report partitioning to eliminate shuffle exchange #3912

Merged: 1 commit merged into apache:master on Aug 8, 2024

Conversation

@ulysses-you (Contributor) commented Aug 6, 2024

Purpose

This PR makes PaimonScan implement SupportsReportPartitioning for the Spark engine, so that the shuffle exchange can be eliminated when doing joins, aggregates, windows, etc.

For example, the following join query does not introduce a shuffle exchange:

CREATE TABLE t1 (
    id BIGINT,
    c1 BIGINT,
    c2 STRING
) USING paimon
TBLPROPERTIES (
    'primary-key' = 'id',
    'bucket' = '10'
);

CREATE TABLE t2 (
    id BIGINT,
    c1 BIGINT,
    c2 STRING
) USING paimon
TBLPROPERTIES (
    'primary-key' = 'id',
    'bucket' = '10'
);

set spark.sql.autoBroadcastJoinThreshold=-1;
set spark.sql.sources.v2.bucketing.enabled=true;
SELECT * FROM t1 JOIN t2 ON t1.id = t2.id;

This feature depends on Spark's storage-partitioned join, in particular the KeyGroupedPartitioning interface introduced in Spark 3.3.

To achieve this, the PR introduces BucketSpec for Paimon to hold the bucket-related information:

public class BucketSpec {
    private BucketMode bucketMode;
    private List<String> bucketKeys;
    private int numBuckets;
    // constructor and getters omitted
}

We report the partitioning only if the bucket mode is HASH_FIXED. For now, this supports primary-key tables and bucketed tables that use a single bucket column.
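Conceptually, the scan-side reporting looks like the following sketch (illustrative only, not the exact code of this PR; the parameter names such as isHashFixedBucket and numInputPartitions are stand-ins for the real BucketSpec accessors and split count):

// Report a key-grouped partitioning only for fixed hash bucketing with a single
// bucket key; anything else falls back to UnknownPartitioning.
import org.apache.spark.sql.connector.expressions.{Expression, Expressions}
import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning, UnknownPartitioning}

object ReportPartitioningSketch {
  def outputPartitioning(
      isHashFixedBucket: Boolean, // in this PR: bucket mode == HASH_FIXED
      bucketKeys: Seq[String],
      numBuckets: Int,
      numInputPartitions: Int): Partitioning = {
    if (isHashFixedBucket && bucketKeys.size == 1) {
      // bucket(numBuckets, key) is the transform both join sides must agree on
      val key: Expression = Expressions.bucket(numBuckets, bucketKeys.head)
      new KeyGroupedPartitioning(Array(key), numInputPartitions)
    } else {
      new UnknownPartitioning(numInputPartitions)
    }
  }
}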

It also introduces FunctionCatalog support in SparkBaseCatalog to resolve the bucket transform expression.

Tests

Added a new test suite, BucketedTableQueryTest.

API and Format

no

Documentation

@ulysses-you (Contributor, Author)

cc @JingsongLi @YannByron, do you have time to take a look? Thank you.

@@ -66,6 +67,10 @@ public class TableSchema implements Serializable {

private final List<String> primaryKeys;

private List<String> bucketKeys;
Contributor

This field can be final.

@JingsongLi (Contributor)

Wow, this is a bucketed join; this optimization is on my list.
Thanks for the contribution, @ulysses-you!

assert(bucketSpec.getBucketKeys.size() == 1)
Expressions.bucket(bucketSpec.getNumBucket, bucketSpec.getBucketKeys.get(0))
val key = Expressions.bucket(bucketSpec.getNumBucket, bucketSpec.getBucketKeys.get(0))
new KeyGroupedPartitioning(Array(key), lazyInputPartitions.size)
Contributor

If it is an equi-join on two keys, can it not be supported?

Contributor

Ignore me, I see Spark does not support bucket transforms with several input attributes.

Contributor Author

Yeah, it seems to be an issue in the Spark community. I did not find a strong reason why Spark forbids it.

* bucket(10, col)` would fail since we do not implement {@link
* org.apache.spark.sql.connector.catalog.functions.ScalarFunction}
*/
public static class BucketFunction implements UnboundFunction {
Contributor

Because Paimon's bucket calculation and Spark's bucket function are completely different implementations.

So this function is an UnboundFunction? I'm not sure if my understanding is correct.

Contributor Author

UnboundFunction is kind of an unresolved expression in Spark and is eventually resolved to a BoundFunction; see the UnboundFunction#bind method.

For Paimon, I think it is more like a placeholder, since it is not used for evaluation. It is only used to check whether two partitionings are semantically equivalent.
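To make that concrete, here is a minimal sketch of such a placeholder (my own illustration, not the code in this PR): it can be bound, so Spark can resolve and compare the bucket transform, but it implements neither ScalarFunction nor AggregateFunction, so it can never be evaluated.

import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, UnboundFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, StructType}

// Placeholder bucket function: resolvable and comparable, but not evaluable.
class BucketPlaceholder(numBuckets: Int) extends UnboundFunction {
  override def name(): String = "bucket"
  override def description(): String = s"bucket($numBuckets, col)"

  override def bind(inputType: StructType): BoundFunction = new BoundFunction {
    override def inputTypes(): Array[DataType] = inputType.fields.map(_.dataType)
    override def resultType(): DataType = IntegerType
    override def name(): String = "bucket"
    // The canonical name is what Spark compares when deciding whether two
    // reported partitionings are semantically equivalent.
    override def canonicalName(): String = s"paimon.bucket($numBuckets)"
  }
}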

Contributor Author

Another thing that may be related: it seems Paimon does not use the Spark DSv2 write feature, e.g. RequiresDistributionAndOrdering, so the bucket function is only used to report partitioning.

Contributor

For using RequiresDistributionAndOrdering:
The biggest problem before was that "Paimon's bucket calculation method and Spark's bucket function are completely different implementations".
Implementing org.apache.spark.sql.connector.catalog.functions.ScalarFunction looks like it could solve this problem.
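For reference, a rough sketch of that ScalarFunction shape (purely hypothetical; the hash below is a placeholder, and a real implementation would have to reproduce Paimon's own bucket calculation exactly):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction
import org.apache.spark.sql.types.{DataType, IntegerType, LongType}

// Hypothetical evaluable bucket function: with this, Spark could also compute
// bucket ids on the write path via RequiresDistributionAndOrdering.
class EvaluableBucket(numBuckets: Int) extends ScalarFunction[Integer] {
  override def inputTypes(): Array[DataType] = Array(LongType)
  override def resultType(): DataType = IntegerType
  override def name(): String = "bucket"
  override def canonicalName(): String = s"paimon.bucket($numBuckets)"

  override def produceResult(input: InternalRow): Integer = {
    // Placeholder hash only; read and write sides must agree on the real one.
    val key = input.getLong(0)
    Integer.valueOf(java.lang.Math.floorMod(key, numBuckets.toLong).toInt)
  }
}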

Contributor Author

Thank you @JingsongLi for the context. I will work on it in a follow-up if I find time.

@YannByron self-assigned this on Aug 7, 2024
@YannByron (Contributor)

> This feature depends on Spark storage-partitioned join, in particular the interface KeyGroupedPartitioning which is from Spark 3.3. For ease of review, this PR only supports it in Spark 3.5.

Thanks for this PR, @ulysses-you. Can we modify this PR to make it available for Spark 3.3+ in the first shot?

@ulysses-you (Contributor, Author)

@YannByron I made changes to support 3.3, 3.4 and 3.5, thank you.

@YannByron (Contributor)

> @YannByron I made changes to support 3.3, 3.4 and 3.5, thank you.

Hi @ulysses-you, Paimon differs from Iceberg in how it supports multiple Spark versions: Iceberg does this by copying the code, while Paimon does it by extracting the common code and resolving the compatibility differences. Maybe @JingsongLi can explain more.

@JingsongLi (Contributor)

> @YannByron I made changes to support 3.3, 3.4 and 3.5, thank you.

> Hi @ulysses-you, Paimon differs from Iceberg in how it supports multiple Spark versions: Iceberg does this by copying the code, while Paimon does it by extracting the common code and resolving the compatibility differences. Maybe @JingsongLi can explain more.

This is not easy to explain; I will try to clarify the trade-off here.

The approach is to make minimal changes and avoid copying a large amount of code, even if that means copying a few classes into specific versions to work around version incompatibilities.

From my personal perspective, having a lot of code redundancy is not right, and it is not worth abstracting a lot for the sake of compatibility of a small part.

@ulysses-you (Contributor, Author)

Thank you @JingsongLi, got it.

@JingsongLi (Contributor) left a comment

Thanks @ulysses-you for the update! Looks good to me!

And thanks @YannByron for the review.

@JingsongLi merged commit 3b9dd9b into apache:master on Aug 8, 2024
10 checks passed
@ulysses-you deleted the partitioning branch on August 9, 2024 00:57
@ulysses-you (Contributor, Author)

Just to help users with troubleshooting: the Spark community has a bug in bucketed scans when there are multiple inner joins:

Cause: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:264)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.createKeyGroupedShuffleSpec(EnsureRequirements.scala:642)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$checkKeyGroupCompatible$1(EnsureRequirements.scala:385)
at scala.collection.immutable.List.map(List.scala:247)
at scala.collection.immutable.List.map(List.scala:79)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.checkKeyGroupCompatible(EnsureRequirements.scala:382)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.checkKeyGroupCompatible(EnsureRequirements.scala:364)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:166)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:714)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:689)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$4(TreeNode.scala:528)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:84)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:528)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:497)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:689)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:51)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$2(AdaptiveSparkPlanExec.scala:882)

See https://issues.apache.org/jira/browse/SPARK-49179. It has been fixed in 3.4.4, 3.5.3, and 4.0.0.
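If you hit this on an affected version and cannot upgrade yet, a possible workaround (an assumption on my part, not something stated in this thread) is to set spark.sql.sources.v2.bucketing.enabled back to false; that should avoid the failing code path, at the cost of giving up the shuffle elimination.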

@YannByron (Contributor)

Link to #2404.
