Skip to content

Commit

Permalink
Add pg{13,14,15} to support matrix
Browse files Browse the repository at this point in the history
  • Loading branch information
aykut-bozkurt committed Oct 25, 2024
1 parent c602f6b commit 10aceaa
Show file tree
Hide file tree
Showing 13 changed files with 357 additions and 143 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
postgres: [ 16, 17 ]
postgres: [ 14, 15, 16, 17 ]
env:
PG_MAJOR: ${{ matrix.postgres }}

Expand Down
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ path = "./src/bin/pgrx_embed.rs"

[features]
default = ["pg17"]
pg16 = ["pgrx/pg16", "pgrx-tests/pg16" ]
pg17 = ["pgrx/pg17", "pgrx-tests/pg17" ]
pg17 = ["pgrx/pg17", "pgrx-tests/pg17"]
pg16 = ["pgrx/pg16", "pgrx-tests/pg16"]
pg15 = ["pgrx/pg15", "pgrx-tests/pg15"]
pg14 = ["pgrx/pg14", "pgrx-tests/pg14"]
pg13 = ["pgrx/pg13", "pgrx-tests/pg13"]
pg_test = []

[dependencies]
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,11 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`:
> Any type that does not have a corresponding Parquet type will be represented, as a fallback mechanism, as `BYTE_ARRAY` with `STRING` logical type. e.g. `enum`
## Postgres Support Matrix
`pg_parquet` is tested with the following PostgreSQL versions:
`pg_parquet` supports the following PostgreSQL versions:
| PostgreSQL Major Version | Supported |
|--------------------------|-----------|
| 17 | ✅ |
| 13 | ✅ |
| 14 | ✅ |
| 15 | ✅ |
| 16 | ✅ |
| 17 | ✅ |
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use parquet_copy_hook::hook::{init_parquet_copy_hook, ENABLE_PARQUET_COPY_HOOK};
use pg_sys::{AsPgCStr, MarkGUCPrefixReserved};
use parquet_copy_hook::pg_compat::MarkGUCPrefixReserved;
use pgrx::{prelude::*, GucContext, GucFlags, GucRegistry};

mod arrow_parquet;
Expand Down Expand Up @@ -29,7 +29,7 @@ pub extern "C" fn _PG_init() {
GucFlags::default(),
);

unsafe { MarkGUCPrefixReserved("pg_parquet".as_pg_cstr()) };
MarkGUCPrefixReserved("pg_parquet");

init_parquet_copy_hook();
}
Expand Down
1 change: 1 addition & 0 deletions src/parquet_copy_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pub(crate) mod copy_to;
pub(crate) mod copy_to_dest_receiver;
pub(crate) mod copy_utils;
pub(crate) mod hook;
pub(crate) mod pg_compat;
35 changes: 18 additions & 17 deletions src/parquet_copy_hook/copy_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use pgrx::{
pg_sys::{
addNSItemToQuery, assign_expr_collations, canonicalize_qual, check_enable_rls,
coerce_to_boolean, eval_const_expressions, make_ands_implicit, transformExpr, AsPgCStr,
BeginCopyFrom, CheckEnableRlsResult, CopyFrom, CopyStmt, EndCopyFrom, InvalidOid, Node,
Oid, ParseExprKind, ParseNamespaceItem, ParseState, PlannedStmt, QueryEnvironment,
CheckEnableRlsResult, CopyFrom, CopyStmt, EndCopyFrom, InvalidOid, Node, Oid,
ParseExprKind, ParseNamespaceItem, ParseState, PlannedStmt, QueryEnvironment,
},
void_mut_ptr, PgBox, PgLogLevel, PgRelation, PgSqlErrorCode,
};
Expand All @@ -19,9 +19,12 @@ use crate::{
},
};

use super::copy_utils::{
copy_stmt_attribute_list, copy_stmt_create_namespace_item, copy_stmt_create_parse_state,
create_filtered_tupledesc_for_relation,
use super::{
copy_utils::{
copy_stmt_attribute_list, copy_stmt_create_namespace_item, copy_stmt_create_parse_state,
create_filtered_tupledesc_for_relation,
},
pg_compat::BeginCopyFrom,
};

// stack to store parquet reader contexts for COPY FROM.
Expand Down Expand Up @@ -104,47 +107,45 @@ extern "C" fn copy_parquet_data_to_buffer(
// 3. Calls the executor's CopyFrom function to read data from the parquet file and write
// it to the copy buffer.
pub(crate) fn execute_copy_from(
p_stmt: PgBox<PlannedStmt>,
p_stmt: &PgBox<PlannedStmt>,
query_string: &CStr,
query_env: PgBox<QueryEnvironment>,
query_env: &PgBox<QueryEnvironment>,
uri: Url,
) -> u64 {
let rel_oid = copy_stmt_relation_oid(&p_stmt);
let rel_oid = copy_stmt_relation_oid(p_stmt);

copy_from_stmt_ensure_row_level_security(rel_oid);

let lock_mode = copy_stmt_lock_mode(&p_stmt);
let lock_mode = copy_stmt_lock_mode(p_stmt);

let relation = unsafe { PgRelation::with_lock(rel_oid, lock_mode) };

let p_state = copy_stmt_create_parse_state(query_string, &query_env);
let p_state = copy_stmt_create_parse_state(query_string, query_env);

let ns_item = copy_stmt_create_namespace_item(&p_stmt, &p_state, &relation);
let ns_item = copy_stmt_create_namespace_item(p_stmt, &p_state, &relation);

let mut where_clause = copy_from_stmt_where_clause(&p_stmt);
let mut where_clause = copy_from_stmt_where_clause(p_stmt);

if !where_clause.is_null() {
where_clause = copy_from_stmt_transform_where_clause(&p_state, &ns_item, where_clause);
}

let attribute_list = copy_stmt_attribute_list(&p_stmt);
let attribute_list = copy_stmt_attribute_list(p_stmt);

let tupledesc = create_filtered_tupledesc_for_relation(&p_stmt, &relation);
let tupledesc = create_filtered_tupledesc_for_relation(p_stmt, &relation);

unsafe {
// parquet reader context is used throughout the COPY FROM operation.
let parquet_reader_context = ParquetReaderContext::new(uri, &tupledesc);
push_parquet_reader_context(parquet_reader_context);

// makes sure to set binary format
let copy_options = copy_from_stmt_create_option_list(&p_stmt);
let copy_options = copy_from_stmt_create_option_list(p_stmt);

let copy_from_state = BeginCopyFrom(
p_state.as_ptr(),
relation.as_ptr(),
where_clause,
std::ptr::null(),
false,
Some(copy_parquet_data_to_buffer),
attribute_list,
copy_options.as_ptr(),
Expand Down
20 changes: 9 additions & 11 deletions src/parquet_copy_hook/copy_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::ffi::{c_char, CStr};
use pgrx::{
ereport, is_a,
pg_sys::{
makeRangeVar, pg_analyze_and_rewrite_fixedparams, pg_plan_query, A_Star, ColumnRef,
CommandTag, CopyStmt, CreateNewPortal, DestReceiver, GetActiveSnapshot,
makeRangeVar, pg_plan_query, A_Star, ColumnRef, CommandTag, CopyStmt, CreateNewPortal,
DestReceiver, GetActiveSnapshot, Node,
NodeTag::{self, T_CopyStmt},
ParamListInfoData, PlannedStmt, PortalDefineQuery, PortalDrop, PortalRun, PortalStart,
QueryCompletion, QueryEnvironment, RawStmt, ResTarget, SelectStmt, CURSOR_OPT_PARALLEL_OK,
Expand All @@ -14,8 +14,9 @@ use pgrx::{
AllocatedByRust, PgBox, PgList, PgLogLevel, PgRelation, PgSqlErrorCode,
};

use crate::parquet_copy_hook::copy_utils::{
copy_stmt_has_relation, copy_stmt_lock_mode, copy_stmt_relation_oid,
use crate::parquet_copy_hook::{
copy_utils::{copy_stmt_has_relation, copy_stmt_lock_mode, copy_stmt_relation_oid},
pg_compat::pg_analyze_and_rewrite,
};

// execute_copy_to_with_dest_receiver executes a COPY TO statement with our custom DestReceiver
Expand All @@ -28,8 +29,8 @@ use crate::parquet_copy_hook::copy_utils::{
pub(crate) fn execute_copy_to_with_dest_receiver(
p_stmt: &PgBox<PlannedStmt>,
query_string: &CStr,
params: PgBox<ParamListInfoData>,
query_env: PgBox<QueryEnvironment>,
params: &PgBox<ParamListInfoData>,
query_env: &PgBox<QueryEnvironment>,
parquet_dest: PgBox<DestReceiver>,
) -> u64 {
unsafe {
Expand All @@ -51,11 +52,9 @@ pub(crate) fn execute_copy_to_with_dest_receiver(

let raw_query = prepare_copy_to_raw_stmt(p_stmt, &copy_stmt, &relation);

let rewritten_queries = pg_analyze_and_rewrite_fixedparams(
let rewritten_queries = pg_analyze_and_rewrite(
raw_query.as_ptr(),
query_string.as_ptr(),
std::ptr::null_mut(),
0,
query_env.as_ptr(),
);

Expand Down Expand Up @@ -156,8 +155,7 @@ fn convert_copy_to_relation_to_select_stmt(
target_list.push(target.into_pg());
} else {
// SELECT a,b,... FROM relation
let attribute_name_list =
unsafe { PgList::<pgrx::pg_sys::String>::from_pg(copy_stmt.attlist) };
let attribute_name_list = unsafe { PgList::<Node>::from_pg(copy_stmt.attlist) };
for attribute_name in attribute_name_list.iter_ptr() {
let mut col_ref = unsafe { PgBox::<ColumnRef>::alloc_node(NodeTag::T_ColumnRef) };

Expand Down
18 changes: 7 additions & 11 deletions src/parquet_copy_hook/copy_to_dest_receiver.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::ffi::{c_char, CStr};
use std::ffi::{c_char, CStr, CString};

use pg_sys::{
get_typlenbyval, slot_getallattrs, toast_raw_datum_size, AllocSetContextCreateExtended,
AsPgCStr, BlessTupleDesc, CommandDest, CurrentMemoryContext, Datum, DatumGetCString,
DestReceiver, HeapTupleData, List, MemoryContext, MemoryContextAllocZero, MemoryContextDelete,
AsPgCStr, BlessTupleDesc, CommandDest, CurrentMemoryContext, Datum, DestReceiver,
HeapTupleData, List, MemoryContext, MemoryContextAllocZero, MemoryContextDelete,
MemoryContextReset, TupleDesc, TupleTableSlot, ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE, ALLOCSET_DEFAULT_MINSIZE, VARHDRSZ,
};
use pgrx::{prelude::*, PgList, PgMemoryContexts, PgTupleDesc};
use pgrx::{prelude::*, FromDatum, PgList, PgMemoryContexts, PgTupleDesc};

use crate::{
arrow_parquet::{
Expand Down Expand Up @@ -373,15 +373,11 @@ fn tuple_column_sizes(tuple_datums: &[Option<Datum>], tupledesc: &PgTupleDesc) -
(unsafe { toast_raw_datum_size(*column_datum) }) as i32 - VARHDRSZ as i32
} else if typlen == -2 {
// cstring
let cstring = unsafe { DatumGetCString(*column_datum) };

let cstring = unsafe {
CStr::from_ptr(cstring)
.to_str()
.expect("cstring is not a valid CString")
CString::from_datum(*column_datum, false)
.expect("cannot get cstring from datum")
};

cstring.len() as i32 + 1
cstring.as_bytes().len() as i32 + 1
} else {
// fixed size type
typlen as i32
Expand Down
12 changes: 4 additions & 8 deletions src/parquet_copy_hook/copy_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use pgrx::{
pg_sys::{
addRangeTableEntryForRelation, defGetInt32, defGetInt64, defGetString, get_namespace_name,
get_rel_namespace, makeDefElem, makeString, make_parsestate, quote_qualified_identifier,
AccessShareLock, AsPgCStr, CopyStmt, CreateTemplateTupleDesc, DefElem, List, NoLock,
AccessShareLock, AsPgCStr, CopyStmt, CreateTemplateTupleDesc, DefElem, List, NoLock, Node,
NodeTag::T_CopyStmt, Oid, ParseNamespaceItem, ParseState, PlannedStmt, QueryEnvironment,
RangeVar, RangeVarGetRelidExtended, RowExclusiveLock, TupleDescInitEntry,
},
Expand All @@ -19,6 +19,8 @@ use crate::arrow_parquet::{
uri_utils::parse_uri,
};

use super::pg_compat::strVal;

pub(crate) fn validate_copy_to_options(p_stmt: &PgBox<PlannedStmt>, uri: &Url) {
validate_copy_option_names(
p_stmt,
Expand Down Expand Up @@ -444,13 +446,7 @@ fn copy_stmt_attribute_names(p_stmt: &PgBox<PlannedStmt>) -> Vec<String> {
unsafe {
PgList::from_pg(attribute_name_list)
.iter_ptr()
.map(|attribute_name: *mut pgrx::pg_sys::String| {
let attribute_name = PgBox::from_pg(attribute_name);
CStr::from_ptr(attribute_name.sval)
.to_str()
.expect("cannot get attribute name for copy from statement")
.to_string()
})
.map(|attribute_name: *mut Node| strVal(attribute_name))
.collect::<Vec<_>>()
}
}
Expand Down
Loading

0 comments on commit 10aceaa

Please sign in to comment.