-
Notifications
You must be signed in to change notification settings - Fork 129
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix unexpected off-heap memory overflow:
1. use smaller batch size for parquet scaning. 2. reduce native-to-spark ffi channel buffer size. 3. shorten batch lifetime in project-filtering and batch coalescing. 4. other minor code refection.
- Loading branch information
zhangli20
committed
Jan 2, 2024
1 parent
37b7d11
commit 22d4598
Showing
49 changed files
with
522 additions
and
627 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
// #[global_allocator] | ||
// static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc; | ||
|
||
use std::{ | ||
alloc::{GlobalAlloc, Layout}, | ||
sync::{ | ||
atomic::{AtomicUsize, Ordering::SeqCst}, | ||
Mutex, | ||
}, | ||
}; | ||
|
||
#[global_allocator] | ||
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc; | ||
|
||
// only used for debugging | ||
// | ||
// #[global_allocator] | ||
// static GLOBAL: DebugAlloc<jemallocator::Jemalloc> = | ||
// DebugAlloc::new(jemallocator::Jemalloc); | ||
|
||
#[allow(unused)] | ||
struct DebugAlloc<T: GlobalAlloc> { | ||
inner: T, | ||
last_updated: AtomicUsize, | ||
current: AtomicUsize, | ||
mutex: Mutex<()>, | ||
} | ||
|
||
#[allow(unused)] | ||
impl<T: GlobalAlloc> DebugAlloc<T> { | ||
pub const fn new(inner: T) -> Self { | ||
Self { | ||
inner, | ||
last_updated: AtomicUsize::new(0), | ||
current: AtomicUsize::new(0), | ||
mutex: Mutex::new(()), | ||
} | ||
} | ||
|
||
fn update(&self) { | ||
let _lock = self.mutex.lock().unwrap(); | ||
let current = self.current.load(SeqCst); | ||
let last_updated = self.last_updated.load(SeqCst); | ||
let delta = (current as isize - last_updated as isize).abs(); | ||
if delta > 104857600 { | ||
eprintln!(" * ALLOC {} -> {}", last_updated, current); | ||
self.last_updated.store(current, SeqCst); | ||
} | ||
} | ||
} | ||
|
||
unsafe impl<T: GlobalAlloc> GlobalAlloc for DebugAlloc<T> { | ||
unsafe fn alloc(&self, layout: Layout) -> *mut u8 { | ||
self.current.fetch_add(layout.size(), SeqCst); | ||
self.update(); | ||
self.inner.alloc(layout) | ||
} | ||
|
||
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { | ||
self.current.fetch_sub(layout.size(), SeqCst); | ||
self.update(); | ||
self.inner.dealloc(ptr, layout) | ||
} | ||
|
||
unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { | ||
self.current.fetch_add(layout.size(), SeqCst); | ||
self.update(); | ||
self.inner.alloc_zeroed(layout) | ||
} | ||
|
||
unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { | ||
self.current.fetch_add(new_size - layout.size(), SeqCst); | ||
self.update(); | ||
self.inner.realloc(ptr, layout, new_size) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.