Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

COPY FROM api #31

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/parquet_copy_hook.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub(crate) mod copy_from;
pub(crate) mod copy_to;
pub(crate) mod copy_to_dest_receiver;
pub(crate) mod copy_utils;
276 changes: 276 additions & 0 deletions src/parquet_copy_hook/copy_from.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
use pgrx::{
ereport, pg_guard,
pg_sys::{
self, addNSItemToQuery, addRangeTableEntryForRelation, assign_expr_collations,
canonicalize_qual, coerce_to_boolean, eval_const_expressions, make_ands_explicit,
make_parsestate, transformExpr, AccessShareLock, AsPgCStr, BeginCopyFrom, CopyFrom,
CopyStmt, CreateTemplateTupleDesc, EndCopyFrom, List, ParseExprKind, ParseNamespaceItem,
ParseState, RowExclusiveLock, TupleDescInitEntry,
},
void_mut_ptr, PgBox, PgList, PgLogLevel, PgRelation, PgSqlErrorCode, PgTupleDesc,
};
use url::Url;

use crate::{
arrow_parquet::parquet_reader::ParquetReaderContext,
parquet_copy_hook::copy_utils::{
copy_from_stmt_add_or_update_binary_format_option, copy_stmt_lock_mode,
copy_stmt_relation_oid, is_copy_from_stmt,
},
};

// stack to store parquet reader contexts for COPY FROM.
// This needs to be a stack since COPY command can be nested.
static mut PARQUET_READER_CONTEXT_STACK: Vec<ParquetReaderContext> = vec![];

pub(crate) fn peek_parquet_reader_context() -> Option<&'static mut ParquetReaderContext> {
unsafe { PARQUET_READER_CONTEXT_STACK.last_mut() }
}

pub(crate) fn pop_parquet_reader_context(throw_error: bool) {
let mut current_parquet_reader_context = unsafe { PARQUET_READER_CONTEXT_STACK.pop() };

if current_parquet_reader_context.is_none() {
let level = if throw_error {
PgLogLevel::ERROR
} else {
PgLogLevel::DEBUG2
};

ereport!(
level,
PgSqlErrorCode::ERRCODE_INTERNAL_ERROR,
"parquet reader context stack is already empty"
);
} else {
current_parquet_reader_context.take();
}
}

pub(crate) fn push_parquet_reader_context(reader_ctx: ParquetReaderContext) {
unsafe { PARQUET_READER_CONTEXT_STACK.push(reader_ctx) };
}

// This function is called by the COPY FROM command to read data from the parquet file into the copy buffer.
#[pg_guard]
extern "C" fn copy_received_parquet_data_to_buffer(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe received->receive

outbuf: void_mut_ptr,
_minread: i32,
maxread: i32,
) -> i32 {
let current_parquet_reader_context = peek_parquet_reader_context();
if current_parquet_reader_context.is_none() {
panic!("parquet reader context is not found");
}
let current_parquet_reader_context =
current_parquet_reader_context.expect("current parquet reader context is not found");

let mut bytes_in_buffer = current_parquet_reader_context.bytes_in_buffer();

if bytes_in_buffer == 0 {
current_parquet_reader_context.reset_buffer();

if !current_parquet_reader_context.read_parquet() {
return 0;
}

bytes_in_buffer = current_parquet_reader_context.bytes_in_buffer();
}

let bytes_to_copy = std::cmp::min(maxread as usize, bytes_in_buffer);
current_parquet_reader_context.copy_to_outbuf(bytes_to_copy, outbuf);

bytes_to_copy as _
}

pub(crate) fn execute_copy_from(
Copy link
Collaborator

@marcoslot marcoslot Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function is especially wild, could use a comment on what's going on here, and e.g. what copy_received_parquet_data_to_buffer is for / why we need it

pstmt: PgBox<pg_sys::PlannedStmt>,
query_string: &core::ffi::CStr,
query_env: PgBox<pg_sys::QueryEnvironment>,
uri: Url,
) -> u64 {
let rel_oid = copy_stmt_relation_oid(&pstmt);
let lock_mode = copy_stmt_lock_mode(&pstmt);
let relation = unsafe { PgRelation::with_lock(rel_oid, lock_mode) };

let pstate = create_parse_state(query_string, &query_env);

let nsitem = copy_ns_item(&pstate, &pstmt, &relation);

let where_clause = copy_from_where_clause(&pstmt);
let mut where_clause = unsafe { PgBox::from_pg(where_clause) };
if !where_clause.is_null() {
where_clause = copy_from_transform_where_clause(&pstate, &nsitem, &where_clause);
}

let attlist = copy_attlist(&pstmt);
let attnamelist = copy_attnames(&pstmt);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this lo longer correspond to postgres code, can probably afford to just write attribute_name_list to make it easier to digest, e.g. I like the verbosity of the function above


let tupledesc = filter_tupledesc_for_relation(&relation, attnamelist);

unsafe {
// parquet reader context is used throughout the COPY FROM operation.
let parquet_reader_context = ParquetReaderContext::new(uri, &tupledesc);
push_parquet_reader_context(parquet_reader_context);

let copy_options = copy_from_stmt_add_or_update_binary_format_option(&pstmt);

let copy_from_state = BeginCopyFrom(
pstate.as_ptr(),
relation.as_ptr(),
where_clause.as_ptr(),
std::ptr::null(),
false,
Some(copy_received_parquet_data_to_buffer),
attlist,
copy_options.as_ptr(),
);

let nprocessed = CopyFrom(copy_from_state);

EndCopyFrom(copy_from_state);

let throw_error = true;
pop_parquet_reader_context(throw_error);

nprocessed
}
}

fn copy_from_where_clause(pstmt: &PgBox<pg_sys::PlannedStmt>) -> *mut pg_sys::Node {
let copy_stmt = unsafe { PgBox::<CopyStmt>::from_pg(pstmt.utilityStmt as _) };
copy_stmt.whereClause
}

fn copy_from_transform_where_clause(
pstate: &PgBox<pg_sys::ParseState>,
nsitem: &PgBox<ParseNamespaceItem>,
where_clause: &PgBox<pg_sys::Node>,
) -> PgBox<pg_sys::Node> {
unsafe { addNSItemToQuery(pstate.as_ptr(), nsitem.as_ptr(), false, true, true) };

let where_clause = unsafe {
transformExpr(
pstate.as_ptr(),
where_clause.as_ptr(),
ParseExprKind::EXPR_KIND_COPY_WHERE,
)
};

let construct = std::ffi::CString::new("WHERE").expect("CString::new failed");
let where_clause =
unsafe { coerce_to_boolean(pstate.as_ptr(), where_clause, construct.as_ptr()) };

unsafe { assign_expr_collations(pstate.as_ptr(), where_clause) };

let where_clause = unsafe { eval_const_expressions(std::ptr::null_mut(), where_clause) };

let expr = unsafe { canonicalize_qual(where_clause as _, false) };

let mut expr_list = PgList::new();
expr_list.push(expr);

let expr = unsafe { make_ands_explicit(expr_list.as_ptr()) };

unsafe { PgBox::from_pg(expr as _) }
}

fn copy_attlist(pstmt: &PgBox<pg_sys::PlannedStmt>) -> *mut List {
let copy_stmt = unsafe { PgBox::<CopyStmt>::from_pg(pstmt.utilityStmt as _) };
copy_stmt.attlist
}

fn copy_attnames(pstmt: &PgBox<pg_sys::PlannedStmt>) -> Vec<String> {
let attnamelist = copy_attlist(pstmt);
unsafe {
PgList::from_pg(attnamelist)
.iter_ptr()
.map(|ptr| {
{ std::ffi::CStr::from_ptr(*ptr) }
.to_str()
.expect("attnamelist is not a valid C string")
.to_string()
})
.collect::<Vec<_>>()
}
}

fn create_parse_state(
query_string: &core::ffi::CStr,
query_env: &PgBox<pg_sys::QueryEnvironment>,
) -> PgBox<ParseState> {
/* construct a parse state similar to standard_ProcessUtility */
let pstate: *mut ParseState = unsafe { make_parsestate(std::ptr::null_mut()) };
let mut pstate = unsafe { PgBox::from_pg(pstate) };
pstate.p_sourcetext = query_string.as_ptr();
pstate.p_queryEnv = query_env.as_ptr();
pstate
}

fn copy_ns_item(
pstate: &PgBox<ParseState>,
pstmt: &PgBox<pg_sys::PlannedStmt>,
relation: &PgRelation,
) -> PgBox<ParseNamespaceItem> {
let is_copy_from = is_copy_from_stmt(pstmt);

let nsitem = unsafe {
addRangeTableEntryForRelation(
pstate.as_ptr(),
relation.as_ptr(),
if is_copy_from {
RowExclusiveLock as _
} else {
AccessShareLock as _
},
std::ptr::null_mut(),
false,
false,
)
};
unsafe { PgBox::from_pg(nsitem) }
}

fn filter_tupledesc_for_relation(relation: &PgRelation, attnamelist: Vec<String>) -> PgTupleDesc {
let table_tupledesc = relation.tuple_desc();

if attnamelist.is_empty() {
return table_tupledesc;
}

let tupledesc = unsafe { CreateTemplateTupleDesc(attnamelist.len() as i32) };
let tupledesc = unsafe { PgTupleDesc::from_pg(tupledesc) };

let mut attribute_number = 1;

for attname in attnamelist.iter() {
for table_att in table_tupledesc.iter() {
if table_att.is_dropped() {
continue;
}

let att_typoid = table_att.type_oid().value();
let att_typmod = table_att.type_mod();
let att_ndims = table_att.attndims;

if table_att.name() == attname {
unsafe {
TupleDescInitEntry(
tupledesc.as_ptr(),
attribute_number,
attname.as_pg_cstr() as _,
att_typoid,
att_typmod,
att_ndims as _,
)
};

attribute_number += 1;

break;
}
}
}

tupledesc
}
Loading