diff --git a/src/arrow_parquet.rs b/src/arrow_parquet.rs index 460de91..e30f1fc 100644 --- a/src/arrow_parquet.rs +++ b/src/arrow_parquet.rs @@ -1,3 +1,4 @@ +pub(crate) mod arrow_to_pg; pub(crate) mod arrow_utils; pub(crate) mod codec; pub(crate) mod parquet_writer; diff --git a/src/arrow_parquet/arrow_to_pg.rs b/src/arrow_parquet/arrow_to_pg.rs new file mode 100644 index 0000000..608c248 --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg.rs @@ -0,0 +1,488 @@ +use arrow::array::{ + Array, ArrayData, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, + Float64Array, Int16Array, Int32Array, Int64Array, ListArray, MapArray, StringArray, + StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, UInt32Array, +}; +use pgrx::{ + datum::{Date, Time, TimeWithTimeZone, Timestamp, TimestampWithTimeZone}, + pg_sys::{ + Datum, Oid, BOOLOID, BYTEAOID, CHAROID, DATEOID, FLOAT4OID, FLOAT8OID, INT2OID, INT4OID, + INT8OID, NUMERICOID, OIDOID, TEXTOID, TIMEOID, TIMESTAMPOID, TIMESTAMPTZOID, TIMETZOID, + }, + prelude::PgHeapTuple, + AllocatedByRust, AnyNumeric, IntoDatum, PgTupleDesc, +}; + +use crate::{ + pgrx_utils::{array_element_typoid, is_array_type, is_composite_type, tuple_desc}, + type_compat::{ + fallback_to_text::{reset_fallback_to_text_context, FallbackToText}, + geometry::{is_postgis_geometry_typoid, Geometry}, + map::{is_crunchy_map_typoid, CrunchyMap}, + pg_arrow_type_conversions::{ + extract_precision_from_numeric_typmod, extract_scale_from_numeric_typmod, + MAX_DECIMAL_PRECISION, + }, + }, +}; + +pub(crate) mod bool; +pub(crate) mod bytea; +pub(crate) mod char; +pub(crate) mod date; +pub(crate) mod fallback_to_text; +pub(crate) mod float4; +pub(crate) mod float8; +pub(crate) mod geometry; +pub(crate) mod int2; +pub(crate) mod int4; +pub(crate) mod int8; +pub(crate) mod map; +pub(crate) mod numeric; +pub(crate) mod oid; +pub(crate) mod record; +pub(crate) mod text; +pub(crate) mod time; +pub(crate) mod timestamp; +pub(crate) mod timestamptz; 
+pub(crate) mod timetz;
+
+/// Conversion from an Arrow array of type `A` into an optional Postgres value `T`.
+///
+/// `A` must be constructible from [`ArrayData`] and `T` must implement [`IntoDatum`],
+/// so the result can be turned into a datum by the caller.
+pub(crate) trait ArrowArrayToPgType<'a, A: From<ArrayData>, T: 'a + IntoDatum> {
+    /// Converts `array` using the per-attribute `context`.
+    ///
+    /// NOTE(review): `None` presumably represents a SQL NULL; confirm against implementors.
+    fn to_pg_type(array: A, context: ArrowToPgPerAttributeContext<'a>) -> Option<T>;
+}
+
+/// Per-attribute state handed to every [`ArrowArrayToPgType::to_pg_type`] call.
+///
+/// `typoid` and `typmod` are always set by [`ArrowToPgPerAttributeContext::new`]; the
+/// remaining fields start as `None` and are filled in through the `with_*` builders.
+#[derive(Clone)]
+pub(crate) struct ArrowToPgPerAttributeContext<'a> {
+    typoid: Oid,
+    typmod: i32,
+    // NOTE(review): presumably only set for composite (record) attributes; confirm.
+    tupledesc: Option<PgTupleDesc<'a>>,
+    // NOTE(review): presumably only set for numeric attributes; confirm.
+    precision: Option<usize>,
+    scale: Option<usize>,
+}
+
+impl<'a> ArrowToPgPerAttributeContext<'a> {
+    /// Creates a context for the given type OID and typmod with every optional
+    /// field left unset.
+    pub(crate) fn new(typoid: Oid, typmod: i32) -> Self {
+        Self {
+            typoid,
+            typmod,
+            tupledesc: None,
+            precision: None,
+            scale: None,
+        }
+    }
+
+    /// Returns the context with `tupledesc` stored in it.
+    pub(crate) fn with_tupledesc(mut self, tupledesc: PgTupleDesc<'a>) -> Self {
+        self.tupledesc = Some(tupledesc);
+        self
+    }
+
+    /// Returns the context with `precision` stored in it.
+    pub(crate) fn with_precision(mut self, precision: usize) -> Self {
+        self.precision = Some(precision);
+        self
+    }
+
+    /// Returns the context with `scale` stored in it.
+    pub(crate) fn with_scale(mut self, scale: usize) -> Self {
+        self.scale = Some(scale);
+        self
+    }
+}
+
+/// Converts the Arrow data of one attribute into a Postgres datum.
+///
+/// When `attribute_typoid` is an array type (per `is_array_type`), the context is
+/// built with the array's element type OID and the work is delegated to
+/// `to_pg_array_datum`; otherwise the context carries `attribute_typoid` itself and
+/// `to_pg_nonarray_datum` performs the conversion. `attribute_typmod` is passed
+/// through unchanged in both cases.
+pub(crate) fn to_pg_datum(
+    attribute_array: ArrayData,
+    attribute_typoid: Oid,
+    attribute_typmod: i32,
+) -> Option<Datum> {
+    if is_array_type(attribute_typoid) {
+        let attribute_element_typoid = array_element_typoid(attribute_typoid);
+
+        let attribute_context =
+            ArrowToPgPerAttributeContext::new(attribute_element_typoid, attribute_typmod);
+
+        // `.into()` converts the raw `ArrayData` into the array type that
+        // `to_pg_array_datum` accepts.
+        to_pg_array_datum(attribute_array.into(), attribute_context)
+    } else {
+        let attribute_context =
+            ArrowToPgPerAttributeContext::new(attribute_typoid, attribute_typmod);
+
+        to_pg_nonarray_datum(attribute_array, attribute_context)
+    }
+}
+
+fn to_pg_nonarray_datum( + primitive_array: ArrayData, + attribute_context: ArrowToPgPerAttributeContext, +) -> Option { + match attribute_context.typoid { + FLOAT4OID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + FLOAT8OID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + INT2OID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } +
INT4OID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + INT8OID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + BOOLOID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + CHAROID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + TEXTOID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + BYTEAOID => { + let val = as ArrowArrayToPgType>>::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + OIDOID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + NUMERICOID => to_pg_numeric_datum(primitive_array, attribute_context), + DATEOID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + TIMEOID => { + let val =