Skip to content

Commit

Permalink
bump pgrx to 0.12.3 (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
aykut-bozkurt authored Sep 11, 2024
1 parent 3747377 commit b2d459d
Show file tree
Hide file tree
Showing 43 changed files with 1,132 additions and 971 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
- name: Install and configure pgrx
run: |
cargo install --locked cargo-pgrx@0.11.4
cargo install --locked cargo-pgrx@0.12.3
cargo pgrx init --pg16 $(which pg_config)
- name: Format and lint
Expand Down
10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]
crate-type = ["cdylib","lib"]

[[bin]]
name = "pgrx_embed_pg_parquet"
path = "./src/bin/pgrx_embed.rs"

[features]
default = ["pg16"]
Expand All @@ -26,13 +30,13 @@ parquet = {version = "53", default-features = false, features = [
"zstd",
"object_store",
]}
pgrx = "=0.11.4"
pgrx = "=0.12.3"
serde = {version = "1", default-features = false}
serde_json = "1"
tokio = {version = "1", default-features = false, features = ["rt", "time", "macros"]}

[dev-dependencies]
pgrx-tests = "=0.11.4"
pgrx-tests = "=0.12.3"

[profile.dev]
panic = "unwind"
Expand Down
192 changes: 65 additions & 127 deletions src/arrow_parquet/arrow_to_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use arrow::array::{
StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, UInt32Array,
};
use pgrx::{
datum::{Date, Time, TimeWithTimeZone, Timestamp, TimestampWithTimeZone},
pg_sys::{
Datum, Oid, BOOLOID, BYTEAOID, CHAROID, DATEOID, FLOAT4OID, FLOAT8OID, INT2OID, INT4OID,
INT8OID, NUMERICOID, OIDOID, TEXTOID, TIMEOID, TIMESTAMPOID, TIMESTAMPTZOID, TIMETZOID,
},
prelude::PgHeapTuple,
AllocatedByRust, AnyNumeric, Date, IntoDatum, PgTupleDesc, Time, TimeWithTimeZone, Timestamp,
TimestampWithTimeZone,
AllocatedByRust, AnyNumeric, IntoDatum, PgTupleDesc,
};

use crate::{
Expand Down Expand Up @@ -92,35 +92,22 @@ pub(crate) fn to_pg_datum(
attribute_typoid: Oid,
attribute_typmod: i32,
) -> Option<Datum> {
if is_composite_type(attribute_typoid) {
let attribute_tupledesc = tuple_desc(attribute_typoid, attribute_typmod);

let attribute_context =
ArrowToPgPerAttributeContext::new(attribute_typoid, attribute_typmod)
.with_tupledesc(attribute_tupledesc);

to_pg_composite_datum(attribute_array.into(), attribute_context)
} else if is_array_type(attribute_typoid) {
if is_array_type(attribute_typoid) {
let attribute_element_typoid = array_element_typoid(attribute_typoid);

let attribute_context =
ArrowToPgPerAttributeContext::new(attribute_element_typoid, attribute_typmod);

to_pg_array_datum(attribute_array.into(), attribute_context)
} else if is_crunchy_map_typoid(attribute_typoid) {
let attribute_context =
ArrowToPgPerAttributeContext::new(attribute_typoid, attribute_typmod);

to_pg_map_datum(attribute_array.into(), attribute_context)
} else {
let attribute_context =
ArrowToPgPerAttributeContext::new(attribute_typoid, attribute_typmod);

to_pg_primitive_datum(attribute_array, attribute_context)
to_pg_nonarray_datum(attribute_array, attribute_context)
}
}

fn to_pg_primitive_datum(
fn to_pg_nonarray_datum(
primitive_array: ArrayData,
attribute_context: ArrowToPgPerAttributeContext,
) -> Option<Datum> {
Expand Down Expand Up @@ -233,10 +220,41 @@ fn to_pg_primitive_datum(
val.into_datum()
}
_ => {
if is_postgis_geometry_typoid(attribute_context.typoid) {
to_pg_geometry_datum(primitive_array.into(), attribute_context)
if is_composite_type(attribute_context.typoid) {
let attribute_tupledesc =
tuple_desc(attribute_context.typoid, attribute_context.typmod);
let attribute_context = attribute_context.with_tupledesc(attribute_tupledesc);

let val = <PgHeapTuple<AllocatedByRust> as ArrowArrayToPgType<
StructArray,
PgHeapTuple<AllocatedByRust>,
>>::to_pg_type(primitive_array.into(), attribute_context);

val.into_datum()
} else if is_crunchy_map_typoid(attribute_context.typoid) {
let val = <CrunchyMap as ArrowArrayToPgType<MapArray, CrunchyMap>>::to_pg_type(
primitive_array.into(),
attribute_context,
);

val.into_datum()
} else if is_postgis_geometry_typoid(attribute_context.typoid) {
let val = <Geometry as ArrowArrayToPgType<BinaryArray, Geometry>>::to_pg_type(
primitive_array.into(),
attribute_context,
);

val.into_datum()
} else {
to_pg_fallback_to_text_datum(primitive_array.into(), attribute_context)
reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod);

let val =
<FallbackToText as ArrowArrayToPgType<StringArray, FallbackToText>>::to_pg_type(
primitive_array.into(),
attribute_context,
);

val.into_datum()
}
}
}
Expand Down Expand Up @@ -372,120 +390,40 @@ fn to_pg_array_datum(
if is_composite_type(attribute_context.typoid) {
let attribute_tupledesc =
tuple_desc(attribute_context.typoid, attribute_context.typmod);
let attribute_context = attribute_context.with_tupledesc(attribute_tupledesc);

to_pg_composite_array_datum(
list_array.into(),
attribute_context.with_tupledesc(attribute_tupledesc),
)
} else if is_crunchy_map_typoid(attribute_context.typoid) {
to_pg_map_array_datum(list_array.into(), attribute_context)
} else if is_postgis_geometry_typoid(attribute_context.typoid) {
to_pg_geometry_array_datum(list_array.into(), attribute_context)
} else {
to_pg_fallback_to_text_array_datum(list_array.into(), attribute_context)
}
}
}
}

fn to_pg_composite_datum(
struct_array: StructArray,
attribute_context: ArrowToPgPerAttributeContext,
) -> Option<Datum> {
let val = <PgHeapTuple<AllocatedByRust> as ArrowArrayToPgType<
StructArray,
PgHeapTuple<AllocatedByRust>,
>>::to_pg_type(struct_array, attribute_context);

val.into_datum()
}
let val = <Vec<Option<PgHeapTuple<AllocatedByRust>>> as ArrowArrayToPgType<
StructArray,
Vec<Option<PgHeapTuple<AllocatedByRust>>>,
>>::to_pg_type(list_array.into(), attribute_context);

fn to_pg_composite_array_datum(
struct_array: StructArray,
attribute_context: ArrowToPgPerAttributeContext,
) -> Option<Datum> {
let val = <Vec<Option<PgHeapTuple<AllocatedByRust>>> as ArrowArrayToPgType<
StructArray,
Vec<Option<PgHeapTuple<AllocatedByRust>>>,
>>::to_pg_type(struct_array, attribute_context);

val.into_datum()
}

fn to_pg_map_datum(
map_array: MapArray,
attribute_context: ArrowToPgPerAttributeContext,
) -> Option<Datum> {
let val = <CrunchyMap as ArrowArrayToPgType<MapArray, CrunchyMap>>::to_pg_type(
map_array,
attribute_context,
);

val.into_datum()
}

fn to_pg_map_array_datum(
map_array: MapArray,
attribute_context: ArrowToPgPerAttributeContext,
) -> Option<Datum> {
let val = <Vec<Option<CrunchyMap>> as ArrowArrayToPgType<MapArray, Vec<Option<CrunchyMap>>>>::to_pg_type(
map_array,
attribute_context,
);

val.into_datum()
}

fn to_pg_geometry_datum(
geometry_array: BinaryArray,
attribute_context: ArrowToPgPerAttributeContext,
) -> Option<Datum> {
let val = <Geometry as ArrowArrayToPgType<BinaryArray, Geometry>>::to_pg_type(
geometry_array,
attribute_context,
);

val.into_datum()
}
val.into_datum()
} else if is_crunchy_map_typoid(attribute_context.typoid) {
let val = <Vec<Option<CrunchyMap>> as ArrowArrayToPgType<
MapArray,
Vec<Option<CrunchyMap>>,
>>::to_pg_type(list_array.into(), attribute_context);

fn to_pg_geometry_array_datum(
geometry_array: BinaryArray,
attribute_context: ArrowToPgPerAttributeContext,
) -> Option<Datum> {
let val = <Vec<Option<Geometry>> as ArrowArrayToPgType<
val.into_datum()
} else if is_postgis_geometry_typoid(attribute_context.typoid) {
let val = <Vec<Option<Geometry>> as ArrowArrayToPgType<
BinaryArray,
Vec<Option<Geometry>>,
>>::to_pg_type(geometry_array, attribute_context);

val.into_datum()
}

fn to_pg_fallback_to_text_datum(
text_array: StringArray,
attribute_context: ArrowToPgPerAttributeContext,
) -> Option<Datum> {
reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod);
>>::to_pg_type(list_array.into(), attribute_context);

let val = <FallbackToText as ArrowArrayToPgType<StringArray, FallbackToText>>::to_pg_type(
text_array,
attribute_context,
);

val.into_datum()
}

fn to_pg_fallback_to_text_array_datum(
text_array: StringArray,
attribute_context: ArrowToPgPerAttributeContext,
) -> Option<Datum> {
reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod);
val.into_datum()
} else {
reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod);

let val = <Vec<Option<FallbackToText>> as ArrowArrayToPgType<
StringArray,
Vec<Option<FallbackToText>>,
>>::to_pg_type(text_array, attribute_context);
let val = <Vec<Option<FallbackToText>> as ArrowArrayToPgType<
StringArray,
Vec<Option<FallbackToText>>,
>>::to_pg_type(list_array.into(), attribute_context);

val.into_datum()
val.into_datum()
}
}
}
}

fn to_pg_numeric_datum(
Expand Down
2 changes: 1 addition & 1 deletion src/arrow_parquet/arrow_to_pg/date.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arrow::array::{Array, Date32Array};
use pgrx::Date;
use pgrx::datum::Date;

use crate::type_compat::pg_arrow_type_conversions::i32_to_date;

Expand Down
25 changes: 18 additions & 7 deletions src/arrow_parquet/arrow_to_pg/map.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arrow::array::{Array, MapArray, StructArray};
use pgrx::{prelude::PgHeapTuple, AllocatedByRust};
use pgrx::{prelude::PgHeapTuple, AllocatedByRust, FromDatum, IntoDatum};

use crate::{
pgrx_utils::{domain_array_base_elem_typoid, tuple_desc},
Expand Down Expand Up @@ -32,9 +32,14 @@ impl<'b, 'a: 'b> ArrowArrayToPgType<'b, MapArray, CrunchyMap<'a>> for CrunchyMap
>>::to_pg_type(entries_array, entries_context);

if let Some(entries) = entries {
// entries cannot be null if the map is not null (arrow does not allow it)
let entries = entries.into_iter().flatten().collect();
Some(CrunchyMap { entries })
let entries_datum = entries.into_datum();

if let Some(entries_datum) = entries_datum {
let entries = unsafe { pgrx::Array::from_datum(entries_datum, false).unwrap() };
Some(CrunchyMap { entries })
} else {
None
}
} else {
None
}
Expand Down Expand Up @@ -67,9 +72,15 @@ impl<'b, 'a: 'b> ArrowArrayToPgType<'b, MapArray, Vec<Option<CrunchyMap<'a>>>>
>>::to_pg_type(entries_array, entries_context.clone());

if let Some(entries) = entries {
// entries cannot be null if the map is not null (arrow does not allow it)
let entries = entries.into_iter().flatten().collect();
maps.push(Some(CrunchyMap { entries }));
let entries_datum = entries.into_datum();

if let Some(entries_datum) = entries_datum {
let entries =
unsafe { pgrx::Array::from_datum(entries_datum, false).unwrap() };
maps.push(Some(CrunchyMap { entries }))
} else {
maps.push(None);
}
} else {
maps.push(None);
}
Expand Down
2 changes: 1 addition & 1 deletion src/arrow_parquet/arrow_to_pg/time.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arrow::array::{Array, Time64MicrosecondArray};
use pgrx::Time;
use pgrx::datum::Time;

use crate::type_compat::pg_arrow_type_conversions::i64_to_time;

Expand Down
2 changes: 1 addition & 1 deletion src/arrow_parquet/arrow_to_pg/timestamp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arrow::array::{Array, TimestampMicrosecondArray};
use pgrx::Timestamp;
use pgrx::datum::Timestamp;

use crate::type_compat::pg_arrow_type_conversions::i64_to_timestamp;

Expand Down
2 changes: 1 addition & 1 deletion src/arrow_parquet/arrow_to_pg/timestamptz.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arrow::array::{Array, TimestampMicrosecondArray};
use pgrx::TimestampWithTimeZone;
use pgrx::datum::TimestampWithTimeZone;

use crate::type_compat::pg_arrow_type_conversions::i64_to_timestamptz;

Expand Down
2 changes: 1 addition & 1 deletion src/arrow_parquet/arrow_to_pg/timetz.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arrow::array::{Array, Time64MicrosecondArray};
use pgrx::TimeWithTimeZone;
use pgrx::datum::TimeWithTimeZone;

use crate::type_compat::pg_arrow_type_conversions::i64_to_timetz;

Expand Down
8 changes: 2 additions & 6 deletions src/arrow_parquet/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ fn collect_arrow_attribute_arrays_from_tupledesc(

let mut tuple_attribute_arrow_arrays = vec![];

let mut tuples = tuples;

for attribute in attributes {
let attribute_name = attribute.name();
let attribute_typoid = attribute.type_oid().value();
Expand All @@ -112,16 +110,14 @@ fn collect_arrow_attribute_arrays_from_tupledesc(
.field_with_name(attribute_name)
.expect("Expected attribute field");

let (field, array, tups) = collect_attribute_array_from_tuples(
tuples,
tupledesc.clone(),
let (field, array) = collect_attribute_array_from_tuples(
&tuples,
attribute_name,
attribute_typoid,
attribute_typmod,
Arc::new(attribute_field.clone()),
);

tuples = tups;
let tuple_attribute_arrow_array = (field, array);

tuple_attribute_arrow_arrays.push(tuple_attribute_arrow_array);
Expand Down
Loading

0 comments on commit b2d459d

Please sign in to comment.