Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-817: Replace aircompressor ZStandard compression with zstd-jni #988

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions java/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
Expand Down
12 changes: 12 additions & 0 deletions java/core/src/java/org/apache/orc/OrcConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ public enum OrcConf {
"Define the compression strategy to use while writing data.\n" +
"This changes the compression level of higher level compression\n" +
"codec (like ZLIB)."),
COMPRESSION_ZSTD_LEVEL("orc.compression.zstd.level",
"hive.exec.orc.compression.zstd.level", 3,
"Define the compression level to use with ZStandard codec "
+ "while writing data."),
COMPRESSION_ZSTD_WINDOWLOG("orc.compression.zstd.windowlog",
"hive.exec.orc.compression.zstd.windowlog", 0,
"Set the maximum allowed back-reference distance for "
+ "ZStandard codec, expressed as power of 2."),
COMPRESSION_ZSTD_LONGMODE("orc.compression.zstd.longmode",
"hive.exec.orc.compression.zstd.longmode", false,
"If enabled, the Zstandard codec will employ long mode during "
+ "compression."),
BLOCK_PADDING_TOLERANCE("orc.block.padding.tolerance",
"hive.exec.orc.block.padding.tolerance", 0.05,
"Define the tolerance for block padding as a decimal fraction of\n" +
Expand Down
24 changes: 24 additions & 0 deletions java/core/src/java/org/apache/orc/OrcFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,9 @@ public static class WriterOptions implements Cloneable {
private WriterCallback callback;
private EncodingStrategy encodingStrategy;
private CompressionStrategy compressionStrategy;
private int compressionZstdLevel;
private int compressionZstdWindowLog;
private boolean compressionZstdLongMode;
private double paddingTolerance;
private String bloomFilterColumns;
private double bloomFilterFpp;
Expand Down Expand Up @@ -485,6 +488,15 @@ protected WriterOptions(Properties tableProperties, Configuration conf) {
OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf);
compressionStrategy = CompressionStrategy.valueOf(compString);

compressionZstdLevel =
OrcConf.COMPRESSION_ZSTD_LEVEL.getInt(tableProperties, conf);

compressionZstdWindowLog =
OrcConf.COMPRESSION_ZSTD_WINDOWLOG.getInt(tableProperties, conf);

compressionZstdLongMode =
OrcConf.COMPRESSION_ZSTD_LONGMODE.getBoolean(tableProperties, conf);

paddingTolerance =
OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf);

Expand Down Expand Up @@ -903,6 +915,18 @@ public CompressionStrategy getCompressionStrategy() {
return compressionStrategy;
}

public int getCompressionZstdLevel() {
return compressionZstdLevel;
}

public int getCompressionZstdWindowLog() {
return compressionZstdWindowLog;
}

public boolean getCompressionZstdLongMode() {
return compressionZstdLongMode;
}

public EncodingStrategy getEncodingStrategy() {
return encodingStrategy;
}
Expand Down
4 changes: 3 additions & 1 deletion java/core/src/java/org/apache/orc/impl/OutStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,11 @@ private void spill() throws java.io.IOException {
outputBuffer(current);
getNewInputBuffer();
} else {
// Make sure both compressed and overflow are not null before passing to compress
if (compressed == null) {
compressed = getNewOutputBuffer();
} else if (overflow == null) {
}
if (overflow == null) {
overflow = getNewOutputBuffer();
}
int sizePosn = compressed.position();
Expand Down
11 changes: 10 additions & 1 deletion java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,17 @@ public PhysicalFsWriter(FileSystem fs,
}
CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress());
if (codec != null){
compress.withCodec(codec, codec.getDefaultOptions());
// HACK: Fetch Zstd-specific options and set them here -- need to find a
// better way before merging.
CompressionCodec.Options tempOptions = codec.getDefaultOptions();
if (codec instanceof ZstdCodec) {
((ZstdCodec.ZstdOptions) tempOptions).setLevel(opts.getCompressionZstdLevel());
((ZstdCodec.ZstdOptions) tempOptions).setWindowLog(opts.getCompressionZstdWindowLog());
((ZstdCodec.ZstdOptions) tempOptions).setLongMode(opts.getCompressionZstdLongMode());
}
compress.withCodec(codec, tempOptions);
}

this.compressionStrategy = opts.getCompressionStrategy();
this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
this.blockSize = opts.getBlockSize();
Expand Down
9 changes: 5 additions & 4 deletions java/core/src/java/org/apache/orc/impl/WriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import io.airlift.compress.lz4.Lz4Decompressor;
import io.airlift.compress.lzo.LzoCompressor;
import io.airlift.compress.lzo.LzoDecompressor;
import io.airlift.compress.zstd.ZstdCompressor;
import io.airlift.compress.zstd.ZstdDecompressor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -126,6 +124,8 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
private final boolean[] directEncodingColumns;
private final List<OrcProto.ColumnEncoding> unencryptedEncodings =
new ArrayList<>();
private final int compressionZstdLevel;
private final int compressionZstdWindowLog;

// the list of maskDescriptions, keys, and variants
private SortedMap<String, MaskDescriptionImpl> maskDescriptions = new TreeMap<>();
Expand Down Expand Up @@ -181,6 +181,8 @@ public WriterImpl(FileSystem fs,

this.encodingStrategy = opts.getEncodingStrategy();
this.compressionStrategy = opts.getCompressionStrategy();
this.compressionZstdLevel = opts.getCompressionZstdLevel();
this.compressionZstdWindowLog = opts.getCompressionZstdWindowLog();

this.rowIndexStride = opts.getRowIndexStride();
buildIndex = rowIndexStride > 0;
Expand Down Expand Up @@ -278,8 +280,7 @@ public static CompressionCodec createCodec(CompressionKind kind) {
return new AircompressorCodec(kind, new Lz4Compressor(),
new Lz4Decompressor());
case ZSTD:
return new AircompressorCodec(kind, new ZstdCompressor(),
new ZstdDecompressor());
return new ZstdCodec();
default:
throw new IllegalArgumentException("Unknown compression codec: " +
kind);
Expand Down
Loading