Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Jan 9, 2025
1 parent 323b88c commit a57d59e
Show file tree
Hide file tree
Showing 20 changed files with 483 additions and 473 deletions.
7 changes: 2 additions & 5 deletions crates/polars-arrow/src/io/ipc/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct StreamMetadata {
}

/// Reads the metadata of the stream
pub fn read_stream_metadata<R: Read>(reader: &mut R) -> PolarsResult<StreamMetadata> {
pub fn read_stream_metadata(reader: &mut dyn std::io::Read) -> PolarsResult<StreamMetadata> {
// determine metadata length
let mut meta_size: [u8; 4] = [0; 4];
reader.read_exact(&mut meta_size)?;
Expand All @@ -48,10 +48,7 @@ pub fn read_stream_metadata<R: Read>(reader: &mut R) -> PolarsResult<StreamMetad

let mut buffer = vec![];
buffer.try_reserve(length)?;
reader
.by_ref()
.take(length as u64)
.read_to_end(&mut buffer)?;
reader.take(length as u64).read_to_end(&mut buffer)?;

deserialize_stream_metadata(&buffer)
}
Expand Down
70 changes: 69 additions & 1 deletion crates/polars-core/src/frame/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use arrow::record_batch::RecordBatch;
use rayon::prelude::*;

use crate::prelude::*;
use crate::utils::_split_offsets;
use crate::utils::{_split_offsets, accumulate_dataframes_vertical_unchecked, split_df_as_ref};
use crate::POOL;

impl TryFrom<(RecordBatch, &ArrowSchema)> for DataFrame {
Expand Down Expand Up @@ -51,3 +51,71 @@ impl DataFrame {
}
}
}

/// Split DataFrame into chunks in preparation for writing. The chunks have a
/// maximum number of rows per chunk to ensure reasonable memory efficiency when
/// reading the resulting file, and a minimum size per chunk to ensure
/// reasonable performance when writing.
pub fn chunk_df_for_writing(
df: &mut DataFrame,
row_group_size: usize,
) -> PolarsResult<std::borrow::Cow<DataFrame>> {
// ensures all chunks are aligned.
df.align_chunks_par();

// Accumulate many small chunks to the row group size.
// See: #16403
if !df.get_columns().is_empty()
&& df.get_columns()[0]
.as_materialized_series()
.chunk_lengths()
.take(5)
.all(|len| len < row_group_size)
{
fn finish(scratch: &mut Vec<DataFrame>, new_chunks: &mut Vec<DataFrame>) {
let mut new = accumulate_dataframes_vertical_unchecked(scratch.drain(..));
new.as_single_chunk_par();
new_chunks.push(new);
}

let mut new_chunks = Vec::with_capacity(df.first_col_n_chunks()); // upper limit;
let mut scratch = vec![];
let mut remaining = row_group_size;

for df in df.split_chunks() {
remaining = remaining.saturating_sub(df.height());
scratch.push(df);

if remaining == 0 {
remaining = row_group_size;
finish(&mut scratch, &mut new_chunks);
}
}
if !scratch.is_empty() {
finish(&mut scratch, &mut new_chunks);
}
return Ok(std::borrow::Cow::Owned(
accumulate_dataframes_vertical_unchecked(new_chunks),
));
}

let n_splits = df.height() / row_group_size;
let result = if n_splits > 0 {
let mut splits = split_df_as_ref(df, n_splits, false);

for df in splits.iter_mut() {
// If the chunks are small enough, writing many small chunks
// leads to slow writing performance, so in that case we
// merge them.
let n_chunks = df.first_col_n_chunks();
if n_chunks > 1 && (df.estimated_size() / n_chunks < 128 * 1024) {
df.as_single_chunk_par();
}
}

std::borrow::Cow::Owned(accumulate_dataframes_vertical_unchecked(splits))
} else {
std::borrow::Cow::Borrowed(df)
};
Ok(result)
}
13 changes: 13 additions & 0 deletions crates/polars-core/src/frame/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,19 @@ impl Column {
}
}

/// Returns whether the flags were set
pub fn set_flags(&mut self, flags: StatisticsFlags) -> bool {
match self {
Column::Series(s) => {
s.set_flags(flags);
true
},
// @partition-opt
Column::Partitioned(_) => false,
Column::Scalar(_) => false,
}
}

pub fn vec_hash(&self, build_hasher: PlRandomState, buf: &mut Vec<u64>) -> PolarsResult<()> {
// @scalar-opt?
self.as_materialized_series().vec_hash(build_hasher, buf)
Expand Down
38 changes: 1 addition & 37 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{HEAD_DEFAULT_LENGTH, TAIL_DEFAULT_LENGTH};
#[cfg(feature = "dataframe_arithmetic")]
mod arithmetic;
mod chunks;
pub use chunks::chunk_df_for_writing;
pub mod column;
pub mod explode;
mod from;
Expand Down Expand Up @@ -3578,41 +3579,4 @@ mod test {
assert_eq!(df.get_column_names(), &["a", "b", "c"]);
Ok(())
}

#[cfg(feature = "serde")]
#[test]
fn test_deserialize_height_validation_8751() {
// Construct an invalid directly from the inner fields as the `new_unchecked_*` functions
// have debug assertions

use polars_utils::pl_serialize;

let df = DataFrame {
height: 2,
columns: vec![
Int64Chunked::full("a".into(), 1, 2).into_column(),
Int64Chunked::full("b".into(), 1, 1).into_column(),
],
cached_schema: OnceLock::new(),
};

// We rely on the fact that the serialization doesn't check the heights of all columns
let serialized = serde_json::to_string(&df).unwrap();
let err = serde_json::from_str::<DataFrame>(&serialized).unwrap_err();

assert!(err.to_string().contains(
"successful parse invalid data: lengths don't match: could not create a new DataFrame:",
));

let serialized = pl_serialize::SerializeOptions::default()
.serialize_to_bytes(&df)
.unwrap();
let err = pl_serialize::SerializeOptions::default()
.deserialize_from_reader::<DataFrame, _>(serialized.as_slice())
.unwrap_err();

assert!(err.to_string().contains(
"successful parse invalid data: lengths don't match: could not create a new DataFrame:",
));
}
}
188 changes: 157 additions & 31 deletions crates/polars-core/src/serde/df.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,144 @@
use polars_error::PolarsError;
use std::sync::Arc;

use arrow::datatypes::Metadata;
use arrow::io::ipc::read::{read_stream_metadata, StreamReader, StreamState};
use arrow::io::ipc::write::WriteOptions;
use polars_error::{polars_err, to_compute_err, PolarsResult};
use polars_utils::format_pl_smallstr;
use polars_utils::pl_serialize::deserialize_map_bytes;
use polars_utils::pl_str::PlSmallStr;
use serde::de::Error;
use serde::*;

use crate::prelude::{Column, DataFrame};

// utility to ensure we serde to a struct
// {
// columns: Vec<Series>
// }
// that ensures it differentiates between Vec<Series>
// and is backwards compatible
#[derive(Deserialize)]
struct Util {
columns: Vec<Column>,
}
use crate::chunked_array::flags::StatisticsFlags;
use crate::config;
use crate::frame::chunk_df_for_writing;
use crate::prelude::{CompatLevel, DataFrame, SchemaExt};
use crate::utils::accumulate_dataframes_vertical_unchecked;

#[derive(Serialize)]
struct UtilBorrowed<'a> {
columns: &'a [Column],
}
const FLAGS_KEY: PlSmallStr = PlSmallStr::from_static("_PL_FLAGS");

impl<'de> Deserialize<'de> for DataFrame {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let parsed = <Util>::deserialize(deserializer)?;
DataFrame::new(parsed.columns).map_err(|e| {
let e = PolarsError::ComputeError(format!("successful parse invalid data: {e}").into());
D::Error::custom::<PolarsError>(e)
})
impl DataFrame {
pub fn serialize_into_writer(&mut self, writer: &mut dyn std::io::Write) -> PolarsResult<()> {
let schema = self.schema();

if schema.iter_values().any(|x| x.is_object()) {
return Err(polars_err!(
ComputeError:
"serializing data of type Object is not supported",
));
}

let mut ipc_writer =
arrow::io::ipc::write::StreamWriter::new(writer, WriteOptions { compression: None });

ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from_iter(
self.get_columns().iter().map(|c| {
(
format_pl_smallstr!("{}{}", FLAGS_KEY, c.name()),
PlSmallStr::from(c.get_flags().bits().to_string()),
)
}),
)));

ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from([(
FLAGS_KEY,
serde_json::to_string(
&self
.iter()
.map(|s| s.get_flags().bits())
.collect::<Vec<u32>>(),
)
.map_err(to_compute_err)?
.into(),
)])));

ipc_writer.start(&schema.to_arrow(CompatLevel::newest()), None)?;

for batch in chunk_df_for_writing(self, 512 * 512)?.iter_chunks(CompatLevel::newest(), true)
{
ipc_writer.write(&batch, None)?;
}

ipc_writer.finish()?;

Ok(())
}

pub fn serialize_to_bytes(&mut self) -> PolarsResult<Vec<u8>> {
let mut buf = vec![];
self.serialize_into_writer(&mut buf)?;

Ok(buf)
}

pub fn deserialize_from_reader(reader: &mut dyn std::io::Read) -> PolarsResult<Self> {
let mut md = read_stream_metadata(reader)?;
let arrow_schema = md.schema.clone();

let custom_metadata = md.custom_schema_metadata.take();

let reader = StreamReader::new(reader, md, None);
let dfs = reader
.into_iter()
.map_while(|batch| match batch {
Ok(StreamState::Some(batch)) => Some(DataFrame::try_from((batch, &arrow_schema))),
Ok(StreamState::Waiting) => None,
Err(e) => Some(Err(e)),
})
.collect::<PolarsResult<Vec<DataFrame>>>()?;

let mut df = accumulate_dataframes_vertical_unchecked(dfs);

// Set custom metadata (fallible)
(|| {
let custom_metadata = custom_metadata?;
let flags = custom_metadata.get(&FLAGS_KEY)?;

let flags: PolarsResult<Vec<u32>> = serde_json::from_str(flags).map_err(to_compute_err);

let verbose = config::verbose();

if let Err(e) = &flags {
if verbose {
eprintln!("DataFrame::read_ipc: Error parsing metadata flags: {}", e);
}
}

let flags = flags.ok()?;

if flags.len() != df.width() {
if verbose {
eprintln!(
"DataFrame::read_ipc: Metadata flags width mismatch: {} != {}",
flags.len(),
df.width()
);
}

return None;
}

let mut n_set = 0;

for (c, v) in unsafe { df.get_columns_mut() }.iter_mut().zip(flags) {
if let Some(flags) = StatisticsFlags::from_bits(v) {
n_set += c.set_flags(flags) as usize;
}
}

if verbose {
eprintln!(
"DataFrame::read_ipc: Loaded metadata for {} / {} columns",
n_set,
df.width()
);
}

Some(())
})();

Ok(df)
}
}

Expand All @@ -38,9 +147,26 @@ impl Serialize for DataFrame {
where
S: Serializer,
{
UtilBorrowed {
columns: &self.columns,
}
.serialize(serializer)
use serde::ser::Error;

let mut bytes = vec![];
self.clone()
.serialize_into_writer(&mut bytes)
.map_err(S::Error::custom)?;

serializer.serialize_bytes(bytes.as_slice())
}
}

impl<'de> Deserialize<'de> for DataFrame {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserialize_map_bytes(deserializer, &mut |b| {
let v = &mut b.as_ref();
Self::deserialize_from_reader(v)
})?
.map_err(D::Error::custom)
}
}
Loading

0 comments on commit a57d59e

Please sign in to comment.