diff --git a/src/io/parquet/read/deserialize/binary/basic.rs b/src/io/parquet/read/deserialize/binary/basic.rs index a56d36086df..b89c11c36a9 100644 --- a/src/io/parquet/read/deserialize/binary/basic.rs +++ b/src/io/parquet/read/deserialize/binary/basic.rs @@ -339,11 +339,37 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { } } - fn with_capacity(&self, capacity: usize) -> Self::DecodedState { - ( - Binary::::with_capacity(capacity), - MutableBitmap::with_capacity(capacity), - ) + fn with_capacity(&self, capacity: usize, page: &Self::State) -> Self::DecodedState { + match page { + State::Optional(_, values) => ( + // this is an upper bound - we may not consume the whole page. + // we do not know how many nulls are there, so we do not know how many + // valid items are there to discount over the sequence. + Binary::::with_capacity(capacity, values.num_bytes()), + MutableBitmap::with_capacity(capacity), + ), + State::Required(values) => ( + // this is an upper bound - we may not consume the whole page. + Binary::::with_capacity(capacity, values.values.num_bytes()), + MutableBitmap::new(), + ), + State::FilteredRequiredDictionary(_) + | State::FilteredRequired(_) + | State::RequiredDictionary(_) + | State::Delta(_) + | State::FilteredDelta(_) => ( + Binary::::with_capacity(capacity, 0), + MutableBitmap::new(), + ), + State::FilteredOptionalDictionary(_, _) + | State::FilteredOptional(_, _) + | State::OptionalDictionary(_, _) + | State::OptionalDelta(_, _) + | State::FilteredOptionalDelta(_, _) => ( + Binary::::with_capacity(capacity, 0), + MutableBitmap::with_capacity(capacity), + ), + } } fn extend_from_state( diff --git a/src/io/parquet/read/deserialize/binary/dictionary.rs b/src/io/parquet/read/deserialize/binary/dictionary.rs index 6f883528ef8..96344265899 100644 --- a/src/io/parquet/read/deserialize/binary/dictionary.rs +++ b/src/io/parquet/read/deserialize/binary/dictionary.rs @@ -59,7 +59,7 @@ fn read_dict(data_type: DataType, dict: &DictPage) -> Box let values = SizedBinaryIter::new(&dict.buffer, dict.num_values); - let mut data = Binary::::with_capacity(dict.num_values); + let mut data = Binary::::with_capacity(dict.num_values, 0); data.values = Vec::with_capacity(dict.buffer.len() - 4 * dict.num_values); for item in values { data.push(item) diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index 2d345140db7..1f31eeac37b 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -85,7 +85,7 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder { fn with_capacity(&self, capacity: usize) -> Self::DecodedState { ( - Binary::::with_capacity(capacity), + Binary::::with_capacity(capacity, 0), MutableBitmap::with_capacity(capacity), ) } diff --git a/src/io/parquet/read/deserialize/binary/utils.rs b/src/io/parquet/read/deserialize/binary/utils.rs index ec514766fa2..37e6e695139 100644 --- a/src/io/parquet/read/deserialize/binary/utils.rs +++ b/src/io/parquet/read/deserialize/binary/utils.rs @@ -36,10 +36,10 @@ impl Pushable for Offsets { impl Binary { #[inline] - pub fn with_capacity(capacity: usize) -> Self { + pub fn with_capacity(capacity: usize, values_capacity: usize) -> Self { Self { offsets: Offsets::with_capacity(capacity), - values: Vec::with_capacity(capacity.min(100) * 24), + values: Vec::with_capacity(values_capacity), } } @@ -86,6 +86,7 @@ impl<'a, O: Offset> Pushable<&'a [u8]> for Binary { self.values.reserve(additional * avg_len); self.offsets.reserve(additional); } + #[inline] fn len(&self) -> usize { self.len() @@ -117,6 +118,10 @@ impl<'a> BinaryIter<'a> { pub fn new(values: &'a [u8]) -> Self { Self { values } } + + pub fn num_bytes(&self) -> usize { + self.values.len() + } } impl<'a> Iterator for BinaryIter<'a> { @@ -149,6 +154,11 @@ impl<'a> SizedBinaryIter<'a> { remaining: size, } } + + pub fn num_bytes(&self) -> usize { + // iter is formed as [length as i32][bytes] + self.iter.num_bytes() - 4 * self.remaining + } } impl<'a> Iterator for SizedBinaryIter<'a> { @@ -164,6 +174,7 @@ impl<'a> Iterator for SizedBinaryIter<'a> { self.iter.next() } + #[inline] fn size_hint(&self) -> (usize, Option) { (self.remaining, Some(self.remaining)) } diff --git a/src/io/parquet/read/deserialize/boolean/basic.rs b/src/io/parquet/read/deserialize/boolean/basic.rs index d12bff3eced..2f891db999f 100644 --- a/src/io/parquet/read/deserialize/boolean/basic.rs +++ b/src/io/parquet/read/deserialize/boolean/basic.rs @@ -136,11 +136,16 @@ impl<'a> Decoder<'a> for BooleanDecoder { } } - fn with_capacity(&self, capacity: usize) -> Self::DecodedState { - ( - MutableBitmap::with_capacity(capacity), - MutableBitmap::with_capacity(capacity), - ) + fn with_capacity(&self, capacity: usize, page: &Self::State) -> Self::DecodedState { + match page { + State::Optional(_, _) | State::FilteredOptional(_, _) => ( + MutableBitmap::with_capacity(capacity), + MutableBitmap::with_capacity(capacity), + ), + State::Required(_) | State::FilteredRequired(_) => { + (MutableBitmap::with_capacity(capacity), MutableBitmap::new()) + } + } } fn extend_from_state( diff --git a/src/io/parquet/read/deserialize/dictionary/mod.rs b/src/io/parquet/read/deserialize/dictionary/mod.rs index 43fcaa8ab21..aaf7c010a62 100644 --- a/src/io/parquet/read/deserialize/dictionary/mod.rs +++ b/src/io/parquet/read/deserialize/dictionary/mod.rs @@ -142,11 +142,16 @@ where } } - fn with_capacity(&self, capacity: usize) -> Self::DecodedState { - ( - Vec::::with_capacity(capacity), - MutableBitmap::with_capacity(capacity), - ) + fn with_capacity(&self, capacity: usize, page: &Self::State) -> Self::DecodedState { + match page { + State::Optional(_) | State::FilteredOptional(_, _) => ( + Vec::::with_capacity(capacity), + MutableBitmap::with_capacity(capacity), + ), + State::Required(_) | State::FilteredRequired(_) => { + (Vec::::with_capacity(capacity), MutableBitmap::new()) + } + } } fn extend_from_state( diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs index ab47aa98cf3..49fa5f9d446 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs @@ -194,11 +194,17 @@ impl<'a> Decoder<'a> for BinaryDecoder { } } - fn with_capacity(&self, capacity: usize) -> Self::DecodedState { - ( - FixedSizeBinary::with_capacity(capacity, self.size), - MutableBitmap::with_capacity(capacity), - ) + fn with_capacity(&self, capacity: usize, page: &Self::State) -> Self::DecodedState { + match page { + State::FilteredOptional(_, _) | State::OptionalDictionary(_) | State::Optional(_) => ( + FixedSizeBinary::with_capacity(capacity, self.size), + MutableBitmap::with_capacity(capacity), + ), + State::FilteredRequired(_) | State::RequiredDictionary(_) | State::Required(_) => ( + FixedSizeBinary::with_capacity(capacity, self.size), + MutableBitmap::new(), + ), + } } fn extend_from_state( diff --git a/src/io/parquet/read/deserialize/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs index cc41c3159d6..309bb82e004 100644 --- a/src/io/parquet/read/deserialize/primitive/basic.rs +++ b/src/io/parquet/read/deserialize/primitive/basic.rs @@ -194,11 +194,18 @@ where } } - fn with_capacity(&self, capacity: usize) -> Self::DecodedState { - ( - Vec::::with_capacity(capacity), - MutableBitmap::with_capacity(capacity), - ) + fn with_capacity(&self, capacity: usize, page: &Self::State) -> Self::DecodedState { + match page { + State::Optional(_, _) + | State::OptionalDictionary(_, _) + | State::FilteredOptional(_, _) => ( + Vec::::with_capacity(capacity), + MutableBitmap::with_capacity(capacity), + ), + State::Required(_) | State::RequiredDictionary(_) | State::FilteredRequired(_) => { + (Vec::::with_capacity(capacity), MutableBitmap::new()) + } + } } fn extend_from_state( diff --git a/src/io/parquet/read/deserialize/primitive/integer.rs b/src/io/parquet/read/deserialize/primitive/integer.rs index f55b0bec3da..1ab25e4a86c 100644 --- a/src/io/parquet/read/deserialize/primitive/integer.rs +++ b/src/io/parquet/read/deserialize/primitive/integer.rs @@ -127,8 +127,18 @@ where } } - fn with_capacity(&self, capacity: usize) -> Self::DecodedState { - self.0.with_capacity(capacity) + fn with_capacity(&self, capacity: usize, page: &Self::State) -> Self::DecodedState { + match page { + State::Common(page) => self.0.with_capacity(capacity, page), + State::DeltaBinaryPackedRequired(_) | State::FilteredDeltaBinaryPackedRequired(_) => { + (Vec::::with_capacity(capacity), MutableBitmap::new()) + } + State::DeltaBinaryPackedOptional(_, _) + | State::FilteredDeltaBinaryPackedOptional(_, _) => ( + Vec::::with_capacity(capacity), + MutableBitmap::with_capacity(capacity), + ), + } } fn extend_from_state( diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs index 71bce24c586..6c5c8d2586b 100644 --- a/src/io/parquet/read/deserialize/utils.rs +++ b/src/io/parquet/read/deserialize/utils.rs @@ -387,7 +387,7 @@ pub(super) trait Decoder<'a> { ) -> Result; /// Initializes a new [`Self::DecodedState`]. - fn with_capacity(&self, capacity: usize) -> Self::DecodedState; + fn with_capacity(&self, capacity: usize, page: &Self::State) -> Self::DecodedState; /// extends [`Self::DecodedState`] by deserializing items in [`Self::State`]. /// It guarantees that the length of `decoded` is at most `decoded.len() + remaining`. @@ -416,7 +416,7 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>( decoded } else { // there is no state => initialize it - decoder.with_capacity(capacity) + decoder.with_capacity(capacity, &page) }; let existing = decoded.len(); @@ -429,7 +429,7 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>( while page.len() > 0 && *remaining > 0 { let additional = chunk_size.min(*remaining); - let mut decoded = decoder.with_capacity(additional); + let mut decoded = decoder.with_capacity(additional, &page); decoder.extend_from_state(&mut page, &mut decoded, additional); *remaining -= decoded.len(); items.push_back(decoded)