Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Reduced re-alloc in parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Dec 18, 2022
1 parent efa630b commit e8e04f7
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 34 deletions.
36 changes: 31 additions & 5 deletions src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,37 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
}
}

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
(
Binary::<O>::with_capacity(capacity),
MutableBitmap::with_capacity(capacity),
)
fn with_capacity(&self, capacity: usize, page: &Self::State) -> Self::DecodedState {
match page {
State::Optional(_, values) => (
// this is an upper bound - we may not consume the whole page.
// we do not know how many nulls are there, so we do not know how many
// valid items are there to discount over the sequence.
Binary::<O>::with_capacity(capacity, values.num_bytes()),
MutableBitmap::with_capacity(capacity),
),
State::Required(values) => (
// this is an upper bound - we may not consume the whole page.
Binary::<O>::with_capacity(capacity, values.values.num_bytes()),
MutableBitmap::new(),
),
State::FilteredRequiredDictionary(_)
| State::FilteredRequired(_)
| State::RequiredDictionary(_)
| State::Delta(_)
| State::FilteredDelta(_) => (
Binary::<O>::with_capacity(capacity, 0),
MutableBitmap::new(),
),
State::FilteredOptionalDictionary(_, _)
| State::FilteredOptional(_, _)
| State::OptionalDictionary(_, _)
| State::OptionalDelta(_, _)
| State::FilteredOptionalDelta(_, _) => (
Binary::<O>::with_capacity(capacity, 0),
MutableBitmap::with_capacity(capacity),
),
}
}

fn extend_from_state(
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/binary/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ fn read_dict<O: Offset>(data_type: DataType, dict: &DictPage) -> Box<dyn Array>

let values = SizedBinaryIter::new(&dict.buffer, dict.num_values);

let mut data = Binary::<O>::with_capacity(dict.num_values);
let mut data = Binary::<O>::with_capacity(dict.num_values, 0);
data.values = Vec::with_capacity(dict.buffer.len() - 4 * dict.num_values);
for item in values {
data.push(item)
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
(
Binary::<O>::with_capacity(capacity),
Binary::<O>::with_capacity(capacity, 0),
MutableBitmap::with_capacity(capacity),
)
}
Expand Down
15 changes: 13 additions & 2 deletions src/io/parquet/read/deserialize/binary/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ impl<O: Offset> Pushable<usize> for Offsets<O> {

impl<O: Offset> Binary<O> {
#[inline]
pub fn with_capacity(capacity: usize) -> Self {
pub fn with_capacity(capacity: usize, values_capacity: usize) -> Self {
Self {
offsets: Offsets::with_capacity(capacity),
values: Vec::with_capacity(capacity.min(100) * 24),
values: Vec::with_capacity(values_capacity),
}
}

Expand Down Expand Up @@ -86,6 +86,7 @@ impl<'a, O: Offset> Pushable<&'a [u8]> for Binary<O> {
self.values.reserve(additional * avg_len);
self.offsets.reserve(additional);
}

#[inline]
fn len(&self) -> usize {
self.len()
Expand Down Expand Up @@ -117,6 +118,10 @@ impl<'a> BinaryIter<'a> {
pub fn new(values: &'a [u8]) -> Self {
Self { values }
}

pub fn num_bytes(&self) -> usize {
self.values.len()
}
}

impl<'a> Iterator for BinaryIter<'a> {
Expand Down Expand Up @@ -149,6 +154,11 @@ impl<'a> SizedBinaryIter<'a> {
remaining: size,
}
}

pub fn num_bytes(&self) -> usize {
// iter is formed as [length as i32][bytes]
self.iter.num_bytes() - 4 * self.remaining
}
}

impl<'a> Iterator for SizedBinaryIter<'a> {
Expand All @@ -164,6 +174,7 @@ impl<'a> Iterator for SizedBinaryIter<'a> {
self.iter.next()
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(self.remaining, Some(self.remaining))
}
Expand Down
15 changes: 10 additions & 5 deletions src/io/parquet/read/deserialize/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,16 @@ impl<'a> Decoder<'a> for BooleanDecoder {
}
}

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
(
MutableBitmap::with_capacity(capacity),
MutableBitmap::with_capacity(capacity),
)
fn with_capacity(&self, capacity: usize, page: &Self::State) -> Self::DecodedState {
match page {
State::Optional(_, _) | State::FilteredOptional(_, _) => (
MutableBitmap::with_capacity(capacity),
MutableBitmap::with_capacity(capacity),
),
State::Required(_) | State::FilteredRequired(_) => {
(MutableBitmap::with_capacity(capacity), MutableBitmap::new())
}
}
}

fn extend_from_state(
Expand Down
15 changes: 10 additions & 5 deletions src/io/parquet/read/deserialize/dictionary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,16 @@ where
}
}

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
(
Vec::<K>::with_capacity(capacity),
MutableBitmap::with_capacity(capacity),
)
fn with_capacity(&self, capacity: usize, page: &Self::State) -> Self::DecodedState {
match page {
State::Optional(_) | State::FilteredOptional(_, _) => (
Vec::<K>::with_capacity(capacity),
MutableBitmap::with_capacity(capacity),
),
State::Required(_) | State::FilteredRequired(_) => {
(Vec::<K>::with_capacity(capacity), MutableBitmap::new())
}
}
}

fn extend_from_state(
Expand Down
16 changes: 11 additions & 5 deletions src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,17 @@ impl<'a> Decoder<'a> for BinaryDecoder {
}
}

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
(
FixedSizeBinary::with_capacity(capacity, self.size),
MutableBitmap::with_capacity(capacity),
)
fn with_capacity(&self, capacity: usize, page: &Self::State) -> Self::DecodedState {
match page {
State::FilteredOptional(_, _) | State::OptionalDictionary(_) | State::Optional(_) => (
FixedSizeBinary::with_capacity(capacity, self.size),
MutableBitmap::with_capacity(capacity),
),
State::FilteredRequired(_) | State::RequiredDictionary(_) | State::Required(_) => (
FixedSizeBinary::with_capacity(capacity, self.size),
MutableBitmap::new(),
),
}
}

fn extend_from_state(
Expand Down
17 changes: 12 additions & 5 deletions src/io/parquet/read/deserialize/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,18 @@ where
}
}

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
(
Vec::<T>::with_capacity(capacity),
MutableBitmap::with_capacity(capacity),
)
fn with_capacity(&self, capacity: usize, page: &Self::State) -> Self::DecodedState {
match page {
State::Optional(_, _)
| State::OptionalDictionary(_, _)
| State::FilteredOptional(_, _) => (
Vec::<T>::with_capacity(capacity),
MutableBitmap::with_capacity(capacity),
),
State::Required(_) | State::RequiredDictionary(_) | State::FilteredRequired(_) => {
(Vec::<T>::with_capacity(capacity), MutableBitmap::new())
}
}
}

fn extend_from_state(
Expand Down
14 changes: 12 additions & 2 deletions src/io/parquet/read/deserialize/primitive/integer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,18 @@ where
}
}

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
self.0.with_capacity(capacity)
fn with_capacity(&self, capacity: usize, page: &Self::State) -> Self::DecodedState {
match page {
State::Common(page) => self.0.with_capacity(capacity, page),
State::DeltaBinaryPackedRequired(_) | State::FilteredDeltaBinaryPackedRequired(_) => {
(Vec::<T>::with_capacity(capacity), MutableBitmap::new())
}
State::DeltaBinaryPackedOptional(_, _)
| State::FilteredDeltaBinaryPackedOptional(_, _) => (
Vec::<T>::with_capacity(capacity),
MutableBitmap::with_capacity(capacity),
),
}
}

fn extend_from_state(
Expand Down
6 changes: 3 additions & 3 deletions src/io/parquet/read/deserialize/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ pub(super) trait Decoder<'a> {
) -> Result<Self::State, Error>;

/// Initializes a new [`Self::DecodedState`].
fn with_capacity(&self, capacity: usize) -> Self::DecodedState;
fn with_capacity(&self, capacity: usize, page: &Self::State) -> Self::DecodedState;

/// extends [`Self::DecodedState`] by deserializing items in [`Self::State`].
/// It guarantees that the length of `decoded` is at most `decoded.len() + remaining`.
Expand Down Expand Up @@ -416,7 +416,7 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>(
decoded
} else {
// there is no state => initialize it
decoder.with_capacity(capacity)
decoder.with_capacity(capacity, &page)
};
let existing = decoded.len();

Expand All @@ -429,7 +429,7 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>(
while page.len() > 0 && *remaining > 0 {
let additional = chunk_size.min(*remaining);

let mut decoded = decoder.with_capacity(additional);
let mut decoded = decoder.with_capacity(additional, &page);
decoder.extend_from_state(&mut page, &mut decoded, additional);
*remaining -= decoded.len();
items.push_back(decoded)
Expand Down

0 comments on commit e8e04f7

Please sign in to comment.