From c2c8fa36a50d0fa6a3536f908d2770bd1086c8d2 Mon Sep 17 00:00:00 2001 From: Aykut Bozkurt Date: Mon, 23 Sep 2024 13:29:13 +0300 Subject: [PATCH] Converts Arrow arrays to Postgres rows --- src/arrow_parquet.rs | 1 + src/arrow_parquet/arrow_to_pg.rs | 511 ++++++++++++++++++ src/arrow_parquet/arrow_to_pg/bool.rs | 30 + src/arrow_parquet/arrow_to_pg/bytea.rs | 33 ++ src/arrow_parquet/arrow_to_pg/char.rs | 34 ++ src/arrow_parquet/arrow_to_pg/composite.rs | 71 +++ src/arrow_parquet/arrow_to_pg/date.rs | 33 ++ .../arrow_to_pg/fallback_to_text.rs | 36 ++ src/arrow_parquet/arrow_to_pg/float4.rs | 29 + src/arrow_parquet/arrow_to_pg/float8.rs | 29 + src/arrow_parquet/arrow_to_pg/geometry.rs | 35 ++ src/arrow_parquet/arrow_to_pg/int2.rs | 29 + src/arrow_parquet/arrow_to_pg/int4.rs | 29 + src/arrow_parquet/arrow_to_pg/int8.rs | 29 + src/arrow_parquet/arrow_to_pg/map.rs | 79 +++ src/arrow_parquet/arrow_to_pg/numeric.rs | 34 ++ src/arrow_parquet/arrow_to_pg/oid.rs | 31 ++ src/arrow_parquet/arrow_to_pg/text.rs | 30 + src/arrow_parquet/arrow_to_pg/time.rs | 35 ++ src/arrow_parquet/arrow_to_pg/timestamp.rs | 37 ++ src/arrow_parquet/arrow_to_pg/timestamptz.rs | 39 ++ src/arrow_parquet/arrow_to_pg/timetz.rs | 37 ++ 22 files changed, 1251 insertions(+) create mode 100644 src/arrow_parquet/arrow_to_pg.rs create mode 100644 src/arrow_parquet/arrow_to_pg/bool.rs create mode 100644 src/arrow_parquet/arrow_to_pg/bytea.rs create mode 100644 src/arrow_parquet/arrow_to_pg/char.rs create mode 100644 src/arrow_parquet/arrow_to_pg/composite.rs create mode 100644 src/arrow_parquet/arrow_to_pg/date.rs create mode 100644 src/arrow_parquet/arrow_to_pg/fallback_to_text.rs create mode 100644 src/arrow_parquet/arrow_to_pg/float4.rs create mode 100644 src/arrow_parquet/arrow_to_pg/float8.rs create mode 100644 src/arrow_parquet/arrow_to_pg/geometry.rs create mode 100644 src/arrow_parquet/arrow_to_pg/int2.rs create mode 100644 src/arrow_parquet/arrow_to_pg/int4.rs create mode 100644 src/arrow_parquet/arrow_to_pg/int8.rs create mode 100644 src/arrow_parquet/arrow_to_pg/map.rs create mode 100644 src/arrow_parquet/arrow_to_pg/numeric.rs create mode 100644 src/arrow_parquet/arrow_to_pg/oid.rs create mode 100644 src/arrow_parquet/arrow_to_pg/text.rs create mode 100644 src/arrow_parquet/arrow_to_pg/time.rs create mode 100644 src/arrow_parquet/arrow_to_pg/timestamp.rs create mode 100644 src/arrow_parquet/arrow_to_pg/timestamptz.rs create mode 100644 src/arrow_parquet/arrow_to_pg/timetz.rs diff --git a/src/arrow_parquet.rs b/src/arrow_parquet.rs index 460de91..e30f1fc 100644 --- a/src/arrow_parquet.rs +++ b/src/arrow_parquet.rs @@ -1,3 +1,4 @@ +pub(crate) mod arrow_to_pg; pub(crate) mod arrow_utils; pub(crate) mod codec; pub(crate) mod parquet_writer; diff --git a/src/arrow_parquet/arrow_to_pg.rs b/src/arrow_parquet/arrow_to_pg.rs new file mode 100644 index 0000000..e9d942d --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg.rs @@ -0,0 +1,511 @@ +use arrow::array::{ + Array, ArrayData, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, + Float64Array, Int16Array, Int32Array, Int64Array, ListArray, MapArray, StringArray, + StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, UInt32Array, +}; +use arrow_schema::Fields; +use pgrx::{ + datum::{Date, Time, TimeWithTimeZone, Timestamp, TimestampWithTimeZone}, + pg_sys::{ + Datum, Oid, BOOLOID, BYTEAOID, CHAROID, DATEOID, FLOAT4OID, FLOAT8OID, INT2OID, INT4OID, + INT8OID, NUMERICOID, OIDOID, TEXTOID, TIMEOID, TIMESTAMPOID, TIMESTAMPTZOID, TIMETZOID, + }, + prelude::PgHeapTuple, + AllocatedByRust, AnyNumeric, IntoDatum, PgTupleDesc, +}; + +use crate::{ + pgrx_utils::{ + array_element_typoid, collect_valid_attributes, domain_array_base_elem_typoid, + is_array_type, is_composite_type, tuple_desc, + }, + type_compat::{ + fallback_to_text::{reset_fallback_to_text_context, FallbackToText}, + geometry::{is_postgis_geometry_type, Geometry}, + map::{is_crunchy_map_type, CrunchyMap}, + pg_arrow_type_conversions::{ + extract_precision_from_numeric_typmod, extract_scale_from_numeric_typmod, + MAX_DECIMAL_PRECISION, + }, + }, +}; + +pub(crate) mod bool; +pub(crate) mod bytea; +pub(crate) mod char; +pub(crate) mod composite; +pub(crate) mod date; +pub(crate) mod fallback_to_text; +pub(crate) mod float4; +pub(crate) mod float8; +pub(crate) mod geometry; +pub(crate) mod int2; +pub(crate) mod int4; +pub(crate) mod int8; +pub(crate) mod map; +pub(crate) mod numeric; +pub(crate) mod oid; +pub(crate) mod text; +pub(crate) mod time; +pub(crate) mod timestamp; +pub(crate) mod timestamptz; +pub(crate) mod timetz; + +pub(crate) trait ArrowArrayToPgType, T: IntoDatum> { + fn to_pg_type(array: A, context: &ArrowToPgAttributeContext) -> Option; +} + +#[derive(Clone)] +pub(crate) struct ArrowToPgAttributeContext { + name: String, + typoid: Oid, + typmod: i32, + is_array: bool, + is_composite: bool, + is_geometry: bool, + is_crunchy_map: bool, + attribute_contexts: Option>, + attribute_tupledesc: Option>, + precision: Option, + scale: Option, +} + +impl ArrowToPgAttributeContext { + pub(crate) fn new(name: &str, typoid: Oid, typmod: i32, fields: Fields) -> Self { + let field = fields + .iter() + .find(|field| field.name() == name) + .unwrap_or_else(|| panic!("failed to find field {}", name)) + .clone(); + + let is_array = is_array_type(typoid); + let is_composite; + let is_geometry; + let is_crunchy_map; + let attribute_typoid; + let attribute_field; + + if is_array { + let element_typoid = array_element_typoid(typoid); + + is_composite = is_composite_type(element_typoid); + is_geometry = is_postgis_geometry_type(element_typoid); + is_crunchy_map = is_crunchy_map_type(element_typoid); + + if is_crunchy_map { + let entries_typoid = domain_array_base_elem_typoid(element_typoid); + attribute_typoid = entries_typoid; + } else { + attribute_typoid = element_typoid; + } + + attribute_field = match field.data_type() { + arrow::datatypes::DataType::List(field) => field.clone(), + _ => unreachable!(), + } + } else { + is_composite = is_composite_type(typoid); + is_geometry = is_postgis_geometry_type(typoid); + is_crunchy_map = is_crunchy_map_type(typoid); + + if is_crunchy_map { + let entries_typoid = domain_array_base_elem_typoid(typoid); + attribute_typoid = entries_typoid; + } else { + attribute_typoid = typoid; + } + + attribute_field = field.clone(); + } + + let attribute_tupledesc = if is_composite || is_crunchy_map { + Some(tuple_desc(attribute_typoid, typmod)) + } else { + None + }; + + let precision; + let scale; + if attribute_typoid == NUMERICOID { + precision = Some(extract_precision_from_numeric_typmod(typmod)); + scale = Some(extract_scale_from_numeric_typmod(typmod)); + } else { + precision = None; + scale = None; + } + + // for composite and crunchy_map types, recursively collect attribute contexts + let attribute_contexts = if let Some(attribute_tupledesc) = &attribute_tupledesc { + let fields = match attribute_field.data_type() { + arrow::datatypes::DataType::Struct(fields) => fields.clone(), + arrow::datatypes::DataType::Map(struct_field, _) => { + match struct_field.data_type() { + arrow::datatypes::DataType::Struct(fields) => fields.clone(), + _ => unreachable!(), + } + } + _ => unreachable!(), + }; + + Some(collect_arrow_to_pg_attribute_contexts( + attribute_tupledesc, + &fields, + )) + } else { + None + }; + + Self { + name: name.to_string(), + typoid: attribute_typoid, + typmod, + is_array, + is_composite, + is_geometry, + is_crunchy_map, + attribute_contexts, + attribute_tupledesc, + scale, + precision, + } + } + + pub(crate) fn name(&self) -> &str { + &self.name + } +} + +pub(crate) fn collect_arrow_to_pg_attribute_contexts( + tupledesc: &PgTupleDesc, + fields: &Fields, +) -> Vec { + // parquet file does not contain generated columns. PG will handle them. + let include_generated_columns = false; + let attributes = collect_valid_attributes(tupledesc, include_generated_columns); + let mut attribute_contexts = vec![]; + + for attribute in attributes { + let attribute_name = attribute.name(); + let attribute_typoid = attribute.type_oid().value(); + let attribute_typmod = attribute.type_mod(); + + let attribute_context = ArrowToPgAttributeContext::new( + attribute_name, + attribute_typoid, + attribute_typmod, + fields.clone(), + ); + + attribute_contexts.push(attribute_context); + } + + attribute_contexts +} + +pub(crate) fn to_pg_datum( + attribute_array: ArrayData, + attribute_context: &ArrowToPgAttributeContext, +) -> Option { + if attribute_context.is_array { + to_pg_array_datum(attribute_array, attribute_context) + } else { + to_pg_nonarray_datum(attribute_array, attribute_context) + } +} + +macro_rules! to_pg_datum { + ($arrow_array_type:ty, $pg_type:ty, $arrow_array:expr, $attribute_context:expr) => {{ + let val = <$pg_type as ArrowArrayToPgType<$arrow_array_type, $pg_type>>::to_pg_type( + $arrow_array.into(), + $attribute_context, + ); + + val.into_datum() + }}; +} + +fn to_pg_nonarray_datum( + primitive_array: ArrayData, + attribute_context: &ArrowToPgAttributeContext, +) -> Option { + match attribute_context.typoid { + FLOAT4OID => { + to_pg_datum!(Float32Array, f32, primitive_array, attribute_context) + } + FLOAT8OID => { + to_pg_datum!(Float64Array, f64, primitive_array, attribute_context) + } + INT2OID => { + to_pg_datum!(Int16Array, i16, primitive_array, attribute_context) + } + INT4OID => { + to_pg_datum!(Int32Array, i32, primitive_array, attribute_context) + } + INT8OID => { + to_pg_datum!(Int64Array, i64, primitive_array, attribute_context) + } + BOOLOID => { + to_pg_datum!(BooleanArray, bool, primitive_array, attribute_context) + } + CHAROID => { + to_pg_datum!(StringArray, i8, primitive_array, attribute_context) + } + TEXTOID => { + to_pg_datum!(StringArray, String, primitive_array, attribute_context) + } + BYTEAOID => { + to_pg_datum!(BinaryArray, Vec, primitive_array, attribute_context) + } + OIDOID => { + to_pg_datum!(UInt32Array, Oid, primitive_array, attribute_context) + } + NUMERICOID => { + let precision = attribute_context + .precision + .expect("missing precision in context"); + + if precision > MAX_DECIMAL_PRECISION { + reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod); + + to_pg_datum!( + StringArray, + FallbackToText, + primitive_array, + attribute_context + ) + } else { + to_pg_datum!( + Decimal128Array, + AnyNumeric, + primitive_array, + attribute_context + ) + } + } + DATEOID => { + to_pg_datum!(Date32Array, Date, primitive_array, attribute_context) + } + TIMEOID => { + to_pg_datum!( + Time64MicrosecondArray, + Time, + primitive_array, + attribute_context + ) + } + TIMETZOID => { + to_pg_datum!( + Time64MicrosecondArray, + TimeWithTimeZone, + primitive_array, + attribute_context + ) + } + TIMESTAMPOID => { + to_pg_datum!( + TimestampMicrosecondArray, + Timestamp, + primitive_array, + attribute_context + ) + } + TIMESTAMPTZOID => { + to_pg_datum!( + TimestampMicrosecondArray, + TimestampWithTimeZone, + primitive_array, + attribute_context + ) + } + _ => { + if attribute_context.is_composite { + to_pg_datum!( + StructArray, + PgHeapTuple, + primitive_array, + attribute_context + ) + } else if attribute_context.is_crunchy_map { + to_pg_datum!(MapArray, CrunchyMap, primitive_array, attribute_context) + } else if attribute_context.is_geometry { + to_pg_datum!(BinaryArray, Geometry, primitive_array, attribute_context) + } else { + reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod); + + to_pg_datum!( + StringArray, + FallbackToText, + primitive_array, + attribute_context + ) + } + } + } +} + +fn to_pg_array_datum( + list_array: ArrayData, + attribute_context: &ArrowToPgAttributeContext, +) -> Option { + let list_array: ListArray = list_array.into(); + + if list_array.is_null(0) { + return None; + } + + let list_array = list_array.value(0).to_data(); + + match attribute_context.typoid { + FLOAT4OID => { + to_pg_datum!( + Float32Array, + Vec>, + list_array, + attribute_context + ) + } + FLOAT8OID => { + to_pg_datum!( + Float64Array, + Vec>, + list_array, + attribute_context + ) + } + INT2OID => { + to_pg_datum!(Int16Array, Vec>, list_array, attribute_context) + } + INT4OID => { + to_pg_datum!(Int32Array, Vec>, list_array, attribute_context) + } + INT8OID => { + to_pg_datum!(Int64Array, Vec>, list_array, attribute_context) + } + BOOLOID => { + to_pg_datum!( + BooleanArray, + Vec>, + list_array, + attribute_context + ) + } + CHAROID => { + to_pg_datum!(StringArray, Vec>, list_array, attribute_context) + } + TEXTOID => { + to_pg_datum!( + StringArray, + Vec>, + list_array, + attribute_context + ) + } + BYTEAOID => { + to_pg_datum!( + BinaryArray, + Vec>>, + list_array, + attribute_context + ) + } + OIDOID => { + to_pg_datum!(UInt32Array, Vec>, list_array, attribute_context) + } + NUMERICOID => { + let precision = attribute_context + .precision + .expect("missing precision in context"); + + if precision > MAX_DECIMAL_PRECISION { + reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod); + + to_pg_datum!( + StringArray, + Vec>, + list_array, + attribute_context + ) + } else { + to_pg_datum!( + Decimal128Array, + Vec>, + list_array, + attribute_context + ) + } + } + DATEOID => { + to_pg_datum!( + Date32Array, + Vec>, + list_array, + attribute_context + ) + } + TIMEOID => { + to_pg_datum!( + Time64MicrosecondArray, + Vec>, + list_array, + attribute_context + ) + } + TIMETZOID => { + to_pg_datum!( + Time64MicrosecondArray, + Vec>, + list_array, + attribute_context + ) + } + TIMESTAMPOID => { + to_pg_datum!( + TimestampMicrosecondArray, + Vec>, + list_array, + attribute_context + ) + } + TIMESTAMPTZOID => { + to_pg_datum!( + TimestampMicrosecondArray, + Vec>, + list_array, + attribute_context + ) + } + _ => { + if attribute_context.is_composite { + to_pg_datum!( + StructArray, + Vec>>, + list_array, + attribute_context + ) + } else if attribute_context.is_crunchy_map { + to_pg_datum!( + MapArray, + Vec>, + list_array, + attribute_context + ) + } else if attribute_context.is_geometry { + to_pg_datum!( + BinaryArray, + Vec>, + list_array, + attribute_context + ) + } else { + reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod); + + to_pg_datum!( + StringArray, + Vec>, + list_array, + attribute_context + ) + } + } + } +} diff --git a/src/arrow_parquet/arrow_to_pg/bool.rs b/src/arrow_parquet/arrow_to_pg/bool.rs new file mode 100644 index 0000000..584d855 --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/bool.rs @@ -0,0 +1,30 @@ +use arrow::array::{Array, BooleanArray}; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Bool +impl ArrowArrayToPgType for bool { + fn to_pg_type(arr: BooleanArray, _context: &ArrowToPgAttributeContext) -> Option { + if arr.is_null(0) { + None + } else { + let val = arr.value(0); + Some(val) + } + } +} + +// Bool[] +impl ArrowArrayToPgType>> for Vec> { + fn to_pg_type( + arr: BooleanArray, + _context: &ArrowToPgAttributeContext, + ) -> Option>> { + let mut vals = vec![]; + for val in arr.iter() { + vals.push(val); + } + + Some(vals) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/bytea.rs b/src/arrow_parquet/arrow_to_pg/bytea.rs new file mode 100644 index 0000000..0fcd803 --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/bytea.rs @@ -0,0 +1,33 @@ +use arrow::array::{Array, BinaryArray}; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Bytea +impl ArrowArrayToPgType> for Vec { + fn to_pg_type(arr: BinaryArray, _context: &ArrowToPgAttributeContext) -> Option> { + if arr.is_null(0) { + None + } else { + Some(arr.value(0).to_vec()) + } + } +} + +// Bytea[] +impl ArrowArrayToPgType>>> for Vec>> { + fn to_pg_type( + arr: BinaryArray, + _context: &ArrowToPgAttributeContext, + ) -> Option>>> { + let mut vals = vec![]; + for val in arr.iter() { + if let Some(val) = val { + vals.push(Some(val.to_vec())); + } else { + vals.push(None); + } + } + + Some(vals) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/char.rs b/src/arrow_parquet/arrow_to_pg/char.rs new file mode 100644 index 0000000..c8baae8 --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/char.rs @@ -0,0 +1,34 @@ +use arrow::array::{Array, StringArray}; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Char +impl ArrowArrayToPgType for i8 { + fn to_pg_type(arr: StringArray, _context: &ArrowToPgAttributeContext) -> Option { + if arr.is_null(0) { + None + } else { + let val = arr.value(0); + let val: i8 = val.chars().next().expect("unexpected ascii char") as i8; + Some(val) + } + } +} + +// Char[] +impl ArrowArrayToPgType>> for Vec> { + fn to_pg_type( + arr: StringArray, + _context: &ArrowToPgAttributeContext, + ) -> Option>> { + let mut vals = vec![]; + for val in arr.iter() { + let val = val.map(|val| { + let val: i8 = val.chars().next().expect("unexpected ascii char") as i8; + val + }); + vals.push(val); + } + Some(vals) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/composite.rs b/src/arrow_parquet/arrow_to_pg/composite.rs new file mode 100644 index 0000000..1fb263b --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/composite.rs @@ -0,0 +1,71 @@ +use arrow::array::{Array, StructArray}; +use pgrx::{prelude::PgHeapTuple, AllocatedByRust}; + +use super::{to_pg_datum, ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// PgHeapTuple +impl<'a> ArrowArrayToPgType> + for PgHeapTuple<'a, AllocatedByRust> +{ + fn to_pg_type( + arr: StructArray, + context: &ArrowToPgAttributeContext, + ) -> Option> { + if arr.is_null(0) { + return None; + } + + let mut datums = vec![]; + + for attribute_context in context + .attribute_contexts + .as_ref() + .expect("each attribute of the tuple should have a context") + { + let column_data = arr + .column_by_name(&attribute_context.name) + .unwrap_or_else(|| panic!("column {} not found", &attribute_context.name)); + + let datum = to_pg_datum(column_data.into_data(), attribute_context); + + datums.push(datum); + } + + let tupledesc = context + .attribute_tupledesc + .as_ref() + .expect("Expected attribute tupledesc"); + + Some( + unsafe { PgHeapTuple::from_datums(tupledesc.clone(), datums) }.unwrap_or_else(|e| { + panic!("failed to create heap tuple: {}", e); + }), + ) + } +} + +// PgHeapTuple[] +impl<'a> ArrowArrayToPgType>>> + for Vec>> +{ + fn to_pg_type( + arr: StructArray, + context: &ArrowToPgAttributeContext, + ) -> Option>>> { + let len = arr.len(); + let mut values = Vec::with_capacity(len); + + for i in 0..len { + let tuple = arr.slice(i, 1); + + let tuple = as ArrowArrayToPgType< + StructArray, + PgHeapTuple, + >>::to_pg_type(tuple, &context.clone()); + + values.push(tuple); + } + + Some(values) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/date.rs b/src/arrow_parquet/arrow_to_pg/date.rs new file mode 100644 index 0000000..74ede7f --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/date.rs @@ -0,0 +1,33 @@ +use arrow::array::{Array, Date32Array}; +use pgrx::datum::Date; + +use crate::type_compat::pg_arrow_type_conversions::i32_to_date; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Date +impl ArrowArrayToPgType for Date { + fn to_pg_type(arr: Date32Array, _context: &ArrowToPgAttributeContext) -> Option { + if arr.is_null(0) { + None + } else { + let val = arr.value(0); + Some(i32_to_date(val)) + } + } +} + +// Date[] +impl ArrowArrayToPgType>> for Vec> { + fn to_pg_type( + arr: Date32Array, + _context: &ArrowToPgAttributeContext, + ) -> Option>> { + let mut vals = vec![]; + for val in arr.iter() { + let val = val.map(i32_to_date); + vals.push(val); + } + Some(vals) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/fallback_to_text.rs b/src/arrow_parquet/arrow_to_pg/fallback_to_text.rs new file mode 100644 index 0000000..ae2c2fe --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/fallback_to_text.rs @@ -0,0 +1,36 @@ +use arrow::array::{Array, StringArray}; + +use crate::type_compat::fallback_to_text::FallbackToText; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Text representation of any type +impl ArrowArrayToPgType for FallbackToText { + fn to_pg_type( + arr: StringArray, + _context: &ArrowToPgAttributeContext, + ) -> Option { + if arr.is_null(0) { + None + } else { + let text_repr = arr.value(0).to_string(); + let val = FallbackToText(text_repr); + Some(val) + } + } +} + +// Text[] representation of any type +impl ArrowArrayToPgType>> for Vec> { + fn to_pg_type( + arr: StringArray, + _context: &ArrowToPgAttributeContext, + ) -> Option>> { + let mut vals = vec![]; + for val in arr.iter() { + let val = val.map(|val| FallbackToText(val.to_string())); + vals.push(val); + } + Some(vals) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/float4.rs b/src/arrow_parquet/arrow_to_pg/float4.rs new file mode 100644 index 0000000..2165928 --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/float4.rs @@ -0,0 +1,29 @@ +use arrow::array::{Array, Float32Array}; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Float4 +impl ArrowArrayToPgType for f32 { + fn to_pg_type(arr: Float32Array, _context: &ArrowToPgAttributeContext) -> Option { + if arr.is_null(0) { + None + } else { + let val = arr.value(0); + Some(val) + } + } +} + +// Float4[] +impl ArrowArrayToPgType>> for Vec> { + fn to_pg_type( + arr: Float32Array, + _context: &ArrowToPgAttributeContext, + ) -> Option>> { + let mut vals = vec![]; + for val in arr.iter() { + vals.push(val); + } + Some(vals) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/float8.rs b/src/arrow_parquet/arrow_to_pg/float8.rs new file mode 100644 index 0000000..c7f0dd5 --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/float8.rs @@ -0,0 +1,29 @@ +use arrow::array::{Array, Float64Array}; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Float8 +impl ArrowArrayToPgType for f64 { + fn to_pg_type(arr: Float64Array, _context: &ArrowToPgAttributeContext) -> Option { + if arr.is_null(0) { + None + } else { + let val = arr.value(0); + Some(val) + } + } +} + +// Float8[] +impl ArrowArrayToPgType>> for Vec> { + fn to_pg_type( + arr: Float64Array, + _context: &ArrowToPgAttributeContext, + ) -> Option>> { + let mut vals = vec![]; + for val in arr.iter() { + vals.push(val); + } + Some(vals) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/geometry.rs b/src/arrow_parquet/arrow_to_pg/geometry.rs new file mode 100644 index 0000000..a55ca25 --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/geometry.rs @@ -0,0 +1,35 @@ +use arrow::array::{Array, BinaryArray}; + +use crate::type_compat::geometry::Geometry; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Geometry +impl ArrowArrayToPgType for Geometry { + fn to_pg_type(arr: BinaryArray, _context: &ArrowToPgAttributeContext) -> Option { + if arr.is_null(0) { + None + } else { + Some(arr.value(0).to_vec().into()) + } + } +} + +// Geometry[] +impl ArrowArrayToPgType>> for Vec> { + fn to_pg_type( + arr: BinaryArray, + _context: &ArrowToPgAttributeContext, + ) -> Option>> { + let mut vals = vec![]; + for val in arr.iter() { + if let Some(val) = val { + vals.push(Some(val.to_vec().into())); + } else { + vals.push(None); + } + } + + Some(vals) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/int2.rs b/src/arrow_parquet/arrow_to_pg/int2.rs new file mode 100644 index 0000000..132bbb4 --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/int2.rs @@ -0,0 +1,29 @@ +use arrow::array::{Array, Int16Array}; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Int2 +impl ArrowArrayToPgType for i16 { + fn to_pg_type(arr: Int16Array, _context: &ArrowToPgAttributeContext) -> Option { + if arr.is_null(0) { + None + } else { + let val = arr.value(0); + Some(val) + } + } +} + +// Int2[] +impl ArrowArrayToPgType>> for Vec> { + fn to_pg_type( + arr: Int16Array, + _context: &ArrowToPgAttributeContext, + ) -> Option>> { + let mut vals = vec![]; + for val in arr.iter() { + vals.push(val); + } + Some(vals) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/int4.rs b/src/arrow_parquet/arrow_to_pg/int4.rs new file mode 100644 index 0000000..3a23abd --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/int4.rs @@ -0,0 +1,29 @@ +use arrow::array::{Array, Int32Array}; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Int4 +impl ArrowArrayToPgType for i32 { + fn to_pg_type(arr: Int32Array, _context: &ArrowToPgAttributeContext) -> Option { + if arr.is_null(0) { + None + } else { + let val = arr.value(0); + Some(val) + } + } +} + +// Int4[] +impl ArrowArrayToPgType>> for Vec> { + fn to_pg_type( + arr: Int32Array, + _context: &ArrowToPgAttributeContext, + ) -> Option>> { + let mut vals = vec![]; + for val in arr.iter() { + vals.push(val); + } + Some(vals) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/int8.rs b/src/arrow_parquet/arrow_to_pg/int8.rs new file mode 100644 index 0000000..8f2bf92 --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/int8.rs @@ -0,0 +1,29 @@ +use arrow::array::{Array, Int64Array}; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Int8 +impl ArrowArrayToPgType for i64 { + fn to_pg_type(arr: Int64Array, _context: &ArrowToPgAttributeContext) -> Option { + if arr.is_null(0) { + None + } else { + let val = arr.value(0); + Some(val) + } + } +} + +// Int8[] +impl ArrowArrayToPgType>> for Vec> { + fn to_pg_type( + arr: Int64Array, + _context: &ArrowToPgAttributeContext, + ) -> Option>> { + let mut vals = vec![]; + for val in arr.iter() { + vals.push(val); + } + Some(vals) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/map.rs b/src/arrow_parquet/arrow_to_pg/map.rs new file mode 100644 index 0000000..ff01b61 --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/map.rs @@ -0,0 +1,79 @@ +use arrow::array::{Array, MapArray, StructArray}; +use pgrx::{prelude::PgHeapTuple, AllocatedByRust, FromDatum, IntoDatum}; + +use crate::type_compat::map::CrunchyMap; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// crunchy_map.key__val_ +impl<'a> ArrowArrayToPgType> for CrunchyMap<'a> { + fn to_pg_type(arr: MapArray, context: &ArrowToPgAttributeContext) -> Option> { + if arr.is_null(0) { + None + } else { + let entries_array = arr.value(0); + + let entries = >> as ArrowArrayToPgType< + StructArray, + Vec>>, + >>::to_pg_type(entries_array, context); + + if let Some(entries) = entries { + let entries_datum = entries.into_datum(); + + if let Some(entries_datum) = entries_datum { + let entries = unsafe { + let is_null = false; + pgrx::Array::from_datum(entries_datum, is_null) + .expect("map entries should be an array") + }; + Some(CrunchyMap { entries }) + } else { + None + } + } else { + None + } + } + } +} + +// crunchy_map.key__val_[] +impl<'a> ArrowArrayToPgType>>> for Vec>> { + fn to_pg_type( + array: MapArray, + context: &ArrowToPgAttributeContext, + ) -> Option>>> { + let mut maps = vec![]; + + for entries_array in array.iter() { + if let Some(entries_array) = entries_array { + let entries = >> as ArrowArrayToPgType< + StructArray, + Vec>>, + >>::to_pg_type(entries_array, context); + + if let Some(entries) = entries { + let entries_datum = entries.into_datum(); + + if let Some(entries_datum) = entries_datum { + let entries = unsafe { + let is_null = false; + pgrx::Array::from_datum(entries_datum, is_null) + .expect("map entries should be an array") + }; + maps.push(Some(CrunchyMap { entries })) + } else { + maps.push(None); + } + } else { + maps.push(None); + } + } else { + maps.push(None); + } + } + + Some(maps) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/numeric.rs b/src/arrow_parquet/arrow_to_pg/numeric.rs new file mode 100644 index 0000000..0accf2e --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/numeric.rs @@ -0,0 +1,34 @@ +use arrow::array::{Array, Decimal128Array}; +use pgrx::AnyNumeric; + +use crate::type_compat::pg_arrow_type_conversions::i128_to_numeric; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Numeric +impl ArrowArrayToPgType for AnyNumeric { + fn to_pg_type(arr: Decimal128Array, context: &ArrowToPgAttributeContext) -> Option { + if arr.is_null(0) { + None + } else { + let scale = context.scale.expect("Expected scale"); + Some(i128_to_numeric(arr.value(0), scale)) + } + } +} + +// Numeric[] +impl ArrowArrayToPgType>> for Vec> { + fn to_pg_type( + arr: Decimal128Array, + context: &ArrowToPgAttributeContext, + ) -> Option>> { + let scale = context.scale.expect("Expected scale"); + let mut vals = vec![]; + for val in arr.iter() { + let val = val.map(|v| i128_to_numeric(v, scale)); + vals.push(val); + } + Some(vals) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/oid.rs b/src/arrow_parquet/arrow_to_pg/oid.rs new file mode 100644 index 0000000..c3079bd --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/oid.rs @@ -0,0 +1,31 @@ +use arrow::array::{Array, UInt32Array}; +use pgrx::pg_sys::Oid; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Oid +impl ArrowArrayToPgType for Oid { + fn to_pg_type(arr: UInt32Array, _context: &ArrowToPgAttributeContext) -> Option { + if arr.is_null(0) { + None + } else { + let val = arr.value(0); + Some(val.into()) + } + } +} + +// Oid[] +impl ArrowArrayToPgType>> for Vec> { + fn to_pg_type( + arr: UInt32Array, + _context: &ArrowToPgAttributeContext, + ) -> Option>> { + let mut vals = vec![]; + for val in arr.iter() { + let val = val.map(|val| val.into()); + vals.push(val); + } + Some(vals) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/text.rs b/src/arrow_parquet/arrow_to_pg/text.rs new file mode 100644 index 0000000..f534cb2 --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/text.rs @@ -0,0 +1,30 @@ +use arrow::array::{Array, StringArray}; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Text +impl ArrowArrayToPgType for String { + fn to_pg_type(arr: StringArray, _context: &ArrowToPgAttributeContext) -> Option { + if arr.is_null(0) { + None + } else { + let val = arr.value(0); + Some(val.to_string()) + } + } +} + +// Text[] +impl ArrowArrayToPgType>> for Vec> { + fn to_pg_type( + arr: StringArray, + _context: &ArrowToPgAttributeContext, + ) -> Option>> { + let mut vals = vec![]; + for val in arr.iter() { + let val = val.map(|val| val.to_string()); + vals.push(val); + } + Some(vals) + } +} diff --git a/src/arrow_parquet/arrow_to_pg/time.rs b/src/arrow_parquet/arrow_to_pg/time.rs new file mode 100644 index 0000000..5b206fc --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg/time.rs @@ -0,0 +1,35 @@ +use arrow::array::{Array, Time64MicrosecondArray}; +use pgrx::datum::Time; + +use crate::type_compat::pg_arrow_type_conversions::i64_to_time; + +use super::{ArrowArrayToPgType, ArrowToPgAttributeContext}; + +// Time +impl ArrowArrayToPgType for Time { + fn to_pg_type( + arr: Time64MicrosecondArray, + _context: &ArrowToPgAttributeContext, + ) -> Option