Skip to content

Commit

Permalink
feat(spill): Align with the multi IO compression codec in spill (#657)
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston authored Dec 22, 2024
1 parent e254e9b commit 64f4b5e
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 18 deletions.
1 change: 1 addition & 0 deletions native-engine/blaze-jni-bridge/src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ define_conf!(IntConf, PARTIAL_AGG_SKIPPING_MIN_ROWS);
define_conf!(BooleanConf, PARQUET_ENABLE_PAGE_FILTERING);
define_conf!(BooleanConf, PARQUET_ENABLE_BLOOM_FILTER);
define_conf!(StringConf, SPARK_IO_COMPRESSION_CODEC);
define_conf!(StringConf, SPILL_COMPRESSION_CODEC);

pub trait BooleanConf {
fn key(&self) -> &'static str;
Expand Down
3 changes: 2 additions & 1 deletion native-engine/datafusion-ext-plans/src/agg/agg_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,10 @@ impl HashingData {
// next bucket
begin = end;
}

// EOF
write_len(NUM_SPILL_BUCKETS, &mut writer)?;
write_len(0, &mut writer)?;
writer.flush()?;
Ok(())
}
}
Expand Down Expand Up @@ -621,6 +621,7 @@ impl MergingData {
// EOF
write_len(NUM_SPILL_BUCKETS, &mut writer)?;
write_len(0, &mut writer)?;
writer.flush()?;
Ok(())
}
}
Expand Down
18 changes: 11 additions & 7 deletions native-engine/datafusion-ext-plans/src/common/ipc_compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,25 +168,25 @@ impl<R: Read> IpcCompressionReader<R> {
}
}

enum IoCompressionWriter<W: Write> {
pub enum IoCompressionWriter<W: Write> {
LZ4(lz4_flex::frame::FrameEncoder<W>),
ZSTD(zstd::Encoder<'static, W>),
}

impl<W: Write> IoCompressionWriter<W> {
fn new_with_configured_codec(inner: W) -> Self {
pub fn new_with_configured_codec(inner: W) -> Self {
Self::try_new(io_compression_codec(), inner).expect("error creating compression encoder")
}

fn try_new(codec: &str, inner: W) -> Result<Self> {
/// Builds a compression writer for the named codec wrapping `inner`.
///
/// Supported codecs are `"lz4"` and `"zstd"`; any other name yields an
/// execution error.
pub fn try_new(codec: &str, inner: W) -> Result<Self> {
    let writer = match codec {
        "lz4" => Self::LZ4(lz4_flex::frame::FrameEncoder::new(inner)),
        "zstd" => Self::ZSTD(zstd::Encoder::new(inner, ZSTD_LEVEL)?),
        _ => return df_execution_err!("unsupported codec: {}", codec),
    };
    Ok(writer)
}

fn finish(&mut self) -> Result<()> {
pub fn finish(&mut self) -> Result<()> {
match self {
IoCompressionWriter::LZ4(w) => {
w.try_finish()
Expand Down Expand Up @@ -216,21 +216,25 @@ impl<W: Write> Write for IoCompressionWriter<W> {
}
}

enum IoCompressionReader<R: Read> {
pub enum IoCompressionReader<R: Read> {
LZ4(lz4_flex::frame::FrameDecoder<R>),
ZSTD(zstd::Decoder<'static, BufReader<R>>),
}

impl<R: Read> IoCompressionReader<R> {
fn try_new(codec: &str, inner: R) -> Result<Self> {
pub fn new_with_configured_codec(inner: R) -> Self {
Self::try_new(io_compression_codec(), inner).expect("error creating compression encoder")
}

/// Builds a decompression reader for the named codec wrapping `inner`.
///
/// Supported codecs are `"lz4"` and `"zstd"`; any other name yields an
/// execution error.
pub fn try_new(codec: &str, inner: R) -> Result<Self> {
    if codec == "lz4" {
        Ok(Self::LZ4(lz4_flex::frame::FrameDecoder::new(inner)))
    } else if codec == "zstd" {
        Ok(Self::ZSTD(zstd::Decoder::new(inner)?))
    } else {
        df_execution_err!("unsupported codec: {}", codec)
    }
}

fn finish_into_inner(self) -> Result<R> {
pub fn finish_into_inner(self) -> Result<R> {
match self {
Self::LZ4(r) => Ok(r.into_inner()),
Self::ZSTD(r) => Ok(r.finish().into_inner()),
Expand Down
36 changes: 27 additions & 9 deletions native-engine/datafusion-ext-plans/src/memmgr/spill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,21 @@ use std::{
};

use blaze_jni_bridge::{
is_jni_bridge_inited, jni_bridge::LocalRef, jni_call, jni_call_static, jni_get_string,
jni_new_direct_byte_buffer, jni_new_global_ref,
conf, conf::StringConf, is_jni_bridge_inited, jni_bridge::LocalRef, jni_call, jni_call_static,
jni_get_string, jni_new_direct_byte_buffer, jni_new_global_ref,
};
use datafusion::{common::Result, parquet::file::reader::Length, physical_plan::metrics::Time};
use jni::{objects::GlobalRef, sys::jlong};
use log::warn;
use once_cell::sync::OnceCell;

use crate::memmgr::metrics::SpillMetrics;
use crate::{
common::ipc_compression::{IoCompressionReader, IoCompressionWriter},
memmgr::metrics::SpillMetrics,
};

pub type SpillCompressedReader<'a> =
lz4_flex::frame::FrameDecoder<BufReader<Box<dyn Read + Send + 'a>>>;
pub type SpillCompressedWriter<'a> =
lz4_flex::frame::AutoFinishEncoder<BufWriter<Box<dyn Write + Send + 'a>>>;
pub type SpillCompressedReader<'a> = IoCompressionReader<BufReader<Box<dyn Read + Send + 'a>>>;
pub type SpillCompressedWriter<'a> = IoCompressionWriter<BufWriter<Box<dyn Write + Send + 'a>>>;

pub trait Spill: Send + Sync {
fn as_any(&self) -> &dyn Any;
Expand All @@ -43,11 +45,13 @@ pub trait Spill: Send + Sync {
fn get_buf_writer<'a>(&'a mut self) -> BufWriter<Box<dyn Write + Send + 'a>>;

fn get_compressed_reader(&self) -> SpillCompressedReader<'_> {
lz4_flex::frame::FrameDecoder::new(self.get_buf_reader())
IoCompressionReader::try_new(spill_compression_codec(), self.get_buf_reader())
.expect("error creating compression reader")
}

fn get_compressed_writer(&mut self) -> SpillCompressedWriter<'_> {
lz4_flex::frame::FrameEncoder::new(self.get_buf_writer()).auto_finish()
IoCompressionWriter::try_new(spill_compression_codec(), self.get_buf_writer())
.expect("error creating compression writer")
}
}

Expand All @@ -69,6 +73,20 @@ impl Spill for Vec<u8> {
}
}

/// Returns the spill compression codec name, resolved once and cached for
/// the lifetime of the process.
///
/// When the JNI bridge is initialized, the value is read from the
/// `spark.blaze.spill.compression.codec` configuration; otherwise (e.g. in
/// native-only tests) it falls back to `"lz4"`.
///
/// # Panics
/// Panics if reading the configuration value fails.
fn spill_compression_codec() -> &'static str {
    static CODEC: OnceCell<String> = OnceCell::new();
    CODEC
        .get_or_try_init(|| {
            if is_jni_bridge_inited() {
                conf::SPILL_COMPRESSION_CODEC.value()
            } else {
                // for testing: no JNI bridge, use the default codec
                Ok("lz4".to_string()) // avoids clippy::useless_format
            }
        })
        .expect("error reading spark.blaze.spill.compression.codec")
        .as_str()
}

pub fn try_new_spill(spill_metrics: &SpillMetrics) -> Result<Box<dyn Spill>> {
if !is_jni_bridge_inited() || jni_call_static!(JniBridge.isDriverSide() -> bool)? {
Ok(Box::new(FileSpill::try_new(spill_metrics)?))
Expand Down
2 changes: 2 additions & 0 deletions native-engine/datafusion-ext-plans/src/sort_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ impl BufferedData {
write_one_batch(batch.num_rows(), batch.columns(), &mut writer)?;
writer.write_all(&key_collector.store)?;
}
writer.flush()?;
Ok(())
}

Expand Down Expand Up @@ -946,6 +947,7 @@ fn merge_spills(
)?;
output_writer.write_all(&key_collector.store)?;
}
output_writer.flush()?;
drop(output_writer);
Ok(output_spill)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ public enum BlazeConf {
SPARK_IO_COMPRESSION_CODEC("spark.io.compression.codec", "lz4"),

// replace all sort-merge join to shuffled-hash join, only used for benchmarking
FORCE_SHUFFLED_HASH_JOIN("spark.blaze.forceShuffledHashJoin", false);
FORCE_SHUFFLED_HASH_JOIN("spark.blaze.forceShuffledHashJoin", false),

// spark spill compression codec
SPILL_COMPRESSION_CODEC("spark.blaze.spill.compression.codec", "lz4");

public final String key;
private final Object defaultValue;
Expand Down

0 comments on commit 64f4b5e

Please sign in to comment.