From a614c8ff33584f764edc9769254e50bbab88da33 Mon Sep 17 00:00:00 2001 From: Aykut Bozkurt Date: Wed, 11 Sep 2024 18:11:40 +0300 Subject: [PATCH] optimize attribute context: create once for each group --- src/arrow_parquet/arrow_utils.rs | 16 +- src/arrow_parquet/parquet_writer.rs | 67 ++- src/arrow_parquet/pg_to_arrow.rs | 413 ++++++++++-------- src/arrow_parquet/pg_to_arrow/bool.rs | 37 +- src/arrow_parquet/pg_to_arrow/bytea.rs | 37 +- src/arrow_parquet/pg_to_arrow/char.rs | 39 +- src/arrow_parquet/pg_to_arrow/date.rs | 39 +- .../pg_to_arrow/fallback_to_text.rs | 39 +- src/arrow_parquet/pg_to_arrow/float4.rs | 37 +- src/arrow_parquet/pg_to_arrow/float8.rs | 37 +- src/arrow_parquet/pg_to_arrow/geometry.rs | 33 +- src/arrow_parquet/pg_to_arrow/int2.rs | 37 +- src/arrow_parquet/pg_to_arrow/int4.rs | 37 +- src/arrow_parquet/pg_to_arrow/int8.rs | 37 +- src/arrow_parquet/pg_to_arrow/map.rs | 49 +-- src/arrow_parquet/pg_to_arrow/numeric.rs | 47 +- src/arrow_parquet/pg_to_arrow/oid.rs | 33 +- src/arrow_parquet/pg_to_arrow/record.rs | 71 +-- src/arrow_parquet/pg_to_arrow/text.rs | 37 +- src/arrow_parquet/pg_to_arrow/time.rs | 39 +- src/arrow_parquet/pg_to_arrow/timestamp.rs | 39 +- src/arrow_parquet/pg_to_arrow/timestamptz.rs | 39 +- src/arrow_parquet/pg_to_arrow/timetz.rs | 39 +- src/arrow_parquet/schema_visitor.rs | 14 +- src/pgrx_utils.rs | 8 +- 25 files changed, 532 insertions(+), 788 deletions(-) diff --git a/src/arrow_parquet/arrow_utils.rs b/src/arrow_parquet/arrow_utils.rs index 243c2bb..2c0bc72 100644 --- a/src/arrow_parquet/arrow_utils.rs +++ b/src/arrow_parquet/arrow_utils.rs @@ -1,21 +1,7 @@ -use std::{ops::Deref, sync::Arc}; - -use arrow::{ - buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}, - datatypes::{Field, FieldRef}, -}; +use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; use crate::type_compat::map::CrunchyMap; -pub(crate) fn to_not_nullable_field(field: FieldRef) -> FieldRef { - let name = field.deref().name(); - let data_type = field.deref().data_type(); - let metadata = field.deref().metadata().clone(); - - let field = Field::new(name, data_type.clone(), false).with_metadata(metadata); - Arc::new(field) -} - pub(crate) fn arrow_map_offsets(maps: &Vec>) -> (OffsetBuffer, NullBuffer) { let mut offsets = vec![0]; let mut nulls = vec![]; diff --git a/src/arrow_parquet/parquet_writer.rs b/src/arrow_parquet/parquet_writer.rs index 2d7bd8f..a008f43 100644 --- a/src/arrow_parquet/parquet_writer.rs +++ b/src/arrow_parquet/parquet_writer.rs @@ -14,11 +14,10 @@ use crate::{ codec::ParquetCodecOption, schema_visitor::parse_arrow_schema_from_tupledesc, uri_utils::parquet_writer_from_uri, }, - pgrx_utils::collect_valid_attributes, type_compat::{geometry::reset_postgis_context, map::reset_crunchy_map_context}, }; -use super::pg_to_arrow::to_arrow_array; +use super::pg_to_arrow::{collect_attribute_contexts, to_arrow_array, PgToArrowAttributeContext}; pub(crate) struct ParquetWriterContext<'a> { runtime: Runtime, @@ -66,20 +65,20 @@ impl<'a> ParquetWriterContext<'a> { &mut self, tuples: Vec>>, ) { + let attribute_contexts = collect_attribute_contexts(&self.tupledesc, &self.schema.fields); + let mut record_batches = vec![]; for tuple in tuples { pgrx::pg_sys::check_for_interrupts!(); - if let Some(tuple) = tuple { - let record_batch = - pg_tuple_to_record_batch(tuple, &self.tupledesc, self.schema.clone()); - - record_batches.push(record_batch); + let record_batch = if let Some(tuple) = tuple { + Self::pg_tuple_to_record_batch(tuple, &attribute_contexts, self.schema.clone()) } else { - let null_record_batch = RecordBatch::new_empty(self.schema.clone()); - record_batches.push(null_record_batch); - } + RecordBatch::new_empty(self.schema.clone()) + }; + + record_batches.push(record_batch); } let record_batch = arrow::compute::concat_batches(&self.schema, &record_batches).unwrap(); @@ -92,39 +91,23 @@ impl<'a> ParquetWriterContext<'a> { self.runtime.block_on(parquet_writer.flush()).unwrap(); } - pub(crate) fn close(self) { - self.runtime.block_on(self.parquet_writer.close()).unwrap(); - } -} + fn pg_tuple_to_record_batch( + tuple: PgHeapTuple, + attribute_contexts: &Vec, + schema: SchemaRef, + ) -> RecordBatch { + let mut attribute_arrow_arrays = vec![]; -fn pg_tuple_to_record_batch( - tuple: PgHeapTuple, - tupledesc: &PgTupleDesc, - schema: SchemaRef, -) -> RecordBatch { - let include_generated_columns = true; - let attributes = collect_valid_attributes(tupledesc, include_generated_columns); - - let mut attribute_arrow_arrays = vec![]; - - for attribute in attributes { - let attribute_name = attribute.name(); - let attribute_typoid = attribute.type_oid().value(); - let attribute_typmod = attribute.type_mod(); - let attribute_field = schema - .field_with_name(attribute_name) - .expect("Expected attribute field"); - - let (_field, array) = to_arrow_array( - &tuple, - attribute_name, - attribute_typoid, - attribute_typmod, - Arc::new(attribute_field.clone()), - ); - - attribute_arrow_arrays.push(array); + for attribute_context in attribute_contexts { + let array = to_arrow_array(&tuple, attribute_context); + + attribute_arrow_arrays.push(array); + } + + RecordBatch::try_new(schema, attribute_arrow_arrays).expect("Expected record batch") } - RecordBatch::try_new(schema, attribute_arrow_arrays).expect("Expected record batch") + pub(crate) fn close(self) { + self.runtime.block_on(self.parquet_writer.close()).unwrap(); + } } diff --git a/src/arrow_parquet/pg_to_arrow.rs b/src/arrow_parquet/pg_to_arrow.rs index bd14183..ec4f52e 100644 --- a/src/arrow_parquet/pg_to_arrow.rs +++ b/src/arrow_parquet/pg_to_arrow.rs @@ -1,4 +1,5 @@ use arrow::{array::ArrayRef, datatypes::FieldRef}; +use arrow_schema::Fields; use pgrx::{ datum::{Date, Time, TimeWithTimeZone, Timestamp, TimestampWithTimeZone, UnboxDatum}, heap_tuple::PgHeapTuple, @@ -6,11 +7,14 @@ use pgrx::{ Oid, BOOLOID, BYTEAOID, CHAROID, DATEOID, FLOAT4OID, FLOAT8OID, INT2OID, INT4OID, INT8OID, NUMERICOID, OIDOID, TEXTOID, TIMEOID, TIMESTAMPOID, TIMESTAMPTZOID, TIMETZOID, }, - AllocatedByRust, AnyNumeric, FromDatum, IntoDatum, + AllocatedByRust, AnyNumeric, FromDatum, IntoDatum, PgTupleDesc, }; use crate::{ - pgrx_utils::{array_element_typoid, is_array_type, is_composite_type}, + pgrx_utils::{ + array_element_typoid, collect_valid_attributes, domain_array_base_elem_typoid, + is_array_type, is_composite_type, tuple_desc, + }, type_compat::{ fallback_to_text::{reset_fallback_to_text_context, FallbackToText}, geometry::{is_postgis_geometry_typoid, Geometry}, @@ -44,106 +48,186 @@ pub(crate) mod timestamptz; pub(crate) mod timetz; pub(crate) trait PgTypeToArrowArray { - fn to_arrow_array(self, context: PgToArrowPerAttributeContext) -> (FieldRef, ArrayRef); + fn to_arrow_array(self, context: &PgToArrowAttributeContext) -> ArrayRef; } #[derive(Clone)] -pub(crate) struct PgToArrowPerAttributeContext<'a> { - name: &'a str, +pub(crate) struct PgToArrowAttributeContext { + name: String, field: FieldRef, typoid: Oid, typmod: i32, + is_array: bool, + is_composite: bool, + is_geometry: bool, + is_crunchy_map: bool, + attribute_contexts: Option>, scale: Option, precision: Option, } -impl<'a> PgToArrowPerAttributeContext<'a> { - fn new(name: &'a str, typoid: Oid, typmod: i32, field: FieldRef) -> Self { +impl PgToArrowAttributeContext { + fn new(name: String, typoid: Oid, typmod: i32, fields: Fields) -> Self { + let field = fields + .iter() + .find(|field| field.name() == &name) + .unwrap() + .clone(); + + let is_array = is_array_type(typoid); + let is_composite; + let is_geometry; + let is_crunchy_map; + let attribute_typoid; + let attribute_field; + + if is_array { + let element_typoid = array_element_typoid(typoid); + + is_composite = is_composite_type(element_typoid); + is_geometry = is_postgis_geometry_typoid(element_typoid); + is_crunchy_map = is_crunchy_map_typoid(element_typoid); + + if is_crunchy_map { + let entries_typoid = domain_array_base_elem_typoid(element_typoid); + attribute_typoid = entries_typoid; + } else { + attribute_typoid = element_typoid; + } + + attribute_field = match field.data_type() { + arrow::datatypes::DataType::List(field) => field.clone(), + _ => unreachable!(), + } + } else { + is_composite = is_composite_type(typoid); + is_geometry = is_postgis_geometry_typoid(typoid); + is_crunchy_map = is_crunchy_map_typoid(typoid); + + if is_crunchy_map { + let entries_typoid = domain_array_base_elem_typoid(typoid); + attribute_typoid = entries_typoid; + } else { + attribute_typoid = typoid; + } + + attribute_field = field.clone(); + } + + let attribute_tupledesc = if is_composite || is_crunchy_map { + Some(tuple_desc(attribute_typoid, typmod)) + } else { + None + }; + + let precision; + let scale; + if attribute_typoid == NUMERICOID { + precision = Some(extract_precision_from_numeric_typmod(typmod)); + scale = Some(extract_scale_from_numeric_typmod(typmod)); + } else { + precision = None; + scale = None; + } + + // for composite and crunchy_map types, recursively collect attribute contexts + let attribute_contexts = attribute_tupledesc.map(|attribute_tupledesc| { + let fields = match attribute_field.data_type() { + arrow::datatypes::DataType::Struct(fields) => fields.clone(), + arrow::datatypes::DataType::Map(struct_field, _) => { + match struct_field.data_type() { + arrow::datatypes::DataType::Struct(fields) => fields.clone(), + _ => unreachable!(), + } + } + _ => unreachable!(), + }; + + collect_attribute_contexts(&attribute_tupledesc, &fields) + }); + Self { name, - field, - typoid, + field: attribute_field, + typoid: attribute_typoid, typmod, - scale: None, - precision: None, + is_array, + is_composite, + is_geometry, + is_crunchy_map, + attribute_contexts, + scale, + precision, } } +} - fn with_scale(mut self, scale: usize) -> Self { - self.scale = Some(scale); - self - } +pub(crate) fn collect_attribute_contexts( + tupledesc: &PgTupleDesc, + fields: &Fields, +) -> Vec { + let include_generated_columns = true; + let attributes = collect_valid_attributes(tupledesc, include_generated_columns); + let mut attribute_contexts = vec![]; + + for attribute in attributes { + let attribute_name = attribute.name(); + let attribute_typoid = attribute.type_oid().value(); + let attribute_typmod = attribute.type_mod(); + + let attribute_context = PgToArrowAttributeContext::new( + attribute_name.to_string(), + attribute_typoid, + attribute_typmod, + fields.clone(), + ); - fn with_precision(mut self, precision: usize) -> Self { - self.precision = Some(precision); - self + attribute_contexts.push(attribute_context); } + + attribute_contexts } pub(crate) fn to_arrow_array( tuple: &PgHeapTuple, - attribute_name: &str, - attribute_typoid: Oid, - attribute_typmod: i32, - attribute_field: FieldRef, -) -> (FieldRef, ArrayRef) { - if is_array_type(attribute_typoid) { - let attribute_element_typoid = array_element_typoid(attribute_typoid); - - let attribute_context = PgToArrowPerAttributeContext::new( - attribute_name, - attribute_element_typoid, - attribute_typmod, - attribute_field, - ); - + attribute_context: &PgToArrowAttributeContext, +) -> ArrayRef { + if attribute_context.is_array { to_arrow_list_array(tuple, attribute_context) } else { - let attribute_context = PgToArrowPerAttributeContext::new( - attribute_name, - attribute_typoid, - attribute_typmod, - attribute_field, - ); - to_arrow_primitive_array(tuple, attribute_context) } } fn to_arrow_primitive_array( tuple: &PgHeapTuple, - attribute_context: PgToArrowPerAttributeContext, -) -> (FieldRef, ArrayRef) { + attribute_context: &PgToArrowAttributeContext, +) -> ArrayRef { match attribute_context.typoid { FLOAT4OID => { - let attribute_val: Option = tuple.get_by_name(attribute_context.name).unwrap(); + let attribute_val: Option = tuple.get_by_name(&attribute_context.name).unwrap(); - let (field, array) = attribute_val.to_arrow_array(attribute_context); - (field, array) + attribute_val.to_arrow_array(attribute_context) } FLOAT8OID => { - let attribute_val: Option = tuple.get_by_name(attribute_context.name).unwrap(); + let attribute_val: Option = tuple.get_by_name(&attribute_context.name).unwrap(); - let (field, array) = attribute_val.to_arrow_array(attribute_context); - (field, array) + attribute_val.to_arrow_array(attribute_context) } INT2OID => { - let attribute_val: Option = tuple.get_by_name(attribute_context.name).unwrap(); + let attribute_val: Option = tuple.get_by_name(&attribute_context.name).unwrap(); - let (field, array) = attribute_val.to_arrow_array(attribute_context); - (field, array) + attribute_val.to_arrow_array(attribute_context) } INT4OID => { - let attribute_val: Option = tuple.get_by_name(attribute_context.name).unwrap(); + let attribute_val: Option = tuple.get_by_name(&attribute_context.name).unwrap(); - let (field, array) = attribute_val.to_arrow_array(attribute_context); - (field, array) + attribute_val.to_arrow_array(attribute_context) } INT8OID => { - let attribute_val: Option = tuple.get_by_name(attribute_context.name).unwrap(); + let attribute_val: Option = tuple.get_by_name(&attribute_context.name).unwrap(); - let (field, array) = attribute_val.to_arrow_array(attribute_context); - (field, array) + attribute_val.to_arrow_array(attribute_context) } NUMERICOID => { let precision = extract_precision_from_numeric_typmod(attribute_context.typmod); @@ -152,114 +236,92 @@ fn to_arrow_primitive_array( reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod); let attribute_val: Option = - tuple.get_by_name(attribute_context.name).unwrap(); + tuple.get_by_name(&attribute_context.name).unwrap(); - let (field, array) = attribute_val.to_arrow_array(attribute_context); - (field, array) + attribute_val.to_arrow_array(attribute_context) } else { - let scale = extract_scale_from_numeric_typmod(attribute_context.typmod); - - let attribute_context = attribute_context - .with_scale(scale) - .with_precision(precision); - let attribute_val: Option = - tuple.get_by_name(attribute_context.name).unwrap(); + tuple.get_by_name(&attribute_context.name).unwrap(); - let (field, array) = attribute_val.to_arrow_array(attribute_context); - (field, array) + attribute_val.to_arrow_array(attribute_context) } } BOOLOID => { - let attribute_val: Option = tuple.get_by_name(attribute_context.name).unwrap(); + let attribute_val: Option = tuple.get_by_name(&attribute_context.name).unwrap(); - let (field, array) = attribute_val.to_arrow_array(attribute_context); - (field, array) + attribute_val.to_arrow_array(attribute_context) } DATEOID => { - let attribute_val: Option = tuple.get_by_name(attribute_context.name).unwrap(); + let attribute_val: Option = tuple.get_by_name(&attribute_context.name).unwrap(); - let (field, array) = attribute_val.to_arrow_array(attribute_context); - (field, array) + attribute_val.to_arrow_array(attribute_context) } TIMEOID => { - let attribute_val: Option