Skip to content

Commit

Permalink
add GUC to enable/disable parquet copy hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
aykut-bozkurt committed Sep 11, 2024
1 parent a614c8f commit 83fb936
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 16 deletions.
3 changes: 2 additions & 1 deletion src/arrow_parquet/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl<'a> ParquetWriterContext<'a> {
}

pub(crate) fn close(self) {
self.runtime.block_on(self.parquet_writer.close()).unwrap();
// should not panic as we can call from try catch block
self.runtime.block_on(self.parquet_writer.close()).ok();
}
}
23 changes: 21 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use parquet_copy_hook::hook::init_parquet_copy_hook;
use pgrx::prelude::*;
use parquet_copy_hook::hook::{init_parquet_copy_hook, ENABLE_PARQUET_COPY_HOOK};
use pg_sys::MarkGUCPrefixReserved;
use pgrx::{prelude::*, GucContext, GucFlags, GucRegistry};

mod arrow_parquet;
mod parquet_copy_hook;
Expand All @@ -19,6 +20,17 @@ pgrx::pg_module_magic!();
#[allow(static_mut_refs)]
#[pg_guard]
pub extern "C" fn _PG_init() {
GucRegistry::define_bool_guc(
"pg_parquet.enable_copy_hooks",
"Enable parquet copy hooks",
"Enable parquet copy hooks",
&ENABLE_PARQUET_COPY_HOOK,
GucContext::Userset,
GucFlags::default(),
);

unsafe { MarkGUCPrefixReserved("pg_parquet".as_ptr() as _) };

init_parquet_copy_hook();
}

Expand Down Expand Up @@ -2004,6 +2016,13 @@ mod tests {

Spi::run("DROP TABLE workers; DROP TYPE worker, person;").unwrap();
}

#[pg_test]
#[should_panic(expected = "relative path not allowed for COPY to file")]
fn test_disabled_hooks() {
Spi::run("SET pg_parquet.enable_copy_hooks TO false;").unwrap();
Spi::run("COPY (SELECT 1 as id) TO 'file:///tmp/test.parquet'").unwrap();
}
}

/// This module is required by `cargo pgrx test` invocations.
Expand Down
12 changes: 2 additions & 10 deletions src/parquet_copy_hook/copy_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,7 @@ pub(crate) fn is_copy_to_parquet_stmt(pstmt: &PgBox<pg_sys::PlannedStmt>) -> boo
return false;
}

if is_parquet_format(&copy_stmt) {
return true;
}

is_parquet_file(&copy_stmt)
is_parquet_format(&copy_stmt) || is_parquet_file(&copy_stmt)
}

pub(crate) fn is_copy_from_parquet_stmt(pstmt: &PgBox<pg_sys::PlannedStmt>) -> bool {
Expand All @@ -242,11 +238,7 @@ pub(crate) fn is_copy_from_parquet_stmt(pstmt: &PgBox<pg_sys::PlannedStmt>) -> b
return false;
}

if is_parquet_format(&copy_stmt) {
return true;
}

is_parquet_file(&copy_stmt)
is_parquet_format(&copy_stmt) || is_parquet_file(&copy_stmt)
}

pub(crate) fn copy_has_relation(pstmt: &PgBox<pg_sys::PlannedStmt>) -> bool {
Expand Down
8 changes: 5 additions & 3 deletions src/parquet_copy_hook/hook.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::ffi::CStr;

use pg_sys::{standard_ProcessUtility, CommandTag, ProcessUtility_hook, ProcessUtility_hook_type};
use pgrx::prelude::*;
use pgrx::{prelude::*, GucSetting};

use crate::parquet_copy_hook::{
copy_to_dest_receiver::create_copy_to_parquet_dest_receiver,
Expand All @@ -18,6 +18,8 @@ use super::{
copy_utils::{copy_stmt_codec, validate_copy_from_options, validate_copy_to_options},
};

pub(crate) static ENABLE_PARQUET_COPY_HOOK: GucSetting<bool> = GucSetting::<bool>::new(true);

static mut PREV_PROCESS_UTILITY_HOOK: ProcessUtility_hook_type = None;

#[pg_guard]
Expand Down Expand Up @@ -49,7 +51,7 @@ extern "C" fn parquet_copy_hook(
let params = unsafe { PgBox::from_pg(params) };
let query_env = unsafe { PgBox::from_pg(query_env) };

if is_copy_to_parquet_stmt(&pstmt) {
if ENABLE_PARQUET_COPY_HOOK.get() && is_copy_to_parquet_stmt(&pstmt) {
validate_copy_to_options(&pstmt);

let filename = copy_stmt_filename(&pstmt);
Expand Down Expand Up @@ -86,7 +88,7 @@ extern "C" fn parquet_copy_hook(
.execute();

return;
} else if is_copy_from_parquet_stmt(&pstmt) {
} else if ENABLE_PARQUET_COPY_HOOK.get() && is_copy_from_parquet_stmt(&pstmt) {
validate_copy_from_options(&pstmt);

PgTryBuilder::new(|| {
Expand Down

0 comments on commit 83fb936

Please sign in to comment.