Skip to content

Commit

Permalink
[core] Introduce a basic SortLookupStoreFactory (#3770)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jul 18, 2024
1 parent 6da33cb commit 982e7b1
Show file tree
Hide file tree
Showing 31 changed files with 1,937 additions and 21 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,12 @@
<td>Float</td>
<td>The index load factor for lookup.</td>
</tr>
<tr>
<td><h5>lookup.local-file-type</h5></td>
<td style="word-wrap: break-word;">hash</td>
<td><p>Enum</p></td>
<td>The local file type for lookup.<br /><br />Possible values:<ul><li>"sort": Construct a sorted file for lookup.</li><li>"hash": Construct a hash file for lookup.</li></ul></td>
</tr>
<tr>
<td><h5>manifest.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
Expand Down
36 changes: 36 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"Define partition by table options, cannot define partition on DDL and table options at the same time.");

/**
 * Option {@code lookup.local-file-type}: selects the {@link LookupLocalFileType} used for the
 * local lookup file. Defaults to {@link LookupLocalFileType#HASH}.
 */
public static final ConfigOption<LookupLocalFileType> LOOKUP_LOCAL_FILE_TYPE =
key("lookup.local-file-type")
.enumType(LookupLocalFileType.class)
.defaultValue(LookupLocalFileType.HASH)
.withDescription("The local file type for lookup.");

public static final ConfigOption<Float> LOOKUP_HASH_LOAD_FACTOR =
key("lookup.hash-load-factor")
.floatType()
Expand Down Expand Up @@ -1624,6 +1630,10 @@ public int cachePageSize() {
return (int) options.get(CACHE_PAGE_SIZE).getBytes();
}

/** Returns the value configured for {@code LOOKUP_LOCAL_FILE_TYPE}. */
public LookupLocalFileType lookupLocalFileType() {
return options.get(LOOKUP_LOCAL_FILE_TYPE);
}

/** Returns the value configured for {@code LOOKUP_CACHE_MAX_MEMORY_SIZE}. */
public MemorySize lookupCacheMaxMemory() {
return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE);
}
Expand Down Expand Up @@ -2616,4 +2626,30 @@ public InlineElement getDescription() {
return text(description);
}
}

/**
 * Kinds of local file that can be built for lookup.
 *
 * <p>Each constant carries the textual option value returned by {@link #toString()} and a
 * human-readable description exposed through {@link #getDescription()}.
 */
public enum LookupLocalFileType implements DescribedEnum {
    SORT("sort", "Construct a sorted file for lookup."),

    HASH("hash", "Construct a hash file for lookup.");

    /** Human-readable explanation of the constant. */
    private final String description;

    /** Textual form of the constant, as returned by {@link #toString()}. */
    private final String value;

    LookupLocalFileType(String optionValue, String optionDescription) {
        value = optionValue;
        description = optionDescription;
    }

    /** Returns the description wrapped as an {@link InlineElement}. */
    @Override
    public InlineElement getDescription() {
        return text(this.description);
    }

    /** Returns the textual option value rather than the constant name. */
    @Override
    public String toString() {
        return this.value;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,24 @@
/** Implementation of {@link BlockCompressionFactory} for airlift compressors. */
public class AirCompressorFactory implements BlockCompressionFactory {

private final BlockCompressionType type;
private final Compressor internalCompressor;
private final Decompressor internalDecompressor;

/**
 * Creates a factory backed by the given airlift compressor/decompressor pair.
 *
 * @param type the compression type reported by {@code getCompressionType()}
 * @param internalCompressor the compressor wrapped by {@code getCompressor()}
 * @param internalDecompressor the decompressor kept alongside the compressor
 */
public AirCompressorFactory(
        BlockCompressionType type,
        Compressor internalCompressor,
        Decompressor internalDecompressor) {
    // Only the three-argument form is kept: the visible call sites pass the type explicitly,
    // and the leftover two-argument signature line made this block unparseable.
    this.type = type;
    this.internalCompressor = internalCompressor;
    this.internalDecompressor = internalDecompressor;
}

/** Returns the compression type this factory was constructed with. */
@Override
public BlockCompressionType getCompressionType() {
return type;
}

@Override
public BlockCompressor getCompressor() {
return new AirBlockCompressor(internalCompressor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
*/
public interface BlockCompressionFactory {

/** Returns the {@link BlockCompressionType} associated with this factory. */
BlockCompressionType getCompressionType();

/** Returns a {@link BlockCompressor} provided by this factory. */
BlockCompressor getCompressor();

/** Returns a {@link BlockDecompressor} provided by this factory. */
BlockDecompressor getDecompressor();
Expand All @@ -39,12 +41,31 @@ static BlockCompressionFactory create(String compression) {
// Dispatch on the upper-cased codec name. Every label must be unique: a second "ZSTD" label
// is a compile-time error and would leave "LZO" without its own return statement.
switch (compression.toUpperCase()) {
    case "NONE":
        // No compression: callers receive null instead of a factory.
        return null;
    case "ZSTD":
        return new ZstdBlockCompressionFactory();
    case "LZ4":
        return new Lz4BlockCompressionFactory();
    case "LZO":
        // LZO is served by the airlift codec, tagged with its compression type.
        return new AirCompressorFactory(
                BlockCompressionType.LZO, new LzoCompressor(), new LzoDecompressor());
    default:
        throw new IllegalStateException("Unknown CompressionMethod " + compression);
}
}

/** Creates {@link BlockCompressionFactory} according to the {@link BlockCompressionType}. */
@Nullable
static BlockCompressionFactory create(BlockCompressionType compression) {
switch (compression) {
case NONE:
return null;
case ZSTD:
return new ZstdBlockCompressionFactory();
case LZ4:
return new Lz4BlockCompressionFactory();
case LZO:
return new AirCompressorFactory(
BlockCompressionType.LZO, new LzoCompressor(), new LzoDecompressor());
default:
throw new IllegalStateException("Unknown CompressionMethod " + compression);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.compression;

/**
 * Block compression types.
 *
 * <p>Every constant carries an integer {@code persistentId}; {@link
 * #getCompressionTypeByPersistentId(int)} maps such an id back to its constant.
 */
public enum BlockCompressionType {
    NONE(0),
    ZSTD(1),
    LZ4(2),
    LZO(3);

    /** Integer id associated with this constant. */
    private final int persistentId;

    BlockCompressionType(int id) {
        persistentId = id;
    }

    /** Returns the integer id associated with this constant. */
    public int persistentId() {
        return persistentId;
    }

    /**
     * Finds the constant whose id equals {@code persistentId}.
     *
     * @param persistentId the id to resolve
     * @return the matching constant
     * @throws IllegalArgumentException if no constant carries that id
     */
    public static BlockCompressionType getCompressionTypeByPersistentId(int persistentId) {
        BlockCompressionType[] candidates = values();
        int index = 0;
        while (index < candidates.length) {
            BlockCompressionType candidate = candidates[index];
            if (candidate.persistentId() == persistentId) {
                return candidate;
            }
            index++;
        }

        throw new IllegalArgumentException("Unknown persistentId " + persistentId);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@

/** Implementation of {@link BlockCompressionFactory} for Lz4 codec. */
public class Lz4BlockCompressionFactory implements BlockCompressionFactory {

/** Always reports {@link BlockCompressionType#LZ4}. */
@Override
public BlockCompressionType getCompressionType() {
return BlockCompressionType.LZ4;
}

@Override
public BlockCompressor getCompressor() {
return new Lz4BlockCompressor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
/** Implementation of {@link BlockCompressionFactory} for zstd codec. */
public class ZstdBlockCompressionFactory implements BlockCompressionFactory {

/** Always reports {@link BlockCompressionType#ZSTD}. */
@Override
public BlockCompressionType getCompressionType() {
return BlockCompressionType.ZSTD;
}

@Override
public BlockCompressor getCompressor() {
return new ZstdBlockCompressor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
Expand All @@ -42,6 +43,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;

import static org.apache.paimon.data.BinaryRow.HEADER_SIZE_IN_BITS;
Expand Down Expand Up @@ -160,6 +162,10 @@ public InternalRow deserialize(byte[] bytes) {
return row;
}

/** Creates a new {@link MemorySlice} comparator built from this serializer's row type. */
public Comparator<MemorySlice> createSliceComparator() {
return new SliceComparator(rowType);
}

private static FieldWriter createFieldWriter(DataType fieldType) {
final FieldWriter fieldWriter;
switch (fieldType.getTypeRoot()) {
Expand Down Expand Up @@ -515,24 +521,30 @@ private static class RowReader {

private MemorySegment segment;
private MemorySegment[] segments;
private int offset;
private int position;

/**
 * Creates a reader for rows that start with a header of the given size.
 *
 * @param headerSizeInBytes size of the row header, in bytes
 */
private RowReader(int headerSizeInBytes) {
this.headerSizeInBytes = headerSizeInBytes;
}

/**
 * Points this reader at a row stored in a byte array, starting at offset 0.
 *
 * @param bytes the serialized row
 */
private void pointTo(byte[] bytes) {
    // Delegate entirely to the segment overload; it sets every field, so assigning
    // this.segment here as well would only wrap the array a second time.
    pointTo(MemorySegment.wrap(bytes), 0);
}

/**
 * Points this reader at a row that begins at {@code offset} inside {@code segment}.
 *
 * @param segment the segment holding the serialized row
 * @param offset index within {@code segment} at which the row starts
 */
private void pointTo(MemorySegment segment, int offset) {
    this.segment = segment;
    this.segments = new MemorySegment[] {segment};
    this.offset = offset;
    // The read position starts just past the header, relative to the row's offset.
    this.position = offset + headerSizeInBytes;
}

/** Reads the row kind from the first byte of the row, i.e. the byte at {@code offset}. */
private RowKind readRowKind() {
    // Index by offset rather than 0 so rows that do not start at the segment's first byte
    // are read correctly.
    return RowKind.fromByteValue(segment.get(offset));
}

/**
 * Tells whether the field at {@code pos} is flagged null in the row header.
 *
 * @param pos field index
 * @return the bit at {@code pos + HEADER_SIZE_IN_BITS}, counted from the row's offset
 */
private boolean isNullAt(int pos) {
    // The bit set is addressed relative to the row's offset, not the start of the segment.
    return bitGet(segment, offset, pos + HEADER_SIZE_IN_BITS);
}

private boolean readBoolean() {
Expand Down Expand Up @@ -635,4 +647,47 @@ private InternalRow readRow(RowCompactedSerializer serializer) {
return serializer.deserialize(bytes);
}
}

/**
 * Compares two serialized rows held in {@link MemorySlice}s field by field.
 *
 * <p>Fields are compared in declaration order. A null field sorts before a non-null one, two
 * null fields are treated as equal, and non-null fields are compared through {@link
 * Comparable}. The comparator reuses two internal readers between calls.
 */
private static class SliceComparator implements Comparator<MemorySlice> {

    private final RowReader reader1;
    private final RowReader reader2;
    private final FieldReader[] fieldReaders;

    /**
     * @param rowType type of the rows being compared; one field reader is created per field
     */
    public SliceComparator(RowType rowType) {
        int fieldCount = rowType.getFieldCount();
        // RowReader takes the header size in BYTES, not the number of fields. isNullAt reads
        // bit (pos + HEADER_SIZE_IN_BITS), so the header spans HEADER_SIZE_IN_BITS bits plus
        // one null bit per field, rounded up to whole bytes.
        // NOTE(review): this must equal the header size used when the rows were written;
        // if the serializer already has a helper for this, call it here instead - confirm.
        int headerSizeInBytes = (fieldCount + HEADER_SIZE_IN_BITS + 7) / 8;
        this.reader1 = new RowReader(headerSizeInBytes);
        this.reader2 = new RowReader(headerSizeInBytes);
        this.fieldReaders = new FieldReader[fieldCount];
        for (int i = 0; i < fieldCount; i++) {
            fieldReaders[i] = createFieldReader(rowType.getTypeAt(i));
        }
    }

    @Override
    public int compare(MemorySlice slice1, MemorySlice slice2) {
        reader1.pointTo(slice1.segment(), slice1.offset());
        reader2.pointTo(slice2.segment(), slice2.offset());
        for (int i = 0; i < fieldReaders.length; i++) {
            boolean isNull1 = reader1.isNullAt(i);
            boolean isNull2 = reader2.isNullAt(i);
            if (isNull1 && isNull2) {
                // Both null: nothing to read for this field, move on to the next one.
                continue;
            }
            if (isNull1) {
                return -1;
            }
            if (isNull2) {
                return 1;
            }

            FieldReader fieldReader = fieldReaders[i];
            Object o1 = fieldReader.readField(reader1, i);
            Object o2 = fieldReader.readField(reader2, i);
            // Both values come from the same field reader; they are compared via Comparable.
            @SuppressWarnings({"unchecked", "rawtypes"})
            int comp = ((Comparable) o1).compareTo(o2);
            if (comp != 0) {
                return comp;
            }
        }
        return 0;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
package org.apache.paimon.lookup;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.BloomFilter;

import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.util.Comparator;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -59,6 +64,26 @@ static Function<Long, BloomFilter.Builder> bfGenerator(Options options) {
return bfGenerator;
}

/**
 * Builds the {@link LookupStoreFactory} selected by {@code options.lookupLocalFileType()}.
 *
 * @param options source of the file type, cache page size, spill compression and hash load
 *     factor
 * @param cacheManager cache manager handed to the created factory
 * @param keyComparator key comparator, passed only to the sort-based factory
 * @return a {@link SortLookupStoreFactory} for {@code SORT}, a {@link HashLookupStoreFactory}
 *     for {@code HASH}
 * @throws IllegalArgumentException for any other file type
 */
static LookupStoreFactory create(
        CoreOptions options, CacheManager cacheManager, Comparator<MemorySlice> keyComparator) {
    Options conf = options.toConfiguration();
    String compression = conf.get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION);
    CoreOptions.LookupLocalFileType fileType = options.lookupLocalFileType();
    switch (fileType) {
        case SORT:
            {
                int pageSize = options.cachePageSize();
                return new SortLookupStoreFactory(
                        keyComparator, cacheManager, pageSize, compression);
            }
        case HASH:
            {
                int pageSize = options.cachePageSize();
                return new HashLookupStoreFactory(
                        cacheManager,
                        pageSize,
                        conf.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
                        compression);
            }
        default:
            throw new IllegalArgumentException(
                    "Unsupported lookup local file type: " + fileType);
    }
}

/** Context between writer and reader. */
interface Context {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

package org.apache.paimon.lookup;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;

/** Reader that looks up a value by its key bytes. */
public interface LookupStoreReader extends Closeable {

/**
 * Looks up the value for the given key.
 *
 * @param key the key bytes
 * @return the value bytes; may be {@code null} (presumably when the key has no entry -
 *     confirm against implementations)
 * @throws IOException declared so that implementations may report I/O failures
 */
@Nullable
byte[] lookup(byte[] key) throws IOException;
}
Loading

0 comments on commit 982e7b1

Please sign in to comment.