diff --git a/Cargo.toml b/Cargo.toml index 6c7e50389e6..46d1af489ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -170,7 +170,7 @@ io_ipc_compression = ["lz4", "zstd"] io_flight = ["io_ipc", "arrow-format/flight-data"] # base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format. -io_parquet = ["parquet2", "io_ipc", "base64", "futures", "streaming-iterator", "fallible-streaming-iterator"] +io_parquet = ["parquet2", "io_ipc", "base64", "futures", "streaming-iterator", "fallible-streaming-iterator", "compute_take"] io_parquet_compression = [ "io_parquet_zstd", diff --git a/arrow-parquet-integration-testing/main.py b/arrow-parquet-integration-testing/main.py index a880af617d8..4cf197a2f7d 100644 --- a/arrow-parquet-integration-testing/main.py +++ b/arrow-parquet-integration-testing/main.py @@ -73,6 +73,7 @@ def variations(): "generated_datetime", "generated_decimal", "generated_interval", + "generated_nested", # see https://issues.apache.org/jira/browse/ARROW-13486 and # https://issues.apache.org/jira/browse/ARROW-13487 # "generated_dictionary", diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs index e7ddf8991c4..52b41e699b9 100644 --- a/src/io/parquet/write/pages.rs +++ b/src/io/parquet/write/pages.rs @@ -2,9 +2,9 @@ use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType use parquet2::{page::Page, write::DynIter}; use std::fmt::Debug; -use crate::array::{ListArray, StructArray}; +use crate::array::{FixedSizeListArray, ListArray, PrimitiveArray, StructArray}; use crate::bitmap::Bitmap; -use crate::datatypes::PhysicalType; +use crate::datatypes::{DataType, PhysicalType}; use crate::io::parquet::read::schema::is_nullable; use crate::offset::Offset; use crate::{ @@ -174,6 +174,32 @@ fn to_leaves_recursive<'a>(array: &'a dyn Array, leaves: &mut Vec<&'a dyn Array> let array = array.as_any().downcast_ref::>().unwrap(); to_leaves_recursive(array.values().as_ref(), leaves); } + FixedSizeList => { + let indices: Option> = array.validity().map(|validity| { + validity + .into_iter() + .enumerate() + .map(|(idx, val)| if val { Some(idx as u32) } else { None }) + .flatten() + .collect() + }); + + if let Some(indices) = indices { + let new_array = crate::compute::take::take( + array, + &PrimitiveArray::new(DataType::UInt32, indices.into(), None), + ) + .unwrap(); + let new_array = new_array + .as_any() + .downcast_ref::() + .unwrap(); + to_leaves_recursive(new_array.values().as_ref(), leaves); + } else { + let array = array.as_any().downcast_ref::().unwrap(); + to_leaves_recursive(array.values().as_ref(), leaves); + } + } Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8 | LargeUtf8 | Dictionary(_) => leaves.push(array), other => todo!("Writing {:?} to parquet not yet implemented", other),