Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev repartitioning #734

Merged
merged 37 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
dcfc189
add round robin shuffle
Dec 10, 2024
e333366
update roundrobin
Dec 10, 2024
6b48a2c
update roundrobin
Dec 10, 2024
689b9c6
update roundrobin
Dec 10, 2024
f0d8737
update roundrobin
Dec 10, 2024
bef0c92
update roundrobin log info
Dec 11, 2024
d44d24a
update roundrobin
Dec 11, 2024
67f994a
update roundrobin
Dec 11, 2024
3bb4dc8
update roundrobin
Dec 12, 2024
cd22bf6
update roundrobin
Dec 12, 2024
de626b4
update roundrobin
Dec 12, 2024
b062173
update roundrobin
Dec 12, 2024
b9af073
update partition proto
Dec 13, 2024
b83338f
update partition proto
Dec 13, 2024
59b0eda
update sort before round robin
Dec 16, 2024
f452a5b
update sort before round robin
Dec 16, 2024
180802f
update round robin
Dec 16, 2024
4763172
update round robin
Dec 16, 2024
bfae9a8
update sort_batch_by_partition_id test
Dec 16, 2024
7ff18e1
update sort_batch_by_partition_id test
Dec 16, 2024
01108e3
update range partition
Dec 25, 2024
dc9bc6e
merge master
Dec 25, 2024
d34f901
merge master
Dec 25, 2024
e7c6a23
update range partition
Dec 26, 2024
8bd4e06
Merge branch 'master' into blaze-repartitioning
Dec 26, 2024
d802c6c
update range partition
Dec 26, 2024
f155fcd
Merge branch 'master' into blaze-repartitioning
Dec 26, 2024
5508034
fix range partition import
Dec 26, 2024
d8a598e
fix merge
Dec 26, 2024
d53d1b3
update range partition
Dec 30, 2024
e37547f
update range partition
Dec 30, 2024
5a942c1
update range partition converter
Dec 30, 2024
7e59a18
update range partition
Dec 31, 2024
b7d6baf
update range partition
Jan 2, 2025
2a56d42
Merge branch 'master' into blaze-repartitioning
Jan 2, 2025
5a00013
fix name
Jan 8, 2025
40c6ff6
cargo fix
Jan 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions native-engine/blaze-serde/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ datafusion-ext-plans = { workspace = true }
log = "0.4.22"
object_store = "0.11.1"
prost = "0.13.4"
parking_lot = "0.12.3"

[build-dependencies]
tonic-build = "0.12.3"
9 changes: 9 additions & 0 deletions native-engine/blaze-serde/proto/blaze.proto
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ message PhysicalRepartition {
PhysicalSingleRepartition single_repartition = 1;
PhysicalHashRepartition hash_repartition = 2;
PhysicalRoundRobinRepartition round_robin_repartition = 3;
PhysicalRangeRepartition range_repartition = 4;
}
}

Expand All @@ -617,6 +618,14 @@ message PhysicalRoundRobinRepartition {
uint64 partition_count = 1;
}

message PhysicalRangeRepartition {
SortExecNode sort_expr = 1;
uint64 partition_count = 2;
repeated ScalarValue list_value= 3;
}



message JoinFilter {
PhysicalExprNode expression = 1;
repeated ColumnIndex column_indices = 2;
Expand Down
164 changes: 113 additions & 51 deletions native-engine/blaze-serde/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ use std::{
};

use arrow::{
array::RecordBatch,
array::{ArrayRef, RecordBatch},
compute::SortOptions,
datatypes::{Field, FieldRef, SchemaRef},
row::{RowConverter, SortField},
};
use base64::{prelude::BASE64_URL_SAFE_NO_PAD, Engine};
use datafusion::{
common::stats::Precision,
common::{stats::Precision, Result, ScalarValue},
datasource::{
listing::{FileRange, PartitionedFile},
object_store::ObjectStoreUrl,
Expand All @@ -45,7 +46,7 @@ use datafusion::{
NegativeExpr, NotExpr, PhysicalSortExpr,
},
union::UnionExec,
ColumnStatistics, ExecutionPlan, Partitioning, PhysicalExpr, Statistics,
ColumnStatistics, ExecutionPlan, PhysicalExpr, Statistics,
},
prelude::create_udf,
};
Expand Down Expand Up @@ -79,21 +80,23 @@ use datafusion_ext_plans::{
project_exec::ProjectExec,
rename_columns_exec::RenameColumnsExec,
rss_shuffle_writer_exec::RssShuffleWriterExec,
shuffle::Partitioning,
shuffle_writer_exec::ShuffleWriterExec,
sort_exec::SortExec,
sort_merge_join_exec::SortMergeJoinExec,
window::{WindowExpr, WindowFunction, WindowRankType},
window_exec::WindowExec,
};
use object_store::{path::Path, ObjectMeta};
use parking_lot::Mutex as SyncMutex;

use crate::{
convert_box_required, convert_required,
error::PlanSerDeError,
from_proto_binary_op, proto_error, protobuf,
protobuf::{
physical_expr_node::ExprType, physical_plan_node::PhysicalPlanType,
physical_repartition::RepartitionType, GenerateFunction,
physical_repartition::RepartitionType, GenerateFunction, PhysicalRepartition, SortExecNode,
},
Schema,
};
Expand Down Expand Up @@ -331,45 +334,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
}
PhysicalPlanType::Sort(sort) => {
let input: Arc<dyn ExecutionPlan> = convert_box_required!(sort.input)?;
let exprs = sort
.expr
.iter()
.map(|expr| {
let expr = expr.expr_type.as_ref().ok_or_else(|| {
proto_error(format!(
"physical_plan::from_proto() Unexpected expr {:?}",
self
))
})?;
if let ExprType::Sort(sort_expr) = expr {
let expr = sort_expr
.expr
.as_ref()
.ok_or_else(|| {
proto_error(format!(
"physical_plan::from_proto() Unexpected sort expr {:?}",
self
))
})?
.as_ref();
Ok(PhysicalSortExpr {
expr: bind(
try_parse_physical_expr(expr, &input.schema())?,
&input.schema(),
)?,
options: SortOptions {
descending: !sort_expr.asc,
nulls_first: sort_expr.nulls_first,
},
})
} else {
Err(PlanSerDeError::General(format!(
"physical_plan::from_proto() {:?}",
self
)))
}
})
.collect::<Result<Vec<_>, _>>()?;
let exprs = try_parse_physical_sort_expr(&input, sort).unwrap();
// always preserve partitioning
Ok(Arc::new(SortExec::new(
input,
Expand Down Expand Up @@ -1120,9 +1085,56 @@ fn try_parse_physical_expr_box_required(
}
}

fn try_parse_physical_sort_expr(
input: &Arc<dyn ExecutionPlan>,
sort: &Box<SortExecNode>,
) -> Option<Vec<PhysicalSortExpr>> {
let pyhsical_sort_expr = sort
.expr
.iter()
.map(|expr| {
let expr = expr.expr_type.as_ref().ok_or_else(|| {
proto_error(format!(
"physical_plan::from_proto() Unexpected expr {:?}",
input
))
})?;
if let ExprType::Sort(sort_expr) = expr {
let expr = sort_expr
.expr
.as_ref()
.ok_or_else(|| {
proto_error(format!(
"physical_plan::from_proto() Unexpected sort expr {:?}",
input
))
})?
.as_ref();
Ok(PhysicalSortExpr {
expr: bind(
try_parse_physical_expr(expr, &input.schema())?,
&input.schema(),
)?,
options: SortOptions {
descending: !sort_expr.asc,
nulls_first: sort_expr.nulls_first,
},
})
} else {
Err(PlanSerDeError::General(format!(
"physical_plan::from_proto() {:?}",
input
)))
}
})
.collect::<Result<Vec<_>, _>>()
.ok()?;
Some(pyhsical_sort_expr)
}

pub fn parse_protobuf_partitioning(
input: Arc<dyn ExecutionPlan>,
partitioning: Option<&protobuf::PhysicalRepartition>,
partitioning: Option<&Box<PhysicalRepartition>>,
) -> Result<Option<Partitioning>, PlanSerDeError> {
partitioning.map_or(Ok(None), |p| {
let plan = p.repartition_type.as_ref().ok_or_else(|| {
Expand All @@ -1132,9 +1144,7 @@ pub fn parse_protobuf_partitioning(
))
})?;
match plan {
RepartitionType::SingleRepartition(..) => {
Ok(Some(Partitioning::UnknownPartitioning(1)))
}
RepartitionType::SingleRepartition(..) => Ok(Some(Partitioning::SinglePartitioning())),
RepartitionType::HashRepartition(hash_part) => {
// let hash_part = p.hash_repartition;
let expr = hash_part
Expand All @@ -1145,15 +1155,67 @@ pub fn parse_protobuf_partitioning(
.and_then(|e| Ok(bind(e, &input.schema())?))
})
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
Ok(Some(Partitioning::Hash(
Ok(Some(Partitioning::HashPartitioning(
expr,
hash_part.partition_count.try_into().unwrap(),
)))
}

RepartitionType::RoundRobinRepartition(round_robin_part) => Ok(Some(
Partitioning::RoundRobinBatch(round_robin_part.partition_count.try_into().unwrap()),
)),
RepartitionType::RoundRobinRepartition(round_robin_part) => {
Ok(Some(Partitioning::RoundRobinPartitioning(
round_robin_part.partition_count.try_into().unwrap(),
)))
}

RepartitionType::RangeRepartition(range_part) => {
let sort = range_part.sort_expr.clone().unwrap();
let exprs = try_parse_physical_sort_expr(&input, &sort).unwrap();

let value_list = &range_part.list_value;

let sort_row_converter = Arc::new(SyncMutex::new(RowConverter::new(
exprs
.iter()
.map(|expr: &PhysicalSortExpr| {
Ok(SortField::new_with_options(
expr.expr.data_type(&input.schema())?,
expr.options,
))
})
.collect::<Result<Vec<SortField>>>()?,
)?));

let bound_cols: Vec<ArrayRef> = value_list
.iter()
.map(|x| {
let xx = x.clone().value.unwrap();
let values_ref = match xx {
protobuf::scalar_value::Value::ListValue(scalar_list) => {
let protobuf::ScalarListValue {
values,
datatype: _opt_scalar_type,
} = scalar_list;
let value_vec: Vec<ScalarValue> = values
.iter()
.map(|val| val.try_into())
.collect::<Result<Vec<_>, _>>()
.map_err(|_| proto_error("partition::from_proto() error"))?;
ScalarValue::iter_to_array(value_vec)
.map_err(|_| proto_error("partition::from_proto() error"))
}
_ => Err(proto_error("partition::from_proto() bound_list type error")),
};
values_ref
})
.collect::<Result<Vec<ArrayRef>, _>>()?;

let bound_rows = sort_row_converter.lock().convert_columns(&bound_cols)?;
Ok(Some(Partitioning::RangePartitioning(
exprs,
range_part.partition_count.try_into().unwrap(),
Arc::new(bound_rows),
)))
}
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion native-engine/datafusion-ext-plans/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,5 @@ pub mod common;
pub mod generate;
pub mod joins;
mod scan;
mod shuffle;
pub mod shuffle;
pub mod window;
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ use datafusion::{
error::{DataFusionError, Result},
execution::context::TaskContext,
physical_expr::EquivalenceProperties,
physical_plan,
physical_plan::{
metrics::{ExecutionPlanMetricsSet, MetricsSet},
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
SendableRecordBatchStream, Statistics,
},
};
Expand All @@ -36,7 +37,7 @@ use crate::{
memmgr::MemManager,
shuffle::{
rss_single_repartitioner::RssSingleShuffleRepartitioner,
rss_sort_repartitioner::RssSortShuffleRepartitioner, ShuffleRepartitioner,
rss_sort_repartitioner::RssSortShuffleRepartitioner, Partitioning, ShuffleRepartitioner,
},
};

Expand Down Expand Up @@ -80,7 +81,7 @@ impl ExecutionPlan for RssShuffleWriterExec {
self.props.get_or_init(|| {
PlanProperties::new(
EquivalenceProperties::new(self.schema()),
self.partitioning.clone(),
physical_plan::Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
)
})
Expand Down Expand Up @@ -122,7 +123,7 @@ impl ExecutionPlan for RssShuffleWriterExec {
p if p.partition_count() == 1 => {
Arc::new(RssSingleShuffleRepartitioner::new(rss_partition_writer))
}
Partitioning::Hash(..) | Partitioning::RoundRobinBatch(..) => {
Partitioning::HashPartitioning(..) | Partitioning::RoundRobinPartitioning(..) => {
let sort_time = exec_ctx.register_timer_metric("sort_time");
let partitioner = Arc::new(RssSortShuffleRepartitioner::new(
partition,
Expand Down
Loading