From 2218eced1f74fee4095f7df3884f9f7f49f3691a Mon Sep 17 00:00:00 2001 From: yejunhao Date: Thu, 28 Mar 2024 16:19:53 +0800 Subject: [PATCH 1/6] [core] Add bloom filter for file index --- .../paimon/fileindex/FastHashForNumber.java | 164 ++++++++++ .../apache/paimon/fileindex/FileIndexer.java | 5 + .../fileindex/ObjectToBytesVisitor.java | 304 ++++++++++++++++++ .../fileindex/bloomfilter/BloomFilter.java | 129 ++++++++ .../bloomfilter/HadoopBloomFilter.java | 226 +++++++++++++ .../bloomfilter/HadoopDynamicBloomFilter.java | 273 ++++++++++++++++ .../fileindex/bloomfilter/HadoopFilter.java | 220 +++++++++++++ .../fileindex/ObjectToBytesVisitorTest.java | 233 ++++++++++++++ .../bloomfilter/BloomFilterTest.java | 73 +++++ 9 files changed, 1627 insertions(+) create mode 100644 paimon-common/src/main/java/org/apache/paimon/fileindex/FastHashForNumber.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/fileindex/ObjectToBytesVisitor.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilter.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopBloomFilter.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopDynamicBloomFilter.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopFilter.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/fileindex/ObjectToBytesVisitorTest.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterTest.java diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FastHashForNumber.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FastHashForNumber.java new file mode 100644 index 000000000000..bbc210f19fa4 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FastHashForNumber.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fileindex; + +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataTypeVisitor; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.MultisetType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimeType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; + +import java.util.Optional; +import java.util.function.Function; + +public class FastHashForNumber implements DataTypeVisitor>> { + + public static final 
FastHashForNumber INSTANCE = new FastHashForNumber(); + + @Override + public Optional> visit(CharType charType) { + return Optional.empty(); + } + + @Override + public Optional> visit(VarCharType varCharType) { + return Optional.empty(); + } + + @Override + public Optional> visit(BooleanType booleanType) { + return Optional.empty(); + } + + @Override + public Optional> visit(BinaryType binaryType) { + return Optional.empty(); + } + + @Override + public Optional> visit(VarBinaryType varBinaryType) { + return Optional.empty(); + } + + @Override + public Optional> visit(DecimalType decimalType) { + return Optional.of(o -> o == null ? 0L : getLongHash(((Decimal) o).toUnscaledLong())); + } + + @Override + public Optional> visit(TinyIntType tinyIntType) { + return Optional.of(o -> o == null ? 0L : getLongHash((byte) o)); + } + + @Override + public Optional> visit(SmallIntType smallIntType) { + return Optional.of(o -> o == null ? 0L : getLongHash((short) o)); + } + + @Override + public Optional> visit(IntType intType) { + return Optional.of(o -> o == null ? 0L : getLongHash((int) o)); + } + + @Override + public Optional> visit(BigIntType bigIntType) { + return Optional.of(o -> o == null ? 0L : getLongHash((long) o)); + } + + @Override + public Optional> visit(FloatType floatType) { + return Optional.of(o -> o == null ? 0L : getLongHash(Float.floatToIntBits((float) o))); + } + + @Override + public Optional> visit(DoubleType doubleType) { + return Optional.of(o -> o == null ? 0L : getLongHash(Double.doubleToLongBits((double) o))); + } + + @Override + public Optional> visit(DateType dateType) { + return Optional.of(o -> o == null ? 0L : getLongHash((int) o)); + } + + @Override + public Optional> visit(TimeType timeType) { + return Optional.of(o -> o == null ? 0L : getLongHash((int) o)); + } + + @Override + public Optional> visit(TimestampType timestampType) { + return Optional.of(o -> o == null ? 
0L : getLongHash(((Timestamp) o).getMillisecond()));
+    }
+
+    /** Hashes the millisecond part of a local-zoned timestamp; {@code null} maps to 0. */
+    @Override
+    public Optional<Function<Object, Long>> visit(LocalZonedTimestampType localZonedTimestampType) {
+        return Optional.of(o -> o == null ? 0L : getLongHash(((Timestamp) o).getMillisecond()));
+    }
+
+    /** Arrays have no numeric fast path. */
+    @Override
+    public Optional<Function<Object, Long>> visit(ArrayType arrayType) {
+        return Optional.empty();
+    }
+
+    /** Multisets have no numeric fast path. */
+    @Override
+    public Optional<Function<Object, Long>> visit(MultisetType multisetType) {
+        return Optional.empty();
+    }
+
+    /** Maps have no numeric fast path. */
+    @Override
+    public Optional<Function<Object, Long>> visit(MapType mapType) {
+        return Optional.empty();
+    }
+
+    /** Rows have no numeric fast path. */
+    @Override
+    public Optional<Function<Object, Long>> visit(RowType rowType) {
+        return Optional.empty();
+    }
+
+    /**
+     * Thomas Wang's 64-bit integer hash function (hash64shift).
+     *
+     * <p>See
+     * http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm
+     *
+     * @param key the value to hash
+     * @return the mixed 64-bit hash of {@code key}
+     */
+    static long getLongHash(long key) {
+        key = (~key) + (key << 21); // key = (key << 21) - key - 1;
+        // The xor-shift steps must use the unsigned shift: with a signed shift the sign bit is
+        // copied into the shifted-in positions, so bit 63 of "key ^ (key >> n)" is always zero.
+        key = key ^ (key >>> 24);
+        key = (key + (key << 3)) + (key << 8); // key * 265
+        key = key ^ (key >>> 14);
+        key = (key + (key << 2)) + (key << 4); // key * 21
+        key = key ^ (key >>> 28);
+        key = key + (key << 31);
+        return key;
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
index e7e3d40bfe6f..53a8efd65942 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java
@@ -18,8 +18,11 @@
 
 package org.apache.paimon.fileindex;
 
+import org.apache.paimon.fileindex.bloomfilter.BloomFilter;
 import org.apache.paimon.types.DataType;
 
+import static org.apache.paimon.fileindex.bloomfilter.BloomFilter.BLOOM_FILTER;
+
 /** File index interface. To build a file index.
*/ public interface FileIndexer { @@ -29,6 +32,8 @@ public interface FileIndexer { static FileIndexer create(String type, DataType dataType) { switch (type) { + case BLOOM_FILTER: + return new BloomFilter(dataType); default: throw new RuntimeException("Doesn't support filter type: " + type); } diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/ObjectToBytesVisitor.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/ObjectToBytesVisitor.java new file mode 100644 index 000000000000..453e8151502d --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/ObjectToBytesVisitor.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.fileindex; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypeVisitor; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.MultisetType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimeType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; + +/** Convert different type object to bytes. */ +public class ObjectToBytesVisitor implements DataTypeVisitor> { + + public static final ObjectToBytesVisitor INSTANCE = new ObjectToBytesVisitor(); + + public static final byte[] NULL_BYTES = new byte[1]; + + static { + Arrays.fill(NULL_BYTES, (byte) 0x00); + } + + @Override + public Function visit(CharType charType) { + return o -> o == null ? NULL_BYTES : ((BinaryString) o).toBytes(); + } + + @Override + public Function visit(VarCharType varCharType) { + return o -> o == null ? 
NULL_BYTES : ((BinaryString) o).toBytes(); + } + + @Override + public Function visit(BooleanType booleanType) { + return o -> o == null ? NULL_BYTES : ((Boolean) o) ? new byte[] {0x01} : new byte[] {0x00}; + } + + @Override + public Function visit(BinaryType binaryType) { + return o -> o == null ? NULL_BYTES : (byte[]) o; + } + + @Override + public Function visit(VarBinaryType varBinaryType) { + return o -> o == null ? NULL_BYTES : (byte[]) o; + } + + @Override + public Function visit(DecimalType decimalType) { + return o -> o == null ? NULL_BYTES : ((Decimal) o).toUnscaledBytes(); + } + + @Override + public Function visit(TinyIntType tinyIntType) { + return o -> o == null ? NULL_BYTES : new byte[] {(byte) o}; + } + + @Override + public Function visit(SmallIntType smallIntType) { + return o -> + o == null + ? NULL_BYTES + : new byte[] {(byte) ((short) o & 0xff), (byte) ((short) o >> 8 & 0xff)}; + } + + @Override + public Function visit(IntType intType) { + return o -> o == null ? NULL_BYTES : intToBytes((int) o); + } + + @Override + public Function visit(BigIntType bigIntType) { + return o -> o == null ? NULL_BYTES : longToBytes((long) o); + } + + @Override + public Function visit(FloatType floatType) { + return o -> o == null ? NULL_BYTES : intToBytes(Float.floatToIntBits((float) o)); + } + + @Override + public Function visit(DoubleType doubleType) { + return o -> o == null ? NULL_BYTES : longToBytes(Double.doubleToLongBits((double) o)); + } + + @Override + public Function visit(DateType dateType) { + return o -> o == null ? NULL_BYTES : intToBytes((int) o); + } + + @Override + public Function visit(TimeType timeType) { + return o -> o == null ? NULL_BYTES : intToBytes((int) o); + } + + @Override + public Function visit(TimestampType timestampType) { + return o -> o == null ? NULL_BYTES : longToBytes(((Timestamp) o).getMillisecond()); + } + + @Override + public Function visit(LocalZonedTimestampType localZonedTimestampType) { + return o -> o == null ? 
NULL_BYTES : longToBytes(((Timestamp) o).getMillisecond()); + } + + @Override + public Function visit(ArrayType arrayType) { + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(arrayType.getElementType()); + Function converter = arrayType.getElementType().accept(this); + return o -> { + if (o == null) { + return NULL_BYTES; + } + InternalArray internalArray = (InternalArray) o; + int count = 0; + byte[][] bytes = new byte[internalArray.size()][]; + for (int i = 0; i < internalArray.size(); i++) { + bytes[i] = converter.apply(elementGetter.getElementOrNull(internalArray, i)); + count += bytes[i].length; + } + + byte[] result = new byte[count]; + int position = 0; + for (int i = 0; i < internalArray.size(); i++) { + System.arraycopy(bytes[i], 0, result, position, bytes[i].length); + position += bytes[i].length; + } + return result; + }; + } + + @Override + public Function visit(MultisetType multisetType) { + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(multisetType.getElementType()); + Function converter = multisetType.getElementType().accept(this); + return o -> { + if (o == null) { + return NULL_BYTES; + } + InternalMap map = (InternalMap) o; + int count = 0; + byte[][] bytes = new byte[map.size()][]; + InternalArray keyArray = map.keyArray(); + for (int i = 0; i < map.size(); i++) { + bytes[i] = converter.apply(elementGetter.getElementOrNull(keyArray, i)); + count += bytes[i].length; + } + + byte[] result = new byte[count]; + int position = 0; + for (int i = 0; i < map.size(); i++) { + System.arraycopy(bytes[i], 0, result, position, bytes[i].length); + position += bytes[i].length; + } + return result; + }; + } + + @Override + public Function visit(MapType mapType) { + InternalArray.ElementGetter keyElementGetter = + InternalArray.createElementGetter(mapType.getKeyType()); + Function keyConverter = mapType.getKeyType().accept(this); + + InternalArray.ElementGetter valueElementGetter = + 
InternalArray.createElementGetter(mapType.getValueType()); + Function valueConverter = mapType.getValueType().accept(this); + + return o -> { + if (o == null) { + return NULL_BYTES; + } + InternalMap map = (InternalMap) o; + + int count = 0; + byte[][] keyBytes = new byte[map.size()][]; + InternalArray keyArray = map.keyArray(); + for (int i = 0; i < map.size(); i++) { + keyBytes[i] = keyConverter.apply(keyElementGetter.getElementOrNull(keyArray, i)); + count += keyBytes[i].length; + } + + byte[][] valueBytes = new byte[map.size()][]; + InternalArray valueArray = map.valueArray(); + for (int i = 0; i < map.size(); i++) { + valueBytes[i] = + valueConverter.apply(valueElementGetter.getElementOrNull(valueArray, i)); + count += valueBytes[i].length; + } + + byte[] result = new byte[count]; + int position = 0; + for (int i = 0; i < map.size(); i++) { + System.arraycopy(keyBytes[i], 0, result, position, keyBytes[i].length); + position += keyBytes[i].length; + System.arraycopy(valueBytes[i], 0, result, position, valueBytes[i].length); + position += valueBytes[i].length; + } + return result; + }; + } + + @Override + public Function visit(RowType rowType) { + List fieldList = rowType.getFields(); + List getters = new ArrayList<>(); + List> converters = new ArrayList<>(); + for (int i = 0; i < fieldList.size(); i++) { + getters.add(InternalRow.createFieldGetter(fieldList.get(i).type(), i)); + converters.add(fieldList.get(i).type().accept(this)); + } + return o -> { + if (o == null) { + return NULL_BYTES; + } + InternalRow secondRow = (InternalRow) o; + + int count = 0; + byte[][] bytes = new byte[rowType.getFieldCount()][]; + for (int i = 0; i < rowType.getFieldCount(); i++) { + bytes[i] = converters.get(i).apply(getters.get(i).getFieldOrNull(secondRow)); + count += bytes[i].length; + } + + byte[] result = new byte[count]; + int position = 0; + for (int i = 0; i < rowType.getFieldCount(); i++) { + System.arraycopy(bytes[i], 0, result, position, bytes[i].length); + position 
+= bytes[i].length; + } + return result; + }; + } + + @VisibleForTesting + static byte[] longToBytes(long x) { + return new byte[] { + (byte) (x & 0xff), + (byte) (x >> 8 & 0xff), + (byte) (x >> 16 & 0xff), + (byte) (x >> 24 & 0xff), + (byte) (x >> 32 & 0xff), + (byte) (x >> 40 & 0xff), + (byte) (x >> 48 & 0xff), + (byte) (x >> 56 & 0xff) + }; + } + + @VisibleForTesting + static byte[] intToBytes(int x) { + return new byte[] { + (byte) (x & 0xff), + (byte) (x >> 8 & 0xff), + (byte) (x >> 16 & 0xff), + (byte) (x >> 24 & 0xff) + }; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilter.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilter.java new file mode 100644 index 000000000000..a966f3d600ad --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilter.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.fileindex.bloomfilter; + +import org.apache.paimon.fileindex.FastHashForNumber; +import org.apache.paimon.fileindex.FileIndexReader; +import org.apache.paimon.fileindex.FileIndexWriter; +import org.apache.paimon.fileindex.FileIndexer; +import org.apache.paimon.fileindex.ObjectToBytesVisitor; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.types.DataType; + +import org.apache.hadoop.util.bloom.Key; +import org.apache.hadoop.util.hash.Hash; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Optional; +import java.util.function.Function; + +/** Bloom filter for secondary index. */ +public class BloomFilter implements FileIndexer { + + public static final String BLOOM_FILTER = "bloom"; + + private static final int HASH_NUMBER = 4; + + private final HadoopDynamicBloomFilter filter = + new HadoopDynamicBloomFilter(50 * 8, HASH_NUMBER, Hash.MURMUR_HASH, 30); + // reuse + private final Key filterKey = new Key(); + + private final Function converter; + + private final Optional> hashFunction; + + public BloomFilter(DataType type) { + this.converter = type.accept(ObjectToBytesVisitor.INSTANCE); + hashFunction = type.accept(FastHashForNumber.INSTANCE); + } + + public String name() { + return BLOOM_FILTER; + } + + @Override + public FileIndexWriter createWriter() { + return new Writer(); + } + + @Override + public FileIndexReader createReader() { + return new Reader(); + } + + private class Writer implements FileIndexWriter { + + private Writer() {} + + @Override + public void write(Object key) { + if (hashFunction.isPresent()) { + filter.addHash(hashFunction.get().apply(key)); + } else { + filterKey.set(converter.apply(key), 1.0); + filter.add(filterKey); + } + } + + @Override + public byte[] serializedBytes() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + 
DataOutputStream dos = new DataOutputStream(baos); + + try { + filter.write(dos); + byte[] bytes = baos.toByteArray(); + dos.close(); + return bytes; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private class Reader implements FileIndexReader { + + @Override + public Reader recoverFrom(byte[] serializedBytes) { + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serializedBytes)); + + try { + filter.readFields(dis); + } catch (IOException e) { + throw new RuntimeException(e); + } + return this; + } + + @Override + public Boolean visitEqual(FieldRef fieldRef, Object key) { + if (hashFunction.isPresent()) { + return filter.membershipTest(hashFunction.get().apply(key)); + } else { + filterKey.set(converter.apply(key), 1.0); + return filter.membershipTest(filterKey); + } + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopBloomFilter.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopBloomFilter.java new file mode 100644 index 000000000000..8805a2253db3 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopBloomFilter.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fileindex.bloomfilter; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.bloom.Key; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.BitSet; + +/* This file is based on source code from the Hadoop Project (https://hadoop.apache.org//), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Mostly copied from hadoop BloomFilter. */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class HadoopBloomFilter extends HadoopFilter { + private static final byte[] bitvalues = + new byte[] { + (byte) 0x01, + (byte) 0x02, + (byte) 0x04, + (byte) 0x08, + (byte) 0x10, + (byte) 0x20, + (byte) 0x40, + (byte) 0x80 + }; + + /** The bit vector. */ + BitSet bits; + + /** Default constructor - use with readFields. */ + public HadoopBloomFilter() { + super(); + } + + /** + * Constructor. + * + * @param vectorSize The vector size of this filter. + * @param nbHash The number of hash function to consider. + * @param hashType type of the hashing function (see {@link org.apache.hadoop.util.hash.Hash}). 
+     */
+    public HadoopBloomFilter(int vectorSize, int nbHash, int hashType) {
+        super(vectorSize, nbHash, hashType);
+
+        bits = new BitSet(this.vectorSize);
+    }
+
+    /**
+     * Adds a key to this filter.
+     *
+     * @param key the key to add, must not be null
+     * @return true if at least one bit was newly set for this key
+     * @throws NullPointerException if {@code key} is null
+     */
+    @Override
+    public boolean add(Key key) {
+        if (key == null) {
+            throw new NullPointerException("key cannot be null");
+        }
+
+        int[] h = hash.hash(key);
+        hash.clear();
+
+        boolean newRecord = false;
+        for (int i = 0; i < nbHash; i++) {
+            if (!bits.get(h[i])) {
+                newRecord = true;
+                bits.set(h[i]);
+            }
+        }
+        return newRecord;
+    }
+
+    /**
+     * Adds a pre-computed 64-bit hash to this filter. The low and high 32 bits are combined as
+     * {@code hash1 + i * hash2} to derive {@code nbHash} bit positions.
+     *
+     * @param hash64 the 64-bit hash of the element
+     * @return true if at least one bit was newly set for this hash
+     */
+    @Override
+    public boolean addHash(long hash64) {
+        boolean newRecord = false;
+        int hash1 = (int) hash64;
+        int hash2 = (int) (hash64 >>> 32);
+        for (int i = 0; i < nbHash; i++) {
+            int position = bitPosition(hash1, hash2, i);
+            if (!bits.get(position)) {
+                newRecord = true;
+                bits.set(position);
+            }
+        }
+        return newRecord;
+    }
+
+    /**
+     * Keeps only the bits that are set in both this filter and {@code filter}.
+     *
+     * @throws IllegalArgumentException if {@code filter} is null, is not a {@link
+     *     HadoopBloomFilter}, or has a different vector size or number of hash functions
+     */
+    @Override
+    public void and(HadoopFilter filter) {
+        if (filter == null
+                || !(filter instanceof HadoopBloomFilter)
+                || filter.vectorSize != this.vectorSize
+                || filter.nbHash != this.nbHash) {
+            throw new IllegalArgumentException("filters cannot be and-ed");
+        }
+
+        this.bits.and(((HadoopBloomFilter) filter).bits);
+    }
+
+    /**
+     * Tests whether a key may have been added to this filter.
+     *
+     * @param key the key to test, must not be null
+     * @return false if the key was definitely never added, true if it may have been
+     * @throws NullPointerException if {@code key} is null
+     */
+    @Override
+    public boolean membershipTest(Key key) {
+        if (key == null) {
+            throw new NullPointerException("key cannot be null");
+        }
+
+        int[] h = hash.hash(key);
+        hash.clear();
+        for (int i = 0; i < nbHash; i++) {
+            if (!bits.get(h[i])) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Tests whether a pre-computed 64-bit hash may have been added through {@link #addHash(long)}.
+     *
+     * @param hash64 the 64-bit hash of the element
+     * @return false if the hash was definitely never added, true if it may have been
+     */
+    @Override
+    public boolean membershipTest(long hash64) {
+        int hash1 = (int) hash64;
+        int hash2 = (int) (hash64 >>> 32);
+        for (int i = 0; i < nbHash; i++) {
+            if (!bits.get(bitPosition(hash1, hash2, i))) {
+                return false;
+            }
+        }
+
+        // Every probed bit is set, so the element may be present.
+        return true;
+    }
+
+    /**
+     * Maps the i-th combined hash to a bit position in {@code [0, vectorSize)}.
+     *
+     * <p>The int arithmetic may overflow and {@code hash1} may be negative, while {@link BitSet}
+     * rejects negative indexes; positions at or beyond {@code vectorSize} would not survive
+     * serialization, which only writes {@code vectorSize} bits.
+     */
+    private int bitPosition(int hash1, int hash2, int i) {
+        int combined = hash1 + (i * hash2);
+        if (combined < 0) {
+            // Flipping all bits of a negative int always yields a non-negative value.
+            combined = ~combined;
+        }
+        return combined % vectorSize;
+    }
+
+    /** Flips every bit in the range {@code [0, vectorSize)}. */
+    @Override
+    public void not() {
+        bits.flip(0, vectorSize);
+    }
+
+    @Override
+    public void or(HadoopFilter filter) {
+        if (filter == null
+                || !(filter instanceof HadoopBloomFilter)
+                || filter.vectorSize != this.vectorSize
+                || filter.nbHash != this.nbHash) {
+            throw new
IllegalArgumentException("filters cannot be or-ed"); + } + bits.or(((HadoopBloomFilter) filter).bits); + } + + @Override + public void xor(HadoopFilter filter) { + if (filter == null + || !(filter instanceof HadoopBloomFilter) + || filter.vectorSize != this.vectorSize + || filter.nbHash != this.nbHash) { + throw new IllegalArgumentException("filters cannot be xor-ed"); + } + bits.xor(((HadoopBloomFilter) filter).bits); + } + + @Override + public String toString() { + return bits.toString(); + } + + /** @return size of the the bloomfilter */ + public int getVectorSize() { + return this.vectorSize; + } + + // Writable + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + byte[] bytes = new byte[getNBytes()]; + for (int i = 0, byteIndex = 0, bitIndex = 0; i < vectorSize; i++, bitIndex++) { + if (bitIndex == 8) { + bitIndex = 0; + byteIndex++; + } + if (bitIndex == 0) { + bytes[byteIndex] = 0; + } + if (bits.get(i)) { + bytes[byteIndex] |= bitvalues[bitIndex]; + } + } + out.write(bytes); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + bits = new BitSet(this.vectorSize); + byte[] bytes = new byte[getNBytes()]; + in.readFully(bytes); + for (int i = 0, byteIndex = 0, bitIndex = 0; i < vectorSize; i++, bitIndex++) { + if (bitIndex == 8) { + bitIndex = 0; + byteIndex++; + } + if ((bytes[byteIndex] & bitvalues[bitIndex]) != 0) { + bits.set(i); + } + } + } + + /* @return number of bytes needed to hold bit vector */ + private int getNBytes() { + return (int) (((long) vectorSize + 7) / 8); + } +} // end class diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopDynamicBloomFilter.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopDynamicBloomFilter.java new file mode 100644 index 000000000000..76b27cc2ddc4 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopDynamicBloomFilter.java 
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bloomfilter;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.bloom.Key;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/* This file is based on source code from the Hadoop Project (https://hadoop.apache.org//), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** Mostly copied from hadoop DynamicBloomFilter. */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class HadoopDynamicBloomFilter extends HadoopFilter {
+
+    // Growth factor applied to both vectorSize and nr each time a new row is appended
+    private static final int EXPANSION_FACTOR = 20;
+
+    /** Threshold for the maximum number of keys to record in a dynamic Bloom filter row. */
+    private int nr;
+
+    /** The number of keys recorded in the current standard active Bloom filter. */
+    private int currentNbRecord;
+
+    /** The rows of this dynamic filter; only the last row receives new keys. */
+    private HadoopBloomFilter[] matrix;
+
+    /** Zero-args constructor for the serialization. */
+    public HadoopDynamicBloomFilter() {}
+
+    /**
+     * Constructor.
+     *
+     * <p>Builds an empty Dynamic Bloom filter.
+     *
+     * @param vectorSize The number of bits in the vector.
+     * @param nbHash The number of hash functions to consider.
+     * @param hashType type of the hashing function (see {@link org.apache.hadoop.util.hash.Hash}).
+     * @param nr The threshold for the maximum number of keys to record in a dynamic Bloom filter
+     *     row.
+     */
+    public HadoopDynamicBloomFilter(int vectorSize, int nbHash, int hashType, int nr) {
+        super(vectorSize, nbHash, hashType);
+
+        this.nr = nr;
+        this.currentNbRecord = 0;
+
+        matrix = new HadoopBloomFilter[1];
+        matrix[0] = new HadoopBloomFilter(this.vectorSize, this.nbHash, this.hashType);
+    }
+
+    @Override
+    public boolean add(Key key) {
+        if (key == null) {
+            throw new NullPointerException("Key can not be null");
+        }
+
+        HadoopBloomFilter bf = getActiveStandardBF();
+
+        // the active row is full: append a new (larger) row and start counting again
+        if (bf == null) {
+            addRow();
+            bf = matrix[matrix.length - 1];
+            currentNbRecord = 0;
+        }
+
+        if (bf.add(key)) {
+            currentNbRecord++;
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    public boolean addHash(long hash64) {
+        HadoopBloomFilter bf = getActiveStandardBF();
+
+        // the active row is full: append a new (larger) row and start counting again
+        if (bf == null) {
+            addRow();
+            bf = matrix[matrix.length - 1];
+            currentNbRecord = 0;
+        }
+
+        if (bf.addHash(hash64)) {
+            currentNbRecord++;
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    public void and(HadoopFilter filter) {
+        if (filter == null
+                || !(filter instanceof HadoopDynamicBloomFilter)
+                || filter.vectorSize != this.vectorSize
+                || filter.nbHash != this.nbHash) {
+            throw new IllegalArgumentException("filters cannot be and-ed");
+        }
+
+        HadoopDynamicBloomFilter dbf = (HadoopDynamicBloomFilter) filter;
+
+        if (dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) {
+            throw new IllegalArgumentException("filters cannot be and-ed");
+        }
+
+        for (int i = 0; i < matrix.length; i++) {
+            matrix[i].and(dbf.matrix[i]);
+        }
+    }
+
+    @Override
+    public boolean membershipTest(Key key) {
+        if (key == null) {
+            return true;
+        }
+
+        for (int i = 0; i < matrix.length; i++) {
+            if (matrix[i].membershipTest(key)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public boolean membershipTest(long hash64) {
+        for (int i = 0; i < matrix.length; i++) {
+            if (matrix[i].membershipTest(hash64)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public void not() {
+        for (int i = 0; i < matrix.length; i++) {
+            matrix[i].not();
+        }
+    }
+
+    @Override
+    public void or(HadoopFilter filter) {
+        if (filter == null
+                || !(filter instanceof HadoopDynamicBloomFilter)
+                || filter.vectorSize != this.vectorSize
+                || filter.nbHash != this.nbHash) {
+            throw new IllegalArgumentException("filters cannot be or-ed");
+        }
+
+        HadoopDynamicBloomFilter dbf = (HadoopDynamicBloomFilter) filter;
+
+        if (dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) {
+            throw new IllegalArgumentException("filters cannot be or-ed");
+        }
+        for (int i = 0; i < matrix.length; i++) {
+            matrix[i].or(dbf.matrix[i]);
+        }
+    }
+
+    @Override
+    public void xor(HadoopFilter filter) {
+        if (filter == null
+                || !(filter instanceof HadoopDynamicBloomFilter)
+                || filter.vectorSize != this.vectorSize
+                || filter.nbHash != this.nbHash) {
+            throw new IllegalArgumentException("filters cannot be xor-ed");
+        }
+        HadoopDynamicBloomFilter dbf = (HadoopDynamicBloomFilter) filter;
+
+        if (dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) {
+            throw new IllegalArgumentException("filters cannot be xor-ed");
+        }
+
+        for (int i = 0; i < matrix.length; i++) {
+            matrix[i].xor(dbf.matrix[i]);
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder res = new StringBuilder();
+
+        for (int i = 0; i < matrix.length; i++) {
+            res.append(matrix[i]);
+            res.append(System.lineSeparator()); // Character.LINE_SEPARATOR is a category byte
+        }
+        return res.toString();
+    }
+
+    // Writable
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+        out.writeInt(nr);
+        out.writeInt(currentNbRecord);
+        out.writeInt(matrix.length);
+        for (int i = 0; i < matrix.length; i++) {
+            matrix[i].write(out);
+        }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        nr = in.readInt();
+        currentNbRecord = in.readInt();
+        int len = in.readInt();
+        matrix = new HadoopBloomFilter[len];
+        for (int i = 0; i < matrix.length; i++) {
+            matrix[i] = new HadoopBloomFilter();
+            matrix[i].readFields(in);
+        }
+    }
+
+    /** Adds a new row to this dynamic Bloom filter. */
+    private void addRow() {
+        HadoopBloomFilter[] tmp = new HadoopBloomFilter[matrix.length + 1];
+
+        for (int i = 0; i < matrix.length; i++) {
+            tmp[i] = matrix[i];
+        }
+
+        // grow up to contain more data
+        vectorSize *= EXPANSION_FACTOR;
+        nr *= EXPANSION_FACTOR;
+
+        tmp[tmp.length - 1] = new HadoopBloomFilter(vectorSize, nbHash, hashType);
+
+        matrix = tmp;
+    }
+
+    /**
+     * Returns the active standard Bloom filter in this dynamic Bloom filter.
+     *
+     * @return the last row while fewer than {@code nr} keys are recorded, {@code null} otherwise.
+     */
+    private HadoopBloomFilter getActiveStandardBF() {
+        if (currentNbRecord >= nr) {
+            return null;
+        }
+
+        return matrix[matrix.length - 1];
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopFilter.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopFilter.java
new file mode 100644
index 000000000000..f98f9d512530
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopFilter.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bloomfilter;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.bloom.HashFunction;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.hadoop.util.hash.Hash;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/* This file is based on source code from the Hadoop Project (https://hadoop.apache.org//), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * Defines the general behavior of a filter.
+ *
+ * <p>A filter is a data structure which aims at offering a lossy summary of a set A.
+ * The key idea is to map entries of A (also called keys) into several positions
+ * in a vector through the use of several hash functions.
+ *
+ * <p>Typically, a filter will be implemented as a Bloom filter (or a Bloom filter extension).
+ *
+ * <p>It must be extended in order to define the real behavior.
+ *
+ * @see Key The general behavior of a key
+ * @see HashFunction A hash function
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
+@InterfaceStability.Unstable
+public abstract class HadoopFilter implements Writable {
+    private static final int VERSION = -1; // negative to accommodate for old format
+    /** The vector size of this filter. */
+    protected int vectorSize;
+
+    /** The hash function used to map a key to several positions in the vector. */
+    protected HashFunction hash;
+
+    /** The number of hash functions to consider. */
+    protected int nbHash;
+
+    /** Type of hashing function to use. */
+    protected int hashType;
+
+    protected HadoopFilter() {}
+
+    /**
+     * Constructor.
+     *
+     * @param vectorSize The vector size of this filter.
+     * @param nbHash The number of hash functions to consider.
+     * @param hashType type of the hashing function (see {@link Hash}).
+     */
+    protected HadoopFilter(int vectorSize, int nbHash, int hashType) {
+        this.vectorSize = vectorSize;
+        this.nbHash = nbHash;
+        this.hashType = hashType;
+        this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashType);
+    }
+
+    /**
+     * Adds a key to this filter.
+     *
+     * @param key The key to add.
+     * @return {@code true} if the value did not previously exist in the filter (is a new record).
+     *     Note, that a false positive may occur.
+     */
+    public abstract boolean add(Key key);
+
+    /**
+     * Adds a key, given as its precomputed 64-bit hash, to this filter.
+     *
+     * @param hash64 The hash to add.
+     * @return {@code true} if the value did not previously exist in the filter (is a new record).
+     *     Note, that a false positive may occur.
+     */
+    public abstract boolean addHash(long hash64);
+
+    /**
+     * Determines whether a specified key belongs to this filter.
+     *
+     * @param key The key to test.
+     * @return boolean True if the specified key belongs to this filter. False otherwise.
+     */
+    public abstract boolean membershipTest(Key key);
+
+    /**
+     * Determines whether a key with the specified hash belongs to this filter.
+     *
+     * @param hash64 The hash to test.
+     * @return boolean True if the specified key belongs to this filter. False otherwise.
+     */
+    public abstract boolean membershipTest(long hash64);
+
+    /**
+     * Performs a logical AND between this filter and a specified filter.
+     *
+     * <p>Invariant: The result is assigned to this filter.
+     *
+     * @param filter The filter to AND with.
+     */
+    public abstract void and(HadoopFilter filter);
+
+    /**
+     * Performs a logical OR between this filter and a specified filter.
+     *
+     * <p>Invariant: The result is assigned to this filter.
+     *
+     * @param filter The filter to OR with.
+     */
+    public abstract void or(HadoopFilter filter);
+
+    /**
+     * Performs a logical XOR between this filter and a specified filter.
+     *
+     * <p>Invariant: The result is assigned to this filter.
+     *
+     * @param filter The filter to XOR with.
+     */
+    public abstract void xor(HadoopFilter filter);
+
+    /**
+     * Performs a logical NOT on this filter.
+     *
+     * <p>The result is assigned to this filter.
+     */
+    public abstract void not();
+
+    /**
+     * Adds a list of keys to this filter.
+     *
+     * @param keys The list of keys.
+     */
+    public void add(List<Key> keys) {
+        if (keys == null) {
+            throw new IllegalArgumentException("ArrayList may not be null");
+        }
+
+        for (Key key : keys) {
+            add(key);
+        }
+    } // end add()
+
+    /**
+     * Adds a collection of keys to this filter.
+     *
+     * @param keys The collection of keys.
+     */
+    public void add(Collection<Key> keys) {
+        if (keys == null) {
+            throw new IllegalArgumentException("Collection may not be null");
+        }
+        for (Key key : keys) {
+            add(key);
+        }
+    } // end add()
+
+    /**
+     * Adds an array of keys to this filter.
+     *
+     * @param keys The array of keys.
+     */
+    public void add(Key[] keys) {
+        if (keys == null) {
+            throw new IllegalArgumentException("Key[] may not be null");
+        }
+        for (int i = 0; i < keys.length; i++) {
+            add(keys[i]);
+        }
+    } // end add()
+
+    // Writable interface
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(VERSION);
+        out.writeInt(this.nbHash);
+        out.writeByte(this.hashType);
+        out.writeInt(this.vectorSize);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        int ver = in.readInt();
+        if (ver > 0) { // old unversioned format
+            this.nbHash = ver;
+            this.hashType = Hash.JENKINS_HASH;
+        } else if (ver == VERSION) {
+            this.nbHash = in.readInt();
+            this.hashType = in.readByte();
+        } else {
+            throw new IOException("Unsupported version: " + ver);
+        }
+        this.vectorSize = in.readInt();
+        this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashType);
+    }
+} // end class
diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/ObjectToBytesVisitorTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/ObjectToBytesVisitorTest.java
new file mode 100644
index 000000000000..5b3c4117507f
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/ObjectToBytesVisitorTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.Function;
+
+/** Tests for {@link ObjectToBytesVisitor}. */
+public class ObjectToBytesVisitorTest {
+
+    private static final Random RANDOM = new Random();
+
+    @Test
+    public void testCharType() {
+        Function<Object, byte[]> function =
+                DataTypes.CHAR(10).accept(ObjectToBytesVisitor.INSTANCE);
+        BinaryString binaryString = BinaryString.fromString(randomString(10));
+        Assertions.assertThat(function.apply(binaryString)).containsExactly(binaryString.toBytes());
+    }
+
+    @Test
+    public void testVarCharType() {
+        Function<Object, byte[]> function =
+                DataTypes.VARCHAR(10).accept(ObjectToBytesVisitor.INSTANCE);
+        BinaryString binaryString = BinaryString.fromString(randomString(10));
+        Assertions.assertThat(function.apply(binaryString)).containsExactly(binaryString.toBytes());
+    }
+
+    @Test
+    public void testBooleanType() {
+        Function<Object, byte[]> function =
+                DataTypes.BOOLEAN().accept(ObjectToBytesVisitor.INSTANCE);
+        Boolean b = RANDOM.nextBoolean();
+        Assertions.assertThat(function.apply(b))
+                .containsExactly(b ? new byte[] {0x01} : new byte[] {0x00});
+    }
+
+    @Test
+    public void testBinaryType() {
+        Function<Object, byte[]> function =
+                DataTypes.BINARY(10).accept(ObjectToBytesVisitor.INSTANCE);
+        byte[] b = new byte[10];
+        RANDOM.nextBytes(b);
+        Assertions.assertThat(function.apply(b)).containsExactly(b);
+    }
+
+    @Test
+    public void testVarBinaryType() {
+        Function<Object, byte[]> function =
+                DataTypes.VARBINARY(10).accept(ObjectToBytesVisitor.INSTANCE);
+        byte[] b = new byte[10];
+        RANDOM.nextBytes(b);
+        Assertions.assertThat(function.apply(b)).containsExactly(b);
+    }
+
+    @Test
+    public void testDecimalType() {
+        Function<Object, byte[]> function =
+                DataTypes.DECIMAL(10, 5).accept(ObjectToBytesVisitor.INSTANCE);
+        Decimal decimal = Decimal.fromBigDecimal(new BigDecimal("0.00123"), 10, 5);
+        Assertions.assertThat(function.apply(decimal)).containsExactly(decimal.toUnscaledBytes());
+    }
+
+    @Test
+    public void testTinyIntType() {
+        Function<Object, byte[]> function =
+                DataTypes.TINYINT().accept(ObjectToBytesVisitor.INSTANCE);
+        Byte c = (byte) RANDOM.nextInt();
+        Assertions.assertThat(function.apply(c)).containsExactly(c);
+    }
+
+    @Test
+    public void testSmallIntType() {
+        Function<Object, byte[]> function =
+                DataTypes.SMALLINT().accept(ObjectToBytesVisitor.INSTANCE);
+        short c = (short) RANDOM.nextInt();
+        Assertions.assertThat(function.apply(c))
+                .containsExactly((byte) (c & 0xff), (byte) (c >> 8 & 0xff));
+    }
+
+    @Test
+    public void testIntType() {
+        Function<Object, byte[]> function = DataTypes.INT().accept(ObjectToBytesVisitor.INSTANCE);
+        int c = RANDOM.nextInt();
+        Assertions.assertThat(function.apply(c))
+                .containsExactly((ObjectToBytesVisitor.intToBytes(c)));
+    }
+
+    @Test
+    public void testBigIntType() {
+        Function<Object, byte[]> function =
+                DataTypes.BIGINT().accept(ObjectToBytesVisitor.INSTANCE);
+        long c = RANDOM.nextLong();
+        Assertions.assertThat(function.apply(c))
+                .containsExactly((ObjectToBytesVisitor.longToBytes(c)));
+    }
+
+    @Test
+    public void testFloatType() {
+        Function<Object, byte[]> function = DataTypes.FLOAT().accept(ObjectToBytesVisitor.INSTANCE);
+        float c = RANDOM.nextFloat();
+        Assertions.assertThat(function.apply(c))
+                .containsExactly((ObjectToBytesVisitor.intToBytes(Float.floatToIntBits(c))));
+    }
+
+    @Test
+    public void testDoubleType() {
+        Function<Object, byte[]> function =
+                DataTypes.DOUBLE().accept(ObjectToBytesVisitor.INSTANCE);
+        double c = RANDOM.nextDouble();
+        Assertions.assertThat(function.apply(c))
+                .containsExactly((ObjectToBytesVisitor.longToBytes(Double.doubleToLongBits(c))));
+    }
+
+    @Test
+    public void testDateType() {
+        Function<Object, byte[]> function = DataTypes.DATE().accept(ObjectToBytesVisitor.INSTANCE);
+        int c = RANDOM.nextInt();
+        Assertions.assertThat(function.apply(c))
+                .containsExactly((ObjectToBytesVisitor.intToBytes(c)));
+    }
+
+    @Test
+    public void testTimestampType() {
+        Function<Object, byte[]> function =
+                DataTypes.TIMESTAMP().accept(ObjectToBytesVisitor.INSTANCE);
+        Timestamp c = Timestamp.fromEpochMillis(System.currentTimeMillis());
+        Assertions.assertThat(function.apply(c))
+                .containsExactly((ObjectToBytesVisitor.longToBytes(c.getMillisecond())));
+    }
+
+    @Test
+    public void testLocalZonedTimestampType() {
+        Function<Object, byte[]> function =
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().accept(ObjectToBytesVisitor.INSTANCE);
+        Timestamp c = Timestamp.fromEpochMillis(System.currentTimeMillis());
+        Assertions.assertThat(function.apply(c))
+                .containsExactly((ObjectToBytesVisitor.longToBytes(c.getMillisecond())));
+    }
+
+    @Test
+    public void testArrayType() {
+        Function<Object, byte[]> function =
+                DataTypes.ARRAY(DataTypes.INT()).accept(ObjectToBytesVisitor.INSTANCE);
+        int[] ins = new int[] {0, 1, 3, 4};
+        GenericArray genericArray = new GenericArray(ins);
+        byte[] b = new byte[16];
+        for (int i = 0; i < 4; i++) {
+            System.arraycopy(ObjectToBytesVisitor.intToBytes(ins[i]), 0, b, 4 * i, 4);
+        }
+        Assertions.assertThat(function.apply(genericArray)).containsExactly(b);
+    }
+
+    @Test
+    public void testMapType() {
+        Function<Object, byte[]> function =
+                DataTypes.MAP(DataTypes.INT(), DataTypes.INT())
+                        .accept(ObjectToBytesVisitor.INSTANCE);
+        Map<Object, Object> map = new HashMap<>();
+
+        map.put(1, 1);
+        map.put(2, 2);
+        GenericMap genericMap = new GenericMap(map);
+        byte[] b = new byte[16];
+        System.arraycopy(ObjectToBytesVisitor.intToBytes(1), 0, b, 0, 4);
+        System.arraycopy(ObjectToBytesVisitor.intToBytes(1), 0, b, 4, 4);
+        System.arraycopy(ObjectToBytesVisitor.intToBytes(2), 0, b, 8, 4);
+        System.arraycopy(ObjectToBytesVisitor.intToBytes(2), 0, b, 12, 4);
+        Assertions.assertThat(function.apply(genericMap)).containsExactly(b);
+    }
+
+    @Test
+    public void testMultisetType() {
+        Function<Object, byte[]> function =
+                DataTypes.MULTISET(DataTypes.INT()).accept(ObjectToBytesVisitor.INSTANCE);
+        Map<Object, Object> map = new HashMap<>();
+        map.put(1, 1);
+        map.put(2, 2);
+        GenericMap genericMap = new GenericMap(map);
+        byte[] b = new byte[8];
+        System.arraycopy(ObjectToBytesVisitor.intToBytes(1), 0, b, 0, 4);
+        System.arraycopy(ObjectToBytesVisitor.intToBytes(2), 0, b, 4, 4);
+        Assertions.assertThat(function.apply(genericMap)).containsExactly(b);
+    }
+
+    @Test
+    public void testRowType() {
+        Function<Object, byte[]> function =
+                DataTypes.ROW(DataTypes.INT()).accept(ObjectToBytesVisitor.INSTANCE);
+        Assertions.assertThat(function.apply(GenericRow.of(1)))
+                .containsExactly(ObjectToBytesVisitor.intToBytes(1));
+    }
+
+    public static String randomString(int length) {
+        byte[] buffer = new byte[length];
+
+        for (int i = 0; i < length; i += 1) {
+            buffer[i] = (byte) ('a' + RANDOM.nextInt(26));
+        }
+
+        return new String(buffer);
+    }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterTest.java
new file mode 100644
index 000000000000..af761e30638b
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bloomfilter;
+
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/** Tests for {@link BloomFilter}.
*/
+public class BloomFilterTest {
+
+    private static final Random RANDOM = new Random();
+
+    @Test
+    public void testAddFindByRandom() {
+
+        BloomFilter filter = new BloomFilter(DataTypes.BYTES());
+        FileIndexWriter writer = filter.createWriter();
+        FileIndexReader reader = filter.createReader();
+        List<byte[]> testData = new ArrayList<>();
+
+        for (int i = 0; i < 10000; i++) {
+            testData.add(random());
+        }
+
+        testData.forEach(writer::write);
+
+        for (byte[] bytes : testData) {
+            Assertions.assertThat(reader.visitEqual(null, bytes)).isTrue();
+        }
+
+        int errorCount = 0;
+        int num = 1000000;
+        for (int i = 0; i < num; i++) {
+            byte[] ra = random();
+            if (reader.visitEqual(null, ra)) {
+                errorCount++;
+            }
+        }
+
+        // false positive probability (fpp) should be less than 0.03
+        Assertions.assertThat((double) errorCount / num).isLessThan(0.03);
+    }
+
+    private byte[] random() {
+        byte[] b = new byte[RANDOM.nextInt(40) + 1]; // length in [1, 40]
+        RANDOM.nextBytes(b);
+        return b;
+    }
+}

From b92c724f043f3b68e7009010ce670bd38ad263fa Mon Sep 17 00:00:00 2001
From: yejunhao
Date: Thu, 28 Mar 2024 16:20:39 +0800
Subject: [PATCH 2/6] fix minus

---
 .../main/java/org/apache/paimon/fileindex/FastHashForNumber.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FastHashForNumber.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FastHashForNumber.java
index bbc210f19fa4..71b3a37e1051 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FastHashForNumber.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FastHashForNumber.java
@@ -45,6 +45,7 @@
 import java.util.Optional;
 import java.util.function.Function;
 
+/** Fast hash for number type.
*/ public class FastHashForNumber implements DataTypeVisitor<Optional<Function<Object, Long>>> { public static final FastHashForNumber INSTANCE = new FastHashForNumber(); From 10c20b8e86b8d42e1fdcbd7378eaec3c772fee44 Mon Sep 17 00:00:00 2001 From: yejunhao Date: Thu, 28 Mar 2024 16:28:20 +0800 Subject: [PATCH 3/6] fix license --- LICENSE | 3 +++ 1 file changed, 3 insertions(+) diff --git a/LICENSE b/LICENSE index 813296254542..4de907f68931 100644 --- a/LICENSE +++ b/LICENSE @@ -209,6 +209,9 @@ Apache Software Foundation License 2.0 -------------------------------------- paimon-common/src/main/java/org/apache/paimon/fs/Path.java +paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopDynamicBloomFilter.java +paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopBloomFilter.java +paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/HadoopFilter.java from http://hadoop.apache.org/ version 2.10.2 paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java From 27200b4e1d4bf6d45352b96c8efdee880f66d669 Mon Sep 17 00:00:00 2001 From: yejunhao Date: Thu, 28 Mar 2024 17:14:29 +0800 Subject: [PATCH 4/6] add test --- .../fileindex/FastHashForNumberTest.java | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 paimon-common/src/test/java/org/apache/paimon/fileindex/FastHashForNumberTest.java diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/FastHashForNumberTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/FastHashForNumberTest.java new file mode 100644 index 000000000000..33d0dbfbce7c --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/FastHashForNumberTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.Random;
+import java.util.function.Function;
+
+/** Test for {@link FastHashForNumber}. */
+public class FastHashForNumberTest {
+
+    private static final Random RANDOM = new Random();
+
+    @Test
+    public void testDecimalType() {
+        Function<Object, Long> function =
+                DataTypes.DECIMAL(10, 5).accept(FastHashForNumber.INSTANCE).get();
+        Decimal decimal = Decimal.fromBigDecimal(new BigDecimal("0.00123"), 10, 5);
+        Assertions.assertThat(function.apply(decimal))
+                .isEqualTo(FastHashForNumber.getLongHash(decimal.toUnscaledLong()));
+    }
+
+    @Test
+    public void testTinyIntType() {
+        Function<Object, Long> function =
+                DataTypes.TINYINT().accept(FastHashForNumber.INSTANCE).get();
+        byte c = (byte) RANDOM.nextInt();
+        Assertions.assertThat(function.apply(c)).isEqualTo(FastHashForNumber.getLongHash(c));
+    }
+
+    @Test
+    public void testSmallIntType() {
+        Function<Object, Long> function =
+                DataTypes.SMALLINT().accept(FastHashForNumber.INSTANCE).get();
+        short c = (short) RANDOM.nextInt();
+        Assertions.assertThat(function.apply(c)).isEqualTo(FastHashForNumber.getLongHash(c));
+    }
+
+    @Test
+    public void testIntType() {
+        Function<Object, Long> function = DataTypes.INT().accept(FastHashForNumber.INSTANCE).get();
+        int c = RANDOM.nextInt();
+        Assertions.assertThat(function.apply(c)).isEqualTo((FastHashForNumber.getLongHash(c)));
+    }
+
+    @Test
+    public void testBigIntType() {
+        Function<Object, Long> function =
+                DataTypes.BIGINT().accept(FastHashForNumber.INSTANCE).get();
+        long c = RANDOM.nextLong();
+        Assertions.assertThat(function.apply(c)).isEqualTo((FastHashForNumber.getLongHash(c)));
+    }
+
+    @Test
+    public void testFloatType() {
+        Function<Object, Long> function =
+                DataTypes.FLOAT().accept(FastHashForNumber.INSTANCE).get();
+        float c = RANDOM.nextFloat();
+        Assertions.assertThat(function.apply(c))
+                .isEqualTo((FastHashForNumber.getLongHash(Float.floatToIntBits(c))));
+    }
+
+    @Test
+    public void testDoubleType() {
+        Function<Object, Long> function =
+                DataTypes.DOUBLE().accept(FastHashForNumber.INSTANCE).get();
+        double c = RANDOM.nextDouble();
+        Assertions.assertThat(function.apply(c))
+                .isEqualTo((FastHashForNumber.getLongHash(Double.doubleToLongBits(c))));
+    }
+
+    @Test
+    public void testDateType() {
+        Function<Object, Long> function = DataTypes.DATE().accept(FastHashForNumber.INSTANCE).get();
+        int c = RANDOM.nextInt();
+        Assertions.assertThat(function.apply(c)).isEqualTo((FastHashForNumber.getLongHash(c)));
+    }
+
+    @Test
+    public void testTimestampType() {
+        Function<Object, Long> function =
+                DataTypes.TIMESTAMP().accept(FastHashForNumber.INSTANCE).get();
+        Timestamp c = Timestamp.fromEpochMillis(System.currentTimeMillis());
+        Assertions.assertThat(function.apply(c))
+                .isEqualTo((FastHashForNumber.getLongHash(c.getMillisecond())));
+    }
+
+    @Test
+    public void testLocalZonedTimestampType() {
+        Function<Object, Long> function =
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().accept(FastHashForNumber.INSTANCE).get();
+        Timestamp c = Timestamp.fromEpochMillis(System.currentTimeMillis());
+        Assertions.assertThat(function.apply(c))
+                .isEqualTo((FastHashForNumber.getLongHash(c.getMillisecond())));
+    }
+}

From c57e7899eaf99282d59da1ce1c5182f7fa4c0797 Mon Sep 17 00:00:00 2001
From: yejunhao
Date: Fri, 29 Mar 2024 10:58:33 +0800 Subject: [PATCH 5/6] fix minus --- .../org/apache/paimon/fileindex/bloomfilter/BloomFilter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilter.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilter.java index a966f3d600ad..ad48e938e23b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilter.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilter.java @@ -55,7 +55,7 @@ public class BloomFilter implements FileIndexer { public BloomFilter(DataType type) { this.converter = type.accept(ObjectToBytesVisitor.INSTANCE); - hashFunction = type.accept(FastHashForNumber.INSTANCE); + this.hashFunction = type.accept(FastHashForNumber.INSTANCE); } public String name() { From e85936f7323edcd63d8d6d18e4d0bdbfae286100 Mon Sep 17 00:00:00 2001 From: yejunhao Date: Fri, 29 Mar 2024 13:53:02 +0800 Subject: [PATCH 6/6] fix random test --- .../apache/paimon/fs/ByteArraySeekableStreamTest.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/ByteArraySeekableStreamTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/ByteArraySeekableStreamTest.java index e725e2a3dea2..f771c5d4d387 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/ByteArraySeekableStreamTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/ByteArraySeekableStreamTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.fs; import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -31,7 +32,7 @@ public class ByteArraySeekableStreamTest { private static final Random RANDOM = new Random(); - @Test + @RepeatedTest(10) public void testBasic() throws IOException { int bl = 100000; byte[] b = 
randomBytes(bl); @@ -41,7 +42,7 @@ public void testBasic() throws IOException { for (int i = 0; i < RANDOM.nextInt(1000); i++) { int position = RANDOM.nextInt(bl); - int length = RANDOM.nextInt(b.length - position - 1); + int length = RANDOM.nextInt(Math.max(b.length - position - 1, 1)); byte[] expected = new byte[length]; System.arraycopy(b, position, expected, 0, length); @@ -55,7 +56,11 @@ public void testBasic() throws IOException { int position = RANDOM.nextInt(bl); byteArraySeekableStream.seek(position); for (int j = 0; j < 100; j++) { - Assertions.assertThat(b[position + j]) + int testPosition = position + j; + if (testPosition >= b.length) { + break; + } + Assertions.assertThat(b[testPosition]) .isEqualTo((byte) byteArraySeekableStream.read()); } }