Skip to content

Commit

Permalink
fix: Properly load nested Parquet Statistics (#20610)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Jan 8, 2025
1 parent bac5f44 commit 7ddbf3a
Show file tree
Hide file tree
Showing 17 changed files with 434 additions and 1,695 deletions.
5 changes: 5 additions & 0 deletions crates/polars-arrow/src/offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,11 @@ impl<O: Offset> OffsetsBuffer<O> {
Self(vec![O::zero()].into())
}

#[inline]
pub fn one_with_length(length: O) -> Self {
Self(vec![O::zero(), length].into())
}

/// Copy-on-write API to convert [`OffsetsBuffer`] into [`Offsets`].
#[inline]
pub fn into_mut(self) -> either::Either<Self, Offsets<O>> {
Expand Down
45 changes: 28 additions & 17 deletions crates/polars-io/src/parquet/read/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,6 @@ use polars_parquet::read::RowGroupMetadata;

use crate::predicates::{BatchStats, ColumnStats, PhysicalIoExpr};

impl ColumnStats {
fn from_arrow_stats(stats: Statistics, field: &ArrowField) -> Self {
Self::new(
field.into(),
Some(Series::try_from((PlSmallStr::EMPTY, stats.null_count)).unwrap()),
Some(Series::try_from((PlSmallStr::EMPTY, stats.min_value)).unwrap()),
Some(Series::try_from((PlSmallStr::EMPTY, stats.max_value)).unwrap()),
)
}
}

/// Collect the statistics in a row-group
pub(crate) fn collect_statistics(
md: &RowGroupMetadata,
Expand All @@ -25,13 +14,35 @@ pub(crate) fn collect_statistics(
let stats = schema
.iter_values()
.map(|field| {
let iter = md.columns_under_root_iter(&field.name).unwrap();
let mut iter = md.columns_under_root_iter(&field.name).unwrap();

Ok(if iter.len() == 0 {
ColumnStats::new(field.into(), None, None, None)
} else {
ColumnStats::from_arrow_stats(deserialize(field, iter)?, field)
})
let statistics = deserialize(field, &mut iter)?;
assert!(iter.next().is_none());

// We don't support reading nested statistics for now. It does not really make any
// sense at the moment with how we structure statistics.
let Some(Statistics::Column(stats)) = statistics else {
return Ok(ColumnStats::new(field.into(), None, None, None));
};

let stats = stats.into_arrow()?;

let null_count = stats
.null_count
.map(|x| Scalar::from(x).into_series(PlSmallStr::EMPTY));
let min_value = stats
.min_value
.map(|x| Series::try_from((PlSmallStr::EMPTY, x)).unwrap());
let max_value = stats
.max_value
.map(|x| Series::try_from((PlSmallStr::EMPTY, x)).unwrap());

Ok(ColumnStats::new(
field.into(),
null_count,
min_value,
max_value,
))
})
.collect::<PolarsResult<Vec<_>>>()?;

Expand Down
4 changes: 4 additions & 0 deletions crates/polars-parquet/src/arrow/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
Ok(_read_metadata_async(reader).await?)
}

fn convert_year_month(value: &[u8]) -> i32 {
i32::from_le_bytes(value[..4].try_into().unwrap())
}

fn convert_days_ms(value: &[u8]) -> arrow::types::days_ms {
arrow::types::days_ms(
i32::from_le_bytes(value[4..8].try_into().unwrap()),
Expand Down
Loading

0 comments on commit 7ddbf3a

Please sign in to comment.