[core] Introduce file index read/write framework. #3177

Merged · 29 commits · Apr 15, 2024
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -206,6 +206,18 @@
<td>Boolean</td>
<td>Whether only overwrite dynamic partition when overwriting a partitioned table with dynamic partition columns. Works only when the table has partition keys.</td>
</tr>
<tr>
<td><h5>file-index.in-manifest-threshold</h5></td>
<td style="word-wrap: break-word;">500 bytes</td>
<td>MemorySize</td>
<td>The threshold for storing file index bytes in the manifest.</td>
</tr>
<tr>
<td><h5>file-index.read.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to enable reading the file index.</td>
</tr>
<tr>
<td><h5>file-reader-async-threshold</h5></td>
<td style="word-wrap: break-word;">10 mb</td>
97 changes: 97 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -22,6 +22,7 @@
import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
import org.apache.paimon.annotation.Documentation.Immutable;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lookup.LookupStrategy;
@@ -70,6 +71,10 @@ public class CoreOptions implements Serializable {

public static final String DISTINCT = "distinct";

public static final String FILE_INDEX = "file-index";

public static final String COLUMNS = "columns";

public static final ConfigOption<Integer> BUCKET =
key("bucket")
.intType()
@@ -135,6 +140,18 @@ public class CoreOptions implements Serializable {
"Default file compression format, orc is lz4 and parquet is snappy. It can be overridden by "
+ FILE_COMPRESSION_PER_LEVEL.key());

public static final ConfigOption<MemorySize> FILE_INDEX_IN_MANIFEST_THRESHOLD =
key("file-index.in-manifest-threshold")
.memoryType()
.defaultValue(MemorySize.parse("500 B"))
.withDescription("The threshold to store file index bytes in manifest.");

public static final ConfigOption<Boolean> FILE_INDEX_READ_ENABLED =
key("file-index.read.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Whether enabled read file index.");

public static final ConfigOption<FileFormatType> MANIFEST_FORMAT =
key("manifest.format")
.enumType(FileFormatType.class)
@@ -1703,6 +1720,86 @@ public boolean deletionVectorsEnabled() {
return options.get(DELETION_VECTORS_ENABLED);
}

public FileIndexOptions indexColumnsOptions() {
String fileIndexPrefix = FILE_INDEX + ".";
String fileIndexColumnSuffix = "." + COLUMNS;

FileIndexOptions fileIndexOptions = new FileIndexOptions(fileIndexInManifestThreshold());
for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
String key = entry.getKey();
if (key.startsWith(fileIndexPrefix)) {
// starts with file-index, decode this option
if (key.endsWith(fileIndexColumnSuffix)) {
// if it ends with .columns, set up the index columns
String indexType =
key.substring(
fileIndexPrefix.length(),
key.length() - fileIndexColumnSuffix.length());
String[] names = entry.getValue().split(",");
for (String name : names) {
if (StringUtils.isBlank(name)) {
throw new IllegalArgumentException(
"Wrong option in " + key + ", should not have empty column");
}
fileIndexOptions.computeIfAbsent(name.trim(), indexType);
}
} else {
// else, it must be an option
String[] kv = key.substring(fileIndexPrefix.length()).split("\\.");
if (kv.length != 3) {
continue;
}
String indexType = kv[0];
String cname = kv[1];
String opkey = kv[2];

if (fileIndexOptions.get(cname, indexType) == null) {
// if the indexes have not been set yet, find .columns in options, then set them
String columns =
options.get(fileIndexPrefix + indexType + fileIndexColumnSuffix);
if (columns == null) {
continue;
}
String[] names = columns.split(",");
boolean foundTarget = false;
for (String name : names) {
if (StringUtils.isBlank(name)) {
throw new IllegalArgumentException(
"Wrong option in "
+ key
+ ", should not have empty column");
}
String tname = name.trim();
if (cname.equals(tname)) {
foundTarget = true;
}
fileIndexOptions.computeIfAbsent(name.trim(), indexType);
}
if (!foundTarget) {
throw new IllegalArgumentException(
"Wrong option in "
+ key
+ ", can't found column "
+ cname
+ " in "
+ columns);
}
}
fileIndexOptions.get(cname, indexType).set(opkey, entry.getValue());
}
}
}
return fileIndexOptions;
}

public long fileIndexInManifestThreshold() {
return options.get(FILE_INDEX_IN_MANIFEST_THRESHOLD).getBytes();
}

public boolean fileIndexReadEnabled() {
return options.get(FILE_INDEX_READ_ENABLED);
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row.", true, true),
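A minimal sketch of how the file-index table options parsed by indexColumnsOptions() above can be laid out. The column names, the "items" option key, and the Map-based CoreOptions constructor are illustrative assumptions, not part of this PR:

import org.apache.paimon.CoreOptions;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.options.Options;

import java.util.HashMap;
import java.util.Map;

public class FileIndexConfigSketch {
    public static void main(String[] args) {
        // Keys follow the pattern handled by indexColumnsOptions():
        //   file-index.<index-type>.columns          -> comma-separated column names
        //   file-index.<index-type>.<column>.<key>   -> per-column index option
        Map<String, String> conf = new HashMap<>();
        conf.put("file-index.bloom-filter.columns", "order_id,user_id");
        conf.put("file-index.bloom-filter.order_id.items", "1000000"); // hypothetical option key

        // Assumes the Map-based CoreOptions constructor.
        FileIndexOptions fileIndexOptions = new CoreOptions(conf).indexColumnsOptions();

        // Both columns are registered for the bloom-filter index; order_id carries one option.
        Options orderIdOptions = fileIndexOptions.get("order_id", "bloom-filter");
        System.out.println(orderIdOptions.toMap());                                  // {items=1000000}
        System.out.println(fileIndexOptions.get("user_id", "bloom-filter").toMap()); // {}
    }
}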
@@ -46,20 +46,20 @@
* File index file format. Put all column and offset in the header.
*
* <pre>
* _______________________________________ _____________________
* _____________________________________ _____________________
* | magic |version|head length |
* |-------------------------------------|
* | column size
* | column number
* |-------------------------------------|
* | column 1 | index size
* | column 1 | index number
* |-------------------------------------|
* | index name 1 |start pos |length |
* |-------------------------------------|
* | index name 2 |start pos |length |
* |-------------------------------------|
* | index name 3 |start pos |length |
* |-------------------------------------| HEAD
* | column 2 | index size
* | column 2 | index number
* |-------------------------------------|
* | index name 1 |start pos |length |
* |-------------------------------------|
@@ -82,14 +82,15 @@
* magic: 8 bytes long
* version: 4 bytes int
* head length: 4 bytes int
* index type: var bytes utf (length + bytes)
* body info size: 4 bytes int (how many column items below)
* column name: var bytes utf
* column number: 4 bytes int
* column x: var bytes utf (length + bytes)
* index number: 4 bytes int (how many column items below)
* index name x: var bytes utf
* start pos: 4 bytes int
* length: 4 bytes int
* redundant length: 4 bytes int (for compatibility with later versions, in this version, content is zero)
* redundant bytes: var bytes (for compatibility with later versions, in this version, is empty)
* BODY: column bytes + column bytes + column bytes + .......
* BODY: column index bytes + column index bytes + column index bytes + .......
*
* </pre>
*/
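A minimal sketch of decoding the HEAD layout described above, assuming the "var bytes utf" fields use DataInputStream#readUTF encoding and that start pos is an offset into the BODY section; the class and method names are hypothetical:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class FileIndexHeadSketch {

    /** Returns column -> (index name -> [start pos, length]) decoded from the HEAD section. */
    public static Map<String, Map<String, int[]>> readHead(byte[] fileBytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(fileBytes));
        long magic = in.readLong();      // magic: 8 bytes long (unused in this sketch)
        int version = in.readInt();      // version: 4 bytes int (unused in this sketch)
        int headLength = in.readInt();   // head length: 4 bytes int (unused in this sketch)
        int columnNumber = in.readInt(); // column number: 4 bytes int

        Map<String, Map<String, int[]>> head = new LinkedHashMap<>();
        for (int i = 0; i < columnNumber; i++) {
            String column = in.readUTF();   // column x: var bytes utf
            int indexNumber = in.readInt(); // index number: 4 bytes int
            Map<String, int[]> indexes = new LinkedHashMap<>();
            for (int j = 0; j < indexNumber; j++) {
                String indexName = in.readUTF(); // index name x: var bytes utf
                int startPos = in.readInt();     // start pos: 4 bytes int
                int length = in.readInt();       // length: 4 bytes int
                indexes.put(indexName, new int[] {startPos, length});
            }
            head.put(column, indexes);
        }
        int redundantLength = in.readInt(); // reserved for later versions, zero in this version
        in.skipBytes(redundantLength);      // redundant bytes (empty in this version)
        return head;
    }
}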
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Options of file index column. */
public class FileIndexOptions {

// if the filter size is greater than fileIndexInManifestThreshold, we store it in a separate index file instead of the manifest
private final long fileIndexInManifestThreshold;

private final Map<String, Map<String, Options>> indexTypeOptions;

public FileIndexOptions() {
this(CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.defaultValue().getBytes());
}

public FileIndexOptions(long fileIndexInManifestThreshold) {
this.indexTypeOptions = new HashMap<>();
this.fileIndexInManifestThreshold = fileIndexInManifestThreshold;
}

public void computeIfAbsent(String column, String indexType) {
indexTypeOptions
.computeIfAbsent(column, c -> new HashMap<>())
.computeIfAbsent(indexType, i -> new Options());
}

public Options get(String column, String indexType) {
return Optional.ofNullable(indexTypeOptions.getOrDefault(column, null))
.map(x -> x.get(indexType))
.orElse(null);
}

public boolean isEmpty() {
return indexTypeOptions.isEmpty();
}

public long fileIndexInManifestThreshold() {
return fileIndexInManifestThreshold;
}

public Set<Map.Entry<String, Map<String, Options>>> entrySet() {
return indexTypeOptions.entrySet();
}
}
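A short usage sketch of FileIndexOptions; the column name, index type, and the "fpp" option key are illustrative only:

import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.options.Options;

public class FileIndexOptionsSketch {
    public static void main(String[] args) {
        FileIndexOptions fileIndexOptions = new FileIndexOptions();

        // Register a (column, index type) pair; repeated calls are idempotent.
        fileIndexOptions.computeIfAbsent("user_id", "bloom-filter");

        // Attach a per-index option, mirroring how CoreOptions#indexColumnsOptions fills them in.
        Options options = fileIndexOptions.get("user_id", "bloom-filter");
        options.set("fpp", "0.05"); // hypothetical option key

        // Unregistered pairs return null.
        System.out.println(fileIndexOptions.get("missing_col", "bloom-filter")); // null
        System.out.println(fileIndexOptions.isEmpty());                          // false
    }
}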
@@ -25,11 +25,10 @@
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.BloomFilter64;
import org.apache.paimon.utils.BloomFilter64.BitSet;

import org.apache.hadoop.util.bloom.HashFunction;

import java.util.BitSet;

/**
* Bloom filter for file index.
*
@@ -40,7 +39,7 @@
*/
public class BloomFilterFileIndex implements FileIndexer {

public static final String BLOOM_FILTER = "bloom";
public static final String BLOOM_FILTER = "bloom-filter";

private static final int DEFAULT_ITEMS = 1_000_000;
private static final double DEFAULT_FPP = 0.1;
@@ -84,19 +83,21 @@ public Writer(DataType type, int items, double fpp) {

@Override
public void write(Object key) {
filter.addHash(hashFunction.hash(key));
if (key != null) {
filter.addHash(hashFunction.hash(key));
}
}

@Override
public byte[] serializedBytes() {
int numHashFunctions = filter.getNumHashFunctions();
byte[] bytes = filter.getBitSet().toByteArray();
byte[] serialized = new byte[bytes.length + Integer.BYTES];
byte[] serialized = new byte[filter.getBitSet().bitSize() / Byte.SIZE + Integer.BYTES];
// big endian (most significant byte first)
serialized[0] = (byte) ((numHashFunctions >>> 24) & 0xFF);
serialized[1] = (byte) ((numHashFunctions >>> 16) & 0xFF);
serialized[2] = (byte) ((numHashFunctions >>> 8) & 0xFF);
serialized[3] = (byte) (numHashFunctions & 0xFF);
System.arraycopy(bytes, 0, serialized, 4, bytes.length);
filter.getBitSet().toByteArray(serialized, 4, serialized.length - 4);
return serialized;
}
}
@@ -107,21 +108,20 @@ private static class Reader implements FileIndexReader {
private final FastHash hashFunction;

public Reader(DataType type, byte[] serializedBytes) {
// big endian (most significant byte first)
int numHashFunctions =
((serializedBytes[0] << 24)
+ (serializedBytes[1] << 16)
+ (serializedBytes[2] << 8)
+ serializedBytes[3]);
byte[] bytes = new byte[serializedBytes.length - Integer.BYTES];
System.arraycopy(serializedBytes, 4, bytes, 0, bytes.length);
BitSet bitSet = BitSet.valueOf(bytes);
BitSet bitSet = new BitSet(serializedBytes, 4);
this.filter = new BloomFilter64(numHashFunctions, bitSet);
this.hashFunction = FastHash.getHashFunction(type);
}

@Override
public Boolean visitEqual(FieldRef fieldRef, Object key) {
return filter.testHash(hashFunction.hash(key));
return key == null || filter.testHash(hashFunction.hash(key));
}
}
}
@@ -18,8 +18,6 @@

package org.apache.paimon.utils;

import java.util.BitSet;

/** Bloom filter 64 handle 64 bits hash. */
public final class BloomFilter64 {

@@ -29,15 +27,15 @@ public final class BloomFilter64 {

public BloomFilter64(long items, double fpp) {
int nb = (int) (-items * Math.log(fpp) / (Math.log(2) * Math.log(2)));
this.numBits = nb + (Long.SIZE - (nb % Long.SIZE));
this.numBits = nb + (Byte.SIZE - (nb % Byte.SIZE));
this.numHashFunctions =
Math.max(1, (int) Math.round((double) numBits / items * Math.log(2)));
this.bitSet = new BitSet(numBits);
this.bitSet = new BitSet(new byte[numBits / Byte.SIZE], 0);
}

public BloomFilter64(int numHashFunctions, BitSet bitSet) {
this.numHashFunctions = numHashFunctions;
this.numBits = bitSet.size();
this.numBits = bitSet.bitSize();
this.bitSet = bitSet;
}

@@ -81,4 +79,38 @@ public int getNumHashFunctions() {
public BitSet getBitSet() {
return bitSet;
}

/** Bit set used for bloom filter 64. */
public static class BitSet {

private static final byte MASK = 0x07;

private final byte[] data;
private final int offset;

public BitSet(byte[] data, int offset) {
assert data.length > 0 : "data length is zero!";
assert offset >= 0 : "offset is negative!";
this.data = data;
this.offset = offset;
}

public void set(int index) {
data[(index >>> 3) + offset] |= (byte) ((byte) 1 << (index & MASK));
}

public boolean get(int index) {
return (data[(index >>> 3) + offset] & ((byte) 1 << (index & MASK))) != 0;
}

public int bitSize() {
return (data.length - offset) * Byte.SIZE;
}

public void toByteArray(byte[] bytes, int offset, int length) {
if (length >= 0) {
System.arraycopy(data, this.offset, bytes, offset, length);
}
}
}
}
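A small round-trip sketch of the offset-based BitSet above, mirroring how the bloom filter writer and reader place the bit bytes behind a 4-byte header; the sizes are illustrative:

import org.apache.paimon.utils.BloomFilter64.BitSet;

public class BitSetSketch {
    public static void main(String[] args) {
        // 64 bits backed by 8 bytes, starting at offset 0 of the backing array.
        BitSet bitSet = new BitSet(new byte[8], 0);
        bitSet.set(3);
        bitSet.set(42);
        System.out.println(bitSet.get(3)); // true
        System.out.println(bitSet.get(4)); // false

        // Copy the raw bit bytes behind a 4-byte header, as the bloom filter
        // writer does, then rebuild a view over the same region.
        byte[] serialized = new byte[4 + bitSet.bitSize() / Byte.SIZE];
        bitSet.toByteArray(serialized, 4, serialized.length - 4);
        BitSet restored = new BitSet(serialized, 4);
        System.out.println(restored.get(42)); // true
    }
}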