Skip to content

Commit

Permalink
Fix #40 Add support for converting to Chunk (#44)
Browse files Browse the repository at this point in the history
- Improve test coverage
  • Loading branch information
ncpenke authored Jun 27, 2022
1 parent 0424380 commit 7aa7649
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 23 deletions.
13 changes: 10 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ Provides an API on top of [`arrow2`](https://github.com/jorgecarleitao/arrow2) t

The Arrow ecosystem provides many ways to convert between Arrow and other popular formats across several languages. This project aims to serve the need for rust-centric data pipelines to easily convert to/from Arrow via an auto-generated compile-time schema.

## Design
## API

Types that implements the `ArrowField`, `ArrowSerialize` and `ArrowDeserialize` traits can be converted to/from Arrow via the `try_into_arrow` and the `try_into_collection` methods.
Types that implement the `ArrowField`, `ArrowSerialize` and `ArrowDeserialize` traits can be converted to/from Arrow. The `ArrowField` implementation for a type defines the Arrow schema. The `ArrowSerialize` and `ArrowDeserialize` implementations provide the conversion logic via arrow2's data structures.

The `ArrowField` implementation for a type defines the Arrow schema. The `ArrowSerialize` and `ArrowDeserialize` implementations provide the conversion logic via arrow2's data structures.

For serializing to arrow, the `TryIntoArrow::try_into_arrow` method can be used to serialize any iterable into an `arrow2::Array`, which represents the in-memory Arrow layout or a `arrow2::Chunk`, which represents a column group, `Vec<arrow2::Array>>`. `arrow2::Chunk` can be used with `arrow2` API for other functionality such converting to parquet and arrow flight RPC.

For deserializing from arrow, the `TryIntoCollection::try_into_collection` can be used to deserialize from an `arrow2::Array` representation into any container that implements `FromIterator`.

## Features

Expand All @@ -31,6 +34,10 @@ The `ArrowField` implementation for a type defines the Arrow schema. The `ArrowS

This is not an exhaustive list. Please open an issue if you need a feature.

### A note on nested option times

Since the Arrow format only supports one level of validity, nested option types such as `Option<Option<T>>` after serialization to Arrow will lose intermediate nesting of None values. For example, `Some(None)` will be serialized to `None`,

## Memory

Pass-thru conversions perform a single memory copy. Deserialization performs a copy from arrow2 to the destination. Serialization performs a copy from the source to arrow2. In-place deserialization is theoretically possible but currently not supported.
Expand Down
24 changes: 9 additions & 15 deletions arrow2_convert/src/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ where
fn arrow_deserialize(
v: <&Self::ArrayType as IntoIterator>::Item,
) -> Option<<Self as ArrowField>::Type> {
Some(Self::arrow_deserialize_internal(v))
Self::arrow_deserialize_internal(v).map(Some)
}

#[inline]
Expand Down Expand Up @@ -194,14 +194,10 @@ where
for<'a> &'a T::ArrayType: IntoIterator,
{
use std::ops::Deref;
match v {
Some(t) => {
arrow_array_deserialize_iterator_internal::<<T as ArrowField>::Type, T>(t.deref())
.ok()
.map(|i| i.collect::<Vec<<T as ArrowField>::Type>>())
}
None => None,
}
v.map(|t| {
arrow_array_deserialize_iterator_internal::<<T as ArrowField>::Type, T>(t.deref())
.collect::<Vec<<T as ArrowField>::Type>>()
})
}

// Blanket implementation for Vec
Expand Down Expand Up @@ -270,15 +266,13 @@ where
/// Helper to return an iterator for elements from a [`arrow2::array::Array`].
fn arrow_array_deserialize_iterator_internal<'a, Element, Field>(
b: &'a dyn arrow2::array::Array,
) -> arrow2::error::Result<impl Iterator<Item = Element> + 'a>
) -> impl Iterator<Item = Element> + 'a
where
Field: ArrowDeserialize + ArrowField<Type = Element> + 'static,
for<'b> &'b <Field as ArrowDeserialize>::ArrayType: IntoIterator,
{
Ok(
<<Field as ArrowDeserialize>::ArrayType as ArrowArray>::iter_from_array_ref(b)
.map(<Field as ArrowDeserialize>::arrow_deserialize_internal),
)
<<Field as ArrowDeserialize>::ArrayType as ArrowArray>::iter_from_array_ref(b)
.map(<Field as ArrowDeserialize>::arrow_deserialize_internal)
}

pub fn arrow_array_deserialize_iterator_as_type<'a, Element, ArrowType>(
Expand All @@ -297,7 +291,7 @@ where
Ok(arrow_array_deserialize_iterator_internal::<
Element,
ArrowType,
>(arr)?)
>(arr))
}
}

Expand Down
7 changes: 7 additions & 0 deletions arrow2_convert/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![forbid(unsafe_code)]

// The proc macro is implemented in derive_internal, and re-exported by this
// crate. This is because a single crate can not define both a proc macro and a
// macro_rules macro.
Expand All @@ -8,3 +10,8 @@ pub mod serialize;
#[cfg(feature = "arrow2_convert_derive")]
#[doc(hidden)]
pub use arrow2_convert_derive::ArrowField;

// Test README with doctests
#[doc = include_str!("../README.md")]
#[cfg(doctest)]
struct ReadmeDoctests;
55 changes: 55 additions & 0 deletions arrow2_convert/src/serialize.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use arrow2::array::*;
use arrow2::chunk::Chunk;
use chrono::{NaiveDate, NaiveDateTime};
use std::sync::Arc;

Expand Down Expand Up @@ -449,3 +450,57 @@ where
Ok(arrow_serialize_internal::<Element, E, Collection>(self)?.as_box())
}
}

impl<'a, Element, Collection> TryIntoArrow<'a, Chunk<Arc<dyn Array>>, Element> for Collection
where
Element: ArrowSerialize + ArrowField<Type = Element> + 'static,
Collection: IntoIterator<Item = &'a Element>,
{
fn try_into_arrow(self) -> arrow2::error::Result<Chunk<Arc<dyn Array>>> {
Ok(Chunk::new(vec![arrow_serialize_internal::<
Element,
Element,
Collection,
>(self)?
.as_arc()]))
}

fn try_into_arrow_as_type<Field>(self) -> arrow2::error::Result<Chunk<Arc<dyn Array>>>
where
Field: ArrowSerialize + ArrowField<Type = Element> + 'static,
{
Ok(Chunk::new(vec![arrow_serialize_internal::<
Element,
Field,
Collection,
>(self)?
.as_arc()]))
}
}

impl<'a, Element, Collection> TryIntoArrow<'a, Chunk<Box<dyn Array>>, Element> for Collection
where
Element: ArrowSerialize + ArrowField<Type = Element> + 'static,
Collection: IntoIterator<Item = &'a Element>,
{
fn try_into_arrow(self) -> arrow2::error::Result<Chunk<Box<dyn Array>>> {
Ok(Chunk::new(vec![arrow_serialize_internal::<
Element,
Element,
Collection,
>(self)?
.as_box()]))
}

fn try_into_arrow_as_type<E>(self) -> arrow2::error::Result<Chunk<Box<dyn Array>>>
where
E: ArrowSerialize + ArrowField<Type = Element> + 'static,
{
Ok(Chunk::new(vec![arrow_serialize_internal::<
Element,
E,
Collection,
>(self)?
.as_box()]))
}
}
17 changes: 13 additions & 4 deletions arrow2_convert/tests/test_deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ fn test_deserialize_iterator() {
}

let original_array = [S { a1: 1 }, S { a1: 100 }, S { a1: 1000 }];

let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();

let iter = arrow_array_deserialize_iterator::<S>(b.borrow()).unwrap();

for (i, k) in iter.zip(original_array.iter()) {
assert_eq!(&i, k);
}

let original_array = [Some(Some(1_i32)), Some(Some(100)), Some(None), None];
let expected = [Some(Some(1_i32)), Some(Some(100)), None, None];
let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
let iter = arrow_array_deserialize_iterator::<Option<Option<i32>>>(b.borrow()).unwrap();
for (i, k) in iter.zip(expected.iter()) {
assert_eq!(&i, k);
}
}

#[test]
Expand All @@ -41,9 +46,13 @@ fn test_deserialize_schema_mismatch_error() {

let arr1 = vec![S1 { a: 1 }, S1 { a: 2 }];
let arr1: Box<dyn Array> = arr1.try_into_arrow().unwrap();

let result: Result<Vec<S2>> = arr1.try_into_collection();
assert!(result.is_err());

let arr1 = vec![S1 { a: 1 }, S1 { a: 2 }];
let arr1: Box<dyn Array> = arr1.try_into_arrow().unwrap();
let result: Result<Vec<_>> = arr1.try_into_collection_as_type::<S2>();
assert!(result.is_err());
}

#[test]
Expand Down
69 changes: 69 additions & 0 deletions arrow2_convert/tests/test_round_trip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use arrow2_convert::{
field::{FixedSizeBinary, FixedSizeVec, LargeString, LargeVec},
ArrowField,
};
use std::sync::Arc;

#[test]
fn test_nested_optional_struct_array() {
Expand Down Expand Up @@ -138,3 +139,71 @@ fn test_fixed_size_vec() {
.unwrap();
assert_eq!(round_trip, ints);
}

#[test]
fn test_primitive_type_vec() {
macro_rules! test_int_type {
($t:ty) => {
let original_array = vec![1 as $t, 2, 3];
let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
let round_trip: Vec<$t> = b.try_into_collection().unwrap();
assert_eq!(original_array, round_trip);

let original_array = vec![Some(1 as $t), None, Some(3)];
let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
let round_trip: Vec<Option<$t>> = b.try_into_collection().unwrap();
assert_eq!(original_array, round_trip);

let original_array = vec![Some(1 as $t), None, Some(3)];
let b: Arc<dyn Array> = original_array.try_into_arrow().unwrap();
let round_trip: Vec<Option<$t>> =
b.try_into_collection_as_type::<Option<$t>>().unwrap();
assert_eq!(original_array, round_trip);
};
}

macro_rules! test_float_type {
($t:ty) => {
let original_array = vec![1 as $t, 2., 3.];
let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
let round_trip: Vec<$t> = b.try_into_collection().unwrap();
assert_eq!(original_array, round_trip);

let original_array = vec![Some(1 as $t), None, Some(3.)];
let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
let round_trip: Vec<Option<$t>> = b.try_into_collection().unwrap();
assert_eq!(original_array, round_trip);

let original_array = vec![Some(1 as $t), None, Some(3.)];
let b: Arc<dyn Array> = original_array.try_into_arrow().unwrap();
let round_trip: Vec<Option<$t>> = b.try_into_collection().unwrap();
assert_eq!(original_array, round_trip);
};
}

test_int_type!(i8);
test_int_type!(i16);
test_int_type!(i32);
test_int_type!(i64);
test_int_type!(u8);
test_int_type!(u16);
test_int_type!(u32);
test_int_type!(u64);
test_float_type!(f32);
test_float_type!(f64);

let original_array = vec![false, true, false];
let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
let round_trip: Vec<bool> = b.try_into_collection().unwrap();
assert_eq!(original_array, round_trip);

let original_array = vec![Some(false), Some(true), None];
let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
let round_trip: Vec<Option<bool>> = b.try_into_collection().unwrap();
assert_eq!(original_array, round_trip);

let original_array = vec![Some(b"aa".to_vec()), None];
let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
let round_trip: Vec<Option<Vec<u8>>> = b.try_into_collection().unwrap();
assert_eq!(original_array, round_trip);
}
106 changes: 105 additions & 1 deletion arrow2_convert/tests/test_serialize.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use arrow2::array::Array;
use arrow2_convert::field::FixedSizeBinary;
use arrow2::chunk::Chunk;
use arrow2_convert::field::{ArrowField, FixedSizeBinary};
use arrow2_convert::serialize::*;
use std::sync::Arc;

#[test]
fn test_error_exceed_fixed_size_binary() {
Expand All @@ -9,3 +11,105 @@ fn test_error_exceed_fixed_size_binary() {
strs.try_into_arrow_as_type::<FixedSizeBinary<2>>();
assert!(r.is_err())
}

#[test]
fn test_chunk() {
let strs = [b"abc".to_vec()];
let r: Chunk<Box<dyn Array>> = strs.try_into_arrow_as_type::<FixedSizeBinary<3>>().unwrap();
assert_eq!(r.len(), 1);
assert_eq!(
r.arrays()[0].data_type(),
&<FixedSizeBinary<3> as ArrowField>::data_type()
);

let r: Chunk<Box<dyn Array>> = strs.try_into_arrow().unwrap();
assert_eq!(r.len(), 1);
assert_eq!(
r.arrays()[0].data_type(),
&<Vec<u8> as ArrowField>::data_type()
);

let r: Chunk<Arc<dyn Array>> = strs.try_into_arrow_as_type::<FixedSizeBinary<3>>().unwrap();
assert_eq!(r.len(), 1);
assert_eq!(
r.arrays()[0].data_type(),
&<FixedSizeBinary<3> as ArrowField>::data_type()
);

let r: Chunk<Arc<dyn Array>> = strs.try_into_arrow().unwrap();
assert_eq!(r.len(), 1);
assert_eq!(
r.arrays()[0].data_type(),
&<Vec<u8> as ArrowField>::data_type()
);
}

#[test]
fn test_array() {
let strs = [b"abc".to_vec()];
let r: Box<dyn Array> = strs.try_into_arrow_as_type::<FixedSizeBinary<3>>().unwrap();
assert_eq!(
r.data_type(),
&<FixedSizeBinary<3> as ArrowField>::data_type()
);

let r: Box<dyn Array> = strs.try_into_arrow().unwrap();
assert_eq!(r.len(), 1);
assert_eq!(r.data_type(), &<Vec<u8> as ArrowField>::data_type());

let r: Arc<dyn Array> = strs.try_into_arrow_as_type::<FixedSizeBinary<3>>().unwrap();
assert_eq!(r.len(), 1);
assert_eq!(
r.data_type(),
&<FixedSizeBinary<3> as ArrowField>::data_type()
);

let r: Arc<dyn Array> = strs.try_into_arrow().unwrap();
assert_eq!(r.len(), 1);
assert_eq!(r.data_type(), &<Vec<u8> as ArrowField>::data_type());
}

#[test]
fn test_field_serialize_error() {
pub struct CustomType(u64);

impl arrow2_convert::field::ArrowField for CustomType {
type Type = Self;

#[inline]
fn data_type() -> arrow2::datatypes::DataType {
arrow2::datatypes::DataType::Extension(
"custom".to_string(),
Box::new(arrow2::datatypes::DataType::UInt64),
None,
)
}
}

impl arrow2_convert::serialize::ArrowSerialize for CustomType {
type MutableArrayType = arrow2::array::MutablePrimitiveArray<u64>;

#[inline]
fn new_array() -> Self::MutableArrayType {
Self::MutableArrayType::from(<Self as arrow2_convert::field::ArrowField>::data_type())
}

#[inline]
fn arrow_serialize(_: &Self, _: &mut Self::MutableArrayType) -> arrow2::error::Result<()> {
Err(arrow2::error::Error::NotYetImplemented("".to_owned()))
}
}

impl arrow2_convert::deserialize::ArrowDeserialize for CustomType {
type ArrayType = arrow2::array::PrimitiveArray<u64>;

#[inline]
fn arrow_deserialize(v: Option<&u64>) -> Option<Self> {
v.map(|t| CustomType(*t))
}
}

let arr = vec![CustomType(0)];
let r: arrow2::error::Result<Box<dyn Array>> = arr.try_into_arrow();
assert!(r.is_err())
}

0 comments on commit 7aa7649

Please sign in to comment.