diff --git a/NAMESPACE b/NAMESPACE index 6a09679a..19ef3ccf 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -6,6 +6,7 @@ export(Postgres) export(Redshift) export(postgresDefault) export(postgresHasDefault) +export(postgresImportLargeObject) export(postgresIsTransacting) export(postgresWaitForNotify) exportClasses(PqConnection) diff --git a/R/PqConnection.R b/R/PqConnection.R index e01b0378..71f49086 100644 --- a/R/PqConnection.R +++ b/R/PqConnection.R @@ -135,3 +135,36 @@ postgresWaitForNotify <- function(conn, timeout = 1) { postgresIsTransacting <- function(conn) { connection_is_transacting(conn@ptr) } + + +#' Imports a large object from file +#' +#' Returns an object idenfier (Oid) for the imported large object +#' +#' @export +#' @param conn a [PqConnection-class] object, produced by +#' [DBI::dbConnect()] +#' @param filepath a path to the large object to import +#' @param oid the oid to write to. Defaults to 0 which assigns an unused oid +#' @return the identifier of the large object, an integer +#' @examples +#' \dontrun{ +#' con <- postgresDefault() +#' filepath <- 'your_image.png' +#' dbWithTransaction(con, { +#' oid <- postgresImportLargeObject(con, filepath) +#' }) +#' } +postgresImportLargeObject <- function(conn, filepath = NULL, oid = 0) { + + if (!postgresIsTransacting(conn)) { + stopc("Cannot import a large object outside of a transaction") + } + + if (is.null(filepath)) stopc("'filepath' cannot be NULL") + if (is.null(oid)) stopc("'oid' cannot be NULL") + if (is.na(oid)) stopc("'oid' cannot be NA") + if (oid < 0) stopc("'oid' cannot be negative") + + connection_import_lo_from_file(conn@ptr, filepath, oid) +} diff --git a/R/cpp11.R b/R/cpp11.R index 5c1d0cd2..15734e20 100644 --- a/R/cpp11.R +++ b/R/cpp11.R @@ -36,6 +36,10 @@ connection_set_transacting <- function(con, transacting) { invisible(.Call(`_RPostgres_connection_set_transacting`, con, transacting)) } +connection_import_lo_from_file <- function(con, filename, oid) { + .Call(`_RPostgres_connection_import_lo_from_file`, con, filename, oid) +} + connection_copy_data <- function(con, sql, df) { invisible(.Call(`_RPostgres_connection_copy_data`, con, sql, df)) } diff --git a/_pkgdown.yml b/_pkgdown.yml index 028f3a7c..af8c57e7 100644 --- a/_pkgdown.yml +++ b/_pkgdown.yml @@ -41,6 +41,7 @@ reference: - '`RPostgres-package`' - postgresHasDefault - postgresWaitForNotify + - postgresImportLargeObject development: mode: auto diff --git a/man/postgresImportLargeObject.Rd b/man/postgresImportLargeObject.Rd new file mode 100644 index 00000000..1b7f6851 --- /dev/null +++ b/man/postgresImportLargeObject.Rd @@ -0,0 +1,31 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/PqConnection.R +\name{postgresImportLargeObject} +\alias{postgresImportLargeObject} +\title{Imports a large object from file} +\usage{ +postgresImportLargeObject(conn, filepath = NULL, oid = 0) +} +\arguments{ +\item{conn}{a \linkS4class{PqConnection} object, produced by +\code{\link[DBI:dbConnect]{DBI::dbConnect()}}} + +\item{filepath}{a path to the large object to import} + +\item{oid}{the oid to write to. Defaults to 0 which assigns an unused oid} +} +\value{ +the identifier of the large object, an integer +} +\description{ +Returns an object idenfier (Oid) for the imported large object +} +\examples{ +\dontrun{ +con <- postgresDefault() +filepath <- 'your_image.png' +dbWithTransaction(con, { + oid <- postgresImportLargeObject(con, filepath) +}) +} +} diff --git a/src/DbConnection.cpp b/src/DbConnection.cpp index cf66200d..6d43cb43 100644 --- a/src/DbConnection.cpp +++ b/src/DbConnection.cpp @@ -128,6 +128,12 @@ bool DbConnection::has_query() { return pCurrentResult_ != NULL; } +Oid DbConnection::import_lo_from_file(std::string filename, Oid p_oid) { + Oid lo_oid = lo_import_with_oid(pConn_, filename.c_str(), p_oid); + if (lo_oid == InvalidOid) cpp11::stop(PQerrorMessage(pConn_)); + return(lo_oid); +} + void DbConnection::copy_data(std::string sql, cpp11::list df) { LOG_DEBUG << sql; diff --git a/src/DbConnection.h b/src/DbConnection.h index a0a70481..8e28834f 100644 --- a/src/DbConnection.h +++ b/src/DbConnection.h @@ -35,6 +35,9 @@ class DbConnection : boost::noncopyable { bool has_query(); void copy_data(std::string sql, cpp11::list df); + + Oid import_lo_from_file(std::string file_path, Oid p_oid); + void check_connection(); cpp11::list info(); diff --git a/src/connection.cpp b/src/connection.cpp index 2e0cb172..b2e38ece 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -91,6 +91,10 @@ void connection_set_transacting(DbConnection* con, bool transacting) { } // Specific functions +[[cpp11::register]] +Oid connection_import_lo_from_file(DbConnection* con, std::string filename, Oid oid) { + return con->import_lo_from_file(filename, oid); +} [[cpp11::register]] void connection_copy_data(DbConnection* con, std::string sql, cpp11::list df) { diff --git a/src/cpp11.cpp b/src/cpp11.cpp index bf613ab9..080b6075 100644 --- a/src/cpp11.cpp +++ b/src/cpp11.cpp @@ -71,6 +71,13 @@ extern "C" SEXP _RPostgres_connection_set_transacting(SEXP con, SEXP transacting END_CPP11 } // connection.cpp +Oid connection_import_lo_from_file(DbConnection* con, std::string filename, Oid oid); +extern "C" SEXP _RPostgres_connection_import_lo_from_file(SEXP con, SEXP filename, SEXP oid) { + BEGIN_CPP11 + return cpp11::as_sexp(connection_import_lo_from_file(cpp11::as_cpp>(con), cpp11::as_cpp>(filename), cpp11::as_cpp>(oid))); + END_CPP11 +} +// connection.cpp void connection_copy_data(DbConnection* con, std::string sql, cpp11::list df); extern "C" SEXP _RPostgres_connection_copy_data(SEXP con, SEXP sql, SEXP df) { BEGIN_CPP11 @@ -197,32 +204,33 @@ extern "C" SEXP _RPostgres_result_column_info(SEXP res) { extern "C" { static const R_CallMethodDef CallEntries[] = { - {"_RPostgres_client_version", (DL_FUNC) &_RPostgres_client_version, 0}, - {"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3}, - {"_RPostgres_connection_create", (DL_FUNC) &_RPostgres_connection_create, 3}, - {"_RPostgres_connection_get_temp_schema", (DL_FUNC) &_RPostgres_connection_get_temp_schema, 1}, - {"_RPostgres_connection_info", (DL_FUNC) &_RPostgres_connection_info, 1}, - {"_RPostgres_connection_is_transacting", (DL_FUNC) &_RPostgres_connection_is_transacting, 1}, - {"_RPostgres_connection_quote_identifier", (DL_FUNC) &_RPostgres_connection_quote_identifier, 2}, - {"_RPostgres_connection_quote_string", (DL_FUNC) &_RPostgres_connection_quote_string, 2}, - {"_RPostgres_connection_release", (DL_FUNC) &_RPostgres_connection_release, 1}, - {"_RPostgres_connection_set_temp_schema", (DL_FUNC) &_RPostgres_connection_set_temp_schema, 2}, - {"_RPostgres_connection_set_transacting", (DL_FUNC) &_RPostgres_connection_set_transacting, 2}, - {"_RPostgres_connection_valid", (DL_FUNC) &_RPostgres_connection_valid, 1}, - {"_RPostgres_connection_wait_for_notify", (DL_FUNC) &_RPostgres_connection_wait_for_notify, 2}, - {"_RPostgres_encode_data_frame", (DL_FUNC) &_RPostgres_encode_data_frame, 1}, - {"_RPostgres_encode_vector", (DL_FUNC) &_RPostgres_encode_vector, 1}, - {"_RPostgres_encrypt_password", (DL_FUNC) &_RPostgres_encrypt_password, 2}, - {"_RPostgres_init_logging", (DL_FUNC) &_RPostgres_init_logging, 1}, - {"_RPostgres_result_bind", (DL_FUNC) &_RPostgres_result_bind, 2}, - {"_RPostgres_result_column_info", (DL_FUNC) &_RPostgres_result_column_info, 1}, - {"_RPostgres_result_create", (DL_FUNC) &_RPostgres_result_create, 3}, - {"_RPostgres_result_fetch", (DL_FUNC) &_RPostgres_result_fetch, 2}, - {"_RPostgres_result_has_completed", (DL_FUNC) &_RPostgres_result_has_completed, 1}, - {"_RPostgres_result_release", (DL_FUNC) &_RPostgres_result_release, 1}, - {"_RPostgres_result_rows_affected", (DL_FUNC) &_RPostgres_result_rows_affected, 1}, - {"_RPostgres_result_rows_fetched", (DL_FUNC) &_RPostgres_result_rows_fetched, 1}, - {"_RPostgres_result_valid", (DL_FUNC) &_RPostgres_result_valid, 1}, + {"_RPostgres_client_version", (DL_FUNC) &_RPostgres_client_version, 0}, + {"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3}, + {"_RPostgres_connection_create", (DL_FUNC) &_RPostgres_connection_create, 3}, + {"_RPostgres_connection_get_temp_schema", (DL_FUNC) &_RPostgres_connection_get_temp_schema, 1}, + {"_RPostgres_connection_import_lo_from_file", (DL_FUNC) &_RPostgres_connection_import_lo_from_file, 3}, + {"_RPostgres_connection_info", (DL_FUNC) &_RPostgres_connection_info, 1}, + {"_RPostgres_connection_is_transacting", (DL_FUNC) &_RPostgres_connection_is_transacting, 1}, + {"_RPostgres_connection_quote_identifier", (DL_FUNC) &_RPostgres_connection_quote_identifier, 2}, + {"_RPostgres_connection_quote_string", (DL_FUNC) &_RPostgres_connection_quote_string, 2}, + {"_RPostgres_connection_release", (DL_FUNC) &_RPostgres_connection_release, 1}, + {"_RPostgres_connection_set_temp_schema", (DL_FUNC) &_RPostgres_connection_set_temp_schema, 2}, + {"_RPostgres_connection_set_transacting", (DL_FUNC) &_RPostgres_connection_set_transacting, 2}, + {"_RPostgres_connection_valid", (DL_FUNC) &_RPostgres_connection_valid, 1}, + {"_RPostgres_connection_wait_for_notify", (DL_FUNC) &_RPostgres_connection_wait_for_notify, 2}, + {"_RPostgres_encode_data_frame", (DL_FUNC) &_RPostgres_encode_data_frame, 1}, + {"_RPostgres_encode_vector", (DL_FUNC) &_RPostgres_encode_vector, 1}, + {"_RPostgres_encrypt_password", (DL_FUNC) &_RPostgres_encrypt_password, 2}, + {"_RPostgres_init_logging", (DL_FUNC) &_RPostgres_init_logging, 1}, + {"_RPostgres_result_bind", (DL_FUNC) &_RPostgres_result_bind, 2}, + {"_RPostgres_result_column_info", (DL_FUNC) &_RPostgres_result_column_info, 1}, + {"_RPostgres_result_create", (DL_FUNC) &_RPostgres_result_create, 3}, + {"_RPostgres_result_fetch", (DL_FUNC) &_RPostgres_result_fetch, 2}, + {"_RPostgres_result_has_completed", (DL_FUNC) &_RPostgres_result_has_completed, 1}, + {"_RPostgres_result_release", (DL_FUNC) &_RPostgres_result_release, 1}, + {"_RPostgres_result_rows_affected", (DL_FUNC) &_RPostgres_result_rows_affected, 1}, + {"_RPostgres_result_rows_fetched", (DL_FUNC) &_RPostgres_result_rows_fetched, 1}, + {"_RPostgres_result_valid", (DL_FUNC) &_RPostgres_result_valid, 1}, {NULL, NULL, 0} }; } diff --git a/src/pch.h b/src/pch.h index 23ca8422..f15e4cf3 100644 --- a/src/pch.h +++ b/src/pch.h @@ -1,5 +1,6 @@ #include #include +#include #include diff --git a/tests/testthat/data/large_object.txt b/tests/testthat/data/large_object.txt new file mode 100644 index 00000000..7d72bd78 --- /dev/null +++ b/tests/testthat/data/large_object.txt @@ -0,0 +1 @@ +postgres \ No newline at end of file diff --git a/tests/testthat/test-ImportLargeObject.R b/tests/testthat/test-ImportLargeObject.R new file mode 100644 index 00000000..bf2b9f62 --- /dev/null +++ b/tests/testthat/test-ImportLargeObject.R @@ -0,0 +1,37 @@ + +test_that("can import and read a large object", { + con <- postgresDefault() + on.exit(dbDisconnect(con)) + test_file_path <- paste0(test_path(),'/data/large_object.txt') + dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path) }) + expect_gt(oid,0) + lo_data <- unlist(dbGetQuery(con, "select lo_get($1) as lo_data", params=list(oid))$lo_data[1]) + large_object_txt <- as.raw(c(0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73)) # the string 'postgres' + expect_equal(lo_data, large_object_txt) +}) + + +test_that("importing to an existing oid throws error", { + con <- postgresDefault() + on.exit(dbDisconnect(con)) + test_file_path <- paste0(test_path(),'/data/large_object.txt') + oid <- 1234 + dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path, oid) }) + + expect_error( + dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path, oid) }) + ) + dbExecute(con, "select lo_unlink($1) as lo_data", params=list(oid)) +}) + + +test_that("import from a non-existing path throws error", { + con <- postgresDefault() + on.exit(dbDisconnect(con)) + test_file_path <- paste0(test_path(),'/data/large_object_that_does_not_exist.txt') + expect_error( + dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path) }) + ) +}) + +