Skip to content

Commit

Permalink
[core] Introduce file index read/write framework. (#3177)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Apr 15, 2024
1 parent ffb9032 commit b3eeea9
Show file tree
Hide file tree
Showing 42 changed files with 994 additions and 111 deletions.
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,18 @@
<td>Boolean</td>
<td>Whether only overwrite dynamic partition when overwriting a partitioned table with dynamic partition columns. Works only when the table has partition keys.</td>
</tr>
<tr>
<td><h5>file-index.in-manifest-threshold</h5></td>
<td style="word-wrap: break-word;">500 bytes</td>
<td>MemorySize</td>
<td>The threshold to store file index bytes in manifest.</td>
</tr>
<tr>
<td><h5>file-index.read.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether enabled read file index.</td>
</tr>
<tr>
<td><h5>file-reader-async-threshold</h5></td>
<td style="word-wrap: break-word;">10 mb</td>
Expand Down
97 changes: 97 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
import org.apache.paimon.annotation.Documentation.Immutable;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lookup.LookupStrategy;
Expand Down Expand Up @@ -70,6 +71,10 @@ public class CoreOptions implements Serializable {

public static final String DISTINCT = "distinct";

/**
 * Leading segment of file index option keys. {@code indexColumnsOptions()} only decodes keys
 * that start with this value followed by a dot.
 */
public static final String FILE_INDEX = "file-index";

/**
 * Trailing segment of the option key whose value is the comma separated list of columns to
 * index, i.e. keys ending in a dot followed by this value.
 */
public static final String COLUMNS = "columns";

public static final ConfigOption<Integer> BUCKET =
key("bucket")
.intType()
Expand Down Expand Up @@ -135,6 +140,18 @@ public class CoreOptions implements Serializable {
"Default file compression format, orc is lz4 and parquet is snappy. It can be overridden by "
+ FILE_COMPRESSION_PER_LEVEL.key());

/**
 * Size threshold that decides whether file index bytes are stored in the manifest. Defaults to
 * 500 bytes. NOTE(review): presumably index bytes larger than this are written to a separate
 * file instead; confirm against the writer that consumes this value.
 */
public static final ConfigOption<MemorySize> FILE_INDEX_IN_MANIFEST_THRESHOLD =
key("file-index.in-manifest-threshold")
.memoryType()
.defaultValue(MemorySize.parse("500 B"))
.withDescription("The threshold to store file index bytes in manifest.");

/**
 * Switch for reading the file index; enabled by default. The description text is also emitted
 * into the generated configuration docs, so keep both in sync when rewording it.
 */
public static final ConfigOption<Boolean> FILE_INDEX_READ_ENABLED =
key("file-index.read.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Whether enabled read file index.");

public static final ConfigOption<FileFormatType> MANIFEST_FORMAT =
key("manifest.format")
.enumType(FileFormatType.class)
Expand Down Expand Up @@ -1703,6 +1720,86 @@ public boolean deletionVectorsEnabled() {
return options.get(DELETION_VECTORS_ENABLED);
}

/**
 * Decodes every option whose key starts with {@code file-index.} into a {@link
 * FileIndexOptions}.
 *
 * <p>Two key shapes are understood:
 *
 * <ul>
 *   <li>{@code file-index.<index-type>.columns}: the value is a comma separated list of column
 *       names; each column is registered for that index type.
 *   <li>{@code file-index.<index-type>.<column>.<option>}: the value is stored as
 *       {@code <option>} on the options of that column and index type. If the column has not
 *       been registered yet, the matching {@code .columns} option is decoded first; when no
 *       such option exists the entry is skipped.
 * </ul>
 *
 * <p>Any other key under the prefix (one whose remainder does not split into exactly three
 * dot separated parts, e.g. {@code file-index.read.enabled}) is ignored.
 *
 * @return the decoded options, created with {@link #fileIndexInManifestThreshold()}
 * @throws IllegalArgumentException if a {@code .columns} key has no index type, if a column
 *     list contains a blank name, or if an option names a column that is missing from the
 *     column list of its index type
 */
public FileIndexOptions indexColumnsOptions() {
    String fileIndexPrefix = FILE_INDEX + ".";
    String fileIndexColumnSuffix = "." + COLUMNS;

    FileIndexOptions fileIndexOptions = new FileIndexOptions(fileIndexInManifestThreshold());
    for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
        String key = entry.getKey();
        if (!key.startsWith(fileIndexPrefix)) {
            continue;
        }

        if (key.endsWith(fileIndexColumnSuffix)) {
            // "file-index.columns" matches both the prefix and the suffix although they
            // overlap; without this check substring() would fail with an index error.
            int indexTypeEnd = key.length() - fileIndexColumnSuffix.length();
            if (indexTypeEnd <= fileIndexPrefix.length()) {
                throw new IllegalArgumentException(
                        "Wrong option in " + key + ", should not have empty index type");
            }
            String indexType = key.substring(fileIndexPrefix.length(), indexTypeEnd);
            registerIndexColumns(fileIndexOptions, key, entry.getValue(), indexType, null);
            continue;
        }

        // otherwise it can only be <index-type>.<column>.<option>
        String[] kv = key.substring(fileIndexPrefix.length()).split("\\.");
        if (kv.length != 3) {
            continue;
        }
        String indexType = kv[0];
        String cname = kv[1];
        String opkey = kv[2];

        if (fileIndexOptions.get(cname, indexType) == null) {
            // the .columns entry of this index type has not been visited yet (map iteration
            // order is arbitrary), so decode it now
            String columns = options.get(fileIndexPrefix + indexType + fileIndexColumnSuffix);
            if (columns == null) {
                continue;
            }
            if (!registerIndexColumns(fileIndexOptions, key, columns, indexType, cname)) {
                throw new IllegalArgumentException(
                        "Wrong option in "
                                + key
                                + ", can't found column "
                                + cname
                                + " in "
                                + columns);
            }
        }
        fileIndexOptions.get(cname, indexType).set(opkey, entry.getValue());
    }
    return fileIndexOptions;
}

/**
 * Registers every column of a comma separated list for one index type.
 *
 * @param fileIndexOptions target the columns are registered on
 * @param key option key being decoded, only used in the error message
 * @param columns comma separated column names
 * @param indexType index type the columns are registered for
 * @param target column name to look for in the list, or {@code null} when not needed
 * @return whether {@code target} is one of the listed (trimmed) column names
 * @throws IllegalArgumentException if the list contains a blank column name
 */
private static boolean registerIndexColumns(
        FileIndexOptions fileIndexOptions,
        String key,
        String columns,
        String indexType,
        String target) {
    boolean foundTarget = false;
    for (String name : columns.split(",")) {
        if (StringUtils.isBlank(name)) {
            throw new IllegalArgumentException(
                    "Wrong option in " + key + ", should not have empty column");
        }
        String trimmed = name.trim();
        if (trimmed.equals(target)) {
            foundTarget = true;
        }
        fileIndexOptions.computeIfAbsent(trimmed, indexType);
    }
    return foundTarget;
}

/** Returns the configured {@code file-index.in-manifest-threshold}, expressed in bytes. */
public long fileIndexInManifestThreshold() {
    MemorySize threshold = options.get(FILE_INDEX_IN_MANIFEST_THRESHOLD);
    return threshold.getBytes();
}

/** Returns the configured value of {@code file-index.read.enabled}. */
public boolean fileIndexReadEnabled() {
    boolean readEnabled = options.get(FILE_INDEX_READ_ENABLED);
    return readEnabled;
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row.", true, true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,20 @@
* File index file format. Put all column and offset in the header.
*
* <pre>
* _______________________________________ _____________________
* _____________________________________ _____________________
* | magic |version|head length |
* |-------------------------------------|
* | column size
* | column number
* |-------------------------------------|
* | column 1 | index size
* | column 1 | index number
* |-------------------------------------|
* | index name 1 |start pos |length |
* |-------------------------------------|
* | index name 2 |start pos |length |
* |-------------------------------------|
* | index name 3 |start pos |length |
* |-------------------------------------| HEAD
* | column 2 | index size
* | column 2 | index number
* |-------------------------------------|
* | index name 1 |start pos |length |
* |-------------------------------------|
Expand All @@ -82,14 +82,15 @@
* magic: 8 bytes long
* version: 4 bytes int
* head length: 4 bytes int
* index type: var bytes utf (length + bytes)
* body info size: 4 bytes int (how many column items below)
* column name: var bytes utf
* column number: 4 bytes int
* column x: var bytes utf (length + bytes)
* index number: 4 bytes int (how many column items below)
* index name x: var bytes utf
* start pos: 4 bytes int
* length: 4 bytes int
* redundant length: 4 bytes int (for compatibility with later versions, in this version, content is zero)
* redundant bytes: var bytes (for compatibility with later versions; in this version it is empty)
* BODY: column bytes + column bytes + column bytes + .......
* BODY: column index bytes + column index bytes + column index bytes + .......
*
* </pre>
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Options of file index column.
 *
 * <p>Holds one {@link Options} instance per (column, index type) pair, together with the size
 * threshold used to decide between storing index bytes in the manifest or in a file.
 */
public class FileIndexOptions {

    // if the filter size greater than fileIndexInManifestThreshold, we put it in file
    private final long fileIndexInManifestThreshold;

    // column name -> (index type -> options of that index on that column)
    private final Map<String, Map<String, Options>> indexTypeOptions;

    /** Creates empty options using the default in-manifest threshold of {@link CoreOptions}. */
    public FileIndexOptions() {
        this(CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD.defaultValue().getBytes());
    }

    /**
     * Creates empty options.
     *
     * @param fileIndexInManifestThreshold threshold in bytes, returned unchanged by {@link
     *     #fileIndexInManifestThreshold()}
     */
    public FileIndexOptions(long fileIndexInManifestThreshold) {
        this.fileIndexInManifestThreshold = fileIndexInManifestThreshold;
        this.indexTypeOptions = new HashMap<>();
    }

    /**
     * Registers {@code indexType} for {@code column} with fresh, empty options unless that pair
     * is already present.
     */
    public void computeIfAbsent(String column, String indexType) {
        Map<String, Options> optionsByIndexType = indexTypeOptions.get(column);
        if (optionsByIndexType == null) {
            optionsByIndexType = new HashMap<>();
            indexTypeOptions.put(column, optionsByIndexType);
        }
        if (optionsByIndexType.get(indexType) == null) {
            optionsByIndexType.put(indexType, new Options());
        }
    }

    /**
     * Looks up the options registered for a column and index type.
     *
     * @return the options, or {@code null} if the pair has not been registered
     */
    public Options get(String column, String indexType) {
        return Optional.ofNullable(indexTypeOptions.get(column))
                .map(optionsByIndexType -> optionsByIndexType.get(indexType))
                .orElse(null);
    }

    /** Returns whether no column has been registered. */
    public boolean isEmpty() {
        return indexTypeOptions.isEmpty();
    }

    /** Returns the threshold passed at construction time. */
    public long fileIndexInManifestThreshold() {
        return fileIndexInManifestThreshold;
    }

    /** Returns the entry set of the underlying column to index-type-options map. */
    public Set<Map.Entry<String, Map<String, Options>>> entrySet() {
        return indexTypeOptions.entrySet();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.BloomFilter64;
import org.apache.paimon.utils.BloomFilter64.BitSet;

import org.apache.hadoop.util.bloom.HashFunction;

import java.util.BitSet;

/**
* Bloom filter for file index.
*
Expand All @@ -40,7 +39,7 @@
*/
public class BloomFilterFileIndex implements FileIndexer {

public static final String BLOOM_FILTER = "bloom";
public static final String BLOOM_FILTER = "bloom-filter";

private static final int DEFAULT_ITEMS = 1_000_000;
private static final double DEFAULT_FPP = 0.1;
Expand Down Expand Up @@ -84,19 +83,21 @@ public Writer(DataType type, int items, double fpp) {

@Override
public void write(Object key) {
filter.addHash(hashFunction.hash(key));
if (key != null) {
filter.addHash(hashFunction.hash(key));
}
}

@Override
public byte[] serializedBytes() {
int numHashFunctions = filter.getNumHashFunctions();
byte[] bytes = filter.getBitSet().toByteArray();
byte[] serialized = new byte[bytes.length + Integer.BYTES];
byte[] serialized = new byte[filter.getBitSet().bitSize() / Byte.SIZE + Integer.BYTES];
// big endian: most significant byte first
serialized[0] = (byte) ((numHashFunctions >>> 24) & 0xFF);
serialized[1] = (byte) ((numHashFunctions >>> 16) & 0xFF);
serialized[2] = (byte) ((numHashFunctions >>> 8) & 0xFF);
serialized[3] = (byte) (numHashFunctions & 0xFF);
System.arraycopy(bytes, 0, serialized, 4, bytes.length);
filter.getBitSet().toByteArray(serialized, 4, serialized.length - 4);
return serialized;
}
}
Expand All @@ -107,21 +108,20 @@ private static class Reader implements FileIndexReader {
private final FastHash hashFunction;

/**
 * Restores a reader from the serialized form: a 4 byte hash function count followed by the
 * bit set bytes.
 *
 * @param type data type of the indexed column, used to pick the hash function
 * @param serializedBytes serialized filter; the bit set is read in place starting at offset 4
 */
public Reader(DataType type, byte[] serializedBytes) {
    // big endian: most significant byte first. Each byte is masked because a Java byte is
    // signed; without the mask any byte >= 0x80 sign-extends and corrupts the decoded int.
    int numHashFunctions =
            ((serializedBytes[0] & 0xFF) << 24)
                    | ((serializedBytes[1] & 0xFF) << 16)
                    | ((serializedBytes[2] & 0xFF) << 8)
                    | (serializedBytes[3] & 0xFF);
    BitSet bitSet = new BitSet(serializedBytes, 4);
    this.filter = new BloomFilter64(numHashFunctions, bitSet);
    this.hashFunction = FastHash.getHashFunction(type);
}

@Override
public Boolean visitEqual(FieldRef fieldRef, Object key) {
return filter.testHash(hashFunction.hash(key));
return key == null || filter.testHash(hashFunction.hash(key));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.paimon.utils;

import java.util.BitSet;

/** Bloom filter 64 handle 64 bits hash. */
public final class BloomFilter64 {

Expand All @@ -29,15 +27,15 @@ public final class BloomFilter64 {

public BloomFilter64(long items, double fpp) {
int nb = (int) (-items * Math.log(fpp) / (Math.log(2) * Math.log(2)));
this.numBits = nb + (Long.SIZE - (nb % Long.SIZE));
this.numBits = nb + (Byte.SIZE - (nb % Byte.SIZE));
this.numHashFunctions =
Math.max(1, (int) Math.round((double) numBits / items * Math.log(2)));
this.bitSet = new BitSet(numBits);
this.bitSet = new BitSet(new byte[numBits / Byte.SIZE], 0);
}

public BloomFilter64(int numHashFunctions, BitSet bitSet) {
this.numHashFunctions = numHashFunctions;
this.numBits = bitSet.size();
this.numBits = bitSet.bitSize();
this.bitSet = bitSet;
}

Expand Down Expand Up @@ -81,4 +79,38 @@ public int getNumHashFunctions() {
public BitSet getBitSet() {
return bitSet;
}

/**
 * Bit set used for bloom filter 64.
 *
 * <p>A view over a caller supplied byte array: bit {@code i} lives in byte {@code offset + i / 8}
 * at bit position {@code i % 8}. The array is not copied, so writes go straight to it.
 */
public static class BitSet {

    // selects the position of a bit inside its byte (index % 8)
    private static final byte MASK = 0x07;

    private final byte[] data;
    private final int offset;

    /**
     * @param data backing array, used as is
     * @param offset index of the first byte in {@code data} that belongs to the bit set
     */
    public BitSet(byte[] data, int offset) {
        assert data.length > 0 : "data length is zero!";
        assert offset >= 0 : "offset is negative!";
        this.data = data;
        this.offset = offset;
    }

    /** Sets the bit at {@code index} to one. */
    public void set(int index) {
        data[byteIndex(index)] |= bitMask(index);
    }

    /** Returns whether the bit at {@code index} is one. */
    public boolean get(int index) {
        return (data[byteIndex(index)] & bitMask(index)) != 0;
    }

    /** Returns the number of bits available after {@code offset}. */
    public int bitSize() {
        return (data.length - offset) * Byte.SIZE;
    }

    /**
     * Copies {@code length} bytes of the bit set, starting at its first byte, into {@code
     * bytes} at {@code offset}. A negative {@code length} copies nothing.
     */
    public void toByteArray(byte[] bytes, int offset, int length) {
        if (length < 0) {
            return;
        }
        System.arraycopy(data, this.offset, bytes, offset, length);
    }

    // position in the backing array of the byte holding the given bit
    private int byteIndex(int index) {
        return (index >>> 3) + offset;
    }

    // single-bit mask for the given bit inside its byte
    private static int bitMask(int index) {
        return 1 << (index & MASK);
    }
}
}
Loading

0 comments on commit b3eeea9

Please sign in to comment.