Skip to content

Commit

Permalink
introduce DynamicBloomFilter file index
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree committed Dec 16, 2024
1 parent 6010e61 commit 8e83455
Show file tree
Hide file tree
Showing 10 changed files with 915 additions and 5 deletions.
11 changes: 11 additions & 0 deletions docs/content/concepts/spec/fileindex.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ Content of bloom filter index is simple:
This class uses a (64-bit) long hash. Only the number of hash functions (one integer) and the bit set bytes are stored. Byte types
(like varchar, binary, etc.) are hashed using xx hash; numeric types are hashed by the [specified number hash](http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm).

## Column Index Bytes: DynamicBloomFilter

Define `'file-index.dynamic-bloom-filter.columns'`.

Content of dynamic bloom filter index is simple:
- version 1 byte
- numBloomFilter 4 bytes int
- bloomFilterVectorSize 4 bytes int
- numHashFunctions 4 bytes int
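- bloom filter bytes (the bit sets of the `numBloomFilter` bloom filters, written back to back; each one takes `bloomFilterVectorSize / 8` bytes)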

## Column Index Bytes: Bitmap

Define `'file-index.bitmap.columns'`.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex.dynamicbloomfilter;

import org.apache.paimon.fileindex.FileIndexReader;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fileindex.FileIndexWriter;
import org.apache.paimon.fileindex.FileIndexer;
import org.apache.paimon.fileindex.bloomfilter.FastHash;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.BloomFilter64;
import org.apache.paimon.utils.DynamicBloomFilter;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

import static org.apache.paimon.fileindex.FileIndexResult.REMAIN;
import static org.apache.paimon.fileindex.FileIndexResult.SKIP;

/**
 * File index that stores the hashes of a column's values in a {@link DynamicBloomFilter}.
 *
 * <p>Serialized layout (written by {@link Writer#serializedBytes()} and parsed by {@link
 * #createReader(SeekableInputStream, int, int)}):
 *
 * <ul>
 *   <li>version: 1 byte
 *   <li>number of bloom filters: 4 bytes int
 *   <li>bit size of each bloom filter: 4 bytes int
 *   <li>number of hash functions: 4 bytes int
 *   <li>the bit set of every bloom filter, {@code bitSize / 8} bytes each
 * </ul>
 *
 * <p>Recognized options: {@code items} (default 1,000,000), {@code fpp} (default 0.1) and {@code
 * max_items} (default 10,000,000). They are passed unchanged to the {@link DynamicBloomFilter}
 * constructor.
 */
public class DynamicBloomFilterFileIndex implements FileIndexer {

    public static final int VERSION_1 = 1;

    private static final int DEFAULT_ITEMS = 1_000_000;
    private static final double DEFAULT_FPP = 0.1;
    private static final int DEFAULT_MAX_ITEMS = 10_000_000;

    private static final String ITEMS = "items";
    private static final String FPP = "fpp";
    private static final String MAX_ITEMS = "max_items";

    private final DataType dataType;
    private final int items;
    private final double fpp;

    private final int maxItems;

    /**
     * Creates the indexer for one column.
     *
     * @param dataType type of the indexed column, used to pick the hash function
     * @param options index options; missing keys fall back to the defaults above
     */
    public DynamicBloomFilterFileIndex(DataType dataType, Options options) {
        this.dataType = dataType;
        this.items = options.getInteger(ITEMS, DEFAULT_ITEMS);
        this.fpp = options.getDouble(FPP, DEFAULT_FPP);
        this.maxItems = options.getInteger(MAX_ITEMS, DEFAULT_MAX_ITEMS);
    }

    @Override
    public FileIndexWriter createWriter() {
        return new DynamicBloomFilterFileIndex.Writer(dataType, items, fpp, maxItems);
    }

    /**
     * Reads a serialized dynamic bloom filter starting at {@code start} and wraps it in a reader.
     *
     * @param inputStream stream positioned by this method via {@code seek(start)}
     * @param start offset of the first index byte (the version byte)
     * @param length not used by this implementation
     * @return a reader able to answer equality predicates
     * @throws RuntimeException if the version is newer than {@link #VERSION_1}, if the header
     *     contains negative sizes, or if reading fails with an {@link IOException}
     */
    @Override
    public FileIndexReader createReader(SeekableInputStream inputStream, int start, int length) {
        try {
            inputStream.seek(start);
            DataInput input = new DataInputStream(inputStream);
            byte version = input.readByte();
            if (version > VERSION_1) {
                throw new RuntimeException(
                        String.format(
                                "read dynamicBloomFilter index file fail, "
                                        + "your plugin version is lower than %d",
                                version));
            }
            int numBloomFilters = input.readInt();
            int vectorSize = input.readInt();
            int numHashFunctions = input.readInt();

            // The sizes come straight from the file; reject negative values here so a corrupt
            // index fails with a descriptive message instead of a NegativeArraySizeException.
            if (numBloomFilters < 0 || vectorSize < 0) {
                throw new RuntimeException(
                        String.format(
                                "read dynamicBloomFilter index file fail, invalid header: "
                                        + "numBloomFilter=%d, bloomFilterVectorSize=%d",
                                numBloomFilters, vectorSize));
            }

            // Every filter has the same bit size, so the byte length is loop-invariant.
            int bytesPerFilter = vectorSize / Byte.SIZE;
            BloomFilter64[] bloomFilter64s = new BloomFilter64[numBloomFilters];
            for (int i = 0; i < numBloomFilters; i++) {
                byte[] serializedBytes = new byte[bytesPerFilter];
                input.readFully(serializedBytes);
                BloomFilter64.BitSet bitSet = new BloomFilter64.BitSet(serializedBytes, 0);
                bloomFilter64s[i] = new BloomFilter64(numHashFunctions, bitSet);
            }
            return new DynamicBloomFilterFileIndex.Reader(dataType, bloomFilter64s);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Answers equality predicates against a deserialized {@link DynamicBloomFilter}. */
    private static class Reader extends FileIndexReader {

        private final DynamicBloomFilter dynamicBloomFilter;
        private final FastHash hashFunction;

        public Reader(DataType type, BloomFilter64[] bloomFilter64s) {
            this.dynamicBloomFilter = new DynamicBloomFilter(bloomFilter64s);
            this.hashFunction = FastHash.getHashFunction(type);
        }

        /**
         * Returns {@code SKIP} only when the key is non-null and the filter does not contain its
         * hash. Null keys always yield {@code REMAIN} because the writer never records them.
         */
        @Override
        public FileIndexResult visitEqual(FieldRef fieldRef, Object key) {
            return key == null || dynamicBloomFilter.testHash(hashFunction.hash(key))
                    ? REMAIN
                    : SKIP;
        }
    }

    /** Collects the hashes of non-null keys and serializes the resulting filter matrix. */
    private static class Writer extends FileIndexWriter {

        private final DynamicBloomFilter filter;
        private final FastHash hashFunction;

        public Writer(DataType type, int items, double fpp, int maxItems) {
            this.filter = new DynamicBloomFilter(items, fpp, maxItems);
            this.hashFunction = FastHash.getHashFunction(type);
        }

        /** Adds the hash of {@code key} to the filter; null keys are ignored. */
        @Override
        public void write(Object key) {
            if (key != null) {
                filter.addHash(hashFunction.hash(key));
            }
        }

        /**
         * Serializes the header followed by the bit set of every bloom filter in the matrix.
         *
         * @return the index bytes in the layout described on the enclosing class
         */
        @Override
        public byte[] serializedBytes() {
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutput out = new DataOutputStream(bos);
                BloomFilter64[] bloomFilterMatrix = filter.matrix();

                // 1. write meta
                out.writeByte(VERSION_1);
                out.writeInt(bloomFilterMatrix.length);
                // each bloom filter has same num of hashFunction and bitSet
                out.writeInt(bloomFilterMatrix[0].getBitSet().bitSize());
                out.writeInt(bloomFilterMatrix[0].getNumHashFunctions());

                // 2. write each filter's bitset
                for (BloomFilter64 filterMatrix : bloomFilterMatrix) {
                    byte[] serialized = new byte[filterMatrix.getBitSet().bitSize() / Byte.SIZE];
                    filterMatrix.getBitSet().toByteArray(serialized, 0, serialized.length);
                    out.write(serialized);
                }
                return bos.toByteArray();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex.dynamicbloomfilter;

import org.apache.paimon.fileindex.FileIndexer;
import org.apache.paimon.fileindex.FileIndexerFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;

/** Factory that registers {@link DynamicBloomFilterFileIndex} under its identifier. */
public class DynamicBloomFilterFileIndexFactory implements FileIndexerFactory {

    /** Identifier under which this file index type is registered. */
    public static final String DYNAMIC_BLOOM_FILTER = "dynamic-bloom-filter";

    /**
     * Builds a dynamic bloom filter indexer for a single column.
     *
     * @param dataType type of the column to index
     * @param options options forwarded to the indexer
     * @return a new {@link DynamicBloomFilterFileIndex}
     */
    @Override
    public FileIndexer create(DataType dataType, Options options) {
        FileIndexer indexer = new DynamicBloomFilterFileIndex(dataType, options);
        return indexer;
    }

    /** Returns {@link #DYNAMIC_BLOOM_FILTER}. */
    @Override
    public String identifier() {
        return DYNAMIC_BLOOM_FILTER;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

/* This file is based on source code from the hudi Project (http://hudi.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */

/**
 * Dynamic Bloom Filter. This is largely based on
 * org.apache.hudi.common.bloom.InternalDynamicBloomFilter.
 *
 * <p>The filter is a matrix (array) of {@link BloomFilter64} rows. Hashes are added to the last
 * row until it has received {@code items} hashes; then a new row is appended, as long as the total
 * capacity {@code rows * items} stays below {@code maxItems}. Once that limit is reached no more
 * rows are created and further hashes are spread over the existing rows in round-robin order.
 */
public class DynamicBloomFilter {

    /** Threshold for the maximum number of keys to record in a dynamic Bloom filter row. */
    private final int items;

    /** The number of keys recorded in the current standard active Bloom filter. */
    private int currentNbRecord;

    /** Upper bound on {@code rows * items}; no row is added once it is reached. */
    private final int maxItems;

    /** Set once the matrix may no longer grow; switches inserts to round-robin mode. */
    private boolean reachedMax = false;

    /** Index of the row that receives the next hash in round-robin mode. */
    private int curMatrixIndex = 0;

    /** False positive probability used for every row created by this filter. */
    private final double fpp;

    /** The matrix of Bloom filter. */
    private BloomFilter64[] matrix;

    /**
     * Wraps already built rows, e.g. rows read back from a file. The sizing parameters are left at
     * zero, matching the defaults the fields would otherwise have.
     *
     * @param bloomFilter64s rows of the filter; the array is used as is, not copied
     */
    public DynamicBloomFilter(BloomFilter64[] bloomFilter64s) {
        this.items = 0;
        this.maxItems = 0;
        this.fpp = 0;
        this.matrix = bloomFilter64s;
    }

    /**
     * Creates an empty filter with a single row.
     *
     * @param items number of hashes a row receives before a new row is started
     * @param fpp false positive probability passed to every {@link BloomFilter64} row
     * @param maxItems capacity limit; rows are added only while {@code rows * items < maxItems}
     */
    public DynamicBloomFilter(int items, double fpp, int maxItems) {
        this.items = items;
        this.currentNbRecord = 0;
        this.maxItems = maxItems;
        this.fpp = fpp;
        matrix = new BloomFilter64[1];
        matrix[0] = new BloomFilter64(items, fpp);
    }

    /**
     * Records a hash in the currently active row, growing the matrix first when the active row is
     * full and the capacity limit allows it.
     *
     * @param hash64 64-bit hash of the key
     */
    public void addHash(long hash64) {
        BloomFilter64 bf = getActiveStandardBF();
        if (bf == null) {
            // Active row is full and there is still room: start a fresh row.
            addRow();
            bf = matrix[matrix.length - 1];
            currentNbRecord = 0;
        }
        bf.addHash(hash64);
        currentNbRecord++;
    }

    /** Adds a new row to <i>this</i> dynamic Bloom filter. */
    private void addRow() {
        BloomFilter64[] tmp = new BloomFilter64[matrix.length + 1];
        System.arraycopy(matrix, 0, tmp, 0, matrix.length);
        tmp[tmp.length - 1] = new BloomFilter64(items, fpp);
        matrix = tmp;
    }

    /**
     * Tests a hash against every row.
     *
     * @param key 64-bit hash to look up
     * @return {@code true} if at least one row reports the hash as present, {@code false} otherwise
     */
    public boolean testHash(long key) {
        for (BloomFilter64 bloomFilter : matrix) {
            if (bloomFilter.testHash(key)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the active standard Bloom filter in <i>this</i> dynamic Bloom filter.
     *
     * <p>This method is not a pure getter: it advances the round-robin cursor and may switch the
     * filter into round-robin mode.
     *
     * @return BloomFilter64 The active standard Bloom filter. <code>Null</code> when the active row
     *     is full and a new row has to be added by the caller.
     */
    private BloomFilter64 getActiveStandardBF() {
        if (reachedMax) {
            // Keep the cursor inside [0, matrix.length) instead of incrementing it forever, so it
            // can never overflow to a negative array index.
            BloomFilter64 target = matrix[curMatrixIndex];
            curMatrixIndex = (curMatrixIndex + 1) % matrix.length;
            return target;
        }

        if (currentNbRecord < items) {
            return matrix[matrix.length - 1];
        }

        // Compute the capacity in long arithmetic: rows * items can exceed Integer.MAX_VALUE, and
        // an overflowed negative product would always look smaller than maxItems.
        long capacity = (long) matrix.length * items;
        if (capacity < maxItems) {
            return null;
        }
        reachedMax = true;
        return matrix[0];
    }

    /**
     * Returns the rows of this filter.
     *
     * @return the internal row array itself (not a copy)
     */
    public BloomFilter64[] matrix() {
        return matrix;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@

org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndexFactory
org.apache.paimon.fileindex.bitmap.BitmapFileIndexFactory
org.apache.paimon.fileindex.bsi.BitSliceIndexBitmapFileIndexFactory
org.apache.paimon.fileindex.bsi.BitSliceIndexBitmapFileIndexFactory
org.apache.paimon.fileindex.dynamicbloomfilter.DynamicBloomFilterFileIndexFactory
Loading

0 comments on commit 8e83455

Please sign in to comment.