diff --git a/src/parquet_copy_hook/copy_utils.rs b/src/parquet_copy_hook/copy_utils.rs index 65118b3..48fa24f 100644 --- a/src/parquet_copy_hook/copy_utils.rs +++ b/src/parquet_copy_hook/copy_utils.rs @@ -1,7 +1,7 @@ use std::{ffi::CStr, str::FromStr}; use pgrx::{ - is_a, + ereport, is_a, pg_sys::{ addRangeTableEntryForRelation, defGetInt32, defGetInt64, defGetString, get_namespace_name, get_rel_namespace, makeDefElem, makeString, make_parsestate, quote_qualified_identifier, @@ -9,7 +9,7 @@ use pgrx::{ NodeTag::T_CopyStmt, Oid, ParseNamespaceItem, ParseState, PlannedStmt, QueryEnvironment, RangeVar, RangeVarGetRelidExtended, RowExclusiveLock, TupleDescInitEntry, }, - PgBox, PgList, PgRelation, PgTupleDesc, + PgBox, PgList, PgLogLevel, PgRelation, PgSqlErrorCode, PgTupleDesc, }; use url::Url; @@ -300,7 +300,7 @@ pub(crate) fn copy_stmt_get_option( PgBox::null() } -pub(crate) fn is_copy_to_parquet_stmt(p_stmt: &PgBox) -> bool { +fn is_copy_parquet_stmt(p_stmt: &PgBox, copy_from: bool) -> bool { // the GUC pg_parquet.enable_copy_hook must be set to true if !ENABLE_PARQUET_COPY_HOOK.get() { return false; @@ -314,7 +314,7 @@ pub(crate) fn is_copy_to_parquet_stmt(p_stmt: &PgBox) -> bool { let copy_stmt = unsafe { PgBox::::from_pg(p_stmt.utilityStmt as _) }; - if copy_stmt.is_from { + if copy_from != copy_stmt.is_from { return false; } @@ -334,44 +334,35 @@ pub(crate) fn is_copy_to_parquet_stmt(p_stmt: &PgBox) -> bool { // extension checks are done via catalog (not yet searched via cache by postgres till pg18) // this is why we check them after the uri checks - extension_exists("pg_parquet") && !extension_exists("crunchy_query_engine") -} - -pub(crate) fn is_copy_from_parquet_stmt(p_stmt: &PgBox) -> bool { - // the GUC pg_parquet.enable_copy_hook must be set to true - if !ENABLE_PARQUET_COPY_HOOK.get() { - return false; - } - - let is_copy_stmt = unsafe { is_a(p_stmt.utilityStmt, T_CopyStmt) }; - if !is_copy_stmt { + // crunchy_query_engine should not be created + if extension_exists("crunchy_query_engine") { return false; } - let copy_stmt = unsafe { PgBox::::from_pg(p_stmt.utilityStmt as _) }; + // pg_parquet should be created + if !extension_exists("pg_parquet") { + ereport!( + PgLogLevel::WARNING, + PgSqlErrorCode::ERRCODE_WARNING, + "pg_parquet can handle this COPY command but is not enabled", + "Run CREATE EXTENSION pg_parquet; to enable the pg_parquet extension.", + ); - if !copy_stmt.is_from { return false; } - if copy_stmt.is_program { - return false; - } - - if copy_stmt.filename.is_null() { - return false; - } - - let uri = copy_stmt_uri(p_stmt).expect("uri is None"); + true +} - if !is_parquet_format_option(p_stmt) && !is_parquet_uri(uri) { - return false; - } +pub(crate) fn is_copy_to_parquet_stmt(p_stmt: &PgBox) -> bool { + let copy_from = false; + is_copy_parquet_stmt(p_stmt, copy_from) +} - // extension checks are done via catalog (not yet searched via cache by postgres till pg18) - // this is why we check them after the uri checks - extension_exists("pg_parquet") && !extension_exists("crunchy_query_engine") +pub(crate) fn is_copy_from_parquet_stmt(p_stmt: &PgBox) -> bool { + let copy_from = true; + is_copy_parquet_stmt(p_stmt, copy_from) } fn is_parquet_format_option(p_stmt: &PgBox) -> bool {