Commit 5af03f5

[core] Integrate compression to Hash Lookup

This closes apache#2820

JingsongLi committed Feb 2, 2024
1 parent 6a6c060 commit 5af03f5

Showing 19 changed files with 164 additions and 45 deletions.
8 changes: 7 additions & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html

@@ -46,7 +46,7 @@
 </tr>
 <tr>
 <td><h5>cache-page-size</h5></td>
-<td style="word-wrap: break-word;">16 kb</td>
+<td style="word-wrap: break-word;">64 kb</td>
 <td>MemorySize</td>
 <td>Memory page size for caching.</td>
 </tr>
@@ -273,6 +273,12 @@
 <td>MemorySize</td>
 <td>Max memory size for lookup cache.</td>
 </tr>
+<tr>
+<td><h5>lookup.cache-spill-compression</h5></td>
+<td style="word-wrap: break-word;">"lz4"</td>
+<td>String</td>
+<td>Spill compression for lookup cache, currently none, lz4, lzo and zstd are supported.</td>
+</tr>
 <tr>
 <td><h5>lookup.cache.bloom.filter.enabled</h5></td>
 <td style="word-wrap: break-word;">true</td>
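For reference, the new key can be set like any other Paimon option. A minimal sketch, assuming the programmatic Options API (only the option key and its values come from this commit; the surrounding setup is illustrative):

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.options.Options;

    Options options = new Options();
    // "none" disables spill compression; "lz4" is the default per the table above.
    options.set(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION, "zstd");
    String compression = options.get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION);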
@@ -130,7 +130,7 @@ private HashLookupStoreReader writeData(
         Arrays.fill(value, (byte) 1);
         HashLookupStoreFactory factory =
                 new HashLookupStoreFactory(
-                        new CacheManager(MemorySize.ofMebiBytes(10)), 16 * 1024, 0.75);
+                        new CacheManager(MemorySize.ofMebiBytes(10)), 16 * 1024, 0.75, "none");

         File file = new File(tempDir.toFile(), UUID.randomUUID().toString());
         HashLookupStoreWriter writer = factory.createWriter(file, filter);
5 changes: 5 additions & 0 deletions paimon-common/pom.xml

@@ -296,6 +296,11 @@ under the License.
                             <pattern>it.unimi.dsi.fastutil</pattern>
                             <shadedPattern>org.apache.paimon.shade.it.unimi.dsi.fastutil</shadedPattern>
                         </relocation>
+                        <!-- Same to paimon-format. -->
+                        <relocation>
+                            <pattern>io.airlift</pattern>
+                            <shadedPattern>org.apache.paimon.shade.io.airlift</shadedPattern>
+                        </relocation>
                     </relocations>
                     <minimizeJar>true</minimizeJar>
                 </configuration>
@@ -366,7 +366,7 @@ public class CoreOptions implements Serializable {
     public static final ConfigOption<MemorySize> CACHE_PAGE_SIZE =
             key("cache-page-size")
                     .memoryType()
-                    .defaultValue(MemorySize.parse("16 kb"))
+                    .defaultValue(MemorySize.parse("64 kb"))
                     .withDescription("Memory page size for caching.");

     public static final ConfigOption<MemorySize> TARGET_FILE_SIZE =
@@ -729,6 +729,13 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Max disk size for lookup cache, you can use this option to limit the use of local disks.");

+    public static final ConfigOption<String> LOOKUP_CACHE_SPILL_COMPRESSION =
+            key("lookup.cache-spill-compression")
+                    .stringType()
+                    .defaultValue("lz4")
+                    .withDescription(
+                            "Spill compression for lookup cache, currently none, lz4, lzo and zstd are supported.");
+
     public static final ConfigOption<MemorySize> LOOKUP_CACHE_MAX_MEMORY_SIZE =
             key("lookup.cache-max-memory-size")
                     .memoryType()
@@ -37,12 +37,10 @@ public interface BlockCompressionFactory {

     /** Creates {@link BlockCompressionFactory} according to the configuration. */
     @Nullable
-    static BlockCompressionFactory create(@Nullable String compression) {
-        if (compression == null) {
-            return null;
-        }
-
+    static BlockCompressionFactory create(String compression) {
         switch (compression.toUpperCase()) {
+            case "NONE":
+                return null;
             case "LZ4":
                 return new Lz4BlockCompressionFactory();
             case "LZO":
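The "NONE" branch lets callers pass the configured string straight through and treat a null factory as the uncompressed path. A minimal sketch of that contract (names from this diff; the assertion style is illustrative):

    // "none" (matched case-insensitively via toUpperCase) now yields null
    // instead of being rejected, so callers can branch on nullability alone.
    BlockCompressionFactory none = BlockCompressionFactory.create("none");
    assert none == null;

    BlockCompressionFactory lz4 = BlockCompressionFactory.create("lz4");
    assert lz4 != null; // Lz4BlockCompressionFactory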
@@ -18,6 +18,10 @@

 package org.apache.paimon.io;

+import org.apache.paimon.compression.BlockCompressionFactory;
+
+import javax.annotation.Nullable;
+
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -36,8 +40,23 @@ public interface PageFileInput extends Closeable {

     byte[] readPosition(long position, int length) throws IOException;

-    static PageFileInput create(File file, int pageSize) throws IOException {
+    static PageFileInput create(
+            File file,
+            int pageSize,
+            @Nullable BlockCompressionFactory compressionFactory,
+            long uncompressBytes,
+            @Nullable long[] compressPagePositions)
+            throws IOException {
         RandomAccessFile accessFile = new RandomAccessFile(file, "r");
-        return new UncompressedPageFileInput(accessFile, pageSize);
+        if (compressionFactory == null) {
+            return new UncompressedPageFileInput(accessFile, pageSize);
+        } else {
+            return new CompressedPageFileInput(
+                    accessFile,
+                    pageSize,
+                    compressionFactory,
+                    uncompressBytes,
+                    compressPagePositions);
+        }
     }
 }
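A short usage sketch of the widened factory method (local variables here are placeholders): a null compression factory selects the uncompressed input, in which case the last two arguments are unused.

    // Uncompressed read path (as in FileBasedRandomInputViewTest below):
    PageFileInput plain = PageFileInput.create(file, 1024, null, 0, null);

    // Compressed read path: needs the uncompressed byte count and the page
    // start positions that CompressedPageFileOutput recorded at write time.
    PageFileInput compressed =
            PageFileInput.create(
                    file, pageSize, compressionFactory, uncompressBytes, pagePositions);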
@@ -18,6 +18,10 @@

 package org.apache.paimon.io;

+import org.apache.paimon.compression.BlockCompressionFactory;
+
+import javax.annotation.Nullable;
+
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -27,7 +31,12 @@ public interface PageFileOutput extends Closeable {

     void write(byte[] bytes, int off, int len) throws IOException;

-    static PageFileOutput create(File file) throws IOException {
-        return new UncompressedPageFileOutput(file);
+    static PageFileOutput create(
+            File file, int pageSize, @Nullable BlockCompressionFactory compressionFactory)
+            throws IOException {
+        if (compressionFactory == null) {
+            return new UncompressedPageFileOutput(file);
+        }
+        return new CompressedPageFileOutput(file, pageSize, compressionFactory);
    }
 }
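The output side mirrors this; a condensed sketch with the same placeholder variables. Note that the writer below closes explicitly rather than using try-with-resources, because compressed page metadata is only available after close:

    PageFileOutput output = PageFileOutput.create(file, pageSize, compressionFactory);
    try {
        output.write(bytes, 0, bytes.length);
    } finally {
        output.close(); // for compressed output, page positions are final only now
    }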
@@ -41,6 +41,9 @@ public class HashContext implements Context {
     // Offset of the data for different key length
     final long[] dataOffsets;

+    final long uncompressBytes;
+    final long[] compressPages;
+
     public HashContext(
             boolean bloomFilterEnabled,
             long bloomFilterExpectedEntries,
@@ -49,7 +52,9 @@ public HashContext(
             int[] slotSizes,
             int[] slots,
             int[] indexOffsets,
-            long[] dataOffsets) {
+            long[] dataOffsets,
+            long uncompressBytes,
+            long[] compressPages) {
         this.bloomFilterEnabled = bloomFilterEnabled;
         this.bloomFilterExpectedEntries = bloomFilterExpectedEntries;
         this.bloomFilterBytes = bloomFilterBytes;
@@ -58,5 +63,21 @@ public HashContext(
         this.slots = slots;
         this.indexOffsets = indexOffsets;
         this.dataOffsets = dataOffsets;
+        this.uncompressBytes = uncompressBytes;
+        this.compressPages = compressPages;
     }
+
+    public HashContext copy(long uncompressBytes, long[] compressPages) {
+        return new HashContext(
+                bloomFilterEnabled,
+                bloomFilterExpectedEntries,
+                bloomFilterBytes,
+                keyCounts,
+                slotSizes,
+                slots,
+                indexOffsets,
+                dataOffsets,
+                uncompressBytes,
+                compressPages);
+    }
 }
@@ -18,6 +18,7 @@

 package org.apache.paimon.lookup.hash;

+import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.LookupStoreFactory;
 import org.apache.paimon.utils.BloomFilter;
@@ -33,21 +34,26 @@ public class HashLookupStoreFactory implements LookupStoreFactory {
     private final CacheManager cacheManager;
     private final int cachePageSize;
     private final double loadFactor;
+    @Nullable private final BlockCompressionFactory compressionFactory;

-    public HashLookupStoreFactory(CacheManager cacheManager, int cachePageSize, double loadFactor) {
+    public HashLookupStoreFactory(
+            CacheManager cacheManager, int cachePageSize, double loadFactor, String compression) {
         this.cacheManager = cacheManager;
         this.cachePageSize = cachePageSize;
         this.loadFactor = loadFactor;
+        this.compressionFactory = BlockCompressionFactory.create(compression);
     }

     @Override
     public HashLookupStoreReader createReader(File file, Context context) throws IOException {
-        return new HashLookupStoreReader(file, (HashContext) context, cacheManager, cachePageSize);
+        return new HashLookupStoreReader(
+                file, (HashContext) context, cacheManager, cachePageSize, compressionFactory);
     }

     @Override
     public HashLookupStoreWriter createWriter(File file, @Nullable BloomFilter.Builder bloomFilter)
             throws IOException {
-        return new HashLookupStoreWriter(loadFactor, file, bloomFilter);
+        return new HashLookupStoreWriter(
+                loadFactor, file, bloomFilter, compressionFactory, cachePageSize);
     }
 }
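Putting the pieces together, the factory resolves the compression name once and hands the same BlockCompressionFactory to both sides, so files written with a codec are read back with it. A sketch consistent with the updated test earlier in this commit (file and bloomFilterBuilder are placeholders):

    HashLookupStoreFactory factory =
            new HashLookupStoreFactory(
                    new CacheManager(MemorySize.ofMebiBytes(10)), 16 * 1024, 0.75, "lz4");
    HashLookupStoreWriter writer = factory.createWriter(file, bloomFilterBuilder);
    // ... put entries; close() returns the Context (a HashContext) that now
    // carries uncompressBytes/compressPages when compression was used.
    Context context = writer.close();
    HashLookupStoreReader reader = factory.createReader(file, context);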
@@ -18,6 +18,7 @@

 package org.apache.paimon.lookup.hash;

+import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.io.PageFileInput;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.io.cache.FileBasedRandomInputView;
@@ -63,7 +64,11 @@ public class HashLookupStoreReader
     @Nullable private FileBasedBloomFilter bloomFilter;

     HashLookupStoreReader(
-            File file, HashContext context, CacheManager cacheManager, int cachePageSize)
+            File file,
+            HashContext context,
+            CacheManager cacheManager,
+            int cachePageSize,
+            @Nullable BlockCompressionFactory compressionFactory)
             throws IOException {
         // File path
         if (!file.exists()) {
@@ -83,7 +88,13 @@

         LOG.info("Opening file {}", file.getName());

-        PageFileInput fileInput = PageFileInput.create(file, cachePageSize);
+        PageFileInput fileInput =
+                PageFileInput.create(
+                        file,
+                        cachePageSize,
+                        compressionFactory,
+                        context.uncompressBytes,
+                        context.compressPages);
         inputView = new FileBasedRandomInputView(fileInput, cacheManager);

         if (context.bloomFilterEnabled) {
@@ -18,6 +18,8 @@

 package org.apache.paimon.lookup.hash;

+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.io.CompressedPageFileOutput;
 import org.apache.paimon.io.PageFileOutput;
 import org.apache.paimon.lookup.LookupStoreFactory.Context;
 import org.apache.paimon.lookup.LookupStoreWriter;
@@ -81,10 +83,20 @@ public class HashLookupStoreWriter implements LookupStoreWriter {

     @Nullable private final BloomFilter.Builder bloomFilter;

-    HashLookupStoreWriter(double loadFactor, File file, @Nullable BloomFilter.Builder bloomFilter)
+    @Nullable private final BlockCompressionFactory compressionFactory;
+    private final int compressPageSize;
+
+    HashLookupStoreWriter(
+            double loadFactor,
+            File file,
+            @Nullable BloomFilter.Builder bloomFilter,
+            @Nullable BlockCompressionFactory compressionFactory,
+            int compressPageSize)
             throws IOException {
         this.loadFactor = loadFactor;
         this.outputFile = file;
+        this.compressionFactory = compressionFactory;
+        this.compressPageSize = compressPageSize;
         if (loadFactor <= 0.0 || loadFactor >= 1.0) {
             throw new IllegalArgumentException(
                     "Illegal load factor = " + loadFactor + ", should be between 0.0 and 1.0.");
@@ -187,7 +199,9 @@ public Context close() throws IOException {
                         new int[keyCounts.length],
                         new int[keyCounts.length],
                         new int[keyCounts.length],
-                        new long[keyCounts.length]);
+                        new long[keyCounts.length],
+                        0,
+                        null);

         long indexesLength = bloomFilterBytes;
         long datasLength = 0;
@@ -223,7 +237,9 @@ public Context close() throws IOException {
             context.dataOffsets[i] = indexesLength + context.dataOffsets[i];
         }

-        try (PageFileOutput output = PageFileOutput.create(outputFile)) {
+        PageFileOutput output =
+                PageFileOutput.create(outputFile, compressPageSize, compressionFactory);
+        try {
             // Write bloom filter file
             if (bloomFilter != null) {
                 File bloomFilterFile = new File(tempFolder, "bloomfilter.dat");
@@ -257,11 +273,17 @@ public Context close() throws IOException {
             mergeFiles(filesToMerge, output);
         } finally {
             cleanup(filesToMerge);
+            output.close();
         }

         LOG.info(
                 "Compressed Total store size: {} Mb",
                 new DecimalFormat("#,##0.0").format(outputFile.length() / (1024 * 1024)));

+        if (output instanceof CompressedPageFileOutput) {
+            CompressedPageFileOutput compressedOutput = (CompressedPageFileOutput) output;
+            context = context.copy(compressedOutput.uncompressBytes(), compressedOutput.pages());
+        }
+
         return context;
     }

@@ -472,14 +494,4 @@ private DataOutputStream getIndexStream(int keyLength) throws IOException {
         }
         return dos;
     }
-
-    private int getNumKeyCount() {
-        int res = 0;
-        for (int count : keyCounts) {
-            if (count != 0) {
-                res++;
-            }
-        }
-        return res;
-    }
 }
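Note the shape change in close(): the try-with-resources block becomes an explicit try/finally because the compressed page positions are only final once the output is closed, and the context must then be rebuilt from them. A condensed sketch of the resulting flow, using only names from the diff above:

    PageFileOutput output =
            PageFileOutput.create(outputFile, compressPageSize, compressionFactory);
    try {
        mergeFiles(filesToMerge, output);
    } finally {
        cleanup(filesToMerge);
        output.close(); // page positions are complete only after close
    }
    if (output instanceof CompressedPageFileOutput) {
        CompressedPageFileOutput compressed = (CompressedPageFileOutput) output;
        context = context.copy(compressed.uncompressBytes(), compressed.pages());
    }
    return context;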
@@ -68,7 +68,8 @@ private void innerTest(int len, int maxFileReadCount) throws IOException {
         File file = writeFile(bytes);
         CacheManager cacheManager = new CacheManager(MemorySize.ofKibiBytes(128));
         FileBasedRandomInputView view =
-                new FileBasedRandomInputView(PageFileInput.create(file, 1024), cacheManager);
+                new FileBasedRandomInputView(
+                        PageFileInput.create(file, 1024, null, 0, null), cacheManager);

         // read first one
         // this assertThatCode check the ConcurrentModificationException is not threw.