Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pg{14,15} to support matrix #57

Merged
merged 5 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
postgres: [ 16, 17 ]
postgres: [ 14, 15, 16, 17 ]
env:
PG_MAJOR: ${{ matrix.postgres }}

Expand Down Expand Up @@ -104,9 +104,9 @@ jobs:
- name: Run tests
run: |
# Set up permissions so that the current user below can create extensions
sudo chmod a+rwx $(pg_config --pkglibdir) \
$(pg_config --sharedir)/extension \
/var/run/postgresql/
sudo chmod a+rwx $(pg_config --pkglibdir) \
$(pg_config --sharedir)/extension \
/var/run/postgresql/

# pgrx tests with runas argument ignores environment variables, so
# we read env vars from .env file in tests (https://github.com/pgcentralfoundation/pgrx/pull/1674)
Expand Down
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ path = "./src/bin/pgrx_embed.rs"

[features]
default = ["pg17"]
pg16 = ["pgrx/pg16", "pgrx-tests/pg16" ]
pg17 = ["pgrx/pg17", "pgrx-tests/pg17" ]
pg17 = ["pgrx/pg17", "pgrx-tests/pg17"]
pg16 = ["pgrx/pg16", "pgrx-tests/pg16"]
pg15 = ["pgrx/pg15", "pgrx-tests/pg15"]
pg14 = ["pgrx/pg14", "pgrx-tests/pg14"]
pg_test = []

[dependencies]
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,10 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`:
> Any type that does not have a corresponding Parquet type will be represented, as a fallback mechanism, as `BYTE_ARRAY` with `STRING` logical type. e.g. `enum`

## Postgres Support Matrix
`pg_parquet` is tested with the following PostgreSQL versions:
`pg_parquet` supports the following PostgreSQL versions:
| PostgreSQL Major Version | Supported |
|--------------------------|-----------|
| 17 | ✅ |
| 14 | ✅ |
| 15 | ✅ |
| 16 | ✅ |
| 17 | ✅ |
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use parquet_copy_hook::hook::{init_parquet_copy_hook, ENABLE_PARQUET_COPY_HOOK};
use pg_sys::{AsPgCStr, MarkGUCPrefixReserved};
use parquet_copy_hook::pg_compat::MarkGUCPrefixReserved;
use pgrx::{prelude::*, GucContext, GucFlags, GucRegistry};

mod arrow_parquet;
Expand Down Expand Up @@ -29,7 +29,7 @@ pub extern "C" fn _PG_init() {
GucFlags::default(),
);

unsafe { MarkGUCPrefixReserved("pg_parquet".as_pg_cstr()) };
MarkGUCPrefixReserved("pg_parquet");

init_parquet_copy_hook();
}
Expand Down
1 change: 1 addition & 0 deletions src/parquet_copy_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pub(crate) mod copy_to;
pub(crate) mod copy_to_dest_receiver;
pub(crate) mod copy_utils;
pub(crate) mod hook;
pub(crate) mod pg_compat;
20 changes: 10 additions & 10 deletions src/parquet_copy_hook/copy_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,40 +104,40 @@ extern "C" fn copy_parquet_data_to_buffer(
// 3. Calls the executor's CopyFrom function to read data from the parquet file and write
// it to the copy buffer.
pub(crate) fn execute_copy_from(
p_stmt: PgBox<PlannedStmt>,
p_stmt: &PgBox<PlannedStmt>,
query_string: &CStr,
query_env: PgBox<QueryEnvironment>,
query_env: &PgBox<QueryEnvironment>,
uri: Url,
) -> u64 {
let rel_oid = copy_stmt_relation_oid(&p_stmt);
let rel_oid = copy_stmt_relation_oid(p_stmt);

copy_from_stmt_ensure_row_level_security(rel_oid);

let lock_mode = copy_stmt_lock_mode(&p_stmt);
let lock_mode = copy_stmt_lock_mode(p_stmt);

let relation = unsafe { PgRelation::with_lock(rel_oid, lock_mode) };

let p_state = copy_stmt_create_parse_state(query_string, &query_env);
let p_state = copy_stmt_create_parse_state(query_string, query_env);

let ns_item = copy_stmt_create_namespace_item(&p_stmt, &p_state, &relation);
let ns_item = copy_stmt_create_namespace_item(p_stmt, &p_state, &relation);

let mut where_clause = copy_from_stmt_where_clause(&p_stmt);
let mut where_clause = copy_from_stmt_where_clause(p_stmt);

if !where_clause.is_null() {
where_clause = copy_from_stmt_transform_where_clause(&p_state, &ns_item, where_clause);
}

let attribute_list = copy_stmt_attribute_list(&p_stmt);
let attribute_list = copy_stmt_attribute_list(p_stmt);

let tupledesc = create_filtered_tupledesc_for_relation(&p_stmt, &relation);
let tupledesc = create_filtered_tupledesc_for_relation(p_stmt, &relation);

unsafe {
// parquet reader context is used throughout the COPY FROM operation.
let parquet_reader_context = ParquetReaderContext::new(uri, &tupledesc);
push_parquet_reader_context(parquet_reader_context);

// makes sure to set binary format
let copy_options = copy_from_stmt_create_option_list(&p_stmt);
let copy_options = copy_from_stmt_create_option_list(p_stmt);

let copy_from_state = BeginCopyFrom(
p_state.as_ptr(),
Expand Down
20 changes: 9 additions & 11 deletions src/parquet_copy_hook/copy_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::ffi::{c_char, CStr};
use pgrx::{
ereport, is_a,
pg_sys::{
makeRangeVar, pg_analyze_and_rewrite_fixedparams, pg_plan_query, A_Star, ColumnRef,
CommandTag, CopyStmt, CreateNewPortal, DestReceiver, GetActiveSnapshot,
makeRangeVar, pg_plan_query, A_Star, ColumnRef, CommandTag, CopyStmt, CreateNewPortal,
DestReceiver, GetActiveSnapshot, Node,
NodeTag::{self, T_CopyStmt},
ParamListInfoData, PlannedStmt, PortalDefineQuery, PortalDrop, PortalRun, PortalStart,
QueryCompletion, QueryEnvironment, RawStmt, ResTarget, SelectStmt, CURSOR_OPT_PARALLEL_OK,
Expand All @@ -14,8 +14,9 @@ use pgrx::{
AllocatedByRust, PgBox, PgList, PgLogLevel, PgRelation, PgSqlErrorCode,
};

use crate::parquet_copy_hook::copy_utils::{
copy_stmt_has_relation, copy_stmt_lock_mode, copy_stmt_relation_oid,
use crate::parquet_copy_hook::{
copy_utils::{copy_stmt_has_relation, copy_stmt_lock_mode, copy_stmt_relation_oid},
pg_compat::pg_analyze_and_rewrite,
};

// execute_copy_to_with_dest_receiver executes a COPY TO statement with our custom DestReceiver
Expand All @@ -28,8 +29,8 @@ use crate::parquet_copy_hook::copy_utils::{
pub(crate) fn execute_copy_to_with_dest_receiver(
p_stmt: &PgBox<PlannedStmt>,
query_string: &CStr,
params: PgBox<ParamListInfoData>,
query_env: PgBox<QueryEnvironment>,
params: &PgBox<ParamListInfoData>,
query_env: &PgBox<QueryEnvironment>,
parquet_dest: PgBox<DestReceiver>,
) -> u64 {
unsafe {
Expand All @@ -51,11 +52,9 @@ pub(crate) fn execute_copy_to_with_dest_receiver(

let raw_query = prepare_copy_to_raw_stmt(p_stmt, &copy_stmt, &relation);

let rewritten_queries = pg_analyze_and_rewrite_fixedparams(
let rewritten_queries = pg_analyze_and_rewrite(
raw_query.as_ptr(),
query_string.as_ptr(),
std::ptr::null_mut(),
0,
query_env.as_ptr(),
);

Expand Down Expand Up @@ -156,8 +155,7 @@ fn convert_copy_to_relation_to_select_stmt(
target_list.push(target.into_pg());
} else {
// SELECT a,b,... FROM relation
let attribute_name_list =
unsafe { PgList::<pgrx::pg_sys::String>::from_pg(copy_stmt.attlist) };
let attribute_name_list = unsafe { PgList::<Node>::from_pg(copy_stmt.attlist) };
for attribute_name in attribute_name_list.iter_ptr() {
let mut col_ref = unsafe { PgBox::<ColumnRef>::alloc_node(NodeTag::T_ColumnRef) };

Expand Down
18 changes: 7 additions & 11 deletions src/parquet_copy_hook/copy_to_dest_receiver.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::ffi::{c_char, CStr};
use std::ffi::{c_char, CStr, CString};

use pg_sys::{
get_typlenbyval, slot_getallattrs, toast_raw_datum_size, AllocSetContextCreateExtended,
AsPgCStr, BlessTupleDesc, CommandDest, CurrentMemoryContext, Datum, DatumGetCString,
DestReceiver, HeapTupleData, List, MemoryContext, MemoryContextAllocZero, MemoryContextDelete,
AsPgCStr, BlessTupleDesc, CommandDest, CurrentMemoryContext, Datum, DestReceiver,
HeapTupleData, List, MemoryContext, MemoryContextAllocZero, MemoryContextDelete,
MemoryContextReset, TupleDesc, TupleTableSlot, ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE, ALLOCSET_DEFAULT_MINSIZE, VARHDRSZ,
};
use pgrx::{prelude::*, PgList, PgMemoryContexts, PgTupleDesc};
use pgrx::{prelude::*, FromDatum, PgList, PgMemoryContexts, PgTupleDesc};

use crate::{
arrow_parquet::{
Expand Down Expand Up @@ -373,15 +373,11 @@ fn tuple_column_sizes(tuple_datums: &[Option<Datum>], tupledesc: &PgTupleDesc) -
(unsafe { toast_raw_datum_size(*column_datum) }) as i32 - VARHDRSZ as i32
} else if typlen == -2 {
// cstring
let cstring = unsafe { DatumGetCString(*column_datum) };

let cstring = unsafe {
CStr::from_ptr(cstring)
.to_str()
.expect("cstring is not a valid CString")
CString::from_datum(*column_datum, false)
.expect("cannot get cstring from datum")
};

cstring.len() as i32 + 1
cstring.as_bytes().len() as i32 + 1
} else {
// fixed size type
typlen as i32
Expand Down
12 changes: 4 additions & 8 deletions src/parquet_copy_hook/copy_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use pgrx::{
pg_sys::{
addRangeTableEntryForRelation, defGetInt32, defGetInt64, defGetString, get_namespace_name,
get_rel_namespace, makeDefElem, makeString, make_parsestate, quote_qualified_identifier,
AccessShareLock, AsPgCStr, CopyStmt, CreateTemplateTupleDesc, DefElem, List, NoLock,
AccessShareLock, AsPgCStr, CopyStmt, CreateTemplateTupleDesc, DefElem, List, NoLock, Node,
NodeTag::T_CopyStmt, Oid, ParseNamespaceItem, ParseState, PlannedStmt, QueryEnvironment,
RangeVar, RangeVarGetRelidExtended, RowExclusiveLock, TupleDescInitEntry,
},
Expand All @@ -19,6 +19,8 @@ use crate::arrow_parquet::{
uri_utils::parse_uri,
};

use super::pg_compat::strVal;

pub(crate) fn validate_copy_to_options(p_stmt: &PgBox<PlannedStmt>, uri: &Url) {
validate_copy_option_names(
p_stmt,
Expand Down Expand Up @@ -444,13 +446,7 @@ fn copy_stmt_attribute_names(p_stmt: &PgBox<PlannedStmt>) -> Vec<String> {
unsafe {
PgList::from_pg(attribute_name_list)
.iter_ptr()
.map(|attribute_name: *mut pgrx::pg_sys::String| {
let attribute_name = PgBox::from_pg(attribute_name);
CStr::from_ptr(attribute_name.sval)
.to_str()
.expect("cannot get attribute name for copy from statement")
.to_string()
})
.map(|attribute_name: *mut Node| strVal(attribute_name))
.collect::<Vec<_>>()
}
}
Expand Down
Loading
Loading