Skip to content

Commit

Permalink
[core] Add bloom filter for file index (#3141)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Apr 8, 2024
1 parent 78bc72d commit 942d35d
Show file tree
Hide file tree
Showing 12 changed files with 730 additions and 7 deletions.
11 changes: 11 additions & 0 deletions paimon-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ under the License.
<version>1.0.5</version>
</dependency>

<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>0.16</version>
</dependency>

<!-- Test -->

<dependency>
Expand Down Expand Up @@ -273,6 +279,7 @@ under the License.
<include>org.codehaus.janino:*</include>
<include>it.unimi.dsi:fastutil</include>
<include>org.roaringbitmap:RoaringBitmap</include>
<include>net.openhft:zero-allocation-hashing</include>
</includes>
</artifactSet>
<filters>
Expand Down Expand Up @@ -312,6 +319,10 @@ under the License.
<pattern>org.roaringbitmap</pattern>
<shadedPattern>org.apache.paimon.shade.org.roaringbitmap</shadedPattern>
</relocation>
<relocation>
<pattern>net.openhft.hashing</pattern>
<shadedPattern>org.apache.paimon.shade.net.openhft.hashing</shadedPattern>
</relocation>
</relocations>
<minimizeJar>true</minimizeJar>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IOUtils;
Expand Down Expand Up @@ -241,9 +242,11 @@ public FileIndexReader readColumnIndex(String columnName) {
return readColumnInputStream(columnName)
.map(
serializedBytes ->
FileIndexer.create(type, fields.get(columnName).type())
.createReader()
.recoverFrom(serializedBytes))
FileIndexer.create(
type,
fields.get(columnName).type(),
new Options())
.createReader(serializedBytes))
.orElse(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
*/
public interface FileIndexReader extends FunctionVisitor<Boolean> {

FileIndexReader recoverFrom(byte[] serializedBytes);

@Override
default Boolean visitIsNotNull(FieldRef fieldRef) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@

package org.apache.paimon.fileindex;

import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;

import static org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex.BLOOM_FILTER;

/** File index interface. To build a file index. */
public interface FileIndexer {

FileIndexWriter createWriter();

FileIndexReader createReader();
FileIndexReader createReader(byte[] serializedBytes);

static FileIndexer create(String type, DataType dataType) {
static FileIndexer create(String type, DataType dataType, Options options) {
switch (type) {
case BLOOM_FILTER:
return new BloomFilterFileIndex(dataType, options);
default:
throw new RuntimeException("Doesn't support filter type: " + type);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex.bloomfilter;

import org.apache.paimon.fileindex.FileIndexReader;
import org.apache.paimon.fileindex.FileIndexWriter;
import org.apache.paimon.fileindex.FileIndexer;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.BloomFilter64;

import org.apache.hadoop.util.bloom.HashFunction;

import java.util.BitSet;

/**
* Bloom filter for file index.
*
* <p>Note: This class use {@link BloomFilter64} as a base filter. Store the num hash function (one
* integer) and bit set bytes only. Use {@link HashFunction} to hash the objects, which hash bytes
* type(like varchar, binary, etc.) using xx hash, hash numeric type by specified number hash(see
* http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm).
*/
public class BloomFilterFileIndex implements FileIndexer {

public static final String BLOOM_FILTER = "bloom";

private static final int DEFAULT_ITEMS = 1_000_000;
private static final double DEFAULT_FPP = 0.1;

private static final String ITEMS = "items";
private static final String FPP = "fpp";

private final DataType dataType;
private final int items;
private final double fpp;

public BloomFilterFileIndex(DataType dataType, Options options) {
this.dataType = dataType;
this.items = options.getInteger(ITEMS, DEFAULT_ITEMS);
this.fpp = options.getDouble(FPP, DEFAULT_FPP);
}

public String name() {
return BLOOM_FILTER;
}

@Override
public FileIndexWriter createWriter() {
return new Writer(dataType, items, fpp);
}

@Override
public FileIndexReader createReader(byte[] serializedBytes) {
return new Reader(dataType, serializedBytes);
}

private static class Writer implements FileIndexWriter {

private final BloomFilter64 filter;
private final FastHash hashFunction;

public Writer(DataType type, int items, double fpp) {
this.filter = new BloomFilter64(items, fpp);
this.hashFunction = FastHash.getHashFunction(type);
}

@Override
public void write(Object key) {
filter.addHash(hashFunction.hash(key));
}

@Override
public byte[] serializedBytes() {
int numHashFunctions = filter.getNumHashFunctions();
byte[] bytes = filter.getBitSet().toByteArray();
byte[] serialized = new byte[bytes.length + Integer.BYTES];
serialized[0] = (byte) ((numHashFunctions >>> 24) & 0xFF);
serialized[1] = (byte) ((numHashFunctions >>> 16) & 0xFF);
serialized[2] = (byte) ((numHashFunctions >>> 8) & 0xFF);
serialized[3] = (byte) (numHashFunctions & 0xFF);
System.arraycopy(bytes, 0, serialized, 4, bytes.length);
return serialized;
}
}

private static class Reader implements FileIndexReader {

private final BloomFilter64 filter;
private final FastHash hashFunction;

public Reader(DataType type, byte[] serializedBytes) {
int numHashFunctions =
((serializedBytes[0] << 24)
+ (serializedBytes[1] << 16)
+ (serializedBytes[2] << 8)
+ serializedBytes[3]);
byte[] bytes = new byte[serializedBytes.length - Integer.BYTES];
System.arraycopy(serializedBytes, 4, bytes, 0, bytes.length);
BitSet bitSet = BitSet.valueOf(bytes);
this.filter = new BloomFilter64(numHashFunctions, bitSet);
this.hashFunction = FastHash.getHashFunction(type);
}

@Override
public Boolean visitEqual(FieldRef fieldRef, Object key) {
return filter.testHash(hashFunction.hash(key));
}
}
}
Loading

0 comments on commit 942d35d

Please sign in to comment.