Skip to content

Commit

Permalink
convert Postgres rows to arrow array
Browse files Browse the repository at this point in the history
  • Loading branch information
aykut-bozkurt committed Oct 2, 2024
1 parent 81bc0d5 commit f168646
Show file tree
Hide file tree
Showing 23 changed files with 1,394 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/arrow_parquet.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pub(crate) mod arrow_utils;
pub(crate) mod pg_to_arrow;
pub(crate) mod schema_visitor;
47 changes: 47 additions & 0 deletions src/arrow_parquet/arrow_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};

use crate::type_compat::map::CrunchyMap;

pub(crate) fn arrow_map_offsets(maps: &Vec<Option<CrunchyMap>>) -> (OffsetBuffer<i32>, NullBuffer) {
let mut offsets = vec![0];
let mut nulls = vec![];

for map in maps {
if let Some(map) = map {
let len = map.entries.len() as i32;
offsets.push(offsets.last().expect("failed to get last map offset") + len);
nulls.push(true);
} else {
offsets.push(*offsets.last().expect("failed to get last map offset"));
nulls.push(false);
}
}

let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
let nulls = NullBuffer::from(nulls);

(offsets, nulls)
}

pub(crate) fn arrow_array_offsets<T>(
pg_array: &Vec<Option<Vec<Option<T>>>>,
) -> (OffsetBuffer<i32>, NullBuffer) {
let mut nulls = vec![];
let mut offsets = vec![0];

for pg_array in pg_array {
if let Some(pg_array) = pg_array {
let len = pg_array.len() as i32;
offsets.push(offsets.last().expect("failed to get last array offset") + len);
nulls.push(true);
} else {
offsets.push(*offsets.last().expect("failed to get last array offset"));
nulls.push(false);
}
}

let offsets = arrow::buffer::OffsetBuffer::new(arrow::buffer::ScalarBuffer::from(offsets));
let nulls = arrow::buffer::NullBuffer::from(nulls);

(offsets, nulls)
}
Loading

0 comments on commit f168646

Please sign in to comment.