diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index fd2fad7cf74..0cf38427ef0 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -34,12 +34,12 @@ where } else { unreachable!("") } + let num_values = nested::num_values(&nested); let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, &nested, &mut buffer)?; + nested::write_rep_and_def(options.version, &nested, num_values, &mut buffer)?; - let array = array.slice(start, len); encode_plain(&array, is_optional, &mut buffer); let statistics = if options.write_statistics { @@ -50,7 +50,7 @@ where utils::build_plain_page( buffer, - nested::num_values(&nested), + num_values, nested[0].len(), array.null_count(), repetition_levels_byte_length, diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index 0e6ce9f5718..2deef0b0f04 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -30,10 +30,11 @@ pub fn array_to_page( } else { unreachable!("") } + let num_values = nested::num_values(&nested); let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, &nested, &mut buffer)?; + nested::write_rep_and_def(options.version, &nested, num_values, &mut buffer)?; encode_plain(&array, is_optional, &mut buffer)?; @@ -45,7 +46,7 @@ pub fn array_to_page( utils::build_plain_page( buffer, - nested::num_values(&nested), + num_values, nested[0].len(), array.null_count(), repetition_levels_byte_length, diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index 8cbe6c766e6..bece2863914 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -73,6 +73,7 @@ fn serialize_levels( length: usize, type_: &PrimitiveType, nested: &[Nested], + num_values: usize, options: WriteOptions, buffer: &mut Vec, ) -> Result<(usize, usize)> { @@ -82,7 +83,7 @@ fn serialize_levels( let definition_levels_byte_length = buffer.len(); Ok((0, definition_levels_byte_length)) } else { - nested::write_rep_and_def(options.version, nested, buffer) + nested::write_rep_and_def(options.version, nested, num_values, buffer) } } @@ -122,27 +123,24 @@ fn serialize_keys( unreachable!("") } + let num_values = nested::num_values(&nested); + let (repetition_levels_byte_length, definition_levels_byte_length) = serialize_levels( validity.as_ref(), array.len(), &type_, &nested, + num_values, options, &mut buffer, )?; serialize_keys_values(&array, validity.as_ref(), &mut buffer)?; - let (num_values, num_rows) = if nested.len() == 1 { - (array.len(), array.len()) - } else { - (nested::num_values(&nested), nested[0].len()) - }; - utils::build_plain_page( buffer, num_values, - num_rows, + array.len(), array.null_count(), repetition_levels_byte_length, definition_levels_byte_length, diff --git a/src/io/parquet/write/nested/def.rs b/src/io/parquet/write/nested/def.rs index 2b958cfcb9a..2b029290500 100644 --- a/src/io/parquet/write/nested/def.rs +++ b/src/io/parquet/write/nested/def.rs @@ -1,7 +1,6 @@ use crate::{bitmap::Bitmap, offset::Offset}; use super::super::pages::{ListNested, Nested}; -use super::rep::num_values; use super::to_length; trait DebugIter: Iterator + std::fmt::Debug {} @@ -87,9 +86,7 @@ pub struct DefLevelsIter<'a> { } impl<'a> DefLevelsIter<'a> { - pub fn new(nested: &'a [Nested]) -> Self { - let remaining_values = num_values(nested); - + pub fn new(nested: &'a [Nested], num_values: usize) -> Self { let iter = iter(nested); let remaining = vec![0; iter.len()]; let validity = vec![0; iter.len()]; @@ -100,7 +97,7 @@ impl<'a> DefLevelsIter<'a> { validity, total: 0, current_level: 0, - remaining_values, + remaining_values: num_values, } } } @@ -170,10 +167,11 @@ impl<'a> Iterator for DefLevelsIter<'a> { #[cfg(test)] mod tests { + use super::super::num_values; use super::*; fn test(nested: Vec, expected: Vec) { - let mut iter = DefLevelsIter::new(&nested); + let mut iter = DefLevelsIter::new(&nested, num_values(&nested)); assert_eq!(iter.size_hint().0, expected.len()); let result = iter.by_ref().collect::>(); assert_eq!(result, expected); diff --git a/src/io/parquet/write/nested/mod.rs b/src/io/parquet/write/nested/mod.rs index 49951ec7fb1..5b91cb162c0 100644 --- a/src/io/parquet/write/nested/mod.rs +++ b/src/io/parquet/write/nested/mod.rs @@ -28,14 +28,19 @@ fn write_levels_v1) -> Result<()>>( } /// writes the rep levels to a `Vec`. -fn write_rep_levels(buffer: &mut Vec, nested: &[Nested], version: Version) -> Result<()> { +fn write_rep_levels( + buffer: &mut Vec, + nested: &[Nested], + num_values: usize, + version: Version, +) -> Result<()> { let max_level = max_rep_level(nested) as i16; if max_level == 0 { return Ok(()); } let num_bits = get_bit_width(max_level); - let levels = rep::RepLevelsIter::new(nested); + let levels = rep::RepLevelsIter::new(nested, num_values); match version { Version::V1 => { @@ -53,14 +58,19 @@ fn write_rep_levels(buffer: &mut Vec, nested: &[Nested], version: Version) - } /// writes the rep levels to a `Vec`. -fn write_def_levels(buffer: &mut Vec, nested: &[Nested], version: Version) -> Result<()> { +fn write_def_levels( + buffer: &mut Vec, + nested: &[Nested], + num_values: usize, + version: Version, +) -> Result<()> { let max_level = max_def_level(nested) as i16; if max_level == 0 { return Ok(()); } let num_bits = get_bit_width(max_level); - let levels = def::DefLevelsIter::new(nested); + let levels = def::DefLevelsIter::new(nested, num_values); match version { Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec| { @@ -105,12 +115,13 @@ fn to_length( pub fn write_rep_and_def( page_version: Version, nested: &[Nested], + num_values: usize, buffer: &mut Vec, ) -> Result<(usize, usize)> { - write_rep_levels(buffer, nested, page_version)?; + write_rep_levels(buffer, nested, num_values, page_version)?; let repetition_levels_byte_length = buffer.len(); - write_def_levels(buffer, nested, page_version)?; + write_def_levels(buffer, nested, num_values, page_version)?; let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; Ok((repetition_levels_byte_length, definition_levels_byte_length)) diff --git a/src/io/parquet/write/nested/rep.rs b/src/io/parquet/write/nested/rep.rs index c8dc59e7fdc..a875fa36346 100644 --- a/src/io/parquet/write/nested/rep.rs +++ b/src/io/parquet/write/nested/rep.rs @@ -60,9 +60,7 @@ pub struct RepLevelsIter<'a> { } impl<'a> RepLevelsIter<'a> { - pub fn new(nested: &'a [Nested]) -> Self { - let remaining_values = num_values(nested); - + pub fn new(nested: &'a [Nested], num_values: usize) -> Self { let iter = iter(nested); let remaining = vec![0; iter.len()]; @@ -71,7 +69,7 @@ impl<'a> RepLevelsIter<'a> { remaining, total: 0, current_level: 0, - remaining_values, + remaining_values: num_values, } } } @@ -138,7 +136,7 @@ mod tests { use super::*; fn test(nested: Vec, expected: Vec) { - let mut iter = RepLevelsIter::new(&nested); + let mut iter = RepLevelsIter::new(&nested, num_values(&nested)); assert_eq!(iter.size_hint().0, expected.len()); assert_eq!(iter.by_ref().collect::>(), expected); assert_eq!(iter.size_hint().0, 0); diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index 6c7bf59f5a2..341d0fa424e 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -42,8 +42,10 @@ where unreachable!("") } + let num_values = nested::num_values(&nested); + let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, &nested, &mut buffer)?; + nested::write_rep_and_def(options.version, &nested, num_values, &mut buffer)?; let buffer = encode_plain(&array, is_optional, buffer); @@ -58,7 +60,7 @@ where utils::build_plain_page( buffer, - nested::num_values(&nested), + num_values, nested[0].len(), array.null_count(), repetition_levels_byte_length, diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index 1054de6e311..cab88332508 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -34,9 +34,11 @@ where unreachable!("") } + let num_values = nested::num_values(&nested); + let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, &nested, &mut buffer)?; + nested::write_rep_and_def(options.version, &nested, num_values, &mut buffer)?; encode_plain(&array, is_optional, &mut buffer); @@ -48,7 +50,7 @@ where utils::build_plain_page( buffer, - nested::num_values(&nested), + num_values, nested[0].len(), array.null_count(), repetition_levels_byte_length,