Skip to content

Commit

Permalink
Adds Parquet Metadata Inspection Udfs
Browse files Browse the repository at this point in the history
- adds "parquet.schema" udf to inspect the schema of a parquet file,
- adds "parquet.metadata", "parquet.file_metadata", and "parquet.kv_metadata" udfs to inspect the metadata of a parquet file.
  • Loading branch information
aykut-bozkurt committed Oct 3, 2024
1 parent 9a62645 commit fa6751a
Show file tree
Hide file tree
Showing 4 changed files with 440 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use pgrx::{prelude::*, GucContext, GucFlags, GucRegistry};

mod arrow_parquet;
mod parquet_copy_hook;
mod parquet_udfs;
mod pgrx_utils;
mod type_compat;

Expand Down
2 changes: 2 additions & 0 deletions src/parquet_udfs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub(crate) mod metadata;
pub(crate) mod schema;
268 changes: 268 additions & 0 deletions src/parquet_udfs/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
use ::parquet::file::statistics::Statistics;
use pgrx::{iter::TableIterator, name, pg_extern, pg_schema};

use crate::arrow_parquet::uri_utils::{parquet_metadata_from_uri, parse_uri, uri_as_string};

#[pg_schema]
mod parquet {
use super::*;

#[pg_extern]
#[allow(clippy::type_complexity)]
fn metadata(
uri: String,
) -> TableIterator<
'static,
(
name!(uri, String),
name!(row_group_id, i64),
name!(row_group_num_rows, i64),
name!(row_group_num_columns, i64),
name!(row_group_bytes, i64),
name!(column_id, i64),
name!(file_offset, i64),
name!(num_values, i64),
name!(path_in_schema, String),
name!(type_name, String),
name!(stats_null_count, Option<i64>),
name!(stats_distinct_count, Option<i64>),
name!(stats_min, Option<String>),
name!(stats_max, Option<String>),
name!(compression, String),
name!(encodings, String),
name!(index_page_offset, Option<i64>),
name!(dictionary_page_offset, Option<i64>),
name!(data_page_offset, i64),
name!(total_compressed_size, i64),
name!(total_uncompressed_size, i64),
),
> {
let uri = parse_uri(&uri);

// new_current_thread runtime uses the current thread to run the tokio reactor.
// This uses the same thread that is running the Postgres backend.
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap_or_else(|e| {
panic!("Failed to create tokio runtime: {}", e);
});

let parquet_metadata = runtime.block_on(parquet_metadata_from_uri(&uri));

let mut rows = vec![];

for (row_group_id, row_group) in parquet_metadata.row_groups().iter().enumerate() {
let row_group_num_rows = row_group.num_rows();
let row_group_num_columns = row_group.num_columns() as i64;
let row_group_bytes = row_group.total_byte_size();

for (column_id, column) in row_group.columns().iter().enumerate() {
let file_offset = column.file_offset();

let num_values = column.num_values();

let path_in_schema = column.column_path().string();

let type_name = column.column_type().to_string();

let mut stats_min = None;
let mut stats_max = None;
let mut stats_null_count = None;
let mut stats_distinct_count = None;

if let Some(statistics) = column.statistics() {
stats_min = stats_min_value_to_str(statistics);

stats_max = stats_max_value_to_str(statistics);

stats_null_count = statistics.null_count_opt().map(|v| v as i64);

stats_distinct_count = statistics.distinct_count_opt().map(|v| v as i64);
}

let compression = column.compression().to_string();

let encodings = column
.encodings()
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(",");

let index_page_offset = column.index_page_offset();

let dictionary_page_offset = column.dictionary_page_offset();

let data_page_offset = column.data_page_offset();

let total_compressed_size = column.compressed_size();

let total_uncompressed_size = column.uncompressed_size();

let row = (
uri_as_string(&uri),
row_group_id as i64,
row_group_num_rows,
row_group_num_columns,
row_group_bytes,
column_id as i64,
file_offset,
num_values,
path_in_schema,
type_name,
stats_null_count,
stats_distinct_count,
stats_min,
stats_max,
compression,
encodings,
index_page_offset,
dictionary_page_offset,
data_page_offset,
total_compressed_size,
total_uncompressed_size,
);

rows.push(row);
}
}

TableIterator::new(rows)
}

#[pg_extern]
fn file_metadata(
uri: String,
) -> TableIterator<
'static,
(
name!(uri, String),
name!(created_by, Option<String>),
name!(num_rows, i64),
name!(num_row_groups, i64),
name!(format_version, String),
),
> {
let uri = parse_uri(&uri);

// new_current_thread runtime uses the current thread to run the tokio reactor.
// This uses the same thread that is running the Postgres backend.
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap_or_else(|e| {
panic!("Failed to create tokio runtime: {}", e);
});

let parquet_metadata = runtime.block_on(parquet_metadata_from_uri(&uri));

let created_by = parquet_metadata
.file_metadata()
.created_by()
.map(|c| c.to_string());

let num_rows = parquet_metadata.file_metadata().num_rows();

let num_row_groups = parquet_metadata.num_row_groups() as i64;

let format_version = parquet_metadata.file_metadata().version().to_string();

let row = (
uri_as_string(&uri),
created_by,
num_rows,
num_row_groups,
format_version,
);

TableIterator::new(vec![row])
}

#[pg_extern]
fn kv_metadata(
uri: String,
) -> TableIterator<
'static,
(
name!(uri, String),
name!(key, Vec<u8>),
name!(value, Option<Vec<u8>>),
),
> {
let uri = parse_uri(&uri);

// new_current_thread runtime uses the current thread to run the tokio reactor.
// This uses the same thread that is running the Postgres backend.
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap_or_else(|e| {
panic!("Failed to create tokio runtime: {}", e);
});

let parquet_metadata = runtime.block_on(parquet_metadata_from_uri(&uri));
let kv_metadata = parquet_metadata.file_metadata().key_value_metadata();

if kv_metadata.is_none() {
return TableIterator::new(vec![]);
}

let kv_metadata = kv_metadata.expect("kv_metadata should be Some");

let mut rows = vec![];

for kv in kv_metadata {
let key = kv.key.as_bytes().to_owned();
let value = kv.value.as_ref().map(|v| v.as_bytes().to_owned());

let row = (uri_as_string(&uri), key, value);

rows.push(row);
}

TableIterator::new(rows)
}
}

/// Renders the minimum value recorded in `statistics` as text.
///
/// Returns `None` when the statistics carry no minimum. Byte-array variants are
/// rendered as UTF-8 text when the bytes decode as UTF-8, and through the value's
/// own `to_string()` otherwise.
fn stats_min_value_to_str(statistics: &Statistics) -> Option<String> {
    match statistics {
        Statistics::Boolean(typed) => typed.min_opt().map(ToString::to_string),
        Statistics::Int32(typed) => typed.min_opt().map(ToString::to_string),
        Statistics::Int64(typed) => typed.min_opt().map(ToString::to_string),
        Statistics::Int96(typed) => typed.min_opt().map(ToString::to_string),
        Statistics::Float(typed) => typed.min_opt().map(ToString::to_string),
        Statistics::Double(typed) => typed.min_opt().map(ToString::to_string),
        Statistics::ByteArray(typed) => typed.min_opt().map(|bytes| {
            bytes
                .as_utf8()
                .map(|text| text.to_string())
                // Not valid UTF-8: fall back to the byte array's own rendering.
                .unwrap_or_else(|_| bytes.to_string())
        }),
        Statistics::FixedLenByteArray(typed) => typed.min_opt().map(|bytes| {
            bytes
                .as_utf8()
                .map(|text| text.to_string())
                // Not valid UTF-8: fall back to the byte array's own rendering.
                .unwrap_or_else(|_| bytes.to_string())
        }),
    }
}

/// Renders the maximum value recorded in `statistics` as text.
///
/// Returns `None` when the statistics carry no maximum. Byte-array variants are
/// rendered as UTF-8 text when the bytes decode as UTF-8, and through the value's
/// own `to_string()` otherwise.
fn stats_max_value_to_str(statistics: &Statistics) -> Option<String> {
    match statistics {
        Statistics::Boolean(typed) => typed.max_opt().map(|max| max.to_string()),
        Statistics::Int32(typed) => typed.max_opt().map(|max| max.to_string()),
        Statistics::Int64(typed) => typed.max_opt().map(|max| max.to_string()),
        Statistics::Int96(typed) => typed.max_opt().map(|max| max.to_string()),
        Statistics::Float(typed) => typed.max_opt().map(|max| max.to_string()),
        Statistics::Double(typed) => typed.max_opt().map(|max| max.to_string()),
        Statistics::ByteArray(typed) => typed.max_opt().map(|raw| {
            // Prefer the decoded text; invalid UTF-8 falls back to the raw rendering.
            raw.as_utf8()
                .map_or_else(|_| raw.to_string(), |text| text.to_string())
        }),
        Statistics::FixedLenByteArray(typed) => typed.max_opt().map(|raw| {
            // Prefer the decoded text; invalid UTF-8 falls back to the raw rendering.
            raw.as_utf8()
                .map_or_else(|_| raw.to_string(), |text| text.to_string())
        }),
    }
}
Loading

0 comments on commit fa6751a

Please sign in to comment.