Skip to content

Commit

Permalink
chore(planner): refine physical join (#16988)
Browse files Browse the repository at this point in the history
* chore(code): rename merge join to range join

* chore(planner): refine physical join

* chore(test): add sqllogictest
  • Loading branch information
Dousir9 authored Dec 3, 2024
1 parent 900ecf1 commit 9bb9441
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::pipelines::processors::transforms::range_join::filter_block;
use crate::pipelines::processors::transforms::range_join::RangeJoinState;

impl RangeJoinState {
pub fn merge_join(&self, task_id: usize) -> Result<Vec<DataBlock>> {
pub fn range_join(&self, task_id: usize) -> Result<Vec<DataBlock>> {
let tasks = self.tasks.read();
let (left_idx, right_idx) = tasks[task_id];
let left_sorted_blocks = self.left_sorted_blocks.read();
Expand All @@ -42,7 +42,7 @@ impl RangeJoinState {
None,
)?;

// Start to execute merge join algo
// Start to execute range join algo
let left_len = left_sorted_block.num_rows();
let right_len = right_sort_block.num_rows();

Expand Down Expand Up @@ -140,7 +140,7 @@ impl RangeJoinState {
Ok(result_blocks)
}

// Used by merge join
// Used by range join
fn sort_descriptions(&self, _: bool) -> Vec<SortColumnDescription> {
let op = &self.conditions[0].operator;
let asc = match op.as_str() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Processor for TransformRangeJoinLeft {
if let Some(task_id) = task_id {
let res = match self.state.ie_join_state {
Some(ref _ie_join_state) => self.state.ie_join(task_id)?,
None => self.state.merge_join(task_id)?,
None => self.state.range_join(task_id)?,
};
for block in res {
if !block.is_empty() {
Expand Down
13 changes: 5 additions & 8 deletions src/query/sql/src/executor/physical_plans/physical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,11 @@ pub fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {

let left_rel_expr = RelExpr::with_s_expr(s_expr.child(0)?);
let right_rel_expr = RelExpr::with_s_expr(s_expr.child(1)?);
if matches!(
right_rel_expr
.derive_cardinality()?
.statistics
.precise_cardinality,
Some(1)
) {
// If the output rows of build side is equal to 1, we use CROSS JOIN + FILTER instead of MERGE JOIN.
let right_stat_info = right_rel_expr.derive_cardinality()?;
if matches!(right_stat_info.statistics.precise_cardinality, Some(1))
|| right_stat_info.cardinality == 1.0
{
// If the output rows of build side is equal to 1, we use CROSS JOIN + FILTER instead of RANGE JOIN.
return Ok(PhysicalJoinType::Hash);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ impl<'a> JoinConditionResolver<'a> {
// - Each side of `=` only contains columns from one table and the both sides are disjoint.
// For example, `t1.a + t1.b = t2.a` is a valid one while `t1.a + t2.a = t2.b` isn't.
//
// Only equi-predicate can be exploited by common join algorithms(e.g. sort-merge join, hash join).
// Only equi-predicate can be exploited by common join algorithms(e.g. sort-range join, hash join).

let mut added = if let Some((left, right)) = split_equivalent_predicate_expr(predicate) {
let (left, _) = scalar_binder.bind(&left)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,9 @@ ORDER BY 1, 2;
4 8
5 9

# Multi-predicate merge joins with many matches
# Multi-predicate range joins with many matches
# (similar to test/sql/join/inner/test_unequal_join_duplicates.test
# but with a merge join with multiple predicates)
# but with a range join with multiple predicates)
statement ok
drop table if exists many_bounds;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,9 +452,9 @@ ORDER BY 1, 2;
4 8
5 9

# Multi-predicate merge joins with many matches
# Multi-predicate range joins with many matches
# (similar to test/sql/join/inner/test_unequal_join_duplicates.test
# but with a merge join with multiple predicates)
# but with a range join with multiple predicates)
statement ok
drop table if exists many_bounds;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,9 +455,9 @@ ORDER BY 1, 2;
4 8
5 9

# Multi-predicate merge joins with many matches
# Multi-predicate range joins with many matches
# (similar to test/sql/join/inner/test_unequal_join_duplicates.test
# but with a merge join with multiple predicates)
# but with a range join with multiple predicates)
statement ok
drop table if exists many_bounds;

Expand Down
35 changes: 35 additions & 0 deletions tests/sqllogictests/suites/mode/standalone/explain/join.test
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,41 @@ HashJoin
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 4.00

query T
EXPLAIN SELECT * FROM t1 WHERE a >= (SELECT a FROM t2 where a > 0);
----
HashJoin
├── output columns: [t1.a (#0)]
├── join type: INNER
├── build keys: []
├── probe keys: []
├── filters: [t1.a (#0) >= scalar_subquery_1 (#1)]
├── estimated rows: 4.00
├── Filter(Build)
│ ├── output columns: [t2.a (#1)]
│ ├── filters: [is_true(t2.a (#1) > 0)]
│ ├── estimated rows: 1.00
│ └── TableScan
│ ├── table: default.default.t2
│ ├── output columns: [a (#1)]
│ ├── read rows: 1
│ ├── read size: < 1 KiB
│ ├── partitions total: 1
│ ├── partitions scanned: 1
│ ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
│ ├── push downs: [filters: [is_true(t2.a (#1) > 0)], limit: NONE]
│ └── estimated rows: 1.00
└── TableScan(Probe)
├── table: default.default.t1
├── output columns: [a (#0)]
├── read rows: 4
├── read size: < 1 KiB
├── partitions total: 1
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 4.00

statement ok
DROP TABLE t1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,38 @@ HashJoin
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 4.00

query T
EXPLAIN SELECT * FROM t1 WHERE a >= (SELECT a FROM t2 where a > 0);
----
HashJoin
├── output columns: [t1.a (#0)]
├── join type: INNER
├── build keys: []
├── probe keys: []
├── filters: [t1.a (#0) >= scalar_subquery_1 (#1)]
├── estimated rows: 4.00
├── TableScan(Build)
│ ├── table: default.default.t2
│ ├── output columns: [a (#1)]
│ ├── read rows: 1
│ ├── read size: < 1 KiB
│ ├── partitions total: 1
│ ├── partitions scanned: 1
│ ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
│ ├── push downs: [filters: [is_true(t2.a (#1) > 0)], limit: NONE]
│ └── estimated rows: 1.00
└── TableScan(Probe)
├── table: default.default.t1
├── output columns: [a (#0)]
├── read rows: 4
├── read size: < 1 KiB
├── partitions total: 1
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 4.00


statement ok
DROP TABLE t1;

Expand Down

0 comments on commit 9bb9441

Please sign in to comment.