diff --git a/CHANGELOG.md b/CHANGELOG.md index 7429eb9f4cd4a..c978dbaf0ebfb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -138,6 +138,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - New DateTime format for RFC3339 compatible date fields ([#11465](https://github.com/opensearch-project/OpenSearch/pull/11465)) - Add support for Google Application Default Credentials in repository-gcs ([#8394](https://github.com/opensearch-project/OpenSearch/pull/8394)) - Remove concurrent segment search feature flag for GA launch ([#12074](https://github.com/opensearch-project/OpenSearch/pull/12074)) +- Enable Fuzzy codec for doc id fields using a bloom filter ([#11022](https://github.com/opensearch-project/OpenSearch/pull/11022)) ### Dependencies - Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822)) diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterConstructionBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterConstructionBenchmark.java new file mode 100644 index 0000000000000..4e995f5a5067c --- /dev/null +++ b/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterConstructionBenchmark.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.benchmark.index.codec.fuzzy; + +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.UUIDs; +import org.opensearch.index.codec.fuzzy.FuzzySet; +import org.opensearch.index.codec.fuzzy.FuzzySetFactory; +import org.opensearch.index.codec.fuzzy.FuzzySetParameters; +import org.opensearch.index.mapper.IdFieldMapper; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@Fork(3) +@Warmup(iterations = 2) +@Measurement(iterations = 5, time = 60, timeUnit = TimeUnit.SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +public class FilterConstructionBenchmark { + + private List items; + + @Param({ "1000000", "10000000", "50000000" }) + private int numIds; + + @Param({ "0.0511", "0.1023", "0.2047" }) + private double fpp; + + private FuzzySetFactory fuzzySetFactory; + private String fieldName; + + @Setup + public void setupIds() { + this.fieldName = IdFieldMapper.NAME; + this.items = IntStream.range(0, numIds).mapToObj(i -> new BytesRef(UUIDs.base64UUID())).collect(Collectors.toList()); + FuzzySetParameters parameters = new FuzzySetParameters(() -> fpp); + this.fuzzySetFactory = new FuzzySetFactory(Map.of(fieldName, parameters)); + } + + @Benchmark + public FuzzySet buildFilter() throws IOException { + return fuzzySetFactory.createFuzzySet(items.size(), fieldName, () -> items.iterator()); + } +} diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterLookupBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterLookupBenchmark.java new file mode 100644 index 0000000000000..383539219830e --- /dev/null +++ b/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterLookupBenchmark.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.benchmark.index.codec.fuzzy; + +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.UUIDs; +import org.opensearch.index.codec.fuzzy.FuzzySet; +import org.opensearch.index.codec.fuzzy.FuzzySetFactory; +import org.opensearch.index.codec.fuzzy.FuzzySetParameters; +import org.opensearch.index.mapper.IdFieldMapper; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@Fork(3) +@Warmup(iterations = 2) +@Measurement(iterations = 5, time = 60, timeUnit = TimeUnit.SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +public class FilterLookupBenchmark { + + @Param({ "50000000", "1000000" }) + private int numItems; + + @Param({ "1000000" }) + private int searchKeyCount; + + @Param({ "0.0511", "0.1023", "0.2047" }) + private double fpp; + + private FuzzySet fuzzySet; + private List items; + private Random random = new Random(); + + @Setup + public void setupFilter() throws IOException { + String fieldName = IdFieldMapper.NAME; + items = IntStream.range(0, numItems).mapToObj(i -> new BytesRef(UUIDs.base64UUID())).collect(Collectors.toList()); + FuzzySetParameters parameters = new FuzzySetParameters(() -> fpp); + fuzzySet = new FuzzySetFactory(Map.of(fieldName, parameters)).createFuzzySet(numItems, fieldName, () -> items.iterator()); + } + + @Benchmark + public void contains_withExistingKeys(Blackhole blackhole) throws IOException { + for (int i = 0; i < searchKeyCount; i++) { + blackhole.consume(fuzzySet.contains(items.get(random.nextInt(items.size()))) == FuzzySet.Result.MAYBE); + } + } + + @Benchmark + public void contains_withRandomKeys(Blackhole blackhole) throws IOException { + for (int i = 0; i < searchKeyCount; i++) { + blackhole.consume(fuzzySet.contains(new BytesRef(UUIDs.base64UUID()))); + } + } +} diff --git a/qa/rolling-upgrade/build.gradle b/qa/rolling-upgrade/build.gradle index 3dff452be855f..777377f04e8b9 100644 --- a/qa/rolling-upgrade/build.gradle +++ b/qa/rolling-upgrade/build.gradle @@ -62,6 +62,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { setting 'repositories.url.allowed_urls', 'http://snapshot.test*' setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}" setting 'http.content_type.required', 'true' + systemProperty 'opensearch.experimental.optimize_doc_id_lookup.fuzzy_set.enabled', 'true' } } diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java index 1577260e145d4..08a5e92fc2d02 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java @@ -40,10 +40,10 @@ import org.opensearch.common.Booleans; import org.opensearch.common.io.Streams; import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.EngineConfig; import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.rest.yaml.ObjectPath; import java.io.IOException; @@ -344,6 +344,92 @@ public void testIndexingWithSegRep() throws Exception { } } + public void testIndexingWithFuzzyFilterPostings() throws Exception { + if (UPGRADE_FROM_VERSION.onOrBefore(Version.V_2_11_1)) { + logger.info("--> Skip test for version {} where fuzzy filter postings format feature is not available", UPGRADE_FROM_VERSION); + return; + } + final String indexName = "test-index-fuzzy-set"; + final int shardCount = 3; + final int replicaCount = 1; + logger.info("--> Case {}", CLUSTER_TYPE); + printClusterNodes(); + logger.info("--> _cat/shards before test execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity())); + switch (CLUSTER_TYPE) { + case OLD: + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), shardCount) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), replicaCount) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put( + EngineConfig.INDEX_CODEC_SETTING.getKey(), + randomFrom(new ArrayList<>(CODECS) { + { + add(CodecService.LUCENE_DEFAULT_CODEC); + } + }) + ) + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms"); + createIndex(indexName, settings.build()); + waitForClusterHealthWithNoShardMigration(indexName, "green"); + bulk(indexName, "_OLD", 5); + break; + case MIXED: + waitForClusterHealthWithNoShardMigration(indexName, "yellow"); + break; + case UPGRADED: + Settings.Builder settingsBuilder = Settings.builder() + .put(IndexSettings.INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING.getKey(), true); + updateIndexSettings(indexName, settingsBuilder); + waitForClusterHealthWithNoShardMigration(indexName, "green"); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + + int expectedCount; + switch (CLUSTER_TYPE) { + case OLD: + expectedCount = 5; + break; + case MIXED: + if (Booleans.parseBoolean(System.getProperty("tests.first_round"))) { + expectedCount = 5; + } else { + expectedCount = 10; + } + break; + case UPGRADED: + expectedCount = 15; + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + + waitForSearchableDocs(indexName, shardCount, replicaCount); + assertCount(indexName, expectedCount); + + if (CLUSTER_TYPE != ClusterType.OLD) { + bulk(indexName, "_" + CLUSTER_TYPE, 5); + logger.info("--> Index one doc (to be deleted next) and verify doc count"); + Request toBeDeleted = new Request("PUT", "/" + indexName + "/_doc/to_be_deleted"); + toBeDeleted.addParameter("refresh", "true"); + toBeDeleted.setJsonEntity("{\"f1\": \"delete-me\"}"); + client().performRequest(toBeDeleted); + waitForSearchableDocs(indexName, shardCount, replicaCount); + assertCount(indexName, expectedCount + 6); + + logger.info("--> Delete previously added doc and verify doc count"); + Request delete = new Request("DELETE", "/" + indexName + "/_doc/to_be_deleted"); + delete.addParameter("refresh", "true"); + client().performRequest(delete); + waitForSearchableDocs(indexName, shardCount, replicaCount); + assertCount(indexName, expectedCount + 5); + + //forceMergeAndVerify(indexName, shardCount * (1 + replicaCount)); + } + } + public void testAutoIdWithOpTypeCreate() throws IOException { final String indexName = "auto_id_and_op_type_create_index"; StringBuilder b = new StringBuilder(); diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index 44dc4161f093a..e6d7ba0c60772 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -34,6 +34,7 @@ protected FeatureFlagSettings( FeatureFlags.IDENTITY_SETTING, FeatureFlags.TELEMETRY_SETTING, FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING, - FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING + FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING, + FeatureFlags.DOC_ID_FUZZY_SET_SETTING ); } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 97d6a0ddf02c8..cf502fe685fcf 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -229,6 +229,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING, IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING, + IndexSettings.INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING, + IndexSettings.INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING, + // Settings for concurrent segment search IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING, diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index c88a795501ca6..075dc9934e130 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -54,6 +54,11 @@ public class FeatureFlags { */ public static final String WRITEABLE_REMOTE_INDEX = "opensearch.experimental.feature.writeable_remote_index.enabled"; + /** + * Gates the optimization to enable bloom filters for doc id lookup. + */ + public static final String DOC_ID_FUZZY_SET = "opensearch.experimental.optimize_doc_id_lookup.fuzzy_set.enabled"; + /** * Should store the settings from opensearch.yml. */ @@ -110,4 +115,6 @@ public static boolean isEnabled(Setting featureFlag) { false, Property.NodeScope ); + + public static final Setting DOC_ID_FUZZY_SET_SETTING = Setting.boolSetting(DOC_ID_FUZZY_SET, false, Property.NodeScope); } diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 00e765d73f77f..43a3a49418fbf 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -65,7 +65,9 @@ import java.util.function.UnaryOperator; import static org.opensearch.Version.V_2_7_0; +import static org.opensearch.common.util.FeatureFlags.DOC_ID_FUZZY_SET_SETTING; import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY; +import static org.opensearch.index.codec.fuzzy.FuzzySetParameters.DEFAULT_FALSE_POSITIVE_PROBABILITY; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING; @@ -658,6 +660,22 @@ public static IndexMergePolicy fromString(String text) { Property.Dynamic ); + public static final Setting INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING = Setting.boolSetting( + "index.optimize_doc_id_lookup.fuzzy_set.enabled", + false, + Property.IndexScope, + Property.Dynamic + ); + + public static final Setting INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING = Setting.doubleSetting( + "index.optimize_doc_id_lookup.fuzzy_set.false_positive_probability", + DEFAULT_FALSE_POSITIVE_PROBABILITY, + 0.01, + 0.50, + Property.IndexScope, + Property.Dynamic + ); + public static final TimeValue DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL = new TimeValue(650, TimeUnit.MILLISECONDS); public static final TimeValue MINIMUM_REMOTE_TRANSLOG_BUFFER_INTERVAL = TimeValue.ZERO; public static final Setting INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING = Setting.timeSetting( @@ -787,6 +805,16 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { */ private volatile UnaryOperator mergeOnFlushPolicy; + /** + * Is fuzzy set enabled for doc id + */ + private volatile boolean enableFuzzySetForDocId; + + /** + * False positive probability to use while creating fuzzy set. + */ + private volatile double docIdFuzzySetFalsePositiveProbability; + /** * Returns the default search fields for this index. */ @@ -926,6 +954,13 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti * Now this sortField (IndexSort) is stored in SegmentInfo and we need to maintain backward compatibility for them. */ widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0); + + boolean isOptimizeDocIdLookupUsingFuzzySetFeatureEnabled = FeatureFlags.isEnabled(DOC_ID_FUZZY_SET_SETTING); + if (isOptimizeDocIdLookupUsingFuzzySetFeatureEnabled) { + enableFuzzySetForDocId = scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING); + docIdFuzzySetFalsePositiveProbability = scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING); + } + scopedSettings.addSettingsUpdateConsumer( TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING, tieredMergePolicyProvider::setNoCFSRatio @@ -1032,6 +1067,11 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti this::setRemoteTranslogUploadBufferInterval ); scopedSettings.addSettingsUpdateConsumer(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING, this::setRemoteTranslogKeepExtraGen); + scopedSettings.addSettingsUpdateConsumer(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING, this::setEnableFuzzySetForDocId); + scopedSettings.addSettingsUpdateConsumer( + INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING, + this::setDocIdFuzzySetFalsePositiveProbability + ); } private void setSearchIdleAfter(TimeValue searchIdleAfter) { @@ -1801,4 +1841,36 @@ public void setDefaultSearchPipeline(String defaultSearchPipeline) { public boolean shouldWidenIndexSortType() { return this.widenIndexSortType; } + + public boolean isEnableFuzzySetForDocId() { + return enableFuzzySetForDocId; + } + + public void setEnableFuzzySetForDocId(boolean enableFuzzySetForDocId) { + verifyFeatureToSetDocIdFuzzySetSetting(enabled -> this.enableFuzzySetForDocId = enabled, enableFuzzySetForDocId); + } + + public double getDocIdFuzzySetFalsePositiveProbability() { + return docIdFuzzySetFalsePositiveProbability; + } + + public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePositiveProbability) { + verifyFeatureToSetDocIdFuzzySetSetting( + fpp -> this.docIdFuzzySetFalsePositiveProbability = fpp, + docIdFuzzySetFalsePositiveProbability + ); + } + + private static void verifyFeatureToSetDocIdFuzzySetSetting(Consumer settingUpdater, T val) { + if (FeatureFlags.isEnabled(DOC_ID_FUZZY_SET_SETTING)) { + settingUpdater.accept(val); + } else { + throw new IllegalArgumentException( + "Fuzzy set for optimizing doc id lookup " + + "cannot be enabled with feature flag [" + + FeatureFlags.DOC_ID_FUZZY_SET + + "] set to false" + ); + } + } } diff --git a/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java b/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java index dc28ad2d6dc07..1ad17f121560c 100644 --- a/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java +++ b/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java @@ -39,10 +39,16 @@ import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat; import org.apache.lucene.codecs.lucene99.Lucene99Codec; import org.opensearch.common.lucene.Lucene; +import org.opensearch.index.codec.fuzzy.FuzzyFilterPostingsFormat; +import org.opensearch.index.codec.fuzzy.FuzzySetFactory; +import org.opensearch.index.codec.fuzzy.FuzzySetParameters; import org.opensearch.index.mapper.CompletionFieldMapper; +import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.mapper.MapperService; +import java.util.Map; + /** * {@link PerFieldMappingPostingFormatCodec This postings format} is the default * {@link PostingsFormat} for OpenSearch. It utilizes the @@ -57,6 +63,8 @@ public class PerFieldMappingPostingFormatCodec extends Lucene99Codec { private final Logger logger; private final MapperService mapperService; private final DocValuesFormat dvFormat = new Lucene90DocValuesFormat(); + private final FuzzySetFactory fuzzySetFactory; + private PostingsFormat docIdPostingsFormat; static { assert Codec.forName(Lucene.LATEST_CODEC).getClass().isAssignableFrom(PerFieldMappingPostingFormatCodec.class) @@ -67,6 +75,12 @@ public PerFieldMappingPostingFormatCodec(Mode compressionMode, MapperService map super(compressionMode); this.mapperService = mapperService; this.logger = logger; + fuzzySetFactory = new FuzzySetFactory( + Map.of( + IdFieldMapper.NAME, + new FuzzySetParameters(() -> mapperService.getIndexSettings().getDocIdFuzzySetFalsePositiveProbability()) + ) + ); } @Override @@ -76,6 +90,11 @@ public PostingsFormat getPostingsFormatForField(String field) { logger.warn("no index mapper found for field: [{}] returning default postings format", field); } else if (fieldType instanceof CompletionFieldMapper.CompletionFieldType) { return CompletionFieldMapper.CompletionFieldType.postingsFormat(); + } else if (IdFieldMapper.NAME.equals(field) && mapperService.getIndexSettings().isEnableFuzzySetForDocId()) { + if (docIdPostingsFormat == null) { + docIdPostingsFormat = new FuzzyFilterPostingsFormat(super.getPostingsFormatForField(field), fuzzySetFactory); + } + return docIdPostingsFormat; } return super.getPostingsFormatForField(field); } diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/AbstractFuzzySet.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/AbstractFuzzySet.java new file mode 100644 index 0000000000000..09976297361fa --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/AbstractFuzzySet.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.hash.T1ha1; + +import java.io.IOException; +import java.util.Iterator; + +/** + * Encapsulates common behaviour implementation for a fuzzy set. + */ +public abstract class AbstractFuzzySet implements FuzzySet { + + /** + * Add an item to this fuzzy set. + * @param value The value to be added + */ + protected abstract void add(BytesRef value); + + /** + * Add all items to the underlying set. + * Implementations can choose to perform this using an optimized strategy based on the type of set. + * @param valuesIteratorProvider Supplier for an iterator over All values which should be added to the set. + */ + protected void addAll(CheckedSupplier, IOException> valuesIteratorProvider) throws IOException { + Iterator values = valuesIteratorProvider.get(); + while (values.hasNext()) { + add(values.next()); + } + } + + public Result contains(BytesRef val) { + return containsHash(generateKey(val)); + } + + protected abstract Result containsHash(long hash); + + protected long generateKey(BytesRef value) { + return T1ha1.hash(value.bytes, value.offset, value.length, 0L); + } + + protected void assertAllElementsExist(CheckedSupplier, IOException> iteratorProvider) throws IOException { + Iterator iter = iteratorProvider.get(); + int cnt = 0; + while (iter.hasNext()) { + BytesRef item = iter.next(); + assert contains(item) == Result.MAYBE + : "Expected Filter to return positive response for elements added to it. Elements matched: " + cnt; + cnt++; + } + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/BloomFilter.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/BloomFilter.java new file mode 100644 index 0000000000000..b8a8352183ca8 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/BloomFilter.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Based on code from the Apache Lucene project (https://github.com/apache/lucene) under the Apache License, version 2.0. + * Copyright 2001-2022 The Apache Software Foundation + * Modifications (C) OpenSearch Contributors. All Rights Reserved. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.RamUsageEstimator; +import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.Assertions; + +import java.io.IOException; +import java.util.Iterator; + +/** + * The code is based on Lucene's implementation of Bloom Filter. + * It represents a subset of the Lucene implementation needed for OpenSearch use cases. + * Since the Lucene implementation is marked experimental, + * this aims to ensure we can provide a bwc implementation during upgrades. + */ +public class BloomFilter extends AbstractFuzzySet { + + private static final Logger logger = LogManager.getLogger(BloomFilter.class); + + // The sizes of BitSet used are all numbers that, when expressed in binary form, + // are all ones. This is to enable fast downsizing from one bitset to another + // by simply ANDing each set index in one bitset with the size of the target bitset + // - this provides a fast modulo of the number. Values previously accumulated in + // a large bitset and then mapped to a smaller set can be looked up using a single + // AND operation of the query term's hash rather than needing to perform a 2-step + // translation of the query term that mirrors the stored content's reprojections. + static final int[] usableBitSetSizes; + + static { + usableBitSetSizes = new int[26]; + for (int i = 0; i < usableBitSetSizes.length; i++) { + usableBitSetSizes[i] = (1 << (i + 6)) - 1; + } + } + + private final LongArrayBackedBitSet bitset; + private final int setSize; + private final int hashCount; + + BloomFilter(long maxDocs, double maxFpp, CheckedSupplier, IOException> fieldIteratorProvider) throws IOException { + int setSize = (int) Math.ceil((maxDocs * Math.log(maxFpp)) / Math.log(1 / Math.pow(2, Math.log(2)))); + setSize = getNearestSetSize(setSize < Integer.MAX_VALUE / 2 ? 2 * setSize : Integer.MAX_VALUE); + int optimalK = (int) Math.round(((double) setSize / maxDocs) * Math.log(2)); + this.bitset = new LongArrayBackedBitSet(setSize); + this.setSize = setSize; + this.hashCount = optimalK; + addAll(fieldIteratorProvider); + if (Assertions.ENABLED) { + assertAllElementsExist(fieldIteratorProvider); + } + logger.debug("Bloom filter created with fpp: {}, setSize: {}, hashCount: {}", maxFpp, setSize, hashCount); + } + + BloomFilter(IndexInput in) throws IOException { + hashCount = in.readInt(); + setSize = in.readInt(); + this.bitset = new LongArrayBackedBitSet(in); + } + + @Override + public void writeTo(DataOutput out) throws IOException { + out.writeInt(hashCount); + out.writeInt(setSize); + bitset.writeTo(out); + } + + private static int getNearestSetSize(int maxNumberOfBits) { + assert maxNumberOfBits > 0 : "Provided size estimate for bloom filter is illegal (<=0) : " + maxNumberOfBits; + int result = usableBitSetSizes[0]; + for (int i = 0; i < usableBitSetSizes.length; i++) { + if (usableBitSetSizes[i] <= maxNumberOfBits) { + result = usableBitSetSizes[i]; + } + } + return result; + } + + @Override + public SetType setType() { + return SetType.BLOOM_FILTER_V1; + } + + @Override + public Result containsHash(long hash) { + int msb = (int) (hash >>> Integer.SIZE); + int lsb = (int) hash; + for (int i = 0; i < hashCount; i++) { + int bloomPos = (lsb + i * msb); + if (!mayContainValue(bloomPos)) { + return Result.NO; + } + } + return Result.MAYBE; + } + + protected void add(BytesRef value) { + long hash = generateKey(value); + int msb = (int) (hash >>> Integer.SIZE); + int lsb = (int) hash; + for (int i = 0; i < hashCount; i++) { + // Bitmasking using bloomSize is effectively a modulo operation since set sizes are always power of 2 + int bloomPos = (lsb + i * msb) & setSize; + bitset.set(bloomPos); + } + } + + @Override + public boolean isSaturated() { + long numBitsSet = bitset.cardinality(); + // Don't bother saving bitsets if >90% of bits are set - we don't want to + // throw any more memory at this problem. + return (float) numBitsSet / (float) setSize > 0.9f; + } + + @Override + public long ramBytesUsed() { + return RamUsageEstimator.sizeOf(bitset.ramBytesUsed()); + } + + private boolean mayContainValue(int aHash) { + // Bloom sizes are always base 2 and so can be ANDed for a fast modulo + int pos = aHash & setSize; + return bitset.get(pos); + } + + @Override + public void close() throws IOException { + IOUtils.close(bitset); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormat.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormat.java new file mode 100644 index 0000000000000..01f8054fc91be --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormat.java @@ -0,0 +1,492 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Based on code from the Apache Lucene project (https://github.com/apache/lucene) under the Apache License, version 2.0. + * Copyright 2001-2022 The Apache Software Foundation + * Modifications (C) OpenSearch Contributors. All Rights Reserved. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.FieldsConsumer; +import org.apache.lucene.codecs.FieldsProducer; +import org.apache.lucene.codecs.NormsProducer; +import org.apache.lucene.codecs.PostingsFormat; +import org.apache.lucene.index.BaseTermsEnum; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.Fields; +import org.apache.lucene.index.ImpactsEnum; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.automaton.CompiledAutomaton; +import org.opensearch.common.util.io.IOUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Based on Lucene's BloomFilterPostingsFormat. + * Discussion with Lucene community based on which the decision to have this in OpenSearch code was taken + * is captured here: https://github.com/apache/lucene/issues/12986 + * + * The class deals with persisting the bloom filter through the postings format, + * and reading the field via a bloom filter fronted terms enum (to reduce disk seeks in case of absence of requested values) + * The class should be handled during lucene upgrades. There are bwc tests present to verify the format continues to work after upgrade. + */ + +public final class FuzzyFilterPostingsFormat extends PostingsFormat { + + private static final Logger logger = LogManager.getLogger(FuzzyFilterPostingsFormat.class); + + /** + * This name is stored in headers. If changing the implementation for the format, this name/version should be updated + * so that reads can work as expected. + */ + public static final String FUZZY_FILTER_CODEC_NAME = "FuzzyFilterCodec99"; + + public static final int VERSION_START = 0; + public static final int VERSION_CURRENT = VERSION_START; + + /** Extension of Fuzzy Filters file */ + public static final String FUZZY_FILTER_FILE_EXTENSION = "fzd"; + + private final PostingsFormat delegatePostingsFormat; + private final FuzzySetFactory fuzzySetFactory; + + public FuzzyFilterPostingsFormat(PostingsFormat delegatePostingsFormat, FuzzySetFactory fuzzySetFactory) { + super(FUZZY_FILTER_CODEC_NAME); + this.delegatePostingsFormat = delegatePostingsFormat; + this.fuzzySetFactory = fuzzySetFactory; + } + + // Needed for SPI + public FuzzyFilterPostingsFormat() { + this(null, null); + } + + @Override + public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException { + if (delegatePostingsFormat == null) { + throw new UnsupportedOperationException( + "Error - " + getClass().getName() + " has been constructed without a choice of PostingsFormat" + ); + } + FieldsConsumer fieldsConsumer = delegatePostingsFormat.fieldsConsumer(state); + return new FuzzyFilteredFieldsConsumer(fieldsConsumer, state); + } + + @Override + public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException { + return new FuzzyFilteredFieldsProducer(state); + } + + static class FuzzyFilteredFieldsProducer extends FieldsProducer { + private FieldsProducer delegateFieldsProducer; + HashMap fuzzySetsByFieldName = new HashMap<>(); + private List closeables = new ArrayList<>(); + + public FuzzyFilteredFieldsProducer(SegmentReadState state) throws IOException { + String fuzzyFilterFileName = IndexFileNames.segmentFileName( + state.segmentInfo.name, + state.segmentSuffix, + FUZZY_FILTER_FILE_EXTENSION + ); + IndexInput filterIn = null; + boolean success = false; + try { + // Using IndexInput directly instead of ChecksumIndexInput since we want to support RandomAccessInput + filterIn = state.directory.openInput(fuzzyFilterFileName, state.context); + + CodecUtil.checkIndexHeader( + filterIn, + FUZZY_FILTER_CODEC_NAME, + VERSION_START, + VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix + ); + // Load the delegate postings format + PostingsFormat delegatePostingsFormat = PostingsFormat.forName(filterIn.readString()); + this.delegateFieldsProducer = delegatePostingsFormat.fieldsProducer(state); + int numFilters = filterIn.readInt(); + for (int i = 0; i < numFilters; i++) { + int fieldNum = filterIn.readInt(); + FuzzySet set = FuzzySetFactory.deserializeFuzzySet(filterIn); + closeables.add(set); + FieldInfo fieldInfo = state.fieldInfos.fieldInfo(fieldNum); + fuzzySetsByFieldName.put(fieldInfo.name, set); + } + CodecUtil.retrieveChecksum(filterIn); + + // Can we disable it if we foresee performance issues? + CodecUtil.checksumEntireFile(filterIn); + success = true; + closeables.add(filterIn); + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(filterIn, delegateFieldsProducer); + } + } + } + + @Override + public Iterator iterator() { + return delegateFieldsProducer.iterator(); + } + + @Override + public void close() throws IOException { + // Why closing here? + IOUtils.closeWhileHandlingException(closeables); + delegateFieldsProducer.close(); + } + + @Override + public Terms terms(String field) throws IOException { + FuzzySet filter = fuzzySetsByFieldName.get(field); + if (filter == null) { + return delegateFieldsProducer.terms(field); + } else { + Terms result = delegateFieldsProducer.terms(field); + if (result == null) { + return null; + } + return new FuzzyFilteredTerms(result, filter); + } + } + + @Override + public int size() { + return delegateFieldsProducer.size(); + } + + static class FuzzyFilteredTerms extends Terms { + private Terms delegateTerms; + private FuzzySet filter; + + public FuzzyFilteredTerms(Terms terms, FuzzySet filter) { + this.delegateTerms = terms; + this.filter = filter; + } + + @Override + public TermsEnum intersect(CompiledAutomaton compiled, final BytesRef startTerm) throws IOException { + return delegateTerms.intersect(compiled, startTerm); + } + + @Override + public TermsEnum iterator() throws IOException { + return new FilterAppliedTermsEnum(delegateTerms, filter); + } + + @Override + public long size() throws IOException { + return delegateTerms.size(); + } + + @Override + public long getSumTotalTermFreq() throws IOException { + return delegateTerms.getSumTotalTermFreq(); + } + + @Override + public long getSumDocFreq() throws IOException { + return delegateTerms.getSumDocFreq(); + } + + @Override + public int getDocCount() throws IOException { + return delegateTerms.getDocCount(); + } + + @Override + public boolean hasFreqs() { + return delegateTerms.hasFreqs(); + } + + @Override + public boolean hasOffsets() { + return delegateTerms.hasOffsets(); + } + + @Override + public boolean hasPositions() { + return delegateTerms.hasPositions(); + } + + @Override + public boolean hasPayloads() { + return delegateTerms.hasPayloads(); + } + + @Override + public BytesRef getMin() throws IOException { + return delegateTerms.getMin(); + } + + @Override + public BytesRef getMax() throws IOException { + return delegateTerms.getMax(); + } + } + + static final class FilterAppliedTermsEnum extends BaseTermsEnum { + + private Terms delegateTerms; + private TermsEnum delegateTermsEnum; + private final FuzzySet filter; + + public FilterAppliedTermsEnum(Terms delegateTerms, FuzzySet filter) throws IOException { + this.delegateTerms = delegateTerms; + this.filter = filter; + } + + void reset(Terms delegateTerms) throws IOException { + this.delegateTerms = delegateTerms; + this.delegateTermsEnum = null; + } + + private TermsEnum delegate() throws IOException { + if (delegateTermsEnum == null) { + /* pull the iterator only if we really need it - + * this can be a relativly heavy operation depending on the + * delegate postings format and the underlying directory + * (clone IndexInput) */ + delegateTermsEnum = delegateTerms.iterator(); + } + return delegateTermsEnum; + } + + @Override + public BytesRef next() throws IOException { + return delegate().next(); + } + + @Override + public boolean seekExact(BytesRef text) throws IOException { + // The magical fail-fast speed up that is the entire point of all of + // this code - save a disk seek if there is a match on an in-memory + // structure + // that may occasionally give a false positive but guaranteed no false + // negatives + if (filter.contains(text) == FuzzySet.Result.NO) { + return false; + } + return delegate().seekExact(text); + } + + @Override + public SeekStatus seekCeil(BytesRef text) throws IOException { + return delegate().seekCeil(text); + } + + @Override + public void seekExact(long ord) throws IOException { + delegate().seekExact(ord); + } + + @Override + public BytesRef term() throws IOException { + return delegate().term(); + } + + @Override + public long ord() throws IOException { + return delegate().ord(); + } + + @Override + public int docFreq() throws IOException { + return delegate().docFreq(); + } + + @Override + public long totalTermFreq() throws IOException { + return delegate().totalTermFreq(); + } + + @Override + public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { + return delegate().postings(reuse, flags); + } + + @Override + public ImpactsEnum impacts(int flags) throws IOException { + return delegate().impacts(flags); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(filter=" + filter.toString() + ")"; + } + } + + @Override + public void checkIntegrity() throws IOException { + delegateFieldsProducer.checkIntegrity(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(fields=" + fuzzySetsByFieldName.size() + ",delegate=" + delegateFieldsProducer + ")"; + } + } + + class FuzzyFilteredFieldsConsumer extends FieldsConsumer { + private FieldsConsumer delegateFieldsConsumer; + private Map fuzzySets = new HashMap<>(); + private SegmentWriteState state; + private List closeables = new ArrayList<>(); + + public FuzzyFilteredFieldsConsumer(FieldsConsumer fieldsConsumer, SegmentWriteState state) { + this.delegateFieldsConsumer = fieldsConsumer; + this.state = state; + } + + @Override + public void write(Fields fields, NormsProducer norms) throws IOException { + + // Delegate must write first: it may have opened files + // on creating the class + // (e.g. Lucene41PostingsConsumer), and write() will + // close them; alternatively, if we delayed pulling + // the fields consumer until here, we could do it + // afterwards: + delegateFieldsConsumer.write(fields, norms); + + for (String field : fields) { + Terms terms = fields.terms(field); + if (terms == null) { + continue; + } + FieldInfo fieldInfo = state.fieldInfos.fieldInfo(field); + FuzzySet fuzzySet = fuzzySetFactory.createFuzzySet(state.segmentInfo.maxDoc(), fieldInfo.name, () -> iterator(terms)); + if (fuzzySet == null) { + break; + } + assert fuzzySets.containsKey(fieldInfo) == false; + closeables.add(fuzzySet); + fuzzySets.put(fieldInfo, fuzzySet); + } + } + + private Iterator iterator(Terms terms) throws IOException { + TermsEnum termIterator = terms.iterator(); + return new Iterator<>() { + + private BytesRef currentTerm; + private PostingsEnum postingsEnum; + + @Override + public boolean hasNext() { + try { + do { + currentTerm = termIterator.next(); + if (currentTerm == null) { + return false; + } + postingsEnum = termIterator.postings(postingsEnum, 0); + if (postingsEnum.nextDoc() != PostingsEnum.NO_MORE_DOCS) { + return true; + } + } while (true); + } catch (IOException ex) { + throw new IllegalStateException("Cannot read terms: " + termIterator.attributes()); + } + } + + @Override + public BytesRef next() { + return currentTerm; + } + }; + } + + private boolean closed; + + @Override + public void close() throws IOException { + if (closed) { + return; + } + closed = true; + delegateFieldsConsumer.close(); + + // Now we are done accumulating values for these fields + List> nonSaturatedSets = new ArrayList<>(); + + for (Map.Entry entry : fuzzySets.entrySet()) { + FuzzySet fuzzySet = entry.getValue(); + if (!fuzzySet.isSaturated()) { + nonSaturatedSets.add(entry); + } + } + String fuzzyFilterFileName = IndexFileNames.segmentFileName( + state.segmentInfo.name, + state.segmentSuffix, + FUZZY_FILTER_FILE_EXTENSION + ); + try (IndexOutput fuzzyFilterFileOutput = state.directory.createOutput(fuzzyFilterFileName, state.context)) { + logger.trace( + "Writing fuzzy filter postings with version: {} for segment: {}", + VERSION_CURRENT, + state.segmentInfo.toString() + ); + CodecUtil.writeIndexHeader( + fuzzyFilterFileOutput, + FUZZY_FILTER_CODEC_NAME, + VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix + ); + + // remember the name of the postings format we will delegate to + fuzzyFilterFileOutput.writeString(delegatePostingsFormat.getName()); + + // First field in the output file is the number of fields+sets saved + fuzzyFilterFileOutput.writeInt(nonSaturatedSets.size()); + for (Map.Entry entry : nonSaturatedSets) { + FieldInfo fieldInfo = entry.getKey(); + FuzzySet fuzzySet = entry.getValue(); + saveAppropriatelySizedFuzzySet(fuzzyFilterFileOutput, fuzzySet, fieldInfo); + } + CodecUtil.writeFooter(fuzzyFilterFileOutput); + } + // We are done with large bitsets so no need to keep them hanging around + fuzzySets.clear(); + IOUtils.closeWhileHandlingException(closeables); + } + + private void saveAppropriatelySizedFuzzySet(IndexOutput fileOutput, FuzzySet fuzzySet, FieldInfo fieldInfo) throws IOException { + fileOutput.writeInt(fieldInfo.number); + fileOutput.writeString(fuzzySet.setType().getSetName()); + fuzzySet.writeTo(fileOutput); + } + } + + @Override + public String toString() { + return "FuzzyFilterPostingsFormat(" + delegatePostingsFormat + ")"; + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySet.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySet.java new file mode 100644 index 0000000000000..df443ffbca33d --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySet.java @@ -0,0 +1,98 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.CheckedFunction; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * Fuzzy Filter interface + */ +public interface FuzzySet extends Accountable, Closeable { + + /** + * Name used for a codec to be aware of what fuzzy set has been used. + */ + SetType setType(); + + /** + * @param value the item whose membership needs to be checked. + */ + Result contains(BytesRef value); + + boolean isSaturated(); + + void writeTo(DataOutput out) throws IOException; + + /** + * Enum to represent result of membership check on a fuzzy set. + */ + enum Result { + /** + * A definite no for the set membership of an item. + */ + NO, + + /** + * Fuzzy sets cannot guarantee that a given item is present in the set or not due the data being stored in + * a lossy format (e.g. fingerprint, hash). + * Hence, we return a response denoting that the item maybe present. + */ + MAYBE + } + + /** + * Enum to declare supported properties and mappings for a fuzzy set implementation. + */ + enum SetType { + BLOOM_FILTER_V1("bloom_filter_v1", BloomFilter::new, List.of("bloom_filter")); + + /** + * Name persisted in postings file. This will be used when reading to determine the bloom filter implementation. + */ + private final String setName; + + /** + * Interface for reading the actual fuzzy set implementation into java object. + */ + private final CheckedFunction deserializer; + + SetType(String setName, CheckedFunction deserializer, List aliases) { + if (aliases.size() < 1) { + throw new IllegalArgumentException("Alias list is empty. Could not create Set Type: " + setName); + } + this.setName = setName; + this.deserializer = deserializer; + } + + public String getSetName() { + return setName; + } + + public CheckedFunction getDeserializer() { + return deserializer; + } + + public static SetType from(String name) { + for (SetType type : SetType.values()) { + if (type.setName.equals(name)) { + return type; + } + } + throw new IllegalArgumentException("There is no implementation for fuzzy set: " + name); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetFactory.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetFactory.java new file mode 100644 index 0000000000000..5d1fd03f099d4 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetFactory.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.CheckedSupplier; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +/** + * Factory class to create fuzzy set. + * Supports bloom filters for now. More sets can be added as required. + */ +public class FuzzySetFactory { + + private final Map setTypeForField; + + public FuzzySetFactory(Map setTypeForField) { + this.setTypeForField = setTypeForField; + } + + public FuzzySet createFuzzySet(int maxDocs, String fieldName, CheckedSupplier, IOException> iteratorProvider) + throws IOException { + FuzzySetParameters params = setTypeForField.get(fieldName); + if (params == null) { + throw new IllegalArgumentException("No fuzzy set defined for field: " + fieldName); + } + switch (params.getSetType()) { + case BLOOM_FILTER_V1: + return new BloomFilter(maxDocs, params.getFalsePositiveProbability(), iteratorProvider); + default: + throw new IllegalArgumentException("No Implementation for set type: " + params.getSetType()); + } + } + + public static FuzzySet deserializeFuzzySet(IndexInput in) throws IOException { + FuzzySet.SetType setType = FuzzySet.SetType.from(in.readString()); + return setType.getDeserializer().apply(in); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetParameters.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetParameters.java new file mode 100644 index 0000000000000..7bb96e7c34f0b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetParameters.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import java.util.function.Supplier; + +/** + * Wrapper for params to create a fuzzy set. + */ +public class FuzzySetParameters { + private final Supplier falsePositiveProbabilityProvider; + private final FuzzySet.SetType setType; + + public static final double DEFAULT_FALSE_POSITIVE_PROBABILITY = 0.2047d; + + public FuzzySetParameters(Supplier falsePositiveProbabilityProvider) { + this.falsePositiveProbabilityProvider = falsePositiveProbabilityProvider; + this.setType = FuzzySet.SetType.BLOOM_FILTER_V1; + } + + public double getFalsePositiveProbability() { + return falsePositiveProbabilityProvider.get(); + } + + public FuzzySet.SetType getSetType() { + return setType; + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/IndexInputImmutableLongArray.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/IndexInputImmutableLongArray.java new file mode 100644 index 0000000000000..08d6059c1e82e --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/IndexInputImmutableLongArray.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.RandomAccessInput; +import org.apache.lucene.util.RamUsageEstimator; +import org.opensearch.OpenSearchException; +import org.opensearch.common.util.LongArray; + +import java.io.IOException; + +/** + * A Long array backed by RandomAccessInput. + * This implementation supports read operations only. + */ +class IndexInputImmutableLongArray implements LongArray { + + private final RandomAccessInput input; + private final long size; + + IndexInputImmutableLongArray(long size, RandomAccessInput input) { + this.size = size; + this.input = input; + } + + @Override + public void close() {} + + @Override + public long size() { + return size; + } + + @Override + public synchronized long get(long index) { + try { + // Multiplying by 8 since each long is 8 bytes, and we need to get the long value at (index * 8) in the + // RandomAccessInput being accessed. + return input.readLong(index << 3); + } catch (IOException ex) { + throw new OpenSearchException(ex); + } + } + + @Override + public long set(long index, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public long increment(long index, long inc) { + throw new UnsupportedOperationException(); + } + + @Override + public void fill(long fromIndex, long toIndex, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public long ramBytesUsed() { + return RamUsageEstimator.shallowSizeOfInstance(IndexInputImmutableLongArray.class); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/LongArrayBackedBitSet.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/LongArrayBackedBitSet.java new file mode 100644 index 0000000000000..bd4936aeec366 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/LongArrayBackedBitSet.java @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.Accountable; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.LongArray; +import org.opensearch.common.util.io.IOUtils; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A bitset backed by a long-indexed array. + */ +class LongArrayBackedBitSet implements Accountable, Closeable { + + private long underlyingArrayLength = 0L; + private LongArray longArray; + + /** + * Constructor which uses an on heap array. This should be using during construction of the bitset. + * @param capacity The maximum capacity to provision for the bitset. + */ + LongArrayBackedBitSet(long capacity) { + // Since the bitset is backed by a long array, we only need 1 element for every 64 bits in the underlying array. + underlyingArrayLength = (capacity >> 6) + 1L; + this.longArray = BigArrays.NON_RECYCLING_INSTANCE.withCircuitBreaking().newLongArray(underlyingArrayLength); + } + + /** + * Constructor which uses Lucene's IndexInput to read the bitset into a read-only buffer. + * @param in IndexInput containing the serialized bitset. + * @throws IOException + */ + LongArrayBackedBitSet(IndexInput in) throws IOException { + underlyingArrayLength = in.readLong(); + // Multiplying by 8 since the length above is of the long array, so we will have + // 8 times the number of bytes in our stream. + long streamLength = underlyingArrayLength << 3; + this.longArray = new IndexInputImmutableLongArray(underlyingArrayLength, in.randomAccessSlice(in.getFilePointer(), streamLength)); + in.skipBytes(streamLength); + } + + public void writeTo(DataOutput out) throws IOException { + out.writeLong(underlyingArrayLength); + for (int idx = 0; idx < underlyingArrayLength; idx++) { + out.writeLong(longArray.get(idx)); + } + } + + /** + * This is an O(n) operation, and will iterate over all the elements in the underlying long array + * to determine cardinality of the set. + * @return number of set bits in the bitset. + */ + public long cardinality() { + long tot = 0; + for (int i = 0; i < underlyingArrayLength; ++i) { + tot += Long.bitCount(longArray.get(i)); + } + return tot; + } + + /** + * Retrieves whether the bit is set or not at the given index. + * @param index the index to look up for the bit + * @return true if bit is set, false otherwise + */ + public boolean get(long index) { + long i = index >> 6; // div 64 + long val = longArray.get(i); + long bitmask = 1L << index; + return (val & bitmask) != 0; + } + + /** + * Sets the bit at the given index. + * @param index the index to set the bit at. + */ + public void set(long index) { + long wordNum = index >> 6; // div 64 + long bitmask = 1L << index; + long val = longArray.get(wordNum); + longArray.set(wordNum, val | bitmask); + } + + @Override + public long ramBytesUsed() { + return 128L + longArray.ramBytesUsed(); + } + + @Override + public void close() throws IOException { + IOUtils.close(longArray); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/package-info.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/package-info.java new file mode 100644 index 0000000000000..7aeac68cd192a --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** classes responsible for handling all fuzzy codecs and operations */ +package org.opensearch.index.codec.fuzzy; diff --git a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java index d4a97f0267222..34aecfc62b8b2 100644 --- a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java +++ b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java @@ -41,6 +41,7 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.ReplicationStats; +import org.opensearch.index.codec.fuzzy.FuzzyFilterPostingsFormat; import org.opensearch.index.remote.RemoteSegmentStats; import java.io.IOException; @@ -95,7 +96,8 @@ public class SegmentsStats implements Writeable, ToXContentFragment { Map.entry("tvx", "Term Vector Index"), Map.entry("tvd", "Term Vector Documents"), Map.entry("tvf", "Term Vector Fields"), - Map.entry("liv", "Live Documents") + Map.entry("liv", "Live Documents"), + Map.entry(FuzzyFilterPostingsFormat.FUZZY_FILTER_FILE_EXTENSION, "Fuzzy Filter") ); public SegmentsStats() { diff --git a/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat b/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat index 2c92f0ecd3f51..80b1d25064885 100644 --- a/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat +++ b/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat @@ -1 +1,2 @@ org.apache.lucene.search.suggest.document.Completion50PostingsFormat +org.opensearch.index.codec.fuzzy.FuzzyFilterPostingsFormat diff --git a/server/src/test/java/org/opensearch/index/codec/fuzzy/BloomFilterTests.java b/server/src/test/java/org/opensearch/index/codec/fuzzy/BloomFilterTests.java new file mode 100644 index 0000000000000..92669d5bc1d92 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/codec/fuzzy/BloomFilterTests.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.ByteArrayDataOutput; +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public class BloomFilterTests extends OpenSearchTestCase { + + public void testBloomFilterSerializationDeserialization() throws IOException { + int elementCount = randomIntBetween(1, 100); + long maxDocs = elementCount * 10L; // Keeping this high so that it ensures some bits are not set. + BloomFilter filter = new BloomFilter(maxDocs, getFpp(), () -> idIterator(elementCount)); + byte[] buffer = new byte[(int) maxDocs * 5]; + ByteArrayDataOutput out = new ByteArrayDataOutput(buffer); + + // Write in the format readable through factory + out.writeString(filter.setType().getSetName()); + filter.writeTo(out); + + FuzzySet reconstructedFilter = FuzzySetFactory.deserializeFuzzySet(new ByteArrayIndexInput("filter", buffer)); + assertEquals(FuzzySet.SetType.BLOOM_FILTER_V1, reconstructedFilter.setType()); + + Iterator idIterator = idIterator(elementCount); + while (idIterator.hasNext()) { + BytesRef element = idIterator.next(); + assertEquals(FuzzySet.Result.MAYBE, reconstructedFilter.contains(element)); + assertEquals(FuzzySet.Result.MAYBE, filter.contains(element)); + } + } + + public void testBloomFilterIsSaturated_returnsTrue() throws IOException { + BloomFilter bloomFilter = new BloomFilter(1L, getFpp(), () -> idIterator(1000)); + assertEquals(FuzzySet.SetType.BLOOM_FILTER_V1, bloomFilter.setType()); + assertEquals(true, bloomFilter.isSaturated()); + } + + public void testBloomFilterIsSaturated_returnsFalse() throws IOException { + int elementCount = randomIntBetween(1, 100); + BloomFilter bloomFilter = new BloomFilter(20000, getFpp(), () -> idIterator(elementCount)); + assertEquals(FuzzySet.SetType.BLOOM_FILTER_V1, bloomFilter.setType()); + assertEquals(false, bloomFilter.isSaturated()); + } + + public void testBloomFilterWithLargeCapacity() throws IOException { + long maxDocs = randomLongBetween(Integer.MAX_VALUE, 5L * Integer.MAX_VALUE); + BloomFilter bloomFilter = new BloomFilter(maxDocs, getFpp(), () -> List.of(new BytesRef("bar")).iterator()); + assertEquals(FuzzySet.SetType.BLOOM_FILTER_V1, bloomFilter.setType()); + } + + private double getFpp() { + return randomDoubleBetween(0.01, 0.50, true); + } + + private Iterator idIterator(int count) { + return new Iterator() { + int cnt = count; + + @Override + public boolean hasNext() { + return cnt-- > 0; + } + + @Override + public BytesRef next() { + return new BytesRef(Integer.toString(cnt)); + } + }; + } +} diff --git a/server/src/test/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormatTests.java b/server/src/test/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormatTests.java new file mode 100644 index 0000000000000..868c2175d0689 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormatTests.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.tests.index.BasePostingsFormatTestCase; +import org.apache.lucene.tests.util.TestUtil; + +import java.util.TreeMap; + +public class FuzzyFilterPostingsFormatTests extends BasePostingsFormatTestCase { + + private TreeMap params = new TreeMap<>() { + @Override + public FuzzySetParameters get(Object k) { + return new FuzzySetParameters(() -> FuzzySetParameters.DEFAULT_FALSE_POSITIVE_PROBABILITY); + } + }; + + private Codec fuzzyFilterCodec = TestUtil.alwaysPostingsFormat( + new FuzzyFilterPostingsFormat(TestUtil.getDefaultPostingsFormat(), new FuzzySetFactory(params)) + ); + + @Override + protected Codec getCodec() { + return fuzzyFilterCodec; + } +} diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 72d68ffb9449d..0f9ab3aa3d64f 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -197,6 +197,8 @@ import static org.opensearch.core.common.util.CollectionUtils.eagerPartition; import static org.opensearch.discovery.DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING; import static org.opensearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING; +import static org.opensearch.index.IndexSettings.INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING; +import static org.opensearch.index.IndexSettings.INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING; import static org.opensearch.index.IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.test.XContentTestUtils.convertToMap; @@ -630,6 +632,11 @@ public Settings indexSettings() { ); } + if (randomBoolean()) { + builder.put(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING.getKey(), true); + builder.put(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING.getKey(), randomDoubleBetween(0.01, 0.50, true)); + } + return builder.build(); } @@ -646,6 +653,9 @@ protected Settings featureFlagSettings() { } // Enabling Telemetry setting by default featureSettings.put(FeatureFlags.TELEMETRY_SETTING.getKey(), true); + + // Enabling fuzzy set for tests by default + featureSettings.put(FeatureFlags.DOC_ID_FUZZY_SET_SETTING.getKey(), true); return featureSettings.build(); }