Skip to content

Commit

Permalink
remove cast_mode option
Browse files Browse the repository at this point in the history
  • Loading branch information
aykut-bozkurt committed Nov 25, 2024
1 parent 7fa0477 commit d48996c
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 220 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ Alternatively, you can use the following environment variables when starting pos

`pg_parquet` supports the following options in the `COPY FROM` command:
- `format parquet`: you need to specify this option to read or write Parquet files that do not end with the `.parquet[.<compression>]` extension,
- `cast_mode <string>`: Specifies the casting behavior, which can be set to either `strict` or `relaxed`. This determines whether lossy conversions are allowed. By default, the mode is `strict`, which does not permit lossy conversions (e.g., `bigint => int` causes a schema mismatch error during schema validation). When set to `relaxed`, lossy conversions are allowed, and errors will only be raised at runtime if a value cannot be properly converted. This option provides flexibility to handle schema mismatches by deferring error checks to runtime.

## Configuration
There is currently only one GUC parameter to enable/disable the `pg_parquet`:
Expand Down
1 change: 0 additions & 1 deletion src/arrow_parquet.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pub(crate) mod arrow_to_pg;
pub(crate) mod arrow_utils;
pub(crate) mod cast_mode;
pub(crate) mod compression;
pub(crate) mod parquet_reader;
pub(crate) mod parquet_writer;
Expand Down
22 changes: 0 additions & 22 deletions src/arrow_parquet/cast_mode.rs

This file was deleted.

10 changes: 4 additions & 6 deletions src/arrow_parquet/parquet_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ use crate::{

use super::{
arrow_to_pg::{collect_arrow_to_pg_attribute_contexts, ArrowToPgAttributeContext},
cast_mode::CastMode,
schema_parser::{
ensure_arrow_schema_match_tupledesc_schema, parse_arrow_schema_from_attributes,
ensure_file_schema_match_tupledesc_schema, parse_arrow_schema_from_attributes,
},
uri_utils::{parquet_reader_from_uri, PG_BACKEND_TOKIO_RUNTIME},
};
Expand All @@ -42,7 +41,7 @@ pub(crate) struct ParquetReaderContext {
}

impl ParquetReaderContext {
pub(crate) fn new(uri: Url, cast_mode: CastMode, tupledesc: &PgTupleDesc) -> Self {
pub(crate) fn new(uri: Url, tupledesc: &PgTupleDesc) -> Self {
// Postgis and Map contexts are used throughout reading the parquet file.
// We need to reset them to avoid reading the stale data. (e.g. extension could be dropped)
reset_postgis_context();
Expand All @@ -63,14 +62,13 @@ impl ParquetReaderContext {

let tupledesc_schema = Arc::new(tupledesc_schema);

// Ensure that the arrow schema matches the tupledesc.
// Ensure that the file schema matches the tupledesc schema.
// Gets cast_to_types for each attribute if a cast is needed for the attribute's columnar array
// to match the expected columnar array for its tupledesc type.
let cast_to_types = ensure_arrow_schema_match_tupledesc_schema(
let cast_to_types = ensure_file_schema_match_tupledesc_schema(
parquet_file_schema.clone(),
tupledesc_schema.clone(),
&attributes,
cast_mode,
);

let attribute_contexts = collect_arrow_to_pg_attribute_contexts(
Expand Down
122 changes: 30 additions & 92 deletions src/arrow_parquet/schema_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use arrow_schema::{DataType, FieldRef};
use parquet::arrow::{arrow_to_parquet_schema, PARQUET_FIELD_ID_META_KEY};
use pg_sys::{
can_coerce_type,
CoercionContext::{self, COERCION_EXPLICIT, COERCION_IMPLICIT},
CoercionContext::{self, COERCION_EXPLICIT},
FormData_pg_attribute, InvalidOid, Oid, BOOLOID, BYTEAOID, CHAROID, DATEOID, FLOAT4OID,
FLOAT8OID, INT2OID, INT4OID, INT8OID, NUMERICOID, OIDOID, TEXTOID, TIMEOID, TIMESTAMPOID,
TIMESTAMPTZOID, TIMETZOID,
Expand All @@ -27,8 +27,6 @@ use crate::{
},
};

use super::cast_mode::CastMode;

pub(crate) fn parquet_schema_string_from_attributes(
attributes: &[FormData_pg_attribute],
) -> String {
Expand Down Expand Up @@ -344,69 +342,45 @@ fn adjust_map_entries_field(field: FieldRef) -> FieldRef {
Arc::new(entries_field)
}

// ensure_arrow_schema_match_tupledesc_schema throws an error if the arrow schema does not match the table schema.
// If the arrow schema is castable to the table schema, it returns a vector of Option<DataType> to cast to
// for each field.
pub(crate) fn ensure_arrow_schema_match_tupledesc_schema(
arrow_schema: Arc<Schema>,
// ensure_file_schema_match_tupledesc_schema throws an error if the file's schema does not match the table schema.
// If the file's arrow schema is castable to the table's arrow schema, it returns a vector of Option<DataType>
// to cast to for each field.
pub(crate) fn ensure_file_schema_match_tupledesc_schema(
file_schema: Arc<Schema>,
tupledesc_schema: Arc<Schema>,
attributes: &[FormData_pg_attribute],
cast_mode: CastMode,
) -> Vec<Option<DataType>> {
let mut cast_to_types = Vec::new();

for (tupledesc_field, attribute) in tupledesc_schema.fields().iter().zip(attributes.iter()) {
let field_name = tupledesc_field.name();
for (tupledesc_schema_field, attribute) in
tupledesc_schema.fields().iter().zip(attributes.iter())
{
let field_name = tupledesc_schema_field.name();

let arrow_field = arrow_schema.column_with_name(field_name);
let file_schema_field = file_schema.column_with_name(field_name);

if arrow_field.is_none() {
if file_schema_field.is_none() {
panic!("column \"{}\" is not found in parquet file", field_name);
}

let (_, arrow_field) = arrow_field.unwrap();
let arrow_field = Arc::new(arrow_field.clone());
let (_, file_schema_field) = file_schema_field.unwrap();
let file_schema_field = Arc::new(file_schema_field.clone());

let from_type = arrow_field.data_type();
let to_type = tupledesc_field.data_type();
let from_type = file_schema_field.data_type();
let to_type = tupledesc_schema_field.data_type();

// no cast needed
if from_type == to_type {
cast_to_types.push(None);
continue;
}

if let Err(coercion_error) = is_coercible(
from_type,
to_type,
attribute.atttypid,
attribute.atttypmod,
cast_mode,
) {
let type_mismatch_message = format!(
if !is_coercible(from_type, to_type, attribute.atttypid, attribute.atttypmod) {
panic!(
"type mismatch for column \"{}\" between table and parquet file.\n\n\
table has \"{}\"\n\nparquet file has \"{}\"",
field_name, to_type, from_type
);

match coercion_error {
CoercionError::NoStrictCoercionPath => ereport!(
pgrx::PgLogLevel::ERROR,
PgSqlErrorCode::ERRCODE_CANNOT_COERCE,
type_mismatch_message,
"Try COPY FROM '..' WITH (cast_mode 'relaxed') to allow lossy casts with runtime checks."
),
CoercionError::NoCoercionPath => ereport!(
pgrx::PgLogLevel::ERROR,
PgSqlErrorCode::ERRCODE_CANNOT_COERCE,
type_mismatch_message
),
CoercionError::MapEntriesNullable => ereport!(
pgrx::PgLogLevel::ERROR,
PgSqlErrorCode::ERRCODE_CANNOT_COERCE,
format!("entries field in map type cannot be nullable for column \"{}\"", field_name)
),
}
}

pgrx::debug2!(
Expand All @@ -422,12 +396,6 @@ pub(crate) fn ensure_arrow_schema_match_tupledesc_schema(
cast_to_types
}

enum CoercionError {
NoStrictCoercionPath,
NoCoercionPath,
MapEntriesNullable,
}

// is_coercible first checks if "from_type" can be cast to "to_type" by arrow-cast.
// Then, it checks if the cast is meaningful at Postgres by seeing if there is
// an explicit coercion from "from_typoid" to "to_typoid".
Expand All @@ -436,17 +404,11 @@ enum CoercionError {
// Arrow supports casting struct fields by field position instead of field name,
// which is not the intended behavior for pg_parquet. Hence, we make sure the field names
// match for structs.
fn is_coercible(
from_type: &DataType,
to_type: &DataType,
to_typoid: Oid,
to_typmod: i32,
cast_mode: CastMode,
) -> Result<(), CoercionError> {
fn is_coercible(from_type: &DataType, to_type: &DataType, to_typoid: Oid, to_typmod: i32) -> bool {
match (from_type, to_type) {
(DataType::Struct(from_fields), DataType::Struct(to_fields)) => {
if from_fields.len() != to_fields.len() {
return Err(CoercionError::NoCoercionPath);
return false;
}

let tupledesc = tuple_desc(to_typoid, to_typmod);
Expand All @@ -458,19 +420,20 @@ fn is_coercible(
.zip(to_fields.iter().zip(attributes.iter()))
{
if from_field.name() != to_field.name() {
return Err(CoercionError::NoCoercionPath);
return false;
}

is_coercible(
if !is_coercible(
from_field.data_type(),
to_field.data_type(),
to_attribute.type_oid().value(),
to_attribute.type_mod(),
cast_mode,
)?;
) {
return false;
}
}

Ok(())
true
}
(DataType::List(from_field), DataType::List(to_field)) => {
let element_oid = array_element_typoid(to_typoid);
Expand All @@ -481,13 +444,12 @@ fn is_coercible(
to_field.data_type(),
element_oid,
element_typmod,
cast_mode,
)
}
(DataType::Map(from_entries_field, _), DataType::Map(to_entries_field, _)) => {
// entries field cannot be null
if from_entries_field.is_nullable() {
return Err(CoercionError::MapEntriesNullable);
return false;
}

let entries_typoid = domain_array_base_elem_typoid(to_typoid);
Expand All @@ -497,47 +459,23 @@ fn is_coercible(
to_entries_field.data_type(),
entries_typoid,
to_typmod,
cast_mode,
)

Check warning on line 462 in src/arrow_parquet/schema_parser.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/schema_parser.rs#L449-L462

Added lines #L449 - L462 were not covered by tests
}
_ => {
// check if arrow-cast can cast the types
if !can_cast_types(from_type, to_type) {
return Err(CoercionError::NoCoercionPath);
return false;
}

let from_typoid = pg_type_for_arrow_primitive_type(from_type);

// pg_parquet could not recognize that arrow type
if from_typoid == InvalidOid {
return Err(CoercionError::NoCoercionPath);
return false;

Check warning on line 474 in src/arrow_parquet/schema_parser.rs

View check run for this annotation

Codecov / codecov/patch

src/arrow_parquet/schema_parser.rs#L474

Added line #L474 was not covered by tests
}

let can_coerce_via_relaxed_mode =
can_pg_coerce_types(from_typoid, to_typoid, COERCION_EXPLICIT);

// check if coercion is meaningful at Postgres (it has a coercion path)
match cast_mode {
CastMode::Strict => {
let can_coerce_via_strict_mode =
can_pg_coerce_types(from_typoid, to_typoid, COERCION_IMPLICIT);

if !can_coerce_via_strict_mode && can_coerce_via_relaxed_mode {
Err(CoercionError::NoStrictCoercionPath)
} else if !can_coerce_via_strict_mode {
Err(CoercionError::NoCoercionPath)
} else {
Ok(())
}
}
CastMode::Relaxed => {
if !can_coerce_via_relaxed_mode {
Err(CoercionError::NoCoercionPath)
} else {
Ok(())
}
}
}
can_pg_coerce_types(from_typoid, to_typoid, COERCION_EXPLICIT)
}
}
}
Expand Down
Loading

0 comments on commit d48996c

Please sign in to comment.