Skip to content

Commit

Permalink
Registers copy hooks
Browse files Browse the repository at this point in the history
- registers COPY TO/FROM parquet hook,
- adds a GUC, named "pg_parquet.enable_copy_hooks", which enables/disabled the hook,
- exposes parquet dest receiver api so that the dest receiver could be used independently from the hook.
  • Loading branch information
aykut-bozkurt committed Oct 3, 2024
1 parent 79bbd29 commit 9a62645
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 2 deletions.
25 changes: 23 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
use pgrx::prelude::*;
use parquet_copy_hook::hook::{init_parquet_copy_hook, ENABLE_PARQUET_COPY_HOOK};
use pg_sys::MarkGUCPrefixReserved;
use pgrx::{prelude::*, GucContext, GucFlags, GucRegistry};

mod arrow_parquet;
mod parquet_copy_hook;
mod pgrx_utils;
mod type_compat;

// re-export external api
#[allow(unused_imports)]
pub use crate::arrow_parquet::codec::ParquetCodecOption;
#[allow(unused_imports)]
pub use crate::parquet_copy_hook::copy_to_dest_receiver::create_copy_to_parquet_dest_receiver;

pgrx::pg_module_magic!();

#[allow(static_mut_refs)]
#[pg_guard]
pub extern "C" fn _PG_init() {}
pub extern "C" fn _PG_init() {
GucRegistry::define_bool_guc(
"pg_parquet.enable_copy_hooks",
"Enable parquet copy hooks",
"Enable parquet copy hooks",
&ENABLE_PARQUET_COPY_HOOK,
GucContext::Userset,
GucFlags::default(),
);

unsafe { MarkGUCPrefixReserved("pg_parquet".as_ptr() as _) };

init_parquet_copy_hook();
}

#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
Expand Down
1 change: 1 addition & 0 deletions src/parquet_copy_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub(crate) mod copy_from;
pub(crate) mod copy_to;
pub(crate) mod copy_to_dest_receiver;
pub(crate) mod copy_utils;
pub(crate) mod hook;
155 changes: 155 additions & 0 deletions src/parquet_copy_hook/hook.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
use std::ffi::CStr;

use pg_sys::{
standard_ProcessUtility, AsPgCStr, CommandTag, DestReceiver, ParamListInfoData, PlannedStmt,
ProcessUtility_hook, ProcessUtility_hook_type, QueryCompletion, QueryEnvironment,
};
use pgrx::{prelude::*, GucSetting};

use crate::{
arrow_parquet::uri_utils::uri_as_string,
parquet_copy_hook::{
copy_to_dest_receiver::create_copy_to_parquet_dest_receiver,
copy_utils::{
copy_stmt_uri, copy_to_stmt_row_group_size, is_copy_from_parquet_stmt,
is_copy_to_parquet_stmt,
},
},
};

use super::{
copy_from::{execute_copy_from, pop_parquet_reader_context},
copy_to::execute_copy_to_with_dest_receiver,
copy_to_dest_receiver::pop_parquet_writer_context,
copy_utils::{copy_to_stmt_compression, validate_copy_from_options, validate_copy_to_options},
};

pub(crate) static ENABLE_PARQUET_COPY_HOOK: GucSetting<bool> = GucSetting::<bool>::new(true);

static mut PREV_PROCESS_UTILITY_HOOK: ProcessUtility_hook_type = None;

#[pg_guard]
#[no_mangle]
pub(crate) extern "C" fn init_parquet_copy_hook() {
unsafe {
if ProcessUtility_hook.is_some() {
PREV_PROCESS_UTILITY_HOOK = ProcessUtility_hook
}

ProcessUtility_hook = Some(parquet_copy_hook);
}
}

#[pg_guard]
#[allow(clippy::too_many_arguments)]
extern "C" fn parquet_copy_hook(
p_stmt: *mut PlannedStmt,
query_string: *const i8,
read_only_tree: bool,
context: u32,
params: *mut ParamListInfoData,
query_env: *mut QueryEnvironment,
dest: *mut DestReceiver,
completion_tag: *mut QueryCompletion,
) {
let p_stmt = unsafe { PgBox::from_pg(p_stmt) };
let query_string = unsafe { CStr::from_ptr(query_string) };
let params = unsafe { PgBox::from_pg(params) };
let query_env = unsafe { PgBox::from_pg(query_env) };

if ENABLE_PARQUET_COPY_HOOK.get() && is_copy_to_parquet_stmt(&p_stmt) {
validate_copy_to_options(&p_stmt);

let uri = copy_stmt_uri(&p_stmt).expect("uri is None");
let row_group_size = copy_to_stmt_row_group_size(&p_stmt);
let compression = copy_to_stmt_compression(&p_stmt, uri.clone());

PgTryBuilder::new(|| {
let parquet_dest = create_copy_to_parquet_dest_receiver(
uri_as_string(&uri).as_pg_cstr(),
row_group_size,
compression,
);

let parquet_dest = unsafe { PgBox::from_pg(parquet_dest) };

let nprocessed = execute_copy_to_with_dest_receiver(
&p_stmt,
query_string,
params,
query_env,
parquet_dest,
);

let mut completion_tag = unsafe { PgBox::from_pg(completion_tag) };

if !completion_tag.is_null() {
completion_tag.nprocessed = nprocessed;
completion_tag.commandTag = CommandTag::CMDTAG_COPY;
}
})
.catch_others(|cause| {
// make sure to pop the parquet writer context
// In case we did not push the context, we should not throw an error while popping
let throw_error = false;
pop_parquet_writer_context(throw_error);

cause.rethrow()
})
.execute();

return;
} else if ENABLE_PARQUET_COPY_HOOK.get() && is_copy_from_parquet_stmt(&p_stmt) {
validate_copy_from_options(&p_stmt);

let uri = copy_stmt_uri(&p_stmt).expect("uri is None");

PgTryBuilder::new(|| {
let nprocessed = execute_copy_from(p_stmt, query_string, query_env, uri);

let mut completion_tag = unsafe { PgBox::from_pg(completion_tag) };

if !completion_tag.is_null() {
completion_tag.nprocessed = nprocessed;
completion_tag.commandTag = CommandTag::CMDTAG_COPY;
}
})
.catch_others(|cause| {
// make sure to pop the parquet reader context
// In case we did not push the context, we should not throw an error while popping
let throw_error = false;
pop_parquet_reader_context(throw_error);

cause.rethrow()
})
.execute();

return;
}

unsafe {
if let Some(prev_hook) = PREV_PROCESS_UTILITY_HOOK {
prev_hook(
p_stmt.into_pg(),
query_string.as_ptr(),
read_only_tree,
context,
params.into_pg(),
query_env.into_pg(),
dest,
completion_tag,
)
} else {
standard_ProcessUtility(
p_stmt.into_pg(),
query_string.as_ptr(),
read_only_tree,
context,
params.into_pg(),
query_env.into_pg(),
dest,
completion_tag,
)
}
}
}

0 comments on commit 9a62645

Please sign in to comment.