Skip to content

Commit

Permalink
optimize attribute context: create once for each group
Browse files Browse the repository at this point in the history
  • Loading branch information
aykut-bozkurt committed Sep 11, 2024
1 parent 6a85929 commit a614c8f
Show file tree
Hide file tree
Showing 25 changed files with 532 additions and 788 deletions.
16 changes: 1 addition & 15 deletions src/arrow_parquet/arrow_utils.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,7 @@
use std::{ops::Deref, sync::Arc};

use arrow::{
buffer::{NullBuffer, OffsetBuffer, ScalarBuffer},
datatypes::{Field, FieldRef},
};
use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};

use crate::type_compat::map::CrunchyMap;

/// Builds a non-nullable copy of `field`.
///
/// The name, data type and metadata are taken from the input field; only the
/// nullability flag handed to `Field::new` is forced to `false`.
pub(crate) fn to_not_nullable_field(field: FieldRef) -> FieldRef {
    let source = field.deref();

    let not_nullable = Field::new(source.name(), source.data_type().clone(), false)
        .with_metadata(source.metadata().clone());

    Arc::new(not_nullable)
}

pub(crate) fn arrow_map_offsets(maps: &Vec<Option<CrunchyMap>>) -> (OffsetBuffer<i32>, NullBuffer) {
let mut offsets = vec![0];
let mut nulls = vec![];
Expand Down
67 changes: 25 additions & 42 deletions src/arrow_parquet/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ use crate::{
codec::ParquetCodecOption, schema_visitor::parse_arrow_schema_from_tupledesc,
uri_utils::parquet_writer_from_uri,
},
pgrx_utils::collect_valid_attributes,
type_compat::{geometry::reset_postgis_context, map::reset_crunchy_map_context},
};

use super::pg_to_arrow::to_arrow_array;
use super::pg_to_arrow::{collect_attribute_contexts, to_arrow_array, PgToArrowAttributeContext};

pub(crate) struct ParquetWriterContext<'a> {
runtime: Runtime,
Expand Down Expand Up @@ -66,20 +65,20 @@ impl<'a> ParquetWriterContext<'a> {
&mut self,
tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>,
) {
let attribute_contexts = collect_attribute_contexts(&self.tupledesc, &self.schema.fields);

let mut record_batches = vec![];

for tuple in tuples {
pgrx::pg_sys::check_for_interrupts!();

if let Some(tuple) = tuple {
let record_batch =
pg_tuple_to_record_batch(tuple, &self.tupledesc, self.schema.clone());

record_batches.push(record_batch);
let record_batch = if let Some(tuple) = tuple {
Self::pg_tuple_to_record_batch(tuple, &attribute_contexts, self.schema.clone())
} else {
let null_record_batch = RecordBatch::new_empty(self.schema.clone());
record_batches.push(null_record_batch);
}
RecordBatch::new_empty(self.schema.clone())
};

record_batches.push(record_batch);
}

let record_batch = arrow::compute::concat_batches(&self.schema, &record_batches).unwrap();
Expand All @@ -92,39 +91,23 @@ impl<'a> ParquetWriterContext<'a> {
self.runtime.block_on(parquet_writer.flush()).unwrap();
}

/// Consumes the writer context and closes its parquet writer.
///
/// The value returned by `parquet_writer.close()` is driven to completion
/// with `runtime.block_on`, so this call does not return until it finishes.
///
/// # Panics
///
/// Panics if the outcome of the close operation cannot be unwrapped
/// (presumably an error result from the writer — verify against its type).
pub(crate) fn close(self) {
self.runtime.block_on(self.parquet_writer.close()).unwrap();
}
}
fn pg_tuple_to_record_batch(
tuple: PgHeapTuple<AllocatedByRust>,
attribute_contexts: &Vec<PgToArrowAttributeContext>,
schema: SchemaRef,
) -> RecordBatch {
let mut attribute_arrow_arrays = vec![];

fn pg_tuple_to_record_batch(
tuple: PgHeapTuple<AllocatedByRust>,
tupledesc: &PgTupleDesc,
schema: SchemaRef,
) -> RecordBatch {
let include_generated_columns = true;
let attributes = collect_valid_attributes(tupledesc, include_generated_columns);

let mut attribute_arrow_arrays = vec![];

for attribute in attributes {
let attribute_name = attribute.name();
let attribute_typoid = attribute.type_oid().value();
let attribute_typmod = attribute.type_mod();
let attribute_field = schema
.field_with_name(attribute_name)
.expect("Expected attribute field");

let (_field, array) = to_arrow_array(
&tuple,
attribute_name,
attribute_typoid,
attribute_typmod,
Arc::new(attribute_field.clone()),
);

attribute_arrow_arrays.push(array);
for attribute_context in attribute_contexts {
let array = to_arrow_array(&tuple, attribute_context);

attribute_arrow_arrays.push(array);
}

RecordBatch::try_new(schema, attribute_arrow_arrays).expect("Expected record batch")
}

RecordBatch::try_new(schema, attribute_arrow_arrays).expect("Expected record batch")
/// Consumes the writer context and closes its parquet writer.
///
/// The value returned by `parquet_writer.close()` is driven to completion
/// with `runtime.block_on`, so this call does not return until it finishes.
///
/// # Panics
///
/// Panics if the outcome of the close operation cannot be unwrapped
/// (presumably an error result from the writer — verify against its type).
pub(crate) fn close(self) {
self.runtime.block_on(self.parquet_writer.close()).unwrap();
}
}
Loading

0 comments on commit a614c8f

Please sign in to comment.