From 5a942c1c9eb4af7272d8ca6b74758088be564754 Mon Sep 17 00:00:00 2001 From: guoying06 Date: Mon, 30 Dec 2024 15:54:07 +0800 Subject: [PATCH] update range partition converter --- .../datafusion-ext-plans/src/shuffle/buffered_data.rs | 6 +++--- .../execution/blaze/plan/NativeShuffleExchangeBase.scala | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs index a8692c11..9fc641dc 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs @@ -39,7 +39,7 @@ use crate::{ evaluate_robin_partition_ids, rss::RssWriter, RePartitioning, }, }; -use parking_lot::Mutex as SyncMutex; +use parking_lot::Mutex; pub struct BufferedData { partition_id: usize, @@ -463,7 +463,7 @@ mod test { let bound1 = Arc::new(Int32Array::from_iter_values([11, 14, 17])) as ArrayRef; let bounds = vec![bound1]; - let sort_row_converter = Arc::new(SyncMutex::new(RowConverter::new( + let sort_row_converter = Arc::new(Mutex::new(RowConverter::new( sort_exprs .iter() .map(|expr: &PhysicalSortExpr| { @@ -525,7 +525,7 @@ mod test { let bounds = vec![bound1, bound2]; - let sort_row_converter = Arc::new(SyncMutex::new(RowConverter::new( + let sort_row_converter = Arc::new(Mutex::new(RowConverter::new( sort_exprs .iter() .map(|expr: &PhysicalSortExpr| { diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeBase.scala index 18026daf..6467c2f0 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeBase.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.blaze.shuffle.BlazeBlockStoreShuffleReader import org.apache.spark.sql.execution.blaze.shuffle.BlazeShuffleDependency import org.apache.spark.util.{CompletionIterator, MutablePair} import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering +import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.ArrayType @@ -238,7 +239,8 @@ abstract class NativeShuffleExchangeBase( val valueList = bounds.map { internal_row => internal_row.get(index, field.dataType) } - NativeConverters.convertValue(valueList, ArrayType(field.dataType)) + val arrayData = ArrayData.toArrayData(valueList) + NativeConverters.convertValue(arrayData, ArrayType(field.dataType)) }.toList case _ => null