Skip to content

Commit

Permalink
Numeric improvement and fix (#65)
Browse files Browse the repository at this point in the history
**Problem**
Previously, we were writing unbounded numerics — those that do not specify precision and scale (i.e. `numeric`) — as text, since such values can be too large to represent
as a Parquet decimal. Most of the time users omit the precision for numeric columns, so those columns were written as text. That prevented execution engines from pushing
down some operators on the numeric type.

**Improvement**
We now read/write unbounded numerics as numeric(38, 16) in the Parquet file. We throw a runtime error if an unbounded numeric value exceeds 22 digits before
the decimal point or 16 digits after it. For values that run into this error, we give a hint to change the column type to a numeric(p,s) with precision and
scale explicitly specified, to get rid of the error.

**Fix**
Arrow-to-pg conversions were incorrect in some cases, e.g. when there is no decimal point. These cases are fixed and covered by tests.
  • Loading branch information
aykut-bozkurt authored Nov 9, 2024
1 parent 631cb1a commit 451f347
Show file tree
Hide file tree
Showing 9 changed files with 521 additions and 133 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,12 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`:
| `crunchy_map`(5) | GROUP | MAP |

> [!WARNING]
> - (1) The `numeric` types with <= `38` precision is represented as `FIXED_LEN_BYTE_ARRAY(16)` with `DECIMAL(128)` logical type. The `numeric` types with > `38` precision is represented as `BYTE_ARRAY` with `STRING` logical type.
> - (1) The `numeric` type is written with the smallest possible memory width to the Parquet file, as follows:
> * `numeric(P <= 9, S)` is represented as `INT32` with `DECIMAL` logical type
> * `numeric(9 < P <= 18, S)` is represented as `INT64` with `DECIMAL` logical type
> * `numeric(18 < P <= 38, S)` is represented as `FIXED_LEN_BYTE_ARRAY(9-16)` with `DECIMAL` logical type
> * `numeric(38 < P, S)` is represented as `BYTE_ARRAY` with `STRING` logical type
> * `numeric` without precision and scale specified is allowed by Postgres. These are represented with a default precision (38) and scale (16) instead of being written as strings. You get a runtime error if your table tries to read or write a numeric value that is not allowed by the default precision and scale (22 integral digits before the decimal point, 16 digits after the decimal point).
> - (2) The `date` type is represented according to `Unix epoch` when writing to Parquet files. It is converted back according to `PostgreSQL epoch` when reading from Parquet files.
> - (3) The `timestamptz` and `timetz` types are adjusted to `UTC` when writing to Parquet files. They are converted back with `UTC` timezone when reading from Parquet files.
> - (4) The `geometry` type is represented as `BYTE_ARRAY` encoded as `WKB` when `postgis` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
Expand Down
16 changes: 8 additions & 8 deletions src/arrow_parquet/arrow_to_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ use crate::{
geometry::{is_postgis_geometry_type, Geometry},
map::{is_map_type, Map},
pg_arrow_type_conversions::{
extract_precision_from_numeric_typmod, extract_scale_from_numeric_typmod,
MAX_DECIMAL_PRECISION,
extract_precision_and_scale_from_numeric_typmod, should_write_numeric_as_text,
},
},
};
Expand Down Expand Up @@ -66,8 +65,8 @@ pub(crate) struct ArrowToPgAttributeContext {
is_map: bool,
attribute_contexts: Option<Vec<ArrowToPgAttributeContext>>,
attribute_tupledesc: Option<PgTupleDesc<'static>>,
precision: Option<usize>,
scale: Option<usize>,
precision: Option<u32>,
scale: Option<u32>,
}

impl ArrowToPgAttributeContext {
Expand Down Expand Up @@ -127,8 +126,9 @@ impl ArrowToPgAttributeContext {
let precision;
let scale;
if attribute_typoid == NUMERICOID {
precision = Some(extract_precision_from_numeric_typmod(typmod));
scale = Some(extract_scale_from_numeric_typmod(typmod));
let (p, s) = extract_precision_and_scale_from_numeric_typmod(typmod);
precision = Some(p);
scale = Some(s);
} else {
precision = None;
scale = None;
Expand Down Expand Up @@ -263,7 +263,7 @@ fn to_pg_nonarray_datum(
.precision
.expect("missing precision in context");

if precision > MAX_DECIMAL_PRECISION {
if should_write_numeric_as_text(precision) {
reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod);

to_pg_datum!(
Expand Down Expand Up @@ -415,7 +415,7 @@ fn to_pg_array_datum(
.precision
.expect("missing precision in context");

if precision > MAX_DECIMAL_PRECISION {
if should_write_numeric_as_text(precision) {
reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod);

to_pg_datum!(
Expand Down
13 changes: 11 additions & 2 deletions src/arrow_parquet/arrow_to_pg/numeric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,28 @@ impl ArrowArrayToPgType<AnyNumeric> for Decimal128Array {
if self.is_null(0) {
None
} else {
let precision = context.precision.expect("Expected precision");
let scale = context.scale.expect("Expected scale");
Some(i128_to_numeric(self.value(0), scale))

Some(i128_to_numeric(
self.value(0),
precision,
scale,
context.typmod,
))
}
}
}

// Numeric[]
impl ArrowArrayToPgType<Vec<Option<AnyNumeric>>> for Decimal128Array {
fn to_pg_type(self, context: &ArrowToPgAttributeContext) -> Option<Vec<Option<AnyNumeric>>> {
let precision = context.precision.expect("Expected precision");
let scale = context.scale.expect("Expected scale");

let mut vals = vec![];
for val in self.iter() {
let val = val.map(|v| i128_to_numeric(v, scale));
let val = val.map(|v| i128_to_numeric(v, precision, scale, context.typmod));
vals.push(val);
}
Some(vals)
Expand Down
16 changes: 8 additions & 8 deletions src/arrow_parquet/pg_to_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ use crate::{
geometry::{is_postgis_geometry_type, Geometry},
map::{is_map_type, Map},
pg_arrow_type_conversions::{
extract_precision_from_numeric_typmod, extract_scale_from_numeric_typmod,
MAX_DECIMAL_PRECISION,
extract_precision_and_scale_from_numeric_typmod, should_write_numeric_as_text,
},
},
};
Expand Down Expand Up @@ -65,8 +64,8 @@ pub(crate) struct PgToArrowAttributeContext {
is_geometry: bool,
is_map: bool,
attribute_contexts: Option<Vec<PgToArrowAttributeContext>>,
scale: Option<usize>,
precision: Option<usize>,
scale: Option<u32>,
precision: Option<u32>,
}

impl PgToArrowAttributeContext {
Expand Down Expand Up @@ -126,8 +125,9 @@ impl PgToArrowAttributeContext {
let precision;
let scale;
if attribute_typoid == NUMERICOID {
precision = Some(extract_precision_from_numeric_typmod(typmod));
scale = Some(extract_scale_from_numeric_typmod(typmod));
let (p, s) = extract_precision_and_scale_from_numeric_typmod(typmod);
precision = Some(p);
scale = Some(s);
} else {
precision = None;
scale = None;
Expand Down Expand Up @@ -274,7 +274,7 @@ fn to_arrow_primitive_array(
.precision
.expect("missing precision in context");

if precision > MAX_DECIMAL_PRECISION {
if should_write_numeric_as_text(precision) {
reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod);

to_arrow_primitive_array!(FallbackToText, tuples, attribute_context)
Expand Down Expand Up @@ -359,7 +359,7 @@ fn to_arrow_list_array(
.precision
.expect("missing precision in context");

if precision > MAX_DECIMAL_PRECISION {
if should_write_numeric_as_text(precision) {
reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod);

to_arrow_list_array!(pgrx::Array<FallbackToText>, tuples, attribute_context)
Expand Down
10 changes: 8 additions & 2 deletions src/arrow_parquet/pg_to_arrow/numeric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ impl PgTypeToArrowArray<AnyNumeric> for Vec<Option<AnyNumeric>> {

let numerics = self
.into_iter()
.map(|numeric| numeric.map(numeric_to_i128))
.map(|numeric| {
numeric
.map(|numeric| numeric_to_i128(numeric, context.typmod, context.field.name()))
})
.collect::<Vec<_>>();

let numeric_array = Decimal128Array::from(numerics)
Expand All @@ -43,7 +46,10 @@ impl PgTypeToArrowArray<AnyNumeric> for Vec<Option<Vec<Option<AnyNumeric>>>> {
.into_iter()
.flatten()
.flatten()
.map(|numeric| numeric.map(numeric_to_i128))
.map(|numeric| {
numeric
.map(|numeric| numeric_to_i128(numeric, context.typmod, context.field.name()))
})
.collect::<Vec<_>>();

let precision = context
Expand Down
8 changes: 3 additions & 5 deletions src/arrow_parquet/schema_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use crate::{
geometry::is_postgis_geometry_type,
map::is_map_type,
pg_arrow_type_conversions::{
extract_precision_from_numeric_typmod, extract_scale_from_numeric_typmod,
MAX_DECIMAL_PRECISION,
extract_precision_and_scale_from_numeric_typmod, should_write_numeric_as_text,
},
},
};
Expand Down Expand Up @@ -213,10 +212,9 @@ fn parse_primitive_schema(
INT4OID => Field::new(elem_name, arrow::datatypes::DataType::Int32, true),
INT8OID => Field::new(elem_name, arrow::datatypes::DataType::Int64, true),
NUMERICOID => {
let precision = extract_precision_from_numeric_typmod(typmod);
let scale = extract_scale_from_numeric_typmod(typmod);
let (precision, scale) = extract_precision_and_scale_from_numeric_typmod(typmod);

if precision > MAX_DECIMAL_PRECISION {
if should_write_numeric_as_text(precision) {
Field::new(elem_name, arrow::datatypes::DataType::Utf8, true)
} else {
Field::new(
Expand Down
Loading

0 comments on commit 451f347

Please sign in to comment.