From 7aa76492428dbe4bf18d60ef9d7cfed50a97e7f9 Mon Sep 17 00:00:00 2001 From: Chandra Penke <16369152+ncpenke@users.noreply.github.com> Date: Mon, 27 Jun 2022 14:57:46 -0700 Subject: [PATCH] Fix #40 Add support for converting to Chunk (#44) - Improve test coverage --- README.md | 13 ++- arrow2_convert/src/deserialize.rs | 24 ++--- arrow2_convert/src/lib.rs | 7 ++ arrow2_convert/src/serialize.rs | 55 ++++++++++++ arrow2_convert/tests/test_deserialize.rs | 17 +++- arrow2_convert/tests/test_round_trip.rs | 69 +++++++++++++++ arrow2_convert/tests/test_serialize.rs | 106 ++++++++++++++++++++++- 7 files changed, 268 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 0de4044..b0bd4ed 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,14 @@ Provides an API on top of [`arrow2`](https://github.com/jorgecarleitao/arrow2) t The Arrow ecosystem provides many ways to convert between Arrow and other popular formats across several languages. This project aims to serve the need for rust-centric data pipelines to easily convert to/from Arrow via an auto-generated compile-time schema. -## Design +## API -Types that implements the `ArrowField`, `ArrowSerialize` and `ArrowDeserialize` traits can be converted to/from Arrow via the `try_into_arrow` and the `try_into_collection` methods. +Types that implement the `ArrowField`, `ArrowSerialize` and `ArrowDeserialize` traits can be converted to/from Arrow. The `ArrowField` implementation for a type defines the Arrow schema. The `ArrowSerialize` and `ArrowDeserialize` implementations provide the conversion logic via arrow2's data structures. -The `ArrowField` implementation for a type defines the Arrow schema. The `ArrowSerialize` and `ArrowDeserialize` implementations provide the conversion logic via arrow2's data structures. + +For serializing to arrow, the `TryIntoArrow::try_into_arrow` method can be used to serialize any iterable into an `arrow2::Array`, which represents the in-memory Arrow layout or a `arrow2::Chunk`, which represents a column group, `Vec>`. `arrow2::Chunk` can be used with `arrow2` API for other functionality such converting to parquet and arrow flight RPC. + +For deserializing from arrow, the `TryIntoCollection::try_into_collection` can be used to deserialize from an `arrow2::Array` representation into any container that implements `FromIterator`. ## Features @@ -31,6 +34,10 @@ The `ArrowField` implementation for a type defines the Arrow schema. The `ArrowS This is not an exhaustive list. Please open an issue if you need a feature. +### A note on nested option times + +Since the Arrow format only supports one level of validity, nested option types such as `Option>` after serialization to Arrow will lose intermediate nesting of None values. For example, `Some(None)` will be serialized to `None`, + ## Memory Pass-thru conversions perform a single memory copy. Deserialization performs a copy from arrow2 to the destination. Serialization performs a copy from the source to arrow2. In-place deserialization is theoretically possible but currently not supported. diff --git a/arrow2_convert/src/deserialize.rs b/arrow2_convert/src/deserialize.rs index f991242..add595b 100644 --- a/arrow2_convert/src/deserialize.rs +++ b/arrow2_convert/src/deserialize.rs @@ -92,7 +92,7 @@ where fn arrow_deserialize( v: <&Self::ArrayType as IntoIterator>::Item, ) -> Option<::Type> { - Some(Self::arrow_deserialize_internal(v)) + Self::arrow_deserialize_internal(v).map(Some) } #[inline] @@ -194,14 +194,10 @@ where for<'a> &'a T::ArrayType: IntoIterator, { use std::ops::Deref; - match v { - Some(t) => { - arrow_array_deserialize_iterator_internal::<::Type, T>(t.deref()) - .ok() - .map(|i| i.collect::::Type>>()) - } - None => None, - } + v.map(|t| { + arrow_array_deserialize_iterator_internal::<::Type, T>(t.deref()) + .collect::::Type>>() + }) } // Blanket implementation for Vec @@ -270,15 +266,13 @@ where /// Helper to return an iterator for elements from a [`arrow2::array::Array`]. fn arrow_array_deserialize_iterator_internal<'a, Element, Field>( b: &'a dyn arrow2::array::Array, -) -> arrow2::error::Result + 'a> +) -> impl Iterator + 'a where Field: ArrowDeserialize + ArrowField + 'static, for<'b> &'b ::ArrayType: IntoIterator, { - Ok( - <::ArrayType as ArrowArray>::iter_from_array_ref(b) - .map(::arrow_deserialize_internal), - ) + <::ArrayType as ArrowArray>::iter_from_array_ref(b) + .map(::arrow_deserialize_internal) } pub fn arrow_array_deserialize_iterator_as_type<'a, Element, ArrowType>( @@ -297,7 +291,7 @@ where Ok(arrow_array_deserialize_iterator_internal::< Element, ArrowType, - >(arr)?) + >(arr)) } } diff --git a/arrow2_convert/src/lib.rs b/arrow2_convert/src/lib.rs index b0b4481..3c84c79 100644 --- a/arrow2_convert/src/lib.rs +++ b/arrow2_convert/src/lib.rs @@ -1,3 +1,5 @@ +#![forbid(unsafe_code)] + // The proc macro is implemented in derive_internal, and re-exported by this // crate. This is because a single crate can not define both a proc macro and a // macro_rules macro. @@ -8,3 +10,8 @@ pub mod serialize; #[cfg(feature = "arrow2_convert_derive")] #[doc(hidden)] pub use arrow2_convert_derive::ArrowField; + +// Test README with doctests +#[doc = include_str!("../README.md")] +#[cfg(doctest)] +struct ReadmeDoctests; diff --git a/arrow2_convert/src/serialize.rs b/arrow2_convert/src/serialize.rs index fd9c814..5c56de3 100644 --- a/arrow2_convert/src/serialize.rs +++ b/arrow2_convert/src/serialize.rs @@ -1,4 +1,5 @@ use arrow2::array::*; +use arrow2::chunk::Chunk; use chrono::{NaiveDate, NaiveDateTime}; use std::sync::Arc; @@ -449,3 +450,57 @@ where Ok(arrow_serialize_internal::(self)?.as_box()) } } + +impl<'a, Element, Collection> TryIntoArrow<'a, Chunk>, Element> for Collection +where + Element: ArrowSerialize + ArrowField + 'static, + Collection: IntoIterator, +{ + fn try_into_arrow(self) -> arrow2::error::Result>> { + Ok(Chunk::new(vec![arrow_serialize_internal::< + Element, + Element, + Collection, + >(self)? + .as_arc()])) + } + + fn try_into_arrow_as_type(self) -> arrow2::error::Result>> + where + Field: ArrowSerialize + ArrowField + 'static, + { + Ok(Chunk::new(vec![arrow_serialize_internal::< + Element, + Field, + Collection, + >(self)? + .as_arc()])) + } +} + +impl<'a, Element, Collection> TryIntoArrow<'a, Chunk>, Element> for Collection +where + Element: ArrowSerialize + ArrowField + 'static, + Collection: IntoIterator, +{ + fn try_into_arrow(self) -> arrow2::error::Result>> { + Ok(Chunk::new(vec![arrow_serialize_internal::< + Element, + Element, + Collection, + >(self)? + .as_box()])) + } + + fn try_into_arrow_as_type(self) -> arrow2::error::Result>> + where + E: ArrowSerialize + ArrowField + 'static, + { + Ok(Chunk::new(vec![arrow_serialize_internal::< + Element, + E, + Collection, + >(self)? + .as_box()])) + } +} diff --git a/arrow2_convert/tests/test_deserialize.rs b/arrow2_convert/tests/test_deserialize.rs index db8a4fa..1b7c704 100644 --- a/arrow2_convert/tests/test_deserialize.rs +++ b/arrow2_convert/tests/test_deserialize.rs @@ -18,14 +18,19 @@ fn test_deserialize_iterator() { } let original_array = [S { a1: 1 }, S { a1: 100 }, S { a1: 1000 }]; - let b: Box = original_array.try_into_arrow().unwrap(); - let iter = arrow_array_deserialize_iterator::(b.borrow()).unwrap(); - for (i, k) in iter.zip(original_array.iter()) { assert_eq!(&i, k); } + + let original_array = [Some(Some(1_i32)), Some(Some(100)), Some(None), None]; + let expected = [Some(Some(1_i32)), Some(Some(100)), None, None]; + let b: Box = original_array.try_into_arrow().unwrap(); + let iter = arrow_array_deserialize_iterator::>>(b.borrow()).unwrap(); + for (i, k) in iter.zip(expected.iter()) { + assert_eq!(&i, k); + } } #[test] @@ -41,9 +46,13 @@ fn test_deserialize_schema_mismatch_error() { let arr1 = vec![S1 { a: 1 }, S1 { a: 2 }]; let arr1: Box = arr1.try_into_arrow().unwrap(); - let result: Result> = arr1.try_into_collection(); assert!(result.is_err()); + + let arr1 = vec![S1 { a: 1 }, S1 { a: 2 }]; + let arr1: Box = arr1.try_into_arrow().unwrap(); + let result: Result> = arr1.try_into_collection_as_type::(); + assert!(result.is_err()); } #[test] diff --git a/arrow2_convert/tests/test_round_trip.rs b/arrow2_convert/tests/test_round_trip.rs index f5ee674..6aa0833 100644 --- a/arrow2_convert/tests/test_round_trip.rs +++ b/arrow2_convert/tests/test_round_trip.rs @@ -7,6 +7,7 @@ use arrow2_convert::{ field::{FixedSizeBinary, FixedSizeVec, LargeString, LargeVec}, ArrowField, }; +use std::sync::Arc; #[test] fn test_nested_optional_struct_array() { @@ -138,3 +139,71 @@ fn test_fixed_size_vec() { .unwrap(); assert_eq!(round_trip, ints); } + +#[test] +fn test_primitive_type_vec() { + macro_rules! test_int_type { + ($t:ty) => { + let original_array = vec![1 as $t, 2, 3]; + let b: Box = original_array.try_into_arrow().unwrap(); + let round_trip: Vec<$t> = b.try_into_collection().unwrap(); + assert_eq!(original_array, round_trip); + + let original_array = vec![Some(1 as $t), None, Some(3)]; + let b: Box = original_array.try_into_arrow().unwrap(); + let round_trip: Vec> = b.try_into_collection().unwrap(); + assert_eq!(original_array, round_trip); + + let original_array = vec![Some(1 as $t), None, Some(3)]; + let b: Arc = original_array.try_into_arrow().unwrap(); + let round_trip: Vec> = + b.try_into_collection_as_type::>().unwrap(); + assert_eq!(original_array, round_trip); + }; + } + + macro_rules! test_float_type { + ($t:ty) => { + let original_array = vec![1 as $t, 2., 3.]; + let b: Box = original_array.try_into_arrow().unwrap(); + let round_trip: Vec<$t> = b.try_into_collection().unwrap(); + assert_eq!(original_array, round_trip); + + let original_array = vec![Some(1 as $t), None, Some(3.)]; + let b: Box = original_array.try_into_arrow().unwrap(); + let round_trip: Vec> = b.try_into_collection().unwrap(); + assert_eq!(original_array, round_trip); + + let original_array = vec![Some(1 as $t), None, Some(3.)]; + let b: Arc = original_array.try_into_arrow().unwrap(); + let round_trip: Vec> = b.try_into_collection().unwrap(); + assert_eq!(original_array, round_trip); + }; + } + + test_int_type!(i8); + test_int_type!(i16); + test_int_type!(i32); + test_int_type!(i64); + test_int_type!(u8); + test_int_type!(u16); + test_int_type!(u32); + test_int_type!(u64); + test_float_type!(f32); + test_float_type!(f64); + + let original_array = vec![false, true, false]; + let b: Box = original_array.try_into_arrow().unwrap(); + let round_trip: Vec = b.try_into_collection().unwrap(); + assert_eq!(original_array, round_trip); + + let original_array = vec![Some(false), Some(true), None]; + let b: Box = original_array.try_into_arrow().unwrap(); + let round_trip: Vec> = b.try_into_collection().unwrap(); + assert_eq!(original_array, round_trip); + + let original_array = vec![Some(b"aa".to_vec()), None]; + let b: Box = original_array.try_into_arrow().unwrap(); + let round_trip: Vec>> = b.try_into_collection().unwrap(); + assert_eq!(original_array, round_trip); +} diff --git a/arrow2_convert/tests/test_serialize.rs b/arrow2_convert/tests/test_serialize.rs index c30fc1a..c257863 100644 --- a/arrow2_convert/tests/test_serialize.rs +++ b/arrow2_convert/tests/test_serialize.rs @@ -1,6 +1,8 @@ use arrow2::array::Array; -use arrow2_convert::field::FixedSizeBinary; +use arrow2::chunk::Chunk; +use arrow2_convert::field::{ArrowField, FixedSizeBinary}; use arrow2_convert::serialize::*; +use std::sync::Arc; #[test] fn test_error_exceed_fixed_size_binary() { @@ -9,3 +11,105 @@ fn test_error_exceed_fixed_size_binary() { strs.try_into_arrow_as_type::>(); assert!(r.is_err()) } + +#[test] +fn test_chunk() { + let strs = [b"abc".to_vec()]; + let r: Chunk> = strs.try_into_arrow_as_type::>().unwrap(); + assert_eq!(r.len(), 1); + assert_eq!( + r.arrays()[0].data_type(), + & as ArrowField>::data_type() + ); + + let r: Chunk> = strs.try_into_arrow().unwrap(); + assert_eq!(r.len(), 1); + assert_eq!( + r.arrays()[0].data_type(), + & as ArrowField>::data_type() + ); + + let r: Chunk> = strs.try_into_arrow_as_type::>().unwrap(); + assert_eq!(r.len(), 1); + assert_eq!( + r.arrays()[0].data_type(), + & as ArrowField>::data_type() + ); + + let r: Chunk> = strs.try_into_arrow().unwrap(); + assert_eq!(r.len(), 1); + assert_eq!( + r.arrays()[0].data_type(), + & as ArrowField>::data_type() + ); +} + +#[test] +fn test_array() { + let strs = [b"abc".to_vec()]; + let r: Box = strs.try_into_arrow_as_type::>().unwrap(); + assert_eq!( + r.data_type(), + & as ArrowField>::data_type() + ); + + let r: Box = strs.try_into_arrow().unwrap(); + assert_eq!(r.len(), 1); + assert_eq!(r.data_type(), & as ArrowField>::data_type()); + + let r: Arc = strs.try_into_arrow_as_type::>().unwrap(); + assert_eq!(r.len(), 1); + assert_eq!( + r.data_type(), + & as ArrowField>::data_type() + ); + + let r: Arc = strs.try_into_arrow().unwrap(); + assert_eq!(r.len(), 1); + assert_eq!(r.data_type(), & as ArrowField>::data_type()); +} + +#[test] +fn test_field_serialize_error() { + pub struct CustomType(u64); + + impl arrow2_convert::field::ArrowField for CustomType { + type Type = Self; + + #[inline] + fn data_type() -> arrow2::datatypes::DataType { + arrow2::datatypes::DataType::Extension( + "custom".to_string(), + Box::new(arrow2::datatypes::DataType::UInt64), + None, + ) + } + } + + impl arrow2_convert::serialize::ArrowSerialize for CustomType { + type MutableArrayType = arrow2::array::MutablePrimitiveArray; + + #[inline] + fn new_array() -> Self::MutableArrayType { + Self::MutableArrayType::from(::data_type()) + } + + #[inline] + fn arrow_serialize(_: &Self, _: &mut Self::MutableArrayType) -> arrow2::error::Result<()> { + Err(arrow2::error::Error::NotYetImplemented("".to_owned())) + } + } + + impl arrow2_convert::deserialize::ArrowDeserialize for CustomType { + type ArrayType = arrow2::array::PrimitiveArray; + + #[inline] + fn arrow_deserialize(v: Option<&u64>) -> Option { + v.map(|t| CustomType(*t)) + } + } + + let arr = vec![CustomType(0)]; + let r: arrow2::error::Result> = arr.try_into_arrow(); + assert!(r.is_err()) +}