Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix unexpected off-heap memory overflow #355

Merged
merged 1 commit into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions native-engine/blaze-jni-bridge/src/jni_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ macro_rules! jni_throw {

#[macro_export]
macro_rules! jni_fatal_error {
($value:expr) => {{
$crate::jni_bridge::THREAD_JNIENV.with(|env| env.fatal_error($value))
($($arg:tt)*) => {{
$crate::jni_bridge::THREAD_JNIENV.with(|env| env.fatal_error(format!($($arg)*)))
}};
}

Expand Down
76 changes: 76 additions & 0 deletions native-engine/blaze/src/alloc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// #[global_allocator]
// static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

use std::{
alloc::{GlobalAlloc, Layout},
sync::{
atomic::{AtomicUsize, Ordering::SeqCst},
Mutex,
},
};

#[global_allocator]
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

// only used for debugging
//
// #[global_allocator]
// static GLOBAL: DebugAlloc<jemallocator::Jemalloc> =
// DebugAlloc::new(jemallocator::Jemalloc);

#[allow(unused)]
struct DebugAlloc<T: GlobalAlloc> {
inner: T,
last_updated: AtomicUsize,
current: AtomicUsize,
mutex: Mutex<()>,
}

#[allow(unused)]
impl<T: GlobalAlloc> DebugAlloc<T> {
pub const fn new(inner: T) -> Self {
Self {
inner,
last_updated: AtomicUsize::new(0),
current: AtomicUsize::new(0),
mutex: Mutex::new(()),
}
}

fn update(&self) {
let _lock = self.mutex.lock().unwrap();
let current = self.current.load(SeqCst);
let last_updated = self.last_updated.load(SeqCst);
let delta = (current as isize - last_updated as isize).abs();
if delta > 104857600 {
eprintln!(" * ALLOC {} -> {}", last_updated, current);
self.last_updated.store(current, SeqCst);
}
}
}

unsafe impl<T: GlobalAlloc> GlobalAlloc for DebugAlloc<T> {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
self.current.fetch_add(layout.size(), SeqCst);
self.update();
self.inner.alloc(layout)
}

unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
self.current.fetch_sub(layout.size(), SeqCst);
self.update();
self.inner.dealloc(ptr, layout)
}

unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
self.current.fetch_add(layout.size(), SeqCst);
self.update();
self.inner.alloc_zeroed(layout)
}

unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
self.current.fetch_add(new_size - layout.size(), SeqCst);
self.update();
self.inner.realloc(ptr, layout, new_size)
}
}
13 changes: 7 additions & 6 deletions native-engine/blaze/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use datafusion::{
physical_plan::{displayable, ExecutionPlan},
prelude::{SessionConfig, SessionContext},
};
use datafusion_ext_commons::df_execution_err;
use datafusion_ext_plans::memmgr::MemManager;
use jni::{
objects::{JClass, JObject},
Expand Down Expand Up @@ -87,22 +88,22 @@ pub extern "system" fn Java_org_apache_spark_sql_blaze_JniBridge_callNative(
let task_definition = TaskDefinition::decode(
jni_convert_byte_array!(raw_task_definition.as_obj())?.as_slice(),
)
.map_err(|err| DataFusionError::Plan(format!("cannot decode execution plan: {:?}", err)))?;
.or_else(|err| df_execution_err!("cannot decode execution plan: {err:?}"))?;

let task_id = &task_definition.task_id.expect("task_id is empty");
let plan = &task_definition.plan.expect("plan is empty");
drop(raw_task_definition);

// get execution plan
let execution_plan: Arc<dyn ExecutionPlan> = plan.try_into().map_err(|err| {
DataFusionError::Plan(format!("cannot create execution plan: {:?}", err))
})?;
let execution_plan: Arc<dyn ExecutionPlan> = plan
.try_into()
.or_else(|err| df_execution_err!("cannot create execution plan: {err:?}"))?;
let execution_plan_displayable = displayable(execution_plan.as_ref())
.indent(true)
.to_string();
log::info!("Creating native execution plan succeeded");
log::info!(" task_id={:?}", task_id);
log::info!(" execution plan:\n{}", execution_plan_displayable);
log::info!(" task_id={task_id:?}");
log::info!(" execution plan:\n{execution_plan_displayable}");

// execute to stream
let runtime = Box::new(NativeExecutionRuntime::start(
Expand Down
14 changes: 3 additions & 11 deletions native-engine/blaze/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ use std::{any::Any, error::Error, fmt::Debug, panic::AssertUnwindSafe};
use blaze_jni_bridge::*;
use jni::objects::{JObject, JThrowable};

mod alloc;
mod exec;
mod logging;
mod metrics;
mod rt;

#[global_allocator]
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

fn handle_unwinded(err: Box<dyn Any + Send>) {
// default handling:
// * caused by Interrupted/TaskKilled: do nothing but just print a message.
Expand All @@ -48,10 +46,7 @@ fn handle_unwinded(err: Box<dyn Any + Send>) {
Ok(())
};
recover().unwrap_or_else(|err: Box<dyn Error>| {
jni_fatal_error!(format!(
"Error recovering from panic, cannot resume: {:?}",
err
));
jni_fatal_error!("Error recovering from panic, cannot resume: {err:?}");
});
}

Expand All @@ -70,10 +65,7 @@ fn throw_runtime_exception(msg: &str, cause: JObject) -> datafusion::error::Resu
let e = jni_new_object!(JavaRuntimeException(msg.as_obj(), cause))?;

if let Err(err) = jni_throw!(JThrowable::from(e.as_obj())) {
jni_fatal_error!(format!(
"Error throwing RuntimeException, cannot result: {:?}",
err
));
jni_fatal_error!("Error throwing RuntimeException, cannot result: {err:?}");
}
Ok(())
}
94 changes: 43 additions & 51 deletions native-engine/blaze/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::{
error::Error,
panic::AssertUnwindSafe,
sync::{mpsc::Receiver, Arc},
};
Expand All @@ -35,8 +36,8 @@ use datafusion::{
ExecutionPlan,
},
};
use datafusion_ext_commons::streams::coalesce_stream::CoalesceInput;
use datafusion_ext_plans::common::output::WrappedRecordBatchSender;
use datafusion_ext_commons::{df_execution_err, streams::coalesce_stream::CoalesceInput};
use datafusion_ext_plans::common::output::TaskOutputter;
use futures::{FutureExt, StreamExt};
use jni::objects::{GlobalRef, JObject};
use tokio::runtime::Runtime;
Expand All @@ -48,7 +49,6 @@ pub struct NativeExecutionRuntime {
plan: Arc<dyn ExecutionPlan>,
task_context: Arc<TaskContext>,
partition: usize,
ffi_schema: Arc<FFI_ArrowSchema>,
batch_receiver: Receiver<Result<Option<RecordBatch>>>,
rt: Runtime,
}
Expand All @@ -71,9 +71,9 @@ impl NativeExecutionRuntime {
)?;

// init ffi schema
let ffi_schema = Arc::new(FFI_ArrowSchema::try_from(schema.as_ref())?);
let ffi_schema = FFI_ArrowSchema::try_from(schema.as_ref())?;
jni_call!(BlazeCallNativeWrapper(native_wrapper.as_obj())
.importSchema(ffi_schema.as_ref() as *const FFI_ArrowSchema as i64) -> ()
.importSchema(&ffi_schema as *const FFI_ArrowSchema as i64) -> ()
)?;

// create tokio runtime
Expand All @@ -92,84 +92,80 @@ impl NativeExecutionRuntime {
})
.build()?;

let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(2);
let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);
let nrt = Self {
native_wrapper: native_wrapper.clone(),
plan,
partition,
rt,
ffi_schema,
batch_receiver,
task_context: context,
};

// spawn batch producer
let batch_sender_cloned = batch_sender.clone();
let consume_stream = move || async move {
let err_sender = batch_sender.clone();
let consume_stream = async move {
while let Some(batch) = AssertUnwindSafe(stream.next())
.catch_unwind()
.await
.unwrap_or_else(|err| {
let panic_message =
panic_message::get_panic_message(&err).unwrap_or("unknown error");
Some(Err(DataFusionError::Execution(panic_message.to_owned())))
Some(df_execution_err!("{}", panic_message))
})
.transpose()
.map_err(|err| DataFusionError::Execution(format!("{}", err)))?
.or_else(|err| df_execution_err!("{err}"))?
{
batch_sender.send(Ok(Some(batch))).map_err(|err| {
DataFusionError::Execution(format!("send batch error: {err}"))
})?;
batch_sender
.send(Ok(Some(batch)))
.or_else(|err| df_execution_err!("send batch error: {err}"))?;
}
batch_sender
.send(Ok(None))
.map_err(|err| DataFusionError::Execution(format!("send batch error: {err}")))?;

.or_else(|err| df_execution_err!("send batch error: {err}"))?;
log::info!("[partition={partition}] finished");
Ok::<_, DataFusionError>(())
};
nrt.rt.spawn(async move {
let result = consume_stream().await;
result.unwrap_or_else(|err| handle_unwinded_scope(|| -> Result<()> {
batch_sender_cloned.send(
Err(DataFusionError::Execution(format!("execution aborted")))
).map_err(|err| {
DataFusionError::Execution(format!("send batch error: {err}"))
})?;
let task_running = is_task_running();
if !task_running {
log::warn!(
"[partition={partition}] task completed/interrupted before native execution done",
);
return Ok(());
}

let cause =
if jni_exception_check!()? {
consume_stream.await.unwrap_or_else(|err| {
handle_unwinded_scope(|| {
let task_running = is_task_running();
if !task_running {
log::warn!(
"[partition={partition}] task completed before native execution done"
);
return Ok(());
}

err_sender.send(df_execution_err!("execution aborted"))?;
let cause = if jni_exception_check!()? {
log::error!("[partition={partition}] panics with an java exception: {err}");
Some(jni_exception_occurred!()?)
} else {
log::error!("[partition={partition}] panics: {err}");
None
};

set_error(
&native_wrapper,
&format!("[partition={partition}] panics: {err}"),
cause.map(|e| e.as_obj()),
)?;
log::info!("[partition={partition}] exited abnormally.");
Ok::<_, DataFusionError>(())
}));
set_error(
&native_wrapper,
&format!("[partition={partition}] panics: {err}"),
cause.map(|e| e.as_obj()),
)?;
log::info!("[partition={partition}] exited abnormally.");
Ok::<_, Box<dyn Error>>(())
})
});
});
Ok(nrt)
}

pub fn next_batch(&self) -> bool {
let next_batch = || -> Result<bool> {
match self.batch_receiver.recv().map_err(|err| {
DataFusionError::Execution(format!("receive batch error: {err}"))
})?? {
match self
.batch_receiver
.recv()
.or_else(|err| df_execution_err!("receive batch error: {err}"))??
{
Some(batch) => {
let ffi_array = FFI_ArrowArray::new(&StructArray::from(batch).into_data());
jni_call!(BlazeCallNativeWrapper(self.native_wrapper.as_obj())
Expand Down Expand Up @@ -197,14 +193,10 @@ impl NativeExecutionRuntime {

pub fn finalize(self) {
log::info!("native execution [partition={}] finalizing", self.partition);
let _ = self.update_metrics();
log::info!("native execution [partition={}] 1", self.partition);
drop(self.ffi_schema);
log::info!("native execution [partition={}] 2", self.partition);
self.update_metrics().unwrap_or_default();
drop(self.plan);
log::info!("native execution [partition={}] 3", self.partition);
WrappedRecordBatchSender::cancel_task(&self.task_context); // cancel all pending streams
log::info!("native execution [partition={}] 4", self.partition);

self.task_context.cancel_task(); // cancel all pending streams
self.rt.shutdown_background();
log::info!("native execution [partition={}] finalized", self.partition);
}
Expand Down
8 changes: 4 additions & 4 deletions native-engine/datafusion-ext-commons/src/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ use arrow::{array::*, datatypes::*};
use bigdecimal::{FromPrimitive, ToPrimitive};
use datafusion::common::{
cast::{as_float32_array, as_float64_array},
DataFusionError, Result,
Result,
};
use num::{cast::AsPrimitive, Bounded, Integer, Signed};
use paste::paste;

use crate::df_execution_err;

pub fn cast(array: &dyn Array, cast_type: &DataType) -> Result<ArrayRef> {
return cast_impl(array, cast_type, false);
}
Expand Down Expand Up @@ -109,9 +111,7 @@ pub fn cast_impl(

if !match_struct_fields {
if to_fields.len() != struct_.num_columns() {
return Err(DataFusionError::Execution(
"cannot cast structs with different numbers of fields".to_string(),
));
df_execution_err!("cannot cast structs with different numbers of fields")?;
}

let casted_arrays = struct_
Expand Down
Loading