Make permission checking conditional for DestReceiver
To do this, we pass a check_perms flag down when creating/using the
object_store_with_location() routine. The CopyHook is still gated to always
check permissions, but this allows someone to plug in the returned DestReceiver
and implement their own permission checks.

We also ensure that the UDFs properly perform permission checks based on the
URL passed in.
pgguru committed Oct 24, 2024
1 parent c2ef071 commit 19d7dc5
Showing 9 changed files with 58 additions and 56 deletions.
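
For illustration, a minimal sketch of the pattern this commit enables: a caller runs its own access check up front and then reuses the extension's DestReceiver, which no longer re-checks permissions (copy_startup below constructs its ParquetWriterContext with check_perms = false). The my_custom_uri_policy helper and the elided dest-receiver arguments are assumptions for this sketch, not part of the actual API.

use pgrx::prelude::*;
use url::Url;

// Hypothetical caller-defined policy; not part of this commit.
fn my_custom_uri_policy(uri: &Url) -> bool {
    // e.g. only allow writes under a sandboxed S3 prefix
    uri.scheme() == "s3" && uri.path().starts_with("/sandbox/")
}

fn copy_to_parquet_with_own_check(uri: Url) {
    // Enforce the caller's rule instead of the built-in role check.
    if !my_custom_uri_policy(&uri) {
        pgrx::error!("permission denied for uri {}", uri);
    }

    // The DestReceiver created below will not re-check permissions,
    // so the check above is the only gate on this path.
    // let dest = create_copy_to_parquet_dest_receiver(/* uri, options ... */);
    // ... drive the COPY through the executor as usual ...
}
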
4 changes: 2 additions & 2 deletions src/arrow_parquet/parquet_reader.rs
@@ -33,13 +33,13 @@ pub(crate) struct ParquetReaderContext {
 }
 
 impl ParquetReaderContext {
-    pub(crate) fn new(uri: Url, tupledesc: &PgTupleDesc) -> Self {
+    pub(crate) fn new(uri: Url, tupledesc: &PgTupleDesc, check_perms: bool) -> Self {
         // Postgis and Map contexts are used throughout reading the parquet file.
         // We need to reset them to avoid reading the stale data. (e.g. extension could be dropped)
         reset_postgis_context();
         reset_map_context();
 
-        let parquet_reader = parquet_reader_from_uri(&uri);
+        let parquet_reader = parquet_reader_from_uri(&uri, check_perms);
 
         let schema = parquet_reader.schema();
         ensure_arrow_schema_match_tupledesc(schema.clone(), tupledesc);
4 changes: 3 additions & 1 deletion src/arrow_parquet/parquet_writer.rs
@@ -37,6 +37,7 @@ impl ParquetWriterContext {
         compression: PgParquetCompression,
         compression_level: i32,
         tupledesc: &PgTupleDesc,
+        check_perms: bool,
     ) -> ParquetWriterContext {
         debug_assert!(tupledesc.oid() == RECORDOID);
 
@@ -60,7 +61,8 @@ impl ParquetWriterContext {
         let schema = parse_arrow_schema_from_tupledesc(tupledesc);
         let schema = Arc::new(schema);
 
-        let parquet_writer = parquet_writer_from_uri(&uri, schema.clone(), writer_props);
+        let parquet_writer =
+            parquet_writer_from_uri(&uri, schema.clone(), writer_props, check_perms);
 
         let attribute_contexts = collect_pg_to_arrow_attribute_contexts(tupledesc, &schema.fields);
 
80 changes: 34 additions & 46 deletions src/arrow_parquet/uri_utils.rs
@@ -56,10 +56,16 @@ fn parse_bucket_and_key(uri: &Url) -> (String, String) {
     (bucket.to_string(), key.to_string())
 }
 
-fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStore>, Path) {
-    if uri.scheme() == "s3" {
-        ensure_object_store_access_privilege(copy_from);
+fn object_store_with_location(
+    uri: &Url,
+    copy_from: bool,
+    check_perms: bool,
+) -> (Arc<dyn ObjectStore>, Path) {
+    if check_perms {
+        ensure_access_privilege_to_uri(&uri, copy_from);
+    }
 
+    if uri.scheme() == "s3" {
         let (bucket_name, key) = parse_bucket_and_key(uri);
 
         let storage_container = PG_BACKEND_TOKIO_RUNTIME
@@ -71,8 +77,6 @@ fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStore>, Path) {
     } else {
         debug_assert!(uri.scheme() == "file");
 
-        ensure_local_file_access_privilege(copy_from);
-
         let uri = uri_as_string(uri);
 
         if !copy_from {
@@ -178,17 +182,17 @@ pub(crate) fn uri_as_string(uri: &Url) -> String {
     uri.to_string()
 }
 
-pub(crate) fn parquet_schema_from_uri(uri: &Url) -> SchemaDescriptor {
-    let parquet_reader = parquet_reader_from_uri(uri);
+pub(crate) fn parquet_schema_from_uri(uri: &Url, check_perms: bool) -> SchemaDescriptor {
+    let parquet_reader = parquet_reader_from_uri(uri, check_perms);
 
     let arrow_schema = parquet_reader.schema();
 
     arrow_to_parquet_schema(arrow_schema).unwrap_or_else(|e| panic!("{}", e))
 }
 
-pub(crate) fn parquet_metadata_from_uri(uri: &Url) -> Arc<ParquetMetaData> {
+pub(crate) fn parquet_metadata_from_uri(uri: &Url, check_perms: bool) -> Arc<ParquetMetaData> {
     let copy_from = true;
-    let (parquet_object_store, location) = object_store_with_location(uri, copy_from);
+    let (parquet_object_store, location) = object_store_with_location(uri, copy_from, check_perms);
 
     PG_BACKEND_TOKIO_RUNTIME.block_on(async {
         let object_store_meta = parquet_object_store
@@ -209,9 +213,12 @@ pub(crate) fn parquet_metadata_from_uri(uri: &Url) -> Arc<ParquetMetaData> {
     })
 }
 
-pub(crate) fn parquet_reader_from_uri(uri: &Url) -> ParquetRecordBatchStream<ParquetObjectReader> {
+pub(crate) fn parquet_reader_from_uri(
+    uri: &Url,
+    check_perms: bool,
+) -> ParquetRecordBatchStream<ParquetObjectReader> {
     let copy_from = true;
-    let (parquet_object_store, location) = object_store_with_location(uri, copy_from);
+    let (parquet_object_store, location) = object_store_with_location(uri, copy_from, check_perms);
 
     PG_BACKEND_TOKIO_RUNTIME.block_on(async {
         let object_store_meta = parquet_object_store
@@ -241,24 +248,32 @@ pub(crate) fn parquet_writer_from_uri(
     uri: &Url,
     arrow_schema: SchemaRef,
     writer_props: WriterProperties,
+    check_perms: bool,
 ) -> AsyncArrowWriter<ParquetObjectWriter> {
     let copy_from = false;
-    let (parquet_object_store, location) = object_store_with_location(uri, copy_from);
+    let (parquet_object_store, location) = object_store_with_location(uri, copy_from, check_perms);
 
     let parquet_object_writer = ParquetObjectWriter::new(parquet_object_store, location);
 
     AsyncArrowWriter::try_new(parquet_object_writer, arrow_schema, Some(writer_props))
         .unwrap_or_else(|e| panic!("failed to create parquet writer for uri {}: {}", uri, e))
 }
 
-fn ensure_object_store_access_privilege(copy_from: bool) {
+pub(crate) fn ensure_access_privilege_to_uri(uri: &Url, copy_from: bool) {
     if unsafe { superuser() } {
         return;
     }
 
     let user_id = unsafe { GetUserId() };
+    let is_file = uri.scheme() == "file";
 
-    let required_role_name = if copy_from {
+    let required_role_name = if is_file {
+        if copy_from {
+            "pg_read_server_files"
+        } else {
+            "pg_write_server_files"
+        }
+    } else if copy_from {
         PARQUET_OBJECT_STORE_READ_ROLE
     } else {
         PARQUET_OBJECT_STORE_WRITE_ROLE
@@ -268,46 +283,19 @@ fn ensure_object_store_access_privilege(copy_from: bool) {
         unsafe { get_role_oid(required_role_name.to_string().as_pg_cstr(), false) };
 
     let operation_str = if copy_from { "from" } else { "to" };
+    let object_type = if is_file { "file" } else { "remote uri" };
 
     if !unsafe { has_privs_of_role(user_id, required_role_id) } {
         ereport!(
             pgrx::PgLogLevel::ERROR,
             pgrx::PgSqlErrorCode::ERRCODE_INSUFFICIENT_PRIVILEGE,
-            format!("permission denied to COPY {} a remote uri", operation_str),
             format!(
-                "Only roles with privileges of the \"{}\" role may COPY {} a remote uri.",
-                required_role_name, operation_str
+                "permission denied to COPY {} a {}",
+                operation_str, object_type
             ),
-        );
-    }
-}
-
-fn ensure_local_file_access_privilege(copy_from: bool) {
-    if unsafe { superuser() } {
-        return;
-    }
-
-    let user_id = unsafe { GetUserId() };
-
-    let required_role_name = if copy_from {
-        "pg_read_server_files"
-    } else {
-        "pg_write_server_files"
-    };
-
-    let required_role_id =
-        unsafe { get_role_oid(required_role_name.to_string().as_pg_cstr(), false) };
-
-    let operation_str = if copy_from { "from" } else { "to" };
-
-    if !unsafe { has_privs_of_role(user_id, required_role_id) } {
-        ereport!(
-            pgrx::PgLogLevel::ERROR,
-            pgrx::PgSqlErrorCode::ERRCODE_INSUFFICIENT_PRIVILEGE,
-            format!("permission denied to COPY {} a file", operation_str),
             format!(
-                "Only roles with privileges of the \"{}\" role may COPY {} a file.",
-                required_role_name, operation_str
+                "Only roles with privileges of the \"{}\" role may COPY {} a {}.",
+                required_role_name, operation_str, object_type
             ),
         );
     }
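
Put differently, the consolidated ensure_access_privilege_to_uri selects the required role from the URI scheme plus the copy direction. A standalone sketch of just that mapping (the non-file role names below are placeholders for the extension's PARQUET_OBJECT_STORE_READ_ROLE / PARQUET_OBJECT_STORE_WRITE_ROLE constants):

// Sketch of the role selection above: file URIs fall back to
// PostgreSQL's built-in server-file roles, everything else uses
// the extension's object-store roles.
fn required_role(is_file: bool, copy_from: bool) -> &'static str {
    match (is_file, copy_from) {
        (true, true) => "pg_read_server_files",
        (true, false) => "pg_write_server_files",
        (false, true) => "parquet_object_store_read",   // placeholder
        (false, false) => "parquet_object_store_write", // placeholder
    }
}
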
3 changes: 3 additions & 0 deletions src/lib.rs
@@ -2672,6 +2672,9 @@ mod tests {
         let create_role = "create role test_role;";
         Spi::run(create_role).unwrap();
 
+        let grant_role = "grant pg_write_server_files TO test_role;";
+        Spi::run(grant_role).unwrap();
+
         let set_role = "set role test_role;";
         Spi::run(set_role).unwrap();
 
2 changes: 1 addition & 1 deletion src/parquet_copy_hook/copy_from.rs
@@ -133,7 +133,7 @@ pub(crate) fn execute_copy_from(
 
     unsafe {
         // parquet reader context is used throughout the COPY FROM operation.
-        let parquet_reader_context = ParquetReaderContext::new(uri, &tupledesc);
+        let parquet_reader_context = ParquetReaderContext::new(uri, &tupledesc, true);
         push_parquet_reader_context(parquet_reader_context);
 
         // makes sure to set binary format
2 changes: 1 addition & 1 deletion src/parquet_copy_hook/copy_to_dest_receiver.rs
@@ -203,7 +203,7 @@ extern "C" fn copy_startup(dest: *mut DestReceiver, _operation: i32, tupledesc:
     // parquet writer context is used throughout the COPY TO operation.
     // This might be put into ParquetCopyDestReceiver, but it's hard to preserve repr(C).
     let parquet_writer_context =
-        ParquetWriterContext::new(uri, compression, compression_level, &tupledesc);
+        ParquetWriterContext::new(uri, compression, compression_level, &tupledesc, false);
     push_parquet_writer_context(parquet_writer_context);
 }
 
11 changes: 10 additions & 1 deletion src/parquet_copy_hook/hook.rs
@@ -7,7 +7,10 @@ use pg_sys::{
 use pgrx::{prelude::*, GucSetting};
 
 use crate::{
-    arrow_parquet::{compression::INVALID_COMPRESSION_LEVEL, uri_utils::uri_as_string},
+    arrow_parquet::{
+        compression::INVALID_COMPRESSION_LEVEL,
+        uri_utils::{ensure_access_privilege_to_uri, uri_as_string},
+    },
     parquet_copy_hook::{
         copy_to_dest_receiver::create_copy_to_parquet_dest_receiver,
         copy_utils::{
@@ -59,6 +62,9 @@ extern "C" fn parquet_copy_hook(
 
     if ENABLE_PARQUET_COPY_HOOK.get() && is_copy_to_parquet_stmt(&p_stmt) {
         let uri = copy_stmt_uri(&p_stmt).expect("uri is None");
+        let copy_from = false;
+
+        ensure_access_privilege_to_uri(&uri, copy_from);
 
         validate_copy_to_options(&p_stmt, &uri);
 
@@ -106,6 +112,9 @@ extern "C" fn parquet_copy_hook(
         return;
     } else if ENABLE_PARQUET_COPY_HOOK.get() && is_copy_from_parquet_stmt(&p_stmt) {
         let uri = copy_stmt_uri(&p_stmt).expect("uri is None");
+        let copy_from = true;
+
+        ensure_access_privilege_to_uri(&uri, copy_from);
 
         validate_copy_from_options(&p_stmt);
 
6 changes: 3 additions & 3 deletions src/parquet_udfs/metadata.rs
@@ -39,7 +39,7 @@ mod parquet {
     > {
         let uri = parse_uri(&uri);
 
-        let parquet_metadata = parquet_metadata_from_uri(&uri);
+        let parquet_metadata = parquet_metadata_from_uri(&uri, true);
 
         let mut rows = vec![];
 
@@ -137,7 +137,7 @@ mod parquet {
     > {
         let uri = parse_uri(&uri);
 
-        let parquet_metadata = parquet_metadata_from_uri(&uri);
+        let parquet_metadata = parquet_metadata_from_uri(&uri, true);
 
         let created_by = parquet_metadata
             .file_metadata()
@@ -174,7 +174,7 @@ mod parquet {
     > {
         let uri = parse_uri(&uri);
 
-        let parquet_metadata = parquet_metadata_from_uri(&uri);
+        let parquet_metadata = parquet_metadata_from_uri(&uri, true);
         let kv_metadata = parquet_metadata.file_metadata().key_value_metadata();
 
         if kv_metadata.is_none() {
2 changes: 1 addition & 1 deletion src/parquet_udfs/schema.rs
@@ -32,7 +32,7 @@ mod parquet {
     > {
         let uri = parse_uri(&uri);
 
-        let parquet_schema = parquet_schema_from_uri(&uri);
+        let parquet_schema = parquet_schema_from_uri(&uri, true);
 
         let root_type = parquet_schema.root_schema();
         let thrift_schema_elements = to_thrift(root_type).unwrap_or_else(|e| {
