From 9c96302f47727b3bfd5d956aacde8024f70deab4 Mon Sep 17 00:00:00 2001
From: Aykut Bozkurt
Date: Mon, 23 Sep 2024 13:51:20 +0300
Subject: [PATCH] Adds Parquet Metadata Inspection UDFs

- adds "parquet.schema" UDF to inspect the schema of a parquet file,
- adds "parquet.metadata", "parquet.file_metadata", and "parquet.kv_metadata"
  UDFs to inspect the metadata of a parquet file.
---
 src/lib.rs                   |   1 +
 src/parquet_udfs.rs          |   2 +
 src/parquet_udfs/metadata.rs | 268 +++++++++++++++++++++++++++++++++++
 src/parquet_udfs/schema.rs   | 169 ++++++++++++++++++++++
 4 files changed, 440 insertions(+)
 create mode 100644 src/parquet_udfs.rs
 create mode 100644 src/parquet_udfs/metadata.rs
 create mode 100644 src/parquet_udfs/schema.rs

diff --git a/src/lib.rs b/src/lib.rs
index ecdd3c4..93fa453 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,6 +4,7 @@ use pgrx::{prelude::*, GucContext, GucFlags, GucRegistry};
 
 mod arrow_parquet;
 mod parquet_copy_hook;
+mod parquet_udfs;
 mod pgrx_utils;
 mod type_compat;
 
diff --git a/src/parquet_udfs.rs b/src/parquet_udfs.rs
new file mode 100644
index 0000000..c04ca43
--- /dev/null
+++ b/src/parquet_udfs.rs
@@ -0,0 +1,2 @@
+pub(crate) mod metadata;
+pub(crate) mod schema;
diff --git a/src/parquet_udfs/metadata.rs b/src/parquet_udfs/metadata.rs
new file mode 100644
index 0000000..85993ea
--- /dev/null
+++ b/src/parquet_udfs/metadata.rs
@@ -0,0 +1,268 @@
+use ::parquet::file::statistics::Statistics;
+use pgrx::{iter::TableIterator, name, pg_extern, pg_schema};
+
+use crate::arrow_parquet::uri_utils::{parquet_metadata_from_uri, parse_uri, uri_as_string};
+
+#[pg_schema]
+mod parquet {
+    use super::*;
+
+    #[pg_extern]
+    #[allow(clippy::type_complexity)]
+    fn metadata(
+        uri: String,
+    ) -> TableIterator<
+        'static,
+        (
+            name!(uri, String),
+            name!(row_group_id, i64),
+            name!(row_group_num_rows, i64),
+            name!(row_group_num_columns, i64),
+            name!(row_group_bytes, i64),
+            name!(column_id, i64),
+            name!(file_offset, i64),
+            name!(num_values, i64),
+            name!(path_in_schema, String),
+            name!(type_name, String),
+            name!(stats_null_count, Option<i64>),
+            name!(stats_distinct_count, Option<i64>),
+            name!(stats_min, Option<String>),
+            name!(stats_max, Option<String>),
+            name!(compression, String),
+            name!(encodings, String),
+            name!(index_page_offset, Option<i64>),
+            name!(dictionary_page_offset, Option<i64>),
+            name!(data_page_offset, i64),
+            name!(total_compressed_size, i64),
+            name!(total_uncompressed_size, i64),
+        ),
+    > {
+        let uri = parse_uri(&uri);
+
+        // new_current_thread runtime uses the current thread to run the tokio reactor.
+        // This uses the same thread that is running the Postgres backend.
+        let runtime = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap_or_else(|e| {
+                panic!("Failed to create tokio runtime: {}", e);
+            });
+
+        let parquet_metadata = runtime.block_on(parquet_metadata_from_uri(&uri));
+
+        let mut rows = vec![];
+
+        for (row_group_id, row_group) in parquet_metadata.row_groups().iter().enumerate() {
+            let row_group_num_rows = row_group.num_rows();
+            let row_group_num_columns = row_group.num_columns() as i64;
+            let row_group_bytes = row_group.total_byte_size();
+
+            for (column_id, column) in row_group.columns().iter().enumerate() {
+                let file_offset = column.file_offset();
+
+                let num_values = column.num_values();
+
+                let path_in_schema = column.column_path().string();
+
+                let type_name = column.column_type().to_string();
+
+                let mut stats_min = None;
+                let mut stats_max = None;
+                let mut stats_null_count = None;
+                let mut stats_distinct_count = None;
+
+                if let Some(statistics) = column.statistics() {
+                    stats_min = stats_min_value_to_str(statistics);
+
+                    stats_max = stats_max_value_to_str(statistics);
+
+                    stats_null_count = statistics.null_count_opt().map(|v| v as i64);
+
+                    stats_distinct_count = statistics.distinct_count_opt().map(|v| v as i64);
+                }
+
+                let compression = column.compression().to_string();
+
+                let encodings = column
+                    .encodings()
+                    .iter()
+                    .map(|e| e.to_string())
+                    .collect::<Vec<_>>()
+                    .join(",");
+
+                let index_page_offset = column.index_page_offset();
+
+                let dictionary_page_offset = column.dictionary_page_offset();
+
+                let data_page_offset = column.data_page_offset();
+
+                let total_compressed_size = column.compressed_size();
+
+                let total_uncompressed_size = column.uncompressed_size();
+
+                let row = (
+                    uri_as_string(&uri),
+                    row_group_id as i64,
+                    row_group_num_rows,
+                    row_group_num_columns,
+                    row_group_bytes,
+                    column_id as i64,
+                    file_offset,
+                    num_values,
+                    path_in_schema,
+                    type_name,
+                    stats_null_count,
+                    stats_distinct_count,
+                    stats_min,
+                    stats_max,
+                    compression,
+                    encodings,
+                    index_page_offset,
+                    dictionary_page_offset,
+                    data_page_offset,
+                    total_compressed_size,
+                    total_uncompressed_size,
+                );
+
+                rows.push(row);
+            }
+        }
+
+        TableIterator::new(rows)
+    }
+
+    #[pg_extern]
+    fn file_metadata(
+        uri: String,
+    ) -> TableIterator<
+        'static,
+        (
+            name!(uri, String),
+            name!(created_by, Option<String>),
+            name!(num_rows, i64),
+            name!(num_row_groups, i64),
+            name!(format_version, String),
+        ),
+    > {
+        let uri = parse_uri(&uri);
+
+        // new_current_thread runtime uses the current thread to run the tokio reactor.
+        // This uses the same thread that is running the Postgres backend.
+        let runtime = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap_or_else(|e| {
+                panic!("Failed to create tokio runtime: {}", e);
+            });
+
+        let parquet_metadata = runtime.block_on(parquet_metadata_from_uri(&uri));
+
+        let created_by = parquet_metadata
+            .file_metadata()
+            .created_by()
+            .map(|c| c.to_string());
+
+        let num_rows = parquet_metadata.file_metadata().num_rows();
+
+        let num_row_groups = parquet_metadata.num_row_groups() as i64;
+
+        let format_version = parquet_metadata.file_metadata().version().to_string();
+
+        let row = (
+            uri_as_string(&uri),
+            created_by,
+            num_rows,
+            num_row_groups,
+            format_version,
+        );
+
+        TableIterator::new(vec![row])
+    }
+
+    #[pg_extern]
+    fn kv_metadata(
+        uri: String,
+    ) -> TableIterator<
+        'static,
+        (
+            name!(uri, String),
+            name!(key, Vec<u8>),
+            name!(value, Option<Vec<u8>>),
+        ),
+    > {
+        let uri = parse_uri(&uri);
+
+        // new_current_thread runtime uses the current thread to run the tokio reactor.
+        // This uses the same thread that is running the Postgres backend.
+        let runtime = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap_or_else(|e| {
+                panic!("Failed to create tokio runtime: {}", e);
+            });
+
+        let parquet_metadata = runtime.block_on(parquet_metadata_from_uri(&uri));
+        let kv_metadata = parquet_metadata.file_metadata().key_value_metadata();
+
+        if kv_metadata.is_none() {
+            return TableIterator::new(vec![]);
+        }
+
+        let kv_metadata = kv_metadata.expect("kv_metadata should be Some");
+
+        let mut rows = vec![];
+
+        for kv in kv_metadata {
+            let key = kv.key.as_bytes().to_owned();
+            let value = kv.value.as_ref().map(|v| v.as_bytes().to_owned());
+
+            let row = (uri_as_string(&uri), key, value);
+
+            rows.push(row);
+        }
+
+        TableIterator::new(rows)
+    }
+}
+
+fn stats_min_value_to_str(statistics: &Statistics) -> Option<String> {
+    match statistics {
+        Statistics::Boolean(val_stats) => val_stats.min_opt().map(|v| v.to_string()),
+        Statistics::Int32(val_stats) => val_stats.min_opt().map(|v| v.to_string()),
+        Statistics::Int64(val_stats) => val_stats.min_opt().map(|v| v.to_string()),
+        Statistics::Int96(val_stats) => val_stats.min_opt().map(|v| v.to_string()),
+        Statistics::Float(val_stats) => val_stats.min_opt().map(|v| v.to_string()),
+        Statistics::Double(val_stats) => val_stats.min_opt().map(|v| v.to_string()),
+        Statistics::ByteArray(val_stats) => val_stats.min_opt().map(|v| match v.as_utf8() {
+            Ok(v) => v.to_string(),
+            Err(_) => v.to_string(),
+        }),
+        Statistics::FixedLenByteArray(val_stats) => {
+            val_stats.min_opt().map(|v| match v.as_utf8() {
+                Ok(v) => v.to_string(),
+                Err(_) => v.to_string(),
+            })
+        }
+    }
+}
+
+fn stats_max_value_to_str(statistics: &Statistics) -> Option<String> {
+    match statistics {
+        Statistics::Boolean(statistics) => statistics.max_opt().map(|v| v.to_string()),
+        Statistics::Int32(statistics) => statistics.max_opt().map(|v| v.to_string()),
+        Statistics::Int64(statistics) => statistics.max_opt().map(|v| v.to_string()),
+        Statistics::Int96(statistics) => statistics.max_opt().map(|v| v.to_string()),
+        Statistics::Float(statistics) => statistics.max_opt().map(|v| v.to_string()),
+        Statistics::Double(statistics) => statistics.max_opt().map(|v| v.to_string()),
+        Statistics::ByteArray(statistics) => statistics.max_opt().map(|v| match v.as_utf8() {
+            Ok(v) => v.to_string(),
+            Err(_) => v.to_string(),
+        }),
+        Statistics::FixedLenByteArray(statistics) => {
+            statistics.max_opt().map(|v| match v.as_utf8() {
+                Ok(v) => v.to_string(),
+                Err(_) => v.to_string(),
+            })
+        }
+    }
+}
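
Example invocations for the three UDFs above (a minimal sketch; '/tmp/test.parquet'
is a placeholder path, not part of this patch):

    -- per-column, per-row-group metadata, including statistics
    SELECT row_group_id, column_id, path_in_schema, stats_min, stats_max
    FROM parquet.metadata('/tmp/test.parquet');

    -- file-level metadata from the footer
    SELECT created_by, num_rows, num_row_groups, format_version
    FROM parquet.file_metadata('/tmp/test.parquet');

    -- key-value metadata pairs stored in the footer
    SELECT key, value FROM parquet.kv_metadata('/tmp/test.parquet');
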
diff --git a/src/parquet_udfs/schema.rs b/src/parquet_udfs/schema.rs
new file mode 100644
index 0000000..518af80
--- /dev/null
+++ b/src/parquet_udfs/schema.rs
@@ -0,0 +1,169 @@
+use crate::arrow_parquet::uri_utils::{parquet_schema_from_uri, parse_uri, uri_as_string};
+
+use ::parquet::{
+    format::{ConvertedType, FieldRepetitionType, LogicalType, Type},
+    schema::types::to_thrift,
+};
+use pgrx::{iter::TableIterator, name, pg_extern, pg_schema};
+
+#[pg_schema]
+mod parquet {
+    use super::*;
+
+    #[pg_extern]
+    #[allow(clippy::type_complexity)]
+    fn schema(
+        uri: String,
+    ) -> TableIterator<
+        'static,
+        (
+            name!(uri, String),
+            name!(name, String),
+            name!(type_name, Option<String>),
+            name!(type_length, Option<String>),
+            name!(repetition_type, Option<String>),
+            name!(num_children, Option<i32>),
+            name!(converted_type, Option<String>),
+            name!(scale, Option<i32>),
+            name!(precision, Option<i32>),
+            name!(field_id, Option<i32>),
+            name!(logical_type, Option<String>),
+        ),
+    > {
+        let uri = parse_uri(&uri);
+
+        // new_current_thread runtime uses the current thread to run the tokio reactor.
+        // This uses the same thread that is running the Postgres backend.
+        let runtime = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap_or_else(|e| {
+                panic!("Failed to create tokio runtime: {}", e);
+            });
+
+        let parquet_schema = runtime.block_on(parquet_schema_from_uri(&uri));
+
+        let root_type = parquet_schema.root_schema();
+        let thrift_schema_elements = to_thrift(root_type).unwrap_or_else(|e| {
+            panic!("Failed to convert schema to thrift: {}", e);
+        });
+
+        let mut rows = vec![];
+
+        for schema_elem in thrift_schema_elements {
+            let name = schema_elem.name;
+
+            let type_name = schema_elem.type_.map(thrift_type_to_str);
+
+            let type_length = schema_elem.type_length.map(|t| t.to_string());
+
+            let repetition_type = schema_elem
+                .repetition_type
+                .map(thrift_repetition_type_to_str);
+
+            let num_children = schema_elem.num_children;
+
+            let converted_type = schema_elem.converted_type.map(thrift_converted_type_to_str);
+
+            let scale = schema_elem.scale;
+
+            let precision = schema_elem.precision;
+
+            let field_id = schema_elem.field_id;
+
+            let logical_type = schema_elem.logical_type.map(thrift_logical_type_to_str);
+
+            let row = (
+                uri_as_string(&uri),
+                name,
+                type_name,
+                type_length,
+                repetition_type,
+                num_children,
+                converted_type,
+                scale,
+                precision,
+                field_id,
+                logical_type,
+            );
+
+            rows.push(row);
+        }
+
+        TableIterator::new(rows)
+    }
+}
+
+fn thrift_type_to_str(thrift_type: Type) -> String {
+    match thrift_type {
+        Type::BOOLEAN => "BOOLEAN",
+        Type::INT32 => "INT32",
+        Type::INT64 => "INT64",
+        Type::INT96 => "INT96",
+        Type::FLOAT => "FLOAT",
+        Type::DOUBLE => "DOUBLE",
+        Type::BYTE_ARRAY => "BYTE_ARRAY",
+        Type::FIXED_LEN_BYTE_ARRAY => "FIXED_LEN_BYTE_ARRAY",
+        _ => "UNKNOWN",
+    }
+    .into()
+}
+
+fn thrift_repetition_type_to_str(repetition_type: FieldRepetitionType) -> String {
+    match repetition_type {
+        FieldRepetitionType::REQUIRED => "REQUIRED",
+        FieldRepetitionType::OPTIONAL => "OPTIONAL",
+        FieldRepetitionType::REPEATED => "REPEATED",
+        _ => "UNKNOWN",
+    }
+    .into()
+}
+
+fn thrift_logical_type_to_str(logical_type: LogicalType) -> String {
+    match logical_type {
+        LogicalType::STRING(_) => "STRING",
+        LogicalType::MAP(_) => "MAP",
+        LogicalType::LIST(_) => "LIST",
+        LogicalType::ENUM(_) => "ENUM",
+        LogicalType::DECIMAL(_) => "DECIMAL",
+        LogicalType::DATE(_) => "DATE",
+        LogicalType::TIME(_) => "TIME",
+        LogicalType::TIMESTAMP(_) => "TIMESTAMP",
+        LogicalType::INTEGER(_) => "INTEGER",
+        LogicalType::UNKNOWN(_) => "UNKNOWN",
+        LogicalType::JSON(_) => "JSON",
+        LogicalType::BSON(_) => "BSON",
+        LogicalType::UUID(_) => "UUID",
+        LogicalType::FLOAT16(_) => "FLOAT16",
+    }
+    .into()
+}
+
+fn thrift_converted_type_to_str(converted_type: ConvertedType) -> String {
+    match converted_type {
+        ConvertedType::UTF8 => "UTF8",
+        ConvertedType::MAP => "MAP",
+        ConvertedType::MAP_KEY_VALUE => "MAP_KEY_VALUE",
+        ConvertedType::LIST => "LIST",
+        ConvertedType::ENUM => "ENUM",
+        ConvertedType::DECIMAL => "DECIMAL",
+        ConvertedType::DATE => "DATE",
+        ConvertedType::TIME_MILLIS => "TIME_MILLIS",
+        ConvertedType::TIME_MICROS => "TIME_MICROS",
+        ConvertedType::TIMESTAMP_MILLIS => "TIMESTAMP_MILLIS",
+        ConvertedType::TIMESTAMP_MICROS => "TIMESTAMP_MICROS",
+        ConvertedType::UINT_8 => "UINT_8",
+        ConvertedType::UINT_16 => "UINT_16",
+        ConvertedType::UINT_32 => "UINT_32",
+        ConvertedType::UINT_64 => "UINT_64",
+        ConvertedType::INT_8 => "INT_8",
+        ConvertedType::INT_16 => "INT_16",
+        ConvertedType::INT_32 => "INT_32",
+        ConvertedType::INT_64 => "INT_64",
+        ConvertedType::JSON => "JSON",
+        ConvertedType::BSON => "BSON",
+        ConvertedType::INTERVAL => "INTERVAL",
+        _ => "UNKNOWN",
+    }
+    .into()
+}
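
Example invocation for the schema UDF above (a minimal sketch; '/tmp/test.parquet'
is a placeholder path, not part of this patch):

    -- one row per thrift SchemaElement, starting with the root group
    SELECT name, type_name, repetition_type, logical_type
    FROM parquet.schema('/tmp/test.parquet');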