GH-15727: add write_checksum parameter on data export feature (#15770)
* add write_checksum parameter to allow disabling the default behaviour of the Hadoop Parquet writer, which writes a checksum file next to each data file

* add Py param validation

* add Py test

* add R tests

* remove unnecessary change in parquet exporter

* fix R documentation
sebhrusen authored Sep 19, 2023
1 parent 367ddc3 commit 90d548e
Showing 12 changed files with 168 additions and 91 deletions.
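For context, this is how the new flag is used from the Python client once the change is in place. A minimal sketch, assuming a locally started H2O cluster; the frame contents and export paths are made up for illustration:

import h2o

h2o.init()

# Any small frame works for illustration.
frame = h2o.H2OFrame({"x": [1, 2, 3], "y": ["a", "b", "c"]})

# Default behaviour (write_checksum=True): Hadoop's checksummed filesystem
# writes a hidden .crc sidecar file next to every exported part file.
h2o.export_file(frame, "/tmp/export_with_crc", format="parquet", force=True)

# write_checksum=False suppresses the .crc sidecar files.
h2o.export_file(frame, "/tmp/export_no_crc", format="parquet", force=True,
                write_checksum=False)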
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/api/FramesHandler.java
@@ -252,7 +252,7 @@ public FramesV3 export(int version, FramesV3 s) {
if (s.parallel) {
Log.warn("Parallel export to a single file is not supported for parquet format! Export will continue with a parquet-specific setup.");
}
s.job = new JobV3(Frame.exportParquet(fr, s.path, s.force, s.compression));
s.job = new JobV3(Frame.exportParquet(fr, s.path, s.force, s.compression, s.write_checksum));
} else {
Frame.CSVStreamParams csvParms = new Frame.CSVStreamParams()
.setSeparator(s.separator)
3 changes: 3 additions & 0 deletions h2o-core/src/main/java/water/api/schemas3/FramesV3.java
@@ -50,6 +50,9 @@ public class FramesV3 extends RequestSchemaV3<Frames, FramesV3> {
@API(help="Compression method (default none; gzip, bzip2 and snappy available depending on runtime environment)")
public String compression;

@API(help="Specifies if checksum should be written next to data files on export (if supported by export format).")
public boolean write_checksum = true;

@API(help="Field separator (default ',')")
public byte separator = Frame.CSVStreamParams.DEFAULT_SEPARATOR;

4 changes: 2 additions & 2 deletions h2o-core/src/main/java/water/fvec/Frame.java
@@ -1605,7 +1605,7 @@ public static Job export(Frame fr, String path, String frameName, boolean overwr
return job.start(t, fr.anyVec().nChunks());
}

public static Job exportParquet(Frame fr, String path, boolean overwrite, String compression) {
public static Job exportParquet(Frame fr, String path, boolean overwrite, String compression, boolean writeChecksum) {
// Validate input
if (! H2O.getPM().isEmptyDirectoryAllNodes(path)) {
throw new H2OIllegalArgumentException(path, "exportFrame", "Cannot use path " + path +
@@ -1626,7 +1626,7 @@ public static Job exportParquet(Frame fr, String path, boolean overwrite, String
}
Job job = new Job<>(fr._key, "water.fvec.Frame", "Export dataset");

H2O.H2OCountedCompleter t = parquetExporter.export(fr, path, overwrite, compression);
H2O.H2OCountedCompleter t = parquetExporter.export(fr, path, overwrite, compression, writeChecksum);
return job.start(t, fr.anyVec().nChunks());
}

@@ -6,7 +6,7 @@

public interface BinaryFormatExporter {

H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression);
H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression, boolean writeChecksum);

boolean supports(ExportFileFormat format);
}
@@ -1,6 +1,8 @@
package water.parser.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -28,9 +30,19 @@

public class FrameParquetExporter {

public void export(H2O.H2OCountedCompleter<?> completer, String path, Frame frame, boolean force, String compression) {
public void export(H2O.H2OCountedCompleter<?> completer, String path, Frame frame, boolean force, String compression, boolean writeChecksum) {
File f = new File(path);
new FrameParquetExporter.PartExportParquetTask(completer, f.getPath(), generateMessageTypeString(frame), frame.names(), frame.types(), frame.domains(), force, compression).dfork(frame);
new FrameParquetExporter.PartExportParquetTask(
completer,
f.getPath(),
generateMessageTypeString(frame),
frame.names(),
frame.types(),
frame.domains(),
force,
compression,
writeChecksum
).dfork(frame);
}

private static class PartExportParquetTask extends MRTask<PartExportParquetTask> {
@@ -41,9 +53,11 @@ private static class PartExportParquetTask extends MRTask<PartExportParquetTask>
final byte[] _colTypes;
final String[][] _domains;
final boolean _force;
final boolean _writeChecksum;

PartExportParquetTask(H2O.H2OCountedCompleter<?> completer, String path, String messageTypeString,
String[] colNames, byte[] colTypes, String[][] domains, boolean force, String compression) {
String[] colNames, byte[] colTypes, String[][] domains,
boolean force, String compression, boolean writeChecksum) {
super(completer);
_path = path;
_compressionCodecName = getCompressionCodecName(compression);
@@ -52,6 +66,7 @@ private static class PartExportParquetTask extends MRTask<PartExportParquetTask>
_colTypes = colTypes;
_domains = domains;
_force = force;
_writeChecksum = writeChecksum;
}

CompressionCodecName getCompressionCodecName(String compression) {
@@ -82,7 +97,7 @@ public void map(Chunk[] cs) {
String partPath = _path + "/part-m-" + String.valueOf(100000 + partIdx).substring(1);

SimpleGroupFactory fact = new SimpleGroupFactory(parseMessageType(_messageTypeString));
try (ParquetWriter<Group> writer = buildWriter(new Path(partPath), _compressionCodecName, PersistHdfs.CONF, parseMessageType(_messageTypeString), getMode(_force))) {
try (ParquetWriter<Group> writer = buildWriter(new Path(partPath), _compressionCodecName, PersistHdfs.CONF, parseMessageType(_messageTypeString), getMode(_force), _writeChecksum)) {
String currColName;
byte currColType;

@@ -122,34 +137,40 @@ public void map(Chunk[] cs) {
}

private static String generateMessageTypeString(Frame frame) {
String message_txt = "message test { ";
StringBuilder mb = new StringBuilder("message export_type { ");
String currName;
for (int i = 0; i < frame.numCols(); i++) {
currName = frame._names[i];
switch (frame.types()[i]) {
case (T_TIME):
message_txt = message_txt.concat("optional int64 ").concat(currName).concat(" (TIMESTAMP_MILLIS);");
mb.append("optional int64 ").append(currName).append(" (TIMESTAMP_MILLIS);");
break;
case (T_NUM):
case (T_BAD):
message_txt = message_txt.concat("optional double ").concat(currName).concat("; ");
mb.append("optional double ").append(currName).append("; ");
break;
case (T_STR):
case (T_CAT):
message_txt = message_txt.concat("optional BINARY ").concat(currName).concat(" (UTF8); ");
mb.append("optional BINARY ").append(currName).append(" (UTF8); ");
break;
case (T_UUID):
message_txt = message_txt.concat("optional fixed_len_byte_array(16) ").concat(currName).concat(" (UUID); ");
mb.append("optional fixed_len_byte_array(16) ").append(currName).append(" (UUID); ");
break;
}
}
message_txt = message_txt.concat("} ");
return message_txt;
mb.append("} ");
return mb.toString();
}

private static ParquetWriter<Group> buildWriter(Path file, CompressionCodecName compressionCodecName, Configuration configuration, MessageType _schema, ParquetFileWriter.Mode mode) throws IOException {
GroupWriteSupport.setSchema(_schema, configuration);
return new ParquetWriter.Builder(file) {
private static ParquetWriter<Group> buildWriter(Path path, CompressionCodecName compressionCodecName, Configuration configuration, MessageType schema, ParquetFileWriter.Mode mode, boolean writeChecksum) throws IOException {
GroupWriteSupport.setSchema(schema, configuration);

// The filesystem is cached for a given path and configuration,
// therefore the following modification on the fs is a bit hacky as another process could use the same instance.
// However, given the current use case and the fact that the change impacts only the way files are written, it should be on the safe side.
FileSystem fs = path.getFileSystem(configuration);
fs.setWriteChecksum(writeChecksum);
return new ParquetWriter.Builder(path) {
@Override
protected ParquetWriter.Builder self() {
return this;
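Two details of the exporter above are worth spelling out. First, generateMessageTypeString builds the Parquet schema as a plain string; for a frame with a numeric column x and a categorical column y, the branches above would produce:

message export_type { optional double x; optional BINARY y (UTF8); }

Second, the checksum toggle works through Hadoop's FileSystem.setWriteChecksum: on checksummed implementations such as the local filesystem, it controls whether a hidden .<data-file>.crc sidecar is written for each data file. A quick way to observe the effect on disk — a sketch assuming the export directories from the earlier example:

import os

# Hypothetical directory produced by h2o.export_file(..., format="parquet").
export_dir = "/tmp/export_with_crc"

# Hadoop names checksum sidecars ".<data-file>.crc"; after an export with
# write_checksum=False, none of these should be present.
sidecars = [f for f in os.listdir(export_dir) if f.endswith(".crc")]
print("checksum sidecars found:", sidecars)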
@@ -8,16 +8,13 @@
public class ParquetExporter implements BinaryFormatExporter {

@Override
public H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression) {
return new ExportParquetDriver(frame, path, force, compression);
public H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression, boolean writeChecksum) {
return new ExportParquetDriver(frame, path, force, compression, writeChecksum);
}

@Override
public boolean supports(ExportFileFormat format) {
if (ExportFileFormat.parquet.equals(format)) {
return true;
}
return false;
return ExportFileFormat.parquet.equals(format);
}

private class ExportParquetDriver extends H2O.H2OCountedCompleter<ExportParquetDriver> {
@@ -26,19 +23,21 @@ private class ExportParquetDriver extends H2O.H2OCountedCompleter<ExportParquetD
String _path;
boolean _force;
String _compression;
boolean _writeChecksum;

public ExportParquetDriver(Frame frame, String path, boolean force, String compression) {
public ExportParquetDriver(Frame frame, String path, boolean force, String compression, boolean writeChecksum) {
_frame = frame;
_path = path;
_force = force;
_compression = compression;
_writeChecksum = writeChecksum;
}

@Override
public void compute2() {
// multipart export
FrameParquetExporter parquetExporter = new FrameParquetExporter();
parquetExporter.export(this, _path, _frame, _force, _compression);
parquetExporter.export(this, _path, _frame, _force, _compression, _writeChecksum);
}
}
}
10 changes: 8 additions & 2 deletions h2o-py/h2o/h2o.py
@@ -1580,7 +1580,8 @@ def load_model(path):
return get_model(res["models"][0]["model_id"]["name"])


def export_file(frame, path, force=False, sep=",", compression=None, parts=1, header=True, quote_header=True, parallel=False, format="csv"):
def export_file(frame, path, force=False, sep=",", compression=None, parts=1, header=True, quote_header=True,
parallel=False, format="csv", write_checksum=True):
"""
Export a given H2OFrame to a path on the machine this python session is currently connected to.
@@ -1602,6 +1603,8 @@ def export_file(frame, path, force=False, sep=",", compression=None, parts=1, he
:param format: one of 'csv' or 'parquet'. Defaults to 'csv'. Export
to parquet is multipart and H2O itself determines the optimal number
of files (1 file per chunk).
:param write_checksum: if supported by the format (e.g. 'parquet'),
export will include a checksum file for each exported data file.
:examples:
@@ -1628,10 +1631,13 @@ def export_file(frame, path, force=False, sep=",", compression=None, parts=1, he
assert_is_type(quote_header, bool)
assert_is_type(parallel, bool)
assert_is_type(format, str)
assert_is_type(write_checksum, bool)
H2OJob(api("POST /3/Frames/%s/export" % (frame.frame_id),
data={"path": path, "num_parts": parts, "force": force,
"compression": compression, "separator": ord(sep),
"header": header, "quote_header": quote_header, "parallel": parallel, "format": format}), "Export File").poll()
"header": header, "quote_header": quote_header, "parallel": parallel,
"format": format, "write_checksum": write_checksum}
), "Export File").poll()


def load_frame(frame_id, path, force=True):
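The Python client simply forwards the new flag to the existing export endpoint. For completeness, a direct REST sketch of roughly what export_file() sends; the endpoint and field names come from this diff, while the host, port, and frame id are assumptions:

import requests

resp = requests.post(
    "http://localhost:54321/3/Frames/my_frame/export",  # hypothetical frame id
    data={
        "path": "/tmp/export_no_crc",
        "num_parts": 1,
        "force": "true",
        "format": "parquet",
        "write_checksum": "false",
    },
)
resp.raise_for_status()
# The response describes an async export job; poll /3/Jobs/<job key> until DONE.
print(resp.json())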
48 changes: 38 additions & 10 deletions h2o-py/tests/testdir_apis/H2O_Module/pyunit_h2oexport_file.py
@@ -1,16 +1,18 @@
import sys, os
import os
import shutil
import sys
sys.path.insert(1,"../../../")
from tests import pyunit_utils
from tests import pyunit_utils as pu
import h2o

def h2oexport_file():
def export_file_csv():
"""
Python API test: h2o.export_file(frame, path, force=False, parts=1). Note taht force=True is only honored if
Python API test: h2o.export_file(frame, path, force=False, parts=1). Note that force=True is only honored if
parts=1. Otherwise, an error will be thrown.
"""
training_data = h2o.import_file(pyunit_utils.locate("smalldata/logreg/benign.csv"))
training_data = h2o.import_file(pu.locate("smalldata/logreg/benign.csv"))
try:
results_dir = pyunit_utils.locate("results") # find directory path to results folder
results_dir = pu.locate("results") # find directory path to results folder
final_path = os.path.join(results_dir, 'frameData')
h2o.export_file(training_data, final_path, force=True, parts=1) # save data
assert os.path.isfile(final_path), "h2o.export_file() command is not working."
@@ -28,7 +30,33 @@ def h2oexport_file():
"is not tested with multi-part export.".format(final_dir_path))


if __name__ == "__main__":
pyunit_utils.standalone_test(h2oexport_file)
else:
h2oexport_file()
def export_file_parquet():
data = h2o.import_file(pu.locate("smalldata/titanic/titanic_expanded.csv"), header=1)
path = pu.locate("results")
export_dir = os.path.join(path, data.frame_id + "_export_parquet")
if os.path.isdir(export_dir):
shutil.rmtree(export_dir, ignore_errors=True)
h2o.export_file(data, path=export_dir, format='parquet')

assert os.path.isdir(export_dir)
assert any(os.path.splitext(f)[1] == '.crc' for f in os.listdir(export_dir))


def export_file_parquet_no_checksum():
data = h2o.import_file(pu.locate("smalldata/titanic/titanic_expanded.csv"), header=1)
path = pu.locate("results")
export_dir = os.path.join(path, data.frame_id + "_export_parquet_no_checksum")
if os.path.isdir(export_dir):
shutil.rmtree(export_dir, ignore_errors=True)
h2o.export_file(data, path=export_dir, format='parquet', write_checksum=False)

assert os.path.isdir(export_dir)
assert os.listdir(export_dir)
assert not any(os.path.splitext(f)[1] == '.crc' for f in os.listdir(export_dir))


pu.run_tests([
export_file_csv,
export_file_parquet,
export_file_parquet_no_checksum
])
12 changes: 9 additions & 3 deletions h2o-r/h2o-package/R/export.R
@@ -38,6 +38,8 @@
#' @param format string, one of "csv" or "parquet". Default is "csv". Export
#' to parquet is multipart and H2O itself determines the optimal number
#' of files (1 file per chunk).
#' @param write_checksum logical, if supported by the format (e.g. 'parquet'),
#' export will include a checksum file for each exported data file.
#'
#' @examples
#'\dontrun{
@@ -52,7 +54,7 @@
#' }
#' @export
h2o.exportFile <- function(data, path, force = FALSE, sep = ",", compression = NULL, parts = 1,
header = TRUE, quote_header = TRUE, format = "csv") {
header = TRUE, quote_header = TRUE, format = "csv", write_checksum = TRUE) {
if (!is.H2OFrame(data))
stop("`data` must be an H2OFrame object")

@@ -73,8 +75,12 @@ h2o.exportFile <- function(data, path, force = FALSE, sep = ",", compression = N

if(!is.character(format) && !(format == "csv" || format == "parquet"))
stop("`format` must be 'csv' or 'parquet'")

params <- list(path=path, num_parts=parts, force=force, separator=.asc(sep), header=header, quote_header=quote_header, format=format)

if(!is.logical(write_checksum) || length(write_checksum) != 1L || is.na(write_checksum))
stop("`write_checksum` must be TRUE or FALSE")

params <- list(path=path, num_parts=parts, force=force, separator=.asc(sep), header=header, quote_header=quote_header,
format=format, write_checksum=write_checksum)
if (! is.null(compression)) {
params$compression <- compression
}
@@ -25,7 +25,7 @@ test.export.file <- function(parts) {
dname <- file.path(sandbox(), fname)

Log.info("Exporting File...")
h2o.exportFile(mypred, dname, parts = parts)
h2o.exportFile(mypred, dname, parts = parts, force = TRUE)

Log.info("Comparing file with R...")
rfiles <- ifelse(parts > 1, list.files(dname, full.names = TRUE), dname)
@@ -42,10 +42,14 @@ test.export.file <- function(parts) {
print(head(H.pred))

expect_equal(R.pred, H.pred)
return(dname)
}

test.export.file.single <- function() test.export.file(1)
test.export.file.multipart <- function() test.export.file(2)
test.export.file.csv.single <- function() test.export.file(1)
test.export.file.csv.multipart <- function() test.export.file(2)

doSuite("Testing Exporting Files CSV", makeSuite(
test.export.file.csv.single,
test.export.file.csv.multipart
))

doTest("Testing Exporting Files (single file)", test.export.file.single)
doTest("Testing Exporting Files (part files)", test.export.file.multipart)