From 451f3476b086444933f46ae285a6f5f8f3a79162 Mon Sep 17 00:00:00 2001 From: aykut-bozkurt <51649454+aykut-bozkurt@users.noreply.github.com> Date: Sat, 9 Nov 2024 17:22:40 +0300 Subject: [PATCH] Numeric improvement and fix (#65) **Problem** Previously, we were writing unbounded numerics, which do not specify precision and scale (i.e. `numeric`), as text since they can be too large to represent as parquet decimal. Most of the time users ignore the precision for numeric columns, so they were written as text. That prevented pushing down some operators on the numeric type by execution engines. **Improvement** We start to read/write unbounded numerics as numeric(38, 16) to parquet file. We throw a runtime error if an unbounded numeric value exceeds 22 digits before the decimal or 16 digits after the decimal. For the ones that bump into the error, we give a hint to change the column type to a numeric(p,s) with precision and scale specified, to get rid of the error. **Fix** Arrow to pg conversions were not correct for some cases, e.g. when there is no decimal point. These cases are fixed and covered by tests. --- README.md | 7 +- src/arrow_parquet/arrow_to_pg.rs | 16 +- src/arrow_parquet/arrow_to_pg/numeric.rs | 13 +- src/arrow_parquet/pg_to_arrow.rs | 16 +- src/arrow_parquet/pg_to_arrow/numeric.rs | 10 +- src/arrow_parquet/schema_parser.rs | 8 +- src/lib.rs | 330 +++++++++++++++++- .../copy_to_dest_receiver.rs | 29 +- src/type_compat/pg_arrow_type_conversions.rs | 225 +++++++----- 9 files changed, 521 insertions(+), 133 deletions(-) diff --git a/README.md b/README.md index 7c679a1..66e1b7a 100644 --- a/README.md +++ b/README.md @@ -225,7 +225,12 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`: | `crunchy_map`(5) | GROUP | MAP | > [!WARNING] -> - (1) The `numeric` types with <= `38` precision is represented as `FIXED_LEN_BYTE_ARRAY(16)` with `DECIMAL(128)` logical type. 
The `numeric` types with > `38` precision is represented as `BYTE_ARRAY` with `STRING` logical type. +> - (1) `numeric` type is written with the smallest possible memory width to the parquet file as follows: +> * `numeric(P <= 9, S)` is represented as `INT32` with `DECIMAL` logical type +> * `numeric(9 < P <= 18, S)` is represented as `INT64` with `DECIMAL` logical type +> * `numeric(18 < P <= 38, S)` is represented as `FIXED_LEN_BYTE_ARRAY(9-16)` with `DECIMAL` logical type +> * `numeric(38 < P, S)` is represented as `BYTE_ARRAY` with `STRING` logical type +> * `numeric` (precision and scale not specified) is allowed by Postgres. These are represented with a default precision (38) and scale (16) instead of being written as string. You get a runtime error if your table tries to read or write a numeric value which is not allowed by the default precision and scale (22 integral digits before decimal point, 16 digits after decimal point). > - (2) The `date` type is represented according to `Unix epoch` when writing to Parquet files. It is converted back according to `PostgreSQL epoch` when reading from Parquet files. > - (3) The `timestamptz` and `timetz` types are adjusted to `UTC` when writing to Parquet files. They are converted back with `UTC` timezone when reading from Parquet files. > - (4) The `geometry` type is represented as `BYTE_ARRAY` encoded as `WKB` when `postgis` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type. 
diff --git a/src/arrow_parquet/arrow_to_pg.rs b/src/arrow_parquet/arrow_to_pg.rs index 1908f3f..ec7c9ce 100644 --- a/src/arrow_parquet/arrow_to_pg.rs +++ b/src/arrow_parquet/arrow_to_pg.rs @@ -24,8 +24,7 @@ use crate::{ geometry::{is_postgis_geometry_type, Geometry}, map::{is_map_type, Map}, pg_arrow_type_conversions::{ - extract_precision_from_numeric_typmod, extract_scale_from_numeric_typmod, - MAX_DECIMAL_PRECISION, + extract_precision_and_scale_from_numeric_typmod, should_write_numeric_as_text, }, }, }; @@ -66,8 +65,8 @@ pub(crate) struct ArrowToPgAttributeContext { is_map: bool, attribute_contexts: Option>, attribute_tupledesc: Option>, - precision: Option, - scale: Option, + precision: Option, + scale: Option, } impl ArrowToPgAttributeContext { @@ -127,8 +126,9 @@ impl ArrowToPgAttributeContext { let precision; let scale; if attribute_typoid == NUMERICOID { - precision = Some(extract_precision_from_numeric_typmod(typmod)); - scale = Some(extract_scale_from_numeric_typmod(typmod)); + let (p, s) = extract_precision_and_scale_from_numeric_typmod(typmod); + precision = Some(p); + scale = Some(s); } else { precision = None; scale = None; @@ -263,7 +263,7 @@ fn to_pg_nonarray_datum( .precision .expect("missing precision in context"); - if precision > MAX_DECIMAL_PRECISION { + if should_write_numeric_as_text(precision) { reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod); to_pg_datum!( @@ -415,7 +415,7 @@ fn to_pg_array_datum( .precision .expect("missing precision in context"); - if precision > MAX_DECIMAL_PRECISION { + if should_write_numeric_as_text(precision) { reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod); to_pg_datum!( diff --git a/src/arrow_parquet/arrow_to_pg/numeric.rs b/src/arrow_parquet/arrow_to_pg/numeric.rs index d5380f4..dc09d60 100644 --- a/src/arrow_parquet/arrow_to_pg/numeric.rs +++ b/src/arrow_parquet/arrow_to_pg/numeric.rs @@ -11,8 +11,15 @@ impl ArrowArrayToPgType for 
Decimal128Array { if self.is_null(0) { None } else { + let precision = context.precision.expect("Expected precision"); let scale = context.scale.expect("Expected scale"); - Some(i128_to_numeric(self.value(0), scale)) + + Some(i128_to_numeric( + self.value(0), + precision, + scale, + context.typmod, + )) } } } @@ -20,10 +27,12 @@ impl ArrowArrayToPgType for Decimal128Array { // Numeric[] impl ArrowArrayToPgType>> for Decimal128Array { fn to_pg_type(self, context: &ArrowToPgAttributeContext) -> Option>> { + let precision = context.precision.expect("Expected precision"); let scale = context.scale.expect("Expected scale"); + let mut vals = vec![]; for val in self.iter() { - let val = val.map(|v| i128_to_numeric(v, scale)); + let val = val.map(|v| i128_to_numeric(v, precision, scale, context.typmod)); vals.push(val); } Some(vals) diff --git a/src/arrow_parquet/pg_to_arrow.rs b/src/arrow_parquet/pg_to_arrow.rs index dc65100..40cc03c 100644 --- a/src/arrow_parquet/pg_to_arrow.rs +++ b/src/arrow_parquet/pg_to_arrow.rs @@ -23,8 +23,7 @@ use crate::{ geometry::{is_postgis_geometry_type, Geometry}, map::{is_map_type, Map}, pg_arrow_type_conversions::{ - extract_precision_from_numeric_typmod, extract_scale_from_numeric_typmod, - MAX_DECIMAL_PRECISION, + extract_precision_and_scale_from_numeric_typmod, should_write_numeric_as_text, }, }, }; @@ -65,8 +64,8 @@ pub(crate) struct PgToArrowAttributeContext { is_geometry: bool, is_map: bool, attribute_contexts: Option>, - scale: Option, - precision: Option, + scale: Option, + precision: Option, } impl PgToArrowAttributeContext { @@ -126,8 +125,9 @@ impl PgToArrowAttributeContext { let precision; let scale; if attribute_typoid == NUMERICOID { - precision = Some(extract_precision_from_numeric_typmod(typmod)); - scale = Some(extract_scale_from_numeric_typmod(typmod)); + let (p, s) = extract_precision_and_scale_from_numeric_typmod(typmod); + precision = Some(p); + scale = Some(s); } else { precision = None; scale = None; @@ -274,7 +274,7 
@@ fn to_arrow_primitive_array( .precision .expect("missing precision in context"); - if precision > MAX_DECIMAL_PRECISION { + if should_write_numeric_as_text(precision) { reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod); to_arrow_primitive_array!(FallbackToText, tuples, attribute_context) @@ -359,7 +359,7 @@ fn to_arrow_list_array( .precision .expect("missing precision in context"); - if precision > MAX_DECIMAL_PRECISION { + if should_write_numeric_as_text(precision) { reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod); to_arrow_list_array!(pgrx::Array, tuples, attribute_context) diff --git a/src/arrow_parquet/pg_to_arrow/numeric.rs b/src/arrow_parquet/pg_to_arrow/numeric.rs index 6317cb9..fcfdc43 100644 --- a/src/arrow_parquet/pg_to_arrow/numeric.rs +++ b/src/arrow_parquet/pg_to_arrow/numeric.rs @@ -22,7 +22,10 @@ impl PgTypeToArrowArray for Vec> { let numerics = self .into_iter() - .map(|numeric| numeric.map(numeric_to_i128)) + .map(|numeric| { + numeric + .map(|numeric| numeric_to_i128(numeric, context.typmod, context.field.name())) + }) .collect::>(); let numeric_array = Decimal128Array::from(numerics) @@ -43,7 +46,10 @@ impl PgTypeToArrowArray for Vec>>> { .into_iter() .flatten() .flatten() - .map(|numeric| numeric.map(numeric_to_i128)) + .map(|numeric| { + numeric + .map(|numeric| numeric_to_i128(numeric, context.typmod, context.field.name())) + }) .collect::>(); let precision = context diff --git a/src/arrow_parquet/schema_parser.rs b/src/arrow_parquet/schema_parser.rs index 61719c2..8dd79cf 100644 --- a/src/arrow_parquet/schema_parser.rs +++ b/src/arrow_parquet/schema_parser.rs @@ -18,8 +18,7 @@ use crate::{ geometry::is_postgis_geometry_type, map::is_map_type, pg_arrow_type_conversions::{ - extract_precision_from_numeric_typmod, extract_scale_from_numeric_typmod, - MAX_DECIMAL_PRECISION, + extract_precision_and_scale_from_numeric_typmod, should_write_numeric_as_text, }, }, }; @@ -213,10 
+212,9 @@ fn parse_primitive_schema( INT4OID => Field::new(elem_name, arrow::datatypes::DataType::Int32, true), INT8OID => Field::new(elem_name, arrow::datatypes::DataType::Int64, true), NUMERICOID => { - let precision = extract_precision_from_numeric_typmod(typmod); - let scale = extract_scale_from_numeric_typmod(typmod); + let (precision, scale) = extract_precision_and_scale_from_numeric_typmod(typmod); - if precision > MAX_DECIMAL_PRECISION { + if should_write_numeric_as_text(precision) { Field::new(elem_name, arrow::datatypes::DataType::Utf8, true) } else { Field::new( diff --git a/src/lib.rs b/src/lib.rs index daaf057..183682f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,6 +45,9 @@ mod tests { use crate::type_compat::fallback_to_text::FallbackToText; use crate::type_compat::geometry::Geometry; use crate::type_compat::map::Map; + use crate::type_compat::pg_arrow_type_conversions::{ + DEFAULT_UNBOUNDED_NUMERIC_PRECISION, DEFAULT_UNBOUNDED_NUMERIC_SCALE, + }; use pgrx::pg_sys::Oid; use pgrx::{ composite_type, @@ -275,6 +278,54 @@ mod tests { } } + fn test_assert_float(expected_result: Vec>, result: Vec>) { + for (expected, actual) in expected_result.into_iter().zip(result.into_iter()) { + if expected.is_none() { + assert!(actual.is_none()); + } + + if expected.is_some() { + assert!(actual.is_some()); + + let expected = expected.unwrap(); + let actual = actual.unwrap(); + + if expected.is_nan() { + assert!(actual.is_nan()); + } else if expected.is_infinite() { + assert!(actual.is_infinite()); + assert!(expected.is_sign_positive() == actual.is_sign_positive()); + } else { + assert_eq!(expected, actual); + } + } + } + } + + fn test_assert_double(expected_result: Vec>, result: Vec>) { + for (expected, actual) in expected_result.into_iter().zip(result.into_iter()) { + if expected.is_none() { + assert!(actual.is_none()); + } + + if expected.is_some() { + assert!(actual.is_some()); + + let expected = expected.unwrap(); + let actual = actual.unwrap(); + + if 
expected.is_nan() { + assert!(actual.is_nan()); + } else if expected.is_infinite() { + assert!(actual.is_infinite()); + assert!(expected.is_sign_positive() == actual.is_sign_positive()); + } else { + assert_eq!(expected, actual); + } + } + } + } + fn test_helper(test_table: TestTable) { let test_result = test_common(test_table); test_assert(test_result.expected, test_result.result); @@ -327,35 +378,77 @@ mod tests { } #[pg_test] - fn test_flaot4() { + fn test_float4() { let test_table = TestTable::::new("float4".into()); - test_table.insert("INSERT INTO test_expected (a) VALUES (1.0), (2.23213123), (null);"); - test_helper(test_table); + test_table.insert("INSERT INTO test_expected (a) VALUES (1.0), (2.23213123), (null), ('nan'), ('infinity'), ('-infinity');"); + + let TestResult { expected, result } = test_common(test_table); + + let expected = expected.into_iter().map(|(val,)| val).collect::>(); + let result = result.into_iter().map(|(val,)| val).collect::>(); + test_assert_float(expected, result); } #[pg_test] fn test_float4_array() { let test_table = TestTable::>>::new("float4[]".into()); test_table.insert( - "INSERT INTO test_expected (a) VALUES (array[1.123,2.2,null]), (null), (array[1]), (array[]::float4[]);", + "INSERT INTO test_expected (a) VALUES (array[1.123,2.2,null,'nan','infinity','-infinity']), (null), (array[1]), (array[]::float4[]);", ); - test_helper(test_table); + + let TestResult { expected, result } = test_common(test_table); + + for ((expected,), (result,)) in expected.into_iter().zip(result.into_iter()) { + if expected.is_none() { + assert!(result.is_none()); + } + + if expected.is_some() { + assert!(result.is_some()); + + let expected = expected.unwrap(); + let result = result.unwrap(); + + test_assert_float(expected, result); + } + } } #[pg_test] - fn test_flaot8() { + fn test_float8() { let test_table = TestTable::::new("float8".into()); - test_table.insert("INSERT INTO test_expected (a) VALUES (1.0), (2.23213123), (null);"); - 
test_helper(test_table); + test_table.insert("INSERT INTO test_expected (a) VALUES (1.0), (2.23213123), (null), ('nan'), ('infinity'), ('-infinity');"); + + let TestResult { expected, result } = test_common(test_table); + + let expected = expected.into_iter().map(|(val,)| val).collect::>(); + let result = result.into_iter().map(|(val,)| val).collect::>(); + test_assert_double(expected, result); } #[pg_test] fn test_float8_array() { let test_table = TestTable::>>::new("float8[]".into()); test_table.insert( - "INSERT INTO test_expected (a) VALUES (array[1.123,2.2,null]), (null), (array[1]), (array[]::float8[]);", + "INSERT INTO test_expected (a) VALUES (array[1.123,2.2,null,'nan','infinity','-infinity']), (null), (array[1]), (array[]::float8[]);", ); - test_helper(test_table); + + let TestResult { expected, result } = test_common(test_table); + + for ((expected,), (result,)) in expected.into_iter().zip(result.into_iter()) { + if expected.is_none() { + assert!(result.is_none()); + } + + if expected.is_some() { + assert!(result.is_some()); + + let expected = expected.unwrap(); + let result = result.unwrap(); + + test_assert_double(expected, result); + } + } } #[pg_test] @@ -866,15 +959,79 @@ mod tests { } #[pg_test] - fn test_numeric() { - let test_table = TestTable::::new("numeric(10,4)".into()); + fn test_small_numeric() { + let attribute_schema_getter = || -> Vec<(Option, Option, String, String)> { + Spi::connect(|client| { + let parquet_schema_command = "select precision, scale, logical_type, type_name from parquet.schema('/tmp/test.parquet') WHERE name = 'a';"; + + let tup_table = client.select(parquet_schema_command, None, None).unwrap(); + let mut results = Vec::new(); + + for row in tup_table { + let precision = row["precision"].value::().unwrap(); + let scale = row["scale"].value::().unwrap(); + let logical_type = row["logical_type"].value::().unwrap().unwrap(); + let physical_type = row["type_name"].value::().unwrap().unwrap(); + + results.push((precision, 
scale, logical_type, physical_type)); + } + + results + }) + }; + + // (P <= 9) => INT32 + let test_table = TestTable::::new("numeric(9,4)".into()); test_table .insert("INSERT INTO test_expected (a) VALUES (0.0), (.0), (1.), (+1.020), (-2.12313), (.3), (4), (null);"); test_helper(test_table); + + let attribute_schema = attribute_schema_getter(); + assert_eq!(attribute_schema.len(), 1); + assert_eq!( + attribute_schema[0], + (Some(9), Some(4), "DECIMAL".to_string(), "INT32".to_string()) + ); + + // (9 < P <= 18) => INT64 + let test_table = TestTable::::new("numeric(18,4)".into()); + test_table + .insert("INSERT INTO test_expected (a) VALUES (0.0), (.0), (1.), (+1.020), (-2.12313), (.3), (4), (null);"); + test_helper(test_table); + + let attribute_schema = attribute_schema_getter(); + assert_eq!(attribute_schema.len(), 1); + assert_eq!( + attribute_schema[0], + ( + Some(18), + Some(4), + "DECIMAL".to_string(), + "INT64".to_string() + ) + ); + + // (18 < P <= 38) => FIXED_LEN_BYTE_ARRAY(9-16) + let test_table = TestTable::::new("numeric(38,4)".into()); + test_table + .insert("INSERT INTO test_expected (a) VALUES (0.0), (.0), (1.), (+1.020), (-2.12313), (.3), (4), (null);"); + test_helper(test_table); + + let attribute_schema = attribute_schema_getter(); + assert_eq!(attribute_schema.len(), 1); + assert_eq!( + attribute_schema[0], + ( + Some(38), + Some(4), + "DECIMAL".to_string(), + "FIXED_LEN_BYTE_ARRAY".to_string() + ) + ); } #[pg_test] - fn test_numeric_array() { + fn test_small_numeric_array() { let test_table = TestTable::>>::new("numeric(10,4)[]".into()); test_table.insert( "INSERT INTO test_expected (a) VALUES (array[0.0,.0,1.,+1.020,-2.12313,.3,4,null]), (null), (array[]::numeric(10,4)[]);", @@ -883,8 +1040,11 @@ mod tests { } #[pg_test] - fn test_huge_numeric() { - let test_table = TestTable::::new("numeric(100,4)".into()); + fn test_large_numeric() { + let large_precision = DEFAULT_UNBOUNDED_NUMERIC_PRECISION + 1; + + let test_table = + 
TestTable::::new(format!("numeric({},4)", large_precision)); test_table.insert( "INSERT INTO test_expected (a) VALUES (0.0), (.0), (1.), (+1.020), (2.12313), (3), (null);", ); @@ -892,14 +1052,150 @@ mod tests { } #[pg_test] - fn test_huge_numeric_array() { - let test_table = TestTable::>>::new("numeric(100,4)[]".into()); + fn test_large_numeric_array() { + let large_precision = DEFAULT_UNBOUNDED_NUMERIC_PRECISION + 1; + + let test_table = TestTable::>>::new(format!( + "numeric({},4)[]", + large_precision + )); test_table.insert( "INSERT INTO test_expected (a) VALUES (array[0.0,.0,1.,1.020,2.12313,3,null]), (null), (array[]::numeric(100,4)[]);", ); test_helper(test_table); } + #[pg_test] + fn test_unbounded_numeric() { + let test_table = TestTable::::new("numeric".into()); + test_table.insert( + "INSERT INTO test_expected (a) VALUES (0.0), (.0), (1.), (+1.02), (2.12), (3), (null);", + ); + test_helper(test_table); + + let parquet_schema_command = + "select precision, scale, logical_type, type_name from parquet.schema('/tmp/test.parquet') WHERE name = 'a';"; + + let attribute_schema = Spi::connect(|client| { + let tup_table = client.select(parquet_schema_command, None, None).unwrap(); + let mut results = Vec::new(); + + for row in tup_table { + let precision = row["precision"].value::().unwrap(); + let scale = row["scale"].value::().unwrap(); + let logical_type = row["logical_type"].value::().unwrap().unwrap(); + let physical_type = row["type_name"].value::().unwrap().unwrap(); + + results.push((precision, scale, logical_type, physical_type)); + } + + results + }); + + assert_eq!(attribute_schema.len(), 1); + assert_eq!( + attribute_schema[0], + ( + Some(DEFAULT_UNBOUNDED_NUMERIC_PRECISION as _), + Some(DEFAULT_UNBOUNDED_NUMERIC_SCALE as _), + "DECIMAL".to_string(), + "FIXED_LEN_BYTE_ARRAY".to_string() + ) + ); + } + + #[pg_test] + fn test_unbounded_numeric_array() { + let test_table = TestTable::>>::new("numeric[]".into()); + test_table.insert( + "INSERT INTO 
test_expected (a) VALUES (array[0.0,.0,1.,1.02,2.12,3,null]), (null), (array[]::numeric[]);", + ); + test_helper(test_table); + + let parquet_schema_command = + "select precision, scale, logical_type, type_name from parquet.schema('/tmp/test.parquet') WHERE name = 'a' ORDER BY logical_type;"; + + let attribute_schema = Spi::connect(|client| { + let tup_table = client.select(parquet_schema_command, None, None).unwrap(); + let mut results = Vec::new(); + + for row in tup_table { + let precision = row["precision"].value::().unwrap(); + let scale = row["scale"].value::().unwrap(); + let logical_type = row["logical_type"].value::().unwrap().unwrap(); + let physical_type = row["type_name"].value::().unwrap(); + + results.push((precision, scale, logical_type, physical_type)); + } + + results + }); + + assert_eq!(attribute_schema.len(), 2); + assert_eq!( + attribute_schema[0], + ( + Some(DEFAULT_UNBOUNDED_NUMERIC_PRECISION as _), + Some(DEFAULT_UNBOUNDED_NUMERIC_SCALE as _), + "DECIMAL".to_string(), + Some("FIXED_LEN_BYTE_ARRAY".to_string()) + ) + ); + assert_eq!(attribute_schema[1], (None, None, "LIST".to_string(), None)); + } + + #[pg_test] + #[should_panic( + expected = "numeric value contains 23 digits before decimal point, which exceeds max allowed integral digits 22 during copy to parquet" + )] + fn test_invalid_unbounded_numeric_integral_digits() { + let invalid_integral_digits = + DEFAULT_UNBOUNDED_NUMERIC_PRECISION - DEFAULT_UNBOUNDED_NUMERIC_SCALE + 1; + + let copy_to_command = format!( + "copy (select (repeat('1', {}) || '.2')::numeric as a) to '/tmp/test.parquet'", + invalid_integral_digits + ); + + Spi::run(©_to_command).unwrap(); + } + + #[pg_test] + #[should_panic( + expected = "numeric value contains 17 digits after decimal point, which exceeds max allowed decimal digits 16 during copy to parquet" + )] + fn test_invalid_unbounded_numeric_decimal_digits() { + let invalid_decimal_digits = DEFAULT_UNBOUNDED_NUMERIC_SCALE + 1; + + let copy_to_command = format!( 
+ "copy (select ('2.' || repeat('1', {}) )::numeric as a) to '/tmp/test.parquet'", + invalid_decimal_digits + ); + + Spi::run(©_to_command).unwrap(); + } + + #[cfg(feature = "pg14")] + #[pg_test] + #[should_panic = "NUMERIC scale -2 must be between 0 and precision 5"] + fn test_numeric_negative_scale() { + let test_table = TestTable::::new("numeric(5,-2)".into()); + test_table.insert( + "INSERT INTO test_expected (a) VALUES (1234567.1231244), (123.23223), (12.0), (-12.12303), (null), (0);", + ); + test_helper(test_table); + } + + #[cfg(not(feature = "pg14"))] + #[pg_test] + fn test_numeric_negative_scale() { + let test_table = TestTable::::new("numeric(5,-2)".into()); + test_table.insert( + "INSERT INTO test_expected (a) VALUES (1234567.1231244), (123.23223), (12.0), (-12.12303), (null), (0);", + ); + test_helper(test_table); + } + #[pg_test] fn test_geometry() { let query = "DROP EXTENSION IF EXISTS postgis; CREATE EXTENSION postgis;"; diff --git a/src/parquet_copy_hook/copy_to_dest_receiver.rs b/src/parquet_copy_hook/copy_to_dest_receiver.rs index c0f6308..b7e23cd 100644 --- a/src/parquet_copy_hook/copy_to_dest_receiver.rs +++ b/src/parquet_copy_hook/copy_to_dest_receiver.rs @@ -21,6 +21,14 @@ use crate::{ pgrx_utils::collect_valid_attributes, }; +#[repr(C)] +struct CopyToParquetOptions { + pub row_group_size: i64, + pub row_group_size_bytes: i64, + pub compression: PgParquetCompression, + pub compression_level: i32, +} + #[repr(C)] struct CopyToParquetDestReceiver { dest: DestReceiver, @@ -31,10 +39,7 @@ struct CopyToParquetDestReceiver { collected_tuple_size: i64, collected_tuple_column_sizes: *mut i64, uri: *const c_char, - compression: PgParquetCompression, - compression_level: i32, - row_group_size: i64, - row_group_size_bytes: i64, + copy_options: CopyToParquetOptions, per_copy_context: MemoryContext, } @@ -75,11 +80,11 @@ impl CopyToParquetDestReceiver { } fn collected_tuples_exceeds_row_group_size(&self) -> bool { - self.collected_tuple_count >= 
self.row_group_size + self.collected_tuple_count >= self.copy_options.row_group_size } fn collected_tuples_exceeds_row_group_size_bytes(&self) -> bool { - self.collected_tuple_size >= self.row_group_size_bytes + self.collected_tuple_size >= self.copy_options.row_group_size_bytes } fn collected_tuples_exceeds_max_col_size(&self, tuple_column_sizes: &[i32]) -> bool { @@ -196,9 +201,9 @@ extern "C" fn copy_startup(dest: *mut DestReceiver, _operation: i32, tupledesc: let uri = parse_uri(uri); - let compression = parquet_dest.compression; + let compression = parquet_dest.copy_options.compression; - let compression_level = parquet_dest.compression_level; + let compression_level = parquet_dest.copy_options.compression_level; // parquet writer context is used throughout the COPY TO operation. // This might be put into ParquetCopyDestReceiver, but it's hard to preserve repr(C). @@ -348,10 +353,10 @@ pub extern "C" fn create_copy_to_parquet_dest_receiver( parquet_dest.collected_tuple_count = 0; parquet_dest.collected_tuples = std::ptr::null_mut(); parquet_dest.collected_tuple_column_sizes = std::ptr::null_mut(); - parquet_dest.row_group_size = row_group_size; - parquet_dest.row_group_size_bytes = row_group_size_bytes; - parquet_dest.compression = compression; - parquet_dest.compression_level = compression_level; + parquet_dest.copy_options.row_group_size = row_group_size; + parquet_dest.copy_options.row_group_size_bytes = row_group_size_bytes; + parquet_dest.copy_options.compression = compression; + parquet_dest.copy_options.compression_level = compression_level; parquet_dest.per_copy_context = per_copy_context; unsafe { std::mem::transmute(parquet_dest) } diff --git a/src/type_compat/pg_arrow_type_conversions.rs b/src/type_compat/pg_arrow_type_conversions.rs index 9d9375f..43de2ba 100644 --- a/src/type_compat/pg_arrow_type_conversions.rs +++ b/src/type_compat/pg_arrow_type_conversions.rs @@ -1,12 +1,14 @@ -use std::ffi::{CStr, CString}; +use core::panic; +use 
std::ffi::CStr; +use arrow::datatypes::{Decimal128Type, DecimalType}; use pgrx::{ datum::{Date, Interval, Time, TimeWithTimeZone, Timestamp, TimestampWithTimeZone}, - direct_function_call, pg_sys, AnyNumeric, IntoDatum, + direct_function_call, ereport, + pg_sys::{self, AsPgCStr}, + AnyNumeric, IntoDatum, Numeric, }; -pub(crate) const MAX_DECIMAL_PRECISION: usize = 38; - pub(crate) fn date_to_i32(date: Date) -> i32 { // PG epoch is (2000-01-01). Convert it to Unix epoch (1970-01-01). +10957 days let adjusted_date: Date = unsafe { @@ -167,86 +169,40 @@ pub(crate) fn i64_to_timetz(i64_timetz: i64) -> TimeWithTimeZone { .unwrap_or_else(|e| panic!("{}", e)) } -pub(crate) fn numeric_to_i128(numeric: AnyNumeric) -> i128 { - // obtain numeric's string representation - // cannot use numeric_send because byte representation is not compatible with parquet's decimal - let numeric_str: &CStr = unsafe { - direct_function_call(pg_sys::numeric_out, &[numeric.into_datum()]) - .expect("cannot convert numeric to bytes") - }; - let numeric_str = numeric_str - .to_str() - .expect("numeric string is an invalid CString"); - - let sign = if numeric_str.starts_with('-') { -1 } else { 1 }; +pub(crate) fn numeric_to_i128(numeric: AnyNumeric, typmod: i32, col_name: &str) -> i128 { + let numeric_str = if is_unbounded_numeric_typmod(typmod) { + let rescaled_unbounded_numeric = rescale_unbounded_numeric_or_error(numeric, col_name); - // remove sign as we already stored it. 
we also remove the decimal point - // since arrow decimal expects a i128 representation of the decimal - let numeric_str = numeric_str.replace(['-', '+', '.'], ""); - - let numeric_digits = numeric_str - .chars() - .map(|c| c.to_digit(10).expect("not a valid digit") as i8); + format!("{}", rescaled_unbounded_numeric) + } else { + // format returns a string representation of the numeric value based on numeric_out + format!("{}", numeric) + }; - // convert digits into arrow decimal - let mut decimal: i128 = 0; - for digit in numeric_digits.into_iter() { - decimal = decimal * 10 + digit as i128; - } - decimal *= sign; + let normalized_numeric_str = numeric_str.replace('.', ""); - decimal + normalized_numeric_str + .parse::() + .expect("invalid decimal") } -pub(crate) fn i128_to_numeric(i128_decimal: i128, scale: usize) -> AnyNumeric { - let sign = if i128_decimal < 0 { "-" } else { "" }; - let i128_decimal = i128_decimal.abs(); - - // calculate decimal digits - let mut decimal_digits = vec![]; - let mut decimal = i128_decimal; - while decimal > 0 { - let digit = (decimal % 10) as i8; - decimal_digits.push(digit); - decimal /= 10; - } - - // get fraction as string - let fraction = decimal_digits - .iter() - .take(scale) - .map(|v| v.to_string()) - .rev() - .reduce(|acc, v| acc + &v) - .unwrap_or_default(); - - // get integral as string - let integral = decimal_digits - .iter() - .skip(scale) - .map(|v| v.to_string()) - .rev() - .reduce(|acc, v| acc + &v) - .unwrap_or_default(); - - // create numeric string representation - let numeric_str = if integral.is_empty() && fraction.is_empty() { - "0".into() - } else { - format!("{}{}.{}", sign, integral, fraction) - }; - - // numeric_in would not validate the numeric string when typmod is -1 - let typmod = -1; +pub(crate) fn i128_to_numeric( + decimal: i128, + precision: u32, + scale: u32, + typmod: i32, +) -> AnyNumeric { + // format decimal via arrow since it is consistent with PG's numeric formatting + let numeric_str = 
Decimal128Type::format_decimal(decimal, precision as _, scale as _); // compute numeric from string representation let numeric: AnyNumeric = unsafe { - let numeric_str = CString::new(numeric_str).expect("numeric cstring is invalid"); + let numeric_cstring = CStr::from_ptr(numeric_str.as_pg_cstr()); direct_function_call( pg_sys::numeric_in, &[ - numeric_str.into_datum(), + numeric_cstring.into_datum(), 0.into_datum(), typmod.into_datum(), ], @@ -257,14 +213,127 @@ pub(crate) fn i128_to_numeric(i128_decimal: i128, scale: usize) -> AnyNumeric { numeric } -// taken from PG's numeric.c +// unbounded_numeric_value_digits returns the number of integral and decimal digits in an unbounded numeric value. +fn unbounded_numeric_value_digits(numeric_str: &str) -> (usize, usize) { + let numeric_str = numeric_str.replace(['-', '+'], ""); + + let has_decimal_point = numeric_str.contains('.'); + + if has_decimal_point { + let parts = numeric_str.split('.').collect::>(); + (parts[0].len(), parts[1].len()) + } else { + (numeric_str.as_str().len(), 0) + } +} + +fn rescale_unbounded_numeric_or_error( + unbounded_numeric: AnyNumeric, + col_name: &str, +) -> Numeric { + let unbounded_numeric_str = format!("{}", unbounded_numeric); + + let (n_integral_digits, n_scale_digits) = + unbounded_numeric_value_digits(&unbounded_numeric_str); + + // we need to do error checks before rescaling since rescaling to a lower scale + // silently truncates the value + if n_integral_digits > DEFAULT_UNBOUNDED_NUMERIC_MAX_INTEGRAL_DIGITS as _ { + ereport!( + pgrx::PgLogLevel::ERROR, + pgrx::PgSqlErrorCode::ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE, + format!( + "numeric value contains {} digits before decimal point, which exceeds max allowed integral digits {} during copy to parquet", + n_integral_digits, DEFAULT_UNBOUNDED_NUMERIC_MAX_INTEGRAL_DIGITS + ), + format!( + "Consider specifying precision and scale for column \"{}\". 
Replace type \"numeric\" to \"numeric(P,S)\".", + col_name + ), + ); + } else if n_scale_digits > DEFAULT_UNBOUNDED_NUMERIC_SCALE as _ { + ereport!( + pgrx::PgLogLevel::ERROR, + pgrx::PgSqlErrorCode::ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE, + format!( + "numeric value contains {} digits after decimal point, which exceeds max allowed decimal digits {} during copy to parquet", + n_scale_digits, DEFAULT_UNBOUNDED_NUMERIC_SCALE + ), + format!( + "Consider specifying precision and scale for column \"{}\". Replace type \"numeric\" to \"numeric(P,S)\".", + col_name + ), + ); + } + + unbounded_numeric + .rescale::() + .unwrap_or_else(|e| panic!("{}", e)) +} + +const MAX_NUMERIC_PRECISION: u32 = 38; +pub(crate) const DEFAULT_UNBOUNDED_NUMERIC_PRECISION: u32 = MAX_NUMERIC_PRECISION; +pub(crate) const DEFAULT_UNBOUNDED_NUMERIC_SCALE: u32 = 16; +pub(crate) const DEFAULT_UNBOUNDED_NUMERIC_MAX_INTEGRAL_DIGITS: u32 = + DEFAULT_UNBOUNDED_NUMERIC_PRECISION - DEFAULT_UNBOUNDED_NUMERIC_SCALE; + +// should_write_numeric_as_text determines whether a numeric datum should be written as text. +// It is written as text when precision is greater than MAX_NUMERIC_PRECISION e.g. "numeric(50, 10)" +pub(crate) fn should_write_numeric_as_text(precision: u32) -> bool { + precision > MAX_NUMERIC_PRECISION +} + +// extract_precision_and_scale_from_numeric_typmod extracts precision and scale from numeric typmod +// with the following rules: +// - If typmod is -1, it means unbounded numeric, so we use default precision and scale. +// - Even if PG allows negative scale, arrow does not. We adjust precision by adding abs(scale) to it, +// and set scale to 0. +// +// It always returns non-negative precision and scale due to the above rule. 
+pub(crate) fn extract_precision_and_scale_from_numeric_typmod(typmod: i32) -> (u32, u32) { + // if typmod is -1, it means unbounded numeric, so we use default precision and scale + if is_unbounded_numeric_typmod(typmod) { + return ( + DEFAULT_UNBOUNDED_NUMERIC_PRECISION, + DEFAULT_UNBOUNDED_NUMERIC_SCALE, + ); + } + + let mut precision = extract_precision_from_numeric_typmod(typmod); + let mut scale = extract_scale_from_numeric_typmod(typmod); + + // Even if PG allows negative scale, arrow does not. We adjust precision by adding scale to it. + if scale < 0 { + adjust_precision_and_scale_if_negative_scale(&mut precision, &mut scale); + } + + debug_assert!(precision >= 0); + debug_assert!(scale >= 0); + + (precision as _, scale as _) +} + #[inline] -pub(crate) fn extract_precision_from_numeric_typmod(typmod: i32) -> usize { - (((typmod - pg_sys::VARHDRSZ as i32) >> 16) & 0xffff) as usize +fn extract_precision_from_numeric_typmod(typmod: i32) -> i32 { + // taken from PG's numeric.c + (((typmod - pg_sys::VARHDRSZ as i32) >> 16) & 0xffff) as _ } -// taken from PG's numeric.c #[inline] -pub(crate) fn extract_scale_from_numeric_typmod(typmod: i32) -> usize { - ((((typmod - pg_sys::VARHDRSZ as i32) & 0x7ff) ^ 1024) - 1024) as usize +fn extract_scale_from_numeric_typmod(typmod: i32) -> i32 { + // taken from PG's numeric.c + (((typmod - pg_sys::VARHDRSZ as i32) & 0x7ff) ^ 1024) - 1024 +} + +// adjust_precision_and_scale_if_negative_scale adjusts precision and scale if scale is negative. +// Even if PG allows negative scale, arrow does not. We adjust precision by adding scale to it. +fn adjust_precision_and_scale_if_negative_scale(precision: &mut i32, scale: &mut i32) { + if *scale < 0 { + *precision += scale.abs(); + *scale = 0; + } +} + +fn is_unbounded_numeric_typmod(typmod: i32) -> bool { + typmod == -1 }