[SPARK-26297][SQL] improve the doc of Distribution/Partitioning
## What changes were proposed in this pull request?

Some of the documentation for `Distribution`/`Partitioning` is stale and misleading. This PR fixes it:
1. A `Distribution` never has an intra-partition requirement.
2. `OrderedDistribution` does not require tuples that share the same value to be co-located in the same partition.
3. `RangePartitioning` can provide a weaker guarantee for a prefix of its `ordering` expressions.

## How was this patch tested?

comment-only PR.

Closes apache#23249 from cloud-fan/doc.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan committed Dec 13, 2018
1 parent 6daa783 commit 05b68d5
Showing 1 changed file with 34 additions and 20 deletions.
@@ -22,13 +22,11 @@ import org.apache.spark.sql.types.{DataType, IntegerType}

/**
* Specifies how tuples that share common expressions will be distributed when a query is executed
* in parallel on many machines. Distribution can be used to refer to two distinct physical
* properties:
* - Inter-node partitioning of data: In this case the distribution describes how tuples are
* partitioned across physical machines in a cluster. Knowing this property allows some
* operators (e.g., Aggregate) to perform partition local operations instead of global ones.
* - Intra-partition ordering of data: In this case the distribution describes guarantees made
* about how tuples are distributed within a single partition.
* in parallel on many machines.
*
* Distribution here refers to inter-node partitioning of data. That is, it describes how tuples
* are partitioned across physical machines in a cluster. Knowing this property allows some
* operators (e.g., Aggregate) to perform partition local operations instead of global ones.
*/
sealed trait Distribution {
/**
@@ -70,9 +68,7 @@ case object AllTuples extends Distribution {

/**
* Represents data where tuples that share the same values for the `clustering`
* [[Expression Expressions]] will be co-located. Based on the context, this
* can mean such tuples are either co-located in the same partition or they will be contiguous
* within a single partition.
* [[Expression Expressions]] will be co-located in the same partition.
*/
case class ClusteredDistribution(
clustering: Seq[Expression],
@@ -118,10 +114,12 @@ case class HashClusteredDistribution(

/**
* Represents data where tuples have been ordered according to the `ordering`
* [[Expression Expressions]]. This is a strictly stronger guarantee than
* [[ClusteredDistribution]] as an ordering will ensure that tuples that share the
* same value for the ordering expressions are contiguous and will never be split across
* partitions.
* [[Expression Expressions]]. Its requirement is defined as the following:
* - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or
* equal to any row in the first partition, according to the `ordering` expressions.
*
* In other words, this distribution requires the rows to be ordered across partitions, but not
* necessarily within a partition.
*/
case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
require(
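
The revised `OrderedDistribution` requirement in the hunk above only constrains adjacent partitions, not the order of rows inside a partition. A minimal sketch of that check follows, assuming rows are plain `Int` values in ascending order and that empty partitions can be skipped; `OrderedDistributionSketch` and `isOrderedAcrossPartitions` are hypothetical names for illustration, not part of Spark.

```scala
object OrderedDistributionSketch {
  // Hypothetical helper (not a Spark API): checks the cross-partition
  // requirement described in the OrderedDistribution doc comment.
  // Assumption: rows are Ints compared in ascending order.
  def isOrderedAcrossPartitions(partitions: Seq[Seq[Int]]): Boolean = {
    // Assumption: empty partitions carry no rows, so they are skipped
    // before comparing neighbours.
    val nonEmpty = partitions.filter(_.nonEmpty)
    nonEmpty.zip(nonEmpty.drop(1)).forall { case (first, second) =>
      // Every row of the second partition must be >= every row of the first,
      // which is equivalent to comparing the two extremes.
      second.min >= first.max
    }
  }
}
```

Under these assumptions, `Seq(Seq(3, 1, 2), Seq(3, 5, 4))` passes the check even though neither partition is sorted and the value `3` appears in both, which is the case the old doc comment wrongly excluded.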
@@ -241,12 +239,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)

/**
* Represents a partitioning where rows are split across partitions based on some total ordering of
* the expressions specified in `ordering`. When data is partitioned in this manner the following
* two conditions are guaranteed to hold:
* - All row where the expressions in `ordering` evaluate to the same values will be in the same
* partition.
* - Each partition will have a `min` and `max` row, relative to the given ordering. All rows
* that are in between `min` and `max` in this `ordering` will reside in this partition.
* the expressions specified in `ordering`. When data is partitioned in this manner, it guarantees:
* Given any 2 adjacent partitions, all the rows of the second partition must be larger than any row
* in the first partition, according to the `ordering` expressions.
*
* This is a strictly stronger guarantee than what `OrderedDistribution(ordering)` requires, as
* there is no overlap between partitions.
*
* This class extends expression primarily so that transformations over expression will descend
* into its child.
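
The new `RangePartitioning` comment states a strict guarantee (no overlap between partitions), while `OrderedDistribution` only asks for a non-strict one. The difference can be sketched on toy data, assuming `Int` rows in ascending order; `RangePartitioningSketch` and both predicates are hypothetical names for illustration, not Spark APIs.

```scala
object RangePartitioningSketch {
  // Pairs each non-empty partition with the next non-empty one.
  private def adjacent(partitions: Seq[Seq[Int]]): Seq[(Seq[Int], Seq[Int])] = {
    val nonEmpty = partitions.filter(_.nonEmpty)
    nonEmpty.zip(nonEmpty.drop(1))
  }

  // Strict form, as described for RangePartitioning: every row of the
  // second partition is larger than every row of the first.
  def isStrictlyRangePartitioned(partitions: Seq[Seq[Int]]): Boolean =
    adjacent(partitions).forall { case (first, second) => second.min > first.max }

  // Non-strict form, as required by OrderedDistribution: "larger than or equal to".
  def isOrderedAcrossPartitions(partitions: Seq[Seq[Int]]): Boolean =
    adjacent(partitions).forall { case (first, second) => second.min >= first.max }
}
```

Any input that passes the strict predicate also passes the non-strict one, but not the reverse: `Seq(Seq(1, 3), Seq(3, 4))` is ordered across partitions yet not strictly range partitioned, because `3` appears on both sides of the boundary.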
Expand All @@ -262,6 +260,22 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
super.satisfies0(required) || {
required match {
case OrderedDistribution(requiredOrdering) =>
// If `ordering` is a prefix of `requiredOrdering`:
// Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. According to the
// RangePartitioning definition, any [a, b] in a previous partition must be smaller
// than any [a, b] in the following partition. This also means any [a, b, c] in a
// previous partition must be smaller than any [a, b, c] in the following partition.
// Thus `RangePartitioning(a, b)` satisfies `OrderedDistribution(a, b, c)`.
//
// If `requiredOrdering` is a prefix of `ordering`:
// Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. According to the
// RangePartitioning definition, any [a, b, c] in a previous partition must be smaller
// than any [a, b, c] in the following partition. If there is a [a1, b1] from a previous
// partition which is larger than a [a2, b2] from the following partition, then there
// must be a [a1, b1 c1] larger than [a2, b2, c2], which violates RangePartitioning
// definition. So it's guaranteed that, any [a, b] in a previous partition must not be
// greater(i.e. smaller or equal to) than any [a, b] in the following partition. Thus
// `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, b)`.
val minSize = Seq(requiredOrdering.size, ordering.size).min
requiredOrdering.take(minSize) == ordering.take(minSize)
case ClusteredDistribution(requiredClustering, _) =>
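
The prefix argument in the comment above can be illustrated on toy data. The sketch below is an assumption-laden stand-in, not Spark code: column names (`String`) replace `SortOrder`, rows are `(a, b, c)` triples of `Int`, and `PrefixSatisfiesSketch` with its members is a hypothetical name. It shows that data strictly range partitioned on `[a, b, c]` is still ordered on the prefix `[a, b]`, but only in the non-strict sense.

```scala
object PrefixSatisfiesSketch {
  // Mirrors the prefix comparison in the diff: the two orderings are
  // compatible when they agree on the length of the shorter one.
  def prefixCompatible(ordering: Seq[String], requiredOrdering: Seq[String]): Boolean = {
    val minSize = math.min(requiredOrdering.size, ordering.size)
    requiredOrdering.take(minSize) == ordering.take(minSize)
  }

  // Two partitions of (a, b, c) rows, strictly range partitioned on [a, b, c].
  val partitions: Seq[Seq[(Int, Int, Int)]] = Seq(
    Seq((1, 1, 1), (1, 1, 2)),
    Seq((1, 1, 3), (2, 0, 0))
  )

  // On [a, b, c]: the largest row of the first partition is strictly
  // smaller than the smallest row of the second.
  val strictOnAbc: Boolean =
    Ordering[(Int, Int, Int)].lt(partitions(0).max, partitions(1).min)

  // Project both partitions onto the prefix [a, b].
  val firstMaxAb: (Int, Int) = partitions(0).map { case (a, b, _) => (a, b) }.max
  val secondMinAb: (Int, Int) = partitions(1).map { case (a, b, _) => (a, b) }.min

  // On [a, b]: "smaller than or equal to" still holds across the boundary...
  val orderedOnAb: Boolean = Ordering[(Int, Int)].lteq(firstMaxAb, secondMinAb)
  // ...but the strict form does not, since (1, 1) appears in both partitions.
  val strictOnAb: Boolean = Ordering[(Int, Int)].lt(firstMaxAb, secondMinAb)
}
```

This is why the match arm can accept either prefix direction for `OrderedDistribution`, which needs only the non-strict guarantee.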