HBASE-26353 Support loadable dictionaries in hbase-compression-zstd (apache#3787)

ZStandard supports initializing compressors and decompressors with a precomputed
dictionary, which can dramatically improve compression ratios and speed for tables
with small values. For more details, please see

  The Case For Small Data Compression
  https://github.com/facebook/zstd#the-case-for-small-data-compression

Signed-off-by: Duo Zhang <[email protected]>
apurtell authored Oct 27, 2021
1 parent a5a349f commit 45f76a4
Showing 15 changed files with 600 additions and 47 deletions.
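For context on how the new codec is pointed at a dictionary: a minimal sketch, assuming a dictionary already trained offline (for example with the zstd command line tool's --train mode) and stored at a hypothetical HDFS path. The key name hbase.io.compress.zstd.dictionary and the resource:// scheme are defined in the diff below; everything else here is illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ZstdDictionaryConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Point the ZStandard codec at a precomputed dictionary. The value is either a
    // Hadoop Path (for example on HDFS) or a classpath resource via the resource:// scheme.
    conf.set("hbase.io.compress.zstd.dictionary", "hdfs://nn/dicts/mytable.dict"); // hypothetical path
    // Alternative: ship the dictionary inside a jar and reference it as a resource.
    // conf.set("hbase.io.compress.zstd.dictionary", "resource://mytable.dict");
  }
}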
File: CompressionUtil.java
@@ -19,7 +19,9 @@
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class CompressionUtil {
public final class CompressionUtil {

private CompressionUtil() { }

/**
* Round up to the next power of two, unless the value would become negative (ints
File: DictionaryCache.java (new file)
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hbase.io.compress;

import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;

/**
* A utility class for managing compressor/decompressor dictionary loading and caching of load
* results. Useful for any codec that can support changing dictionaries at runtime,
* such as ZStandard.
*/
@InterfaceAudience.Private
public final class DictionaryCache {

  public static final String DICTIONARY_MAX_SIZE_KEY = "hbase.io.compress.dictionary.max.size";
  public static final int DEFAULT_DICTIONARY_MAX_SIZE = 10 * 1024 * 1024;
  public static final String RESOURCE_SCHEME = "resource://";

  private static final Logger LOG = LoggerFactory.getLogger(DictionaryCache.class);
  private static LoadingCache<String, byte[]> CACHE;

  private DictionaryCache() { }

  /**
   * Load a dictionary or return a previously cached load.
   * @param conf configuration
   * @param path the hadoop Path where the dictionary is located, as a String
   * @return the dictionary bytes if successful, null otherwise
   */
  public static byte[] getDictionary(final Configuration conf, final String path)
      throws IOException {
    if (path == null || path.isEmpty()) {
      return null;
    }
    // Create the dictionary loading cache if we haven't already
    if (CACHE == null) {
      synchronized (DictionaryCache.class) {
        if (CACHE == null) {
          final int maxSize = conf.getInt(DICTIONARY_MAX_SIZE_KEY, DEFAULT_DICTIONARY_MAX_SIZE);
          CACHE = CacheBuilder.newBuilder()
            .maximumSize(100)
            .expireAfterAccess(10, TimeUnit.MINUTES)
            .build(
              new CacheLoader<String, byte[]>() {
                @Override
                public byte[] load(String s) throws Exception {
                  byte[] bytes;
                  if (path.startsWith(RESOURCE_SCHEME)) {
                    bytes = loadFromResource(conf, path, maxSize);
                  } else {
                    bytes = loadFromHadoopFs(conf, path, maxSize);
                  }
                  LOG.info("Loaded dictionary from {} (size {})", s, bytes.length);
                  return bytes;
                }
              });
        }
      }
    }

    // Get or load the dictionary for the given path
    try {
      return CACHE.get(path);
    } catch (ExecutionException e) {
      throw new IOException(e);
    }
  }

  // Visible for testing
  public static byte[] loadFromResource(final Configuration conf, final String s,
      final int maxSize) throws IOException {
    if (!s.startsWith(RESOURCE_SCHEME)) {
      throw new IOException("Path does not start with " + RESOURCE_SCHEME);
    }
    final String path = s.substring(RESOURCE_SCHEME.length(), s.length());
    LOG.info("Loading resource {}", path);
    final InputStream in = DictionaryCache.class.getClassLoader().getResourceAsStream(path);
    if (in == null) {
      throw new FileNotFoundException("Resource " + path + " not found");
    }
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
      final byte[] buffer = new byte[8192];
      int n, len = 0;
      do {
        n = in.read(buffer);
        if (n > 0) {
          len += n;
          if (len > maxSize) {
            throw new IOException("Dictionary " + s + " is too large, limit=" + maxSize);
          }
          baos.write(buffer, 0, n);
        }
      } while (n > 0);
    } finally {
      in.close();
    }
    return baos.toByteArray();
  }

  private static byte[] loadFromHadoopFs(final Configuration conf, final String s,
      final int maxSize) throws IOException {
    final Path path = new Path(s);
    final FileSystem fs = FileSystem.get(path.toUri(), conf);
    LOG.info("Loading file {}", path);
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final FSDataInputStream in = fs.open(path);
    try {
      final byte[] buffer = new byte[8192];
      int n, len = 0;
      do {
        n = in.read(buffer);
        if (n > 0) {
          len += n;
          if (len > maxSize) {
            throw new IOException("Dictionary " + s + " is too large, limit=" + maxSize);
          }
          baos.write(buffer, 0, n);
        }
      } while (n > 0);
    } finally {
      in.close();
    }
    return baos.toByteArray();
  }

  // Visible for testing
  public static boolean contains(String dictionaryPath) {
    if (CACHE != null) {
      return CACHE.asMap().containsKey(dictionaryPath);
    }
    return false;
  }

}
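A minimal usage sketch for the class above, assuming a trained dictionary is available on the classpath under the hypothetical resource name zstd.test.dict; only RESOURCE_SCHEME, getDictionary() and contains() from this file are relied on.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;

public class DictionaryCacheSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // "zstd.test.dict" is a hypothetical classpath resource containing a trained dictionary.
    String where = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
    byte[] dictionary = DictionaryCache.getDictionary(conf, where); // first call loads the bytes
    byte[] cached = DictionaryCache.getDictionary(conf, where);     // second call is served from the cache
    System.out.println("loaded " + dictionary.length + " bytes (" + cached.length
        + " cached), present=" + DictionaryCache.contains(where));
  }
}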
File: CompressionTestBase.java
@@ -17,12 +17,10 @@
package org.apache.hadoop.hbase.io.compress;

import static org.junit.Assert.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Random;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -31,6 +29,8 @@
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -39,11 +39,11 @@ public class CompressionTestBase {

protected static final Logger LOG = LoggerFactory.getLogger(CompressionTestBase.class);

static final int LARGE_SIZE = 10 * 1024 * 1024;
static final int VERY_LARGE_SIZE = 100 * 1024 * 1024;
static final int BLOCK_SIZE = 4096;
protected static final int LARGE_SIZE = 10 * 1024 * 1024;
protected static final int VERY_LARGE_SIZE = 100 * 1024 * 1024;
protected static final int BLOCK_SIZE = 4096;

static final byte[] SMALL_INPUT;
protected static final byte[] SMALL_INPUT;
static {
// 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597
SMALL_INPUT = new byte[1+1+2+3+5+8+13+21+34+55+89+144+233+377+610+987+1597];
@@ -67,15 +67,20 @@ public class CompressionTestBase {
Arrays.fill(SMALL_INPUT, off, (off+=1597), (byte)'Q');
}

protected void codecTest(final CompressionCodec codec, final byte[][] input)
throws Exception {
protected void codecTest(final CompressionCodec codec, final byte[][] input) throws Exception {
codecTest(codec, input, null);
}

protected void codecTest(final CompressionCodec codec, final byte[][] input,
final Integer expectedCompressedSize) throws Exception {
// We do this in Compression.java
((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
// Compress
long start = EnvironmentEdgeManager.currentTime();
Compressor compressor = codec.createCompressor();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
CompressionOutputStream out = codec.createOutputStream(baos);
CompressionOutputStream out = codec.createOutputStream(baos, compressor);
int inLen = 0;
long start = EnvironmentEdgeManager.currentTime();
for (int i = 0; i < input.length; i++) {
out.write(input[i]);
inLen += input[i].length;
@@ -85,9 +90,15 @@ protected void codecTest(final CompressionCodec codec, final byte[][] input)
final byte[] compressed = baos.toByteArray();
LOG.info("{} compressed {} bytes to {} bytes in {} ms", codec.getClass().getSimpleName(),
inLen, compressed.length, end - start);
if (expectedCompressedSize != null) {
assertTrue("Expected compressed size does not match: (expected=" + expectedCompressedSize +
", actual=" + compressed.length + ")", expectedCompressedSize == compressed.length);
}
// Decompress
final byte[] plain = new byte[inLen];
CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed));
Decompressor decompressor = codec.createDecompressor();
CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed),
decompressor);
start = EnvironmentEdgeManager.currentTime();
IOUtils.readFully(in, plain, 0, plain.length);
in.close();
@@ -113,29 +124,37 @@ protected void codecSmallTest(final CompressionCodec codec) throws Exception {
/**
* Test with a large input (1MB) divided into blocks of 4KB.
*/
protected void codecLargeTest(final CompressionCodec codec, final double sigma) throws Exception {
RandomDistribution.DiscreteRNG zipf =
protected void codecLargeTest(final CompressionCodec codec, final double sigma)
throws Exception {
RandomDistribution.DiscreteRNG rng =
new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma);
final byte[][] input = new byte[LARGE_SIZE/BLOCK_SIZE][BLOCK_SIZE];
for (int i = 0; i < input.length; i++) {
for (int j = 0; j < input[i].length; j++) {
input[i][j] = (byte)zipf.nextInt();
}
}
fill(rng, input);
codecTest(codec, input);
}

/**
* Test with a very large input (100MB) as a single input buffer.
*/
protected void codecVeryLargeTest(final CompressionCodec codec, final double sigma) throws Exception {
RandomDistribution.DiscreteRNG zipf =
protected void codecVeryLargeTest(final CompressionCodec codec, final double sigma)
throws Exception {
RandomDistribution.DiscreteRNG rng =
new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma);
final byte[][] input = new byte[1][VERY_LARGE_SIZE];
for (int i = 0; i < VERY_LARGE_SIZE; i++) {
input[0][i] = (byte)zipf.nextInt();
}
fill(rng, input);
codecTest(codec, input);
}

  protected static void fill(RandomDistribution.DiscreteRNG rng, byte[][] input) {
    for (int i = 0; i < input.length; i++) {
      fill(rng, input[i]);
    }
  }

  protected static void fill(RandomDistribution.DiscreteRNG rng, byte[] input) {
    for (int i = 0; i < input.length; i++) {
      input[i] = (byte) rng.nextInt();
    }
  }

}
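A sketch of how a codec-specific test might combine the new fill() helpers with the expectedCompressedSize overload of codecTest(); the class name, seed, and sigma are placeholders rather than values from this commit, and the import of RandomDistribution assumes its usual hbase-common location.

import java.util.Random;
import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
import org.apache.hadoop.hbase.util.RandomDistribution;
import org.apache.hadoop.io.compress.CompressionCodec;

public class CodecDictionarySketch extends CompressionTestBase {

  protected void largeZipfTest(CompressionCodec codec) throws Exception {
    // Fixed seed so the generated input, and therefore the compressed size, is reproducible.
    RandomDistribution.DiscreteRNG rng =
        new RandomDistribution.Zipf(new Random(12345), 0, Byte.MAX_VALUE, 2.0);
    byte[][] input = new byte[LARGE_SIZE / BLOCK_SIZE][BLOCK_SIZE];
    fill(rng, input);
    // With a fixed dictionary and reproducible input, a real test could pin the exact
    // compressed size via the new expectedCompressedSize argument; null skips that check.
    codecTest(codec, input, null);
  }
}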
File: ZstdCodec.java
@@ -19,9 +19,12 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.DictionaryCache;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -41,6 +44,7 @@ public class ZstdCodec implements Configurable, CompressionCodec {

public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";

private Configuration conf;

@@ -60,12 +64,12 @@ public void setConf(Configuration conf) {

@Override
public Compressor createCompressor() {
return new ZstdCompressor(getLevel(conf), getBufferSize(conf));
return new ZstdCompressor(getLevel(conf), getBufferSize(conf), getDictionary(conf));
}

@Override
public Decompressor createDecompressor() {
return new ZstdDecompressor(getBufferSize(conf));
return new ZstdDecompressor(getBufferSize(conf), getDictionary(conf));
}

@Override
@@ -123,4 +127,31 @@ static int getBufferSize(Configuration conf) {
return size > 0 ? size : 256 * 1024; // Don't change this default
}

  static byte[] getDictionary(final Configuration conf) {
    String path = conf.get(ZSTD_DICTIONARY_KEY);
    try {
      return DictionaryCache.getDictionary(conf, path);
    } catch (IOException e) {
      throw new RuntimeException("Unable to load dictionary at " + path, e);
    }
  }

  // Zstandard dictionaries begin with a 32-bit magic number, 0xEC30A437 in little-endian
  // format, followed by a 32-bit identifier also in little-endian format.
  // Reference: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md

  static boolean isDictionary(byte[] dictionary) {
    return (dictionary[0] == (byte)0x37 &&
      dictionary[1] == (byte)0xA4 &&
      dictionary[2] == (byte)0x30 &&
      dictionary[3] == (byte)0xEC);
  }

  static int getDictionaryId(byte[] dictionary) {
    if (!isDictionary(dictionary)) {
      throw new IllegalArgumentException("Not a ZStandard dictionary");
    }
    return ByteBuffer.wrap(dictionary, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
  }

}
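The header checks in isDictionary() and getDictionaryId() can be exercised against a hand-built byte array. A small sketch derived only from the format described in the comment above (magic 0xEC30A437 followed by a 32-bit dictionary ID, both little-endian); the ID value 42 is arbitrary.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class ZstdDictionaryHeaderSketch {
  public static void main(String[] args) {
    // Build the 8-byte prefix of a dictionary with ID 42: magic number, then ID, little-endian.
    byte[] header = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
        .putInt(0xEC30A437)  // dictionary magic number
        .putInt(42)          // dictionary ID
        .array();
    // header now starts with 0x37, 0xA4, 0x30, 0xEC, so ZstdCodec.isDictionary(header)
    // would return true and ZstdCodec.getDictionaryId(header) would return 42.
    int id = ByteBuffer.wrap(header, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
    System.out.println("dictionary id = " + id); // prints 42
  }
}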
(Diffs for the remaining 11 changed files are not shown here.)
