threadBuffer =
+ ThreadLocal.withInitial(() -> null);
+
+ /**
+  * Returns the calling thread's scratch array, replacing it when the cached
+  * one cannot be reused.
+  *
+  * The cached array is reused only when it holds at least {@code size} bytes
+  * and no more than twice {@code size}; otherwise a new array of exactly
+  * {@code size} bytes is allocated and cached for this thread.
+  *
+  * @param size the minimum number of bytes the caller needs
+  * @return an array of at least {@code size} bytes; a reused array is not cleared
+  */
+ protected static byte[] getBuffer(int size) {
+   byte[] cached = threadBuffer.get();
+   boolean reusable =
+       cached != null && cached.length >= size && cached.length <= size * 2;
+   if (reusable) {
+     return cached;
+   }
+   byte[] fresh = new byte[size];
+   threadBuffer.set(fresh);
+   return fresh;
+ }
+
+ /**
+  * Mutable settings for the Zstandard codec: a compression level and a
+  * long-mode window log. Setters modify the instance in place and return it.
+  */
+ static class ZstdOptions implements Options {
+   private int level;
+   private int windowLog;
+
+   ZstdOptions(int level, int windowLog) {
+     this.level = level;
+     this.windowLog = windowLog;
+   }
+
+   /** Creates a separate options object carrying the same level and window log. */
+   @Override
+   public ZstdOptions copy() {
+     return new ZstdOptions(level, windowLog);
+   }
+
+   /** Leaves the options unchanged; the speed modifier is not consulted. */
+   @Override
+   public Options setSpeed(SpeedModifier newValue) {
+     return this;
+   }
+
+   /**
+    * Sets the Zstandard long mode maximum back-reference distance, given as
+    * a power of 2.
+    *
+    * Accepted values lie between ZSTD_WINDOWLOG_MIN (10) and
+    * ZSTD_WINDOWLOG_MAX (30 and 31 on 32/64-bit architectures, respectively).
+    *
+    * The value 0 is also accepted as a special case meaning "use the default",
+    * ZSTD_WINDOWLOG_LIMIT_DEFAULT of 27, which corresponds to a back-reference
+    * window size of 128MiB.
+    *
+    * @param newValue The desired power-of-2 value back-reference distance.
+    * @return this options object
+    * @throws IllegalArgumentException if the value is non-zero and outside
+    *         the range reported by the Zstd library
+    */
+   public ZstdOptions setWindowLog(int newValue) {
+     boolean outsideRange =
+         newValue < Zstd.windowLogMin() || newValue > Zstd.windowLogMax();
+     if (outsideRange && newValue != 0) {
+       String message = String.format(
+           "Zstd compression window size should be in the range %d to %d,"
+               + " or set to the default value of 0.",
+           Zstd.windowLogMin(),
+           Zstd.windowLogMax());
+       throw new IllegalArgumentException(message);
+     }
+     this.windowLog = newValue;
+     return this;
+   }
+
+   /**
+    * Sets the Zstandard compression level directly from an integer. The
+    * value is typically between 0 and 22; larger numbers mean more
+    * aggressive compression and lower speed.
+    *
+    * This gives finer control than the setSpeed method, letting users pick
+    * one specific level.
+    *
+    * @param newValue The level value of compression to set.
+    * @return this options object
+    * @throws IllegalArgumentException if the value is outside the range
+    *         reported by the Zstd library
+    */
+   public ZstdOptions setLevel(int newValue) {
+     boolean withinRange =
+         newValue >= Zstd.minCompressionLevel() && newValue <= Zstd.maxCompressionLevel();
+     if (!withinRange) {
+       String message = String.format(
+           "Zstd compression level should be in the range %d to %d",
+           Zstd.minCompressionLevel(),
+           Zstd.maxCompressionLevel());
+       throw new IllegalArgumentException(message);
+     }
+     this.level = newValue;
+     return this;
+   }
+
+   /** Leaves the options unchanged; DataKind is not supported in ZstdCodec. */
+   @Override
+   public ZstdOptions setData(DataKind newValue) {
+     return this;
+   }
+
+   /** Two options objects are equal when they share a class, level and window log. */
+   @Override
+   public boolean equals(Object o) {
+     if (this == o) {
+       return true;
+     }
+     if (o == null || getClass() != o.getClass()) {
+       return false;
+     }
+     ZstdOptions other = (ZstdOptions) o;
+     return level == other.level && windowLog == other.windowLog;
+   }
+
+   @Override
+   public int hashCode() {
+     return 31 * level + windowLog;
+   }
+ }
+
+ // Shared default options, constructed with level 1 and windowLog 0.
+ private static final ZstdOptions DEFAULT_OPTIONS =
+ new ZstdOptions(1, 0);
+
+ /**
+  * Returns the codec's default options.
+  *
+  * NOTE(review): this returns the shared static instance, not a copy, and the
+  * ZstdOptions setters modify the object in place. Presumably callers invoke
+  * copy() before changing any setting; confirm against the call sites.
+  */
+ @Override
+ public Options getDefaultOptions() {
+ return DEFAULT_OPTIONS;
+ }
+
+ /**
+  * Compresses the remaining bytes of an input ByteBuffer using Zstandard.
+  *
+  * The input is first compressed into a per-thread scratch array. When the
+  * compressed form is smaller than the input, it is copied into the output
+  * ByteBuffer; bytes that do not fit there are written to the start of the
+  * overflow ByteBuffer. When it is not smaller, no output is written.
+  *
+  * The buffers are accessed through their backing arrays, and the position
+  * of the input buffer is not advanced.
+  *
+  * @param in the bytes to compress
+  * @param out the compressed bytes
+  * @param overflow put any additional bytes here
+  * @param options the options to control compression; cast to ZstdOptions
+  * @return true if the compressed bytes were written to the output buffers,
+  *         false if compression did not shrink the input
+  * @throws IOException if Zstandard reports an error code
+  */
+ @Override
+ public boolean compress(ByteBuffer in, ByteBuffer out,
+     ByteBuffer overflow,
+     Options options) throws IOException {
+   ZstdOptions zso = (ZstdOptions) options;
+
+   zstdCompressCtx = new ZstdCompressCtx();
+   try {
+     // Configure the context inside the try block so that it is still closed
+     // if one of the parameter setters rejects its value.
+     zstdCompressCtx.setLevel(zso.level);
+     zstdCompressCtx.setLong(zso.windowLog);
+     zstdCompressCtx.setChecksum(false);
+
+     int inBytes = in.remaining();
+     byte[] compressed = getBuffer((int) Zstd.compressBound(inBytes));
+
+     int outBytes = zstdCompressCtx.compressByteArray(compressed, 0, compressed.length,
+         in.array(), in.arrayOffset() + in.position(), inBytes);
+     if (Zstd.isError(outBytes)) {
+       throw new IOException(String.format("Error code %s!", outBytes));
+     }
+     if (outBytes >= inBytes) {
+       // Compression did not shrink the data; leave the buffers untouched.
+       return false;
+     }
+
+     int remaining = out.remaining();
+     if (remaining >= outBytes) {
+       // Everything fits in the primary output buffer.
+       System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+           out.position(), outBytes);
+       out.position(out.position() + outBytes);
+     } else {
+       // Fill the primary buffer, then spill the rest into overflow.
+       System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+           out.position(), remaining);
+       out.position(out.limit());
+       System.arraycopy(compressed, remaining, overflow.array(),
+           overflow.arrayOffset(), outBytes - remaining);
+       overflow.position(outBytes - remaining);
+     }
+     return true;
+   } finally {
+     zstdCompressCtx.close();
+   }
+ }
+
+ /**
+  * Decompresses the remaining bytes of {@code in} into {@code out} using
+  * Zstandard.
+  *
+  * Both buffers are accessed through their backing arrays. On return
+  * {@code in} is positioned at its limit and {@code out} has been flipped:
+  * its limit sits just past the decompressed bytes and its position is zero.
+  *
+  * @param in the compressed bytes
+  * @param out the buffer that receives the decompressed bytes
+  * @throws IOException if Zstandard reports an error code
+  */
+ @Override
+ public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+   int srcOffset = in.arrayOffset() + in.position();
+   int srcSize = in.remaining();
+   int dstOffset = out.arrayOffset() + out.position();
+   // Capacity available from dstOffset onward is exactly remaining(); the
+   // absolute array offset must not be subtracted from it.
+   int dstSize = out.remaining();
+
+   long decompressOut =
+       Zstd.decompressByteArray(out.array(), dstOffset, dstSize, in.array(),
+           srcOffset, srcSize);
+   if (Zstd.isError(decompressOut)) {
+     throw new IOException(String.format("Error code %s!", decompressOut));
+   }
+   in.position(in.limit());
+   // Buffer positions are relative to the buffer, not the backing array, so
+   // advance from the current position rather than from dstOffset.
+   out.position(out.position() + (int) decompressOut);
+   out.flip();
+ }
+
+ /** No-op: this method body is empty. */
+ @Override
+ public void reset() {
+
+ }
+
+ /** Closes the compression context held by this codec, if one has been created. */
+ @Override
+ public void destroy() {
+   if (zstdCompressCtx == null) {
+     return;
+   }
+   zstdCompressCtx.close();
+ }
+
+ /** Identifies this codec as the ZSTD compression kind. */
+ @Override
+ public CompressionKind getKind() {
+ return CompressionKind.ZSTD;
+ }
+
+ /**
+  * Hands this codec to OrcCodecPool.returnCodec under the ZSTD kind. No other
+  * cleanup is performed in this method.
+  */
+ @Override
+ public void close() {
+ OrcCodecPool.returnCodec(CompressionKind.ZSTD, this);
+ }
+}
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index 2dacb8d60d..c24514f697 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -2272,51 +2272,67 @@ public void testLz4(Version fileFormat) throws Exception {
}
/**
- * Read and write a randomly generated zstd file.
+ * Write a randomly generated zstd-compressed file, read it back, and check
+ * that the output matches the input.
+ *
+ * Checks correctness across a variety of valid settings:
+ *
+ * * Negative, low, moderate, and high compression levels
+ * * Valid window sizes in [10-31], and default value of 0.
+ *
+ * @throws Exception
*/
@ParameterizedTest
@MethodSource("data")
public void testZstd(Version fileFormat) throws Exception {
TypeDescription schema =
TypeDescription.fromString("struct");
- try (Writer writer = OrcFile.createWriter(testFilePath,
- OrcFile.writerOptions(conf)
- .setSchema(schema)
- .compress(CompressionKind.ZSTD)
- .bufferSize(1000)
- .version(fileFormat))) {
- VectorizedRowBatch batch = schema.createRowBatch();
- Random rand = new Random(3);
- batch.size = 1000;
- for (int b = 0; b < 10; ++b) {
- for (int r = 0; r < 1000; ++r) {
- ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
- ((LongColumnVector) batch.cols[1]).vector[r] = b * 1000 + r;
- ((LongColumnVector) batch.cols[2]).vector[r] = rand.nextLong();
+
+ for (Integer level : new ArrayList<>(Arrays.asList(-4, -1, 0, 1, 3, 8, 12, 17, 22))) {
+ for (Integer windowLog : new ArrayList<>(Arrays.asList(0, 10, 20, 31))) {
+ OrcConf.COMPRESSION_ZSTD_LEVEL.setInt(conf, level);
+ OrcConf.COMPRESSION_ZSTD_WINDOWLOG.setInt(conf, windowLog);
+ try (Writer writer = OrcFile.createWriter(testFilePath,
+ OrcFile.writerOptions(conf)
+ .setSchema(schema)
+ .compress(CompressionKind.ZSTD)
+ .bufferSize(1000)
+ .version(fileFormat))) {
+ VectorizedRowBatch batch = schema.createRowBatch();
+ Random rand = new Random(3);
+ batch.size = 1000;
+ for (int b = 0; b < 10; ++b) {
+ for (int r = 0; r < 1000; ++r) {
+ ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
+ ((LongColumnVector) batch.cols[1]).vector[r] = b * 1000 + r;
+ ((LongColumnVector) batch.cols[2]).vector[r] = rand.nextLong();
+ }
+ writer.addRowBatch(batch);
+ }
}
- writer.addRowBatch(batch);
- }
- }
- try (Reader reader = OrcFile.createReader(testFilePath,
- OrcFile.readerOptions(conf).filesystem(fs));
- RecordReader rows = reader.rows()) {
- assertEquals(CompressionKind.ZSTD, reader.getCompressionKind());
- VectorizedRowBatch batch = reader.getSchema().createRowBatch(1000);
- Random rand = new Random(3);
- for (int b = 0; b < 10; ++b) {
- rows.nextBatch(batch);
- assertEquals(1000, batch.size);
- for (int r = 0; r < batch.size; ++r) {
- assertEquals(rand.nextInt(),
- ((LongColumnVector) batch.cols[0]).vector[r]);
- assertEquals(b * 1000 + r,
- ((LongColumnVector) batch.cols[1]).vector[r]);
- assertEquals(rand.nextLong(),
- ((LongColumnVector) batch.cols[2]).vector[r]);
+ try (Reader reader = OrcFile.createReader(testFilePath,
+ OrcFile.readerOptions(conf).filesystem(fs));
+ RecordReader rows = reader.rows()) {
+ assertEquals(CompressionKind.ZSTD, reader.getCompressionKind());
+ VectorizedRowBatch batch = reader.getSchema().createRowBatch(1000);
+ Random rand = new Random(3);
+ for (int b = 0; b < 10; ++b) {
+ rows.nextBatch(batch);
+ assertEquals(1000, batch.size);
+ for (int r = 0; r < batch.size; ++r) {
+ assertEquals(rand.nextInt(),
+ ((LongColumnVector) batch.cols[0]).vector[r]);
+ assertEquals(b * 1000 + r,
+ ((LongColumnVector) batch.cols[1]).vector[r]);
+ assertEquals(rand.nextLong(),
+ ((LongColumnVector) batch.cols[2]).vector[r]);
+ }
+ }
+ rows.nextBatch(batch);
+ assertEquals(0, batch.size);
}
+ fs.delete(testFilePath, false);
}
- rows.nextBatch(batch);
- assertEquals(0, batch.size);
}
}
@@ -2333,7 +2349,7 @@ public void testCodecPool(Version fileFormat) throws Exception {
WriterOptions opts = OrcFile.writerOptions(conf)
.setSchema(schema).stripeSize(1000).bufferSize(100).version(fileFormat);
- CompressionCodec snappyCodec, zlibCodec;
+ CompressionCodec snappyCodec, zlibCodec, zstdCodec;
snappyCodec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.SNAPPY), batch);
assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.SNAPPY));
Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
@@ -2355,6 +2371,13 @@ public void testCodecPool(Version fileFormat) throws Exception {
assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.ZLIB));
assertSame(zlibCodec, codec);
+ zstdCodec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.ZSTD), batch);
+ assertNotSame(zlibCodec, zstdCodec);
+ assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.ZSTD));
+ codec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.ZSTD), batch);
+ assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.ZSTD));
+ assertSame(zstdCodec, codec);
+
assertSame(snappyCodec, OrcCodecPool.getCodec(CompressionKind.SNAPPY));
CompressionCodec snappyCodec2 = writeBatchesAndGetCodec(
10, 1000, opts.compress(CompressionKind.SNAPPY), batch);
diff --git a/java/core/src/test/org/apache/orc/impl/TestZstd.java b/java/core/src/test/org/apache/orc/impl/TestZstd.java
index bd04b6ce15..b424feb824 100644
--- a/java/core/src/test/org/apache/orc/impl/TestZstd.java
+++ b/java/core/src/test/org/apache/orc/impl/TestZstd.java
@@ -18,15 +18,19 @@
package org.apache.orc.impl;
+import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdException;
import io.airlift.compress.zstd.ZstdCompressor;
import io.airlift.compress.zstd.ZstdDecompressor;
+import java.nio.ByteBuffer;
+import java.util.Random;
import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
import org.junit.jupiter.api.Test;
-import java.nio.ByteBuffer;
-
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.fail;
public class TestZstd {
@@ -34,12 +38,85 @@ public class TestZstd {
public void testNoOverflow() throws Exception {
ByteBuffer in = ByteBuffer.allocate(10);
ByteBuffer out = ByteBuffer.allocate(10);
- in.put(new byte[]{1,2,3,4,5,6,7,10});
+ ByteBuffer jniOut = ByteBuffer.allocate(10);
+ in.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 10});
in.flip();
CompressionCodec codec = new AircompressorCodec(
- CompressionKind.ZSTD, new ZstdCompressor(), new ZstdDecompressor());
+ CompressionKind.ZSTD, new ZstdCompressor(), new ZstdDecompressor());
assertFalse(codec.compress(in, out, null,
- codec.getDefaultOptions()));
+ codec.getDefaultOptions()));
+ CompressionCodec zstdCodec = new ZstdCodec();
+ assertFalse(zstdCodec.compress(in, jniOut, null,
+ zstdCodec.getDefaultOptions()));
}
+ /**
+  * Checks that asking ZstdCodec to decompress a few arbitrary bytes raises
+  * ZstdException; the test fails if decompression returns normally.
+  */
+ @Test
+ public void testCorrupt() throws Exception {
+   ByteBuffer corrupt = ByteBuffer.allocate(1000);
+   corrupt.put(new byte[] {127, 125, 1, 99, 98, 1});
+   corrupt.flip();
+   ByteBuffer out = ByteBuffer.allocate(1000);
+   CompressionCodec codec = new ZstdCodec();
+   try {
+     codec.decompress(corrupt, out);
+     fail();
+   } catch (ZstdException expected) {
+     // EXPECTED
+   }
+ }
+
+ /**
+  * Test compatibility of zstd-jni and aircompressor Zstd implementations
+  * by checking that bytes compressed with one can be decompressed by the
+  * other when using the default options.
+  *
+  * The random half of the input comes from a seeded generator so that a
+  * failure can be reproduced with the same bytes.
+  */
+ @Test
+ public void testZstdAircompressorJniCompressDecompress() throws Exception {
+   int inputSize = 27182;
+   // Fixed seed: keeps the generated input identical from run to run.
+   Random rd = new Random(3);
+
+   CompressionCodec zstdAircompressorCodec = new AircompressorCodec(
+       CompressionKind.ZSTD, new ZstdCompressor(), new ZstdDecompressor());
+   CompressionCodec zstdJniCodec = new ZstdCodec();
+
+   ByteBuffer sourceCompressorIn = ByteBuffer.allocate(inputSize);
+   ByteBuffer sourceCompressorOut =
+       ByteBuffer.allocate((int) Zstd.compressBound(inputSize));
+   ByteBuffer destCompressorOut = ByteBuffer.allocate(inputSize);
+
+   // Use an array half filled with a constant value & half filled with
+   // random values.
+   byte[] constantBytes = new byte[inputSize / 2];
+   java.util.Arrays.fill(constantBytes, 0, inputSize / 2, (byte) 2);
+   sourceCompressorIn.put(constantBytes);
+   byte[] randomBytes = new byte[inputSize - inputSize / 2];
+   rd.nextBytes(randomBytes);
+   sourceCompressorIn.put(randomBytes);
+   sourceCompressorIn.flip();
+
+   // Verify that input -> aircompressor compression -> zstd-jni
+   // decompression returns the input.
+   zstdAircompressorCodec.compress(sourceCompressorIn, sourceCompressorOut,
+       null, zstdAircompressorCodec.getDefaultOptions());
+   sourceCompressorOut.flip();
+
+   zstdJniCodec.decompress(sourceCompressorOut, destCompressorOut);
+   assertEquals(sourceCompressorIn, destCompressorOut,
+       "aircompressor compression with zstd-jni decompression did not return"
+           + " the input!");
+
+   // Reset all three buffers before running the opposite direction.
+   sourceCompressorIn.rewind();
+   sourceCompressorOut.clear();
+   destCompressorOut.clear();
+
+   // Verify that input -> zstd-jni compression -> aircompressor
+   // decompression returns the input.
+   zstdJniCodec.compress(sourceCompressorIn, sourceCompressorOut, null,
+       zstdJniCodec.getDefaultOptions());
+   sourceCompressorOut.flip();
+   zstdAircompressorCodec.decompress(sourceCompressorOut, destCompressorOut);
+   assertEquals(sourceCompressorIn, destCompressorOut,
+       "zstd-jni compression with aircompressor decompression did not return"
+           + " the input!");
+ }
}
diff --git a/java/pom.xml b/java/pom.xml
index 68e4811177..44c0531173 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -83,6 +83,7 @@
2.8.1
3.0.0-M5
${project.build.directory}/testing-tmp
+ 1.5.5-11
@@ -155,6 +156,11 @@
aircompressor
0.25