From c76b4b3f1b6ea2eed902f07519909a9009672d6b Mon Sep 17 00:00:00 2001 From: Aykut Bozkurt Date: Thu, 31 Oct 2024 02:04:11 +0300 Subject: [PATCH] Numeric improvement and fix **Problem** Previously, we were writing unbounded numerics, which do not specify precision and scale (i.e. `numeric`), as text since they can be too large to represent as a parquet decimal. Most of the time users ignore the precision for numeric columns, so they were written as text. That prevented execution engines from pushing down some operators on the numeric type. **Improvement** We now read and write unbounded numerics as numeric(38, 16) in parquet files. We throw a runtime error if an unbounded numeric value exceeds 22 digits before the decimal point or 16 digits after the decimal point. For the ones that bump into the error, we give a hint to change the column type to a numeric(p,s) with precision and scale specified, to get rid of the error. **Fix** Arrow to pg conversions were not correct for some cases, e.g. when there is no decimal point. These cases are fixed and covered by tests. --- README.md | 7 +- src/arrow_parquet/arrow_to_pg.rs | 16 +- src/arrow_parquet/arrow_to_pg/numeric.rs | 13 +- src/arrow_parquet/pg_to_arrow.rs | 16 +- src/arrow_parquet/pg_to_arrow/numeric.rs | 10 +- src/arrow_parquet/schema_parser.rs | 8 +- src/lib.rs | 330 +++++++++++++++++- .../copy_to_dest_receiver.rs | 29 +- src/type_compat/pg_arrow_type_conversions.rs | 208 +++++++---- 9 files changed, 506 insertions(+), 131 deletions(-) diff --git a/README.md b/README.md index 7c679a1..66e1b7a 100644 --- a/README.md +++ b/README.md @@ -225,7 +225,12 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`: | `crunchy_map`(5) | GROUP | MAP | > [!WARNING] -> - (1) The `numeric` types with <= `38` precision is represented as `FIXED_LEN_BYTE_ARRAY(16)` with `DECIMAL(128)` logical type. The `numeric` types with > `38` precision is represented as `BYTE_ARRAY` with `STRING` logical type. 
+> - (1) The `numeric` type is written to the parquet file with the smallest possible memory width, as follows: +> * `numeric(P <= 9, S)` is represented as `INT32` with `DECIMAL` logical type +> * `numeric(9 < P <= 18, S)` is represented as `INT64` with `DECIMAL` logical type +> * `numeric(18 < P <= 38, S)` is represented as `FIXED_LEN_BYTE_ARRAY(9-16)` with `DECIMAL` logical type +> * `numeric(38 < P, S)` is represented as `BYTE_ARRAY` with `STRING` logical type +> * `numeric` (precision and scale not specified) is allowed by Postgres. Such columns are represented with a default precision (38) and scale (16) instead of being written as strings. You get a runtime error if your table tries to read or write a numeric value that does not fit the default precision and scale (22 integral digits before the decimal point, 16 digits after the decimal point). > - (2) The `date` type is represented according to `Unix epoch` when writing to Parquet files. It is converted back according to `PostgreSQL epoch` when reading from Parquet files. > - (3) The `timestamptz` and `timetz` types are adjusted to `UTC` when writing to Parquet files. They are converted back with `UTC` timezone when reading from Parquet files. > - (4) The `geometry` type is represented as `BYTE_ARRAY` encoded as `WKB` when `postgis` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type. 
diff --git a/src/arrow_parquet/arrow_to_pg.rs b/src/arrow_parquet/arrow_to_pg.rs index 1908f3f..83b0369 100644 --- a/src/arrow_parquet/arrow_to_pg.rs +++ b/src/arrow_parquet/arrow_to_pg.rs @@ -24,8 +24,7 @@ use crate::{ geometry::{is_postgis_geometry_type, Geometry}, map::{is_map_type, Map}, pg_arrow_type_conversions::{ - extract_precision_from_numeric_typmod, extract_scale_from_numeric_typmod, - MAX_DECIMAL_PRECISION, + extract_precision_and_scale_from_numeric_typmod, should_write_numeric_as_text, }, }, }; @@ -66,8 +65,8 @@ pub(crate) struct ArrowToPgAttributeContext { is_map: bool, attribute_contexts: Option>, attribute_tupledesc: Option>, - precision: Option, - scale: Option, + precision: Option, + scale: Option, } impl ArrowToPgAttributeContext { @@ -127,8 +126,9 @@ impl ArrowToPgAttributeContext { let precision; let scale; if attribute_typoid == NUMERICOID { - precision = Some(extract_precision_from_numeric_typmod(typmod)); - scale = Some(extract_scale_from_numeric_typmod(typmod)); + let (p, s) = extract_precision_and_scale_from_numeric_typmod(typmod); + precision = Some(p); + scale = Some(s); } else { precision = None; scale = None; @@ -263,7 +263,7 @@ fn to_pg_nonarray_datum( .precision .expect("missing precision in context"); - if precision > MAX_DECIMAL_PRECISION { + if should_write_numeric_as_text(precision) { reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod); to_pg_datum!( @@ -415,7 +415,7 @@ fn to_pg_array_datum( .precision .expect("missing precision in context"); - if precision > MAX_DECIMAL_PRECISION { + if should_write_numeric_as_text(precision) { reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod); to_pg_datum!( diff --git a/src/arrow_parquet/arrow_to_pg/numeric.rs b/src/arrow_parquet/arrow_to_pg/numeric.rs index d5380f4..dc09d60 100644 --- a/src/arrow_parquet/arrow_to_pg/numeric.rs +++ b/src/arrow_parquet/arrow_to_pg/numeric.rs @@ -11,8 +11,15 @@ impl ArrowArrayToPgType for 
Decimal128Array { if self.is_null(0) { None } else { + let precision = context.precision.expect("Expected precision"); let scale = context.scale.expect("Expected scale"); - Some(i128_to_numeric(self.value(0), scale)) + + Some(i128_to_numeric( + self.value(0), + precision, + scale, + context.typmod, + )) } } } @@ -20,10 +27,12 @@ impl ArrowArrayToPgType for Decimal128Array { // Numeric[] impl ArrowArrayToPgType>> for Decimal128Array { fn to_pg_type(self, context: &ArrowToPgAttributeContext) -> Option>> { + let precision = context.precision.expect("Expected precision"); let scale = context.scale.expect("Expected scale"); + let mut vals = vec![]; for val in self.iter() { - let val = val.map(|v| i128_to_numeric(v, scale)); + let val = val.map(|v| i128_to_numeric(v, precision, scale, context.typmod)); vals.push(val); } Some(vals) diff --git a/src/arrow_parquet/pg_to_arrow.rs b/src/arrow_parquet/pg_to_arrow.rs index dc65100..0b2cc7a 100644 --- a/src/arrow_parquet/pg_to_arrow.rs +++ b/src/arrow_parquet/pg_to_arrow.rs @@ -23,8 +23,7 @@ use crate::{ geometry::{is_postgis_geometry_type, Geometry}, map::{is_map_type, Map}, pg_arrow_type_conversions::{ - extract_precision_from_numeric_typmod, extract_scale_from_numeric_typmod, - MAX_DECIMAL_PRECISION, + extract_precision_and_scale_from_numeric_typmod, should_write_numeric_as_text, }, }, }; @@ -65,8 +64,8 @@ pub(crate) struct PgToArrowAttributeContext { is_geometry: bool, is_map: bool, attribute_contexts: Option>, - scale: Option, - precision: Option, + scale: Option, + precision: Option, } impl PgToArrowAttributeContext { @@ -126,8 +125,9 @@ impl PgToArrowAttributeContext { let precision; let scale; if attribute_typoid == NUMERICOID { - precision = Some(extract_precision_from_numeric_typmod(typmod)); - scale = Some(extract_scale_from_numeric_typmod(typmod)); + let (p, s) = extract_precision_and_scale_from_numeric_typmod(typmod); + precision = Some(p); + scale = Some(s); } else { precision = None; scale = None; @@ -274,7 +274,7 
@@ fn to_arrow_primitive_array( .precision .expect("missing precision in context"); - if precision > MAX_DECIMAL_PRECISION { + if should_write_numeric_as_text(precision) { reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod); to_arrow_primitive_array!(FallbackToText, tuples, attribute_context) @@ -359,7 +359,7 @@ fn to_arrow_list_array( .precision .expect("missing precision in context"); - if precision > MAX_DECIMAL_PRECISION { + if should_write_numeric_as_text(precision) { reset_fallback_to_text_context(attribute_context.typoid, attribute_context.typmod); to_arrow_list_array!(pgrx::Array, tuples, attribute_context) diff --git a/src/arrow_parquet/pg_to_arrow/numeric.rs b/src/arrow_parquet/pg_to_arrow/numeric.rs index 6317cb9..fcfdc43 100644 --- a/src/arrow_parquet/pg_to_arrow/numeric.rs +++ b/src/arrow_parquet/pg_to_arrow/numeric.rs @@ -22,7 +22,10 @@ impl PgTypeToArrowArray for Vec> { let numerics = self .into_iter() - .map(|numeric| numeric.map(numeric_to_i128)) + .map(|numeric| { + numeric + .map(|numeric| numeric_to_i128(numeric, context.typmod, context.field.name())) + }) .collect::>(); let numeric_array = Decimal128Array::from(numerics) @@ -43,7 +46,10 @@ impl PgTypeToArrowArray for Vec>>> { .into_iter() .flatten() .flatten() - .map(|numeric| numeric.map(numeric_to_i128)) + .map(|numeric| { + numeric + .map(|numeric| numeric_to_i128(numeric, context.typmod, context.field.name())) + }) .collect::>(); let precision = context diff --git a/src/arrow_parquet/schema_parser.rs b/src/arrow_parquet/schema_parser.rs index 61719c2..8dd79cf 100644 --- a/src/arrow_parquet/schema_parser.rs +++ b/src/arrow_parquet/schema_parser.rs @@ -18,8 +18,7 @@ use crate::{ geometry::is_postgis_geometry_type, map::is_map_type, pg_arrow_type_conversions::{ - extract_precision_from_numeric_typmod, extract_scale_from_numeric_typmod, - MAX_DECIMAL_PRECISION, + extract_precision_and_scale_from_numeric_typmod, should_write_numeric_as_text, }, }, }; @@ -213,10 
+212,9 @@ fn parse_primitive_schema( INT4OID => Field::new(elem_name, arrow::datatypes::DataType::Int32, true), INT8OID => Field::new(elem_name, arrow::datatypes::DataType::Int64, true), NUMERICOID => { - let precision = extract_precision_from_numeric_typmod(typmod); - let scale = extract_scale_from_numeric_typmod(typmod); + let (precision, scale) = extract_precision_and_scale_from_numeric_typmod(typmod); - if precision > MAX_DECIMAL_PRECISION { + if should_write_numeric_as_text(precision) { Field::new(elem_name, arrow::datatypes::DataType::Utf8, true) } else { Field::new( diff --git a/src/lib.rs b/src/lib.rs index daaf057..183682f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,6 +45,9 @@ mod tests { use crate::type_compat::fallback_to_text::FallbackToText; use crate::type_compat::geometry::Geometry; use crate::type_compat::map::Map; + use crate::type_compat::pg_arrow_type_conversions::{ + DEFAULT_UNBOUNDED_NUMERIC_PRECISION, DEFAULT_UNBOUNDED_NUMERIC_SCALE, + }; use pgrx::pg_sys::Oid; use pgrx::{ composite_type, @@ -275,6 +278,54 @@ mod tests { } } + fn test_assert_float(expected_result: Vec>, result: Vec>) { + for (expected, actual) in expected_result.into_iter().zip(result.into_iter()) { + if expected.is_none() { + assert!(actual.is_none()); + } + + if expected.is_some() { + assert!(actual.is_some()); + + let expected = expected.unwrap(); + let actual = actual.unwrap(); + + if expected.is_nan() { + assert!(actual.is_nan()); + } else if expected.is_infinite() { + assert!(actual.is_infinite()); + assert!(expected.is_sign_positive() == actual.is_sign_positive()); + } else { + assert_eq!(expected, actual); + } + } + } + } + + fn test_assert_double(expected_result: Vec>, result: Vec>) { + for (expected, actual) in expected_result.into_iter().zip(result.into_iter()) { + if expected.is_none() { + assert!(actual.is_none()); + } + + if expected.is_some() { + assert!(actual.is_some()); + + let expected = expected.unwrap(); + let actual = actual.unwrap(); + + if 
expected.is_nan() { + assert!(actual.is_nan()); + } else if expected.is_infinite() { + assert!(actual.is_infinite()); + assert!(expected.is_sign_positive() == actual.is_sign_positive()); + } else { + assert_eq!(expected, actual); + } + } + } + } + fn test_helper(test_table: TestTable) { let test_result = test_common(test_table); test_assert(test_result.expected, test_result.result); @@ -327,35 +378,77 @@ mod tests { } #[pg_test] - fn test_flaot4() { + fn test_float4() { let test_table = TestTable::::new("float4".into()); - test_table.insert("INSERT INTO test_expected (a) VALUES (1.0), (2.23213123), (null);"); - test_helper(test_table); + test_table.insert("INSERT INTO test_expected (a) VALUES (1.0), (2.23213123), (null), ('nan'), ('infinity'), ('-infinity');"); + + let TestResult { expected, result } = test_common(test_table); + + let expected = expected.into_iter().map(|(val,)| val).collect::>(); + let result = result.into_iter().map(|(val,)| val).collect::>(); + test_assert_float(expected, result); } #[pg_test] fn test_float4_array() { let test_table = TestTable::>>::new("float4[]".into()); test_table.insert( - "INSERT INTO test_expected (a) VALUES (array[1.123,2.2,null]), (null), (array[1]), (array[]::float4[]);", + "INSERT INTO test_expected (a) VALUES (array[1.123,2.2,null,'nan','infinity','-infinity']), (null), (array[1]), (array[]::float4[]);", ); - test_helper(test_table); + + let TestResult { expected, result } = test_common(test_table); + + for ((expected,), (result,)) in expected.into_iter().zip(result.into_iter()) { + if expected.is_none() { + assert!(result.is_none()); + } + + if expected.is_some() { + assert!(result.is_some()); + + let expected = expected.unwrap(); + let result = result.unwrap(); + + test_assert_float(expected, result); + } + } } #[pg_test] - fn test_flaot8() { + fn test_float8() { let test_table = TestTable::::new("float8".into()); - test_table.insert("INSERT INTO test_expected (a) VALUES (1.0), (2.23213123), (null);"); - 
test_helper(test_table); + test_table.insert("INSERT INTO test_expected (a) VALUES (1.0), (2.23213123), (null), ('nan'), ('infinity'), ('-infinity');"); + + let TestResult { expected, result } = test_common(test_table); + + let expected = expected.into_iter().map(|(val,)| val).collect::>(); + let result = result.into_iter().map(|(val,)| val).collect::>(); + test_assert_double(expected, result); } #[pg_test] fn test_float8_array() { let test_table = TestTable::>>::new("float8[]".into()); test_table.insert( - "INSERT INTO test_expected (a) VALUES (array[1.123,2.2,null]), (null), (array[1]), (array[]::float8[]);", + "INSERT INTO test_expected (a) VALUES (array[1.123,2.2,null,'nan','infinity','-infinity']), (null), (array[1]), (array[]::float8[]);", ); - test_helper(test_table); + + let TestResult { expected, result } = test_common(test_table); + + for ((expected,), (result,)) in expected.into_iter().zip(result.into_iter()) { + if expected.is_none() { + assert!(result.is_none()); + } + + if expected.is_some() { + assert!(result.is_some()); + + let expected = expected.unwrap(); + let result = result.unwrap(); + + test_assert_double(expected, result); + } + } } #[pg_test] @@ -866,15 +959,79 @@ mod tests { } #[pg_test] - fn test_numeric() { - let test_table = TestTable::::new("numeric(10,4)".into()); + fn test_small_numeric() { + let attribute_schema_getter = || -> Vec<(Option, Option, String, String)> { + Spi::connect(|client| { + let parquet_schema_command = "select precision, scale, logical_type, type_name from parquet.schema('/tmp/test.parquet') WHERE name = 'a';"; + + let tup_table = client.select(parquet_schema_command, None, None).unwrap(); + let mut results = Vec::new(); + + for row in tup_table { + let precision = row["precision"].value::().unwrap(); + let scale = row["scale"].value::().unwrap(); + let logical_type = row["logical_type"].value::().unwrap().unwrap(); + let physical_type = row["type_name"].value::().unwrap().unwrap(); + + results.push((precision, 
scale, logical_type, physical_type)); + } + + results + }) + }; + + // (P <= 9) => INT32 + let test_table = TestTable::::new("numeric(9,4)".into()); test_table .insert("INSERT INTO test_expected (a) VALUES (0.0), (.0), (1.), (+1.020), (-2.12313), (.3), (4), (null);"); test_helper(test_table); + + let attribute_schema = attribute_schema_getter(); + assert_eq!(attribute_schema.len(), 1); + assert_eq!( + attribute_schema[0], + (Some(9), Some(4), "DECIMAL".to_string(), "INT32".to_string()) + ); + + // (9 < P <= 18) => INT64 + let test_table = TestTable::::new("numeric(18,4)".into()); + test_table + .insert("INSERT INTO test_expected (a) VALUES (0.0), (.0), (1.), (+1.020), (-2.12313), (.3), (4), (null);"); + test_helper(test_table); + + let attribute_schema = attribute_schema_getter(); + assert_eq!(attribute_schema.len(), 1); + assert_eq!( + attribute_schema[0], + ( + Some(18), + Some(4), + "DECIMAL".to_string(), + "INT64".to_string() + ) + ); + + // (18 < P <= 38) => FIXED_LEN_BYTE_ARRAY(9-16) + let test_table = TestTable::::new("numeric(38,4)".into()); + test_table + .insert("INSERT INTO test_expected (a) VALUES (0.0), (.0), (1.), (+1.020), (-2.12313), (.3), (4), (null);"); + test_helper(test_table); + + let attribute_schema = attribute_schema_getter(); + assert_eq!(attribute_schema.len(), 1); + assert_eq!( + attribute_schema[0], + ( + Some(38), + Some(4), + "DECIMAL".to_string(), + "FIXED_LEN_BYTE_ARRAY".to_string() + ) + ); } #[pg_test] - fn test_numeric_array() { + fn test_small_numeric_array() { let test_table = TestTable::>>::new("numeric(10,4)[]".into()); test_table.insert( "INSERT INTO test_expected (a) VALUES (array[0.0,.0,1.,+1.020,-2.12313,.3,4,null]), (null), (array[]::numeric(10,4)[]);", @@ -883,8 +1040,11 @@ mod tests { } #[pg_test] - fn test_huge_numeric() { - let test_table = TestTable::::new("numeric(100,4)".into()); + fn test_large_numeric() { + let large_precision = DEFAULT_UNBOUNDED_NUMERIC_PRECISION + 1; + + let test_table = + 
TestTable::::new(format!("numeric({},4)", large_precision)); test_table.insert( "INSERT INTO test_expected (a) VALUES (0.0), (.0), (1.), (+1.020), (2.12313), (3), (null);", ); @@ -892,14 +1052,150 @@ mod tests { } #[pg_test] - fn test_huge_numeric_array() { - let test_table = TestTable::>>::new("numeric(100,4)[]".into()); + fn test_large_numeric_array() { + let large_precision = DEFAULT_UNBOUNDED_NUMERIC_PRECISION + 1; + + let test_table = TestTable::>>::new(format!( + "numeric({},4)[]", + large_precision + )); test_table.insert( "INSERT INTO test_expected (a) VALUES (array[0.0,.0,1.,1.020,2.12313,3,null]), (null), (array[]::numeric(100,4)[]);", ); test_helper(test_table); } + #[pg_test] + fn test_unbounded_numeric() { + let test_table = TestTable::::new("numeric".into()); + test_table.insert( + "INSERT INTO test_expected (a) VALUES (0.0), (.0), (1.), (+1.02), (2.12), (3), (null);", + ); + test_helper(test_table); + + let parquet_schema_command = + "select precision, scale, logical_type, type_name from parquet.schema('/tmp/test.parquet') WHERE name = 'a';"; + + let attribute_schema = Spi::connect(|client| { + let tup_table = client.select(parquet_schema_command, None, None).unwrap(); + let mut results = Vec::new(); + + for row in tup_table { + let precision = row["precision"].value::().unwrap(); + let scale = row["scale"].value::().unwrap(); + let logical_type = row["logical_type"].value::().unwrap().unwrap(); + let physical_type = row["type_name"].value::().unwrap().unwrap(); + + results.push((precision, scale, logical_type, physical_type)); + } + + results + }); + + assert_eq!(attribute_schema.len(), 1); + assert_eq!( + attribute_schema[0], + ( + Some(DEFAULT_UNBOUNDED_NUMERIC_PRECISION as _), + Some(DEFAULT_UNBOUNDED_NUMERIC_SCALE as _), + "DECIMAL".to_string(), + "FIXED_LEN_BYTE_ARRAY".to_string() + ) + ); + } + + #[pg_test] + fn test_unbounded_numeric_array() { + let test_table = TestTable::>>::new("numeric[]".into()); + test_table.insert( + "INSERT INTO 
test_expected (a) VALUES (array[0.0,.0,1.,1.02,2.12,3,null]), (null), (array[]::numeric[]);", + ); + test_helper(test_table); + + let parquet_schema_command = + "select precision, scale, logical_type, type_name from parquet.schema('/tmp/test.parquet') WHERE name = 'a' ORDER BY logical_type;"; + + let attribute_schema = Spi::connect(|client| { + let tup_table = client.select(parquet_schema_command, None, None).unwrap(); + let mut results = Vec::new(); + + for row in tup_table { + let precision = row["precision"].value::().unwrap(); + let scale = row["scale"].value::().unwrap(); + let logical_type = row["logical_type"].value::().unwrap().unwrap(); + let physical_type = row["type_name"].value::().unwrap(); + + results.push((precision, scale, logical_type, physical_type)); + } + + results + }); + + assert_eq!(attribute_schema.len(), 2); + assert_eq!( + attribute_schema[0], + ( + Some(DEFAULT_UNBOUNDED_NUMERIC_PRECISION as _), + Some(DEFAULT_UNBOUNDED_NUMERIC_SCALE as _), + "DECIMAL".to_string(), + Some("FIXED_LEN_BYTE_ARRAY".to_string()) + ) + ); + assert_eq!(attribute_schema[1], (None, None, "LIST".to_string(), None)); + } + + #[pg_test] + #[should_panic( + expected = "numeric value contains 23 digits before decimal point, which exceeds max allowed integral digits 22 during copy to parquet" + )] + fn test_invalid_unbounded_numeric_integral_digits() { + let invalid_integral_digits = + DEFAULT_UNBOUNDED_NUMERIC_PRECISION - DEFAULT_UNBOUNDED_NUMERIC_SCALE + 1; + + let copy_to_command = format!( + "copy (select (repeat('1', {}) || '.2')::numeric as a) to '/tmp/test.parquet'", + invalid_integral_digits + ); + + Spi::run(©_to_command).unwrap(); + } + + #[pg_test] + #[should_panic( + expected = "numeric value contains 17 digits after decimal point, which exceeds max allowed decimal digits 16 during copy to parquet" + )] + fn test_invalid_unbounded_numeric_decimal_digits() { + let invalid_decimal_digits = DEFAULT_UNBOUNDED_NUMERIC_SCALE + 1; + + let copy_to_command = format!( 
+ "copy (select ('2.' || repeat('1', {}) )::numeric as a) to '/tmp/test.parquet'", + invalid_decimal_digits + ); + + Spi::run(©_to_command).unwrap(); + } + + #[cfg(feature = "pg14")] + #[pg_test] + #[should_panic = "NUMERIC scale -2 must be between 0 and precision 5"] + fn test_numeric_negative_scale() { + let test_table = TestTable::::new("numeric(5,-2)".into()); + test_table.insert( + "INSERT INTO test_expected (a) VALUES (1234567.1231244), (123.23223), (12.0), (-12.12303), (null), (0);", + ); + test_helper(test_table); + } + + #[cfg(not(feature = "pg14"))] + #[pg_test] + fn test_numeric_negative_scale() { + let test_table = TestTable::::new("numeric(5,-2)".into()); + test_table.insert( + "INSERT INTO test_expected (a) VALUES (1234567.1231244), (123.23223), (12.0), (-12.12303), (null), (0);", + ); + test_helper(test_table); + } + #[pg_test] fn test_geometry() { let query = "DROP EXTENSION IF EXISTS postgis; CREATE EXTENSION postgis;"; diff --git a/src/parquet_copy_hook/copy_to_dest_receiver.rs b/src/parquet_copy_hook/copy_to_dest_receiver.rs index c0f6308..b7e23cd 100644 --- a/src/parquet_copy_hook/copy_to_dest_receiver.rs +++ b/src/parquet_copy_hook/copy_to_dest_receiver.rs @@ -21,6 +21,14 @@ use crate::{ pgrx_utils::collect_valid_attributes, }; +#[repr(C)] +struct CopyToParquetOptions { + pub row_group_size: i64, + pub row_group_size_bytes: i64, + pub compression: PgParquetCompression, + pub compression_level: i32, +} + #[repr(C)] struct CopyToParquetDestReceiver { dest: DestReceiver, @@ -31,10 +39,7 @@ struct CopyToParquetDestReceiver { collected_tuple_size: i64, collected_tuple_column_sizes: *mut i64, uri: *const c_char, - compression: PgParquetCompression, - compression_level: i32, - row_group_size: i64, - row_group_size_bytes: i64, + copy_options: CopyToParquetOptions, per_copy_context: MemoryContext, } @@ -75,11 +80,11 @@ impl CopyToParquetDestReceiver { } fn collected_tuples_exceeds_row_group_size(&self) -> bool { - self.collected_tuple_count >= 
self.row_group_size + self.collected_tuple_count >= self.copy_options.row_group_size } fn collected_tuples_exceeds_row_group_size_bytes(&self) -> bool { - self.collected_tuple_size >= self.row_group_size_bytes + self.collected_tuple_size >= self.copy_options.row_group_size_bytes } fn collected_tuples_exceeds_max_col_size(&self, tuple_column_sizes: &[i32]) -> bool { @@ -196,9 +201,9 @@ extern "C" fn copy_startup(dest: *mut DestReceiver, _operation: i32, tupledesc: let uri = parse_uri(uri); - let compression = parquet_dest.compression; + let compression = parquet_dest.copy_options.compression; - let compression_level = parquet_dest.compression_level; + let compression_level = parquet_dest.copy_options.compression_level; // parquet writer context is used throughout the COPY TO operation. // This might be put into ParquetCopyDestReceiver, but it's hard to preserve repr(C). @@ -348,10 +353,10 @@ pub extern "C" fn create_copy_to_parquet_dest_receiver( parquet_dest.collected_tuple_count = 0; parquet_dest.collected_tuples = std::ptr::null_mut(); parquet_dest.collected_tuple_column_sizes = std::ptr::null_mut(); - parquet_dest.row_group_size = row_group_size; - parquet_dest.row_group_size_bytes = row_group_size_bytes; - parquet_dest.compression = compression; - parquet_dest.compression_level = compression_level; + parquet_dest.copy_options.row_group_size = row_group_size; + parquet_dest.copy_options.row_group_size_bytes = row_group_size_bytes; + parquet_dest.copy_options.compression = compression; + parquet_dest.copy_options.compression_level = compression_level; parquet_dest.per_copy_context = per_copy_context; unsafe { std::mem::transmute(parquet_dest) } diff --git a/src/type_compat/pg_arrow_type_conversions.rs b/src/type_compat/pg_arrow_type_conversions.rs index 9d9375f..757b1f1 100644 --- a/src/type_compat/pg_arrow_type_conversions.rs +++ b/src/type_compat/pg_arrow_type_conversions.rs @@ -1,12 +1,14 @@ -use std::ffi::{CStr, CString}; +use core::panic; +use 
std::ffi::CStr; +use arrow::datatypes::{Decimal128Type, DecimalType}; use pgrx::{ datum::{Date, Interval, Time, TimeWithTimeZone, Timestamp, TimestampWithTimeZone}, - direct_function_call, pg_sys, AnyNumeric, IntoDatum, + direct_function_call, ereport, + pg_sys::{self, AsPgCStr}, + AnyNumeric, IntoDatum, Numeric, }; -pub(crate) const MAX_DECIMAL_PRECISION: usize = 38; - pub(crate) fn date_to_i32(date: Date) -> i32 { // PG epoch is (2000-01-01). Convert it to Unix epoch (1970-01-01). +10957 days let adjusted_date: Date = unsafe { @@ -167,86 +169,47 @@ pub(crate) fn i64_to_timetz(i64_timetz: i64) -> TimeWithTimeZone { .unwrap_or_else(|e| panic!("{}", e)) } -pub(crate) fn numeric_to_i128(numeric: AnyNumeric) -> i128 { - // obtain numeric's string representation - // cannot use numeric_send because byte representation is not compatible with parquet's decimal - let numeric_str: &CStr = unsafe { - direct_function_call(pg_sys::numeric_out, &[numeric.into_datum()]) - .expect("cannot convert numeric to bytes") - }; - let numeric_str = numeric_str - .to_str() - .expect("numeric string is an invalid CString"); +pub(crate) fn numeric_to_i128(numeric: AnyNumeric, typmod: i32, col_name: &str) -> i128 { + let mut numeric_str = format!("{}", numeric); - let sign = if numeric_str.starts_with('-') { -1 } else { 1 }; + if is_unbounded_numeric_typmod(typmod) { + ensure_valid_unbounded_numeric(&numeric_str, col_name); - // remove sign as we already stored it. 
we also remove the decimal point - // since arrow decimal expects a i128 representation of the decimal - let numeric_str = numeric_str.replace(['-', '+', '.'], ""); + // we cannot directly use numeric_out, we need to convert numeric to numeric(P,S) + // with default P and S, to have correct string representation for unbounded numeric + const PRECISION: u32 = DEFAULT_UNBOUNDED_NUMERIC_PRECISION as _; + const SCALE: u32 = DEFAULT_UNBOUNDED_NUMERIC_SCALE as _; - let numeric_digits = numeric_str - .chars() - .map(|c| c.to_digit(10).expect("not a valid digit") as i8); + let numeric = + Numeric::::try_from(numeric).unwrap_or_else(|e| panic!("{}", e)); - // convert digits into arrow decimal - let mut decimal: i128 = 0; - for digit in numeric_digits.into_iter() { - decimal = decimal * 10 + digit as i128; - } - decimal *= sign; - - decimal -} + numeric_str = format!("{}", numeric) + }; -pub(crate) fn i128_to_numeric(i128_decimal: i128, scale: usize) -> AnyNumeric { - let sign = if i128_decimal < 0 { "-" } else { "" }; - let i128_decimal = i128_decimal.abs(); - - // calculate decimal digits - let mut decimal_digits = vec![]; - let mut decimal = i128_decimal; - while decimal > 0 { - let digit = (decimal % 10) as i8; - decimal_digits.push(digit); - decimal /= 10; - } + let normalized_numeric_str = numeric_str.replace('.', ""); - // get fraction as string - let fraction = decimal_digits - .iter() - .take(scale) - .map(|v| v.to_string()) - .rev() - .reduce(|acc, v| acc + &v) - .unwrap_or_default(); - - // get integral as string - let integral = decimal_digits - .iter() - .skip(scale) - .map(|v| v.to_string()) - .rev() - .reduce(|acc, v| acc + &v) - .unwrap_or_default(); - - // create numeric string representation - let numeric_str = if integral.is_empty() && fraction.is_empty() { - "0".into() - } else { - format!("{}{}.{}", sign, integral, fraction) - }; + normalized_numeric_str + .parse::() + .expect("invalid decimal") +} - // numeric_in would not validate the numeric string 
when typmod is -1 - let typmod = -1; +pub(crate) fn i128_to_numeric( + decimal: i128, + precision: i32, + scale: i32, + typmod: i32, +) -> AnyNumeric { + // format decimal via arrow since it is consistent with PG's numeric formatting + let numeric_str = Decimal128Type::format_decimal(decimal, precision as _, scale as _); // compute numeric from string representation let numeric: AnyNumeric = unsafe { - let numeric_str = CString::new(numeric_str).expect("numeric cstring is invalid"); + let numeric_cstring = CStr::from_ptr(numeric_str.as_pg_cstr()); direct_function_call( pg_sys::numeric_in, &[ - numeric_str.into_datum(), + numeric_cstring.into_datum(), 0.into_datum(), typmod.into_datum(), ], @@ -257,14 +220,107 @@ pub(crate) fn i128_to_numeric(i128_decimal: i128, scale: usize) -> AnyNumeric { numeric } -// taken from PG's numeric.c +fn ensure_valid_unbounded_numeric(numeric_str: &str, col_name: &str) { + let numeric_str = numeric_str.replace(['-', '+'], ""); + let has_decimal_point = numeric_str.contains('.'); + + let (integral_digits, decimal_digits) = if has_decimal_point { + let parts = numeric_str.split('.').collect::>(); + (parts[0], Some(parts[1])) + } else { + (numeric_str.as_str(), None) + }; + + let n_integral_digits = integral_digits.len(); + + if n_integral_digits > DEFAULT_UNBOUNDED_NUMERIC_MAX_INTEGRAL_DIGITS as _ { + ereport!( + pgrx::PgLogLevel::ERROR, + pgrx::PgSqlErrorCode::ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE, + format!( + "numeric value contains {} digits before decimal point, which exceeds max allowed integral digits {} during copy to parquet", + n_integral_digits, DEFAULT_UNBOUNDED_NUMERIC_MAX_INTEGRAL_DIGITS + ), + format!( + "Consider specifying precision and scale for column \"{}\". 
Replace type \"numeric\" to \"numeric(P,S)\".", + col_name + ), + ); + } + + if let Some(decimal_digits) = decimal_digits { + let n_decimal_digits = decimal_digits.len(); + + if n_decimal_digits > DEFAULT_UNBOUNDED_NUMERIC_SCALE as _ { + ereport!( + pgrx::PgLogLevel::ERROR, + pgrx::PgSqlErrorCode::ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE, + format!( + "numeric value contains {} digits after decimal point, which exceeds max allowed decimal digits {} during copy to parquet", + n_decimal_digits, DEFAULT_UNBOUNDED_NUMERIC_SCALE + ), + format!( + "Consider specifying precision and scale for column \"{}\". Replace type \"numeric\" to \"numeric(P,S)\".", + col_name + ), + ); + } + } +} + +const MAX_NUMERIC_PRECISION: i32 = 38; +pub(crate) const DEFAULT_UNBOUNDED_NUMERIC_PRECISION: i32 = MAX_NUMERIC_PRECISION; +pub(crate) const DEFAULT_UNBOUNDED_NUMERIC_SCALE: i32 = 16; +pub(crate) const DEFAULT_UNBOUNDED_NUMERIC_MAX_INTEGRAL_DIGITS: i32 = + DEFAULT_UNBOUNDED_NUMERIC_PRECISION - DEFAULT_UNBOUNDED_NUMERIC_SCALE; + +// should_write_numeric_as_text determines whether a numeric datum should be written as text. +// It is written as text when precision is greater than MAX_NUMERIC_PRECISION e.g. "numeric(50, 10)" +pub(crate) fn should_write_numeric_as_text(precision: i32) -> bool { + precision > MAX_NUMERIC_PRECISION +} + +pub(crate) fn extract_precision_and_scale_from_numeric_typmod(typmod: i32) -> (i32, i32) { + // if typmod is -1, it means unbounded numeric, so we use default precision and scale + if is_unbounded_numeric_typmod(typmod) { + return ( + DEFAULT_UNBOUNDED_NUMERIC_PRECISION, + DEFAULT_UNBOUNDED_NUMERIC_SCALE, + ); + } + + let mut precision = extract_precision_from_numeric_typmod(typmod); + let mut scale = extract_scale_from_numeric_typmod(typmod); + + // Even if PG allows negative scale, arrow does not. We adjust precision by adding scale to it. 
+ if scale < 0 { + adjust_precision_and_scale_if_negative_scale(&mut precision, &mut scale); + } + + (precision, scale) +} + #[inline] -pub(crate) fn extract_precision_from_numeric_typmod(typmod: i32) -> usize { - (((typmod - pg_sys::VARHDRSZ as i32) >> 16) & 0xffff) as usize +fn extract_precision_from_numeric_typmod(typmod: i32) -> i32 { + // taken from PG's numeric.c + (((typmod - pg_sys::VARHDRSZ as i32) >> 16) & 0xffff) as _ } -// taken from PG's numeric.c #[inline] -pub(crate) fn extract_scale_from_numeric_typmod(typmod: i32) -> usize { - ((((typmod - pg_sys::VARHDRSZ as i32) & 0x7ff) ^ 1024) - 1024) as usize +fn extract_scale_from_numeric_typmod(typmod: i32) -> i32 { + // taken from PG's numeric.c + (((typmod - pg_sys::VARHDRSZ as i32) & 0x7ff) ^ 1024) - 1024 +} + +// adjust_precision_and_scale_if_negative_scale adjusts precision and scale if scale is negative. +// Even if PG allows negative scale, arrow does not. We adjust precision by adding scale to it. +fn adjust_precision_and_scale_if_negative_scale(precision: &mut i32, scale: &mut i32) { + if *scale < 0 { + *precision += scale.abs(); + *scale = 0; + } +} + +fn is_unbounded_numeric_typmod(typmod: i32) -> bool { + typmod == -1 }