Commit 4cb8105

refactor pg to parquet (#21)

aykut-bozkurt authored Sep 11, 2024
1 parent b2d459d · commit 4cb8105

Showing 27 changed files with 654 additions and 861 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
@@ -2,5 +2,6 @@
     "editor.formatOnSave": true,
     "editor.defaultFormatter": "rust-lang.rust-analyzer",
     "rust-analyzer.check.command": "clippy",
-    "rust-analyzer.checkOnSave": true
+    "rust-analyzer.checkOnSave": true,
+    "editor.inlayHints.enabled": "offUnlessPressed",
 }
1 change: 1 addition & 0 deletions Cargo.toml
@@ -17,6 +17,7 @@ pg_test = []
 
 [dependencies]
 arrow = {version = "53", default-features = false}
+arrow-select = {version = "53", default-features = false}
 arrow-schema = {version = "53", default-features = false}
 futures = "0.3"
 object_store = {version = "0.11", default-features = false, features = ["aws"]}
29 changes: 12 additions & 17 deletions src/arrow_parquet/arrow_utils.rs
@@ -38,27 +38,22 @@ pub(crate) fn arrow_map_offsets(maps: &Vec<Option<CrunchyMap>>) -> (OffsetBuffer
 }
 
 pub(crate) fn arrow_array_offsets<T>(
-    arrays: &Vec<Option<Vec<Option<T>>>>,
+    pg_array: &Option<pgrx::Array<T>>,
 ) -> (OffsetBuffer<i32>, NullBuffer) {
-    pgrx::pg_sys::check_for_interrupts!();
 
     let mut nulls = vec![];
     let mut offsets = vec![0];
-    let mut current_offset = 0;
-    for array in arrays {
-        if let Some(array) = array {
-            let len = array.len() as i32;
-            current_offset += len;
-            offsets.push(current_offset);
-            nulls.push(true);
-        } else {
-            offsets.push(current_offset);
-            nulls.push(false);
-        }
-    }
-
-    let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
-    let nulls = NullBuffer::from(nulls);
+    if let Some(pg_array) = pg_array {
+        let len = pg_array.len() as i32;
+        offsets.push(len);
+        nulls.push(true);
+    } else {
+        offsets.push(0);
+        nulls.push(false);
+    };
+
+    let offsets = arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets));
+    let nulls = arrow::buffer::NullBuffer::from(nulls);
 
     (offsets, nulls)
 }
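The rewrite above inverts the function's granularity: instead of looping over a batch of arrays and accumulating running offsets, each call now records exactly one list entry, either the array's length or a zero-length null slot. A minimal, self-contained sketch of that bookkeeping, with the pgrx array stubbed out as a plain Option<Vec<i32>> and assuming the arrow 53 crate with default features (an illustration of the pattern, not the extension's code):

use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};

// Stand-in for `&Option<pgrx::Array<T>>`: one optional Postgres array per call.
fn array_offsets(pg_array: &Option<Vec<i32>>) -> (OffsetBuffer<i32>, NullBuffer) {
    let mut nulls = vec![];
    let mut offsets = vec![0];

    if let Some(pg_array) = pg_array {
        offsets.push(pg_array.len() as i32); // one list entry spanning every element
        nulls.push(true); // the entry is valid
    } else {
        offsets.push(0); // zero-length slot for a NULL array
        nulls.push(false); // the entry is null
    }

    (
        OffsetBuffer::new(ScalarBuffer::from(offsets)),
        NullBuffer::from(nulls),
    )
}

fn main() {
    let (offsets, nulls) = array_offsets(&Some(vec![10, 20, 30]));
    assert_eq!(offsets.to_vec(), vec![0, 3]); // one list of length 3
    assert_eq!(nulls.null_count(), 0);

    let (offsets, nulls) = array_offsets(&None);
    assert_eq!(offsets.to_vec(), vec![0, 0]); // empty slot
    assert_eq!(nulls.null_count(), 1); // marked null
}

The helper is now invoked once per tuple, which lines up with the per-batch interrupt check here giving way to the per-tuple check in the writer loop shown below.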
57 changes: 30 additions & 27 deletions src/arrow_parquet/parquet_writer.rs
@@ -1,9 +1,6 @@
 use std::sync::Arc;
 
-use arrow::{
-    array::{ArrayRef, RecordBatch, StructArray},
-    datatypes::FieldRef,
-};
+use arrow::array::RecordBatch;
 use arrow_schema::SchemaRef;
 use parquet::{
     arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
@@ -14,13 +11,15 @@ use tokio::runtime::Runtime;
 
 use crate::{
     arrow_parquet::{
-        codec::ParquetCodecOption, pg_to_arrow::collect_attribute_array_from_tuples,
-        schema_visitor::parse_arrow_schema_from_tupledesc, uri_utils::parquet_writer_from_uri,
+        codec::ParquetCodecOption, schema_visitor::parse_arrow_schema_from_tupledesc,
+        uri_utils::parquet_writer_from_uri,
     },
     pgrx_utils::collect_valid_attributes,
     type_compat::{geometry::reset_postgis_context, map::reset_crunchy_map_context},
 };
 
+use super::pg_to_arrow::to_arrow_array;
+
 pub(crate) struct ParquetWriterContext<'a> {
     runtime: Runtime,
     parquet_writer: AsyncArrowWriter<ParquetObjectWriter>,
@@ -67,17 +66,23 @@ impl<'a> ParquetWriterContext<'a> {
         &mut self,
         tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>,
     ) {
-        pgrx::pg_sys::check_for_interrupts!();
+        let mut record_batches = vec![];
 
-        // collect arrow arrays for each attribute in the tuples
-        let tuple_attribute_arrow_arrays = collect_arrow_attribute_arrays_from_tupledesc(
-            tuples,
-            self.tupledesc.clone(),
-            self.schema.clone(),
-        );
+        for tuple in tuples {
+            pgrx::pg_sys::check_for_interrupts!();
+
+            if let Some(tuple) = tuple {
+                let record_batch =
+                    pg_tuple_to_record_batch(tuple, &self.tupledesc, self.schema.clone());
+
+                record_batches.push(record_batch);
+            } else {
+                let null_record_batch = RecordBatch::new_empty(self.schema.clone());
+                record_batches.push(null_record_batch);
+            }
+        }
 
-        let struct_array = StructArray::from(tuple_attribute_arrow_arrays);
-        let record_batch = RecordBatch::from(struct_array);
+        let record_batch = arrow::compute::concat_batches(&self.schema, &record_batches).unwrap();
 
         let parquet_writer = &mut self.parquet_writer;
 
@@ -92,15 +97,15 @@
     }
 }
 
-fn collect_arrow_attribute_arrays_from_tupledesc(
-    tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>,
-    tupledesc: PgTupleDesc,
+fn pg_tuple_to_record_batch(
+    tuple: PgHeapTuple<AllocatedByRust>,
+    tupledesc: &PgTupleDesc,
     schema: SchemaRef,
-) -> Vec<(FieldRef, ArrayRef)> {
+) -> RecordBatch {
     let include_generated_columns = true;
-    let attributes = collect_valid_attributes(&tupledesc, include_generated_columns);
+    let attributes = collect_valid_attributes(tupledesc, include_generated_columns);
 
-    let mut tuple_attribute_arrow_arrays = vec![];
+    let mut attribute_arrow_arrays = vec![];
 
     for attribute in attributes {
         let attribute_name = attribute.name();
@@ -110,18 +115,16 @@
             .field_with_name(attribute_name)
             .expect("Expected attribute field");
 
-        let (field, array) = collect_attribute_array_from_tuples(
-            &tuples,
+        let (_field, array) = to_arrow_array(
+            &tuple,
             attribute_name,
             attribute_typoid,
             attribute_typmod,
             Arc::new(attribute_field.clone()),
         );
 
-        let tuple_attribute_arrow_array = (field, array);
-
-        tuple_attribute_arrow_arrays.push(tuple_attribute_arrow_array);
+        attribute_arrow_arrays.push(array);
     }
 
-    tuple_attribute_arrow_arrays
+    RecordBatch::try_new(schema, attribute_arrow_arrays).expect("Expected record batch")
 }
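Seen end to end, the new write path is tuple-at-a-time: every heap tuple becomes a one-row RecordBatch, a NULL tuple becomes an empty batch (note that RecordBatch::new_empty contributes zero rows to the concatenated result), and the per-tuple batches are merged with arrow::compute::concat_batches, the concat kernel supplied by the new arrow-select dependency in Cargo.toml above. A self-contained sketch of that flow, with the pgrx heap tuple stubbed out as an Option<i32> row and default arrow 53 features assumed (hypothetical names; an illustration of the pattern, not the extension's code):

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch};
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

// Stand-in for pg_tuple_to_record_batch: one "tuple" becomes a one-row batch.
fn row_to_record_batch(value: i32, schema: SchemaRef) -> RecordBatch {
    let column: ArrayRef = Arc::new(Int32Array::from(vec![value]));
    RecordBatch::try_new(schema, vec![column]).expect("Expected record batch")
}

fn main() {
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    let tuples: Vec<Option<i32>> = vec![Some(1), None, Some(2)];
    let mut record_batches = vec![];

    for tuple in tuples {
        if let Some(tuple) = tuple {
            record_batches.push(row_to_record_batch(tuple, schema.clone()));
        } else {
            // Mirrors the diff: an empty batch, which adds no rows to the output.
            record_batches.push(RecordBatch::new_empty(schema.clone()));
        }
    }

    let merged = concat_batches(&schema, &record_batches).expect("batches share a schema");
    assert_eq!(merged.num_rows(), 2); // the None tuple contributed no row
}

Building one batch per tuple trades the old column-at-a-time accumulation (StructArray over whole-batch arrays) for simpler per-row code; concat_batches then reassembles contiguous columns, so AsyncArrowWriter still receives ordinary columnar batches.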