From 64f4b5ec91f23c8a2517c28839731c5c901cc4d0 Mon Sep 17 00:00:00 2001
From: Junfan Zhang
Date: Sun, 22 Dec 2024 23:30:24 +0800
Subject: [PATCH] feat(spill): Align with the multi IO compression codec in
 spill (#657)

---
 native-engine/blaze-jni-bridge/src/conf.rs    |  1 +
 .../datafusion-ext-plans/src/agg/agg_table.rs |  3 +-
 .../src/common/ipc_compression.rs             | 18 ++++++----
 .../datafusion-ext-plans/src/memmgr/spill.rs  | 36 ++++++++++++++-----
 .../datafusion-ext-plans/src/sort_exec.rs     |  2 ++
 .../org/apache/spark/sql/blaze/BlazeConf.java |  5 ++-
 6 files changed, 47 insertions(+), 18 deletions(-)

diff --git a/native-engine/blaze-jni-bridge/src/conf.rs b/native-engine/blaze-jni-bridge/src/conf.rs
index 6198a58c..440b3ce2 100644
--- a/native-engine/blaze-jni-bridge/src/conf.rs
+++ b/native-engine/blaze-jni-bridge/src/conf.rs
@@ -41,6 +41,7 @@ define_conf!(IntConf, PARTIAL_AGG_SKIPPING_MIN_ROWS);
 define_conf!(BooleanConf, PARQUET_ENABLE_PAGE_FILTERING);
 define_conf!(BooleanConf, PARQUET_ENABLE_BLOOM_FILTER);
 define_conf!(StringConf, SPARK_IO_COMPRESSION_CODEC);
+define_conf!(StringConf, SPILL_COMPRESSION_CODEC);
 
 pub trait BooleanConf {
     fn key(&self) -> &'static str;
diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
index e89ab395..5b07c9b1 100644
--- a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
@@ -508,10 +508,10 @@ impl HashingData {
             // next bucket
             begin = end;
         }
-
         // EOF
         write_len(NUM_SPILL_BUCKETS, &mut writer)?;
         write_len(0, &mut writer)?;
+        writer.flush()?;
         Ok(())
     }
 }
@@ -621,6 +621,7 @@ impl MergingData {
         // EOF
         write_len(NUM_SPILL_BUCKETS, &mut writer)?;
         write_len(0, &mut writer)?;
+        writer.flush()?;
         Ok(())
     }
 }
diff --git a/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs b/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs
index 8f7baeeb..784a9f54 100644
--- a/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs
+++ b/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs
@@ -168,17 +168,17 @@ impl<R: Read> IpcCompressionReader<R> {
     }
 }
 
-enum IoCompressionWriter<W: Write> {
+pub enum IoCompressionWriter<W: Write> {
     LZ4(lz4_flex::frame::FrameEncoder<W>),
     ZSTD(zstd::Encoder<'static, W>),
 }
 
 impl<W: Write> IoCompressionWriter<W> {
-    fn new_with_configured_codec(inner: W) -> Self {
+    pub fn new_with_configured_codec(inner: W) -> Self {
         Self::try_new(io_compression_codec(), inner).expect("error creating compression encoder")
     }
 
-    fn try_new(codec: &str, inner: W) -> Result<Self> {
+    pub fn try_new(codec: &str, inner: W) -> Result<Self> {
         match codec {
             "lz4" => Ok(Self::LZ4(lz4_flex::frame::FrameEncoder::new(inner))),
             "zstd" => Ok(Self::ZSTD(zstd::Encoder::new(inner, ZSTD_LEVEL)?)),
@@ -186,7 +186,7 @@ impl<W: Write> IoCompressionWriter<W> {
         }
     }
 
-    fn finish(&mut self) -> Result<()> {
+    pub fn finish(&mut self) -> Result<()> {
         match self {
             IoCompressionWriter::LZ4(w) => {
                 w.try_finish()
@@ -216,13 +216,17 @@ impl<W: Write> Write for IoCompressionWriter<W> {
     }
 }
 
-enum IoCompressionReader<R: Read> {
+pub enum IoCompressionReader<R: Read> {
     LZ4(lz4_flex::frame::FrameDecoder<R>),
     ZSTD(zstd::Decoder<'static, BufReader<R>>),
 }
 
 impl<R: Read> IoCompressionReader<R> {
-    fn try_new(codec: &str, inner: R) -> Result<Self> {
+    pub fn new_with_configured_codec(inner: R) -> Self {
+        Self::try_new(io_compression_codec(), inner).expect("error creating compression encoder")
+    }
+
+    pub fn try_new(codec: &str, inner: R) -> Result<Self> {
         match codec {
             "lz4" => Ok(Self::LZ4(lz4_flex::frame::FrameDecoder::new(inner))),
             "zstd" => Ok(Self::ZSTD(zstd::Decoder::new(inner)?)),
@@ -230,7 +234,7 @@ impl<R: Read> IoCompressionReader<R> {
         }
     }
 
-    fn finish_into_inner(self) -> Result<R> {
+    pub fn finish_into_inner(self) -> Result<R> {
         match self {
             Self::LZ4(r) => Ok(r.into_inner()),
             Self::ZSTD(r) => Ok(r.finish().into_inner()),
diff --git a/native-engine/datafusion-ext-plans/src/memmgr/spill.rs b/native-engine/datafusion-ext-plans/src/memmgr/spill.rs
index 1657d60c..7c377529 100644
--- a/native-engine/datafusion-ext-plans/src/memmgr/spill.rs
+++ b/native-engine/datafusion-ext-plans/src/memmgr/spill.rs
@@ -22,19 +22,21 @@ use std::{
 };
 
 use blaze_jni_bridge::{
-    is_jni_bridge_inited, jni_bridge::LocalRef, jni_call, jni_call_static, jni_get_string,
-    jni_new_direct_byte_buffer, jni_new_global_ref,
+    conf, conf::StringConf, is_jni_bridge_inited, jni_bridge::LocalRef, jni_call, jni_call_static,
+    jni_get_string, jni_new_direct_byte_buffer, jni_new_global_ref,
 };
 use datafusion::{common::Result, parquet::file::reader::Length, physical_plan::metrics::Time};
 use jni::{objects::GlobalRef, sys::jlong};
 use log::warn;
+use once_cell::sync::OnceCell;
 
-use crate::memmgr::metrics::SpillMetrics;
+use crate::{
+    common::ipc_compression::{IoCompressionReader, IoCompressionWriter},
+    memmgr::metrics::SpillMetrics,
+};
 
-pub type SpillCompressedReader<'a> =
-    lz4_flex::frame::FrameDecoder<BufReader<Box<dyn Read + Send + 'a>>>;
-pub type SpillCompressedWriter<'a> =
-    lz4_flex::frame::AutoFinishEncoder<BufWriter<Box<dyn Write + Send + 'a>>>;
+pub type SpillCompressedReader<'a> = IoCompressionReader<BufReader<Box<dyn Read + Send + 'a>>>;
+pub type SpillCompressedWriter<'a> = IoCompressionWriter<BufWriter<Box<dyn Write + Send + 'a>>>;
 
 pub trait Spill: Send + Sync {
     fn as_any(&self) -> &dyn Any;
@@ -43,11 +45,13 @@ pub trait Spill: Send + Sync {
     fn get_buf_writer<'a>(&'a mut self) -> BufWriter<Box<dyn Write + Send + 'a>>;
 
     fn get_compressed_reader(&self) -> SpillCompressedReader<'_> {
-        lz4_flex::frame::FrameDecoder::new(self.get_buf_reader())
+        IoCompressionReader::try_new(spill_compression_codec(), self.get_buf_reader())
+            .expect("error creating compression reader")
     }
 
     fn get_compressed_writer(&mut self) -> SpillCompressedWriter<'_> {
-        lz4_flex::frame::FrameEncoder::new(self.get_buf_writer()).auto_finish()
+        IoCompressionWriter::try_new(spill_compression_codec(), self.get_buf_writer())
+            .expect("error creating compression writer")
     }
 }
 
@@ -69,6 +73,20 @@ impl Spill for Vec<u8> {
     }
 }
 
+fn spill_compression_codec() -> &'static str {
+    static CODEC: OnceCell<String> = OnceCell::new();
+    CODEC
+        .get_or_try_init(|| {
+            if is_jni_bridge_inited() {
+                conf::SPILL_COMPRESSION_CODEC.value()
+            } else {
+                Ok(format!("lz4")) // for testing
+            }
+        })
+        .expect("error reading spark.blaze.spill.compression.codec")
+        .as_str()
+}
+
 pub fn try_new_spill(spill_metrics: &SpillMetrics) -> Result<Box<dyn Spill>> {
     if !is_jni_bridge_inited() || jni_call_static!(JniBridge.isDriverSide() -> bool)? {
         Ok(Box::new(FileSpill::try_new(spill_metrics)?))
diff --git a/native-engine/datafusion-ext-plans/src/sort_exec.rs b/native-engine/datafusion-ext-plans/src/sort_exec.rs
index acb67d97..f24759b4 100644
--- a/native-engine/datafusion-ext-plans/src/sort_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/sort_exec.rs
@@ -277,6 +277,7 @@ impl BufferedData {
             write_one_batch(batch.num_rows(), batch.columns(), &mut writer)?;
             writer.write_all(&key_collector.store)?;
         }
+        writer.flush()?;
         Ok(())
     }
 
@@ -946,6 +947,7 @@ fn merge_spills(
         )?;
         output_writer.write_all(&key_collector.store)?;
     }
+    output_writer.flush()?;
     drop(output_writer);
     Ok(output_spill)
 }
diff --git a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java
index db747dd0..a78f60bc 100644
--- a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java
+++ b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java
@@ -61,7 +61,10 @@ public enum BlazeConf {
     SPARK_IO_COMPRESSION_CODEC("spark.io.compression.codec", "lz4"),
 
     // replace all sort-merge join to shuffled-hash join, only used for benchmarking
-    FORCE_SHUFFLED_HASH_JOIN("spark.blaze.forceShuffledHashJoin", false);
+    FORCE_SHUFFLED_HASH_JOIN("spark.blaze.forceShuffledHashJoin", false),
+
+    // spark spill compression codec
+    SPILL_COMPRESSION_CODEC("spark.blaze.spill.compression.codec", "lz4");
 
     public final String key;
     private final Object defaultValue;
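
Note (not part of the patch): the change routes spill reads and writes through the codec selected by spark.blaze.spill.compression.codec instead of hard-coding LZ4, and flushes writers before a spill is finalized. The standalone Rust sketch below round-trips a buffer through the same "lz4" / "zstd" dispatch using the lz4_flex and zstd crates that appear in the diff. The compress/decompress helpers, the zstd level of 1, and the main driver are illustrative assumptions only; the engine itself goes through IoCompressionWriter/IoCompressionReader and its own ZSTD_LEVEL constant, whose value is not shown here.

// Illustrative sketch only (not from the patch): round-trips bytes through the
// same "lz4" / "zstd" codec dispatch that spill_compression_codec() selects.
// Assumes the lz4_flex and zstd crates; level 1 stands in for ZSTD_LEVEL.
use std::io::{Cursor, Error, ErrorKind, Read, Result, Write};

fn compress(codec: &str, data: &[u8]) -> Result<Vec<u8>> {
    match codec {
        "lz4" => {
            // LZ4 frame format, mirroring lz4_flex::frame::FrameEncoder in the diff.
            let mut enc = lz4_flex::frame::FrameEncoder::new(Vec::new());
            enc.write_all(data)?;
            enc.finish()
                .map_err(|e| Error::new(ErrorKind::Other, e.to_string()))
        }
        "zstd" => {
            // zstd stream, mirroring zstd::Encoder in the diff (level 1 is a placeholder).
            let mut enc = zstd::Encoder::new(Vec::new(), 1)?;
            enc.write_all(data)?;
            enc.finish()
        }
        other => Err(Error::new(
            ErrorKind::InvalidInput,
            format!("unsupported spill compression codec: {other}"),
        )),
    }
}

fn decompress(codec: &str, data: &[u8]) -> Result<Vec<u8>> {
    let mut out = Vec::new();
    match codec {
        "lz4" => {
            lz4_flex::frame::FrameDecoder::new(Cursor::new(data)).read_to_end(&mut out)?;
        }
        "zstd" => {
            zstd::Decoder::new(Cursor::new(data))?.read_to_end(&mut out)?;
        }
        other => {
            return Err(Error::new(
                ErrorKind::InvalidInput,
                format!("unsupported spill compression codec: {other}"),
            ));
        }
    }
    Ok(out)
}

fn main() -> Result<()> {
    let payload = b"spill bytes ".repeat(1024);
    for codec in ["lz4", "zstd"] {
        let compressed = compress(codec, &payload)?;
        assert_eq!(decompress(codec, &compressed)?, payload);
    }
    Ok(())
}

With the patch applied, writing a spill under spark.blaze.spill.compression.codec=zstd and reading it back follows the same dispatch, just wrapped in the Spill trait's get_compressed_writer/get_compressed_reader.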