Skip to content

Commit

Permalink
perf: Move to zlib-rs by default and use zstd::with_buffer (#20614)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Jan 8, 2025
1 parent 94847c3 commit 5c46d87
Show file tree
Hide file tree
Showing 11 changed files with 34 additions and 49 deletions.
26 changes: 11 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ check-wasm: ## Check wasm build without supported features
--exclude-features azure \
--exclude-features cloud \
--exclude-features decompress \
--exclude-features decompress-fast \
--exclude-features default \
--exclude-features docs-selection \
--exclude-features extract_jsonpath \
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/io/ipc/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub fn decompress_lz4(input_buf: &[u8], output_buf: &mut [u8]) -> PolarsResult<(
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
pub fn decompress_zstd(input_buf: &[u8], output_buf: &mut [u8]) -> PolarsResult<()> {
use std::io::Read;
let mut decoder = zstd::Decoder::new(input_buf)?;
let mut decoder = zstd::Decoder::with_buffer(input_buf)?;
decoder.read_exact(output_buf).map_err(|e| e.into())
}

Expand Down
3 changes: 1 addition & 2 deletions crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ ipc_streaming = ["arrow/io_ipc", "arrow/io_ipc_compression"]
# support for arrow avro parsing
avro = ["arrow/io_avro", "arrow/io_avro_compression"]
csv = ["atoi_simd", "polars-core/rows", "itoa", "ryu", "fast-float2", "simdutf8"]
decompress = ["flate2/rust_backend", "zstd"]
decompress-fast = ["flate2/zlib-ng", "zstd"]
decompress = ["flate2/zlib-rs", "zstd"]
dtype-u8 = ["polars-core/dtype-u8"]
dtype-u16 = ["polars-core/dtype-u16"]
dtype-i8 = ["polars-core/dtype-i8"]
Expand Down
12 changes: 5 additions & 7 deletions crates/polars-io/src/csv/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ use super::parser::{
};
use super::reader::prepare_csv_schema;
use super::schema_inference::{check_decimal_comma, infer_file_schema};
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
#[cfg(feature = "decompress")]
use super::utils::decompress;
use super::CsvParseOptions;
use crate::csv::read::parser::skip_this_line_naive;
use crate::mmap::ReaderBytes;
use crate::predicates::PhysicalIoExpr;
#[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
use crate::utils::compression::SupportedCompression;
use crate::utils::update_row_counts2;
use crate::RowIndex;
Expand Down Expand Up @@ -161,20 +160,19 @@ impl<'a> CoreReader<'a> {
let separator = parse_options.separator;

check_decimal_comma(parse_options.decimal_comma, separator)?;
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
#[cfg(feature = "decompress")]
let mut reader_bytes = reader_bytes;

#[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
if SupportedCompression::check(&reader_bytes).is_some() {
if !cfg!(feature = "decompress") && SupportedCompression::check(&reader_bytes).is_some() {
polars_bail!(
ComputeError: "cannot read compressed CSV file; \
compile with feature 'decompress' or 'decompress-fast'"
compile with feature 'decompress'"
);
}
// We keep track of the inferred schema bool
// In case the file is compressed this schema inference is wrong and has to be done
// again after decompression.
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
#[cfg(feature = "decompress")]
{
let total_n_rows =
n_rows.map(|n| skip_rows + (has_header as usize) + skip_rows_after_header + n);
Expand Down
10 changes: 5 additions & 5 deletions crates/polars-io/src/csv/read/utils.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
#[cfg(feature = "decompress")]
use std::io::Read;
use std::mem::MaybeUninit;

use super::parser::next_line_position;
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
#[cfg(feature = "decompress")]
use super::parser::next_line_position_naive;
use super::splitfields::SplitFields;

Expand Down Expand Up @@ -45,7 +45,7 @@ pub(crate) fn get_file_chunks(
offsets
}

#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
#[cfg(feature = "decompress")]
fn decompress_impl<R: Read>(
decoder: &mut R,
n_rows: Option<usize>,
Expand Down Expand Up @@ -121,7 +121,7 @@ fn decompress_impl<R: Read>(
})
}

#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
#[cfg(feature = "decompress")]
pub(crate) fn decompress(
bytes: &[u8],
n_rows: Option<usize>,
Expand All @@ -142,7 +142,7 @@ pub(crate) fn decompress(
decompress_impl(&mut decoder, n_rows, separator, quote_char, eol_char)
},
SupportedCompression::ZSTD => {
let mut decoder = zstd::Decoder::new(bytes).ok()?;
let mut decoder = zstd::Decoder::with_buffer(bytes).ok()?;
decompress_impl(&mut decoder, n_rows, separator, quote_char, eol_char)
},
}
Expand Down
13 changes: 4 additions & 9 deletions crates/polars-io/src/utils/compression.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io::Read;

use polars_core::prelude::*;
use polars_error::to_compute_err;
use polars_error::{feature_gated, to_compute_err};

/// Represents the compression algorithms that we have decoders for
pub enum SupportedCompression {
Expand Down Expand Up @@ -36,8 +36,7 @@ pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> Pola
assert!(out.is_empty());

if let Some(algo) = SupportedCompression::check(bytes) {
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
{
feature_gated!("decompress", {
match algo {
SupportedCompression::GZIP => {
flate2::read::MultiGzDecoder::new(bytes)
Expand All @@ -50,16 +49,12 @@ pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> Pola
.map_err(to_compute_err)?;
},
SupportedCompression::ZSTD => {
zstd::Decoder::new(bytes)?.read_to_end(out)?;
zstd::Decoder::with_buffer(bytes)?.read_to_end(out)?;
},
}

Ok(out)
}
#[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
{
panic!("cannot decompress without 'decompress' or 'decompress-fast' feature")
}
})
} else {
Ok(bytes)
}
Expand Down
11 changes: 5 additions & 6 deletions crates/polars-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ lz4 = { version = "1.24", optional = true }
lz4_flex = { version = "0.11", optional = true }
serde = { workspace = true, optional = true }
snap = { version = "^1.1", optional = true }
zstd = { version = "^0.13", optional = true, default-features = false }
zstd = { workspace = true, optional = true }

xxhash-rust = { version = "0.8", optional = true, features = ["xxh64"] }

Expand All @@ -47,17 +47,16 @@ rand = "0.8"

[features]
compression = [
"zstd",
"brotli",
"gzip",
"snappy",
"lz4",
"brotli",
"snappy",
"zstd",
]

# compression backends
snappy = ["snap"]
gzip = ["flate2/rust_backend"]
gzip_zlib_ng = ["flate2/zlib-ng"]
gzip = ["flate2/zlib-rs"]
lz4 = ["dep:lz4"]
lz4_flex = ["dep:lz4_flex"]

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/parquet/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ pub fn decompress(
#[cfg(feature = "zstd")]
Compression::Zstd => {
use std::io::Read;
let mut decoder = zstd::Decoder::new(input_buf)?;
let mut decoder = zstd::Decoder::with_buffer(input_buf)?;
decoder.read_exact(output_buf).map_err(|e| e.into())
},
#[cfg(not(feature = "zstd"))]
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ streaming = ["polars/streaming"]
meta = ["polars/meta"]
index_of = ["polars/index_of"]
search_sorted = ["polars/search_sorted"]
decompress = ["polars/decompress-fast"]
decompress = ["polars/decompress"]
regex = ["polars/regex"]
csv = ["polars/csv"]
clipboard = ["arboard"]
Expand Down
1 change: 0 additions & 1 deletion crates/polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ month_start = ["polars-lazy?/month_start"]
month_end = ["polars-lazy?/month_end"]
offset_by = ["polars-lazy?/offset_by"]
decompress = ["polars-io/decompress"]
decompress-fast = ["polars-io/decompress-fast"]
describe = ["polars-core/describe"]
diagonal_concat = ["polars-core/diagonal_concat", "polars-lazy?/diagonal_concat", "polars-sql?/diagonal_concat"]
diff = ["polars-ops/diff", "polars-lazy?/diff"]
Expand Down

0 comments on commit 5c46d87

Please sign in to comment.