ORC-817, ORC-1088: Support ZStandard compression using zstd-jni
### What changes were proposed in this pull request?
Original PR: #988
Original author: dchristle

This PR will support the use of [zstd-jni](https://github.com/luben/zstd-jni) library as the implementation of ORC zstd, with better performance than [aircompressor](https://github.com/airlift/aircompressor).  (#988 (comment))

This PR also exposes the compression level and "long mode" settings to ORC users. These settings allow the user to select different speed/compression trade-offs that were not supported by the original aircompressor.
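For intuition about the "long mode" window setting: the window log is expressed as a power of 2, so a log of 27 corresponds to a 128 MiB matching window. A minimal stdlib-only sketch (the helper name is made up for illustration, not ORC API):

```java
public class WindowLogSketch {
    // Maximum back-reference distance implied by a zstd window log: distance = 2^windowLog.
    static long windowSizeBytes(int windowLog) {
        return 1L << windowLog;
    }

    public static void main(String[] args) {
        System.out.println(windowSizeBytes(20)); // 1 MiB window: 1048576
        System.out.println(windowSizeBytes(27)); // 128 MiB window: 134217728
    }
}
```

Larger windows let the compressor find matches across more distant data, trading memory for ratio on large, repetitive inputs.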

- Add zstd-jni dependency, and add a new CompressionCodec ZstdCodec that uses it. Add ORC conf to set compression level.
- Add ORC conf to use long mode, and add configuration setters for windowLog.
- Add tests that verify the correctness of writing and reading across compression levels, window sizes, and long mode use.
- Add test for compatibility between Zstd aircompressor and zstd-jni implementations.
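The new settings are plain ORC configuration keys. As a sketch of how they would be carried in table properties (the key names and defaults come from this PR; the `getInt` helper is an illustrative stand-in for how `OrcConf` reads an int with a default, not the actual ORC API):

```java
import java.util.Properties;

public class ZstdConfSketch {
    // Configuration keys introduced by this PR.
    static final String LEVEL_KEY = "orc.compression.zstd.level";
    static final String WINDOWLOG_KEY = "orc.compression.zstd.windowlog";

    // Illustrative helper: read an int property, falling back to a default.
    static int getInt(Properties props, String key, int defaultValue) {
        String v = props.getProperty(key);
        return v == null ? defaultValue : Integer.parseInt(v.trim());
    }

    public static void main(String[] args) {
        Properties tableProps = new Properties();
        tableProps.setProperty(LEVEL_KEY, "19");     // higher level: better ratio, slower
        tableProps.setProperty(WINDOWLOG_KEY, "27"); // long mode with a 128 MiB window

        System.out.println(getInt(tableProps, LEVEL_KEY, 1));     // configured: 19
        System.out.println(getInt(tableProps, WINDOWLOG_KEY, 0)); // configured: 27
    }
}
```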

### Why are the changes needed?
These changes make sense for a few reasons:

ORC users will gain all the improvements from the main zstd library. It is under active development and receives regular speed and compression improvements. In contrast, aircompressor's zstd implementation is older and stale.

ORC users will be able to use the entire speed/compression tradeoff space. Today, aircompressor's implementation has only one of eight compression strategies ([link](https://github.com/airlift/aircompressor/blob/c5e6972bd37e1d3834514957447028060a268eea/src/main/java/io/airlift/compress/zstd/CompressionParameters.java#L143)). This means only a small range of faster but less compressive strategies can be exposed to ORC users. ORC storage with high compression (e.g. for large-but-infrequently-used data) is a clear use case that this PR would unlock.

It will harmonize the Java ORC implementation with other projects in the Hadoop ecosystem. Parquet, Spark, and even the C++ ORC reader/writers all rely on the official zstd implementation either via zstd-jni or directly. In this way, the Java reader/writer code is an outlier.

Detection and fixing any bugs or regressions will generally happen much faster, given the larger number of users and active developer community of zstd and zstd-jni.

The largest tradeoff is that zstd-jni wraps compiled code. That said, zstd-jni already bundles native binaries for many microprocessor architectures, so this should be a rare hurdle.
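For platforms where the native library cannot load, this PR keeps an escape hatch: setting the system property `orc.compression.zstd.impl` to `java` forces the pure-Java aircompressor codec even when zstd-jni is on the classpath. The snippet below mirrors that property check in isolation (the method name is made up for this sketch):

```java
public class ZstdImplSwitch {
    // Mirrors the system-property check added to WriterImpl in this PR.
    static boolean usePureJavaZstd() {
        return "java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"));
    }

    public static void main(String[] args) {
        System.setProperty("orc.compression.zstd.impl", "java");
        System.out.println(usePureJavaZstd()); // true: stay on aircompressor's zstd
    }
}
```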

### How was this patch tested?
- Unit tests for reading and writing ORC files across a variety of compression levels and window logs all pass.
- Unit test to compress and decompress between aircompressor and zstd-jni passes. Note that the current aircompressor implementation uses a small subset of levels, so the test only compares data using the default compression settings.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #1743 from cxzl25/ORC-817.

Lead-authored-by: sychen <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Co-authored-by: David Christle <[email protected]>
Co-authored-by: Yiqun Zhang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
4 people committed Jan 16, 2024
1 parent b126033 commit 33be571
Showing 9 changed files with 477 additions and 45 deletions.
4 changes: 4 additions & 0 deletions java/core/pom.xml
@@ -51,6 +51,10 @@
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
8 changes: 8 additions & 0 deletions java/core/src/java/org/apache/orc/OrcConf.java
@@ -72,6 +72,14 @@ public enum OrcConf {
"Define the compression strategy to use while writing data.\n" +
"This changes the compression level of higher level compression\n" +
"codec (like ZLIB)."),
COMPRESSION_ZSTD_LEVEL("orc.compression.zstd.level",
"hive.exec.orc.compression.zstd.level", 1,
"Define the compression level to use with ZStandard codec "
+ "while writing data. The valid range is 1~22"),
COMPRESSION_ZSTD_WINDOWLOG("orc.compression.zstd.windowlog",
"hive.exec.orc.compression.zstd.windowlog", 0,
"Set the maximum allowed back-reference distance for "
+ "ZStandard codec, expressed as power of 2."),
BLOCK_PADDING_TOLERANCE("orc.block.padding.tolerance",
"hive.exec.orc.block.padding.tolerance", 0.05,
"Define the tolerance for block padding as a decimal fraction of\n" +
32 changes: 32 additions & 0 deletions java/core/src/java/org/apache/orc/OrcFile.java
@@ -426,6 +426,27 @@ public static BloomFilterVersion fromString(String s) {
}
}

public static class ZstdCompressOptions {
private int compressionZstdLevel;
private int compressionZstdWindowLog;

public int getCompressionZstdLevel() {
return compressionZstdLevel;
}

public void setCompressionZstdLevel(int compressionZstdLevel) {
this.compressionZstdLevel = compressionZstdLevel;
}

public int getCompressionZstdWindowLog() {
return compressionZstdWindowLog;
}

public void setCompressionZstdWindowLog(int compressionZstdWindowLog) {
this.compressionZstdWindowLog = compressionZstdWindowLog;
}
}

/**
* Options for creating ORC file writers.
*/
@@ -447,6 +468,7 @@ public static class WriterOptions implements Cloneable {
private WriterCallback callback;
private EncodingStrategy encodingStrategy;
private CompressionStrategy compressionStrategy;
private ZstdCompressOptions zstdCompressOptions;
private double paddingTolerance;
private String bloomFilterColumns;
private double bloomFilterFpp;
@@ -493,6 +515,12 @@ protected WriterOptions(Properties tableProperties, Configuration conf) {
OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf);
compressionStrategy = CompressionStrategy.valueOf(compString);

zstdCompressOptions = new ZstdCompressOptions();
zstdCompressOptions.setCompressionZstdLevel(
OrcConf.COMPRESSION_ZSTD_LEVEL.getInt(tableProperties, conf));
zstdCompressOptions.setCompressionZstdWindowLog(
OrcConf.COMPRESSION_ZSTD_WINDOWLOG.getInt(tableProperties, conf));

paddingTolerance =
OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf);

@@ -938,6 +966,10 @@ public EncodingStrategy getEncodingStrategy() {
return encodingStrategy;
}

public ZstdCompressOptions getZstdCompressOptions() {
return zstdCompressOptions;
}

public double getPaddingTolerance() {
return paddingTolerance;
}
12 changes: 11 additions & 1 deletion java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -115,8 +115,18 @@ public PhysicalFsWriter(FSDataOutputStream outputStream,
}
CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress());
if (codec != null){
compress.withCodec(codec, codec.getDefaultOptions());
CompressionCodec.Options tempOptions = codec.getDefaultOptions();
if (codec instanceof ZstdCodec &&
codec.getDefaultOptions() instanceof ZstdCodec.ZstdOptions options) {
OrcFile.ZstdCompressOptions zstdCompressOptions = opts.getZstdCompressOptions();
if (zstdCompressOptions != null) {
options.setLevel(zstdCompressOptions.getCompressionZstdLevel());
options.setWindowLog(zstdCompressOptions.getCompressionZstdWindowLog());
}
}
compress.withCodec(codec, tempOptions);
}

this.compressionStrategy = opts.getCompressionStrategy();
this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
this.blockSize = opts.getBlockSize();
24 changes: 22 additions & 2 deletions java/core/src/java/org/apache/orc/impl/WriterImpl.java
Expand Up @@ -18,6 +18,7 @@

package org.apache.orc.impl;

import com.github.luben.zstd.util.Native;
import com.google.protobuf.ByteString;
import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.compress.lz4.Lz4Decompressor;
@@ -273,6 +274,17 @@ private static int getClosestBufferSize(int size) {
return Math.min(kb256, Math.max(kb4, pow2));
}

static {
try {
if (!"java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"))) {
Native.load();
}
} catch (UnsatisfiedLinkError | ExceptionInInitializerError e) {
LOG.warn("Unable to load zstd-jni library for your platform. " +
"Using builtin-java classes where applicable");
}
}

public static CompressionCodec createCodec(CompressionKind kind) {
switch (kind) {
case NONE:
@@ -288,8 +300,16 @@ public static CompressionCodec createCodec(CompressionKind kind) {
return new AircompressorCodec(kind, new Lz4Compressor(),
new Lz4Decompressor());
case ZSTD:
return new AircompressorCodec(kind, new ZstdCompressor(),
new ZstdDecompressor());
if ("java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"))) {
return new AircompressorCodec(kind, new ZstdCompressor(),
new ZstdDecompressor());
}
if (Native.isLoaded()) {
return new ZstdCodec();
} else {
return new AircompressorCodec(kind, new ZstdCompressor(),
new ZstdDecompressor());
}
case BROTLI:
return new BrotliCodec();
default: