diff --git a/java/core/pom.xml b/java/core/pom.xml
index 69fe1e2803..089c3bdd11 100644
--- a/java/core/pom.xml
+++ b/java/core/pom.xml
@@ -51,6 +51,10 @@
       <groupId>io.airlift</groupId>
       <artifactId>aircompressor</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java
index 6b6441527e..60260130d0 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -65,6 +65,18 @@
       "Define the compression strategy to use while writing data.\n" +
       "This changes the compression level of higher level compression\n" +
       "codec (like ZLIB)."),
+  COMPRESSION_ZSTD_LEVEL("orc.compression.zstd.level",
+      "hive.exec.orc.compression.zstd.level", 3,
+      "Define the compression level to use with the Zstandard codec " +
+          "while writing data."),
+  COMPRESSION_ZSTD_WINDOWLOG("orc.compression.zstd.windowlog",
+      "hive.exec.orc.compression.zstd.windowlog", 0,
+      "Set the maximum allowed back-reference distance for " +
+          "the Zstandard codec, expressed as a power of 2."),
+  COMPRESSION_ZSTD_LONGMODE("orc.compression.zstd.longmode",
+      "hive.exec.orc.compression.zstd.longmode", false,
+      "If enabled, the Zstandard codec will employ long mode during " +
+          "compression."),
   BLOCK_PADDING_TOLERANCE("orc.block.padding.tolerance",
       "hive.exec.orc.block.padding.tolerance", 0.05,
       "Define the tolerance for block padding as a decimal fraction of\n" +
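An illustrative sketch (assuming this patch is applied) of how a writer opts into the new knobs: they are ordinary `OrcConf` settings, read through the `Configuration` already passed to `OrcFile.writerOptions`. The output path, schema, and values below are placeholders.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class ZstdWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The three knobs added by this patch; the values are illustrative.
    OrcConf.COMPRESSION_ZSTD_LEVEL.setInt(conf, 9);
    OrcConf.COMPRESSION_ZSTD_WINDOWLOG.setInt(conf, 23); // 2^23 = 8 MiB window
    OrcConf.COMPRESSION_ZSTD_LONGMODE.setBoolean(conf, true);

    TypeDescription schema = TypeDescription.fromString("struct<x:bigint>");
    try (Writer writer = OrcFile.createWriter(new Path("/tmp/zstd-example.orc"),
        OrcFile.writerOptions(conf)
            .setSchema(schema)
            .compress(CompressionKind.ZSTD))) {
      // add VectorizedRowBatch data here
    }
  }
}
```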
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index a23a3f52ea..0c3607eb49 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -441,6 +441,9 @@ public static class WriterOptions implements Cloneable {
     private WriterCallback callback;
     private EncodingStrategy encodingStrategy;
     private CompressionStrategy compressionStrategy;
+    private int compressionZstdLevel;
+    private int compressionZstdWindowLog;
+    private boolean compressionZstdLongMode;
     private double paddingTolerance;
     private String bloomFilterColumns;
     private double bloomFilterFpp;
@@ -485,6 +488,15 @@ protected WriterOptions(Properties tableProperties, Configuration conf) {
           OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf);
       compressionStrategy = CompressionStrategy.valueOf(compString);

+      compressionZstdLevel =
+          OrcConf.COMPRESSION_ZSTD_LEVEL.getInt(tableProperties, conf);
+
+      compressionZstdWindowLog =
+          OrcConf.COMPRESSION_ZSTD_WINDOWLOG.getInt(tableProperties, conf);
+
+      compressionZstdLongMode =
+          OrcConf.COMPRESSION_ZSTD_LONGMODE.getBoolean(tableProperties, conf);
+
       paddingTolerance =
           OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf);

@@ -903,6 +915,18 @@ public CompressionStrategy getCompressionStrategy() {
     return compressionStrategy;
   }

+  public int getCompressionZstdLevel() {
+    return compressionZstdLevel;
+  }
+
+  public int getCompressionZstdWindowLog() {
+    return compressionZstdWindowLog;
+  }
+
+  public boolean getCompressionZstdLongMode() {
+    return compressionZstdLongMode;
+  }
+
   public EncodingStrategy getEncodingStrategy() {
     return encodingStrategy;
   }
diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java
index 45e636e9e0..e4bbe195ac 100644
--- a/java/core/src/java/org/apache/orc/impl/OutStream.java
+++ b/java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -280,9 +280,11 @@ private void spill() throws java.io.IOException {
       outputBuffer(current);
       getNewInputBuffer();
     } else {
+      // Make sure both compressed and overflow are non-null before compressing.
       if (compressed == null) {
         compressed = getNewOutputBuffer();
-      } else if (overflow == null) {
+      }
+      if (overflow == null) {
         overflow = getNewOutputBuffer();
       }
       int sizePosn = compressed.position();
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index 47744dce23..dcc29e34d8 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -104,8 +104,17 @@ public PhysicalFsWriter(FileSystem fs,
     }
     CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress());
     if (codec != null){
-      compress.withCodec(codec, codec.getDefaultOptions());
+      // HACK: Fetch Zstd-specific options and set them here -- need to find a
+      // better way before merging (this mutates the shared default options).
+      CompressionCodec.Options tempOptions = codec.getDefaultOptions();
+      if (codec instanceof ZstdCodec) {
+        ((ZstdCodec.ZstdOptions) tempOptions).setLevel(opts.getCompressionZstdLevel());
+        ((ZstdCodec.ZstdOptions) tempOptions).setWindowLog(opts.getCompressionZstdWindowLog());
+        ((ZstdCodec.ZstdOptions) tempOptions).setLongMode(opts.getCompressionZstdLongMode());
+      }
+      compress.withCodec(codec, tempOptions);
     }
+
     this.compressionStrategy = opts.getCompressionStrategy();
     this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
     this.blockSize = opts.getBlockSize();
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index c0a2b52a76..3bec2e8aee 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -23,8 +23,6 @@
 import io.airlift.compress.lz4.Lz4Decompressor;
 import io.airlift.compress.lzo.LzoCompressor;
 import io.airlift.compress.lzo.LzoDecompressor;
-import io.airlift.compress.zstd.ZstdCompressor;
-import io.airlift.compress.zstd.ZstdDecompressor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -126,6 +124,8 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
   private final boolean[] directEncodingColumns;
   private final List<OrcProto.ColumnEncoding> unencryptedEncodings = new ArrayList<>();
+  private final int compressionZstdLevel;
+  private final int compressionZstdWindowLog;

   // the list of maskDescriptions, keys, and variants
   private SortedMap<String, MaskDescriptionImpl> maskDescriptions = new TreeMap<>();
@@ -181,6 +181,8 @@ public WriterImpl(FileSystem fs,
     this.encodingStrategy = opts.getEncodingStrategy();
     this.compressionStrategy = opts.getCompressionStrategy();
+    this.compressionZstdLevel = opts.getCompressionZstdLevel();
+    this.compressionZstdWindowLog = opts.getCompressionZstdWindowLog();
     this.rowIndexStride = opts.getRowIndexStride();
     buildIndex = rowIndexStride > 0;
@@ -278,8 +280,7 @@ public static CompressionCodec createCodec(CompressionKind kind) {
       return new AircompressorCodec(kind, new Lz4Compressor(),
           new Lz4Decompressor());
     case ZSTD:
-      return new AircompressorCodec(kind, new ZstdCompressor(),
-          new ZstdDecompressor());
+      return new ZstdCodec();
     default:
       throw new IllegalArgumentException("Unknown compression codec: " +
           kind);
diff --git a/java/core/src/java/org/apache/orc/impl/ZstdCodec.java b/java/core/src/java/org/apache/orc/impl/ZstdCodec.java
new file mode 100644
index 0000000000..b646c88ae4
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/ZstdCodec.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdCompressCtx;
+import com.github.luben.zstd.ZstdDecompressCtx;
+
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+
+public class ZstdCodec implements CompressionCodec {
+  private ZstdOptions zstdOptions = null;
+  private Boolean direct = null;
+  private ZstdCompressCtx zstdCompressCtx = null;
+  private ZstdDecompressCtx zstdDecompressCtx = null;
+
+  public ZstdCodec(int level, int windowLog, boolean longMode, boolean fixed) {
+    this.zstdOptions = new ZstdOptions(level, windowLog, longMode, fixed);
+  }
+
+  public ZstdCodec(int level, int windowLog) {
+    this(level, windowLog, false, false);
+  }
+
+  public ZstdCodec() {
+    this(3, 0);
+  }
+
+  // Thread-local buffer
+  private static final ThreadLocal<byte[]> threadBuffer =
+      new ThreadLocal<byte[]>() {
+        @Override
+        protected byte[] initialValue() {
+          return null;
+        }
+      };
+
+  protected static byte[] getBuffer(int size) {
+    byte[] result = threadBuffer.get();
+    if (result == null || result.length < size || result.length > size * 2) {
+      result = new byte[size];
+      threadBuffer.set(result);
+    }
+    return result;
+  }
+
+  static class ZstdOptions implements Options {
+    private int level;
+    private int windowLog;
+    private boolean longMode;
+    private final boolean FIXED;
+
+    ZstdOptions(int level, int windowLog, boolean longMode, boolean FIXED) {
+      this.level = level;
+      this.windowLog = windowLog;
+      this.longMode = longMode;
+      this.FIXED = FIXED;
+    }
+
+    @Override
+    public ZstdOptions copy() {
+      return new ZstdOptions(level, windowLog, longMode, FIXED);
+    }
+
+    /**
+     * Sets the Zstandard long mode maximum back-reference distance, expressed
+     * as a power of 2.
+     * <p>
+     * The value must be between ZSTD_WINDOWLOG_MIN (10) and ZSTD_WINDOWLOG_MAX
+     * (30 and 31 on 32/64-bit architectures, respectively).
+     * <p>
+     * A value of 0 is a special value indicating to use the default
+     * ZSTD_WINDOWLOG_LIMIT_DEFAULT of 27, which corresponds to a
+     * back-reference window size of 128 MiB.
+     *
+     * @param newValue The desired back-reference distance, as a power of 2.
+     * @return this ZstdOptions instance for chaining.
+     */
+    public ZstdOptions setWindowLog(int newValue) {
+      if ((newValue < Zstd.windowLogMin() || newValue > Zstd.windowLogMax())
+          && newValue != 0) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Zstd compression window size should be in the range %d to %d,"
+                    + " or set to the default value of 0.",
+                Zstd.windowLogMin(),
+                Zstd.windowLogMax()));
+      }
+      windowLog = newValue;
+      return this;
+    }
+
+    /**
+     * Sets explicitly whether long mode is used. Note that the underlying
+     * library enables long mode by default when the window size is at least
+     * 128 MiB (windowLog >= 27) and the compression level is 16+
+     * (compression strategy >= ZSTD_btopt).
+     *
+     * @param newValue A boolean indicating whether to explicitly use long mode.
+     * @return this ZstdOptions instance for chaining.
+     */
+    public ZstdOptions setLongMode(boolean newValue) {
+      longMode = newValue;
+      return this;
+    }
+
+    /**
+     * Sets the Zstandard compression level directly using the integer
+     * setting. This value is typically between 0 and 22, with larger numbers
+     * indicating more aggressive compression and lower speed.
+     * <p>
+     * This method provides additional granularity beyond the setSpeed method
+     * so that users can select a specific level.
+     *
+     * @param newValue The compression level to set.
+     * @return this ZstdOptions instance for chaining.
+     */
+    public ZstdOptions setLevel(int newValue) {
+      if (newValue < Zstd.minCompressionLevel()
+          || newValue > Zstd.maxCompressionLevel()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Zstd compression level should be in the range %d to %d",
+                Zstd.minCompressionLevel(),
+                Zstd.maxCompressionLevel()));
+      }
+      level = newValue;
+      return this;
+    }
+
+    /**
+     * Sets the Zstandard compression level via the SpeedModifier enum
+     * (FASTEST, FAST, DEFAULT). The default value of 3 is the
+     * ZSTD_CLEVEL_DEFAULT level.
+     * <p>
+     * Alternatively, the compression level can be set directly with setLevel.
+     *
+     * @param newValue An enum value specifying how aggressively to compress.
+     * @return this ZstdOptions instance for chaining.
+     */
+    @Override
+    public ZstdOptions setSpeed(SpeedModifier newValue) {
+      if (FIXED) {
+        throw new IllegalStateException(
+            "Attempt to modify the default options");
+      }
+      switch (newValue) {
+        case FAST:
+          setLevel(2);
+          break;
+        case DEFAULT:
+          // zstd level 3 achieves good ratio/speed tradeoffs, and is the
+          // ZSTD_CLEVEL_DEFAULT level.
+          setLevel(3);
+          break;
+        case FASTEST:
+          // zstd level 1 is the fastest level.
+          setLevel(1);
+          break;
+        default:
+          break;
+      }
+      return this;
+    }
+
+    @Override
+    public ZstdOptions setData(DataKind newValue) {
+      return this; // We don't support setting DataKind in ZstdCodec.
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      } else if (this == other) {
+        return true;
+      } else {
+        ZstdOptions otherOpts = (ZstdOptions) other;
+        return (level == otherOpts.level) &&
+            (windowLog == otherOpts.windowLog);
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      // Hash only the fields that equals() compares, keeping the
+      // equals/hashCode contract intact.
+      return Objects.hash(level, windowLog);
+    }
+  }
+
+  /**
+   * The default options: level 3, the default window log (0), long mode off.
+   */
+  private static final ZstdOptions DEFAULT_OPTIONS =
+      new ZstdOptions(3, 0, false, false);
+
+  @Override
+  public Options getDefaultOptions() {
+    return DEFAULT_OPTIONS;
+  }
+
+  /**
+   * Compresses an input ByteBuffer into an output ByteBuffer using Zstandard
+   * compression. If the estimated maximum number of output bytes exceeds the
+   * output ByteBuffer's size, the remaining bytes are written to the overflow
+   * ByteBuffer.
+   *
+   * @param in the bytes to compress
+   * @param out the compressed bytes
+   * @param overflow put any additional bytes here
+   * @param options the options to control compression
+   * @return true if the compressed output is smaller than the input.
+   */
+  @Override
+  public boolean compress(ByteBuffer in, ByteBuffer out,
+                          ByteBuffer overflow,
+                          Options options) throws IOException {
+    ZstdOptions zlo = (ZstdOptions) options;
+    // TODO(@dchristle): Add case for when ByteBuffers are direct.
+
+    zstdCompressCtx = new ZstdCompressCtx();
+    zstdCompressCtx.setLevel(zlo.level);
+    zstdCompressCtx.setLong(zlo.windowLog);
+    zstdCompressCtx.setChecksum(false);
+
+    int inBytes = in.remaining();
+    int srcOffset = in.arrayOffset() + in.position();
+    int dstOffset = out.arrayOffset() + out.position();
+    int compressBound = (int) Zstd.compressBound(inBytes);
+    int dstSize = out.limit() - out.position();
+    long compressOutput;
+
+    if (dstSize < compressBound) {
+      // The detected output ByteBuffer is too small, based on the maximum
+      // compression estimate. Allocate a temporary buffer of the appropriate
+      // size.
+      byte[] compressed = new byte[compressBound];
+      int remaining = out.remaining();
+
+      compressOutput =
+          zstdCompressCtx.compressByteArray(compressed, 0, compressBound,
+              in.array(), srcOffset, inBytes);
+      if (Zstd.isError(compressOutput)) {
+        throw new IOException(String.format("Error code %s!", compressOutput));
+      }
+
+      if ((int) compressOutput <= remaining) {
+        // Single copy ok, no need for overflow
+        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+            out.position(), (int) compressOutput);
+        out.position(out.position() + (int) compressOutput);
+      } else {
+        // Single copy not OK, need to copy to both out and overflow
+        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+            out.position(), remaining);
+        out.position(out.limit());
+
+        System.arraycopy(compressed, remaining, overflow.array(),
+            overflow.arrayOffset(), (int) compressOutput - remaining);
+        overflow.position((int) compressOutput - remaining);
+      }
+    } else {
+      // Copy directly to the output buffer. Note: this path applies only the
+      // compression level; window log and long mode are not used here.
+      compressOutput =
+          Zstd.compressByteArray(out.array(), dstOffset, dstSize, in.array(),
+              srcOffset, inBytes, zlo.level);
+      if (Zstd.isError(compressOutput)) {
+        throw new IOException(String.format("Error code %s!", compressOutput));
+      }
+      out.position(dstOffset + (int) compressOutput);
+    }
+    zstdCompressCtx.close();
+    return inBytes > (int) compressOutput;
+  }
+
+  // TODO(dchristle): Do we need to add loops similar to ZlibCodec, e.g.
+  // "while (!deflater.finished() && (length > outSize)) { ..."
+  @Override
+  public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+    if (zstdDecompressCtx == null) {
+      zstdDecompressCtx = new ZstdDecompressCtx();
+    }
+
+    int srcOffset = in.arrayOffset() + in.position();
+    int srcSize = in.remaining();
+    int dstOffset = out.arrayOffset() + out.position();
+    int dstSize = out.remaining();
+
+    long decompressOut =
+        Zstd.decompressByteArray(out.array(), dstOffset, dstSize, in.array(),
+            srcOffset, srcSize);
+    if (Zstd.isError(decompressOut)) {
+      throw new IOException(String.format("Error code %s!", decompressOut));
+    }
+    in.position(in.limit());
+    out.position(dstOffset + (int) decompressOut);
+    out.flip();
+  }
+
+  @Override
+  public void reset() {
+  }
+
+  @Override
+  public void destroy() {
+    if (zstdCompressCtx != null) {
+      zstdCompressCtx.close();
+    }
+    if (zstdDecompressCtx != null) {
+      zstdDecompressCtx.close();
+    }
+  }
+
+  @Override
+  public CompressionKind getKind() {
+    return CompressionKind.ZSTD;
+  }
+
+  @Override
+  public void close() {
+    OrcCodecPool.returnCodec(CompressionKind.ZSTD, this);
+  }
+}
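To make the `compress()` contract above concrete, here is an illustrative sketch of the out/overflow split that `OutStream.spill()` relies on; the buffer sizes are arbitrary. The boolean result is true only when the compressed form is smaller than the input.

```java
import java.nio.ByteBuffer;
import org.apache.orc.CompressionCodec;
import org.apache.orc.impl.ZstdCodec;

public class ZstdCompressContractExample {
  public static void main(String[] args) throws Exception {
    CompressionCodec codec = new ZstdCodec();
    ByteBuffer in = ByteBuffer.wrap(new byte[8 * 1024]); // compressible zeros
    ByteBuffer out = ByteBuffer.allocate(1024);          // deliberately small
    ByteBuffer overflow = ByteBuffer.allocate(16 * 1024);

    // out is smaller than Zstd.compressBound(8192), so the codec takes the
    // temporary-buffer path; anything beyond out's capacity lands in overflow.
    boolean smaller =
        codec.compress(in, out, overflow, codec.getDefaultOptions());
    System.out.println("smaller=" + smaller
        + " out=" + out.position() + " overflow=" + overflow.position());
  }
}
```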
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypesNulls.java b/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypesNulls.java
index da7ca94e40..bb39352fb0 100644
--- a/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypesNulls.java
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringComplexTypesNulls.java
@@ -332,7 +332,7 @@ public void filterWithSeek() throws IOException {
     }
     FileSystem.Statistics stats = readEnd();
     double readPercentage = readPercentage(stats, fs.getFileStatus(filePath).getLen());
-    assertTrue(readPercentage > 130);
+    assertTrue(readPercentage > 120);
   }

   private void seekToRow(RecordReader rr, VectorizedRowBatch b, long row) throws IOException {
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index 769d1d2e05..abbd931a79 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -2271,51 +2271,71 @@ public void testLz4(Version fileFormat) throws Exception {
   }

   /**
-   * Read and write a randomly generated zstd file.
+   * Write a randomly generated zstd-compressed file, read it back, and check
+   * that the output matches the input.
+   *
+   * Checks correctness across a variety of valid settings:
+   *
+   * * Negative, low, moderate, and high compression levels.
+   * * Valid window sizes in [10, 31], plus the default value of 0.
+   * * Long mode explicitly enabled and disabled.
+   *
+   * @throws Exception
    */
   @ParameterizedTest
   @MethodSource("data")
   public void testZstd(Version fileFormat) throws Exception {
     TypeDescription schema =
         TypeDescription.fromString("struct<x:bigint,y:int,z:bigint>");
-    try (Writer writer = OrcFile.createWriter(testFilePath,
-        OrcFile.writerOptions(conf)
-            .setSchema(schema)
-            .compress(CompressionKind.ZSTD)
-            .bufferSize(1000)
-            .version(fileFormat))) {
-      VectorizedRowBatch batch = schema.createRowBatch();
-      Random rand = new Random(3);
-      batch.size = 1000;
-      for (int b = 0; b < 10; ++b) {
-        for (int r = 0; r < 1000; ++r) {
-          ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
-          ((LongColumnVector) batch.cols[1]).vector[r] = b * 1000 + r;
-          ((LongColumnVector) batch.cols[2]).vector[r] = rand.nextLong();
-        }
-        writer.addRowBatch(batch);
-      }
-    }
-    try (Reader reader = OrcFile.createReader(testFilePath,
-        OrcFile.readerOptions(conf).filesystem(fs));
-         RecordReader rows = reader.rows()) {
-      assertEquals(CompressionKind.ZSTD, reader.getCompressionKind());
-      VectorizedRowBatch batch = reader.getSchema().createRowBatch(1000);
-      Random rand = new Random(3);
-      for (int b = 0; b < 10; ++b) {
-        rows.nextBatch(batch);
-        assertEquals(1000, batch.size);
-        for (int r = 0; r < batch.size; ++r) {
-          assertEquals(rand.nextInt(),
-              ((LongColumnVector) batch.cols[0]).vector[r]);
-          assertEquals(b * 1000 + r,
-              ((LongColumnVector) batch.cols[1]).vector[r]);
-          assertEquals(rand.nextLong(),
-              ((LongColumnVector) batch.cols[2]).vector[r]);
+
+    for (Integer level : new ArrayList<>(Arrays.asList(-4, -1, 0, 1, 3, 8, 12, 17, 22))) {
+      for (Integer windowLog : new ArrayList<>(Arrays.asList(0, 10, 20, 31))) {
+        for (Boolean longMode : new ArrayList<>(Arrays.asList(false, true))) {
+          OrcConf.COMPRESSION_ZSTD_LEVEL.setInt(conf, level);
+          OrcConf.COMPRESSION_ZSTD_WINDOWLOG.setInt(conf, windowLog);
+          OrcConf.COMPRESSION_ZSTD_LONGMODE.setBoolean(conf, longMode);
+          try (Writer writer = OrcFile.createWriter(testFilePath,
+              OrcFile.writerOptions(conf)
+                  .setSchema(schema)
+                  .compress(CompressionKind.ZSTD)
+                  .bufferSize(1000)
+                  .version(fileFormat))) {
+            VectorizedRowBatch batch = schema.createRowBatch();
+            Random rand = new Random(27182);
+            batch.size = 1000;
+            for (int b = 0; b < 10; ++b) {
+              for (int r = 0; r < 1000; ++r) {
+                ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
+                ((LongColumnVector) batch.cols[1]).vector[r] = b * 1000 + r;
+                ((LongColumnVector) batch.cols[2]).vector[r] = rand.nextLong();
+              }
+              writer.addRowBatch(batch);
+            }
+          }
+          try (Reader reader = OrcFile.createReader(testFilePath,
+              OrcFile.readerOptions(conf).filesystem(fs));
+               RecordReader rows = reader.rows()) {
+            assertEquals(CompressionKind.ZSTD, reader.getCompressionKind());
+            VectorizedRowBatch batch = reader.getSchema().createRowBatch(1000);
+            Random rand = new Random(27182);
+            for (int b = 0; b < 10; ++b) {
+              rows.nextBatch(batch);
+              assertEquals(1000, batch.size);
+              for (int r = 0; r < batch.size; ++r) {
+                assertEquals(rand.nextInt(),
+                    ((LongColumnVector) batch.cols[0]).vector[r]);
+                assertEquals(b * 1000 + r,
+                    ((LongColumnVector) batch.cols[1]).vector[r]);
+                assertEquals(rand.nextLong(),
+                    ((LongColumnVector) batch.cols[2]).vector[r]);
+              }
+            }
+            rows.nextBatch(batch);
+            assertEquals(0, batch.size);
+          }
+          fs.delete(testFilePath, false);
         }
       }
-      rows.nextBatch(batch);
-      assertEquals(0, batch.size);
     }
   }
@@ -2332,7 +2352,7 @@ public void testCodecPool(Version fileFormat) throws Exception {
     WriterOptions opts = OrcFile.writerOptions(conf)
         .setSchema(schema).stripeSize(1000).bufferSize(100).version(fileFormat);

-    CompressionCodec snappyCodec, zlibCodec;
+    CompressionCodec snappyCodec, zlibCodec, zstdCodec;
     snappyCodec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.SNAPPY), batch);
     assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.SNAPPY));
     Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
@@ -2354,6 +2374,13 @@
     assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.ZLIB));
     assertSame(zlibCodec, codec);

+    zstdCodec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.ZSTD), batch);
+    assertNotSame(zlibCodec, zstdCodec);
+    assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.ZSTD));
+    codec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.ZSTD), batch);
+    assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.ZSTD));
+    assertSame(zstdCodec, codec);
+
     assertSame(snappyCodec, OrcCodecPool.getCodec(CompressionKind.SNAPPY));
     CompressionCodec snappyCodec2 = writeBatchesAndGetCodec(
         10, 1000, opts.compress(CompressionKind.SNAPPY), batch);
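The pool behavior that `testCodecPool` exercises comes down to the close-returns-to-pool contract; a rough sketch, assuming this patch:

```java
import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.impl.OrcCodecPool;

public class ZstdPoolExample {
  public static void main(String[] args) {
    // With this patch, the pool hands out ZstdCodec instances for ZSTD.
    CompressionCodec codec = OrcCodecPool.getCodec(CompressionKind.ZSTD);
    try {
      // ... compress or decompress with the codec ...
    } finally {
      // ZstdCodec.close() returns the instance to the pool rather than
      // destroying it, so a subsequent getCodec() call can reuse it.
      codec.close();
    }
  }
}
```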
diff --git a/java/core/src/test/org/apache/orc/impl/TestZstd.java b/java/core/src/test/org/apache/orc/impl/TestZstd.java
index bd04b6ce15..5cc5c44e96 100644
--- a/java/core/src/test/org/apache/orc/impl/TestZstd.java
+++ b/java/core/src/test/org/apache/orc/impl/TestZstd.java
@@ -18,28 +18,130 @@
 package org.apache.orc.impl;

+import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdException;
 import io.airlift.compress.zstd.ZstdCompressor;
 import io.airlift.compress.zstd.ZstdDecompressor;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Random;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
 import org.junit.jupiter.api.Test;

-import java.nio.ByteBuffer;
-
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.fail;

 public class TestZstd {

+  /**
+   * Test that Zstandard compression neither overflows nor throws an
+   * exception when the allocated output array matches the estimated upper
+   * bound ZSTD_compressBound. Random byte inputs are used, which are a worst
+   * case for compressed output sizes.
+   */
   @Test
-  public void testNoOverflow() throws Exception {
-    ByteBuffer in = ByteBuffer.allocate(10);
-    ByteBuffer out = ByteBuffer.allocate(10);
-    in.put(new byte[]{1,2,3,4,5,6,7,10});
-    in.flip();
-    CompressionCodec codec = new AircompressorCodec(
-        CompressionKind.ZSTD, new ZstdCompressor(), new ZstdDecompressor());
-    assertFalse(codec.compress(in, out, null,
-        codec.getDefaultOptions()));
+  public void testZstdCodecNoOverflow() {
+    Random rd = new Random();
+    ArrayList<Integer> testInputDataSizes =
+        new ArrayList<>(Arrays.asList(8, 27, 182, 818, 28459));
+
+    testInputDataSizes.forEach(inputSize -> {
+      ByteBuffer in = ByteBuffer.allocate(inputSize);
+      ByteBuffer out =
+          ByteBuffer.allocate((int) Zstd.compressBound((long) inputSize));
+
+      byte[] arr = new byte[inputSize];
+      rd.nextBytes(arr);
+
+      in.put(arr);
+      in.flip();
+      CompressionCodec codec = new ZstdCodec();
+      boolean overflow;
+      try {
+        overflow = codec.compress(in, out, null, codec.getDefaultOptions());
+      } catch (IOException e) {
+        overflow = true;
+      }
+      assertFalse(overflow);
+    });
+  }
+
+  @Test
+  public void testCorrupt() throws Exception {
+    ByteBuffer buf = ByteBuffer.allocate(1000);
+    buf.put(new byte[] {127, 125, 1, 99, 98, 1});
+    buf.flip();
+    CompressionCodec codec = new ZstdCodec();
+    ByteBuffer out = ByteBuffer.allocate(1000);
+    try {
+      codec.decompress(buf, out);
+      fail();
+    } catch (ZstdException e) {
+      // EXPECTED
+    }
   }

+  /**
+   * Test compatibility of the zstd-jni and aircompressor Zstd
+   * implementations by checking that bytes compressed with one can be
+   * decompressed by the other when using the default options.
+   */
+  @Test
+  public void testZstdAircompressorJniCompressDecompress() throws Exception {
+    int inputSize = 27182;
+    Random rd = new Random();
+
+    CompressionCodec zstdAircompressorCodec = new AircompressorCodec(
+        CompressionKind.ZSTD, new ZstdCompressor(), new ZstdDecompressor());
+    CompressionCodec zstdJniCodec = new ZstdCodec();
+
+    ByteBuffer sourceCompressorIn = ByteBuffer.allocate(inputSize);
+    ByteBuffer sourceCompressorOut =
+        ByteBuffer.allocate((int) Zstd.compressBound(inputSize));
+    ByteBuffer destCompressorOut = ByteBuffer.allocate(inputSize);
+
+    // Use an array half filled with a constant value & half filled with
+    // random values.
+    byte[] constantBytes = new byte[inputSize / 2];
+    Arrays.fill(constantBytes, 0, inputSize / 2, (byte) 2);
+    sourceCompressorIn.put(constantBytes);
+    byte[] randomBytes = new byte[inputSize - inputSize / 2];
+    rd.nextBytes(randomBytes);
+    sourceCompressorIn.put(randomBytes);
+    sourceCompressorIn.flip();
+
+    // Verify that input -> aircompressor compression -> zstd-jni
+    // decompression returns the input.
+    // Note: compress() returns false when the output is not smaller than the
+    // input; that is harmless here, since sourceCompressorOut has the
+    // capacity either way.
+    zstdAircompressorCodec.compress(sourceCompressorIn, sourceCompressorOut,
+        null, zstdAircompressorCodec.getDefaultOptions());
+    sourceCompressorOut.flip();
+
+    zstdJniCodec.decompress(sourceCompressorOut, destCompressorOut);
+    assertEquals(sourceCompressorIn, destCompressorOut,
+        "aircompressor compression with zstd-jni decompression did not return"
+            + " the input!");
+
+    sourceCompressorIn.rewind();
+    sourceCompressorOut.clear();
+    destCompressorOut.clear();
+
+    // Verify that input -> zstd-jni compression -> aircompressor
+    // decompression returns the input.
+    zstdJniCodec.compress(sourceCompressorIn, sourceCompressorOut, null,
+        zstdJniCodec.getDefaultOptions());
+    sourceCompressorOut.flip();
+    zstdAircompressorCodec.decompress(sourceCompressorOut, destCompressorOut);
+    assertEquals(sourceCompressorIn, destCompressorOut,
+        "zstd-jni compression with aircompressor decompression did not return"
+            + " the input!");
+  }
 }
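For context on what the codec does internally, this sketch drives zstd-jni's `ZstdCompressCtx` directly with the same `setLevel`/`setLong` calls the codec uses; the input here is a synthetic compressible buffer.

```java
import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdCompressCtx;

public class ZstdJniLongModeExample {
  public static void main(String[] args) {
    byte[] src = new byte[1 << 20]; // 1 MiB of zeros
    ZstdCompressCtx ctx = new ZstdCompressCtx();
    try {
      ctx.setLevel(3); // ZSTD_CLEVEL_DEFAULT
      ctx.setLong(27); // long mode with a 2^27 = 128 MiB window
      byte[] compressed = ctx.compress(src);
      // Frames produced with long mode still decompress through the one-shot
      // API, provided the decoder accepts the window size.
      byte[] roundTrip = Zstd.decompress(compressed, src.length);
      System.out.println(compressed.length + " -> " + roundTrip.length);
    } finally {
      ctx.close();
    }
  }
}
```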
diff --git a/java/pom.xml b/java/pom.xml
index c37acc09cf..2fdb52e9b8 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -27,12 +27,12 @@
   <name>Apache ORC</name>
   <url>https://orc.apache.org</url>
-  <description>ORC is a self-describing type-aware columnar file format designed
-  for Hadoop workloads. It is optimized for large streaming reads,
-  but with integrated support for finding required rows
-  quickly. Storing data in a columnar format lets the reader read,
-  decompress, and process only the values that are required for the
-  current query.</description>
+  <description>ORC is a self-describing type-aware columnar file format designed
+    for Hadoop workloads. It is optimized for large streaming reads,
+    but with integrated support for finding required rows
+    quickly. Storing data in a columnar format lets the reader read,
+    decompress, and process only the values that are required for the
+    current query.</description>
   <inceptionYear>2013</inceptionYear>
@@ -343,8 +343,8 @@
                 run
-                ${protoc.artifact}
-                2.5.0
+                  ${protoc.artifact}
+                  2.5.0
                 none
                 ../../proto
@@ -478,7 +478,7 @@
-
+
       <dependency>
         <groupId>org.apache.orc</groupId>
         <artifactId>orc-shims</artifactId>
@@ -510,7 +510,7 @@
         <version>1.8.0-SNAPSHOT</version>
-
+
       <dependency>
         <groupId>com.esotericsoftware</groupId>
        <artifactId>kryo-shaded</artifactId>
@@ -557,6 +557,11 @@
         <artifactId>aircompressor</artifactId>
         <version>0.21</version>
       </dependency>
+      <dependency>
+        <groupId>com.github.luben</groupId>
+        <artifactId>zstd-jni</artifactId>
+        <version>1.5.1-1</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-csv</artifactId>
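Finally, the decompression side: `ZstdCodec.decompress()` flips the output buffer before returning, so callers read the plain bytes from position 0. An illustrative round trip through the new codec:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import com.github.luben.zstd.Zstd;
import org.apache.orc.CompressionCodec;
import org.apache.orc.impl.ZstdCodec;

public class ZstdRoundTripExample {
  public static void main(String[] args) throws Exception {
    CompressionCodec codec = new ZstdCodec();
    byte[] data = new byte[256];
    Arrays.fill(data, (byte) 'a'); // trivially compressible input

    ByteBuffer in = ByteBuffer.wrap(data);
    // Sizing the output at compressBound keeps everything out of overflow.
    ByteBuffer compressed =
        ByteBuffer.allocate((int) Zstd.compressBound(data.length));
    codec.compress(in, compressed, null, codec.getDefaultOptions());
    compressed.flip();

    // decompress() fills `out` and flips it, so the bytes are readable
    // immediately from position 0.
    ByteBuffer out = ByteBuffer.allocate(data.length);
    codec.decompress(compressed, out);
    System.out.println("decompressed " + out.remaining() + " bytes");
  }
}
```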