[spark] Support report scan ordering #4026
Conversation
Thanks @ulysses-you for the contribution. This optimization looks good to me! But there are some potential issues.
```java
/**
 * Indicates whether this `DataSplit` keeps the raw table ordering. For example, for a primary
 * key table, we do a sorted run during write and a merge read during read, so the data is
 * sorted by the primary keys. Returning `false` means the ordering is broken. If a `DataSplit`
 * is `rawConvertible`, there is no sort merge read, so we can only guarantee the correct data
 * ordering when the number of data files is less than 2.
 */
public boolean keepOrdering() {
    return !rawConvertible || dataFiles.size() < 2;
}
```
cc @JingsongLi, if I get it correctly, this should address your concern?
```scala
    return Array.empty
  }

  val allSplitsKeepOrdering = lazyInputPartitions.toSeq
```
Maybe we should make sure there is only one split in a bucket?
Thank you @JingsongLi, please correct me if I'm wrong. Per my understanding, if a partition contains multiple splits using merge file read, then all the splits should be ordered and never overlap?
Merging only occurs within a single split.
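Putting this thread together, here is a hedged sketch of how the `allSplitsKeepOrdering` check quoted above might be completed (the `splits` accessor and the pattern match are assumptions for illustration, not the PR's exact code):

```scala
import org.apache.paimon.table.source.DataSplit

// Hypothetical completion: report ordering only if every split of every
// input partition preserves the raw table ordering.
val allSplitsKeepOrdering: Boolean = lazyInputPartitions.toSeq
  .flatMap(_.splits) // assumed accessor exposing each partition's splits
  .forall {
    case dataSplit: DataSplit => dataSplit.keepOrdering()
    case _ => false // unknown split types: conservatively report no ordering
  }
```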
```scala
import scala.collection.JavaConverters._

case class PaimonScan(
```
Is there a possibility to extract a base class PaimonBucketedScan to reuse code?
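For illustration, such an extraction might look roughly like this (a sketch only; the trait name comes from the comment above, everything else is an assumption):

```scala
import org.apache.paimon.table.source.DataSplit
import org.apache.spark.sql.connector.read.{Scan, SupportsReportPartitioning}

// Hypothetical shared base: bucket-aware logic (partitioning report,
// split grouping, etc.) would live here so concrete scans can reuse it.
trait PaimonBucketedScan extends Scan with SupportsReportPartitioning {
  // Concrete scans supply their splits; the common code builds the
  // partitioning/ordering reports from them.
  protected def dataSplits: Seq[DataSplit]
}
```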
```java
public boolean keepOrdering() {
    return !rawConvertible || dataFiles.size() < 2;
}
```
Maybe we don't need to introduce a method here; just use `!rawConvertible || dataFiles.size() < 2` in Spark.
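That is, the Spark side would evaluate the expression inline (a hypothetical one-liner; `split` is an assumed `DataSplit` in scope):

```scala
// Inline equivalent of the proposed keepOrdering() method.
val keepOrdering = !split.rawConvertible() || split.dataFiles().size() < 2
```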
Thank you @JingsongLi, addressed the comments.
Looks good to me!
Purpose

This PR makes PaimonScan support SupportsReportOrdering. For a primary key table, we do a sorted run during write, so the primary keys within one file are always ordered. We skip reporting ordering if the scan contains a rawConvertible split, since we will not do a merge sort read in that case. This can help eliminate local sorts for sort-merge join, sorted aggregation, etc. (a sketch follows below).
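For context, `SupportsReportOrdering` is Spark's DataSource V2 interface that lets a scan declare its output ordering so the planner can drop redundant local sorts. A minimal sketch of the shape of such an implementation (the class and schema are hypothetical; the `Expressions`/`SortOrder` calls are Spark's public API):

```scala
import org.apache.spark.sql.connector.expressions.{Expressions, SortDirection, SortOrder}
import org.apache.spark.sql.connector.read.{Scan, SupportsReportOrdering}
import org.apache.spark.sql.types.StructType

// Hypothetical scan that reports its output as sorted by the primary keys,
// letting Spark skip the local sort before a sort-merge join or sorted agg.
class ExampleOrderedScan(schema: StructType, primaryKeys: Seq[String])
  extends Scan with SupportsReportOrdering {

  override def readSchema(): StructType = schema

  override def outputOrdering(): Array[SortOrder] =
    primaryKeys
      .map(pk => Expressions.sort(Expressions.column(pk), SortDirection.ASCENDING))
      .toArray
}
```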
Tests
Added tests.
API and Format
no
Documentation