Commit
Enable Fuzzy codec for doc id fields using a bloom filter
Signed-off-by: mgodwan <[email protected]>
mgodwan committed Oct 31, 2023
1 parent 8807d7a commit b1b32c0
Showing 17 changed files with 1,239 additions and 2 deletions.
server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
@@ -218,6 +218,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,

IndexSettings.INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING,
IndexSettings.INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Map<String, Settings> groups = s.getAsGroups();
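With the two settings above registered as index-scoped settings, an index can opt in to the fuzzy doc id set. A minimal sketch of supplying them at index creation (not part of this commit; the 0.10 probability is illustrative, not the shipped default):

Settings fuzzySetSettings = Settings.builder()
    .put("index.doc_id_fuzzy_set.enabled", true) // feature is off by default
    .put("index.doc_id_fuzzy_set.false_positive_probability", 0.10) // illustrative value
    .build();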
46 changes: 46 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -65,6 +65,7 @@

import static org.opensearch.Version.V_2_7_0;
import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
import static org.opensearch.index.codec.fuzzy.FuzzySetParameters.DEFAULT_FALSE_POSITIVE_PROBABILITY;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING;
@@ -610,6 +611,20 @@ public final class IndexSettings {
Property.Dynamic
);

public static final Setting<Boolean> INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING = Setting.boolSetting(
"index.doc_id_fuzzy_set.enabled",
false,
Property.IndexScope,
Property.Dynamic
);

public static final Setting<Double> INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING = Setting.doubleSetting(
"index.doc_id_fuzzy_set.false_positive_probability",
DEFAULT_FALSE_POSITIVE_PROBABILITY,
Property.IndexScope,
Property.Dynamic
);

public static final TimeValue DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL = new TimeValue(650, TimeUnit.MILLISECONDS);
public static final TimeValue MINIMUM_REMOTE_TRANSLOG_BUFFER_INTERVAL = TimeValue.ZERO;
public static final Setting<TimeValue> INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING = Setting.timeSetting(
@@ -729,6 +744,17 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
*/
private volatile UnaryOperator<MergePolicy> mergeOnFlushPolicy;

/**
 * Whether a fuzzy set is enabled for the doc id field.
 */
private volatile boolean enableFuzzySetForDocId;

/**
 * False positive probability to use while creating the fuzzy set.
 */
private volatile double docIdFuzzySetFalsePositiveProbability;

/**
* Returns the default search fields for this index.
*/
@@ -866,6 +892,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
* Now this sortField (IndexSort) is stored in SegmentInfo and we need to maintain backward compatibility for them.
*/
widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0);
enableFuzzySetForDocId = scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING);
docIdFuzzySetFalsePositiveProbability = scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING);

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(
@@ -945,6 +973,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
this::setRemoteTranslogUploadBufferInterval
);
scopedSettings.addSettingsUpdateConsumer(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING, this::setEnableFuzzySetForDocId);
scopedSettings.addSettingsUpdateConsumer(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING, this::setDocIdFuzzySetFalsePositiveProbability);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
@@ -1669,4 +1699,20 @@ public void setDefaultSearchPipeline(String defaultSearchPipeline) {
public boolean shouldWidenIndexSortType() {
return this.widenIndexSortType;
}

public boolean isEnableFuzzySetForDocId() {
return enableFuzzySetForDocId;
}

public void setEnableFuzzySetForDocId(boolean enableFuzzySetForDocId) {
this.enableFuzzySetForDocId = enableFuzzySetForDocId;
}

public double getDocIdFuzzySetFalsePositiveProbability() {
return docIdFuzzySetFalsePositiveProbability;
}

public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePositiveProbability) {
this.docIdFuzzySetFalsePositiveProbability = docIdFuzzySetFalsePositiveProbability;
}
}
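Both settings are declared with Property.Dynamic, and the consumers registered in the constructor apply updates live. A rough sketch of toggling the feature on an existing index ("my-index" and the client instance are assumed, not part of this commit):

UpdateSettingsRequest request = new UpdateSettingsRequest("my-index").settings(
    Settings.builder().put(IndexSettings.INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING.getKey(), true)
);
client.admin().indices().updateSettings(request).actionGet(); // returns once the update is acknowledged

Since the codec reads isEnableFuzzySetForDocId() when resolving per-field postings formats, only segments written after the update pick up the change.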
server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java
@@ -39,10 +39,16 @@
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.codec.fuzzy.FuzzyFilterPostingsFormat;
import org.opensearch.index.codec.fuzzy.FuzzySetFactory;
import org.opensearch.index.codec.fuzzy.FuzzySetParameters;
import org.opensearch.index.mapper.CompletionFieldMapper;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;

import java.util.Map;

/**
* {@link PerFieldMappingPostingFormatCodec This postings format} is the default
* {@link PostingsFormat} for OpenSearch. It utilizes the
@@ -57,6 +63,8 @@ public class PerFieldMappingPostingFormatCodec extends Lucene95Codec {
private final Logger logger;
private final MapperService mapperService;
private final DocValuesFormat dvFormat = new Lucene90DocValuesFormat();
private final FuzzySetFactory fuzzySetFactory;
private FuzzyFilterPostingsFormat docIdPostingsFormat;

static {
assert Codec.forName(Lucene.LATEST_CODEC).getClass().isAssignableFrom(PerFieldMappingPostingFormatCodec.class)
@@ -67,6 +75,8 @@ public PerFieldMappingPostingFormatCodec(Mode compressionMode, MapperService map
super(compressionMode);
this.mapperService = mapperService;
this.logger = logger;
fuzzySetFactory = new FuzzySetFactory(Map.of(IdFieldMapper.NAME,
new FuzzySetParameters(() -> mapperService.getIndexSettings().getDocIdFuzzySetFalsePositiveProbability())));
}

@Override
@@ -76,6 +86,11 @@ public PostingsFormat getPostingsFormatForField(String field) {
logger.warn("no index mapper found for field: [{}] returning default postings format", field);
} else if (fieldType instanceof CompletionFieldMapper.CompletionFieldType) {
return CompletionFieldMapper.CompletionFieldType.postingsFormat();
} else if (IdFieldMapper.NAME.equals(field) && mapperService.getIndexSettings().isEnableFuzzySetForDocId()) {
if (docIdPostingsFormat == null) {
docIdPostingsFormat = new FuzzyFilterPostingsFormat(super.getPostingsFormatForField(field), fuzzySetFactory);
}
return docIdPostingsFormat;
}
return super.getPostingsFormatForField(field);
}
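To make the dispatch concrete: when the setting is enabled, only the _id field is routed through the fuzzy filter wrapper, while every other field keeps its usual format. A sketch of the expected resolution (mapperService and logger are assumed in scope):

PerFieldMappingPostingFormatCodec codec = new PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode.BEST_SPEED, mapperService, logger);
PostingsFormat idFormat = codec.getPostingsFormatForField(IdFieldMapper.NAME); // FuzzyFilterPostingsFormat wrapping the default
PostingsFormat titleFormat = codec.getPostingsFormatForField("title"); // default format; "title" is an illustrative field

The wrapper is lazily cached in docIdPostingsFormat, so repeated lookups for _id reuse one instance.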
52 changes: 52 additions & 0 deletions server/src/main/java/org/opensearch/index/codec/fuzzy/AbstractFuzzySet.java
@@ -0,0 +1,52 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.fuzzy;

import org.apache.lucene.util.BytesRef;
import org.opensearch.common.CheckedSupplier;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Base implementation of a fuzzy set, providing shared add and verification behaviour for concrete sets.
 */
public abstract class AbstractFuzzySet implements FuzzySet {

/**
* Add an item to this fuzzy set.
* @param value The value to be added
*/
protected abstract void add(BytesRef value);

/**
* Add all items to the underlying set.
* Implementations can choose to perform this using an optimized strategy based on the type of set.
* @param valuesIteratorProvider Supplier for an iterator over all values to be added to the set.
*/
protected void addAll(CheckedSupplier<Iterator<BytesRef>, IOException> valuesIteratorProvider) throws IOException {
Iterator<BytesRef> values = valuesIteratorProvider.get();
while (values.hasNext()) {
add(values.next());
}
}

protected long generateKey(BytesRef value) {
return MurmurHash64.INSTANCE.hash(value);
}

protected void assertAllElementsExist(CheckedSupplier<Iterator<BytesRef>, IOException> iteratorProvider) throws IOException {
Iterator<BytesRef> iter = iteratorProvider.get();
int cnt = 0;
while (iter.hasNext()) {
BytesRef item = iter.next();
assert contains(item) == Result.MAYBE : "Expected the filter to return a positive response for every element added to it. Elements matched: " + cnt;
cnt++;
}
}
}
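Note that addAll and assertAllElementsExist take a supplier of an iterator rather than a bare iterator: the values may need to be walked twice, once to populate the set and once more to verify it when assertions are enabled. A toy illustration (the ids are hypothetical):

List<BytesRef> ids = List.of(new BytesRef("doc-1"), new BytesRef("doc-2"));
CheckedSupplier<Iterator<BytesRef>, IOException> provider = ids::iterator; // a fresh iterator on every call
// fuzzySet.addAll(provider);               -> first pass inserts each value
// fuzzySet.assertAllElementsExist(provider); -> second pass expects Result.MAYBE for each value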
147 changes: 147 additions & 0 deletions server/src/main/java/org/opensearch/index/codec/fuzzy/BloomFilter.java
@@ -0,0 +1,147 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.fuzzy;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.core.Assertions;

import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;

/**
* The code is based on Lucene's Bloom filter implementation.
* It represents the subset of the Lucene implementation needed for OpenSearch use cases.
* Since the Lucene implementation is marked experimental,
* this aims to ensure we can provide a backward-compatible (bwc) implementation during upgrades.
*/
public class BloomFilter extends AbstractFuzzySet {

private static final Logger logger = LogManager.getLogger(BloomFilter.class);

// The sizes of BitSet used are all numbers that, when expressed in binary form,
// are all ones. This is to enable fast downsizing from one bitset to another
// by simply ANDing each set index in one bitset with the size of the target bitset
// - this provides a fast modulo of the number. Values previously accumulated in
// a large bitset and then mapped to a smaller set can be looked up using a single
// AND operation of the query term's hash rather than needing to perform a 2-step
// translation of the query term that mirrors the stored content's reprojections.
static final int[] usableBitSetSizes;

static {
usableBitSetSizes = new int[26];
for (int i = 0; i < usableBitSetSizes.length; i++) {
usableBitSetSizes[i] = (1 << (i + 6)) - 1;
}
}

private final LongArrayBackedBitSet bitset;
private final int setSize;
private final int hashCount;

BloomFilter(long maxDocs, double maxFpp, CheckedSupplier<Iterator<BytesRef>, IOException> fieldIteratorProvider) throws IOException {
int setSize = (int) Math.ceil((maxDocs * Math.log(maxFpp)) / Math.log(1 / Math.pow(2, Math.log(2))));
setSize = getNearestSetSize(2 * setSize);
int optimalK = (int) Math.round(((double) setSize / maxDocs) * Math.log(2));
this.bitset = new LongArrayBackedBitSet(setSize + 1);
this.setSize = setSize;
this.hashCount = optimalK;
addAll(fieldIteratorProvider);
if (Assertions.ENABLED) {
assertAllElementsExist(fieldIteratorProvider);
}
logger.trace("Bloom filter created with fpp: {}, setSize: {}, hashCount: {}", maxFpp, setSize, hashCount);
}

BloomFilter(IndexInput in) throws IOException {
hashCount = in.readInt();
setSize = in.readInt();
this.bitset = new LongArrayBackedBitSet(in);
}

@Override
public void writeTo(DataOutput out) throws IOException {
out.writeInt(hashCount);
out.writeInt(setSize);
bitset.writeTo(out);
}

static int getNearestSetSize(int maxNumberOfBits) {
int result = usableBitSetSizes[0];
for (int i = 0; i < usableBitSetSizes.length; i++) {
if (usableBitSetSizes[i] <= maxNumberOfBits) {
result = usableBitSetSizes[i];
}
}
return result;
}

@Override
public SetType setType() {
return SetType.BLOOM_FILTER_V1;
}

@Override
public Result contains(BytesRef value) {
long hash = generateKey(value);
int msb = (int) (hash >>> Integer.SIZE);
int lsb = (int) hash;
for (int i = 0; i < hashCount; i++) {
int bloomPos = (lsb + i * msb);
if (!mayContainValue(bloomPos)) {
return Result.NO;
}
}
return Result.MAYBE;
}

@Override
protected void add(BytesRef value) {
long hash = generateKey(value);
int msb = (int) (hash >>> Integer.SIZE);
int lsb = (int) hash;
for (int i = 0; i < hashCount; i++) {
// Bitmasking with setSize is effectively a modulo operation since set sizes are always of the form 2^n - 1 (all ones in binary)
int bloomPos = (lsb + i * msb) & setSize;
bitset.set(bloomPos);
}
}

@Override
public boolean isSaturated() {
long numBitsSet = bitset.cardinality();
return (float) numBitsSet / (float) setSize > 0.9f;
}

@Override
public long ramBytesUsed() {
return bitset.ramBytesUsed(); // the backing bitset already accounts for its own memory
}

private boolean mayContainValue(int aHash) {
// Set sizes are always of the form 2^n - 1, so ANDing with setSize gives a fast modulo
int pos = aHash & setSize;
return bitset.isSet(pos);
}

@Override
public void close() throws IOException {
IOUtils.close(bitset);
}
}
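The sizing expression in the constructor is the textbook Bloom filter math: Math.log(1 / Math.pow(2, Math.log(2))) evaluates to -(ln 2)^2, so with n = maxDocs and target false positive probability p = maxFpp,

m = \left\lceil \frac{-\,n \ln p}{(\ln 2)^2} \right\rceil \ \text{bits}, \qquad k = \operatorname{round}\!\left(\frac{m'}{n} \ln 2\right) \ \text{hash probes},

where m' is m doubled and then snapped by getNearestSetSize to the nearest size of the form 2^j - 1; that all-ones shape is what makes the AND-based modulo in add and mayContainValue correct.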
