Skip to content

Commit

Permalink
fix unexpected off-heap memory overflow:
Browse files Browse the repository at this point in the history
	1. use smaller batch size for parquet scanning.
	2. reduce native-to-spark ffi channel buffer size.
	3. shorten batch lifetime in project-filtering and batch coalescing.
	4. other minor code refactoring.
  • Loading branch information
zhangli20 committed Jan 1, 2024
1 parent 7a72a97 commit 014bb7f
Show file tree
Hide file tree
Showing 49 changed files with 522 additions and 627 deletions.
2 changes: 1 addition & 1 deletion native-engine/blaze-jni-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ resolver = "1"
datafusion = { workspace = true }
jni = "0.20.0"
log = "0.4.14"
once_cell = "1.19.0"
once_cell = "1.11.0"
paste = "1.0.7"
4 changes: 2 additions & 2 deletions native-engine/blaze-jni-bridge/src/jni_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ macro_rules! jni_throw {

#[macro_export]
macro_rules! jni_fatal_error {
($value:expr) => {{
$crate::jni_bridge::THREAD_JNIENV.with(|env| env.fatal_error($value))
($($arg:tt)*) => {{
$crate::jni_bridge::THREAD_JNIENV.with(|env| env.fatal_error(format!($($arg)*)))
}};
}

Expand Down
76 changes: 76 additions & 0 deletions native-engine/blaze/src/alloc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// #[global_allocator]
// static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

use std::{
alloc::{GlobalAlloc, Layout},
sync::{
atomic::{AtomicUsize, Ordering::SeqCst},
Mutex,
},
};

// Registers jemalloc as the Rust global allocator for this library, so every
// heap allocation made through `std::alloc` is served by jemalloc.
#[global_allocator]
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

// only used for debugging
//
// #[global_allocator]
// static GLOBAL: DebugAlloc<jemallocator::Jemalloc> =
// DebugAlloc::new(jemallocator::Jemalloc);

/// Debug-only allocator wrapper that tracks the number of live heap bytes
/// and prints a line to stderr whenever the total has moved by more than
/// [`DebugAlloc::REPORT_THRESHOLD`] since the previous report.
///
/// All real work is delegated to `inner`; this type only does bookkeeping.
#[allow(unused)]
struct DebugAlloc<T: GlobalAlloc> {
    /// Allocator that actually services the requests.
    inner: T,
    /// Value of `current` at the time of the last printed report.
    last_updated: AtomicUsize,
    /// Bytes successfully allocated through this wrapper and not yet freed.
    current: AtomicUsize,
    /// Guards the compare-and-report step so only one thread reports at a time.
    mutex: Mutex<()>,
}

#[allow(unused)]
impl<T: GlobalAlloc> DebugAlloc<T> {
    /// Minimum change in live bytes (100 MiB) that triggers a report.
    const REPORT_THRESHOLD: usize = 100 * 1024 * 1024;

    /// Wraps `inner` with all counters starting at zero.
    pub const fn new(inner: T) -> Self {
        Self {
            inner,
            last_updated: AtomicUsize::new(0),
            current: AtomicUsize::new(0),
            mutex: Mutex::new(()),
        }
    }

    /// Prints a report if the live byte count drifted past the threshold.
    ///
    /// This runs inside the allocator, so it must never block or unwind:
    /// * the lock is only *tried* -- if another thread (or a re-entrant call
    ///   on this thread) already holds it, the report is simply skipped;
    /// * a poisoned lock is recovered, since it protects no data;
    /// * a failed write to stderr is ignored instead of panicking.
    fn update(&self) {
        use std::io::Write as _;

        let _guard = match self.mutex.try_lock() {
            Ok(guard) => guard,
            Err(std::sync::TryLockError::Poisoned(poisoned)) => poisoned.into_inner(),
            Err(std::sync::TryLockError::WouldBlock) => return,
        };
        let current = self.current.load(SeqCst);
        let last_updated = self.last_updated.load(SeqCst);
        if current.abs_diff(last_updated) > Self::REPORT_THRESHOLD {
            let _ = writeln!(
                std::io::stderr(),
                " * ALLOC {} -> {}",
                last_updated,
                current
            );
            self.last_updated.store(current, SeqCst);
        }
    }

    /// Records `bytes` newly obtained from the inner allocator.
    fn record_grow(&self, bytes: usize) {
        self.current.fetch_add(bytes, SeqCst);
        self.update();
    }

    /// Records `bytes` returned to the inner allocator.
    fn record_shrink(&self, bytes: usize) {
        self.current.fetch_sub(bytes, SeqCst);
        self.update();
    }
}

unsafe impl<T: GlobalAlloc> GlobalAlloc for DebugAlloc<T> {
    /// Allocates via `inner`; the bytes are counted only if that succeeds.
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let ptr = self.inner.alloc(layout);
        if !ptr.is_null() {
            self.record_grow(layout.size());
        }
        ptr
    }

    /// Un-counts the block and releases it via `inner`.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        self.record_shrink(layout.size());
        self.inner.dealloc(ptr, layout)
    }

    /// Zero-initialized variant of [`GlobalAlloc::alloc`], counted the same way.
    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        let ptr = self.inner.alloc_zeroed(layout);
        if !ptr.is_null() {
            self.record_grow(layout.size());
        }
        ptr
    }

    /// Resizes via `inner` and adjusts the counter by the size difference.
    ///
    /// Growing and shrinking are handled separately so the `usize`
    /// subtraction can never underflow. On failure (null) the original block
    /// is still live, so the counter is left untouched.
    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        let old_size = layout.size();
        let new_ptr = self.inner.realloc(ptr, layout, new_size);
        if !new_ptr.is_null() {
            if new_size >= old_size {
                self.record_grow(new_size - old_size);
            } else {
                self.record_shrink(old_size - new_size);
            }
        }
        new_ptr
    }
}
13 changes: 7 additions & 6 deletions native-engine/blaze/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use datafusion::{
physical_plan::{displayable, ExecutionPlan},
prelude::{SessionConfig, SessionContext},
};
use datafusion_ext_commons::df_execution_err;
use datafusion_ext_plans::memmgr::MemManager;
use jni::{
objects::{JClass, JObject},
Expand Down Expand Up @@ -87,22 +88,22 @@ pub extern "system" fn Java_org_apache_spark_sql_blaze_JniBridge_callNative(
let task_definition = TaskDefinition::decode(
jni_convert_byte_array!(raw_task_definition.as_obj())?.as_slice(),
)
.map_err(|err| DataFusionError::Plan(format!("cannot decode execution plan: {:?}", err)))?;
.or_else(|err| df_execution_err!("cannot decode execution plan: {err:?}"))?;

let task_id = &task_definition.task_id.expect("task_id is empty");
let plan = &task_definition.plan.expect("plan is empty");
drop(raw_task_definition);

// get execution plan
let execution_plan: Arc<dyn ExecutionPlan> = plan.try_into().map_err(|err| {
DataFusionError::Plan(format!("cannot create execution plan: {:?}", err))
})?;
let execution_plan: Arc<dyn ExecutionPlan> = plan
.try_into()
.or_else(|err| df_execution_err!("cannot create execution plan: {err:?}"))?;
let execution_plan_displayable = displayable(execution_plan.as_ref())
.indent(true)
.to_string();
log::info!("Creating native execution plan succeeded");
log::info!(" task_id={:?}", task_id);
log::info!(" execution plan:\n{}", execution_plan_displayable);
log::info!(" task_id={task_id:?}");
log::info!(" execution plan:\n{execution_plan_displayable}");

// execute to stream
let runtime = Box::new(NativeExecutionRuntime::start(
Expand Down
14 changes: 3 additions & 11 deletions native-engine/blaze/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ use std::{any::Any, error::Error, fmt::Debug, panic::AssertUnwindSafe};
use blaze_jni_bridge::*;
use jni::objects::{JObject, JThrowable};

mod alloc;
mod exec;
mod logging;
mod metrics;
mod rt;

#[global_allocator]
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

fn handle_unwinded(err: Box<dyn Any + Send>) {
// default handling:
// * caused by Interrupted/TaskKilled: do nothing but just print a message.
Expand All @@ -48,10 +46,7 @@ fn handle_unwinded(err: Box<dyn Any + Send>) {
Ok(())
};
recover().unwrap_or_else(|err: Box<dyn Error>| {
jni_fatal_error!(format!(
"Error recovering from panic, cannot resume: {:?}",
err
));
jni_fatal_error!("Error recovering from panic, cannot resume: {err:?}");
});
}

Expand All @@ -70,10 +65,7 @@ fn throw_runtime_exception(msg: &str, cause: JObject) -> datafusion::error::Resu
let e = jni_new_object!(JavaRuntimeException(msg.as_obj(), cause))?;

if let Err(err) = jni_throw!(JThrowable::from(e.as_obj())) {
jni_fatal_error!(format!(
"Error throwing RuntimeException, cannot result: {:?}",
err
));
jni_fatal_error!("Error throwing RuntimeException, cannot result: {err:?}");
}
Ok(())
}
94 changes: 43 additions & 51 deletions native-engine/blaze/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::{
error::Error,
panic::AssertUnwindSafe,
sync::{mpsc::Receiver, Arc},
};
Expand All @@ -35,8 +36,8 @@ use datafusion::{
ExecutionPlan,
},
};
use datafusion_ext_commons::streams::coalesce_stream::CoalesceInput;
use datafusion_ext_plans::common::output::WrappedRecordBatchSender;
use datafusion_ext_commons::{df_execution_err, streams::coalesce_stream::CoalesceInput};
use datafusion_ext_plans::common::output::TaskOutputter;
use futures::{FutureExt, StreamExt};
use jni::objects::{GlobalRef, JObject};
use tokio::runtime::Runtime;
Expand All @@ -48,7 +49,6 @@ pub struct NativeExecutionRuntime {
plan: Arc<dyn ExecutionPlan>,
task_context: Arc<TaskContext>,
partition: usize,
ffi_schema: Arc<FFI_ArrowSchema>,
batch_receiver: Receiver<Result<Option<RecordBatch>>>,
rt: Runtime,
}
Expand All @@ -71,9 +71,9 @@ impl NativeExecutionRuntime {
)?;

// init ffi schema
let ffi_schema = Arc::new(FFI_ArrowSchema::try_from(schema.as_ref())?);
let ffi_schema = FFI_ArrowSchema::try_from(schema.as_ref())?;
jni_call!(BlazeCallNativeWrapper(native_wrapper.as_obj())
.importSchema(ffi_schema.as_ref() as *const FFI_ArrowSchema as i64) -> ()
.importSchema(&ffi_schema as *const FFI_ArrowSchema as i64) -> ()
)?;

// create tokio runtime
Expand All @@ -92,84 +92,80 @@ impl NativeExecutionRuntime {
})
.build()?;

let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(2);
let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(0);
let nrt = Self {
native_wrapper: native_wrapper.clone(),
plan,
partition,
rt,
ffi_schema,
batch_receiver,
task_context: context,
};

// spawn batch producer
let batch_sender_cloned = batch_sender.clone();
let consume_stream = move || async move {
let err_sender = batch_sender.clone();
let consume_stream = async move {
while let Some(batch) = AssertUnwindSafe(stream.next())
.catch_unwind()
.await
.unwrap_or_else(|err| {
let panic_message =
panic_message::get_panic_message(&err).unwrap_or("unknown error");
Some(Err(DataFusionError::Execution(panic_message.to_owned())))
Some(df_execution_err!("{}", panic_message))
})
.transpose()
.map_err(|err| DataFusionError::Execution(format!("{}", err)))?
.or_else(|err| df_execution_err!("{err}"))?
{
batch_sender.send(Ok(Some(batch))).map_err(|err| {
DataFusionError::Execution(format!("send batch error: {err}"))
})?;
batch_sender
.send(Ok(Some(batch)))
.or_else(|err| df_execution_err!("send batch error: {err}"))?;
}
batch_sender
.send(Ok(None))
.map_err(|err| DataFusionError::Execution(format!("send batch error: {err}")))?;

.or_else(|err| df_execution_err!("send batch error: {err}"))?;
log::info!("[partition={partition}] finished");
Ok::<_, DataFusionError>(())
};
nrt.rt.spawn(async move {
let result = consume_stream().await;
result.unwrap_or_else(|err| handle_unwinded_scope(|| -> Result<()> {
batch_sender_cloned.send(
Err(DataFusionError::Execution(format!("execution aborted")))
).map_err(|err| {
DataFusionError::Execution(format!("send batch error: {err}"))
})?;
let task_running = is_task_running();
if !task_running {
log::warn!(
"[partition={partition}] task completed/interrupted before native execution done",
);
return Ok(());
}

let cause =
if jni_exception_check!()? {
consume_stream.await.unwrap_or_else(|err| {
handle_unwinded_scope(|| {
let task_running = is_task_running();
if !task_running {
log::warn!(
"[partition={partition}] task completed before native execution done"
);
return Ok(());
}

err_sender.send(df_execution_err!("execution aborted"))?;
let cause = if jni_exception_check!()? {
log::error!("[partition={partition}] panics with an java exception: {err}");
Some(jni_exception_occurred!()?)
} else {
log::error!("[partition={partition}] panics: {err}");
None
};

set_error(
&native_wrapper,
&format!("[partition={partition}] panics: {err}"),
cause.map(|e| e.as_obj()),
)?;
log::info!("[partition={partition}] exited abnormally.");
Ok::<_, DataFusionError>(())
}));
set_error(
&native_wrapper,
&format!("[partition={partition}] panics: {err}"),
cause.map(|e| e.as_obj()),
)?;
log::info!("[partition={partition}] exited abnormally.");
Ok::<_, Box<dyn Error>>(())
})
});
});
Ok(nrt)
}

pub fn next_batch(&self) -> bool {
let next_batch = || -> Result<bool> {
match self.batch_receiver.recv().map_err(|err| {
DataFusionError::Execution(format!("receive batch error: {err}"))
})?? {
match self
.batch_receiver
.recv()
.or_else(|err| df_execution_err!("receive batch error: {err}"))??
{
Some(batch) => {
let ffi_array = FFI_ArrowArray::new(&StructArray::from(batch).into_data());
jni_call!(BlazeCallNativeWrapper(self.native_wrapper.as_obj())
Expand Down Expand Up @@ -197,14 +193,10 @@ impl NativeExecutionRuntime {

pub fn finalize(self) {
log::info!("native execution [partition={}] finalizing", self.partition);
let _ = self.update_metrics();
log::info!("native execution [partition={}] 1", self.partition);
drop(self.ffi_schema);
log::info!("native execution [partition={}] 2", self.partition);
self.update_metrics().unwrap_or_default();
drop(self.plan);
log::info!("native execution [partition={}] 3", self.partition);
WrappedRecordBatchSender::cancel_task(&self.task_context); // cancel all pending streams
log::info!("native execution [partition={}] 4", self.partition);

self.task_context.cancel_task(); // cancel all pending streams
self.rt.shutdown_background();
log::info!("native execution [partition={}] finalized", self.partition);
}
Expand Down
8 changes: 4 additions & 4 deletions native-engine/datafusion-ext-commons/src/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ use arrow::{array::*, datatypes::*};
use bigdecimal::{FromPrimitive, ToPrimitive};
use datafusion::common::{
cast::{as_float32_array, as_float64_array},
DataFusionError, Result,
Result,
};
use num::{cast::AsPrimitive, Bounded, Integer, Signed};
use paste::paste;

use crate::df_execution_err;

pub fn cast(array: &dyn Array, cast_type: &DataType) -> Result<ArrayRef> {
return cast_impl(array, cast_type, false);
}
Expand Down Expand Up @@ -109,9 +111,7 @@ pub fn cast_impl(

if !match_struct_fields {
if to_fields.len() != struct_.num_columns() {
return Err(DataFusionError::Execution(
"cannot cast structs with different numbers of fields".to_string(),
));
df_execution_err!("cannot cast structs with different numbers of fields")?;
}

let casted_arrays = struct_
Expand Down
Loading

0 comments on commit 014bb7f

Please sign in to comment.