From 4cb8105095fa0757e2b0f44f7f44038bb240f55b Mon Sep 17 00:00:00 2001
From: aykut-bozkurt <51649454+aykut-bozkurt@users.noreply.github.com>
Date: Wed, 11 Sep 2024 15:09:32 +0300
Subject: [PATCH] refactor pg to parquet (#21)

---
 .vscode/settings.json                         |   3 +-
 Cargo.toml                                    |   1 +
 src/arrow_parquet/arrow_utils.rs              |  29 +-
 src/arrow_parquet/parquet_writer.rs           |  57 +-
 src/arrow_parquet/pg_to_arrow.rs              | 498 +++++------------
 src/arrow_parquet/pg_to_arrow/bool.rs         |  20 +-
 src/arrow_parquet/pg_to_arrow/bytea.rs        |  27 +-
 src/arrow_parquet/pg_to_arrow/char.rs         |  38 +-
 src/arrow_parquet/pg_to_arrow/date.rs         |  38 +-
 .../pg_to_arrow/fallback_to_text.rs           |  38 +-
 src/arrow_parquet/pg_to_arrow/float4.rs       |  20 +-
 src/arrow_parquet/pg_to_arrow/float8.rs       |  21 +-
 src/arrow_parquet/pg_to_arrow/geometry.rs     |  26 +-
 src/arrow_parquet/pg_to_arrow/int2.rs         |  21 +-
 src/arrow_parquet/pg_to_arrow/int4.rs         |  21 +-
 src/arrow_parquet/pg_to_arrow/int8.rs         |  21 +-
 src/arrow_parquet/pg_to_arrow/map.rs          | 103 ++--
 src/arrow_parquet/pg_to_arrow/numeric.rs      |  34 +-
 src/arrow_parquet/pg_to_arrow/oid.rs          |  37 +-
 src/arrow_parquet/pg_to_arrow/record.rs       |  98 ++--
 src/arrow_parquet/pg_to_arrow/text.rs         |  21 +-
 src/arrow_parquet/pg_to_arrow/time.rs         |  38 +-
 src/arrow_parquet/pg_to_arrow/timestamp.rs    |  38 +-
 src/arrow_parquet/pg_to_arrow/timestamptz.rs  |  39 +-
 src/arrow_parquet/pg_to_arrow/timetz.rs       |  38 +-
 src/lib.rs                                    | 158 ++++--
 src/type_compat/pg_arrow_type_conversions.rs  |  32 +-
 27 files changed, 654 insertions(+), 861 deletions(-)

diff --git a/.vscode/settings.json b/.vscode/settings.json
index 6285f13..be4b716 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -2,5 +2,6 @@
     "editor.formatOnSave": true,
     "editor.defaultFormatter": "rust-lang.rust-analyzer",
     "rust-analyzer.check.command": "clippy",
-    "rust-analyzer.checkOnSave": true
+    "rust-analyzer.checkOnSave": true,
+    "editor.inlayHints.enabled": "offUnlessPressed",
 }
\ No newline at end of file
diff --git a/Cargo.toml b/Cargo.toml
index ba6f061..4976dcb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,6 +17,7 @@ pg_test = []
 
 [dependencies]
 arrow = {version = "53", default-features = false}
+arrow-select = {version = "53", default-features = false}
 arrow-schema = {version = "53", default-features = false}
 futures = "0.3"
 object_store = {version = "0.11", default-features = false, features = ["aws"]}
diff --git a/src/arrow_parquet/arrow_utils.rs b/src/arrow_parquet/arrow_utils.rs
index 0ef0167..243c2bb 100644
--- a/src/arrow_parquet/arrow_utils.rs
+++ b/src/arrow_parquet/arrow_utils.rs
@@ -38,27 +38,22 @@ pub(crate) fn arrow_map_offsets(maps: &Vec>) -> (OffsetBuffer
 }
 
 pub(crate) fn arrow_array_offsets(
-    arrays: &Vec>>>,
+    pg_array: &Option>,
 ) -> (OffsetBuffer, NullBuffer) {
-    pgrx::pg_sys::check_for_interrupts!();
-
     let mut nulls = vec![];
     let mut offsets = vec![0];
-    let mut current_offset = 0;
 
-    for array in arrays {
-        if let Some(array) = array {
-            let len = array.len() as i32;
-            current_offset += len;
-            offsets.push(current_offset);
-            nulls.push(true);
-        } else {
-            offsets.push(current_offset);
-            nulls.push(false);
-        }
-    }
-    let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
-    let nulls = NullBuffer::from(nulls);
+    if let Some(pg_array) = pg_array {
+        let len = pg_array.len() as i32;
+        offsets.push(len);
+        nulls.push(true);
+    } else {
+        offsets.push(0);
+        nulls.push(false);
+    };
+
+    let offsets = arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets));
+    let nulls = arrow::buffer::NullBuffer::from(nulls);
 
     (offsets, nulls)
 }
diff --git a/src/arrow_parquet/parquet_writer.rs b/src/arrow_parquet/parquet_writer.rs
index e1b1562..2d7bd8f 100644
--- a/src/arrow_parquet/parquet_writer.rs
+++ b/src/arrow_parquet/parquet_writer.rs
@@ -1,9 +1,6 @@
 use std::sync::Arc;
 
-use arrow::{
-    array::{ArrayRef, RecordBatch, StructArray},
-    datatypes::FieldRef,
-};
+use arrow::array::RecordBatch;
 use arrow_schema::SchemaRef;
 use parquet::{
     arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
@@ -14,13 +11,15 @@ use tokio::runtime::Runtime;
 
 use crate::{
     arrow_parquet::{
-        codec::ParquetCodecOption, pg_to_arrow::collect_attribute_array_from_tuples,
-        schema_visitor::parse_arrow_schema_from_tupledesc, uri_utils::parquet_writer_from_uri,
+        codec::ParquetCodecOption, schema_visitor::parse_arrow_schema_from_tupledesc,
+        uri_utils::parquet_writer_from_uri,
     },
     pgrx_utils::collect_valid_attributes,
     type_compat::{geometry::reset_postgis_context, map::reset_crunchy_map_context},
 };
 
+use super::pg_to_arrow::to_arrow_array;
+
 pub(crate) struct ParquetWriterContext<'a> {
     runtime: Runtime,
     parquet_writer: AsyncArrowWriter,
@@ -67,17 +66,23 @@ impl<'a> ParquetWriterContext<'a> {
         &mut self,
         tuples: Vec>>,
     ) {
-        pgrx::pg_sys::check_for_interrupts!();
+        let mut record_batches = vec![];
 
-        // collect arrow arrays for each attribute in the tuples
-        let tuple_attribute_arrow_arrays = collect_arrow_attribute_arrays_from_tupledesc(
-            tuples,
-            self.tupledesc.clone(),
-            self.schema.clone(),
-        );
+        for tuple in tuples {
+            pgrx::pg_sys::check_for_interrupts!();
 
-        let struct_array = StructArray::from(tuple_attribute_arrow_arrays);
-        let record_batch = RecordBatch::from(struct_array);
+            if let Some(tuple) = tuple {
+                let record_batch =
+                    pg_tuple_to_record_batch(tuple, &self.tupledesc, self.schema.clone());
+
+                record_batches.push(record_batch);
+            } else {
+                let null_record_batch = RecordBatch::new_empty(self.schema.clone());
+                record_batches.push(null_record_batch);
+            }
+        }
+
+        let record_batch = arrow::compute::concat_batches(&self.schema, &record_batches).unwrap();
 
         let parquet_writer = &mut self.parquet_writer;
 
@@ -92,15 +97,15 @@ impl<'a> ParquetWriterContext<'a> {
     }
 }
 
-fn collect_arrow_attribute_arrays_from_tupledesc(
-    tuples: Vec>>,
-    tupledesc: PgTupleDesc,
+fn pg_tuple_to_record_batch(
+    tuple: PgHeapTuple,
+    tupledesc: &PgTupleDesc,
     schema: SchemaRef,
-) -> Vec<(FieldRef, ArrayRef)> {
+) -> RecordBatch {
     let include_generated_columns = true;
-    let attributes = collect_valid_attributes(&tupledesc, include_generated_columns);
+    let attributes = collect_valid_attributes(tupledesc, include_generated_columns);
 
-    let mut tuple_attribute_arrow_arrays = vec![];
+    let mut attribute_arrow_arrays = vec![];
 
     for attribute in attributes {
         let attribute_name = attribute.name();
@@ -110,18 +115,16 @@ fn collect_arrow_attribute_arrays_from_tupledesc(
             .field_with_name(attribute_name)
             .expect("Expected attribute field");
 
-        let (field, array) = collect_attribute_array_from_tuples(
-            &tuples,
+        let (_field, array) = to_arrow_array(
+            &tuple,
             attribute_name,
             attribute_typoid,
             attribute_typmod,
             Arc::new(attribute_field.clone()),
         );
 
-        let tuple_attribute_arrow_array = (field, array);
-
-        tuple_attribute_arrow_arrays.push(tuple_attribute_arrow_array);
+        attribute_arrow_arrays.push(array);
     }
 
-    tuple_attribute_arrow_arrays
+    RecordBatch::try_new(schema, attribute_arrow_arrays).expect("Expected record batch")
 }
diff --git a/src/arrow_parquet/pg_to_arrow.rs b/src/arrow_parquet/pg_to_arrow.rs
index eb59f92..bd14183 100644
--- a/src/arrow_parquet/pg_to_arrow.rs
+++ b/src/arrow_parquet/pg_to_arrow.rs
@@ -47,6 +47,7 @@ pub(crate) trait PgTypeToArrowArray {
     fn to_arrow_array(self, context: PgToArrowPerAttributeContext) -> (FieldRef, ArrayRef);
 }
 
+#[derive(Clone)]
 pub(crate) struct PgToArrowPerAttributeContext<'a> {
     name: &'a str,
     field: FieldRef,
@@ -79,9 +80,9 @@ impl<'a> PgToArrowPerAttributeContext<'a> {
     }
 }
 
-pub(crate) fn collect_attribute_array_from_tuples<'tup>(
-    tuples: &'tup [Option>],
-    attribute_name: &'tup str,
+pub(crate) fn to_arrow_array(
+    tuple: &PgHeapTuple,
+    attribute_name: &str,
     attribute_typoid: Oid,
     attribute_typmod: i32,
     attribute_field: FieldRef,
@@ -96,7 +97,7 @@ pub(crate) fn collect_attribute_array_from_tuples<'tup>(
             attribute_field,
         );
 
-        collect_array_attribute_array_from_tuples(tuples, attribute_context)
+        to_arrow_list_array(tuple, attribute_context)
     } else {
         let attribute_context = PgToArrowPerAttributeContext::new(
             attribute_name,
@@ -105,338 +106,203 @@ pub(crate) fn collect_attribute_array_from_tuples<'tup>(
             attribute_field,
         );
 
-        collect_nonarray_attribute_array_from_tuples(tuples, attribute_context)
+        to_arrow_primitive_array(tuple, attribute_context)
     }
 }
 
-fn collect_nonarray_attribute_array_from_tuples<'tup>(
-    tuples: &'tup [Option>],
-    attribute_context: PgToArrowPerAttributeContext<'tup>,
+fn to_arrow_primitive_array(
+    tuple: &PgHeapTuple,
+    attribute_context: PgToArrowPerAttributeContext,
 ) -> (FieldRef, ArrayRef) {
     match attribute_context.typoid {
         FLOAT4OID => {
-            let mut attribute_values: Vec> = Vec::new();
+            let attribute_val: Option = tuple.get_by_name(attribute_context.name).unwrap();
 
-            collect_attribute_array_from_tuples_helper(
-                tuples,
-                &attribute_context,
-                &mut attribute_values,
-            );
-
-            let (field, array) = attribute_values.to_arrow_array(attribute_context);
+            let (field, array) = attribute_val.to_arrow_array(attribute_context);
             (field, array)
         }
         FLOAT8OID => {
-            let mut attribute_values: Vec> = Vec::new();
-
-            collect_attribute_array_from_tuples_helper(
-                tuples,
-                &attribute_context,
-                &mut attribute_values,
-            );
+            let attribute_val: Option = tuple.get_by_name(attribute_context.name).unwrap();
 
-            let (field, array) = attribute_values.to_arrow_array(attribute_context);
+            let (field, array) = attribute_val.to_arrow_array(attribute_context);
             (field, array)
         }
         INT2OID => {
-            let mut attribute_values: Vec> = Vec::new();
+            let attribute_val: Option = tuple.get_by_name(attribute_context.name).unwrap();
 
-            collect_attribute_array_from_tuples_helper(
-                tuples,
-                &attribute_context,
-                &mut attribute_values,
-            );
-
-            let (field, array) = attribute_values.to_arrow_array(attribute_context);
+            let (field, array) = attribute_val.to_arrow_array(attribute_context);
             (field, array)
         }
         INT4OID => {
-            let mut attribute_values: Vec> = Vec::new();
-
-            collect_attribute_array_from_tuples_helper(
-                tuples,
-                &attribute_context,
-                &mut attribute_values,
-            );
+            let attribute_val: Option = tuple.get_by_name(attribute_context.name).unwrap();
 
-            let (field, array) = attribute_values.to_arrow_array(attribute_context);
+            let (field, array) = attribute_val.to_arrow_array(attribute_context);
             (field, array)
         }
         INT8OID => {
-            let mut attribute_values: Vec> = Vec::new();
+            let attribute_val: Option = tuple.get_by_name(attribute_context.name).unwrap();
 
-            collect_attribute_array_from_tuples_helper(
-                tuples,
-                &attribute_context,
-                &mut attribute_values,
-            );
-
-            let (field, array) = attribute_values.to_arrow_array(attribute_context);
+            let (field, array) = attribute_val.to_arrow_array(attribute_context);
             (field, array)
         }
         NUMERICOID => {
             let precision = extract_precision_from_numeric_typmod(attribute_context.typmod);
+
             if precision > MAX_DECIMAL_PRECISION {
                 reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod);
-                let mut attribute_values: Vec> = Vec::new();
-
-                collect_attribute_array_from_tuples_helper(
-                    tuples,
-                    &attribute_context,
-                    &mut attribute_values,
-                );
+                let attribute_val: Option =
+                    tuple.get_by_name(attribute_context.name).unwrap();
 
-                let (field, array) = attribute_values.to_arrow_array(attribute_context);
+                let (field, array) = attribute_val.to_arrow_array(attribute_context);
                 (field, array)
             } else {
-                let mut attribute_values: Vec> = Vec::new();
+                let scale = extract_scale_from_numeric_typmod(attribute_context.typmod);
+
+                let attribute_context = attribute_context
+                    .with_scale(scale)
+                    .with_precision(precision);
 
-                collect_attribute_array_from_tuples_helper(
-                    tuples,
-                    &attribute_context,
-                    &mut attribute_values,
-                );
+                let attribute_val: Option =
+                    tuple.get_by_name(attribute_context.name).unwrap();
 
-                let (field, array) = attribute_values.to_arrow_array(attribute_context);
+                let (field, array) = attribute_val.to_arrow_array(attribute_context);
                 (field, array)
             }
         }
         BOOLOID => {
-            let mut attribute_values: Vec> = Vec::new();
-
-            collect_attribute_array_from_tuples_helper(
-                tuples,
-                &attribute_context,
-                &mut attribute_values,
-            );
+            let attribute_val: Option = tuple.get_by_name(attribute_context.name).unwrap();
 
-            let (field, array) = attribute_values.to_arrow_array(attribute_context);
+            let (field, array) = attribute_val.to_arrow_array(attribute_context);
             (field, array)
         }
         DATEOID => {
-            let mut attribute_values: Vec> = Vec::new();
+            let attribute_val: Option = tuple.get_by_name(attribute_context.name).unwrap();
 
-            collect_attribute_array_from_tuples_helper(
-                tuples,
-                &attribute_context,
-                &mut attribute_values,
-            );
-
-            let (field, array) = attribute_values.to_arrow_array(attribute_context);
+            let (field, array) = attribute_val.to_arrow_array(attribute_context);
             (field, array)
         }
         TIMEOID => {
-            let mut attribute_values: Vec> = Vec::new();
-
-            collect_attribute_array_from_tuples_helper(
-                tuples,
-                &attribute_context,
-                &mut attribute_values,
-            );
+            let attribute_val: Option