From 047ff2e51090556e2b358a3f521bcac453e5d5d8 Mon Sep 17 00:00:00 2001 From: Aykut Bozkurt Date: Mon, 23 Sep 2024 13:29:13 +0300 Subject: [PATCH] convert arrow array to Postgres rows --- src/arrow_parquet.rs | 1 + src/arrow_parquet/arrow_to_pg.rs | 488 ++++++++++++++++++ src/arrow_parquet/arrow_to_pg/bool.rs | 30 ++ src/arrow_parquet/arrow_to_pg/bytea.rs | 33 ++ src/arrow_parquet/arrow_to_pg/char.rs | 34 ++ src/arrow_parquet/arrow_to_pg/date.rs | 33 ++ .../arrow_to_pg/fallback_to_text.rs | 38 ++ src/arrow_parquet/arrow_to_pg/float4.rs | 29 ++ src/arrow_parquet/arrow_to_pg/float8.rs | 29 ++ src/arrow_parquet/arrow_to_pg/geometry.rs | 38 ++ src/arrow_parquet/arrow_to_pg/int2.rs | 29 ++ src/arrow_parquet/arrow_to_pg/int4.rs | 29 ++ src/arrow_parquet/arrow_to_pg/int8.rs | 29 ++ src/arrow_parquet/arrow_to_pg/map.rs | 99 ++++ src/arrow_parquet/arrow_to_pg/numeric.rs | 37 ++ src/arrow_parquet/arrow_to_pg/oid.rs | 31 ++ src/arrow_parquet/arrow_to_pg/record.rs | 72 +++ src/arrow_parquet/arrow_to_pg/text.rs | 30 ++ src/arrow_parquet/arrow_to_pg/time.rs | 35 ++ src/arrow_parquet/arrow_to_pg/timestamp.rs | 37 ++ src/arrow_parquet/arrow_to_pg/timestamptz.rs | 39 ++ src/arrow_parquet/arrow_to_pg/timetz.rs | 37 ++ 22 files changed, 1257 insertions(+) create mode 100644 src/arrow_parquet/arrow_to_pg.rs create mode 100644 src/arrow_parquet/arrow_to_pg/bool.rs create mode 100644 src/arrow_parquet/arrow_to_pg/bytea.rs create mode 100644 src/arrow_parquet/arrow_to_pg/char.rs create mode 100644 src/arrow_parquet/arrow_to_pg/date.rs create mode 100644 src/arrow_parquet/arrow_to_pg/fallback_to_text.rs create mode 100644 src/arrow_parquet/arrow_to_pg/float4.rs create mode 100644 src/arrow_parquet/arrow_to_pg/float8.rs create mode 100644 src/arrow_parquet/arrow_to_pg/geometry.rs create mode 100644 src/arrow_parquet/arrow_to_pg/int2.rs create mode 100644 src/arrow_parquet/arrow_to_pg/int4.rs create mode 100644 src/arrow_parquet/arrow_to_pg/int8.rs create mode 100644 src/arrow_parquet/arrow_to_pg/map.rs create mode 100644 src/arrow_parquet/arrow_to_pg/numeric.rs create mode 100644 src/arrow_parquet/arrow_to_pg/oid.rs create mode 100644 src/arrow_parquet/arrow_to_pg/record.rs create mode 100644 src/arrow_parquet/arrow_to_pg/text.rs create mode 100644 src/arrow_parquet/arrow_to_pg/time.rs create mode 100644 src/arrow_parquet/arrow_to_pg/timestamp.rs create mode 100644 src/arrow_parquet/arrow_to_pg/timestamptz.rs create mode 100644 src/arrow_parquet/arrow_to_pg/timetz.rs diff --git a/src/arrow_parquet.rs b/src/arrow_parquet.rs index 460de91..e30f1fc 100644 --- a/src/arrow_parquet.rs +++ b/src/arrow_parquet.rs @@ -1,3 +1,4 @@ +pub(crate) mod arrow_to_pg; pub(crate) mod arrow_utils; pub(crate) mod codec; pub(crate) mod parquet_writer; diff --git a/src/arrow_parquet/arrow_to_pg.rs b/src/arrow_parquet/arrow_to_pg.rs new file mode 100644 index 0000000..608c248 --- /dev/null +++ b/src/arrow_parquet/arrow_to_pg.rs @@ -0,0 +1,488 @@ +use arrow::array::{ + Array, ArrayData, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, + Float64Array, Int16Array, Int32Array, Int64Array, ListArray, MapArray, StringArray, + StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, UInt32Array, +}; +use pgrx::{ + datum::{Date, Time, TimeWithTimeZone, Timestamp, TimestampWithTimeZone}, + pg_sys::{ + Datum, Oid, BOOLOID, BYTEAOID, CHAROID, DATEOID, FLOAT4OID, FLOAT8OID, INT2OID, INT4OID, + INT8OID, NUMERICOID, OIDOID, TEXTOID, TIMEOID, TIMESTAMPOID, TIMESTAMPTZOID, TIMETZOID, + }, + prelude::PgHeapTuple, + AllocatedByRust, AnyNumeric, IntoDatum, PgTupleDesc, +}; + +use crate::{ + pgrx_utils::{array_element_typoid, is_array_type, is_composite_type, tuple_desc}, + type_compat::{ + fallback_to_text::{reset_fallback_to_text_context, FallbackToText}, + geometry::{is_postgis_geometry_typoid, Geometry}, + map::{is_crunchy_map_typoid, CrunchyMap}, + pg_arrow_type_conversions::{ + extract_precision_from_numeric_typmod, extract_scale_from_numeric_typmod, + MAX_DECIMAL_PRECISION, + }, + }, +}; + +pub(crate) mod bool; +pub(crate) mod bytea; +pub(crate) mod char; +pub(crate) mod date; +pub(crate) mod fallback_to_text; +pub(crate) mod float4; +pub(crate) mod float8; +pub(crate) mod geometry; +pub(crate) mod int2; +pub(crate) mod int4; +pub(crate) mod int8; +pub(crate) mod map; +pub(crate) mod numeric; +pub(crate) mod oid; +pub(crate) mod record; +pub(crate) mod text; +pub(crate) mod time; +pub(crate) mod timestamp; +pub(crate) mod timestamptz; +pub(crate) mod timetz; + +pub(crate) trait ArrowArrayToPgType<'a, A: From, T: 'a + IntoDatum> { + fn to_pg_type(array: A, context: ArrowToPgPerAttributeContext<'a>) -> Option; +} + +#[derive(Clone)] +pub(crate) struct ArrowToPgPerAttributeContext<'a> { + typoid: Oid, + typmod: i32, + tupledesc: Option>, + precision: Option, + scale: Option, +} + +impl<'a> ArrowToPgPerAttributeContext<'a> { + pub(crate) fn new(typoid: Oid, typmod: i32) -> Self { + Self { + typoid, + typmod, + tupledesc: None, + precision: None, + scale: None, + } + } + + pub(crate) fn with_tupledesc(mut self, tupledesc: PgTupleDesc<'a>) -> Self { + self.tupledesc = Some(tupledesc); + self + } + + pub(crate) fn with_precision(mut self, precision: usize) -> Self { + self.precision = Some(precision); + self + } + + pub(crate) fn with_scale(mut self, scale: usize) -> Self { + self.scale = Some(scale); + self + } +} + +pub(crate) fn to_pg_datum( + attribute_array: ArrayData, + attribute_typoid: Oid, + attribute_typmod: i32, +) -> Option { + if is_array_type(attribute_typoid) { + let attribute_element_typoid = array_element_typoid(attribute_typoid); + + let attribute_context = + ArrowToPgPerAttributeContext::new(attribute_element_typoid, attribute_typmod); + + to_pg_array_datum(attribute_array.into(), attribute_context) + } else { + let attribute_context = + ArrowToPgPerAttributeContext::new(attribute_typoid, attribute_typmod); + + to_pg_nonarray_datum(attribute_array, attribute_context) + } +} + +fn to_pg_nonarray_datum( + primitive_array: ArrayData, + attribute_context: ArrowToPgPerAttributeContext, +) -> Option { + match attribute_context.typoid { + FLOAT4OID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + FLOAT8OID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + INT2OID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + INT4OID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + INT8OID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + BOOLOID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + CHAROID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + TEXTOID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + BYTEAOID => { + let val = as ArrowArrayToPgType>>::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + OIDOID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + NUMERICOID => to_pg_numeric_datum(primitive_array, attribute_context), + DATEOID => { + let val = >::to_pg_type( + primitive_array.into(), + attribute_context, + ); + val.into_datum() + } + TIMEOID => { + let val =