From 164ad1448635a8562b13e68e7dfa3057daec80c3 Mon Sep 17 00:00:00 2001 From: Aykut Bozkurt Date: Thu, 24 Oct 2024 17:22:30 +0300 Subject: [PATCH 1/5] Add pg{13,14,15} to support matrix --- .github/workflows/ci.yml | 2 +- Cargo.toml | 7 +- README.md | 7 +- src/lib.rs | 4 +- src/parquet_copy_hook.rs | 1 + src/parquet_copy_hook/copy_from.rs | 35 +-- src/parquet_copy_hook/copy_to.rs | 20 +- .../copy_to_dest_receiver.rs | 18 +- src/parquet_copy_hook/copy_utils.rs | 12 +- src/parquet_copy_hook/hook.rs | 210 ++++++++++++------ src/parquet_copy_hook/pg_compat.rs | 153 +++++++++++++ src/type_compat/geometry.rs | 17 +- src/type_compat/pg_arrow_type_conversions.rs | 14 +- 13 files changed, 357 insertions(+), 143 deletions(-) create mode 100644 src/parquet_copy_hook/pg_compat.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0e8d3e6..1c16d30 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,7 +20,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - postgres: [ 16, 17 ] + postgres: [ 13, 14, 15, 16, 17 ] env: PG_MAJOR: ${{ matrix.postgres }} diff --git a/Cargo.toml b/Cargo.toml index 2e170fa..b8c3557 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,8 +13,11 @@ path = "./src/bin/pgrx_embed.rs" [features] default = ["pg17"] -pg16 = ["pgrx/pg16", "pgrx-tests/pg16" ] -pg17 = ["pgrx/pg17", "pgrx-tests/pg17" ] +pg17 = ["pgrx/pg17", "pgrx-tests/pg17"] +pg16 = ["pgrx/pg16", "pgrx-tests/pg16"] +pg15 = ["pgrx/pg15", "pgrx-tests/pg15"] +pg14 = ["pgrx/pg14", "pgrx-tests/pg14"] +pg13 = ["pgrx/pg13", "pgrx-tests/pg13"] pg_test = [] [dependencies] diff --git a/README.md b/README.md index ae5b394..b1ac7b8 100644 --- a/README.md +++ b/README.md @@ -235,8 +235,11 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`: > Any type that does not have a corresponding Parquet type will be represented, as a fallback mechanism, as `BYTE_ARRAY` with `STRING` logical type. e.g. `enum` ## Postgres Support Matrix -`pg_parquet` is tested with the following PostgreSQL versions: +`pg_parquet` supports the following PostgreSQL versions: | PostgreSQL Major Version | Supported | |--------------------------|-----------| -| 17 | ✅ | +| 13 | ✅ | +| 14 | ✅ | +| 15 | ✅ | | 16 | ✅ | +| 17 | ✅ | diff --git a/src/lib.rs b/src/lib.rs index ce61540..daaf057 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ use parquet_copy_hook::hook::{init_parquet_copy_hook, ENABLE_PARQUET_COPY_HOOK}; -use pg_sys::{AsPgCStr, MarkGUCPrefixReserved}; +use parquet_copy_hook::pg_compat::MarkGUCPrefixReserved; use pgrx::{prelude::*, GucContext, GucFlags, GucRegistry}; mod arrow_parquet; @@ -29,7 +29,7 @@ pub extern "C" fn _PG_init() { GucFlags::default(), ); - unsafe { MarkGUCPrefixReserved("pg_parquet".as_pg_cstr()) }; + MarkGUCPrefixReserved("pg_parquet"); init_parquet_copy_hook(); } diff --git a/src/parquet_copy_hook.rs b/src/parquet_copy_hook.rs index 5dafd4e..2fb3002 100644 --- a/src/parquet_copy_hook.rs +++ b/src/parquet_copy_hook.rs @@ -3,3 +3,4 @@ pub(crate) mod copy_to; pub(crate) mod copy_to_dest_receiver; pub(crate) mod copy_utils; pub(crate) mod hook; +pub(crate) mod pg_compat; diff --git a/src/parquet_copy_hook/copy_from.rs b/src/parquet_copy_hook/copy_from.rs index f1fdab9..15a077f 100644 --- a/src/parquet_copy_hook/copy_from.rs +++ b/src/parquet_copy_hook/copy_from.rs @@ -5,8 +5,8 @@ use pgrx::{ pg_sys::{ addNSItemToQuery, assign_expr_collations, canonicalize_qual, check_enable_rls, coerce_to_boolean, eval_const_expressions, make_ands_implicit, transformExpr, AsPgCStr, - BeginCopyFrom, CheckEnableRlsResult, CopyFrom, CopyStmt, EndCopyFrom, InvalidOid, Node, - Oid, ParseExprKind, ParseNamespaceItem, ParseState, PlannedStmt, QueryEnvironment, + CheckEnableRlsResult, CopyFrom, CopyStmt, EndCopyFrom, InvalidOid, Node, Oid, + ParseExprKind, ParseNamespaceItem, ParseState, PlannedStmt, QueryEnvironment, }, void_mut_ptr, PgBox, PgLogLevel, PgRelation, PgSqlErrorCode, }; @@ -19,9 +19,12 @@ use crate::{ }, }; -use super::copy_utils::{ - copy_stmt_attribute_list, copy_stmt_create_namespace_item, copy_stmt_create_parse_state, - create_filtered_tupledesc_for_relation, +use super::{ + copy_utils::{ + copy_stmt_attribute_list, copy_stmt_create_namespace_item, copy_stmt_create_parse_state, + create_filtered_tupledesc_for_relation, + }, + pg_compat::BeginCopyFrom, }; // stack to store parquet reader contexts for COPY FROM. @@ -104,32 +107,32 @@ extern "C" fn copy_parquet_data_to_buffer( // 3. Calls the executor's CopyFrom function to read data from the parquet file and write // it to the copy buffer. pub(crate) fn execute_copy_from( - p_stmt: PgBox, + p_stmt: &PgBox, query_string: &CStr, - query_env: PgBox, + query_env: &PgBox, uri: Url, ) -> u64 { - let rel_oid = copy_stmt_relation_oid(&p_stmt); + let rel_oid = copy_stmt_relation_oid(p_stmt); copy_from_stmt_ensure_row_level_security(rel_oid); - let lock_mode = copy_stmt_lock_mode(&p_stmt); + let lock_mode = copy_stmt_lock_mode(p_stmt); let relation = unsafe { PgRelation::with_lock(rel_oid, lock_mode) }; - let p_state = copy_stmt_create_parse_state(query_string, &query_env); + let p_state = copy_stmt_create_parse_state(query_string, query_env); - let ns_item = copy_stmt_create_namespace_item(&p_stmt, &p_state, &relation); + let ns_item = copy_stmt_create_namespace_item(p_stmt, &p_state, &relation); - let mut where_clause = copy_from_stmt_where_clause(&p_stmt); + let mut where_clause = copy_from_stmt_where_clause(p_stmt); if !where_clause.is_null() { where_clause = copy_from_stmt_transform_where_clause(&p_state, &ns_item, where_clause); } - let attribute_list = copy_stmt_attribute_list(&p_stmt); + let attribute_list = copy_stmt_attribute_list(p_stmt); - let tupledesc = create_filtered_tupledesc_for_relation(&p_stmt, &relation); + let tupledesc = create_filtered_tupledesc_for_relation(p_stmt, &relation); unsafe { // parquet reader context is used throughout the COPY FROM operation. @@ -137,14 +140,12 @@ pub(crate) fn execute_copy_from( push_parquet_reader_context(parquet_reader_context); // makes sure to set binary format - let copy_options = copy_from_stmt_create_option_list(&p_stmt); + let copy_options = copy_from_stmt_create_option_list(p_stmt); let copy_from_state = BeginCopyFrom( p_state.as_ptr(), relation.as_ptr(), where_clause, - std::ptr::null(), - false, Some(copy_parquet_data_to_buffer), attribute_list, copy_options.as_ptr(), diff --git a/src/parquet_copy_hook/copy_to.rs b/src/parquet_copy_hook/copy_to.rs index 1e8df78..d6ed382 100644 --- a/src/parquet_copy_hook/copy_to.rs +++ b/src/parquet_copy_hook/copy_to.rs @@ -3,8 +3,8 @@ use std::ffi::{c_char, CStr}; use pgrx::{ ereport, is_a, pg_sys::{ - makeRangeVar, pg_analyze_and_rewrite_fixedparams, pg_plan_query, A_Star, ColumnRef, - CommandTag, CopyStmt, CreateNewPortal, DestReceiver, GetActiveSnapshot, + makeRangeVar, pg_plan_query, A_Star, ColumnRef, CommandTag, CopyStmt, CreateNewPortal, + DestReceiver, GetActiveSnapshot, Node, NodeTag::{self, T_CopyStmt}, ParamListInfoData, PlannedStmt, PortalDefineQuery, PortalDrop, PortalRun, PortalStart, QueryCompletion, QueryEnvironment, RawStmt, ResTarget, SelectStmt, CURSOR_OPT_PARALLEL_OK, @@ -14,8 +14,9 @@ use pgrx::{ AllocatedByRust, PgBox, PgList, PgLogLevel, PgRelation, PgSqlErrorCode, }; -use crate::parquet_copy_hook::copy_utils::{ - copy_stmt_has_relation, copy_stmt_lock_mode, copy_stmt_relation_oid, +use crate::parquet_copy_hook::{ + copy_utils::{copy_stmt_has_relation, copy_stmt_lock_mode, copy_stmt_relation_oid}, + pg_compat::pg_analyze_and_rewrite, }; // execute_copy_to_with_dest_receiver executes a COPY TO statement with our custom DestReceiver @@ -28,8 +29,8 @@ use crate::parquet_copy_hook::copy_utils::{ pub(crate) fn execute_copy_to_with_dest_receiver( p_stmt: &PgBox, query_string: &CStr, - params: PgBox, - query_env: PgBox, + params: &PgBox, + query_env: &PgBox, parquet_dest: PgBox, ) -> u64 { unsafe { @@ -51,11 +52,9 @@ pub(crate) fn execute_copy_to_with_dest_receiver( let raw_query = prepare_copy_to_raw_stmt(p_stmt, ©_stmt, &relation); - let rewritten_queries = pg_analyze_and_rewrite_fixedparams( + let rewritten_queries = pg_analyze_and_rewrite( raw_query.as_ptr(), query_string.as_ptr(), - std::ptr::null_mut(), - 0, query_env.as_ptr(), ); @@ -156,8 +155,7 @@ fn convert_copy_to_relation_to_select_stmt( target_list.push(target.into_pg()); } else { // SELECT a,b,... FROM relation - let attribute_name_list = - unsafe { PgList::::from_pg(copy_stmt.attlist) }; + let attribute_name_list = unsafe { PgList::::from_pg(copy_stmt.attlist) }; for attribute_name in attribute_name_list.iter_ptr() { let mut col_ref = unsafe { PgBox::::alloc_node(NodeTag::T_ColumnRef) }; diff --git a/src/parquet_copy_hook/copy_to_dest_receiver.rs b/src/parquet_copy_hook/copy_to_dest_receiver.rs index 07fe504..c0f6308 100644 --- a/src/parquet_copy_hook/copy_to_dest_receiver.rs +++ b/src/parquet_copy_hook/copy_to_dest_receiver.rs @@ -1,13 +1,13 @@ -use std::ffi::{c_char, CStr}; +use std::ffi::{c_char, CStr, CString}; use pg_sys::{ get_typlenbyval, slot_getallattrs, toast_raw_datum_size, AllocSetContextCreateExtended, - AsPgCStr, BlessTupleDesc, CommandDest, CurrentMemoryContext, Datum, DatumGetCString, - DestReceiver, HeapTupleData, List, MemoryContext, MemoryContextAllocZero, MemoryContextDelete, + AsPgCStr, BlessTupleDesc, CommandDest, CurrentMemoryContext, Datum, DestReceiver, + HeapTupleData, List, MemoryContext, MemoryContextAllocZero, MemoryContextDelete, MemoryContextReset, TupleDesc, TupleTableSlot, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, ALLOCSET_DEFAULT_MINSIZE, VARHDRSZ, }; -use pgrx::{prelude::*, PgList, PgMemoryContexts, PgTupleDesc}; +use pgrx::{prelude::*, FromDatum, PgList, PgMemoryContexts, PgTupleDesc}; use crate::{ arrow_parquet::{ @@ -373,15 +373,11 @@ fn tuple_column_sizes(tuple_datums: &[Option], tupledesc: &PgTupleDesc) - (unsafe { toast_raw_datum_size(*column_datum) }) as i32 - VARHDRSZ as i32 } else if typlen == -2 { // cstring - let cstring = unsafe { DatumGetCString(*column_datum) }; - let cstring = unsafe { - CStr::from_ptr(cstring) - .to_str() - .expect("cstring is not a valid CString") + CString::from_datum(*column_datum, false) + .expect("cannot get cstring from datum") }; - - cstring.len() as i32 + 1 + cstring.as_bytes().len() as i32 + 1 } else { // fixed size type typlen as i32 diff --git a/src/parquet_copy_hook/copy_utils.rs b/src/parquet_copy_hook/copy_utils.rs index cb21671..068e95e 100644 --- a/src/parquet_copy_hook/copy_utils.rs +++ b/src/parquet_copy_hook/copy_utils.rs @@ -5,7 +5,7 @@ use pgrx::{ pg_sys::{ addRangeTableEntryForRelation, defGetInt32, defGetInt64, defGetString, get_namespace_name, get_rel_namespace, makeDefElem, makeString, make_parsestate, quote_qualified_identifier, - AccessShareLock, AsPgCStr, CopyStmt, CreateTemplateTupleDesc, DefElem, List, NoLock, + AccessShareLock, AsPgCStr, CopyStmt, CreateTemplateTupleDesc, DefElem, List, NoLock, Node, NodeTag::T_CopyStmt, Oid, ParseNamespaceItem, ParseState, PlannedStmt, QueryEnvironment, RangeVar, RangeVarGetRelidExtended, RowExclusiveLock, TupleDescInitEntry, }, @@ -19,6 +19,8 @@ use crate::arrow_parquet::{ uri_utils::parse_uri, }; +use super::pg_compat::strVal; + pub(crate) fn validate_copy_to_options(p_stmt: &PgBox, uri: &Url) { validate_copy_option_names( p_stmt, @@ -444,13 +446,7 @@ fn copy_stmt_attribute_names(p_stmt: &PgBox) -> Vec { unsafe { PgList::from_pg(attribute_name_list) .iter_ptr() - .map(|attribute_name: *mut pgrx::pg_sys::String| { - let attribute_name = PgBox::from_pg(attribute_name); - CStr::from_ptr(attribute_name.sval) - .to_str() - .expect("cannot get attribute name for copy from statement") - .to_string() - }) + .map(|attribute_name: *mut Node| strVal(attribute_name)) .collect::>() } } diff --git a/src/parquet_copy_hook/hook.rs b/src/parquet_copy_hook/hook.rs index 1329817..e5f6da9 100644 --- a/src/parquet_copy_hook/hook.rs +++ b/src/parquet_copy_hook/hook.rs @@ -43,6 +43,73 @@ pub(crate) extern "C" fn init_parquet_copy_hook() { } } +fn process_copy_to_parquet( + p_stmt: &PgBox, + query_string: &CStr, + params: &PgBox, + query_env: &PgBox, +) -> u64 { + let uri = copy_stmt_uri(p_stmt).expect("uri is None"); + + let copy_from = false; + ensure_access_privilege_to_uri(&uri, copy_from); + + validate_copy_to_options(p_stmt, &uri); + + let row_group_size = copy_to_stmt_row_group_size(p_stmt); + let row_group_size_bytes = copy_to_stmt_row_group_size_bytes(p_stmt); + let compression = copy_to_stmt_compression(p_stmt, uri.clone()); + let compression_level = copy_to_stmt_compression_level(p_stmt, uri.clone()); + + PgTryBuilder::new(|| { + let parquet_dest = create_copy_to_parquet_dest_receiver( + uri_as_string(&uri).as_pg_cstr(), + &row_group_size, + &row_group_size_bytes, + &compression, + &compression_level.unwrap_or(INVALID_COMPRESSION_LEVEL), + ); + + let parquet_dest = unsafe { PgBox::from_pg(parquet_dest) }; + + execute_copy_to_with_dest_receiver(p_stmt, query_string, params, query_env, parquet_dest) + }) + .catch_others(|cause| { + // make sure to pop the parquet writer context + // In case we did not push the context, we should not throw an error while popping + let throw_error = false; + pop_parquet_writer_context(throw_error); + + cause.rethrow() + }) + .execute() +} + +fn process_copy_from_parquet( + p_stmt: &PgBox, + query_string: &CStr, + query_env: &PgBox, +) -> u64 { + let uri = copy_stmt_uri(p_stmt).expect("uri is None"); + + let copy_from = true; + ensure_access_privilege_to_uri(&uri, copy_from); + + validate_copy_from_options(p_stmt); + + PgTryBuilder::new(|| execute_copy_from(p_stmt, query_string, query_env, uri)) + .catch_others(|cause| { + // make sure to pop the parquet reader context + // In case we did not push the context, we should not throw an error while popping + let throw_error = false; + pop_parquet_reader_context(throw_error); + + cause.rethrow() + }) + .execute() +} + +#[cfg(any(feature = "pg14", feature = "pg15", feature = "pg16", feature = "pg17"))] #[pg_guard] #[allow(clippy::too_many_arguments)] extern "C" fn parquet_copy_hook( @@ -59,85 +126,86 @@ extern "C" fn parquet_copy_hook( let query_string = unsafe { CStr::from_ptr(query_string) }; let params = unsafe { PgBox::from_pg(params) }; let query_env = unsafe { PgBox::from_pg(query_env) }; + let mut completion_tag = unsafe { PgBox::from_pg(completion_tag) }; if ENABLE_PARQUET_COPY_HOOK.get() && is_copy_to_parquet_stmt(&p_stmt) { - let uri = copy_stmt_uri(&p_stmt).expect("uri is None"); - let copy_from = false; - - ensure_access_privilege_to_uri(&uri, copy_from); - - validate_copy_to_options(&p_stmt, &uri); - - let row_group_size = copy_to_stmt_row_group_size(&p_stmt); - let row_group_size_bytes = copy_to_stmt_row_group_size_bytes(&p_stmt); - let compression = copy_to_stmt_compression(&p_stmt, uri.clone()); - let compression_level = copy_to_stmt_compression_level(&p_stmt, uri.clone()); - - PgTryBuilder::new(|| { - let parquet_dest = create_copy_to_parquet_dest_receiver( - uri_as_string(&uri).as_pg_cstr(), - &row_group_size, - &row_group_size_bytes, - &compression, - &compression_level.unwrap_or(INVALID_COMPRESSION_LEVEL), - ); - - let parquet_dest = unsafe { PgBox::from_pg(parquet_dest) }; - - let nprocessed = execute_copy_to_with_dest_receiver( - &p_stmt, - query_string, - params, - query_env, - parquet_dest, - ); - - let mut completion_tag = unsafe { PgBox::from_pg(completion_tag) }; - - if !completion_tag.is_null() { - completion_tag.nprocessed = nprocessed; - completion_tag.commandTag = CommandTag::CMDTAG_COPY; - } - }) - .catch_others(|cause| { - // make sure to pop the parquet writer context - // In case we did not push the context, we should not throw an error while popping - let throw_error = false; - pop_parquet_writer_context(throw_error); - - cause.rethrow() - }) - .execute(); + let nprocessed = process_copy_to_parquet(&p_stmt, query_string, ¶ms, &query_env); + if !completion_tag.is_null() { + completion_tag.nprocessed = nprocessed; + completion_tag.commandTag = CommandTag::CMDTAG_COPY; + } return; } else if ENABLE_PARQUET_COPY_HOOK.get() && is_copy_from_parquet_stmt(&p_stmt) { - let uri = copy_stmt_uri(&p_stmt).expect("uri is None"); - let copy_from = true; - - ensure_access_privilege_to_uri(&uri, copy_from); + let nprocessed = process_copy_from_parquet(&p_stmt, query_string, &query_env); - validate_copy_from_options(&p_stmt); + if !completion_tag.is_null() { + completion_tag.nprocessed = nprocessed; + completion_tag.commandTag = CommandTag::CMDTAG_COPY; + } + return; + } - PgTryBuilder::new(|| { - let nprocessed = execute_copy_from(p_stmt, query_string, query_env, uri); + unsafe { + if let Some(prev_hook) = PREV_PROCESS_UTILITY_HOOK { + prev_hook( + p_stmt.into_pg(), + query_string.as_ptr(), + read_only_tree, + context, + params.into_pg(), + query_env.into_pg(), + dest, + completion_tag.into_pg(), + ) + } else { + standard_ProcessUtility( + p_stmt.into_pg(), + query_string.as_ptr(), + read_only_tree, + context, + params.into_pg(), + query_env.into_pg(), + dest, + completion_tag.into_pg(), + ) + } + } +} - let mut completion_tag = unsafe { PgBox::from_pg(completion_tag) }; +#[cfg(feature = "pg13")] +#[pg_guard] +#[allow(clippy::too_many_arguments)] +extern "C" fn parquet_copy_hook( + p_stmt: *mut PlannedStmt, + query_string: *const c_char, + context: u32, + params: *mut ParamListInfoData, + query_env: *mut QueryEnvironment, + dest: *mut DestReceiver, + completion_tag: *mut QueryCompletion, +) { + let p_stmt = unsafe { PgBox::from_pg(p_stmt) }; + let query_string = unsafe { CStr::from_ptr(query_string) }; + let params = unsafe { PgBox::from_pg(params) }; + let query_env = unsafe { PgBox::from_pg(query_env) }; + let mut completion_tag = unsafe { PgBox::from_pg(completion_tag) }; - if !completion_tag.is_null() { - completion_tag.nprocessed = nprocessed; - completion_tag.commandTag = CommandTag::CMDTAG_COPY; - } - }) - .catch_others(|cause| { - // make sure to pop the parquet reader context - // In case we did not push the context, we should not throw an error while popping - let throw_error = false; - pop_parquet_reader_context(throw_error); + if ENABLE_PARQUET_COPY_HOOK.get() && is_copy_to_parquet_stmt(&p_stmt) { + let nprocessed = process_copy_to_parquet(&p_stmt, query_string, ¶ms, &query_env); - cause.rethrow() - }) - .execute(); + if !completion_tag.is_null() { + completion_tag.nprocessed = nprocessed; + completion_tag.commandTag = CommandTag::CMDTAG_COPY; + } + return; + } else if ENABLE_PARQUET_COPY_HOOK.get() && is_copy_from_parquet_stmt(&p_stmt) { + let nprocessed = process_copy_from_parquet(&p_stmt, query_string, &query_env); + if !completion_tag.is_null() { + completion_tag.nprocessed = nprocessed; + completion_tag.commandTag = CommandTag::CMDTAG_COPY; + } return; } @@ -146,23 +214,21 @@ extern "C" fn parquet_copy_hook( prev_hook( p_stmt.into_pg(), query_string.as_ptr(), - read_only_tree, context, params.into_pg(), query_env.into_pg(), dest, - completion_tag, + completion_tag.into_pg(), ) } else { standard_ProcessUtility( p_stmt.into_pg(), query_string.as_ptr(), - read_only_tree, context, params.into_pg(), query_env.into_pg(), dest, - completion_tag, + completion_tag.into_pg(), ) } } diff --git a/src/parquet_copy_hook/pg_compat.rs b/src/parquet_copy_hook/pg_compat.rs new file mode 100644 index 0000000..12689c6 --- /dev/null +++ b/src/parquet_copy_hook/pg_compat.rs @@ -0,0 +1,153 @@ +use std::ffi::{c_char, CStr}; + +use pgrx::{ + datum::TimeWithTimeZone, + direct_function_call, + pg_sys::{ + copy_data_source_cb, AsPgCStr, List, Node, ParseState, QueryEnvironment, RawStmt, Relation, + }, + AnyNumeric, IntoDatum, +}; + +pub(crate) fn pg_analyze_and_rewrite( + raw_stmt: *mut RawStmt, + query_string: *const c_char, + query_env: *mut QueryEnvironment, +) -> *mut List { + #[cfg(any(feature = "pg13", feature = "pg14"))] + unsafe { + pgrx::pg_sys::pg_analyze_and_rewrite( + raw_stmt, + query_string, + std::ptr::null_mut(), + 0, + query_env, + ) + } + + #[cfg(any(feature = "pg15", feature = "pg16", feature = "pg17"))] + unsafe { + pgrx::pg_sys::pg_analyze_and_rewrite_fixedparams( + raw_stmt, + query_string, + std::ptr::null_mut(), + 0, + query_env, + ) + } +} + +#[allow(non_snake_case)] +pub(crate) fn strVal(val: *mut Node) -> String { + #[cfg(any(feature = "pg13", feature = "pg14"))] + unsafe { + let val = (*(val as *mut pgrx::pg_sys::Value)).val.str_; + + CStr::from_ptr(val) + .to_str() + .expect("invalid string") + .to_string() + } + + #[cfg(any(feature = "pg15", feature = "pg16", feature = "pg17"))] + unsafe { + let val = (*(val as *mut pgrx::pg_sys::String)).sval; + + CStr::from_ptr(val) + .to_str() + .expect("invalid string") + .to_string() + } +} + +#[cfg(feature = "pg13")] +#[allow(non_snake_case)] +pub(crate) fn BeginCopyFrom( + pstate: *mut ParseState, + relation: Relation, + _where_clause: *mut Node, + data_source_cb: copy_data_source_cb, + attribute_list: *mut List, + copy_options: *mut List, +) -> *mut pgrx::pg_sys::CopyStateData { + unsafe { + pgrx::pg_sys::BeginCopyFrom( + pstate, + relation, + std::ptr::null(), + false, + data_source_cb, + attribute_list, + copy_options, + ) + } +} + +#[cfg(any(feature = "pg14", feature = "pg15", feature = "pg16", feature = "pg17"))] +#[allow(non_snake_case)] +pub(crate) fn BeginCopyFrom( + pstate: *mut ParseState, + relation: Relation, + _where_clause: *mut Node, + data_source_cb: copy_data_source_cb, + attribute_list: *mut List, + copy_options: *mut List, +) -> *mut pgrx::pg_sys::CopyFromStateData { + unsafe { + pgrx::pg_sys::BeginCopyFrom( + pstate, + relation, + _where_clause, + std::ptr::null(), + false, + data_source_cb, + attribute_list, + copy_options, + ) + } +} + +pub(crate) fn extract_timezone_from_timetz(timetz: TimeWithTimeZone) -> f64 { + #[cfg(feature = "pg13")] + { + let timezone_as_secs: f64 = unsafe { + direct_function_call( + pgrx::pg_sys::timetz_part, + &["timezone".into_datum(), timetz.into_datum()], + ) + } + .expect("cannot extract timezone from timetz"); + + timezone_as_secs + } + + #[cfg(any(feature = "pg14", feature = "pg15", feature = "pg16", feature = "pg17"))] + { + let timezone_as_secs: AnyNumeric = unsafe { + direct_function_call( + pgrx::pg_sys::extract_timetz, + &["timezone".into_datum(), timetz.into_datum()], + ) + } + .expect("cannot extract timezone from timetz"); + + let timezone_as_secs: f64 = timezone_as_secs + .try_into() + .unwrap_or_else(|e| panic!("{}", e)); + + timezone_as_secs + } +} + +#[allow(non_snake_case)] +pub(crate) fn MarkGUCPrefixReserved(guc_prefix: &str) { + #[cfg(any(feature = "pg13", feature = "pg14"))] + unsafe { + pgrx::pg_sys::EmitWarningsOnPlaceholders(guc_prefix.as_pg_cstr()) + } + + #[cfg(any(feature = "pg15", feature = "pg16", feature = "pg17"))] + unsafe { + pgrx::pg_sys::MarkGUCPrefixReserved(guc_prefix.as_pg_cstr()) + } +} diff --git a/src/type_compat/geometry.rs b/src/type_compat/geometry.rs index b8b2670..fbb3c25 100644 --- a/src/type_compat/geometry.rs +++ b/src/type_compat/geometry.rs @@ -4,11 +4,11 @@ use once_cell::sync::OnceCell; use pgrx::{ datum::UnboxDatum, pg_sys::{ - get_extension_oid, get_extension_schema, makeString, Anum_pg_type_oid, AsPgCStr, Datum, - GetSysCacheOid, InvalidOid, LookupFuncName, Oid, OidFunctionCall1Coll, - SysCacheIdentifier::TYPENAMENSP, BYTEAOID, + get_extension_oid, makeString, Anum_pg_type_oid, AsPgCStr, Datum, GetSysCacheOid, + InvalidOid, LookupFuncName, Oid, OidFunctionCall1Coll, SysCacheIdentifier::TYPENAMENSP, + BYTEAOID, }, - FromDatum, IntoDatum, PgList, + FromDatum, IntoDatum, PgList, Spi, }; // we need to reset the postgis context at each copy start @@ -56,8 +56,7 @@ impl PostgisContext { Some(postgis_ext_oid) }; - let postgis_ext_schema_oid = - postgis_ext_oid.map(|postgis_ext_oid| unsafe { get_extension_schema(postgis_ext_oid) }); + let postgis_ext_schema_oid = postgis_ext_oid.map(|_| Self::extension_schema_oid()); let st_asbinary_funcoid = postgis_ext_oid.map(|postgis_ext_oid| { Self::st_asbinary_funcoid( @@ -82,6 +81,12 @@ impl PostgisContext { } } + fn extension_schema_oid() -> Oid { + Spi::get_one("SELECT extnamespace FROM pg_extension WHERE extname = 'postgis'") + .expect("failed to get postgis extension schema") + .expect("postgis extension schema not found") + } + fn st_asbinary_funcoid(postgis_ext_oid: Oid, postgis_ext_schema_oid: Oid) -> Oid { unsafe { let postgis_geometry_typoid = diff --git a/src/type_compat/pg_arrow_type_conversions.rs b/src/type_compat/pg_arrow_type_conversions.rs index 9d9375f..02752c1 100644 --- a/src/type_compat/pg_arrow_type_conversions.rs +++ b/src/type_compat/pg_arrow_type_conversions.rs @@ -5,6 +5,8 @@ use pgrx::{ direct_function_call, pg_sys, AnyNumeric, IntoDatum, }; +use crate::parquet_copy_hook::pg_compat::extract_timezone_from_timetz; + pub(crate) const MAX_DECIMAL_PRECISION: usize = 38; pub(crate) fn date_to_i32(date: Date) -> i32 { @@ -128,17 +130,7 @@ pub(crate) fn i64_to_time(i64_time: i64) -> Time { } pub(crate) fn timetz_to_i64(timetz: TimeWithTimeZone) -> i64 { - let timezone_as_secs: AnyNumeric = unsafe { - direct_function_call( - pg_sys::extract_timetz, - &["timezone".into_datum(), timetz.into_datum()], - ) - } - .expect("cannot extract timezone from timetz"); - - let timezone_as_secs: f64 = timezone_as_secs - .try_into() - .unwrap_or_else(|e| panic!("{}", e)); + let timezone_as_secs = extract_timezone_from_timetz(timetz); let timezone_as_interval = Interval::from_seconds(timezone_as_secs); let adjusted_timetz: TimeWithTimeZone = unsafe { From 85f2e57fcb60c44021c9efbf5463029f7cce4d0e Mon Sep 17 00:00:00 2001 From: David Christensen Date: Fri, 25 Oct 2024 14:24:57 -0400 Subject: [PATCH 2/5] fix: use explicit pg_config path to major installed version --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1c16d30..962b875 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -91,7 +91,7 @@ jobs: - name: Install and configure pgrx run: | cargo install --locked cargo-pgrx@0.12.6 - cargo pgrx init --pg${{ env.PG_MAJOR }} $(which pg_config) + cargo pgrx init --pg${{ env.PG_MAJOR }} /usr/lib/postgresql/${{ env.PG_MAJOR }}/bin/pg_config - name: Install cargo-llvm-cov for coverage report run: cargo install --locked cargo-llvm-cov@0.6.12 From 82a82f411670af1696b1617802fc1dc35c7afb3a Mon Sep 17 00:00:00 2001 From: David Christensen Date: Fri, 25 Oct 2024 14:41:11 -0400 Subject: [PATCH 3/5] fix: remove unneeded import for pg13 --- src/parquet_copy_hook/pg_compat.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/parquet_copy_hook/pg_compat.rs b/src/parquet_copy_hook/pg_compat.rs index 12689c6..d9fc8dd 100644 --- a/src/parquet_copy_hook/pg_compat.rs +++ b/src/parquet_copy_hook/pg_compat.rs @@ -6,9 +6,12 @@ use pgrx::{ pg_sys::{ copy_data_source_cb, AsPgCStr, List, Node, ParseState, QueryEnvironment, RawStmt, Relation, }, - AnyNumeric, IntoDatum, + IntoDatum, }; +#[cfg(any(feature = "pg14", feature = "pg15", feature = "pg16", feature = "pg17"))] +use pgrx::AnyNumeric; + pub(crate) fn pg_analyze_and_rewrite( raw_stmt: *mut RawStmt, query_string: *const c_char, From 14c704faf348adc1d27356a0c25fe08d92747714 Mon Sep 17 00:00:00 2001 From: David Christensen Date: Fri, 25 Oct 2024 15:13:45 -0400 Subject: [PATCH 4/5] fix: missed path --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 962b875..814306a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -104,8 +104,8 @@ jobs: - name: Run tests run: | # Set up permissions so that the current user below can create extensions - sudo chmod a+rwx $(pg_config --pkglibdir) \ - $(pg_config --sharedir)/extension \ + sudo chmod a+rwx $(/usr/lib/postgresql/${{ env.PG_MAJOR }}/bin/pg_config --pkglibdir) \ + $(/usr/lib/postgresql/${{ env.PG_MAJOR }}/bin/pg_config --sharedir)/extension \ /var/run/postgresql/ # pgrx tests with runas argument ignores environment variables, so From 8a2d1cd33de8def54e573fb48c1e7a0dcc733991 Mon Sep 17 00:00:00 2001 From: Aykut Bozkurt Date: Fri, 25 Oct 2024 22:53:21 +0300 Subject: [PATCH 5/5] revert pg13 --- .github/workflows/ci.yml | 10 +- Cargo.toml | 1 - README.md | 1 - src/parquet_copy_hook/copy_from.rs | 15 ++- src/parquet_copy_hook/hook.rs | 62 ------------- src/parquet_copy_hook/pg_compat.rs | 97 +------------------- src/type_compat/pg_arrow_type_conversions.rs | 14 ++- 7 files changed, 27 insertions(+), 173 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 814306a..403e54c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,7 +20,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - postgres: [ 13, 14, 15, 16, 17 ] + postgres: [ 14, 15, 16, 17 ] env: PG_MAJOR: ${{ matrix.postgres }} @@ -91,7 +91,7 @@ jobs: - name: Install and configure pgrx run: | cargo install --locked cargo-pgrx@0.12.6 - cargo pgrx init --pg${{ env.PG_MAJOR }} /usr/lib/postgresql/${{ env.PG_MAJOR }}/bin/pg_config + cargo pgrx init --pg${{ env.PG_MAJOR }} $(which pg_config) - name: Install cargo-llvm-cov for coverage report run: cargo install --locked cargo-llvm-cov@0.6.12 @@ -104,9 +104,9 @@ jobs: - name: Run tests run: | # Set up permissions so that the current user below can create extensions - sudo chmod a+rwx $(/usr/lib/postgresql/${{ env.PG_MAJOR }}/bin/pg_config --pkglibdir) \ - $(/usr/lib/postgresql/${{ env.PG_MAJOR }}/bin/pg_config --sharedir)/extension \ - /var/run/postgresql/ + sudo chmod a+rwx $(pg_config --pkglibdir) \ + $(pg_config --sharedir)/extension \ + /var/run/postgresql/ # pgrx tests with runas argument ignores environment variables, so # we read env vars from .env file in tests (https://github.com/pgcentralfoundation/pgrx/pull/1674) diff --git a/Cargo.toml b/Cargo.toml index b8c3557..09b056f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,6 @@ pg17 = ["pgrx/pg17", "pgrx-tests/pg17"] pg16 = ["pgrx/pg16", "pgrx-tests/pg16"] pg15 = ["pgrx/pg15", "pgrx-tests/pg15"] pg14 = ["pgrx/pg14", "pgrx-tests/pg14"] -pg13 = ["pgrx/pg13", "pgrx-tests/pg13"] pg_test = [] [dependencies] diff --git a/README.md b/README.md index b1ac7b8..7c679a1 100644 --- a/README.md +++ b/README.md @@ -238,7 +238,6 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`: `pg_parquet` supports the following PostgreSQL versions: | PostgreSQL Major Version | Supported | |--------------------------|-----------| -| 13 | ✅ | | 14 | ✅ | | 15 | ✅ | | 16 | ✅ | diff --git a/src/parquet_copy_hook/copy_from.rs b/src/parquet_copy_hook/copy_from.rs index 15a077f..bf3a878 100644 --- a/src/parquet_copy_hook/copy_from.rs +++ b/src/parquet_copy_hook/copy_from.rs @@ -5,8 +5,8 @@ use pgrx::{ pg_sys::{ addNSItemToQuery, assign_expr_collations, canonicalize_qual, check_enable_rls, coerce_to_boolean, eval_const_expressions, make_ands_implicit, transformExpr, AsPgCStr, - CheckEnableRlsResult, CopyFrom, CopyStmt, EndCopyFrom, InvalidOid, Node, Oid, - ParseExprKind, ParseNamespaceItem, ParseState, PlannedStmt, QueryEnvironment, + BeginCopyFrom, CheckEnableRlsResult, CopyFrom, CopyStmt, EndCopyFrom, InvalidOid, Node, + Oid, ParseExprKind, ParseNamespaceItem, ParseState, PlannedStmt, QueryEnvironment, }, void_mut_ptr, PgBox, PgLogLevel, PgRelation, PgSqlErrorCode, }; @@ -19,12 +19,9 @@ use crate::{ }, }; -use super::{ - copy_utils::{ - copy_stmt_attribute_list, copy_stmt_create_namespace_item, copy_stmt_create_parse_state, - create_filtered_tupledesc_for_relation, - }, - pg_compat::BeginCopyFrom, +use super::copy_utils::{ + copy_stmt_attribute_list, copy_stmt_create_namespace_item, copy_stmt_create_parse_state, + create_filtered_tupledesc_for_relation, }; // stack to store parquet reader contexts for COPY FROM. @@ -146,6 +143,8 @@ pub(crate) fn execute_copy_from( p_state.as_ptr(), relation.as_ptr(), where_clause, + std::ptr::null(), + false, Some(copy_parquet_data_to_buffer), attribute_list, copy_options.as_ptr(), diff --git a/src/parquet_copy_hook/hook.rs b/src/parquet_copy_hook/hook.rs index e5f6da9..f533a56 100644 --- a/src/parquet_copy_hook/hook.rs +++ b/src/parquet_copy_hook/hook.rs @@ -109,7 +109,6 @@ fn process_copy_from_parquet( .execute() } -#[cfg(any(feature = "pg14", feature = "pg15", feature = "pg16", feature = "pg17"))] #[pg_guard] #[allow(clippy::too_many_arguments)] extern "C" fn parquet_copy_hook( @@ -172,64 +171,3 @@ extern "C" fn parquet_copy_hook( } } } - -#[cfg(feature = "pg13")] -#[pg_guard] -#[allow(clippy::too_many_arguments)] -extern "C" fn parquet_copy_hook( - p_stmt: *mut PlannedStmt, - query_string: *const c_char, - context: u32, - params: *mut ParamListInfoData, - query_env: *mut QueryEnvironment, - dest: *mut DestReceiver, - completion_tag: *mut QueryCompletion, -) { - let p_stmt = unsafe { PgBox::from_pg(p_stmt) }; - let query_string = unsafe { CStr::from_ptr(query_string) }; - let params = unsafe { PgBox::from_pg(params) }; - let query_env = unsafe { PgBox::from_pg(query_env) }; - let mut completion_tag = unsafe { PgBox::from_pg(completion_tag) }; - - if ENABLE_PARQUET_COPY_HOOK.get() && is_copy_to_parquet_stmt(&p_stmt) { - let nprocessed = process_copy_to_parquet(&p_stmt, query_string, ¶ms, &query_env); - - if !completion_tag.is_null() { - completion_tag.nprocessed = nprocessed; - completion_tag.commandTag = CommandTag::CMDTAG_COPY; - } - return; - } else if ENABLE_PARQUET_COPY_HOOK.get() && is_copy_from_parquet_stmt(&p_stmt) { - let nprocessed = process_copy_from_parquet(&p_stmt, query_string, &query_env); - - if !completion_tag.is_null() { - completion_tag.nprocessed = nprocessed; - completion_tag.commandTag = CommandTag::CMDTAG_COPY; - } - return; - } - - unsafe { - if let Some(prev_hook) = PREV_PROCESS_UTILITY_HOOK { - prev_hook( - p_stmt.into_pg(), - query_string.as_ptr(), - context, - params.into_pg(), - query_env.into_pg(), - dest, - completion_tag.into_pg(), - ) - } else { - standard_ProcessUtility( - p_stmt.into_pg(), - query_string.as_ptr(), - context, - params.into_pg(), - query_env.into_pg(), - dest, - completion_tag.into_pg(), - ) - } - } -} diff --git a/src/parquet_copy_hook/pg_compat.rs b/src/parquet_copy_hook/pg_compat.rs index d9fc8dd..1ff95eb 100644 --- a/src/parquet_copy_hook/pg_compat.rs +++ b/src/parquet_copy_hook/pg_compat.rs @@ -1,23 +1,13 @@ use std::ffi::{c_char, CStr}; -use pgrx::{ - datum::TimeWithTimeZone, - direct_function_call, - pg_sys::{ - copy_data_source_cb, AsPgCStr, List, Node, ParseState, QueryEnvironment, RawStmt, Relation, - }, - IntoDatum, -}; - -#[cfg(any(feature = "pg14", feature = "pg15", feature = "pg16", feature = "pg17"))] -use pgrx::AnyNumeric; +use pgrx::pg_sys::{AsPgCStr, List, Node, QueryEnvironment, RawStmt}; pub(crate) fn pg_analyze_and_rewrite( raw_stmt: *mut RawStmt, query_string: *const c_char, query_env: *mut QueryEnvironment, ) -> *mut List { - #[cfg(any(feature = "pg13", feature = "pg14"))] + #[cfg(feature = "pg14")] unsafe { pgrx::pg_sys::pg_analyze_and_rewrite( raw_stmt, @@ -42,7 +32,7 @@ pub(crate) fn pg_analyze_and_rewrite( #[allow(non_snake_case)] pub(crate) fn strVal(val: *mut Node) -> String { - #[cfg(any(feature = "pg13", feature = "pg14"))] + #[cfg(feature = "pg14")] unsafe { let val = (*(val as *mut pgrx::pg_sys::Value)).val.str_; @@ -63,88 +53,9 @@ pub(crate) fn strVal(val: *mut Node) -> String { } } -#[cfg(feature = "pg13")] -#[allow(non_snake_case)] -pub(crate) fn BeginCopyFrom( - pstate: *mut ParseState, - relation: Relation, - _where_clause: *mut Node, - data_source_cb: copy_data_source_cb, - attribute_list: *mut List, - copy_options: *mut List, -) -> *mut pgrx::pg_sys::CopyStateData { - unsafe { - pgrx::pg_sys::BeginCopyFrom( - pstate, - relation, - std::ptr::null(), - false, - data_source_cb, - attribute_list, - copy_options, - ) - } -} - -#[cfg(any(feature = "pg14", feature = "pg15", feature = "pg16", feature = "pg17"))] -#[allow(non_snake_case)] -pub(crate) fn BeginCopyFrom( - pstate: *mut ParseState, - relation: Relation, - _where_clause: *mut Node, - data_source_cb: copy_data_source_cb, - attribute_list: *mut List, - copy_options: *mut List, -) -> *mut pgrx::pg_sys::CopyFromStateData { - unsafe { - pgrx::pg_sys::BeginCopyFrom( - pstate, - relation, - _where_clause, - std::ptr::null(), - false, - data_source_cb, - attribute_list, - copy_options, - ) - } -} - -pub(crate) fn extract_timezone_from_timetz(timetz: TimeWithTimeZone) -> f64 { - #[cfg(feature = "pg13")] - { - let timezone_as_secs: f64 = unsafe { - direct_function_call( - pgrx::pg_sys::timetz_part, - &["timezone".into_datum(), timetz.into_datum()], - ) - } - .expect("cannot extract timezone from timetz"); - - timezone_as_secs - } - - #[cfg(any(feature = "pg14", feature = "pg15", feature = "pg16", feature = "pg17"))] - { - let timezone_as_secs: AnyNumeric = unsafe { - direct_function_call( - pgrx::pg_sys::extract_timetz, - &["timezone".into_datum(), timetz.into_datum()], - ) - } - .expect("cannot extract timezone from timetz"); - - let timezone_as_secs: f64 = timezone_as_secs - .try_into() - .unwrap_or_else(|e| panic!("{}", e)); - - timezone_as_secs - } -} - #[allow(non_snake_case)] pub(crate) fn MarkGUCPrefixReserved(guc_prefix: &str) { - #[cfg(any(feature = "pg13", feature = "pg14"))] + #[cfg(feature = "pg14")] unsafe { pgrx::pg_sys::EmitWarningsOnPlaceholders(guc_prefix.as_pg_cstr()) } diff --git a/src/type_compat/pg_arrow_type_conversions.rs b/src/type_compat/pg_arrow_type_conversions.rs index 02752c1..9d9375f 100644 --- a/src/type_compat/pg_arrow_type_conversions.rs +++ b/src/type_compat/pg_arrow_type_conversions.rs @@ -5,8 +5,6 @@ use pgrx::{ direct_function_call, pg_sys, AnyNumeric, IntoDatum, }; -use crate::parquet_copy_hook::pg_compat::extract_timezone_from_timetz; - pub(crate) const MAX_DECIMAL_PRECISION: usize = 38; pub(crate) fn date_to_i32(date: Date) -> i32 { @@ -130,7 +128,17 @@ pub(crate) fn i64_to_time(i64_time: i64) -> Time { } pub(crate) fn timetz_to_i64(timetz: TimeWithTimeZone) -> i64 { - let timezone_as_secs = extract_timezone_from_timetz(timetz); + let timezone_as_secs: AnyNumeric = unsafe { + direct_function_call( + pg_sys::extract_timetz, + &["timezone".into_datum(), timetz.into_datum()], + ) + } + .expect("cannot extract timezone from timetz"); + + let timezone_as_secs: f64 = timezone_as_secs + .try_into() + .unwrap_or_else(|e| panic!("{}", e)); let timezone_as_interval = Interval::from_seconds(timezone_as_secs); let adjusted_timetz: TimeWithTimeZone = unsafe {