diff --git a/Cargo.lock b/Cargo.lock index 511d4a60..fd14a7f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -431,6 +431,7 @@ dependencies = [ "panic-message", "paste", "prost 0.13.3", + "raw-cpuid", "tokio", ] @@ -2411,6 +2412,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "redox_syscall" version = "0.5.7" diff --git a/native-engine/blaze-jni-bridge/src/conf.rs b/native-engine/blaze-jni-bridge/src/conf.rs index 6198a58c..b0017a04 100644 --- a/native-engine/blaze-jni-bridge/src/conf.rs +++ b/native-engine/blaze-jni-bridge/src/conf.rs @@ -30,7 +30,6 @@ macro_rules! define_conf { define_conf!(IntConf, BATCH_SIZE); define_conf!(DoubleConf, MEMORY_FRACTION); -define_conf!(IntConf, TOKIO_NUM_WORKER_THREADS); define_conf!(BooleanConf, SMJ_INEQUALITY_JOIN_ENABLE); define_conf!(BooleanConf, CASE_CONVERT_FUNCTIONS_ENABLE); define_conf!(BooleanConf, INPUT_BATCH_STATISTICS_ENABLE); diff --git a/native-engine/blaze/Cargo.toml b/native-engine/blaze/Cargo.toml index 1b666385..89edba89 100644 --- a/native-engine/blaze/Cargo.toml +++ b/native-engine/blaze/Cargo.toml @@ -25,6 +25,7 @@ once_cell = "1.20.2" panic-message = "0.3.0" paste = "1.0.15" prost = "0.13.3" +raw-cpuid = "11.2.0" tokio = "=1.42.0" [target.'cfg(not(windows))'.dependencies] diff --git a/native-engine/blaze/src/rt.rs b/native-engine/blaze/src/rt.rs index 3afa18a5..33d05a7a 100644 --- a/native-engine/blaze/src/rt.rs +++ b/native-engine/blaze/src/rt.rs @@ -24,11 +24,9 @@ use arrow::{ record_batch::RecordBatch, }; use blaze_jni_bridge::{ - conf::{IntConf, TOKIO_NUM_WORKER_THREADS}, - is_task_running, - jni_bridge::JavaClasses, - jni_call, jni_call_static, jni_convert_byte_array, jni_exception_check, jni_exception_occurred, - jni_new_global_ref, jni_new_object, jni_new_string, + is_task_running, jni_bridge::JavaClasses, jni_call, jni_call_static, jni_convert_byte_array, + jni_exception_check, jni_exception_occurred, jni_new_global_ref, jni_new_object, + jni_new_string, }; use blaze_serde::protobuf::TaskDefinition; use datafusion::{ @@ -49,6 +47,7 @@ use datafusion_ext_plans::{ use futures::{FutureExt, StreamExt}; use jni::objects::{GlobalRef, JObject}; use prost::Message; +use raw_cpuid::CpuId; use tokio::{runtime::Runtime, task::JoinHandle}; use crate::{ @@ -95,13 +94,29 @@ impl NativeExecutionRuntime { &ExecutionPlanMetricsSet::new(), ); + // determine number of tokio worker threads + // use the real number of available physical cores + let default_parallelism = std::thread::available_parallelism() + .map(|v| v.get()) + .unwrap_or(1); + let has_htt = CpuId::new() + .get_feature_info() + .map(|info| info.has_htt()) + .unwrap_or(false); + let mut num_worker_threads = if has_htt { + default_parallelism / 2 + } else { + default_parallelism + }; + num_worker_threads = num_worker_threads.max(1); + // create tokio runtime // propagate classloader and task context to spawned children threads let spark_task_context = jni_call_static!(JniBridge.getTaskContext() -> JObject)?; let spark_task_context_global = jni_new_global_ref!(spark_task_context.as_obj())?; let tokio_runtime = tokio::runtime::Builder::new_multi_thread() .thread_name(format!("blaze-native-stage-{stage_id}-part-{partition_id}")) - .worker_threads(TOKIO_NUM_WORKER_THREADS.value()? 
as usize) + .worker_threads(num_worker_threads) .on_thread_start(move || { let classloader = JavaClasses::get().classloader; let _ = jni_call_static!( diff --git a/native-engine/datafusion-ext-commons/src/algorithm/rdx_tournament_tree.rs b/native-engine/datafusion-ext-commons/src/algorithm/rdx_tournament_tree.rs index 7c3f1938..663f8920 100644 --- a/native-engine/datafusion-ext-commons/src/algorithm/rdx_tournament_tree.rs +++ b/native-engine/datafusion-ext-commons/src/algorithm/rdx_tournament_tree.rs @@ -33,6 +33,7 @@ pub struct RadixTournamentTree<T: KeyForRadixTournamentTree> { #[allow(clippy::len_without_is_empty)] impl<T: KeyForRadixTournamentTree> RadixTournamentTree<T> { pub fn new(values: Vec<T>, num_keys: usize) -> Self { + let num_keys = num_keys + 1; // reserve one extra bucket so fully exhausted cursors (rdx == num_keys) stay in range let num_values = values.len(); let mut tree = unsafe { // safety: diff --git a/native-engine/datafusion-ext-commons/src/algorithm/rdxsort.rs b/native-engine/datafusion-ext-commons/src/algorithm/rdxsort.rs index f32c6104..d5ce81ca 100644 --- a/native-engine/datafusion-ext-commons/src/algorithm/rdxsort.rs +++ b/native-engine/datafusion-ext-commons/src/algorithm/rdxsort.rs @@ -12,46 +12,103 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::vec::IntoIter; +use crate::unchecked; -use radsort::Key; +/// Perform radix sort on a single array +/// +/// - array: the array to be sorted +/// - counts: per-key counters, which must be zero-initialized; after sorting +/// they hold the number of elements in each bucket. +/// - key: a function to extract the key from an array element +pub fn radix_sort_by_key<T>(array: &mut [T], counts: &mut [usize], key: impl Fn(&T) -> usize) { + #[derive(Default, Clone, Copy)] + struct Part { + cur: usize, + end: usize, + } -const STD_SORT_LIMIT: usize = 4096; + let num_keys = counts.len(); + let mut counts = unchecked!(counts); + let mut parts = unchecked!(vec![Part::default(); num_keys]); -pub fn radix_sort_unstable(array: &mut [impl Key + Ord]) { - radix_sort_unstable_by_key(array, |v| *v); -} + // count + array.iter().for_each(|item| counts[key(item)] += 1); + + // construct parts + let mut beg = 0; + for (idx, count) in counts.iter().enumerate() { + if *count > 0 { + parts[idx] = Part { + cur: beg, + end: beg + count, + }; + beg += count; + } + } -pub fn radix_sort_unstable_by_key<T, K: Key + Ord>(array: &mut [T], key: impl Fn(&T) -> K) { - if array.len() < STD_SORT_LIMIT { - array.sort_unstable_by_key(key); - } else { - radsort::sort_by_key(array, key); + // reorganize each partition + let mut inexhausted_part_indices = unchecked!(vec![0; num_keys]); + for i in 0..num_keys { + inexhausted_part_indices[i] = i; + } + while { + inexhausted_part_indices.retain(|&i| parts[i].cur < parts[i].end); + inexhausted_part_indices.len() > 1 + } { + for &part_idx in inexhausted_part_indices.iter() { + let cur_part = &parts[part_idx]; + let cur = cur_part.cur; + let end = cur_part.end; + for item_idx in cur..end { + let target_part_idx = key(&array[item_idx]); + let target_part = &mut parts[target_part_idx]; + unsafe { + // safety: skip bounds check + array.swap_unchecked(item_idx, target_part.cur); + } + target_part.cur += 1; + } + } } } -pub trait RadixSortIterExt: Iterator { - fn radix_sorted_unstable(self) -> IntoIter<Self::Item> - where - Self: Sized, - Self::Item: Key + Ord, - { - let mut vec: Vec<Self::Item> = self.collect(); - radix_sort_unstable(&mut vec); - vec.into_iter() +#[cfg(test)] +mod test { + use rand::Rng; + + use super::*; + + #[test] + fn fuzzytest_u16_small() { + for n in 0..1000 { + let mut array = 
vec![]; + for _ in 0..n { + array.push(rand::thread_rng().gen::<u16>()); + } + + let mut array1 = array.clone(); + radix_sort_by_key(&mut array1, &mut [0; 65536], |key| *key as usize); + + let mut array2 = array.clone(); + array2.sort_unstable(); + + assert_eq!(array1, array2); + } } - fn radix_sorted_unstable_by_key<K: Key + Ord>( - self, - key: impl Fn(&Self::Item) -> K, - ) -> IntoIter<Self::Item> - where - Self: Sized, - { - let mut vec: Vec<Self::Item> = self.collect(); - radix_sort_unstable_by_key(&mut vec, key); - vec.into_iter() + #[test] + fn fuzzytest_u16_1m() { + let mut array = vec![]; + for _ in 0..1000000 { + array.push(rand::thread_rng().gen::<u16>()); + } + + let mut array1 = array.clone(); + radix_sort_by_key(&mut array1, &mut [0; 65536], |key| *key as usize); + + let mut array2 = array.clone(); + array2.sort_unstable(); + + assert_eq!(array1, array2); } } - -impl<I: Iterator> RadixSortIterExt for I {} diff --git a/native-engine/datafusion-ext-commons/src/arrow/selection.rs b/native-engine/datafusion-ext-commons/src/arrow/selection.rs index 6774e640..095ef85a 100644 --- a/native-engine/datafusion-ext-commons/src/arrow/selection.rs +++ b/native-engine/datafusion-ext-commons/src/arrow/selection.rs @@ -143,8 +143,13 @@ pub fn create_array_interleaver( }); } } - let v = interleaver.arrays[*a].value(*b); - values.push(v) + let array = &interleaver.arrays[*a]; + if array.is_valid(*b) { + let v = array.value(*b); + values.push(v) + } else { + values.push(Default::default()); + } } let array = PrimitiveArray::<T>::new(values.into(), nulls); @@ -172,9 +177,12 @@ }); } } - let o = interleaver.arrays[*a].value_offsets(); - let element_len = o[*b + 1].as_usize() - o[*b].as_usize(); - capacity += element_len; + let array = &interleaver.arrays[*a]; + if array.is_valid(*b) { + let o = array.value_offsets(); + let element_len = o[*b + 1].as_usize() - o[*b].as_usize(); + capacity += element_len; + } offsets.append(T::Offset::from_usize(capacity).expect("overflow")); } @@ -192,7 +200,10 @@ }); } } - values.extend_from_slice(interleaver.arrays[*a].value(*b).as_ref()); + let array = &interleaver.arrays[*a]; + if array.is_valid(*b) { + values.extend_from_slice(array.value(*b).as_ref()); + } } // Safety: safe by construction diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs b/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs index 6e526a0a..c612f6af 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs @@ -209,8 +209,10 @@ impl AggHashMap { } pub fn upsert_records(&mut self, keys: Vec) -> Vec { - self.map.reserve(keys.len()); - self.map.upsert_many(keys) + tokio::task::block_in_place(|| { + self.map.reserve(keys.len()); + self.map.upsert_many(keys) + }) } pub fn take_keys(&mut self) -> Vec {
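A note on the agg_hash_map.rs change above: `upsert_records` does a large amount of pure CPU work (hashing, probing, growing the table), and it runs on a tokio worker thread. Wrapping it in `tokio::task::block_in_place` lets the multi-thread runtime hand the rest of that worker's queue to other threads instead of stalling them behind the upsert. A minimal, self-contained sketch of the pattern (the workload here is a hypothetical stand-in, not this crate's API):

    use std::time::Duration;

    // stand-in for the real reserve + upsert_many work: any long,
    // non-async computation fits this pattern
    async fn hash_and_aggregate(keys: Vec<u64>) -> usize {
        tokio::task::block_in_place(|| keys.iter().filter(|k| *k % 2 == 0).count())
    }

    // block_in_place panics on a current_thread runtime, so the sketch
    // assumes the multi_thread flavor (as built in rt.rs above)
    #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
    async fn main() {
        let n = hash_and_aggregate((0..1_000_000).collect()).await;
        println!("even keys: {n}");
        // timers and other tasks keep running while the blocking work executes
        tokio::time::sleep(Duration::from_millis(1)).await;
    }

diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs 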
b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs index e89ab395..8370b96b 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs @@ -28,7 +28,7 @@ use datafusion::{ use datafusion_ext_commons::{ algorithm::{ rdx_tournament_tree::{KeyForRadixTournamentTree, RadixTournamentTree}, - rdxsort::radix_sort_unstable_by_key, + rdxsort::radix_sort_by_key, }, batch_size, df_execution_err, downcast_any, io::{read_bytes_slice, read_len, write_len}, @@ -480,33 +480,31 @@ impl HashingData { .enumerate() .map(|(record_idx, key)| (bucket_id(key) as u32, record_idx as u32)) .collect::<Vec<_>>(); - radix_sort_unstable_by_key(&mut entries, |v| v.0 as u16); + + let mut bucket_counts = vec![0; NUM_SPILL_BUCKETS]; + radix_sort_by_key(&mut entries, &mut bucket_counts, |(bucket_id, ..)| { + *bucket_id as usize + }); let mut writer = spill.get_compressed_writer(); - let mut begin = 0; - let mut end; - while begin < entries.len() { - let cur_bucket_id = entries[begin].0; - end = begin + 1; - while end < entries.len() && entries[end].0 == cur_bucket_id { - end += 1; + let mut offset = 0; + for (cur_bucket_id, bucket_count) in bucket_counts.into_iter().enumerate() { + if bucket_count == 0 { + continue; } - - write_len(cur_bucket_id as usize, &mut writer)?; - write_len(end - begin, &mut writer)?; + write_len(cur_bucket_id, &mut writer)?; + write_len(bucket_count, &mut writer)?; write_spill_bucket( &mut writer, &acc_table, - entries[begin..end] + entries[offset..][..bucket_count] .iter() .map(|&(_, record_idx)| &key_rows[record_idx as usize]), - entries[begin..end] + entries[offset..][..bucket_count] .iter() .map(|&(_, record_idx)| record_idx as usize), )?; - - // next bucket - begin = end; + offset += bucket_count; } // EOF @@ -584,24 +582,24 @@ impl MergingData { let mut entries = self.entries; let key_rows = self.key_rows; let acc_table = self.acc_table; - radix_sort_unstable_by_key(&mut entries, |(bucket_id, ..)| *bucket_id as u16); + + let mut bucket_counts = vec![0; NUM_SPILL_BUCKETS]; + radix_sort_by_key(&mut entries, &mut bucket_counts, |(bucket_id, ..)| { + *bucket_id as usize + }); let mut writer = spill.get_compressed_writer(); - let mut begin = 0; - let mut end; - while begin < entries.len() { - let cur_bucket_id = entries[begin].0; - end = begin + 1; - while end < entries.len() && entries[end].0 == cur_bucket_id { - end += 1; + let mut offset = 0; + for (cur_bucket_id, bucket_count) in bucket_counts.into_iter().enumerate() { + if bucket_count == 0 { + continue; } - - write_len(cur_bucket_id as usize, &mut writer)?; - write_len(end - begin, &mut writer)?; + write_len(cur_bucket_id, &mut writer)?; + write_len(bucket_count, &mut writer)?; write_spill_bucket( &mut writer, &acc_table, - entries[begin..end] + entries[offset..][..bucket_count] .iter() .map(|&(_, batch_idx, row_idx, _)| { key_rows[batch_idx as usize] .row(row_idx as usize) .as_ref() .as_raw_bytes() }), - entries[begin..end] + entries[offset..][..bucket_count] .iter() .map(|&(_, _, _, record_idx)| record_idx as usize), )?; - - // next bucket - begin = end; + offset += bucket_count; } // EOF diff --git a/native-engine/datafusion-ext-plans/src/agg/collect.rs b/native-engine/datafusion-ext-plans/src/agg/collect.rs index b684a013..79cfcd13 100644 --- a/native-engine/datafusion-ext-plans/src/agg/collect.rs +++ b/native-engine/datafusion-ext-plans/src/agg/collect.rs @@ -152,7 +152,7 @@ impl Agg for AggGenericCollect { idx_for! 
{ (acc_idx in acc_idx) => { list.push(ScalarValue::List(ScalarValue::new_list( - &accs.take_values(acc_idx, self.arg_type.clone()), + &accs.take_values(acc_idx), &self.arg_type, true, ))); @@ -168,7 +168,7 @@ pub trait AccCollectionColumn: AccColumn + Send + Sync + 'static { fn merge_items(&mut self, idx: usize, other: &mut Self, other_idx: usize); fn save_raw(&self, idx: usize, w: &mut impl Write) -> Result<()>; fn load_raw(&mut self, idx: usize, r: &mut impl Read) -> Result<()>; - fn take_values(&mut self, idx: usize, dt: DataType) -> Vec<ScalarValue>; + fn take_values(&mut self, idx: usize) -> Vec<ScalarValue>; fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { let mut array_idx = 0; @@ -195,25 +195,6 @@ } Ok(()) } - - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { - idx_for! { - (idx in idx) => { - self.save_raw(idx, w)?; - } - } - Ok(()) - } - - fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> Result<()> { - let idx = self.num_records(); - self.resize(idx + num_rows); - - while idx < self.num_records() { - self.load_raw(idx, r)?; - } - Ok(()) - } } pub struct AccSetColumn { @@ -265,10 +246,10 @@ impl AccCollectionColumn for AccSetColumn { Ok(()) } - fn take_values(&mut self, idx: usize, dt: DataType) -> Vec<ScalarValue> { + fn take_values(&mut self, idx: usize) -> Vec<ScalarValue> { self.mem_used -= self.set[idx].mem_size(); std::mem::take(&mut self.set[idx]) - .into_values(dt, false) + .into_values(self.dt.clone(), false) .collect() } } @@ -309,23 +290,37 @@ impl AccColumn for AccSetColumn { } fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { - AccCollectionColumn::spill(self, idx, w) + idx_for! { + (idx in idx) => { + self.save_raw(idx, w)?; + } + } + Ok(()) } fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> Result<()> { - AccCollectionColumn::unspill(self, num_rows, r) + let mut idx = self.num_records(); + self.resize(idx + num_rows); + + while idx < self.num_records() { + self.load_raw(idx, r)?; + idx += 1; + } + Ok(()) } } pub struct AccListColumn { list: Vec<AccList>, + dt: DataType, mem_used: usize, } impl AccCollectionColumn for AccListColumn { - fn empty(_dt: DataType) -> Self { + fn empty(dt: DataType) -> Self { Self { list: vec![], + dt, mem_used: 0, } } @@ -360,10 +355,10 @@ Ok(()) } - fn take_values(&mut self, idx: usize, dt: DataType) -> Vec<ScalarValue> { + fn take_values(&mut self, idx: usize) -> Vec<ScalarValue> { self.mem_used -= self.list[idx].mem_size(); std::mem::take(&mut self.list[idx]) - .into_values(dt, false) + .into_values(self.dt.clone(), false) .collect() } } @@ -404,11 +399,23 @@ } fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { - AccCollectionColumn::spill(self, idx, w) + idx_for! 
{ + (idx in idx) => { + self.save_raw(idx, w)?; + } + } + Ok(()) } fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> Result<()> { - AccCollectionColumn::unspill(self, num_rows, r) + let mut idx = self.num_records(); + self.resize(idx + num_rows); + + while idx < self.num_records() { + self.load_raw(idx, r)?; + idx += 1; + } + Ok(()) } } @@ -622,3 +629,91 @@ fn acc_hash(value: impl AsRef<[u8]>) -> u64 { foldhash::fast::FixedState::with_seed(ACC_HASH_SEED as u64); HASHER.hash_one(value.as_ref()) } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::DataType; + use datafusion::common::ScalarValue; + use crate::memmgr::spill::Spill; + + #[test] + fn test_acc_set_append() { + let mut acc_set = AccSet::default(); + let value1 = ScalarValue::Int32(Some(1)); + let value2 = ScalarValue::Int32(Some(2)); + + acc_set.append(&value1, false); + acc_set.append(&value2, false); + + assert_eq!(acc_set.list.raw.len(), 8); // 4 bytes for each int32 + assert_eq!(acc_set.set.len(), 2); + } + + #[test] + fn test_acc_set_merge() { + let mut acc_set1 = AccSet::default(); + let mut acc_set2 = AccSet::default(); + let value1 = ScalarValue::Int32(Some(1)); + let value2 = ScalarValue::Int32(Some(2)); + let value3 = ScalarValue::Int32(Some(3)); + + acc_set1.append(&value1, false); + acc_set1.append(&value2, false); + acc_set2.append(&value2, false); + acc_set2.append(&value3, false); + + acc_set1.merge(&mut acc_set2); + + assert_eq!(acc_set1.list.raw.len(), 12); // 4 bytes for each int32 + assert_eq!(acc_set1.set.len(), 3); + } + + #[test] + fn test_acc_set_into_values() { + let mut acc_set = AccSet::default(); + let value1 = ScalarValue::Int32(Some(1)); + let value2 = ScalarValue::Int32(Some(2)); + + acc_set.append(&value1, false); + acc_set.append(&value2, false); + + let values: Vec<ScalarValue> = acc_set.into_values(DataType::Int32, false).collect(); + assert_eq!(values, vec![value1, value2]); + } + + #[test] + fn test_acc_set_duplicate_append() { + let mut acc_set = AccSet::default(); + let value1 = ScalarValue::Int32(Some(1)); + + acc_set.append(&value1, false); + acc_set.append(&value1, false); + + assert_eq!(acc_set.list.raw.len(), 4); // 4 bytes for one int32 + assert_eq!(acc_set.set.len(), 1); + } + + #[test] + fn test_acc_set_spill() { + let mut acc_col = AccSetColumn::empty(DataType::Int32); + acc_col.resize(3); + acc_col.append_item(1, &ScalarValue::Int32(Some(1))); + acc_col.append_item(1, &ScalarValue::Int32(Some(2))); + acc_col.append_item(2, &ScalarValue::Int32(Some(3))); + acc_col.append_item(2, &ScalarValue::Int32(Some(4))); + acc_col.append_item(2, &ScalarValue::Int32(Some(5))); + acc_col.append_item(2, &ScalarValue::Int32(Some(6))); + acc_col.append_item(2, &ScalarValue::Int32(Some(7))); + + let mut spill: Box<dyn Spill> = Box::new(vec![]); + acc_col.spill(IdxSelection::Range(0, 3), &mut spill.get_compressed_writer()).unwrap(); + + let mut acc_col_unspill = AccSetColumn::empty(DataType::Int32); + acc_col_unspill.unspill(3, &mut spill.get_compressed_reader()).unwrap(); + + assert_eq!(acc_col.take_values(0), acc_col_unspill.take_values(0)); + assert_eq!(acc_col.take_values(1), acc_col_unspill.take_values(1)); + assert_eq!(acc_col.take_values(2), acc_col_unspill.take_values(2)); + } +} \ No newline at end of file
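Worth spelling out why `unspill` is now implemented inline on each column: the removed trait default was subtly broken, since `idx` was bound immutably and never advanced, so its `while idx < self.num_records()` loop could never terminate once `num_rows > 0`. The inlined versions add the missing `idx += 1`. A minimal sketch of the resize-then-fill contract the fixed loop follows (hypothetical `save_raw`/`load_raw` stand-ins over plain `Write`/`Read`, not this crate's spill API):

    use std::io::{Cursor, Read, Result, Write};

    fn save_raw(v: &u32, w: &mut impl Write) -> Result<()> {
        w.write_all(&v.to_le_bytes())
    }

    fn load_raw(v: &mut u32, r: &mut impl Read) -> Result<()> {
        let mut buf = [0u8; 4];
        r.read_exact(&mut buf)?;
        *v = u32::from_le_bytes(buf);
        Ok(())
    }

    fn main() -> Result<()> {
        // spill: write each record in index order
        let col = vec![1u32, 2, 3];
        let mut spilled = vec![];
        for v in &col {
            save_raw(v, &mut spilled)?;
        }

        // unspill: resize first, then advance idx on every load
        let mut restored: Vec<u32> = vec![];
        let mut idx = restored.len();
        restored.resize(idx + col.len(), 0);
        let mut r = Cursor::new(spilled);
        while idx < restored.len() {
            load_raw(&mut restored[idx], &mut r)?;
            idx += 1; // the step the removed default forgot
        }
        assert_eq!(restored, col);
        Ok(())
    }

diff --git a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs index 68ff8d89..105fccb8 100644 --- a/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs +++ 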
b/native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs @@ -17,11 +17,7 @@ use std::{ fmt::{Debug, Formatter}, fs::File, io::{BufReader, Cursor, Read, Seek, SeekFrom}, - sync::{ - atomic::{AtomicUsize, Ordering::SeqCst}, - mpsc::Receiver, - Arc, - }, + sync::{mpsc::Receiver, Arc}, }; use arrow::{ @@ -50,7 +46,7 @@ use datafusion_ext_commons::{ }; use jni::objects::{GlobalRef, JObject}; use once_cell::sync::OnceCell; -use parking_lot::Mutex; +use tokio::task::JoinHandle; use crate::common::{ execution_context::ExecutionContext, ipc_compression::IpcCompressionReader, @@ -147,15 +143,23 @@ impl ExecutionPlan for IpcReaderExec { // spawn a blocking thread for reading ipcs and providing batches let blocks = jni_new_global_ref!(blocks_local.as_obj())?; - let rx = read_ipc_into_channel(blocks, exec_ctx.clone()); - Ok( - exec_ctx.output_with_sender("IpcReader", move |sender| async move { - while let Some(batch) = rx.recv().expect("receive error").transpose()? { - sender.send(batch).await; + let (rx, handle) = read_ipc_into_channel(blocks, exec_ctx.clone()); + let output = exec_ctx.output_with_sender("IpcReader", move |sender| async move { + loop { + match rx.recv() { + Ok(batch) => { + sender.send(batch).await; + } + Err(_disconnected) => { + drop(rx); + handle.await.expect("tokio error")?; + break; + } } - Ok(()) - }), - ) + } + Ok(()) + }); + Ok(output) } fn metrics(&self) -> Option<MetricsSet> { @@ -170,118 +174,94 @@ fn read_ipc_into_channel( blocks: GlobalRef, exec_ctx: Arc<ExecutionContext>, -) -> Receiver<Option<Result<RecordBatch>>> { - let (tx, rx) = std::sync::mpsc::channel(); - tokio::task::spawn_blocking(move || { +) -> (Receiver<RecordBatch>, JoinHandle<Result<()>>) { + let (tx, rx) = std::sync::mpsc::sync_channel(1); + let handle = tokio::task::spawn_blocking(move || { let elapsed_compute = exec_ctx.baseline_metrics().elapsed_compute().clone(); let _timer = elapsed_compute.timer(); log::info!("start ipc reading"); let size_counter = exec_ctx.register_counter_metric("size"); let batch_size = batch_size(); - let staging_cols: Arc<Mutex<Vec<Vec<ArrayRef>>>> = Arc::new(Mutex::new(vec![])); - let staging_num_rows = AtomicUsize::new(0); - let staging_mem_size = AtomicUsize::new(0); - - let provide_batches = || -> Result<()> { - while is_task_running() { - // get next block - let blocks = blocks.clone(); - if !jni_call!(ScalaIterator(blocks.as_obj()).hasNext() -> bool)? { - break; + let output_batch_mem_size = suggested_output_batch_mem_size(); + let mut staging_cols: Vec<Vec<ArrayRef>> = vec![]; + let mut staging_num_rows = 0; + let mut staging_mem_size = 0; + + while is_task_running() { + // get next block + let blocks = blocks.clone(); + if !jni_call!(ScalaIterator(blocks.as_obj()).hasNext() -> bool)? { + break; + } + let next_block = jni_new_global_ref!( + jni_call!(ScalaIterator(blocks.as_obj()).next() -> JObject)?.as_obj() + )?; + + // get ipc reader + let mut reader = match next_block { + b if jni_call!(BlazeBlockObject(b.as_obj()).hasFileSegment() -> bool)? => { + get_file_reader(b.as_obj())? } - let next_block = jni_new_global_ref!( - jni_call!(ScalaIterator(blocks.as_obj()).next() -> JObject)?.as_obj() - )?; - - // get ipc reader - let mut reader = Box::pin(match next_block { - b if jni_call!(BlazeBlockObject(b.as_obj()).hasFileSegment() -> bool)? => { - get_file_reader(b.as_obj())? - } - b if jni_call!(BlazeBlockObject(b.as_obj()).hasByteBuffer() -> bool)? => { - get_byte_buffer_reader(b.as_obj())? - } - b => get_channel_reader(b.as_obj())?, - }); - - while let Some((num_rows, cols)) = - reader.as_mut().read_batch(&exec_ctx.output_schema())? 
- { - let (cur_staging_num_rows, cur_staging_mem_size) = { - let staging_cols_cloned = staging_cols.clone(); - let mut staging_cols = staging_cols_cloned.lock(); - let mut cols_mem_size = 0; - staging_cols.resize_with(cols.len(), || vec![]); - for (col_idx, col) in cols.into_iter().enumerate() { - cols_mem_size += col.get_array_mem_size(); - staging_cols[col_idx].push(col); - } - drop(staging_cols); - staging_num_rows.fetch_add(num_rows, SeqCst); - staging_mem_size.fetch_add(cols_mem_size, SeqCst); - (staging_num_rows.load(SeqCst), staging_mem_size.load(SeqCst)) - }; - - if cur_staging_num_rows >= batch_size - || cur_staging_mem_size >= suggested_output_batch_mem_size() - { - let coalesced_cols = std::mem::take(&mut *staging_cols.clone().lock()) - .into_iter() - .map(|cols| coalesce_arrays_unchecked(cols[0].data_type(), &cols)) - .collect::<Vec<_>>(); - let batch = RecordBatch::try_new_with_options( - exec_ctx.output_schema(), - coalesced_cols, - &RecordBatchOptions::new().with_row_count(Some(cur_staging_num_rows)), - )?; - staging_num_rows.store(0, SeqCst); - staging_mem_size.store(0, SeqCst); - size_counter.add(batch.get_array_mem_size()); - exec_ctx.baseline_metrics().record_output(batch.num_rows()); - - if elapsed_compute - .exclude_timer(|| tx.send(Some(Ok(batch)))) - .is_err() - { - break; - } + b if jni_call!(BlazeBlockObject(b.as_obj()).hasByteBuffer() -> bool)? => { + get_byte_buffer_reader(b.as_obj())? + } + b => get_channel_reader(b.as_obj())?, + }; + + while let Some((num_rows, cols)) = reader.read_batch(&exec_ctx.output_schema())? { + let mut cols_mem_size = 0; + staging_cols.resize_with(cols.len(), || vec![]); + for (col_idx, col) in cols.into_iter().enumerate() { + cols_mem_size += col.get_array_mem_size(); + staging_cols[col_idx].push(col); + } + staging_num_rows += num_rows; + staging_mem_size += cols_mem_size; + + if staging_num_rows >= batch_size || staging_mem_size >= output_batch_mem_size { + let coalesced_cols = std::mem::take(&mut staging_cols) + .into_iter() + .map(|cols| coalesce_arrays_unchecked(cols[0].data_type(), &cols)) + .collect::<Vec<_>>(); + let batch = RecordBatch::try_new_with_options( + exec_ctx.output_schema(), + coalesced_cols, + &RecordBatchOptions::new().with_row_count(Some(staging_num_rows)), + )?; + staging_num_rows = 0; + staging_mem_size = 0; + size_counter.add(batch.get_array_mem_size()); + exec_ctx.baseline_metrics().record_output(batch.num_rows()); + if elapsed_compute.exclude_timer(|| tx.send(batch)).is_err() { + break; } } } + } - let cur_staging_num_rows = staging_num_rows.load(SeqCst); - if cur_staging_num_rows > 0 { - let coalesced_cols = std::mem::take(&mut *staging_cols.clone().lock()) - .into_iter() - .map(|cols| coalesce_arrays_unchecked(cols[0].data_type(), &cols)) - .collect::<Vec<_>>(); - let batch = RecordBatch::try_new_with_options( - exec_ctx.output_schema(), - coalesced_cols, - &RecordBatchOptions::new().with_row_count(Some(cur_staging_num_rows)), - )?; - size_counter.add(batch.get_array_mem_size()); - exec_ctx.baseline_metrics().record_output(batch.num_rows()); - let _ = elapsed_compute.exclude_timer(|| tx.send(Some(Ok(batch)))); - } - let _ = elapsed_compute.exclude_timer(|| tx.send(None)); - Ok::<_, DataFusionError>(()) - }; - - if let Err(err) = provide_batches() { - elapsed_compute - .exclude_timer(|| tx.send(Some(Err(err)))) - .expect("send error"); + if staging_num_rows > 0 { + let coalesced_cols = staging_cols + .into_iter() + .map(|cols| coalesce_arrays_unchecked(cols[0].data_type(), &cols)) + .collect::<Vec<_>>(); + let batch = 
RecordBatch::try_new_with_options( + exec_ctx.output_schema(), + coalesced_cols, + &RecordBatchOptions::new().with_row_count(Some(staging_num_rows)), + )?; + size_counter.add(batch.get_array_mem_size()); + exec_ctx.baseline_metrics().record_output(batch.num_rows()); + let _ = elapsed_compute.exclude_timer(|| tx.send(batch)); } + Ok::<_, DataFusionError>(()) }); - rx + (rx, handle) } fn get_channel_reader(block: JObject) -> Result<IpcCompressionReader<Box<dyn Read + Send>>> { let channel_reader = ReadableByteChannelReader::try_new(block)?; - log::info!("start ipc channel reader"); Ok(IpcCompressionReader::new(Box::new( BufReader::with_capacity(65536, channel_reader), ))) diff --git a/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs b/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs index c31394aa..963d1d84 100644 --- a/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs +++ b/native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs @@ -27,7 +27,6 @@ use arrow::{ }; use datafusion::{common::Result, physical_expr::PhysicalExprRef}; use datafusion_ext_commons::{ - algorithm::rdxsort::RadixSortIterExt, io::{read_len, read_raw_slice, write_len, write_raw_slice}, prefetch_read_data, spark_hash::create_hashes, }; @@ -117,7 +116,7 @@ impl Table { num_valid_items += 1; (idx as u32, hash) }) - .radix_sorted_unstable_by_key(|&(_idx, hash)| hash) + .sorted_unstable_by_key(|&(idx, hash)| (hash, idx)) .chunk_by(|(_, hash)| *hash) .into_iter() { @@ -403,7 +402,7 @@ } pub fn lookup_many(&self, hashes: Vec<u32>) -> Vec<MapValue> { - self.table.lookup_many(hashes) + tokio::task::block_in_place(|| self.table.lookup_many(hashes)) } pub fn get_range(&self, map_value: MapValue) -> &[u32] {
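On the join_hash_map.rs change above: the old radix sort keyed on `hash` alone, so entries sharing a hash could land in any relative order, while keying on `(hash, idx)` makes the table layout fully deterministic across runs. A small illustration of the idea, assuming itertools is available (the file above already uses `sorted_unstable_by_key` and `chunk_by` from it):

    use itertools::Itertools;

    fn main() {
        // (idx, hash): entries 0 and 2 collide on hash 7
        let entries = vec![(0u32, 7u32), (1, 3), (2, 7)];

        let sorted = entries
            .iter()
            .copied()
            .sorted_unstable_by_key(|&(idx, hash)| (hash, idx))
            .collect::<Vec<_>>();
        // idx breaks the tie, so the order is reproducible
        assert_eq!(sorted, vec![(1, 3), (0, 7), (2, 7)]);

        // chunk_by then yields each hash bucket contiguously, as in Table::new
        for (hash, chunk) in &sorted.iter().chunk_by(|&&(_, hash)| hash) {
            let idxs = chunk.map(|&(idx, _)| idx).collect::<Vec<_>>();
            println!("hash={hash}: idxs={idxs:?}");
        }
    }

diff --git a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs index 3f957a79..3e0eae52 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/buffered_data.rs @@ -14,20 +14,26 @@ use std::io::Write; -use arrow::{array::ArrayRef, record_batch::RecordBatch}; +use arrow::record_batch::RecordBatch; use 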
blaze_jni_bridge::{is_task_running, jni_call}; +use bytesize::ByteSize; use count_write::CountWrite; use datafusion::{ common::Result, physical_plan::{metrics::Time, Partitioning}, }; use datafusion_ext_commons::{ - algorithm::rdx_tournament_tree::{KeyForRadixTournamentTree, RadixTournamentTree}, - arrow::{array_size::ArraySize, coalesce::coalesce_arrays_unchecked, selection::take_batch}, - assume, compute_suggested_batch_size_for_output, df_execution_err, unchecked, + algorithm::{ + rdx_tournament_tree::{KeyForRadixTournamentTree, RadixTournamentTree}, + rdxsort::radix_sort_by_key, + }, + arrow::{ + array_size::ArraySize, + selection::{create_batch_interleaver, BatchInterleaver}, + }, + compute_suggested_batch_size_for_output, df_execution_err, }; use jni::objects::GlobalRef; -use unchecked_index::UncheckedIndex; use crate::{ common::{ipc_compression::IpcCompressionWriter, timer_helper::TimerHelper}, @@ -36,56 +42,92 @@ use crate::{ pub struct BufferedData { partition_id: usize, + partitioning: Partitioning, + staging_batches: Vec<RecordBatch>, + staging_num_rows: usize, + staging_mem_used: usize, sorted_batches: Vec<RecordBatch>, - sorted_parts: Vec<Vec<PartitionInBatch>>, + sorted_offsets: Vec<Vec<u32>>, num_rows: usize, - mem_used: usize, + sorted_mem_used: usize, sort_time: Time, } impl BufferedData { - pub fn new(partition_id: usize, sort_time: Time) -> Self { + pub fn new(partitioning: Partitioning, partition_id: usize, sort_time: Time) -> Self { Self { partition_id, + partitioning, + staging_batches: vec![], + staging_num_rows: 0, + staging_mem_used: 0, sorted_batches: vec![], - sorted_parts: vec![], + sorted_offsets: vec![], num_rows: 0, - mem_used: 0, + sorted_mem_used: 0, sort_time, } } pub fn drain(&mut self) -> Self { - std::mem::replace(self, Self::new(self.partition_id, self.sort_time.clone())) + std::mem::replace( + self, + Self::new( + self.partitioning.clone(), + self.partition_id, + self.sort_time.clone(), + ), + ) } - pub fn add_batch(&mut self, batch: RecordBatch, partitioning: &Partitioning) -> Result<()> { + pub fn add_batch(&mut self, batch: RecordBatch) -> Result<()> { self.num_rows += batch.num_rows(); + self.staging_num_rows += batch.num_rows(); + self.staging_mem_used += batch.get_array_mem_size(); + self.staging_batches.push(batch); + + let suggested_batch_size = + compute_suggested_batch_size_for_output(self.staging_mem_used, self.staging_num_rows); + if self.staging_mem_used > suggested_batch_size { + self.flush_staging()?; + } + Ok(()) + } - let (parts, sorted_batch) = self + fn flush_staging(&mut self) -> Result<()> { + let staging_batches = std::mem::take(&mut self.staging_batches); + let (offsets, sorted_batch) = self .sort_time - .with_timer(|| sort_batch_by_partition_id(batch, partitioning))?; - self.mem_used += - sorted_batch.get_array_mem_size() + parts.len() * size_of::<PartitionInBatch>(); + .with_timer(|| sort_batches_by_partition_id(staging_batches, &self.partitioning))?; + self.staging_num_rows = 0; + self.staging_mem_used = 0; + + self.sorted_mem_used += sorted_batch.get_array_mem_size() + offsets.len() * 4; self.sorted_batches.push(sorted_batch); - self.sorted_parts.push(parts); + self.sorted_offsets.push(offsets); Ok(()) } // write buffered data to spill/target file, returns uncompressed size and // offsets to each partition - pub fn write<W: Write>(self, mut w: W, partitioning: &Partitioning) -> Result<Vec<u64>> { - log::info!("draining all buffered data, total_mem={}", self.mem_used()); + pub fn write<W: Write>(mut self, mut w: W) -> Result<Vec<u64>> { + if !self.staging_batches.is_empty() { + self.flush_staging()?; + } + + let mem_used = 
ByteSize(self.mem_used() as u64); + log::info!("draining all buffered data, total_mem={mem_used}"); if self.num_rows == 0 { - return Ok(vec![0; partitioning.partition_count() + 1]); + return Ok(vec![0; self.partitioning.partition_count() + 1]); } + let num_partitions = self.partitioning.partition_count(); let mut writer = IpcCompressionWriter::new(CountWrite::from(&mut w)); let mut offsets = vec![]; let mut offset = 0; - let mut iter = self.into_sorted_batches(partitioning)?; + let mut iter = self.into_sorted_batches()?; - while (iter.cur_part_id() as usize) < partitioning.partition_count() { + while !iter.finished() { if !is_task_running() { df_execution_err!("task completed/killed")?; } @@ -96,39 +138,38 @@ impl BufferedData { // write all batches with this part id while iter.cur_part_id() == cur_part_id { - let (num_rows, cols) = iter.next_batch(); - writer.write_batch(num_rows, &cols)?; + let batch = iter.next_batch()?; + writer.write_batch(batch.num_rows(), batch.columns())?; } writer.finish_current_buf()?; offset = writer.inner().count(); offsets.push(offset); } - while offsets.len() <= partitioning.partition_count() { + while offsets.len() <= num_partitions { offsets.push(offset); // fill offsets of empty partitions } - let compressed_size = offsets.last().cloned().unwrap_or_default(); + let compressed_size = ByteSize(offsets.last().cloned().unwrap_or_default() as u64); log::info!("all buffered data drained, compressed_size={compressed_size}"); Ok(offsets) } // write buffered data to rss, returns uncompressed size - pub fn write_rss( - self, - rss_partition_writer: GlobalRef, - partitioning: &Partitioning, - ) -> Result<()> { + pub fn write_rss(mut self, rss_partition_writer: GlobalRef) -> Result<()> { + if !self.staging_batches.is_empty() { + self.flush_staging()?; + } + + let mem_used = ByteSize(self.mem_used() as u64); + log::info!("draining all buffered data to rss, total_mem={mem_used}"); + if self.num_rows == 0 { return Ok(()); } - log::info!( - "draining all buffered data to rss, total_mem={}", - self.mem_used() - ); - let mut iter = self.into_sorted_batches(partitioning)?; + let mut iter = self.into_sorted_batches()?; let mut writer = IpcCompressionWriter::new(RssWriter::new(rss_partition_writer.clone(), 0)); - while (iter.cur_part_id() as usize) < partitioning.partition_count() { + while !iter.finished() { if !is_task_running() { df_execution_err!("task completed/killed")?; } @@ -140,8 +181,8 @@ // write all batches with this part id while iter.cur_part_id() == cur_part_id { - let (num_rows, cols) = iter.next_batch(); - writer.write_batch(num_rows, &cols)?; + let batch = iter.next_batch()?; + writer.write_batch(batch.num_rows(), batch.columns())?; } writer.finish_current_buf()?; } @@ -150,48 +191,47 @@ Ok(()) } - fn into_sorted_batches( - self, - partitioning: &Partitioning, - ) -> Result<PartitionedBatchesIterator> { + fn into_sorted_batches(self) -> Result<PartitionedBatchesIterator> { let sub_batch_size = compute_suggested_batch_size_for_output(self.mem_used(), self.num_rows); Ok(PartitionedBatchesIterator { - batches: unchecked!(self.sorted_batches.clone()), + batch_interleaver: create_batch_interleaver(&self.sorted_batches, true)?, cursors: RadixTournamentTree::new( - self.sorted_parts + self.sorted_offsets .into_iter() .enumerate() - .map(|(idx, partition_indices)| { + .map(|(idx, offsets)| { let mut cur = PartCursor { idx, - parts: partition_indices, + offsets, parts_idx: 0, }; cur.skip_empty_parts(); cur }) .collect(), - partitioning.partition_count(), 
self.partitioning.partition_count(), ), num_output_rows: 0, num_rows: self.num_rows, - num_cols: self.sorted_batches[0].num_columns(), batch_size: sub_batch_size, }) } pub fn mem_used(&self) -> usize { - self.mem_used + self.sorted_mem_used + self.staging_mem_used + } + + pub fn is_empty(&self) -> bool { + self.sorted_batches.is_empty() && self.staging_batches.is_empty() } } struct PartitionedBatchesIterator { - batches: UncheckedIndex<Vec<RecordBatch>>, + batch_interleaver: BatchInterleaver, cursors: RadixTournamentTree<PartCursor>, num_output_rows: usize, num_rows: usize, - num_cols: usize, batch_size: usize, } @@ -200,107 +240,110 @@ impl PartitionedBatchesIterator { self.cursors.peek().rdx() as u32 } - fn next_batch(&mut self) -> (usize, Vec<ArrayRef>) { + pub fn finished(&self) -> bool { + self.num_output_rows >= self.num_rows + } + + pub fn next_batch(&mut self) -> Result<RecordBatch> { let cur_batch_size = self.batch_size.min(self.num_rows - self.num_output_rows); let cur_part_id = self.cur_part_id(); - let mut slices = vec![vec![]; self.num_cols]; - let mut slices_len = 0; + let mut indices = Vec::with_capacity(cur_batch_size); // add rows with same partition id under this cursor - while slices_len < cur_batch_size { + while indices.len() < cur_batch_size { let mut min_cursor = self.cursors.peek_mut(); if min_cursor.rdx() as u32 != cur_part_id { break; } - - let cur_part = min_cursor.parts[min_cursor.parts_idx]; - for i in 0..self.num_cols { - slices[i].push( - self.batches[min_cursor.idx] - .column(i) - .slice(cur_part.start as usize, cur_part.len as usize), - ); - } - slices_len += cur_part.len as usize; + let batch_idx = min_cursor.idx; + let min_offsets = &min_cursor.offsets; + let min_parts_idx = min_cursor.parts_idx; + let cur_offset_range = min_offsets[min_parts_idx]..min_offsets[min_parts_idx + 1]; + indices.extend(cur_offset_range.map(|offset| (batch_idx, offset as usize))); // forward to next non-empty partition min_cursor.parts_idx += 1; min_cursor.skip_empty_parts(); } - let output_slices = slices - .into_iter() - .map(|s| coalesce_arrays_unchecked(s[0].data_type(), &s)) - .collect::<Vec<_>>(); - - self.num_output_rows += slices_len; - (slices_len, output_slices) + let batch_interleaver = &mut self.batch_interleaver; + let output_batch = batch_interleaver(&indices)?; + self.num_output_rows += output_batch.num_rows(); + Ok(output_batch) } } struct PartCursor { idx: usize, - parts: Vec<PartitionInBatch>, + offsets: Vec<u32>, parts_idx: usize, } impl PartCursor { fn skip_empty_parts(&mut self) { - while self.parts_idx < self.parts.len() && self.parts[self.parts_idx].len == 0 { - self.parts_idx += 1; + while self.parts_idx < self.num_partitions() + && self.offsets[self.parts_idx + 1] == self.offsets[self.parts_idx] + { + self.parts_idx += 1; } } + + fn num_partitions(&self) -> usize { + self.offsets.len() - 1 + } } impl KeyForRadixTournamentTree for PartCursor { fn rdx(&self) -> usize { - self.parts_idx.min(self.parts.len()) + self.parts_idx } } -#[derive(Clone, Copy, Default)] -struct PartitionInBatch { - start: u32, - len: u32, -} - -fn sort_batch_by_partition_id( - batch: RecordBatch, +fn sort_batches_by_partition_id( + batches: Vec<RecordBatch>, partitioning: &Partitioning, -) -> Result<(Vec<PartitionInBatch>, RecordBatch)> { +) -> Result<(Vec<u32>, RecordBatch)> { let num_partitions = partitioning.partition_count(); - let num_rows = batch.num_rows(); // compute partition indices - let hashes = evaluate_hashes(partitioning, &batch) - .expect(&format!("error evaluating hashes with {partitioning}")); - let part_ids = evaluate_partition_ids(hashes, 
partitioning.partition_count()); + let mut partition_indices = batches + .iter() + .enumerate() + .flat_map(|(batch_idx, batch)| { + let hashes = evaluate_hashes(partitioning, &batch) + .expect(&format!("error evaluating hashes with {partitioning}")); + evaluate_partition_ids(hashes, partitioning.partition_count()) + .into_iter() + .enumerate() + .map(move |(row_idx, part_id)| (part_id, batch_idx as u32, row_idx as u32)) + }) + .collect::<Vec<_>>(); - // compute partitions - let mut partitions = vec![PartitionInBatch::default(); num_partitions]; - let mut start = 0; + // sort + let mut part_counts = vec![0; num_partitions]; + radix_sort_by_key( + &mut partition_indices, + &mut part_counts, + |&(part_id, ..)| part_id as usize, + ); - for &part_id in &part_ids { - assume!((part_id as usize) < partitions.len()); - partitions[part_id as usize].len += 1; - } - for part in &mut partitions { - part.start = start; - start += part.len; + // compute partitions + let mut partition_offsets = Vec::with_capacity(num_partitions + 1); + let mut offset = 0; + for part_count in part_counts { + partition_offsets.push(offset); + offset += part_count as u32; } + partition_offsets.push(offset); - // bucket sort - let mut sorted_row_indices = vec![0; num_rows]; - let mut bucket_starts = partitions.iter().map(|part| part.start).collect::<Vec<_>>(); - - for (row_idx, part_id) in part_ids.into_iter().enumerate() { - let start = bucket_starts[part_id as usize]; - - assume!((part_id as usize) < bucket_starts.len()); - assume!((start as usize) < sorted_row_indices.len()); - bucket_starts[part_id as usize] += 1; - sorted_row_indices[start as usize] = row_idx as u32; - } - let sorted_batch = take_batch(batch, sorted_row_indices)?; - return Ok((partitions, sorted_batch)); + // get sorted batch + let batches_interleaver = create_batch_interleaver(&batches, true)?; + let sorted_batch = batches_interleaver( + &partition_indices + .into_iter() + .map(|(_, batch_idx, row_idx)| (batch_idx as usize, row_idx as usize)) + .collect::<Vec<_>>(), + )?; + return Ok((partition_offsets, sorted_batch)); } diff --git a/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs index 78eb4a45..1bbcb2a6 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/rss_sort_repartitioner.rs @@ -20,7 +20,7 @@ use datafusion::{ common::Result, physical_plan::{metrics::Time, Partitioning}, }; -use datafusion_ext_commons::{arrow::array_size::ArraySize, df_execution_err}; +use datafusion_ext_commons::arrow::array_size::ArraySize; use futures::lock::Mutex; use jni::objects::GlobalRef; @@ -33,7 +33,6 @@ pub struct RssSortShuffleRepartitioner { name: String, mem_consumer_info: Option<Weak<MemConsumerInfo>>, data: Mutex<BufferedData>, - partitioning: Partitioning, rss: GlobalRef, } @@ -47,8 +46,7 @@ impl RssSortShuffleRepartitioner { Self { name: format!("RssSortShufflePartitioner[partition={}]", partition_id), mem_consumer_info: None, - data: Mutex::new(BufferedData::new(partition_id, sort_time)), - partitioning, + data: Mutex::new(BufferedData::new(partitioning, partition_id, sort_time)), rss: rss_partition_writer, } } @@ -73,11 +71,10 @@ impl MemConsumer for RssSortShuffleRepartitioner { async fn spill(&self) -> Result<()> { let data = self.data.lock().await.drain(); let rss = self.rss.clone(); - let partitioning = self.partitioning.clone(); - tokio::task::spawn_blocking(move || data.write_rss(rss, &partitioning)) + 
tokio::task::spawn_blocking(move || data.write_rss(rss)) .await - .or_else(|err| df_execution_err!("{err}"))??; + .expect("tokio error")?; self.update_mem_used(0).await?; Ok(()) } @@ -99,7 +96,7 @@ impl ShuffleRepartitioner for RssSortShuffleRepartitioner { // add batch to buffered data let mem_used = { let mut data = self.data.lock().await; - data.add_batch(input, &self.partitioning)?; + data.add_batch(input)?; data.mem_used() }; self.update_mem_used(mem_used).await?; @@ -115,7 +112,7 @@ async fn shuffle_write(&self) -> Result<()> { self.set_spillable(false); - let has_data = self.data.lock().await.mem_used() > 0; + let has_data = !self.data.lock().await.is_empty(); if has_data { self.spill().await?; } diff --git a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs index f8eeb998..1366c852 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs @@ -49,7 +49,6 @@ pub struct SortShuffleRepartitioner { output_index_file: String, data: Mutex<BufferedData>, spills: Mutex<Vec<ShuffleSpill>>, - partitioning: Partitioning, num_output_partitions: usize, output_io_time: Time, } @@ -71,9 +70,8 @@ impl SortShuffleRepartitioner { mem_consumer_info: None, output_data_file, output_index_file, - data: Mutex::new(BufferedData::new(partition_id, sort_time)), + data: Mutex::new(BufferedData::new(partitioning, partition_id, sort_time)), spills: Mutex::default(), - partitioning, num_output_partitions, output_io_time, } @@ -100,7 +98,7 @@ impl MemConsumer for SortShuffleRepartitioner { let data = self.data.lock().await.drain(); let mut spill = try_new_spill(self.exec_ctx.spill_metrics())?; - let offsets = data.write(spill.get_buf_writer(), &self.partitioning)?; + let offsets = data.write(spill.get_buf_writer())?; self.spills .lock() .await @@ -126,7 +124,7 @@ // add batch to buffered data let mem_used = { let mut data = self.data.lock().await; - data.add_batch(input, &self.partitioning)?; + data.add_batch(input)?; data.mem_used() }; self.update_mem_used(mem_used).await?; @@ -163,7 +161,6 @@ // no spills - directly write current batches into final file if spills.is_empty() { - let partitioning = self.partitioning.clone(); let output_io_time = self.output_io_time.clone(); tokio::task::spawn_blocking(move || { let mut output_data = output_io_time.wrap_writer( @@ -182,7 +179,7 @@ ); // write data file - let offsets = data.write(&mut output_data, &partitioning)?; + let offsets = data.write(&mut output_data)?; // write index file let mut offsets_data = vec![]; @@ -221,10 +218,10 @@ } // write rest data into an in-memory buffer - if data.mem_used() > 0 { + if !data.is_empty() { let mut spill = Box::new(vec![]); let writer = spill.get_buf_writer(); - let offsets = data.write(writer, &self.partitioning)?; + let offsets = data.write(writer)?; self.update_mem_used(spill.len()).await?; spills.push(ShuffleSpill { spill, offsets }); }
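For clarity on the offsets contract used by `BufferedData::write` and the index file above: the writer emits `partition_count + 1` monotonically non-decreasing byte offsets, so partition `p` owns the byte range `offsets[p]..offsets[p+1]` of the data file, empty partitions collapse to zero-length ranges, and the total compressed size is just the final offset. A toy illustration of the invariant:

    fn main() {
        // offsets for 4 partitions; partitions 1 and 3 are empty
        let offsets: Vec<u64> = vec![0, 128, 128, 512, 512];
        let num_partitions = offsets.len() - 1;

        for p in 0..num_partitions {
            let (beg, end) = (offsets[p], offsets[p + 1]);
            println!("partition {p}: bytes {beg}..{end} (len {})", end - beg);
        }
        // total compressed size is simply the last offset
        assert_eq!(offsets.last().copied().unwrap_or_default(), 512);
    }

diff --git a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java index db747dd0..f8d99213 100644 --- 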
a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java +++ b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java @@ -27,11 +27,6 @@ public enum BlazeConf { /// actual off-heap memory usage is expected to be spark.executor.memoryOverhead * fraction. MEMORY_FRACTION("spark.blaze.memoryFraction", 0.6), - /// number of worker threads used in tokio runtime, 0 to use default available parallism value. - /// for cpus those support hyperthreading, it is recommended to set this value to the number - /// of available physical cores. - TOKIO_NUM_WORKER_THREADS("spark.blaze.tokio.num.worker.threads", 1), - /// enable converting upper/lower functions to native, special cases may provide different /// outputs from spark due to different unicode versions. CASE_CONVERT_FUNCTIONS_ENABLE("spark.blaze.enable.caseconvert.functions", true),
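With the `spark.blaze.tokio.num.worker.threads` knob removed here, the thread count comes entirely from the auto-detection added in rt.rs above. A standalone sketch of that detection (raw-cpuid 11.x, x86 only; note the CPUID HTT flag merely signals that multiple logical processors may share a package, which the patch approximates as a 2-way divisor):

    use raw_cpuid::CpuId;

    fn default_worker_threads() -> usize {
        let logical = std::thread::available_parallelism()
            .map(|v| v.get())
            .unwrap_or(1);
        // HTT set => assume two logical processors per physical core
        let has_htt = CpuId::new()
            .get_feature_info()
            .map(|info| info.has_htt())
            .unwrap_or(false);
        (if has_htt { logical / 2 } else { logical }).max(1)
    }

    fn main() {
        println!("tokio worker threads: {}", default_worker_threads());
    }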