Supports range partitioning (#734)
Co-authored-by: guoying06 <[email protected]>
gy11233 and guoying06 authored Jan 8, 2025
1 parent 0c124a3 commit a110e53
Showing 14 changed files with 554 additions and 106 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions native-engine/blaze-serde/Cargo.toml
@@ -17,6 +17,7 @@ datafusion-ext-plans = { workspace = true }
log = "0.4.22"
object_store = "0.11.1"
prost = "0.13.4"
parking_lot = "0.12.3"

[build-dependencies]
tonic-build = "0.12.3"
9 changes: 9 additions & 0 deletions native-engine/blaze-serde/proto/blaze.proto
@@ -601,6 +601,7 @@ message PhysicalRepartition {
PhysicalSingleRepartition single_repartition = 1;
PhysicalHashRepartition hash_repartition = 2;
PhysicalRoundRobinRepartition round_robin_repartition = 3;
PhysicalRangeRepartition range_repartition = 4;
}
}

@@ -617,6 +618,14 @@ message PhysicalRoundRobinRepartition {
uint64 partition_count = 1;
}

message PhysicalRangeRepartition {
SortExecNode sort_expr = 1;
uint64 partition_count = 2;
repeated ScalarValue list_value = 3;
}
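
The decoding code in from_proto.rs below suggests the intended shape of this message: sort_expr carries the ordering that defines the ranges, partition_count is the number of output partitions, and list_value holds one ScalarValue list per sort key, each containing that column's range bounds. A hypothetical construction using the prost-generated Rust types (sort_exec_node and bound_scalar_lists are placeholders; the field shapes are inferred from the decoder, not confirmed by this excerpt):

let range = protobuf::PhysicalRangeRepartition {
    // ordering that defines the ranges, decoded by try_parse_physical_sort_expr
    sort_expr: Some(Box::new(sort_exec_node)),
    // number of output partitions
    partition_count: 4,
    // one list per sort key; each list holds that column's boundary values,
    // typically partition_count - 1 of them
    list_value: bound_scalar_lists,
};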



message JoinFilter {
PhysicalExprNode expression = 1;
repeated ColumnIndex column_indices = 2;
164 changes: 113 additions & 51 deletions native-engine/blaze-serde/src/from_proto.rs
@@ -20,13 +20,14 @@ use std::{
};

use arrow::{
array::RecordBatch,
array::{ArrayRef, RecordBatch},
compute::SortOptions,
datatypes::{Field, FieldRef, SchemaRef},
row::{RowConverter, SortField},
};
use base64::{prelude::BASE64_URL_SAFE_NO_PAD, Engine};
use datafusion::{
common::stats::Precision,
common::{stats::Precision, Result, ScalarValue},
datasource::{
listing::{FileRange, PartitionedFile},
object_store::ObjectStoreUrl,
@@ -45,7 +46,7 @@ use datafusion::{
NegativeExpr, NotExpr, PhysicalSortExpr,
},
union::UnionExec,
ColumnStatistics, ExecutionPlan, Partitioning, PhysicalExpr, Statistics,
ColumnStatistics, ExecutionPlan, PhysicalExpr, Statistics,
},
prelude::create_udf,
};
@@ -79,21 +80,23 @@ use datafusion_ext_plans::{
project_exec::ProjectExec,
rename_columns_exec::RenameColumnsExec,
rss_shuffle_writer_exec::RssShuffleWriterExec,
shuffle::Partitioning,
shuffle_writer_exec::ShuffleWriterExec,
sort_exec::SortExec,
sort_merge_join_exec::SortMergeJoinExec,
window::{WindowExpr, WindowFunction, WindowRankType},
window_exec::WindowExec,
};
use object_store::{path::Path, ObjectMeta};
use parking_lot::Mutex as SyncMutex;

use crate::{
convert_box_required, convert_required,
error::PlanSerDeError,
from_proto_binary_op, proto_error, protobuf,
protobuf::{
physical_expr_node::ExprType, physical_plan_node::PhysicalPlanType,
physical_repartition::RepartitionType, GenerateFunction,
physical_repartition::RepartitionType, GenerateFunction, PhysicalRepartition, SortExecNode,
},
Schema,
};
@@ -331,45 +334,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
}
PhysicalPlanType::Sort(sort) => {
let input: Arc<dyn ExecutionPlan> = convert_box_required!(sort.input)?;
let exprs = sort
.expr
.iter()
.map(|expr| {
let expr = expr.expr_type.as_ref().ok_or_else(|| {
proto_error(format!(
"physical_plan::from_proto() Unexpected expr {:?}",
self
))
})?;
if let ExprType::Sort(sort_expr) = expr {
let expr = sort_expr
.expr
.as_ref()
.ok_or_else(|| {
proto_error(format!(
"physical_plan::from_proto() Unexpected sort expr {:?}",
self
))
})?
.as_ref();
Ok(PhysicalSortExpr {
expr: bind(
try_parse_physical_expr(expr, &input.schema())?,
&input.schema(),
)?,
options: SortOptions {
descending: !sort_expr.asc,
nulls_first: sort_expr.nulls_first,
},
})
} else {
Err(PlanSerDeError::General(format!(
"physical_plan::from_proto() {:?}",
self
)))
}
})
.collect::<Result<Vec<_>, _>>()?;
let exprs = try_parse_physical_sort_expr(&input, sort).unwrap();
// always preserve partitioning
Ok(Arc::new(SortExec::new(
input,
@@ -1120,9 +1085,56 @@ fn try_parse_physical_expr_box_required(
}
}

fn try_parse_physical_sort_expr(
input: &Arc<dyn ExecutionPlan>,
sort: &Box<SortExecNode>,
) -> Option<Vec<PhysicalSortExpr>> {
let physical_sort_expr = sort
.expr
.iter()
.map(|expr| {
let expr = expr.expr_type.as_ref().ok_or_else(|| {
proto_error(format!(
"physical_plan::from_proto() Unexpected expr {:?}",
input
))
})?;
if let ExprType::Sort(sort_expr) = expr {
let expr = sort_expr
.expr
.as_ref()
.ok_or_else(|| {
proto_error(format!(
"physical_plan::from_proto() Unexpected sort expr {:?}",
input
))
})?
.as_ref();
Ok(PhysicalSortExpr {
expr: bind(
try_parse_physical_expr(expr, &input.schema())?,
&input.schema(),
)?,
options: SortOptions {
descending: !sort_expr.asc,
nulls_first: sort_expr.nulls_first,
},
})
} else {
Err(PlanSerDeError::General(format!(
"physical_plan::from_proto() {:?}",
input
)))
}
})
.collect::<Result<Vec<_>, _>>()
.ok()?;
Some(physical_sort_expr)
}

pub fn parse_protobuf_partitioning(
input: Arc<dyn ExecutionPlan>,
partitioning: Option<&protobuf::PhysicalRepartition>,
partitioning: Option<&Box<PhysicalRepartition>>,
) -> Result<Option<Partitioning>, PlanSerDeError> {
partitioning.map_or(Ok(None), |p| {
let plan = p.repartition_type.as_ref().ok_or_else(|| {
@@ -1132,9 +1144,7 @@ pub fn parse_protobuf_partitioning(
))
})?;
match plan {
RepartitionType::SingleRepartition(..) => {
Ok(Some(Partitioning::UnknownPartitioning(1)))
}
RepartitionType::SingleRepartition(..) => Ok(Some(Partitioning::SinglePartitioning())),
RepartitionType::HashRepartition(hash_part) => {
// let hash_part = p.hash_repartition;
let expr = hash_part
@@ -1145,15 +1155,67 @@ pub fn parse_protobuf_partitioning(
.and_then(|e| Ok(bind(e, &input.schema())?))
})
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
Ok(Some(Partitioning::Hash(
Ok(Some(Partitioning::HashPartitioning(
expr,
hash_part.partition_count.try_into().unwrap(),
)))
}

RepartitionType::RoundRobinRepartition(round_robin_part) => Ok(Some(
Partitioning::RoundRobinBatch(round_robin_part.partition_count.try_into().unwrap()),
)),
RepartitionType::RoundRobinRepartition(round_robin_part) => {
Ok(Some(Partitioning::RoundRobinPartitioning(
round_robin_part.partition_count.try_into().unwrap(),
)))
}

RepartitionType::RangeRepartition(range_part) => {
let sort = range_part.sort_expr.clone().unwrap();
let exprs = try_parse_physical_sort_expr(&input, &sort).unwrap();

let value_list = &range_part.list_value;

let sort_row_converter = Arc::new(SyncMutex::new(RowConverter::new(
exprs
.iter()
.map(|expr: &PhysicalSortExpr| {
Ok(SortField::new_with_options(
expr.expr.data_type(&input.schema())?,
expr.options,
))
})
.collect::<Result<Vec<SortField>>>()?,
)?));

let bound_cols: Vec<ArrayRef> = value_list
.iter()
.map(|x| {
let xx = x.clone().value.unwrap();
let values_ref = match xx {
protobuf::scalar_value::Value::ListValue(scalar_list) => {
let protobuf::ScalarListValue {
values,
datatype: _opt_scalar_type,
} = scalar_list;
let value_vec: Vec<ScalarValue> = values
.iter()
.map(|val| val.try_into())
.collect::<Result<Vec<_>, _>>()
.map_err(|_| proto_error("partition::from_proto() error"))?;
ScalarValue::iter_to_array(value_vec)
.map_err(|_| proto_error("partition::from_proto() error"))
}
_ => Err(proto_error("partition::from_proto() bound_list type error")),
};
values_ref
})
.collect::<Result<Vec<ArrayRef>, _>>()?;

let bound_rows = sort_row_converter.lock().convert_columns(&bound_cols)?;
Ok(Some(Partitioning::RangePartitioning(
exprs,
range_part.partition_count.try_into().unwrap(),
Arc::new(bound_rows),
)))
}
}
})
}
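
For context, a minimal sketch (not part of this diff) of how the bound rows produced above could be consumed when assigning input rows to partitions, assuming the bounds are sorted ascending and act as inclusive upper bounds in the style of Spark's RangePartitioner:

use arrow::row::{Row, Rows};

// Hypothetical helper: return the index of the first bound that is >= the
// row's sort key; rows greater than every bound fall into the last partition.
fn range_partition_id(bounds: &Rows, row: Row<'_>) -> usize {
    (0..bounds.num_rows())
        .position(|i| row <= bounds.row(i))
        .unwrap_or(bounds.num_rows())
}

The comparison is only meaningful because the bounds and the input rows would be encoded by the same RowConverter constructed in the RangeRepartition arm above.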
2 changes: 1 addition & 1 deletion native-engine/datafusion-ext-plans/src/lib.rs
@@ -54,5 +54,5 @@ pub mod common;
pub mod generate;
pub mod joins;
mod scan;
mod shuffle;
pub mod shuffle;
pub mod window;
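
The shuffle module's custom Partitioning enum itself is not among the rendered files. A hypothetical reconstruction from its call sites in this diff (variant payloads are inferred, not confirmed; the real definition lives under native-engine/datafusion-ext-plans/src/shuffle):

use std::sync::Arc;

use arrow::row::Rows;
use datafusion::physical_expr::{PhysicalExpr, PhysicalSortExpr};

pub enum Partitioning {
    SinglePartitioning(),
    HashPartitioning(Vec<Arc<dyn PhysicalExpr>>, usize),
    RoundRobinPartitioning(usize),
    RangePartitioning(Vec<PhysicalSortExpr>, usize, Arc<Rows>),
}

impl Partitioning {
    // partition_count() is matched on by RssShuffleWriterExec below.
    pub fn partition_count(&self) -> usize {
        match self {
            Partitioning::SinglePartitioning() => 1,
            Partitioning::HashPartitioning(_, n)
            | Partitioning::RoundRobinPartitioning(n)
            | Partitioning::RangePartitioning(_, n, _) => *n,
        }
    }
}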
native-engine/datafusion-ext-plans/src/rss_shuffle_writer_exec.rs
@@ -23,9 +23,10 @@ use datafusion::{
error::{DataFusionError, Result},
execution::context::TaskContext,
physical_expr::EquivalenceProperties,
physical_plan,
physical_plan::{
metrics::{ExecutionPlanMetricsSet, MetricsSet},
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
SendableRecordBatchStream, Statistics,
},
};
@@ -36,7 +37,7 @@ use crate::{
memmgr::MemManager,
shuffle::{
rss_single_repartitioner::RssSingleShuffleRepartitioner,
rss_sort_repartitioner::RssSortShuffleRepartitioner, ShuffleRepartitioner,
rss_sort_repartitioner::RssSortShuffleRepartitioner, Partitioning, ShuffleRepartitioner,
},
};

@@ -80,7 +81,7 @@ impl ExecutionPlan for RssShuffleWriterExec {
self.props.get_or_init(|| {
PlanProperties::new(
EquivalenceProperties::new(self.schema()),
self.partitioning.clone(),
physical_plan::Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
)
})
@@ -122,7 +123,7 @@ impl ExecutionPlan for RssShuffleWriterExec {
p if p.partition_count() == 1 => {
Arc::new(RssSingleShuffleRepartitioner::new(rss_partition_writer))
}
Partitioning::Hash(..) | Partitioning::RoundRobinBatch(..) => {
Partitioning::HashPartitioning(..) | Partitioning::RoundRobinPartitioning(..) => {
let sort_time = exec_ctx.register_timer_metric("sort_time");
let partitioner = Arc::new(RssSortShuffleRepartitioner::new(
partition,
(The remaining changed files are not rendered in this excerpt.)
