From e4e3aced9f94da6067605c754184d23cd7507b27 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?=
Date: Wed, 2 Oct 2024 13:08:44 +0200
Subject: [PATCH] Support writing to stdout

---
 R/write-parquet.R                     |  5 ++++-
 man/write_parquet.Rd                  |  5 ++++-
 src/lib/memstream.h                   |  1 +
 src/write.cpp                         | 10 ++++++++--
 tests/testthat/helper.R               |  8 ++++++++
 tests/testthat/test-write-parquet-2.R | 20 ++++++++++++++++++++
 6 files changed, 45 insertions(+), 4 deletions(-)

diff --git a/R/write-parquet.R b/R/write-parquet.R
index b60a403..6b679da 100644
--- a/R/write-parquet.R
+++ b/R/write-parquet.R
@@ -8,7 +8,10 @@
 #' @param x Data frame to write.
 #' @param file Path to the output file. If this is the string `":raw:"`,
 #' then the data frame is written to a memory buffer, and the memory
-#' buffer is returned as a raw vector.
+#' buffer is returned as a raw vector. If `":stdout:"`, then it is
+#' written to the standard output. (When writing to the standard output,
+#' special care is needed to make sure no regular R output gets mixed
+#' up with the Parquet bytes!)
 #' @param schema Parquet schema. Specify a schema to tweak the default
 #' nanoparquet R -> Parquet type mappings. Use [parquet_schema()] to
 #' create a schema that you can use here, or [read_parquet_schema()] to
diff --git a/man/write_parquet.Rd b/man/write_parquet.Rd
index fb3ec62..df3b919 100644
--- a/man/write_parquet.Rd
+++ b/man/write_parquet.Rd
@@ -20,7 +20,10 @@ write_parquet(
 
 \item{file}{Path to the output file. If this is the string \code{":raw:"},
 then the data frame is written to a memory buffer, and the memory
-buffer is returned as a raw vector.}
+buffer is returned as a raw vector. If \code{":stdout:"}, then it is
+written to the standard output. (When writing to the standard output,
+special care is needed to make sure no regular R output gets mixed
+up with the Parquet bytes!)}
 
 \item{schema}{Parquet schema. Specify a schema to tweak the default
 nanoparquet R -> Parquet type mappings.
Use \code{\link[=parquet_schema]{parquet_schema()}} to
diff --git a/src/lib/memstream.h b/src/lib/memstream.h
index 6a3dd4d..e87faa5 100644
--- a/src/lib/memstream.h
+++ b/src/lib/memstream.h
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include
 
 // A growing buffer that can be used as an output stream.
 // It stores that data in an array of buffers, each bigger than
diff --git a/src/write.cpp b/src/write.cpp
index 461229e..5c0f244 100644
--- a/src/write.cpp
+++ b/src/write.cpp
@@ -2926,7 +2926,7 @@ SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression,
   }
 
   std::string fname = (char *)CHAR(STRING_ELT(filesxp, 0));
-  if (fname == ":raw:") {
+  if (fname == ":raw:" || fname == ":stdout:") {
     MemStream ms;
     std::ostream &os = ms.stream();
     RParquetOutFile of(os, codec, comp_level, row_groups);
@@ -2935,7 +2935,13 @@ SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression,
     R_xlen_t bufsize = ms.size();
     SEXP res = Rf_allocVector(RAWSXP, bufsize);
     ms.copy(RAW(res), bufsize);
-    return res;
+    if (fname == ":stdout:") {
+      // Emit the serialized bytes on stdout; nothing is handed back to R.
+      // NOTE(review): stdout may be in text mode on Windows - confirm no newline translation.
+      std::cout.write(reinterpret_cast<const char *>(RAW(res)), bufsize).flush();
+      return R_NilValue;
+    }
+    return res;
   } else {
     RParquetOutFile of(fname, codec, comp_level, row_groups);
     of.data_page_version = dp_ver;
diff --git a/tests/testthat/helper.R b/tests/testthat/helper.R
index e40a28f..6afc25b 100644
--- a/tests/testthat/helper.R
+++ b/tests/testthat/helper.R
@@ -39,3 +39,11 @@ test_write <- function(d, schema = NULL, encoding = NULL) {
     as.data.frame(read_parquet(tmp))
   })
 }
+
+# Path of the Rscript executable inside the bin directory of the running R.
+rscript <- function() {
+  # Only the Windows build gets the ".exe" suffix.
+  is_windows <- .Platform$OS.type == "windows"
+  exe <- if (is_windows) "Rscript.exe" else "Rscript"
+  file.path(R.home("bin"), exe)
+}
diff --git a/tests/testthat/test-write-parquet-2.R b/tests/testthat/test-write-parquet-2.R
index 8b1ec85..3efa1b7 100644
--- a/tests/testthat/test-write-parquet-2.R
+++ b/tests/testthat/test-write-parquet-2.R
@@ -167,6 +167,26 @@ test_that("write_parquet() to memory 2", {
expect_equal(r1, r2)
 })
 
+test_that("write_parquet() to stdout", {
+  skip_on_cran()
+  skip_if_not_installed("processx")
+  tmp1 <- tempfile(fileext = ".parquet")
+  tmp2 <- tempfile(fileext = ".parquet")
+  script <- tempfile(fileext = ".R")
+  on.exit(unlink(c(tmp1, tmp2, script)), add = TRUE)
+
+  # Run the writer in a separate R process with stdout redirected to tmp1.
+  # pkgload::load_all() is prepended unless is_rcmd_check() is true.
+  preamble <- if (!is_rcmd_check()) "pkgload::load_all()"
+  writeLines(c(preamble, "nanoparquet::write_parquet(mtcars, \":stdout:\")"), script)
+  processx::run(rscript(), script, stdout = tmp1, stderr = NULL)
+  r1 <- readBin(tmp1, "raw", file.size(tmp1))
+  write_parquet(mtcars, tmp2)
+  r2 <- readBin(tmp2, "raw", file.size(tmp2))
+  expect_equal(file.size(tmp1), file.size(tmp2))
+  expect_equal(r1, r2)
+})
+
 test_that("gzip compression", {
   d <- test_df(missing = TRUE)
   tmp <- tempfile(fileext = ".parquet")