Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Add SchemaInferenceOptions options to infer_schema and option to conf…
Browse files Browse the repository at this point in the history
…igure int96 inference (#1533)

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
  • Loading branch information
jaychia and Jay Chia authored Sep 7, 2023
1 parent 767834e commit fb7b5fe
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 28 deletions.
126 changes: 101 additions & 25 deletions src/io/parquet/read/schema/convert.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! This module has a single entry point, [`parquet_to_arrow_schema`].
//! This module has entry points, [`parquet_to_arrow_schema`] and the more configurable [`parquet_to_arrow_schema_with_options`].
use parquet2::schema::{
types::{
FieldInfo, GroupConvertedType, GroupLogicalType, IntegerType, ParquetType, PhysicalType,
Expand All @@ -8,11 +8,23 @@ use parquet2::schema::{
};

use crate::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
use crate::io::parquet::read::schema::SchemaInferenceOptions;

/// Converts [`ParquetType`]s to a [`Field`], ignoring parquet fields that do not contain
/// any physical column.
pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> Vec<Field> {
fields.iter().filter_map(to_field).collect::<Vec<_>>()
parquet_to_arrow_schema_with_options(fields, &None)
}

/// Like [`parquet_to_arrow_schema`] but with configurable options which affect the behavior of schema inference
pub fn parquet_to_arrow_schema_with_options(
fields: &[ParquetType],
options: &Option<SchemaInferenceOptions>,
) -> Vec<Field> {
fields
.iter()
.filter_map(|f| to_field(f, options.as_ref().unwrap_or(&Default::default())))
.collect::<Vec<_>>()
}

fn from_int32(
Expand Down Expand Up @@ -169,7 +181,10 @@ fn from_fixed_len_byte_array(
}

/// Maps a [`PhysicalType`] with optional metadata to a [`DataType`]
fn to_primitive_type_inner(primitive_type: &PrimitiveType) -> DataType {
fn to_primitive_type_inner(
primitive_type: &PrimitiveType,
options: &SchemaInferenceOptions,
) -> DataType {
match primitive_type.physical_type {
PhysicalType::Boolean => DataType::Boolean,
PhysicalType::Int32 => {
Expand All @@ -178,7 +193,7 @@ fn to_primitive_type_inner(primitive_type: &PrimitiveType) -> DataType {
PhysicalType::Int64 => {
from_int64(primitive_type.logical_type, primitive_type.converted_type)
}
PhysicalType::Int96 => DataType::Timestamp(TimeUnit::Nanosecond, None),
PhysicalType::Int96 => DataType::Timestamp(options.int96_coerce_to_timeunit, None),
PhysicalType::Float => DataType::Float32,
PhysicalType::Double => DataType::Float64,
PhysicalType::ByteArray => {
Expand All @@ -195,8 +210,8 @@ fn to_primitive_type_inner(primitive_type: &PrimitiveType) -> DataType {
/// Entry point for converting parquet primitive type to arrow type.
///
/// This function takes care of repetition.
fn to_primitive_type(primitive_type: &PrimitiveType) -> DataType {
let base_type = to_primitive_type_inner(primitive_type);
fn to_primitive_type(primitive_type: &PrimitiveType, options: &SchemaInferenceOptions) -> DataType {
let base_type = to_primitive_type_inner(primitive_type, options);

if primitive_type.field_info.repetition == Repetition::Repeated {
DataType::List(Box::new(Field::new(
Expand All @@ -214,23 +229,27 @@ fn non_repeated_group(
converted_type: &Option<GroupConvertedType>,
fields: &[ParquetType],
parent_name: &str,
options: &SchemaInferenceOptions,
) -> Option<DataType> {
debug_assert!(!fields.is_empty());
match (logical_type, converted_type) {
(Some(GroupLogicalType::List), _) => to_list(fields, parent_name),
(None, Some(GroupConvertedType::List)) => to_list(fields, parent_name),
(Some(GroupLogicalType::Map), _) => to_list(fields, parent_name),
(Some(GroupLogicalType::List), _) => to_list(fields, parent_name, options),
(None, Some(GroupConvertedType::List)) => to_list(fields, parent_name, options),
(Some(GroupLogicalType::Map), _) => to_list(fields, parent_name, options),
(None, Some(GroupConvertedType::Map) | Some(GroupConvertedType::MapKeyValue)) => {
to_map(fields)
to_map(fields, options)
}
_ => to_struct(fields),
_ => to_struct(fields, options),
}
}

/// Converts a parquet group type to an arrow [`DataType::Struct`].
/// Returns [`None`] if all its fields are empty
fn to_struct(fields: &[ParquetType]) -> Option<DataType> {
let fields = fields.iter().filter_map(to_field).collect::<Vec<Field>>();
fn to_struct(fields: &[ParquetType], options: &SchemaInferenceOptions) -> Option<DataType> {
let fields = fields
.iter()
.filter_map(|f| to_field(f, options))
.collect::<Vec<Field>>();
if fields.is_empty() {
None
} else {
Expand All @@ -240,8 +259,8 @@ fn to_struct(fields: &[ParquetType]) -> Option<DataType> {

/// Converts a parquet group type to an arrow [`DataType::Struct`].
/// Returns [`None`] if all its fields are empty
fn to_map(fields: &[ParquetType]) -> Option<DataType> {
let inner = to_field(&fields[0])?;
fn to_map(fields: &[ParquetType], options: &SchemaInferenceOptions) -> Option<DataType> {
let inner = to_field(&fields[0], options)?;
Some(DataType::Map(Box::new(inner), false))
}

Expand All @@ -254,16 +273,17 @@ fn to_group_type(
converted_type: &Option<GroupConvertedType>,
fields: &[ParquetType],
parent_name: &str,
options: &SchemaInferenceOptions,
) -> Option<DataType> {
debug_assert!(!fields.is_empty());
if field_info.repetition == Repetition::Repeated {
Some(DataType::List(Box::new(Field::new(
&field_info.name,
to_struct(fields)?,
to_struct(fields, options)?,
is_nullable(field_info),
))))
} else {
non_repeated_group(logical_type, converted_type, fields, parent_name)
non_repeated_group(logical_type, converted_type, fields, parent_name, options)
}
}

Expand All @@ -279,10 +299,10 @@ pub(crate) fn is_nullable(field_info: &FieldInfo) -> bool {
/// Converts parquet schema to arrow field.
/// Returns `None` iff the parquet type has no associated primitive types,
/// i.e. if it is a column-less group type.
fn to_field(type_: &ParquetType) -> Option<Field> {
fn to_field(type_: &ParquetType, options: &SchemaInferenceOptions) -> Option<Field> {
Some(Field::new(
&type_.get_field_info().name,
to_data_type(type_)?,
to_data_type(type_, options)?,
is_nullable(type_.get_field_info()),
))
}
Expand All @@ -291,21 +311,25 @@ fn to_field(type_: &ParquetType) -> Option<Field> {
///
/// To fully understand this algorithm, please refer to
/// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md).
fn to_list(fields: &[ParquetType], parent_name: &str) -> Option<DataType> {
fn to_list(
fields: &[ParquetType],
parent_name: &str,
options: &SchemaInferenceOptions,
) -> Option<DataType> {
let item = fields.first().unwrap();

let item_type = match item {
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type_inner(primitive)),
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type_inner(primitive, options)),
ParquetType::GroupType { fields, .. } => {
if fields.len() == 1
&& item.name() != "array"
&& item.name() != format!("{parent_name}_tuple")
{
// extract the repetition field
let nested_item = fields.first().unwrap();
to_data_type(nested_item)
to_data_type(nested_item, options)
} else {
to_struct(fields)
to_struct(fields, options)
}
}
}?;
Expand Down Expand Up @@ -346,9 +370,12 @@ fn to_list(fields: &[ParquetType], parent_name: &str) -> Option<DataType> {
///
/// If this schema is a group type and none of its children is reserved in the
/// conversion, the result is Ok(None).
pub(crate) fn to_data_type(type_: &ParquetType) -> Option<DataType> {
pub(crate) fn to_data_type(
type_: &ParquetType,
options: &SchemaInferenceOptions,
) -> Option<DataType> {
match type_ {
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type(primitive)),
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type(primitive, options)),
ParquetType::GroupType {
field_info,
logical_type,
Expand All @@ -364,6 +391,7 @@ pub(crate) fn to_data_type(type_: &ParquetType) -> Option<DataType> {
converted_type,
fields,
&field_info.name,
options,
)
}
}
Expand Down Expand Up @@ -973,4 +1001,52 @@ mod tests {
assert_eq!(arrow_fields, fields);
Ok(())
}

#[test]
fn test_int96_options() -> Result<()> {
for tu in [
TimeUnit::Second,
TimeUnit::Microsecond,
TimeUnit::Millisecond,
TimeUnit::Nanosecond,
] {
let message_type = "
message arrow_schema {
REQUIRED INT96 int96_field;
OPTIONAL GROUP int96_list (LIST) {
REPEATED GROUP list {
OPTIONAL INT96 element;
}
}
REQUIRED GROUP int96_struct {
REQUIRED INT96 int96_field;
}
}
";
let coerced_to = DataType::Timestamp(tu, None);
let arrow_fields = vec![
Field::new("int96_field", coerced_to.clone(), false),
Field::new(
"int96_list",
DataType::List(Box::new(Field::new("element", coerced_to.clone(), true))),
true,
),
Field::new(
"int96_struct",
DataType::Struct(vec![Field::new("int96_field", coerced_to.clone(), false)]),
false,
),
];

let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
let fields = parquet_to_arrow_schema_with_options(
parquet_schema.fields(),
&Some(SchemaInferenceOptions {
int96_coerce_to_timeunit: tu,
}),
);
assert_eq!(arrow_fields, fields);
}
Ok(())
}
}
34 changes: 31 additions & 3 deletions src/io/parquet/read/schema/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! APIs to handle Parquet <-> Arrow schemas.
use crate::datatypes::Schema;
use crate::datatypes::{Schema, TimeUnit};
use crate::error::Result;

mod convert;
mod metadata;

pub use convert::parquet_to_arrow_schema;
pub use convert::{parquet_to_arrow_schema, parquet_to_arrow_schema_with_options};
pub use metadata::read_schema_from_metadata;
pub use parquet2::metadata::{FileMetaData, KeyValue, SchemaDescriptor};
pub use parquet2::schema::types::ParquetType;
Expand All @@ -14,18 +14,46 @@ pub(crate) use convert::*;

use self::metadata::parse_key_value_metadata;

/// Options when inferring schemas from Parquet
pub struct SchemaInferenceOptions {
/// When inferring schemas from the Parquet INT96 timestamp type, this is the corresponding TimeUnit
/// in the inferred Arrow Timestamp type.
///
/// This defaults to `TimeUnit::Nanosecond`, but INT96 timestamps outside of the range of years 1678-2262,
/// will overflow when parsed as `Timestamp(TimeUnit::Nanosecond)`. Setting this to a lower resolution
/// (e.g. TimeUnit::Milliseconds) will result in loss of precision, but support a larger range of dates
/// without overflowing when parsing the data.
pub int96_coerce_to_timeunit: TimeUnit,
}

impl Default for SchemaInferenceOptions {
fn default() -> Self {
SchemaInferenceOptions {
int96_coerce_to_timeunit: TimeUnit::Nanosecond,
}
}
}

/// Infers a [`Schema`] from parquet's [`FileMetaData`]. This first looks for the metadata key
/// `"ARROW:schema"`; if it does not exist, it converts the parquet types declared in the
/// file's parquet schema to Arrow's equivalent.
/// # Error
/// This function errors iff the key `"ARROW:schema"` exists but is not correctly encoded,
/// indicating that that the file's arrow metadata was incorrectly written.
pub fn infer_schema(file_metadata: &FileMetaData) -> Result<Schema> {
infer_schema_with_options(file_metadata, &None)
}

/// Like [`infer_schema`] but with configurable options which affects the behavior of inference
pub fn infer_schema_with_options(
file_metadata: &FileMetaData,
options: &Option<SchemaInferenceOptions>,
) -> Result<Schema> {
let mut metadata = parse_key_value_metadata(file_metadata.key_value_metadata());

let schema = read_schema_from_metadata(&mut metadata)?;
Ok(schema.unwrap_or_else(|| {
let fields = parquet_to_arrow_schema(file_metadata.schema().fields());
let fields = parquet_to_arrow_schema_with_options(file_metadata.schema().fields(), options);
Schema { fields, metadata }
}))
}

0 comments on commit fb7b5fe

Please sign in to comment.