Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Importing large objects from client side #472

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export(Postgres)
export(Redshift)
export(postgresDefault)
export(postgresHasDefault)
export(postgresImportLargeObject)
export(postgresIsTransacting)
export(postgresWaitForNotify)
exportClasses(PqConnection)
Expand Down
33 changes: 33 additions & 0 deletions R/PqConnection.R
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,36 @@ postgresWaitForNotify <- function(conn, timeout = 1) {
postgresIsTransacting <- function(conn) {
connection_is_transacting(conn@ptr)
}


#' Imports a large object from file
#'
#' Returns an object idenfier (Oid) for the imported large object
#'
#' @export
#' @param conn a [PqConnection-class] object, produced by
#' [DBI::dbConnect()]
#' @param filepath a path to the large object to import
#' @param oid the oid to write to. Defaults to 0 which assigns an unused oid
#' @return the identifier of the large object, an integer
#' @examples
#' \dontrun{
#' con <- postgresDefault()
#' filepath <- 'your_image.png'
#' dbWithTransaction(con, {
#' oid <- postgresImportLargeObject(con, filepath)
#' })
#' }
postgresImportLargeObject <- function(conn, filepath = NULL, oid = 0) {

if (!postgresIsTransacting(conn)) {
stopc("Cannot import a large object outside of a transaction")
}

if (is.null(filepath)) stopc("'filepath' cannot be NULL")
if (is.null(oid)) stopc("'oid' cannot be NULL")
if (is.na(oid)) stopc("'oid' cannot be NA")
if (oid < 0) stopc("'oid' cannot be negative")

connection_import_lo_from_file(conn@ptr, filepath, oid)
}
4 changes: 4 additions & 0 deletions R/cpp11.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ connection_set_transacting <- function(con, transacting) {
invisible(.Call(`_RPostgres_connection_set_transacting`, con, transacting))
}

connection_import_lo_from_file <- function(con, filename, oid) {
.Call(`_RPostgres_connection_import_lo_from_file`, con, filename, oid)
}

connection_copy_data <- function(con, sql, df) {
invisible(.Call(`_RPostgres_connection_copy_data`, con, sql, df))
}
Expand Down
1 change: 1 addition & 0 deletions _pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ reference:
- '`RPostgres-package`'
- postgresHasDefault
- postgresWaitForNotify
- postgresImportLargeObject

development:
mode: auto
Expand Down
31 changes: 31 additions & 0 deletions man/postgresImportLargeObject.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions src/DbConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ bool DbConnection::has_query() {
return pCurrentResult_ != NULL;
}

Oid DbConnection::import_lo_from_file(std::string filename, Oid p_oid) {
Oid lo_oid = lo_import_with_oid(pConn_, filename.c_str(), p_oid);
if (lo_oid == InvalidOid) cpp11::stop(PQerrorMessage(pConn_));
return(lo_oid);
}

void DbConnection::copy_data(std::string sql, cpp11::list df) {
LOG_DEBUG << sql;

Expand Down
3 changes: 3 additions & 0 deletions src/DbConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class DbConnection : boost::noncopyable {
bool has_query();

void copy_data(std::string sql, cpp11::list df);

Oid import_lo_from_file(std::string file_path, Oid p_oid);


void check_connection();
cpp11::list info();
Expand Down
4 changes: 4 additions & 0 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ void connection_set_transacting(DbConnection* con, bool transacting) {
}

// Specific functions
[[cpp11::register]]
Oid connection_import_lo_from_file(DbConnection* con, std::string filename, Oid oid) {
return con->import_lo_from_file(filename, oid);
}

[[cpp11::register]]
void connection_copy_data(DbConnection* con, std::string sql, cpp11::list df) {
Expand Down
60 changes: 34 additions & 26 deletions src/cpp11.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ extern "C" SEXP _RPostgres_connection_set_transacting(SEXP con, SEXP transacting
END_CPP11
}
// connection.cpp
Oid connection_import_lo_from_file(DbConnection* con, std::string filename, Oid oid);
extern "C" SEXP _RPostgres_connection_import_lo_from_file(SEXP con, SEXP filename, SEXP oid) {
BEGIN_CPP11
return cpp11::as_sexp(connection_import_lo_from_file(cpp11::as_cpp<cpp11::decay_t<DbConnection*>>(con), cpp11::as_cpp<cpp11::decay_t<std::string>>(filename), cpp11::as_cpp<cpp11::decay_t<Oid>>(oid)));
END_CPP11
}
// connection.cpp
void connection_copy_data(DbConnection* con, std::string sql, cpp11::list df);
extern "C" SEXP _RPostgres_connection_copy_data(SEXP con, SEXP sql, SEXP df) {
BEGIN_CPP11
Expand Down Expand Up @@ -197,32 +204,33 @@ extern "C" SEXP _RPostgres_result_column_info(SEXP res) {

extern "C" {
static const R_CallMethodDef CallEntries[] = {
{"_RPostgres_client_version", (DL_FUNC) &_RPostgres_client_version, 0},
{"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3},
{"_RPostgres_connection_create", (DL_FUNC) &_RPostgres_connection_create, 3},
{"_RPostgres_connection_get_temp_schema", (DL_FUNC) &_RPostgres_connection_get_temp_schema, 1},
{"_RPostgres_connection_info", (DL_FUNC) &_RPostgres_connection_info, 1},
{"_RPostgres_connection_is_transacting", (DL_FUNC) &_RPostgres_connection_is_transacting, 1},
{"_RPostgres_connection_quote_identifier", (DL_FUNC) &_RPostgres_connection_quote_identifier, 2},
{"_RPostgres_connection_quote_string", (DL_FUNC) &_RPostgres_connection_quote_string, 2},
{"_RPostgres_connection_release", (DL_FUNC) &_RPostgres_connection_release, 1},
{"_RPostgres_connection_set_temp_schema", (DL_FUNC) &_RPostgres_connection_set_temp_schema, 2},
{"_RPostgres_connection_set_transacting", (DL_FUNC) &_RPostgres_connection_set_transacting, 2},
{"_RPostgres_connection_valid", (DL_FUNC) &_RPostgres_connection_valid, 1},
{"_RPostgres_connection_wait_for_notify", (DL_FUNC) &_RPostgres_connection_wait_for_notify, 2},
{"_RPostgres_encode_data_frame", (DL_FUNC) &_RPostgres_encode_data_frame, 1},
{"_RPostgres_encode_vector", (DL_FUNC) &_RPostgres_encode_vector, 1},
{"_RPostgres_encrypt_password", (DL_FUNC) &_RPostgres_encrypt_password, 2},
{"_RPostgres_init_logging", (DL_FUNC) &_RPostgres_init_logging, 1},
{"_RPostgres_result_bind", (DL_FUNC) &_RPostgres_result_bind, 2},
{"_RPostgres_result_column_info", (DL_FUNC) &_RPostgres_result_column_info, 1},
{"_RPostgres_result_create", (DL_FUNC) &_RPostgres_result_create, 3},
{"_RPostgres_result_fetch", (DL_FUNC) &_RPostgres_result_fetch, 2},
{"_RPostgres_result_has_completed", (DL_FUNC) &_RPostgres_result_has_completed, 1},
{"_RPostgres_result_release", (DL_FUNC) &_RPostgres_result_release, 1},
{"_RPostgres_result_rows_affected", (DL_FUNC) &_RPostgres_result_rows_affected, 1},
{"_RPostgres_result_rows_fetched", (DL_FUNC) &_RPostgres_result_rows_fetched, 1},
{"_RPostgres_result_valid", (DL_FUNC) &_RPostgres_result_valid, 1},
{"_RPostgres_client_version", (DL_FUNC) &_RPostgres_client_version, 0},
{"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3},
{"_RPostgres_connection_create", (DL_FUNC) &_RPostgres_connection_create, 3},
{"_RPostgres_connection_get_temp_schema", (DL_FUNC) &_RPostgres_connection_get_temp_schema, 1},
{"_RPostgres_connection_import_lo_from_file", (DL_FUNC) &_RPostgres_connection_import_lo_from_file, 3},
{"_RPostgres_connection_info", (DL_FUNC) &_RPostgres_connection_info, 1},
{"_RPostgres_connection_is_transacting", (DL_FUNC) &_RPostgres_connection_is_transacting, 1},
{"_RPostgres_connection_quote_identifier", (DL_FUNC) &_RPostgres_connection_quote_identifier, 2},
{"_RPostgres_connection_quote_string", (DL_FUNC) &_RPostgres_connection_quote_string, 2},
{"_RPostgres_connection_release", (DL_FUNC) &_RPostgres_connection_release, 1},
{"_RPostgres_connection_set_temp_schema", (DL_FUNC) &_RPostgres_connection_set_temp_schema, 2},
{"_RPostgres_connection_set_transacting", (DL_FUNC) &_RPostgres_connection_set_transacting, 2},
{"_RPostgres_connection_valid", (DL_FUNC) &_RPostgres_connection_valid, 1},
{"_RPostgres_connection_wait_for_notify", (DL_FUNC) &_RPostgres_connection_wait_for_notify, 2},
{"_RPostgres_encode_data_frame", (DL_FUNC) &_RPostgres_encode_data_frame, 1},
{"_RPostgres_encode_vector", (DL_FUNC) &_RPostgres_encode_vector, 1},
{"_RPostgres_encrypt_password", (DL_FUNC) &_RPostgres_encrypt_password, 2},
{"_RPostgres_init_logging", (DL_FUNC) &_RPostgres_init_logging, 1},
{"_RPostgres_result_bind", (DL_FUNC) &_RPostgres_result_bind, 2},
{"_RPostgres_result_column_info", (DL_FUNC) &_RPostgres_result_column_info, 1},
{"_RPostgres_result_create", (DL_FUNC) &_RPostgres_result_create, 3},
{"_RPostgres_result_fetch", (DL_FUNC) &_RPostgres_result_fetch, 2},
{"_RPostgres_result_has_completed", (DL_FUNC) &_RPostgres_result_has_completed, 1},
{"_RPostgres_result_release", (DL_FUNC) &_RPostgres_result_release, 1},
{"_RPostgres_result_rows_affected", (DL_FUNC) &_RPostgres_result_rows_affected, 1},
{"_RPostgres_result_rows_fetched", (DL_FUNC) &_RPostgres_result_rows_fetched, 1},
{"_RPostgres_result_valid", (DL_FUNC) &_RPostgres_result_valid, 1},
{NULL, NULL, 0}
};
}
Expand Down
1 change: 1 addition & 0 deletions src/pch.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <cpp11.hpp>
#include <libpq-fe.h>
#include <libpq/libpq-fs.h>

#include <plogr.h>

Expand Down
1 change: 1 addition & 0 deletions tests/testthat/data/large_object.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
postgres
37 changes: 37 additions & 0 deletions tests/testthat/test-ImportLargeObject.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

test_that("can import and read a large object", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(test_path(),'/data/large_object.txt')
dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path) })
expect_gt(oid,0)
lo_data <- unlist(dbGetQuery(con, "select lo_get($1) as lo_data", params=list(oid))$lo_data[1])
large_object_txt <- as.raw(c(0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73)) # the string 'postgres'
expect_equal(lo_data, large_object_txt)
})


test_that("importing to an existing oid throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(test_path(),'/data/large_object.txt')
oid <- 1234
dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path, oid) })

expect_error(
dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path, oid) })
)
dbExecute(con, "select lo_unlink($1) as lo_data", params=list(oid))
})


test_that("import from a non-existing path throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(test_path(),'/data/large_object_that_does_not_exist.txt')
expect_error(
dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path) })
)
})


Loading