Skip to content

Commit

Permalink
add warning when pg_parquet not created
Browse files Browse the repository at this point in the history
  • Loading branch information
aykut-bozkurt committed Dec 16, 2024
1 parent e2f3621 commit 199c832
Showing 1 changed file with 23 additions and 32 deletions.
55 changes: 23 additions & 32 deletions src/parquet_copy_hook/copy_utils.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::{ffi::CStr, str::FromStr};

use pgrx::{
is_a,
ereport, is_a,
pg_sys::{
addRangeTableEntryForRelation, defGetInt32, defGetInt64, defGetString, get_namespace_name,
get_rel_namespace, makeDefElem, makeString, make_parsestate, quote_qualified_identifier,
AccessShareLock, AsPgCStr, CopyStmt, CreateTemplateTupleDesc, DefElem, List, NoLock, Node,
NodeTag::T_CopyStmt, Oid, ParseNamespaceItem, ParseState, PlannedStmt, QueryEnvironment,
RangeVar, RangeVarGetRelidExtended, RowExclusiveLock, TupleDescInitEntry,
},
PgBox, PgList, PgRelation, PgTupleDesc,
PgBox, PgList, PgLogLevel, PgRelation, PgSqlErrorCode, PgTupleDesc,
};
use url::Url;

Expand Down Expand Up @@ -300,7 +300,7 @@ pub(crate) fn copy_stmt_get_option(
PgBox::null()
}

pub(crate) fn is_copy_to_parquet_stmt(p_stmt: &PgBox<PlannedStmt>) -> bool {
fn is_copy_parquet_stmt(p_stmt: &PgBox<PlannedStmt>, copy_from: bool) -> bool {
// the GUC pg_parquet.enable_copy_hook must be set to true
if !ENABLE_PARQUET_COPY_HOOK.get() {
return false;
Expand All @@ -314,7 +314,7 @@ pub(crate) fn is_copy_to_parquet_stmt(p_stmt: &PgBox<PlannedStmt>) -> bool {

let copy_stmt = unsafe { PgBox::<CopyStmt>::from_pg(p_stmt.utilityStmt as _) };

if copy_stmt.is_from {
if copy_from != copy_stmt.is_from {
return false;
}

Expand All @@ -334,44 +334,35 @@ pub(crate) fn is_copy_to_parquet_stmt(p_stmt: &PgBox<PlannedStmt>) -> bool {

// extension checks are done via catalog (not yet searched via cache by postgres till pg18)
// this is why we check them after the uri checks
extension_exists("pg_parquet") && !extension_exists("crunchy_query_engine")
}

pub(crate) fn is_copy_from_parquet_stmt(p_stmt: &PgBox<PlannedStmt>) -> bool {
// the GUC pg_parquet.enable_copy_hook must be set to true
if !ENABLE_PARQUET_COPY_HOOK.get() {
return false;
}

let is_copy_stmt = unsafe { is_a(p_stmt.utilityStmt, T_CopyStmt) };

if !is_copy_stmt {
// crunchy_query_engine should not be created
if extension_exists("crunchy_query_engine") {
return false;
}

let copy_stmt = unsafe { PgBox::<CopyStmt>::from_pg(p_stmt.utilityStmt as _) };
// pg_parquet should be created
if !extension_exists("pg_parquet") {
ereport!(
PgLogLevel::WARNING,
PgSqlErrorCode::ERRCODE_WARNING,
"pg_parquet can handle this COPY command but is not enabled",
"Run CREATE EXTENSION pg_parquet; to enable the pg_parquet extension.",
);

Check warning on line 350 in src/parquet_copy_hook/copy_utils.rs

View check run for this annotation

Codecov / codecov/patch

src/parquet_copy_hook/copy_utils.rs#L345-L350

Added lines #L345 - L350 were not covered by tests

if !copy_stmt.is_from {
return false;
}

if copy_stmt.is_program {
return false;
}

if copy_stmt.filename.is_null() {
return false;
}

let uri = copy_stmt_uri(p_stmt).expect("uri is None");
true
}

if !is_parquet_format_option(p_stmt) && !is_parquet_uri(uri) {
return false;
}
pub(crate) fn is_copy_to_parquet_stmt(p_stmt: &PgBox<PlannedStmt>) -> bool {
let copy_from = false;
is_copy_parquet_stmt(p_stmt, copy_from)
}

// extension checks are done via catalog (not yet searched via cache by postgres till pg18)
// this is why we check them after the uri checks
extension_exists("pg_parquet") && !extension_exists("crunchy_query_engine")
pub(crate) fn is_copy_from_parquet_stmt(p_stmt: &PgBox<PlannedStmt>) -> bool {
let copy_from = true;
is_copy_parquet_stmt(p_stmt, copy_from)
}

fn is_parquet_format_option(p_stmt: &PgBox<PlannedStmt>) -> bool {
Expand Down

0 comments on commit 199c832

Please sign in to comment.