diff --git a/.ci/bwcVersions b/.ci/bwcVersions index 78d8796c624d7..f3a9aa2787a80 100644 --- a/.ci/bwcVersions +++ b/.ci/bwcVersions @@ -30,4 +30,5 @@ BWC_VERSION: - "2.12.0" - "2.12.1" - "2.13.0" + - "2.13.1" - "2.14.0" diff --git a/CHANGELOG.md b/CHANGELOG.md index fffbe51a46b5e..170153d9afe82 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -113,6 +113,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Detect breaking changes on pull requests ([#9044](https://github.com/opensearch-project/OpenSearch/pull/9044)) - Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656)) - [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704)) +- Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) @@ -133,12 +134,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Deprecated ### Removed +- Remove handling of index.mapper.dynamic in AutoCreateIndex([#13067](https://github.com/opensearch-project/OpenSearch/pull/13067)) ### Fixed - Fix bulk API ignores ingest pipeline for upsert ([#12883](https://github.com/opensearch-project/OpenSearch/pull/12883)) - Fix issue with feature flags where default value may not be honored ([#12849](https://github.com/opensearch-project/OpenSearch/pull/12849)) - Fix UOE While building Exists query for nested search_as_you_type field ([#12048](https://github.com/opensearch-project/OpenSearch/pull/12048)) - Fix IndicesRequestCache Stale calculation to be accurate. ([#13070](https://github.com/opensearch-project/OpenSearch/pull/13070)] +- Client with Java 8 runtime and Apache HttpClient 5 Transport fails with java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer ([#13100](https://github.com/opensearch-project/opensearch-java/pull/13100)) ### Security diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 4a1162cf2558b..f5494925dcf50 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,18 +1,18 @@ - [Contributing to OpenSearch](#contributing-to-opensearch) - - [First Things First](#first-things-first) - - [Ways to Contribute](#ways-to-contribute) +- [First Things First](#first-things-first) +- [Ways to Contribute](#ways-to-contribute) - [Bug Reports](#bug-reports) - [Feature Requests](#feature-requests) - [Documentation Changes](#documentation-changes) - [Contributing Code](#contributing-code) - - [Developer Certificate of Origin](#developer-certificate-of-origin) - - [Changelog](#changelog) - - [Review Process](#review-process) - - [Troubleshooting Failing Builds](#troubleshooting-failing-builds) +- [Developer Certificate of Origin](#developer-certificate-of-origin) +- [Changelog](#changelog) +- [Review Process](#review-process) + - [Tips for Success](#tips) # Contributing to OpenSearch -OpenSearch is a community project that is built and maintained by people just like **you**. We're glad you're interested in helping out. There are several different ways you can do it, but before we talk about that, let's talk about how to get started. +OpenSearch is a community project built and maintained by people just like **you**. We're glad you're interested in helping out. There are several different ways you can do it, but before we talk about that, let's talk about how to get started. ## First Things First @@ -30,9 +30,9 @@ Ugh! Bugs! A bug is when software behaves in a way that you didn't expect and the developer didn't intend. To help us understand what's going on, we first want to make sure you're working from the latest version. Please make sure you're testing against the [latest version](https://github.com/opensearch-project/OpenSearch). -Once you've confirmed that the bug still exists in the latest version, you'll want to check to make sure it's not something we already know about on the [open issues GitHub page](https://github.com/opensearch-project/OpenSearch/issues). +Once you've confirmed that the bug still exists in the latest version, you'll want to check the bug is not something we already know about. A good way to figure this out is to search for your bug on the [open issues GitHub page](https://github.com/opensearch-project/OpenSearch/issues). -If you've upgraded to the latest version and you can't find it in our open issues list, then you'll need to tell us how to reproduce it. To make the behavior as clear as possible, please provided your steps as `curl` commands which we can copy and paste into a terminal to run it locally, for example: +If you've upgraded to the latest version and you can't find it in our open issues list, then you'll need to tell us how to reproduce it. To make the behavior as clear as possible, please provide your steps as `curl` commands which we can copy and paste into a terminal to run it locally, for example: ```sh # delete the index @@ -47,11 +47,11 @@ curl -x PUT localhost:9200/test/test/1 -d '{ curl .... ``` -Provide as much information as you can. You may think that the problem lies with your query, when actually it depends on how your data is indexed. The easier it is for us to recreate your problem, the faster it is likely to be fixed. +Provide as much information as you can. You may think that the problem lies with your query, when actually it depends on how your data is indexed. The easier it is for us to recreate your problem, the faster it is likely to be fixed. It is generally always helpful to provide the basic details of your cluster configuration alongside your reproduction steps. ### Feature Requests -If you've thought of a way that OpenSearch could be better, we want to hear about it. We track feature requests using GitHub, so please feel free to open an issue which describes the feature you would like to see, why you need it, and how it should work. +If you've thought of a way that OpenSearch could be better, we want to hear about it. We track feature requests using GitHub, so please feel free to open an issue which describes the feature you would like to see, why you need it, and how it should work. After opening an issue, the fastest way to see your change made is to open a pull request following the requested changes you detailed in your issue. You can learn more about opening a pull request in the [contributing code section](#contributing-code). ### Documentation Changes @@ -164,13 +164,19 @@ If we accept the PR, a [maintainer](MAINTAINERS.md) will merge your change and u If we reject the PR, we will close the pull request with a comment explaining why. This decision isn't always final: if you feel we have misunderstood your intended change or otherwise think that we should reconsider then please continue the conversation with a comment on the PR and we'll do our best to address any further points you raise. -## Troubleshooting Failing Builds +### Tips for Success (#tips) -The OpenSearch testing framework offers many capabilities but exhibits significant complexity (it does lot of randomization internally to cover as many edge cases and variations as possible). Unfortunately, this posses a challenge by making it harder to discover important issues/bugs in straightforward way and may lead to so called flaky tests - the tests which flip randomly from success to failure without any code changes. +We have a lot of mechanisms to help expedite towards an accepted PR. Here are some tips for success: +1. *Minimize BWC guarantees*: The first PR review cycle heavily focuses on the public facing APIs. This is what we have to "guarantee" as non-breaking for [bwc across major versions](./DEVELOPER_GUIDE.md#backwards-compatibility). +2. *Do not copy non-compliant code*: Ensure that code is APLv2 compatible. This means that you have not copied any code from other sources unless that code is also APLv2 compatible. +3. *Utilize feature flags*: Features that are safeguarded behind feature flags are more likely to be merged and backported, as they come with an additional layer of protection. Refer to this [example PR](https://github.com/opensearch-project/OpenSearch/pull/4959) for implementation details. +4. *Use appropriate Java tags*: + - `@opensearch.internal`: Marks internal classes subject to rapid changes. + - `@opensearch.api`: Marks public-facing API classes with backward compatibility guarantees. + - `@opensearch.experimental`: Indicates rapidly changing [experimental code](./DEVELOPER_GUIDE.md#experimental-development). +5. *Employ sandbox for significant core changes*: Any new features or enhancements that make changes to core classes (e.g., search phases, codecs, or specialized lucene APIs) are more likely to. be merged if they are sandboxed. This can only be enabled on the java CLI (`-Dsandbox.enabled=true`). +6. *Micro-benchmark critical path*: This is a lesser known mechanism, but if you have critical path changes you're afraid will impact performance (the changes touch the garbage collector, heap, direct memory, or CPU) then including a [microbenchmark](https://github.com/opensearch-project/OpenSearch/tree/main/benchmarks) with your PR (and jfr or flamegraph results in the description) is a *GREAT IDEA* and will help expedite the review process. +7. *Test rigorously*: Ensure thorough testing ([OpenSearchTestCase](./test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java) for unit tests, [OpenSearchIntegTestCase](./test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java) for integration & cluster tests, [OpenSearchRestTestCase](./test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java) for testing REST endpoint interfaces, and yaml tests with [ClientYamlTestSuiteIT](./rest-api-spec/src/yamlRestTest/java/org/opensearch/test/rest/ClientYamlTestSuiteIT.java) for REST integration tests) + +In general, adding more guardrails to your changes increases the likelihood of swift PR acceptance. We can always relax these guard rails in smaller followup PRs. Reverting a GA feature is much more difficult. Check out the [DEVELOPER_GUIDE](./DEVELOPER_GUIDE.md#submitting-changes) for more useful tips. -If your pull request reports a failing test(s) on one of the checks, please: - - look if there is an existing [issue](https://github.com/opensearch-project/OpenSearch/issues) reported for the test in question - - if not, please make sure this is not caused by your changes, run the failing test(s) locally for some time - - if you are sure the failure is not related, please open a new [bug](https://github.com/opensearch-project/OpenSearch/issues/new?assignees=&labels=bug%2C+untriaged&projects=&template=bug_template.md&title=%5BBUG%5D) with `flaky-test` label - - add a comment referencing the issue(s) or bug report(s) to your pull request explaining the failing build(s) - - as a bonus point, try to contribute by fixing the flaky test(s) diff --git a/client/rest/src/main/java/org/opensearch/client/nio/HttpEntityAsyncEntityProducer.java b/client/rest/src/main/java/org/opensearch/client/nio/HttpEntityAsyncEntityProducer.java index 81fe77ddcfbed..4e6fd6f3d6f9d 100644 --- a/client/rest/src/main/java/org/opensearch/client/nio/HttpEntityAsyncEntityProducer.java +++ b/client/rest/src/main/java/org/opensearch/client/nio/HttpEntityAsyncEntityProducer.java @@ -16,6 +16,7 @@ import org.apache.hc.core5.util.Asserts; import java.io.IOException; +import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; @@ -141,7 +142,7 @@ public void produce(final DataStreamChannel channel) throws IOException { } } if (byteBuffer.position() > 0) { - byteBuffer.flip(); + ((Buffer) byteBuffer).flip(); channel.write(byteBuffer); byteBuffer.compact(); } diff --git a/libs/core/src/main/java/org/opensearch/Version.java b/libs/core/src/main/java/org/opensearch/Version.java index 56df46ae94d44..f312c484a4842 100644 --- a/libs/core/src/main/java/org/opensearch/Version.java +++ b/libs/core/src/main/java/org/opensearch/Version.java @@ -101,6 +101,7 @@ public class Version implements Comparable, ToXContentFragment { public static final Version V_2_12_0 = new Version(2120099, org.apache.lucene.util.Version.LUCENE_9_9_2); public static final Version V_2_12_1 = new Version(2120199, org.apache.lucene.util.Version.LUCENE_9_9_2); public static final Version V_2_13_0 = new Version(2130099, org.apache.lucene.util.Version.LUCENE_9_10_0); + public static final Version V_2_13_1 = new Version(2130199, org.apache.lucene.util.Version.LUCENE_9_10_0); public static final Version V_2_14_0 = new Version(2140099, org.apache.lucene.util.Version.LUCENE_9_10_0); public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_11_0); public static final Version CURRENT = V_3_0_0; diff --git a/modules/lang-painless/src/main/java/org/opensearch/painless/PainlessModulePlugin.java b/modules/lang-painless/src/main/java/org/opensearch/painless/PainlessModulePlugin.java index c7638b3c41c63..55dc23f665d2e 100644 --- a/modules/lang-painless/src/main/java/org/opensearch/painless/PainlessModulePlugin.java +++ b/modules/lang-painless/src/main/java/org/opensearch/painless/PainlessModulePlugin.java @@ -60,6 +60,7 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; +import org.opensearch.script.DerivedFieldScript; import org.opensearch.script.IngestScript; import org.opensearch.script.ScoreScript; import org.opensearch.script.ScriptContext; @@ -108,6 +109,11 @@ public final class PainlessModulePlugin extends Plugin implements ScriptPlugin, ingest.add(AllowlistLoader.loadFromResourceFiles(Allowlist.class, "org.opensearch.ingest.txt")); map.put(IngestScript.CONTEXT, ingest); + // Functions available to derived fields + List derived = new ArrayList<>(Allowlist.BASE_ALLOWLISTS); + derived.add(AllowlistLoader.loadFromResourceFiles(Allowlist.class, "org.opensearch.derived.txt")); + map.put(DerivedFieldScript.CONTEXT, derived); + allowlists = map; } diff --git a/modules/lang-painless/src/main/resources/org/opensearch/painless/spi/org.opensearch.derived.txt b/modules/lang-painless/src/main/resources/org/opensearch/painless/spi/org.opensearch.derived.txt new file mode 100644 index 0000000000000..9a3dd4894b286 --- /dev/null +++ b/modules/lang-painless/src/main/resources/org/opensearch/painless/spi/org.opensearch.derived.txt @@ -0,0 +1,17 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +# This file contains an allowlist for functions to be used in derived field context + +class org.opensearch.script.DerivedFieldScript @no_import { +} + +static_import { + void emit(org.opensearch.script.DerivedFieldScript, Object) bound_to org.opensearch.script.ScriptEmitValues$EmitSingle + void emit(org.opensearch.script.DerivedFieldScript, double, double) bound_to org.opensearch.script.ScriptEmitValues$GeoPoint +} diff --git a/modules/lang-painless/src/test/java/org/opensearch/painless/DerivedFieldScriptTests.java b/modules/lang-painless/src/test/java/org/opensearch/painless/DerivedFieldScriptTests.java new file mode 100644 index 0000000000000..2340e5b238ebb --- /dev/null +++ b/modules/lang-painless/src/test/java/org/opensearch/painless/DerivedFieldScriptTests.java @@ -0,0 +1,227 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.painless; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.memory.MemoryIndex; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.geo.GeoPoint; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.fielddata.IndexGeoPointFieldData; +import org.opensearch.index.fielddata.IndexNumericFieldData; +import org.opensearch.index.fielddata.LeafGeoPointFieldData; +import org.opensearch.index.fielddata.LeafNumericFieldData; +import org.opensearch.index.fielddata.MultiGeoPointValues; +import org.opensearch.index.fielddata.SortedNumericDoubleValues; +import org.opensearch.index.fielddata.plain.AbstractLeafGeoPointFieldData; +import org.opensearch.index.fielddata.plain.LeafDoubleFieldData; +import org.opensearch.index.mapper.GeoPointFieldMapper.GeoPointFieldType; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.mapper.NumberFieldMapper.NumberFieldType; +import org.opensearch.index.mapper.NumberFieldMapper.NumberType; +import org.opensearch.painless.spi.Allowlist; +import org.opensearch.painless.spi.AllowlistLoader; +import org.opensearch.script.DerivedFieldScript; +import org.opensearch.script.ScriptContext; +import org.opensearch.script.ScriptException; +import org.opensearch.search.lookup.LeafSearchLookup; +import org.opensearch.search.lookup.SearchLookup; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class DerivedFieldScriptTests extends ScriptTestCase { + + private static PainlessScriptEngine SCRIPT_ENGINE; + + @Override + public void setUp() throws Exception { + super.setUp(); + + // Adding derived field script to the contexts for the script engine + Map, List> contexts = newDefaultContexts(); + List allowlists = new ArrayList<>(Allowlist.BASE_ALLOWLISTS); + allowlists.add(AllowlistLoader.loadFromResourceFiles(Allowlist.class, "org.opensearch.derived.txt")); + contexts.put(DerivedFieldScript.CONTEXT, allowlists); + + SCRIPT_ENGINE = new PainlessScriptEngine(Settings.EMPTY, contexts); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + SCRIPT_ENGINE = null; + } + + @Override + protected PainlessScriptEngine getEngine() { + return SCRIPT_ENGINE; + } + + private DerivedFieldScript.LeafFactory compile(String expression, SearchLookup lookup) { + DerivedFieldScript.Factory factory = SCRIPT_ENGINE.compile( + "derived_script_test", + expression, + DerivedFieldScript.CONTEXT, + Collections.emptyMap() + ); + return factory.newFactory(Collections.emptyMap(), lookup); + } + + public void testEmittingDoubleField() throws IOException { + // Mocking field value to be returned + NumberFieldType fieldType = new NumberFieldType("test_double_field", NumberType.DOUBLE); + MapperService mapperService = mock(MapperService.class); + when(mapperService.fieldType("test_double_field")).thenReturn(fieldType); + + SortedNumericDoubleValues doubleValues = mock(SortedNumericDoubleValues.class); + when(doubleValues.docValueCount()).thenReturn(1); + when(doubleValues.advanceExact(anyInt())).thenReturn(true); + when(doubleValues.nextValue()).thenReturn(2.718); + + LeafNumericFieldData atomicFieldData = mock(LeafDoubleFieldData.class); // SortedNumericDoubleFieldData + when(atomicFieldData.getDoubleValues()).thenReturn(doubleValues); + + IndexNumericFieldData fieldData = mock(IndexNumericFieldData.class); // SortedNumericIndexFieldData + when(fieldData.getFieldName()).thenReturn("test_double_field"); + when(fieldData.load(any())).thenReturn(atomicFieldData); + + SearchLookup lookup = new SearchLookup(mapperService, (ignored, searchLookup) -> fieldData); + + // We don't need a real index, just need to construct a LeafReaderContext which cannot be mocked + MemoryIndex index = new MemoryIndex(); + LeafReaderContext leafReaderContext = index.createSearcher().getIndexReader().leaves().get(0); + + // Execute the script + DerivedFieldScript script = compile("emit(doc['test_double_field'].value)", lookup).newInstance(leafReaderContext); + script.setDocument(1); + script.execute(); + + List result = script.getEmittedValues(); + assertEquals(List.of(2.718), result); + } + + public void testEmittingGeoPoint() throws IOException { + // Mocking field value to be returned + GeoPointFieldType fieldType = new GeoPointFieldType("test_geo_field"); + MapperService mapperService = mock(MapperService.class); + when(mapperService.fieldType("test_geo_field")).thenReturn(fieldType); + + MultiGeoPointValues geoPointValues = mock(MultiGeoPointValues.class); + when(geoPointValues.docValueCount()).thenReturn(1); + when(geoPointValues.advanceExact(anyInt())).thenReturn(true); + when(geoPointValues.nextValue()).thenReturn(new GeoPoint(5, 8)); + + LeafGeoPointFieldData atomicFieldData = mock(AbstractLeafGeoPointFieldData.class); // LatLonPointDVLeafFieldData + when(atomicFieldData.getGeoPointValues()).thenReturn(geoPointValues); + + IndexGeoPointFieldData fieldData = mock(IndexGeoPointFieldData.class); + when(fieldData.getFieldName()).thenReturn("test_geo_field"); + when(fieldData.load(any())).thenReturn(atomicFieldData); + + SearchLookup lookup = new SearchLookup(mapperService, (ignored, searchLookup) -> fieldData); + + // We don't need a real index, just need to construct a LeafReaderContext which cannot be mocked + MemoryIndex index = new MemoryIndex(); + LeafReaderContext leafReaderContext = index.createSearcher().getIndexReader().leaves().get(0); + + // Execute the script + DerivedFieldScript script = compile("emit(doc['test_geo_field'].value.getLat(), doc['test_geo_field'].value.getLon())", lookup) + .newInstance(leafReaderContext); + script.setDocument(1); + script.execute(); + + List result = script.getEmittedValues(); + assertEquals(List.of(new Tuple<>(5.0, 8.0)), result); + } + + public void testEmittingMultipleValues() throws IOException { + SearchLookup lookup = mock(SearchLookup.class); + + // We don't need a real index, just need to construct a LeafReaderContext which cannot be mocked + MemoryIndex index = new MemoryIndex(); + LeafReaderContext leafReaderContext = index.createSearcher().getIndexReader().leaves().get(0); + + LeafSearchLookup leafSearchLookup = mock(LeafSearchLookup.class); + when(lookup.getLeafSearchLookup(leafReaderContext)).thenReturn(leafSearchLookup); + + // Execute the script + DerivedFieldScript script = compile( + "def l = new ArrayList(); l.add('test'); l.add('multiple'); l.add('values'); for (String x : l) emit(x)", + lookup + ).newInstance(leafReaderContext); + script.setDocument(1); + script.execute(); + + List result = script.getEmittedValues(); + assertEquals(List.of("test", "multiple", "values"), result); + } + + public void testExceedingByteSizeLimit() throws IOException { + SearchLookup lookup = mock(SearchLookup.class); + + // We don't need a real index, just need to construct a LeafReaderContext which cannot be mocked + MemoryIndex index = new MemoryIndex(); + LeafReaderContext leafReaderContext = index.createSearcher().getIndexReader().leaves().get(0); + + LeafSearchLookup leafSearchLookup = mock(LeafSearchLookup.class); + when(lookup.getLeafSearchLookup(leafReaderContext)).thenReturn(leafSearchLookup); + + // Emitting a large string to exceed the byte size limit + DerivedFieldScript stringScript = compile("for (int i = 0; i < 1024 * 1024; i++) emit('a' + i);", lookup).newInstance( + leafReaderContext + ); + expectThrows(ScriptException.class, () -> { + stringScript.setDocument(1); + stringScript.execute(); + }); + + // Emitting an integer to check byte size limit + DerivedFieldScript intScript = compile("for (int i = 0; i < 1024 * 1024; i++) emit(42)", lookup).newInstance(leafReaderContext); + expectThrows(ScriptException.class, "Expected IllegalStateException for exceeding byte size limit", () -> { + intScript.setDocument(1); + intScript.execute(); + }); + + // Emitting a long to check byte size limit + DerivedFieldScript longScript = compile("for (int i = 0; i < 1024 * 1024; i++) emit(1234567890123456789L)", lookup).newInstance( + leafReaderContext + ); + expectThrows(ScriptException.class, "Expected IllegalStateException for exceeding byte size limit", () -> { + longScript.setDocument(1); + longScript.execute(); + }); + + // Emitting a double to check byte size limit + DerivedFieldScript doubleScript = compile("for (int i = 0; i < 1024 * 1024; i++) emit(3.14159)", lookup).newInstance( + leafReaderContext + ); + expectThrows(ScriptException.class, "Expected IllegalStateException for exceeding byte size limit", () -> { + doubleScript.setDocument(1); + doubleScript.execute(); + }); + + // Emitting a GeoPoint to check byte size limit + DerivedFieldScript geoPointScript = compile("for (int i = 0; i < 1024 * 1024; i++) emit(1.23, 4.56);", lookup).newInstance( + leafReaderContext + ); + expectThrows(ScriptException.class, "Expected IllegalStateException for exceeding byte size limit", () -> { + geoPointScript.setDocument(1); + geoPointScript.execute(); + }); + } +} diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 25f361b40636e..1f23a09a047f2 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -285,7 +285,7 @@ public void readBlobAsync(String blobName, ActionListener listener) ); } } - listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum)); + listener.onResponse(new ReadContext.Builder(blobSize, blobPartInputStreamFutures).blobChecksum(blobChecksum).build()); } catch (Exception ex) { listener.onFailure(ex); } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 8c7e196d7c812..9e830c409a58b 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -30,7 +30,7 @@ import org.apache.lucene.store.IndexInput; import org.opensearch.cluster.metadata.RepositoryMetadata; -import org.opensearch.common.CheckedTriFunction; +import org.opensearch.common.CheckedConsumer; import org.opensearch.common.StreamContext; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.stream.write.StreamContextSupplier; @@ -49,6 +49,7 @@ import org.opensearch.repositories.s3.async.AsyncTransferManager; import org.opensearch.test.OpenSearchTestCase; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import java.io.IOException; @@ -466,24 +467,30 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept exceptionRef.set(ex); countDownLatch.countDown(); }); - blobContainer.asyncBlobUpload(new WriteContext("write_blob_by_streams_max_retries", new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); - } - }, bytes.length, false, WritePriority.NORMAL, uploadSuccess -> { + + StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> { + InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); + + CheckedConsumer uploadFinalizer = uploadSuccess -> { assertTrue(uploadSuccess); if (throwExceptionOnFinalizeUpload) { throw new RuntimeException(); } - }, false, null), completionListener); + }; + + WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries") + .streamContextSupplier(streamContextSupplier) + .fileSize(bytes.length) + .failIfAlreadyExists(false) + .writePriority(WritePriority.NORMAL) + .uploadFinalizer(uploadFinalizer) + .doRemoteDataIntegrityCheck(false) + .build(); + + blobContainer.asyncBlobUpload(writeContext, completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); // wait for completableFuture to finish @@ -516,24 +523,30 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th countDownLatch.countDown(); }); List openInputStreams = new ArrayList<>(); - blobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); - } - }, blobSize, false, WritePriority.NORMAL, uploadSuccess -> { + + StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> { + InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1)); + + CheckedConsumer uploadFinalizer = uploadSuccess -> { assertTrue(uploadSuccess); if (throwExceptionOnFinalizeUpload) { throw new RuntimeException(); } - }, false, null), completionListener); + }; + + WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob") + .streamContextSupplier(streamContextSupplier) + .fileSize(blobSize) + .failIfAlreadyExists(false) + .writePriority(WritePriority.NORMAL) + .uploadFinalizer(uploadFinalizer) + .doRemoteDataIntegrityCheck(false) + .build(); + + blobContainer.asyncBlobUpload(writeContext, completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException || throwExceptionOnFinalizeUpload) { @@ -632,20 +645,23 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t List openInputStreams = new ArrayList<>(); final S3BlobContainer s3BlobContainer = Mockito.spy(new S3BlobContainer(blobPath, blobStore)); - s3BlobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); - } - }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener); + StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> { + InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1)); + + WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob") + .streamContextSupplier(streamContextSupplier) + .fileSize(blobSize) + .failIfAlreadyExists(false) + .writePriority(WritePriority.HIGH) + .uploadFinalizer(Assert::assertTrue) + .doRemoteDataIntegrityCheck(false) + .build(); + + s3BlobContainer.asyncBlobUpload(writeContext, completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException) { assertNotNull(exceptionRef.get()); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index ceab06bd051e9..8e25ba4d950ef 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -37,7 +37,6 @@ import org.apache.http.HttpStatus; import org.opensearch.cluster.metadata.RepositoryMetadata; -import org.opensearch.common.CheckedTriFunction; import org.opensearch.common.Nullable; import org.opensearch.common.StreamContext; import org.opensearch.common.SuppressForbidden; @@ -332,22 +331,24 @@ public void testWriteBlobByStreamsWithRetries() throws Exception { exceptionRef.set(ex); countDownLatch.countDown(); }); - blobContainer.asyncBlobUpload(new WriteContext("write_blob_by_streams_max_retries", new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); - } - }, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null), completionListener); + StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> { + InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); + + WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries") + .streamContextSupplier(streamContextSupplier) + .fileSize(bytes.length) + .failIfAlreadyExists(false) + .writePriority(WritePriority.NORMAL) + .uploadFinalizer(Assert::assertTrue) + .doRemoteDataIntegrityCheck(false) + .build(); + + blobContainer.asyncBlobUpload(writeContext, completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); - assertThat(countDown.isCountedDown(), is(true)); openInputStreams.forEach(inputStream -> { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/ResizeIndexMigrationTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/ResizeIndexMigrationTestCase.java new file mode 100644 index 0000000000000..0548ce4a7955f --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/ResizeIndexMigrationTestCase.java @@ -0,0 +1,208 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotemigration; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.indices.shrink.ResizeType; +import org.opensearch.action.support.ActiveShardCount; +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.List; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) +public class ResizeIndexMigrationTestCase extends MigrationBaseTestCase { + private static final String TEST_INDEX = "test_index"; + private final static String REMOTE_STORE_DIRECTION = "remote_store"; + private final static String DOC_REP_DIRECTION = "docrep"; + private final static String NONE_DIRECTION = "none"; + private final static String STRICT_MODE = "strict"; + private final static String MIXED_MODE = "mixed"; + + /* + * This test will verify the resize request failure, when cluster mode is mixed + * and index is on DocRep node, and migration to remote store is in progress. + * */ + public void testFailResizeIndexWhileDocRepToRemoteStoreMigration() throws Exception { + + internalCluster().setBootstrapClusterManagerNodeIndex(0); + List cmNodes = internalCluster().startNodes(1); + Client client = internalCluster().client(cmNodes.get(0)); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), MIXED_MODE)); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // Adding a non remote and a remote node + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + + addRemote = true; + String remoteNodeName = internalCluster().startNode(); + + logger.info("-->Create index on non-remote node and SETTING_REMOTE_STORE_ENABLED is false. Resize should not happen"); + Settings.Builder builder = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + client.admin() + .indices() + .prepareCreate(TEST_INDEX) + .setSettings( + builder.put("index.number_of_shards", 10) + .put("index.number_of_replicas", 0) + .put("index.routing.allocation.include._name", nonRemoteNodeName) + .put("index.routing.allocation.exclude._name", remoteNodeName) + ) + .setWaitForActiveShards(ActiveShardCount.ALL) + .execute() + .actionGet(); + + updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), REMOTE_STORE_DIRECTION)); + assertAcked(client.admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + ResizeType resizeType; + int resizeShardsNum; + String cause; + switch (randomIntBetween(0, 2)) { + case 0: + resizeType = ResizeType.SHRINK; + resizeShardsNum = 5; + cause = "shrink_index"; + break; + case 1: + resizeType = ResizeType.SPLIT; + resizeShardsNum = 20; + cause = "split_index"; + break; + default: + resizeType = ResizeType.CLONE; + resizeShardsNum = 10; + cause = "clone_index"; + } + + client.admin() + .indices() + .prepareUpdateSettings(TEST_INDEX) + .setSettings(Settings.builder().put("index.blocks.write", true)) + .execute() + .actionGet(); + + ensureGreen(TEST_INDEX); + + Settings.Builder resizeSettingsBuilder = Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", resizeShardsNum) + .putNull("index.blocks.write"); + + IllegalStateException ex = expectThrows( + IllegalStateException.class, + () -> client().admin() + .indices() + .prepareResizeIndex(TEST_INDEX, "first_split") + .setResizeType(resizeType) + .setSettings(resizeSettingsBuilder.build()) + .get() + ); + assertEquals( + ex.getMessage(), + "Index " + resizeType + " is not allowed as remote migration mode is mixed" + " and index is remote store disabled" + ); + } + + /* + * This test will verify the resize request failure, when cluster mode is mixed + * and index is on Remote Store node, and migration to DocRep node is in progress. + * */ + public void testFailResizeIndexWhileRemoteStoreToDocRepMigration() throws Exception { + + addRemote = true; + internalCluster().setBootstrapClusterManagerNodeIndex(0); + List cmNodes = internalCluster().startNodes(1); + Client client = internalCluster().client(cmNodes.get(0)); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), MIXED_MODE)); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // Adding a non remote and a remote node + String remoteNodeName = internalCluster().startNode(); + + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + + logger.info("-->Create index on remote node and SETTING_REMOTE_STORE_ENABLED is true. Resize should not happen"); + Settings.Builder builder = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + client.admin() + .indices() + .prepareCreate(TEST_INDEX) + .setSettings( + builder.put("index.number_of_shards", 10) + .put("index.number_of_replicas", 0) + .put("index.routing.allocation.include._name", remoteNodeName) + .put("index.routing.allocation.exclude._name", nonRemoteNodeName) + ) + .setWaitForActiveShards(ActiveShardCount.ALL) + .execute() + .actionGet(); + + updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), DOC_REP_DIRECTION)); + assertAcked(client.admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + ResizeType resizeType; + int resizeShardsNum; + String cause; + switch (randomIntBetween(0, 2)) { + case 0: + resizeType = ResizeType.SHRINK; + resizeShardsNum = 5; + cause = "shrink_index"; + break; + case 1: + resizeType = ResizeType.SPLIT; + resizeShardsNum = 20; + cause = "split_index"; + break; + default: + resizeType = ResizeType.CLONE; + resizeShardsNum = 10; + cause = "clone_index"; + } + + client.admin() + .indices() + .prepareUpdateSettings(TEST_INDEX) + .setSettings(Settings.builder().put("index.blocks.write", true)) + .execute() + .actionGet(); + + ensureGreen(TEST_INDEX); + + Settings.Builder resizeSettingsBuilder = Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", resizeShardsNum) + .putNull("index.blocks.write"); + + IllegalStateException ex = expectThrows( + IllegalStateException.class, + () -> client().admin() + .indices() + .prepareResizeIndex(TEST_INDEX, "first_split") + .setResizeType(resizeType) + .setSettings(resizeSettingsBuilder.build()) + .get() + ); + assertEquals( + ex.getMessage(), + "Index " + resizeType + " is not allowed as remote migration mode is mixed" + " and index is remote store enabled" + ); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index 181f242aecd09..ec98d5ff531cb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -20,6 +20,7 @@ import org.opensearch.client.Requests; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.Nullable; import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; @@ -47,6 +48,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -284,7 +286,7 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() { indexDocuments(client, indexName1, randomIntBetween(5, 10)); ensureGreen(indexName1); - validatePathType(indexName1, PathType.FIXED, PathHashAlgorithm.FNV_1A); + validatePathType(indexName1, PathType.FIXED); logger.info("--> snapshot"); SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(Arrays.asList(indexName1))); @@ -301,7 +303,7 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() { .get(); assertEquals(RestStatus.ACCEPTED, restoreSnapshotResponse.status()); ensureGreen(restoredIndexName1version1); - validatePathType(restoredIndexName1version1, PathType.FIXED, PathHashAlgorithm.FNV_1A); + validatePathType(restoredIndexName1version1, PathType.FIXED); client(clusterManagerNode).admin() .cluster() @@ -327,16 +329,50 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() { validatePathType(indexName2, PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A); // Validating that custom data has not changed for indexes which were created before the cluster setting got updated - validatePathType(indexName1, PathType.FIXED, PathHashAlgorithm.FNV_1A); + validatePathType(indexName1, PathType.FIXED); + + // Create Snapshot of index 2 + String snapshotName2 = "test-restore-snapshot2"; + snapshotInfo = createSnapshot(snapshotRepoName, snapshotName2, new ArrayList<>(List.of(indexName2))); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertTrue(snapshotInfo.successfulShards() > 0); + assertEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards()); + + // Update cluster settings to FIXED + client(clusterManagerNode).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING.getKey(), PathType.FIXED)) + .get(); + + // Close index 2 + assertAcked(client().admin().indices().prepareClose(indexName2)); + restoreSnapshotResponse = client.admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, snapshotName2) + .setWaitForCompletion(false) + .setIndices(indexName2) + .get(); + assertEquals(RestStatus.ACCEPTED, restoreSnapshotResponse.status()); + ensureGreen(indexName2); + + // Validating that custom data has not changed for testindex2 which was created before the cluster setting got updated + validatePathType(indexName2, PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A); } - private void validatePathType(String index, PathType pathType, PathHashAlgorithm pathHashAlgorithm) { + private void validatePathType(String index, PathType pathType) { + validatePathType(index, pathType, null); + } + + private void validatePathType(String index, PathType pathType, @Nullable PathHashAlgorithm pathHashAlgorithm) { ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState(); // Validate that the remote_store custom data is present in index metadata for the created index. Map remoteCustomData = state.metadata().index(index).getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); assertNotNull(remoteCustomData); assertEquals(pathType.name(), remoteCustomData.get(PathType.NAME)); - assertEquals(pathHashAlgorithm.name(), remoteCustomData.get(PathHashAlgorithm.NAME)); + if (Objects.nonNull(pathHashAlgorithm)) { + assertEquals(pathHashAlgorithm.name(), remoteCustomData.get(PathHashAlgorithm.NAME)); + } } public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index ba90cbe96e157..d7ad0daa43524 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -28,7 +28,6 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; -import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.index.Index; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; @@ -57,11 +56,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -191,121 +187,6 @@ protected BulkResponse indexBulk(String indexName, int numDocs) { return client().bulk(bulkRequest).actionGet(); } - public static Settings remoteStoreClusterSettings(String name, Path path) { - return remoteStoreClusterSettings(name, path, name, path); - } - - public static Settings remoteStoreClusterSettings( - String segmentRepoName, - Path segmentRepoPath, - String segmentRepoType, - String translogRepoName, - Path translogRepoPath, - String translogRepoType - ) { - Settings.Builder settingsBuilder = Settings.builder(); - settingsBuilder.put( - buildRemoteStoreNodeAttributes( - segmentRepoName, - segmentRepoPath, - segmentRepoType, - translogRepoName, - translogRepoPath, - translogRepoType, - false - ) - ); - return settingsBuilder.build(); - } - - public static Settings remoteStoreClusterSettings( - String segmentRepoName, - Path segmentRepoPath, - String translogRepoName, - Path translogRepoPath - ) { - Settings.Builder settingsBuilder = Settings.builder(); - settingsBuilder.put(buildRemoteStoreNodeAttributes(segmentRepoName, segmentRepoPath, translogRepoName, translogRepoPath, false)); - return settingsBuilder.build(); - } - - public static Settings buildRemoteStoreNodeAttributes( - String segmentRepoName, - Path segmentRepoPath, - String translogRepoName, - Path translogRepoPath, - boolean withRateLimiterAttributes - ) { - return buildRemoteStoreNodeAttributes( - segmentRepoName, - segmentRepoPath, - ReloadableFsRepository.TYPE, - translogRepoName, - translogRepoPath, - ReloadableFsRepository.TYPE, - withRateLimiterAttributes - ); - } - - public static Settings buildRemoteStoreNodeAttributes( - String segmentRepoName, - Path segmentRepoPath, - String segmentRepoType, - String translogRepoName, - Path translogRepoPath, - String translogRepoType, - boolean withRateLimiterAttributes - ) { - String segmentRepoTypeAttributeKey = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, - segmentRepoName - ); - String segmentRepoSettingsAttributeKeyPrefix = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, - segmentRepoName - ); - String translogRepoTypeAttributeKey = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, - translogRepoName - ); - String translogRepoSettingsAttributeKeyPrefix = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, - translogRepoName - ); - String stateRepoTypeAttributeKey = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, - segmentRepoName - ); - String stateRepoSettingsAttributeKeyPrefix = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, - segmentRepoName - ); - - Settings.Builder settings = Settings.builder() - .put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName) - .put(segmentRepoTypeAttributeKey, segmentRepoType) - .put(segmentRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath) - .put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName) - .put(translogRepoTypeAttributeKey, translogRepoType) - .put(translogRepoSettingsAttributeKeyPrefix + "location", translogRepoPath) - .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName) - .put(stateRepoTypeAttributeKey, segmentRepoType) - .put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath); - - if (withRateLimiterAttributes) { - settings.put(segmentRepoSettingsAttributeKeyPrefix + "compress", randomBoolean()) - .put(segmentRepoSettingsAttributeKeyPrefix + "chunk_size", 200, ByteSizeUnit.BYTES); - } - - return settings.build(); - } - Settings defaultIndexSettings() { return Settings.builder() .put(super.indexSettings()) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java index 36987ac2d4991..d45b4e3deb798 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java @@ -131,7 +131,7 @@ public void readBlobAsync(String blobName, ActionListener listener) InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset); blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream)); } - ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null); + ReadContext blobReadContext = new ReadContext.Builder(contentLength, blobPartStreams).build(); listener.onResponse(blobReadContext); } catch (Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java b/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java index ca4c16935c2b9..cb41325c18a22 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; @@ -57,6 +58,9 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.DocsStats; import org.opensearch.index.store.StoreStats; +import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode; +import org.opensearch.node.remotestore.RemoteStoreNodeService.Direction; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -67,6 +71,7 @@ import java.util.function.IntFunction; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; /** * Main class to initiate resizing (shrink / split) an index into a new index @@ -140,8 +145,8 @@ protected void clusterManagerOperation( // there is no need to fetch docs stats for split but we keep it simple and do it anyway for simplicity of the code final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex()); final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index()); - IndexMetadata indexMetadata = state.metadata().index(sourceIndex); + ClusterSettings clusterSettings = clusterService.getClusterSettings(); if (resizeRequest.getResizeType().equals(ResizeType.SHRINK) && state.metadata().isSegmentReplicationEnabled(sourceIndex) && indexMetadata != null @@ -161,7 +166,7 @@ protected void clusterManagerOperation( CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state, i -> { IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i); return shard == null ? null : shard.getPrimary().getDocs(); - }, indicesStatsResponse.getPrimaries().store, sourceIndex, targetIndex); + }, indicesStatsResponse.getPrimaries().store, clusterSettings, sourceIndex, targetIndex); if (indicesStatsResponse.getIndex(sourceIndex) .getTotal() @@ -200,7 +205,7 @@ protected void clusterManagerOperation( CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state, i -> { IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i); return shard == null ? null : shard.getPrimary().getDocs(); - }, indicesStatsResponse.getPrimaries().store, sourceIndex, targetIndex); + }, indicesStatsResponse.getPrimaries().store, clusterSettings, sourceIndex, targetIndex); createIndexService.createIndex( updateRequest, ActionListener.map( @@ -223,6 +228,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest( final ClusterState state, final IntFunction perShardDocStats, final StoreStats primaryShardsStoreStats, + final ClusterSettings clusterSettings, String sourceIndexName, String targetIndexName ) { @@ -231,6 +237,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest( if (metadata == null) { throw new IndexNotFoundException(sourceIndexName); } + validateRemoteMigrationModeSettings(resizeRequest.getResizeType(), metadata, clusterSettings); final Settings.Builder targetIndexSettingsBuilder = Settings.builder() .put(targetIndex.settings()) .normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX); @@ -368,4 +375,39 @@ protected static int calculateTargetIndexShardsNum( protected String getClusterManagerActionName(DiscoveryNode node) { return super.getClusterManagerActionName(node); } + + /** + * Reject resize request if cluster mode is [Mixed] and migration direction is [RemoteStore] and index is not on + * REMOTE_STORE_ENABLED node or [DocRep] and index is on REMOTE_STORE_ENABLED node. + * @param type resize type + * @param sourceIndexMetadata source index's metadata + * @param clusterSettings cluster settings + * @throws IllegalStateException if cluster mode is [Mixed] and migration direction is [RemoteStore] or [DocRep] and + * index's SETTING_REMOTE_STORE_ENABLED is not equal to the migration direction's value. + * For example, if migration direction is [RemoteStore] and index's SETTING_REMOTE_STORE_ENABLED + * is false, then throw IllegalStateException. If migration direction is [DocRep] and + * index's SETTING_REMOTE_STORE_ENABLED is true, then throw IllegalStateException. + */ + private static void validateRemoteMigrationModeSettings( + final ResizeType type, + IndexMetadata sourceIndexMetadata, + ClusterSettings clusterSettings + ) { + CompatibilityMode compatibilityMode = clusterSettings.get(RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING); + if (compatibilityMode == CompatibilityMode.MIXED) { + boolean isRemoteStoreEnabled = sourceIndexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false); + Direction migrationDirection = clusterSettings.get(RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING); + boolean invalidConfiguration = (migrationDirection == Direction.REMOTE_STORE && isRemoteStoreEnabled == false) + || (migrationDirection == Direction.DOCREP && isRemoteStoreEnabled); + if (invalidConfiguration) { + throw new IllegalStateException( + "Index " + + type + + " is not allowed as remote migration mode is mixed" + + " and index is remote store " + + (isRemoteStoreEnabled ? "enabled" : "disabled") + ); + } + } + } } diff --git a/server/src/main/java/org/opensearch/action/support/AutoCreateIndex.java b/server/src/main/java/org/opensearch/action/support/AutoCreateIndex.java index 9e8cbd7bf40c2..ce0a8f84d066f 100644 --- a/server/src/main/java/org/opensearch/action/support/AutoCreateIndex.java +++ b/server/src/main/java/org/opensearch/action/support/AutoCreateIndex.java @@ -43,7 +43,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.common.Strings; import org.opensearch.index.IndexNotFoundException; -import org.opensearch.index.mapper.MapperService; import org.opensearch.indices.SystemIndices; import java.util.ArrayList; @@ -64,8 +63,6 @@ public final class AutoCreateIndex { Property.NodeScope, Setting.Property.Dynamic ); - - private final boolean dynamicMappingDisabled; private final IndexNameExpressionResolver resolver; private final SystemIndices systemIndices; private volatile AutoCreate autoCreate; @@ -77,7 +74,6 @@ public AutoCreateIndex( SystemIndices systemIndices ) { this.resolver = resolver; - dynamicMappingDisabled = !MapperService.INDEX_MAPPER_DYNAMIC_SETTING.get(settings); this.systemIndices = systemIndices; this.autoCreate = AUTO_CREATE_INDEX_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(AUTO_CREATE_INDEX_SETTING, this::setAutoCreate); @@ -109,9 +105,6 @@ public boolean shouldAutoCreate(String index, ClusterState state) { if (autoCreate.autoCreateIndex == false) { throw new IndexNotFoundException("[" + AUTO_CREATE_INDEX_SETTING.getKey() + "] is [false]", index); } - if (dynamicMappingDisabled) { - throw new IndexNotFoundException("[" + MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey() + "] is [false]", index); - } // matches not set, default value of "true" if (autoCreate.expressions.isEmpty()) { return true; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 451871b10d5eb..64bea79c9e47b 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -206,8 +206,9 @@ public MetadataCreateIndexService( // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true); + Supplier minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion(); remoteStorePathStrategyResolver = isRemoteDataAttributePresent(settings) - ? new RemoteStorePathStrategyResolver(clusterService.getClusterSettings()) + ? new RemoteStorePathStrategyResolver(clusterService.getClusterSettings(), minNodeVersionSupplier) : null; } @@ -572,28 +573,23 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata( * @param assertNullOldType flag to verify that the old remote store path type is null */ public void addRemoteStorePathStrategyInCustomData(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) { - if (remoteStorePathStrategyResolver != null) { - // It is possible that remote custom data exists already. In such cases, we need to only update the path type - // in the remote store custom data map. - Map existingRemoteCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); - Map remoteCustomData = existingRemoteCustomData == null - ? new HashMap<>() - : new HashMap<>(existingRemoteCustomData); - // Determine the path type for use using the remoteStorePathResolver. - RemoteStorePathStrategy newPathStrategy = remoteStorePathStrategyResolver.get(); - String oldPathType = remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name()); - String oldHashAlgorithm = remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name()); - assert !assertNullOldType || (Objects.isNull(oldPathType) && Objects.isNull(oldHashAlgorithm)); - logger.trace( - () -> new ParameterizedMessage( - "Added newPathStrategy={}, replaced oldPathType={} oldHashAlgorithm={}", - newPathStrategy, - oldPathType, - oldHashAlgorithm - ) - ); - tmpImdBuilder.putCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY, remoteCustomData); + if (remoteStorePathStrategyResolver == null) { + return; + } + // It is possible that remote custom data exists already. In such cases, we need to only update the path type + // in the remote store custom data map. + Map existingCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); + assert assertNullOldType == false || Objects.isNull(existingCustomData); + + // Determine the path type for use using the remoteStorePathResolver. + RemoteStorePathStrategy newPathStrategy = remoteStorePathStrategyResolver.get(); + Map remoteCustomData = new HashMap<>(); + remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name()); + if (Objects.nonNull(newPathStrategy.getHashAlgorithm())) { + remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name()); } + logger.trace(() -> new ParameterizedMessage("Added newStrategy={}, replaced oldStrategy={}", remoteCustomData, existingCustomData)); + tmpImdBuilder.putCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY, remoteCustomData); } private ClusterState applyCreateIndexRequestWithV1Templates( diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java index 468fac08d2946..6f0e4fe90cfff 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java @@ -14,6 +14,7 @@ import org.opensearch.core.common.io.stream.Writeable; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -25,27 +26,26 @@ */ @PublicApi(since = "2.4.0") public class WeightedRouting implements Writeable { - private String attributeName; - private Map weights; + private final String attributeName; + private final Map weights; + private final int hashCode; public WeightedRouting() { - this.attributeName = ""; - this.weights = new HashMap<>(3); + this("", new HashMap<>(3)); } public WeightedRouting(String attributeName, Map weights) { this.attributeName = attributeName; - this.weights = weights; + this.weights = Collections.unmodifiableMap(weights); + this.hashCode = Objects.hash(this.attributeName, this.weights); } public WeightedRouting(WeightedRouting weightedRouting) { - this.attributeName = weightedRouting.attributeName(); - this.weights = weightedRouting.weights; + this(weightedRouting.attributeName(), weightedRouting.weights); } public WeightedRouting(StreamInput in) throws IOException { - attributeName = in.readString(); - weights = (Map) in.readGenericValue(); + this(in.readString(), (Map) in.readGenericValue()); } public boolean isSet() { @@ -70,7 +70,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(attributeName, weights); + return hashCode; } @Override diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index 2e25a532b5abf..e16e75de7f27d 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -32,6 +32,7 @@ package org.opensearch.common.blobstore; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.core.action.ActionListener; import java.io.IOException; @@ -77,6 +78,20 @@ public interface BlobContainer { */ InputStream readBlob(String blobName) throws IOException; + /** + * Creates a new {@link BlobDownloadResponse} for the given blob name. + * + * @param blobName + * The name of the blob to get an {@link InputStream} for. + * @return The {@link BlobDownloadResponse} of the blob. + * @throws NoSuchFileException if the blob does not exist + * @throws IOException if the blob can not be read. + */ + @ExperimentalApi + default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException { + throw new UnsupportedOperationException("readBlobWithMetadata is not implemented yet"); + }; + /** * Creates a new {@link InputStream} that can be used to read the given blob starting from * a specific {@code position} in the blob. The {@code length} is an indication of the @@ -128,6 +143,36 @@ default long readBlobPreferredLength() { */ void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + /** + * Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata. + * This method assumes the container does not already contain a blob of the same blobName. If a blob by the + * same name already exists, the operation will fail and an {@link IOException} will be thrown. + * + * @param blobName + * The name of the blob to write the contents of the input stream to. + * @param inputStream + * The input stream from which to retrieve the bytes to write to the blob. + * @param metadata + * The metadata to be associate with the blob upload. + * @param blobSize + * The size of the blob to be written, in bytes. It is implementation dependent whether + * this value is used in writing the blob to the repository. + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, or the target blob could not be written to. + */ + @ExperimentalApi + default void writeBlobWithMetadata( + String blobName, + InputStream inputStream, + Map metadata, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + throw new UnsupportedOperationException("writeBlobWithMetadata is not implemented yet"); + }; + /** * Reads blob content from the input stream and writes it to the container in a new blob with the given name, * using an atomic write operation if the implementation supports it. @@ -149,6 +194,38 @@ default long readBlobPreferredLength() { */ void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + /** + * Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata + * using an atomic write operation if the implementation supports it. + *

+ * This method assumes the container does not already contain a blob of the same blobName. If a blob by the + * same name already exists, the operation will fail and an {@link IOException} will be thrown. + * + * @param blobName + * The name of the blob to write the contents of the input stream to. + * @param inputStream + * The input stream from which to retrieve the bytes to write to the blob. + * @param metadata + * The metadata to be associate with the blob upload. + * @param blobSize + * The size of the blob to be written, in bytes. It is implementation dependent whether + * this value is used in writing the blob to the repository. + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, or the target blob could not be written to. + */ + @ExperimentalApi + default void writeBlobAtomicWithMetadata( + String blobName, + InputStream inputStream, + Map metadata, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + throw new UnsupportedOperationException("writeBlobAtomicWithMetadata is not implemented yet"); + }; + /** * Deletes this container and all its contents from the repository. * diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java new file mode 100644 index 0000000000000..97f3e4a16a76c --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore; + +import java.io.InputStream; +import java.util.Map; + +/** + * Represents the response from a blob download operation, containing both the + * input stream of the blob content and the associated metadata. + * + * @opensearch.experimental + */ +public class BlobDownloadResponse { + + /** + * Downloaded blob InputStream + */ + private final InputStream inputStream; + + /** + * Metadata of the downloaded blob + */ + private final Map metadata; + + public InputStream getInputStream() { + return inputStream; + } + + public Map getMetadata() { + return metadata; + } + + public BlobDownloadResponse(InputStream inputStream, Map metadata) { + this.inputStream = inputStream; + this.metadata = metadata; + } + +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java index 1264551401b4c..36ee46c0fc2c8 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java @@ -12,6 +12,7 @@ import org.opensearch.common.io.InputStreamContainer; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -25,23 +26,30 @@ public class ReadContext { private final long blobSize; private final List asyncPartStreams; private final String blobChecksum; + private final Map metadata; - public ReadContext(long blobSize, List asyncPartStreams, String blobChecksum) { + private ReadContext(long blobSize, List asyncPartStreams, String blobChecksum, Map metadata) { this.blobSize = blobSize; this.asyncPartStreams = asyncPartStreams; this.blobChecksum = blobChecksum; + this.metadata = metadata; } public ReadContext(ReadContext readContext) { this.blobSize = readContext.blobSize; this.asyncPartStreams = readContext.asyncPartStreams; this.blobChecksum = readContext.blobChecksum; + this.metadata = readContext.metadata; } public String getBlobChecksum() { return blobChecksum; } + public Map getMetadata() { + return metadata; + } + public int getNumberOfParts() { return asyncPartStreams.size(); } @@ -64,7 +72,7 @@ public List getPartStreams() { @ExperimentalApi public interface StreamPartCreator extends Supplier> { /** - * Kicks off a async process to start streaming. + * Kicks off an async process to start streaming. * * @return When the returned future is completed, streaming has * just begun. Clients must fully consume the resulting stream. @@ -72,4 +80,36 @@ public interface StreamPartCreator extends Supplier get(); } + + /** + * Builder for {@link ReadContext}. + * + * @opensearch.experimental + */ + public static class Builder { + private final long blobSize; + private final List asyncPartStreams; + private String blobChecksum; + private Map metadata; + + public Builder(long blobSize, List asyncPartStreams) { + this.blobSize = blobSize; + this.asyncPartStreams = asyncPartStreams; + } + + public Builder blobChecksum(String blobChecksum) { + this.blobChecksum = blobChecksum; + return this; + } + + public Builder metadata(Map metadata) { + this.metadata = metadata; + return this; + } + + public ReadContext build() { + return new ReadContext(blobSize, asyncPartStreams, blobChecksum, metadata); + } + + } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java index e74462f82400d..d14800e82e495 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -13,6 +13,7 @@ import org.opensearch.common.StreamContext; import java.io.IOException; +import java.util.Map; /** * WriteContext is used to encapsulate all data needed by BlobContainer#writeStreams @@ -29,6 +30,7 @@ public class WriteContext { private final CheckedConsumer uploadFinalizer; private final boolean doRemoteDataIntegrityCheck; private final Long expectedChecksum; + private final Map metadata; /** * Construct a new WriteContext object @@ -41,7 +43,7 @@ public class WriteContext { * @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done * @param expectedChecksum This parameter expected only when the vendor plugin is expected to do server side data integrity verification */ - public WriteContext( + private WriteContext( String fileName, StreamContextSupplier streamContextSupplier, long fileSize, @@ -49,7 +51,8 @@ public WriteContext( WritePriority writePriority, CheckedConsumer uploadFinalizer, boolean doRemoteDataIntegrityCheck, - @Nullable Long expectedChecksum + @Nullable Long expectedChecksum, + Map metadata ) { this.fileName = fileName; this.streamContextSupplier = streamContextSupplier; @@ -59,6 +62,7 @@ public WriteContext( this.uploadFinalizer = uploadFinalizer; this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck; this.expectedChecksum = expectedChecksum; + this.metadata = metadata; } /** @@ -73,6 +77,7 @@ protected WriteContext(WriteContext writeContext) { this.uploadFinalizer = writeContext.uploadFinalizer; this.doRemoteDataIntegrityCheck = writeContext.doRemoteDataIntegrityCheck; this.expectedChecksum = writeContext.expectedChecksum; + this.metadata = writeContext.metadata; } /** @@ -131,4 +136,87 @@ public boolean doRemoteDataIntegrityCheck() { public Long getExpectedChecksum() { return expectedChecksum; } + + /** + * @return the upload metadata. + */ + public Map getMetadata() { + return metadata; + } + + /** + * Builder for {@link WriteContext}. + * + * @opensearch.internal + */ + public static class Builder { + private String fileName; + private StreamContextSupplier streamContextSupplier; + private long fileSize; + private boolean failIfAlreadyExists; + private WritePriority writePriority; + private CheckedConsumer uploadFinalizer; + private boolean doRemoteDataIntegrityCheck; + private Long expectedChecksum; + private Map metadata; + + public Builder fileName(String fileName) { + this.fileName = fileName; + return this; + } + + public Builder streamContextSupplier(StreamContextSupplier streamContextSupplier) { + this.streamContextSupplier = streamContextSupplier; + return this; + } + + public Builder fileSize(long fileSize) { + this.fileSize = fileSize; + return this; + } + + public Builder writePriority(WritePriority writePriority) { + this.writePriority = writePriority; + return this; + } + + public Builder failIfAlreadyExists(boolean failIfAlreadyExists) { + this.failIfAlreadyExists = failIfAlreadyExists; + return this; + } + + public Builder uploadFinalizer(CheckedConsumer uploadFinalizer) { + this.uploadFinalizer = uploadFinalizer; + return this; + } + + public Builder doRemoteDataIntegrityCheck(boolean doRemoteDataIntegrityCheck) { + this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck; + return this; + } + + public Builder expectedChecksum(Long expectedChecksum) { + this.expectedChecksum = expectedChecksum; + return this; + } + + public Builder metadata(Map metadata) { + this.metadata = metadata; + return this; + } + + public WriteContext build() { + return new WriteContext( + fileName, + streamContextSupplier, + fileSize, + failIfAlreadyExists, + writePriority, + uploadFinalizer, + doRemoteDataIntegrityCheck, + expectedChecksum, + metadata + ); + } + } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java index 2047c99d9e13b..cd2ef22327ebb 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -27,6 +27,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -55,6 +56,7 @@ public class RemoteTransferContainer implements Closeable { private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; private final boolean isRemoteDataIntegritySupported; private final AtomicBoolean readBlock = new AtomicBoolean(); + private final Map metadata; private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class); @@ -79,6 +81,43 @@ public RemoteTransferContainer( OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier, long expectedChecksum, boolean isRemoteDataIntegritySupported + ) { + this( + fileName, + remoteFileName, + contentLength, + failTransferIfFileExists, + writePriority, + offsetRangeInputStreamSupplier, + expectedChecksum, + isRemoteDataIntegritySupported, + null + ); + } + + /** + * Construct a new RemoteTransferContainer object with metadata. + * + * @param fileName Name of the local file + * @param remoteFileName Name of the remote file + * @param contentLength Total content length of the file to be uploaded + * @param failTransferIfFileExists A boolean to determine if upload has to be failed if file exists + * @param writePriority The {@link WritePriority} of current upload + * @param offsetRangeInputStreamSupplier A supplier to create OffsetRangeInputStreams + * @param expectedChecksum The expected checksum value for the file being uploaded. This checksum will be used for local or remote data integrity checks + * @param isRemoteDataIntegritySupported A boolean to signify whether the remote repository supports server side data integrity verification + * @param metadata Object metadata to be store with the file. + */ + public RemoteTransferContainer( + String fileName, + String remoteFileName, + long contentLength, + boolean failTransferIfFileExists, + WritePriority writePriority, + OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier, + long expectedChecksum, + boolean isRemoteDataIntegritySupported, + Map metadata ) { this.fileName = fileName; this.remoteFileName = remoteFileName; @@ -88,22 +127,23 @@ public RemoteTransferContainer( this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier; this.expectedChecksum = expectedChecksum; this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported; + this.metadata = metadata; } /** * @return The {@link WriteContext} for the current upload */ public WriteContext createWriteContext() { - return new WriteContext( - remoteFileName, - this::supplyStreamContext, - contentLength, - failTransferIfFileExists, - writePriority, - this::finalizeUpload, - isRemoteDataIntegrityCheckPossible(), - isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null - ); + return new WriteContext.Builder().fileName(remoteFileName) + .streamContextSupplier(this::supplyStreamContext) + .fileSize(contentLength) + .failIfAlreadyExists(failTransferIfFileExists) + .writePriority(writePriority) + .uploadFinalizer(this::finalizeUpload) + .doRemoteDataIntegrityCheck(isRemoteDataIntegrityCheckPossible()) + .expectedChecksum(isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null) + .metadata(metadata) + .build(); } // package-private for testing diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 82875564c1c07..388de65ca58a1 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -63,6 +63,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -764,6 +765,7 @@ public static IndexMergePolicy fromString(String text) { private volatile String defaultSearchPipeline; private final boolean widenIndexSortType; private final boolean assignedOnRemoteNode; + private final RemoteStorePathStrategy remoteStorePathStrategy; /** * The maximum age of a retention lease before it is considered expired. @@ -988,6 +990,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti */ widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0); assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings()); + remoteStorePathStrategy = determineRemoteStorePathStrategy(); setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING)); setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING)); @@ -1908,15 +1911,19 @@ public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePo this.docIdFuzzySetFalsePositiveProbability = docIdFuzzySetFalsePositiveProbability; } - public RemoteStorePathStrategy getRemoteStorePathStrategy() { + private RemoteStorePathStrategy determineRemoteStorePathStrategy() { Map remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); - if (remoteCustomData != null - && remoteCustomData.containsKey(PathType.NAME) - && remoteCustomData.containsKey(PathHashAlgorithm.NAME)) { + assert remoteCustomData == null || remoteCustomData.containsKey(PathType.NAME); + if (remoteCustomData != null && remoteCustomData.containsKey(PathType.NAME)) { PathType pathType = PathType.parseString(remoteCustomData.get(PathType.NAME)); - PathHashAlgorithm pathHashAlgorithm = PathHashAlgorithm.parseString(remoteCustomData.get(PathHashAlgorithm.NAME)); - return new RemoteStorePathStrategy(pathType, pathHashAlgorithm); + String hashAlgoStr = remoteCustomData.get(PathHashAlgorithm.NAME); + PathHashAlgorithm hashAlgorithm = Objects.nonNull(hashAlgoStr) ? PathHashAlgorithm.parseString(hashAlgoStr) : null; + return new RemoteStorePathStrategy(pathType, hashAlgorithm); } return new RemoteStorePathStrategy(PathType.FIXED); } + + public RemoteStorePathStrategy getRemoteStorePathStrategy() { + return remoteStorePathStrategy; + } } diff --git a/server/src/main/java/org/opensearch/index/mapper/DerivedFieldValueFetcher.java b/server/src/main/java/org/opensearch/index/mapper/DerivedFieldValueFetcher.java index f3bf0c613415a..40aa2f9890965 100644 --- a/server/src/main/java/org/opensearch/index/mapper/DerivedFieldValueFetcher.java +++ b/server/src/main/java/org/opensearch/index/mapper/DerivedFieldValueFetcher.java @@ -31,8 +31,8 @@ public DerivedFieldValueFetcher(DerivedFieldScript.LeafFactory derivedFieldScrip @Override public List fetchValues(SourceLookup lookup) { derivedFieldScript.setDocument(lookup.docId()); - // TODO: remove List.of() when derivedFieldScript.execute() returns list of objects. - return List.of(derivedFieldScript.execute()); + derivedFieldScript.execute(); + return derivedFieldScript.getEmittedValues(); } public void setNextReader(LeafReaderContext context) { diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java index 4e557d8c24431..30cfc054e3d0a 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java @@ -8,14 +8,19 @@ package org.opensearch.index.remote; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.hash.FNV1a; import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput; +import java.util.HashMap; import java.util.Locale; +import java.util.Map; +import java.util.Objects; import java.util.Set; +import static java.util.Collections.unmodifiableMap; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; @@ -78,9 +83,10 @@ public String getName() { */ @PublicApi(since = "2.14.0") public enum PathType { - FIXED { + FIXED(0) { @Override public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) { + assert Objects.isNull(hashAlgorithm) : "hashAlgorithm is expected to be null with fixed remote store path type"; // Hash algorithm is not used in FIXED path type return pathInput.basePath() .add(pathInput.indexUUID()) @@ -94,7 +100,7 @@ boolean requiresHashAlgorithm() { return false; } }, - HASHED_PREFIX { + HASHED_PREFIX(1) { @Override public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) { // TODO - We need to implement this, keeping the same path as Fixed for sake of multiple tests that can fail otherwise. @@ -112,6 +118,40 @@ boolean requiresHashAlgorithm() { } }; + private final int code; + + PathType(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + + private static final Map CODE_TO_ENUM; + + static { + PathType[] values = values(); + Map codeToStatus = new HashMap<>(values.length); + for (PathType value : values) { + int code = value.code; + if (codeToStatus.containsKey(code)) { + throw new IllegalStateException( + new ParameterizedMessage("{} has same code as {}", codeToStatus.get(code), value).getFormattedMessage() + ); + } + codeToStatus.put(code, value); + } + CODE_TO_ENUM = unmodifiableMap(codeToStatus); + } + + /** + * Turn a status code into a {@link PathType}. + */ + public static PathType fromCode(int code) { + return CODE_TO_ENUM.get(code); + } + /** * This method generates the path for the given path input which constitutes multiple fields and characteristics * of the data. @@ -131,7 +171,7 @@ public BlobPath path(PathInput pathInput, PathHashAlgorithm hashAlgorithm) { return generatePath(pathInput, hashAlgorithm); } - abstract BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm); + protected abstract BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm); abstract boolean requiresHashAlgorithm(); @@ -158,7 +198,7 @@ public static PathType parseString(String pathType) { @PublicApi(since = "2.14.0") public enum PathHashAlgorithm { - FNV_1A { + FNV_1A(0) { @Override long hash(PathInput pathInput) { String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType() @@ -167,6 +207,39 @@ long hash(PathInput pathInput) { } }; + private final int code; + + PathHashAlgorithm(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + + private static final Map CODE_TO_ENUM; + static { + PathHashAlgorithm[] values = values(); + Map codeToStatus = new HashMap<>(values.length); + for (PathHashAlgorithm value : values) { + int code = value.code; + if (codeToStatus.containsKey(code)) { + throw new IllegalStateException( + new ParameterizedMessage("{} has same code as {}", codeToStatus.get(code), value).getFormattedMessage() + ); + } + codeToStatus.put(code, value); + } + CODE_TO_ENUM = unmodifiableMap(codeToStatus); + } + + /** + * Turn a status code into a {@link PathHashAlgorithm}. + */ + public static PathHashAlgorithm fromCode(int code) { + return CODE_TO_ENUM.get(code); + } + abstract long hash(PathInput pathInput); public static PathHashAlgorithm parseString(String pathHashAlgorithm) { diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java index ce5a6748fd9d4..775f8fe19e4ef 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java @@ -8,6 +8,7 @@ package org.opensearch.index.remote; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.blobstore.BlobPath; @@ -36,11 +37,21 @@ public RemoteStorePathStrategy(PathType type) { } public RemoteStorePathStrategy(PathType type, PathHashAlgorithm hashAlgorithm) { - assert type.requiresHashAlgorithm() == false || Objects.nonNull(hashAlgorithm); - this.type = Objects.requireNonNull(type); + Objects.requireNonNull(type, "pathType can not be null"); + if (isCompatible(type, hashAlgorithm) == false) { + throw new IllegalArgumentException( + new ParameterizedMessage("pathType={} pathHashAlgorithm={} are incompatible", type, hashAlgorithm).getFormattedMessage() + ); + } + this.type = type; this.hashAlgorithm = hashAlgorithm; } + public static boolean isCompatible(PathType type, PathHashAlgorithm hashAlgorithm) { + return (type.requiresHashAlgorithm() == false && Objects.isNull(hashAlgorithm)) + || (type.requiresHashAlgorithm() && Objects.nonNull(hashAlgorithm)); + } + public PathType getType() { return type; } @@ -55,7 +66,7 @@ public String toString() { } public BlobPath generatePath(PathInput pathInput) { - return type.generatePath(pathInput, hashAlgorithm); + return type.path(pathInput, hashAlgorithm); } /** diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategyResolver.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategyResolver.java index 20fc516132220..5b067115df781 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategyResolver.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategyResolver.java @@ -8,11 +8,14 @@ package org.opensearch.index.remote; +import org.opensearch.Version; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.indices.IndicesService; +import java.util.function.Supplier; + /** * Determines the {@link RemoteStorePathStrategy} at the time of index metadata creation. * @@ -22,13 +25,22 @@ public class RemoteStorePathStrategyResolver { private volatile PathType type; - public RemoteStorePathStrategyResolver(ClusterSettings clusterSettings) { + private final Supplier minNodeVersionSupplier; + + public RemoteStorePathStrategyResolver(ClusterSettings clusterSettings, Supplier minNodeVersionSupplier) { + this.minNodeVersionSupplier = minNodeVersionSupplier; type = clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING); clusterSettings.addSettingsUpdateConsumer(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING, this::setType); } public RemoteStorePathStrategy get() { - return new RemoteStorePathStrategy(type, PathHashAlgorithm.FNV_1A); + PathType pathType; + PathHashAlgorithm pathHashAlgorithm; + // Min node version check ensures that we are enabling the new prefix type only when all the nodes understand it. + pathType = Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0 ? type : PathType.FIXED; + // If the path type is fixed, hash algorithm is not applicable. + pathHashAlgorithm = pathType == PathType.FIXED ? null : PathHashAlgorithm.FNV_1A; + return new RemoteStorePathStrategy(pathType, pathHashAlgorithm); } private void setType(PathType type) { diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index c74ab5e24a980..f5e342d28fde1 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -58,8 +58,6 @@ import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.EngineException; import org.opensearch.index.mapper.MapperService; -import org.opensearch.index.remote.RemoteStoreEnums.PathType; -import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.snapshots.IndexShardRestoreFailedException; import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; @@ -412,8 +410,7 @@ void recoverFromSnapshotAndRemoteStore( remoteStoreRepository, indexUUID, shardId, - new RemoteStorePathStrategy(PathType.FIXED) - // TODO - The path type needs to be obtained from RemoteStoreShardShallowCopySnapshot + shallowCopyShardMetadata.getRemoteStorePathStrategy() ); sourceRemoteDirectory.initializeToSpecificCommit( primaryTerm, diff --git a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java index d54e9686ab951..9c0ea42810e16 100644 --- a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java +++ b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java @@ -8,17 +8,22 @@ package org.opensearch.index.snapshots.blobstore; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchParseException; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.ParseField; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; +import org.opensearch.index.remote.RemoteStoreEnums.PathType; +import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Objects; /** * Remote Store based Shard snapshot metadata @@ -41,8 +46,10 @@ public class RemoteStoreShardShallowCopySnapshot implements ToXContentFragment, private final String repositoryBasePath; private final String indexUUID; private final List fileNames; + private final PathType pathType; + private final PathHashAlgorithm pathHashAlgorithm; - static final String DEFAULT_VERSION = "1"; + static final String DEFAULT_VERSION = "2"; static final String NAME = "name"; static final String VERSION = "version"; static final String INDEX_VERSION = "index_version"; @@ -61,6 +68,8 @@ public class RemoteStoreShardShallowCopySnapshot implements ToXContentFragment, static final String TOTAL_FILE_COUNT = "number_of_files"; static final String TOTAL_SIZE = "total_size"; + static final String PATH_TYPE = "path_type"; + static final String PATH_HASH_ALGORITHM = "path_hash_algorithm"; private static final ParseField PARSE_NAME = new ParseField(NAME); private static final ParseField PARSE_VERSION = new ParseField(VERSION); @@ -75,6 +84,8 @@ public class RemoteStoreShardShallowCopySnapshot implements ToXContentFragment, private static final ParseField PARSE_REMOTE_STORE_REPOSITORY = new ParseField(REMOTE_STORE_REPOSITORY); private static final ParseField PARSE_REPOSITORY_BASE_PATH = new ParseField(REPOSITORY_BASE_PATH); private static final ParseField PARSE_FILE_NAMES = new ParseField(FILE_NAMES); + private static final ParseField PARSE_PATH_TYPE = new ParseField(PATH_TYPE); + private static final ParseField PARSE_PATH_HASH_ALGORITHM = new ParseField(PATH_HASH_ALGORITHM); /** * Serializes shard snapshot metadata info into JSON @@ -101,6 +112,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.value(fileName); } builder.endArray(); + // We are handling NP check since a cluster can have indexes created earlier which do not have remote store + // path type and path hash algorithm in its custom data in index metadata. + if (Objects.nonNull(pathType)) { + builder.field(PATH_TYPE, pathType.getCode()); + } + if (Objects.nonNull(pathHashAlgorithm)) { + builder.field(PATH_HASH_ALGORITHM, pathHashAlgorithm.getCode()); + } return builder; } @@ -116,34 +135,30 @@ public RemoteStoreShardShallowCopySnapshot( String indexUUID, String remoteStoreRepository, String repositoryBasePath, - List fileNames + List fileNames, + PathType pathType, + PathHashAlgorithm pathHashAlgorithm ) { - this.version = DEFAULT_VERSION; - verifyParameters( - version, + this( + DEFAULT_VERSION, snapshot, indexVersion, primaryTerm, commitGeneration, + startTime, + time, + totalFileCount, + totalSize, indexUUID, remoteStoreRepository, - repositoryBasePath + repositoryBasePath, + fileNames, + pathType, + pathHashAlgorithm ); - this.snapshot = snapshot; - this.indexVersion = indexVersion; - this.primaryTerm = primaryTerm; - this.commitGeneration = commitGeneration; - this.startTime = startTime; - this.time = time; - this.totalFileCount = totalFileCount; - this.totalSize = totalSize; - this.indexUUID = indexUUID; - this.remoteStoreRepository = remoteStoreRepository; - this.repositoryBasePath = repositoryBasePath; - this.fileNames = fileNames; } - private RemoteStoreShardShallowCopySnapshot( + RemoteStoreShardShallowCopySnapshot( String version, String snapshot, long indexVersion, @@ -156,7 +171,9 @@ private RemoteStoreShardShallowCopySnapshot( String indexUUID, String remoteStoreRepository, String repositoryBasePath, - List fileNames + List fileNames, + PathType pathType, + PathHashAlgorithm pathHashAlgorithm ) { verifyParameters( version, @@ -166,7 +183,9 @@ private RemoteStoreShardShallowCopySnapshot( commitGeneration, indexUUID, remoteStoreRepository, - repositoryBasePath + repositoryBasePath, + pathType, + pathHashAlgorithm ); this.version = version; this.snapshot = snapshot; @@ -181,6 +200,8 @@ private RemoteStoreShardShallowCopySnapshot( this.remoteStoreRepository = remoteStoreRepository; this.repositoryBasePath = repositoryBasePath; this.fileNames = fileNames; + this.pathType = pathType; + this.pathHashAlgorithm = pathHashAlgorithm; } /** @@ -203,6 +224,8 @@ public static RemoteStoreShardShallowCopySnapshot fromXContent(XContentParser pa long primaryTerm = -1; long commitGeneration = -1; List fileNames = new ArrayList<>(); + PathType pathType = null; + PathHashAlgorithm pathHashAlgorithm = null; if (parser.currentToken() == null) { // fresh parser? move to the first token parser.nextToken(); @@ -237,6 +260,10 @@ public static RemoteStoreShardShallowCopySnapshot fromXContent(XContentParser pa remoteStoreRepository = parser.text(); } else if (PARSE_REPOSITORY_BASE_PATH.match(currentFieldName, parser.getDeprecationHandler())) { repositoryBasePath = parser.text(); + } else if (PARSE_PATH_TYPE.match(currentFieldName, parser.getDeprecationHandler())) { + pathType = PathType.fromCode(parser.intValue()); + } else if (PARSE_PATH_HASH_ALGORITHM.match(currentFieldName, parser.getDeprecationHandler())) { + pathHashAlgorithm = PathHashAlgorithm.fromCode(parser.intValue()); } else { throw new OpenSearchParseException("unknown parameter [{}]", currentFieldName); } @@ -266,7 +293,9 @@ public static RemoteStoreShardShallowCopySnapshot fromXContent(XContentParser pa indexUUID, remoteStoreRepository, repositoryBasePath, - fileNames + fileNames, + pathType, + pathHashAlgorithm ); } @@ -380,38 +409,47 @@ private void verifyParameters( long commitGeneration, String indexUUID, String remoteStoreRepository, - String repositoryBasePath + String repositoryBasePath, + PathType pathType, + PathHashAlgorithm pathHashAlgorithm ) { - String exceptionStr = null; - if (version == null) { - exceptionStr = "Invalid Version Provided"; - } - if (snapshot == null) { - exceptionStr = "Invalid/Missing Snapshot Name"; - } - if (indexVersion < 0) { - exceptionStr = "Invalid Index Version"; - } - if (primaryTerm < 0) { - exceptionStr = "Invalid Primary Term"; - } - if (commitGeneration < 0) { - exceptionStr = "Invalid Commit Generation"; - } - if (indexUUID == null) { - exceptionStr = "Invalid/Missing Index UUID"; - } - if (remoteStoreRepository == null) { - exceptionStr = "Invalid/Missing Remote Store Repository"; - } - if (repositoryBasePath == null) { - exceptionStr = "Invalid/Missing Repository Base Path"; - } - if (exceptionStr != null) { + + throwExceptionIfInvalid(Objects.isNull(version), "Invalid Version Provided"); + throwExceptionIfInvalid(Objects.isNull(snapshot), "Invalid/Missing Snapshot Name"); + throwExceptionIfInvalid(indexVersion < 0, "Invalid Index Version"); + throwExceptionIfInvalid(primaryTerm < 0, "Invalid Primary Term"); + throwExceptionIfInvalid(commitGeneration < 0, "Invalid Commit Generation"); + throwExceptionIfInvalid(Objects.isNull(indexUUID), "Invalid/Missing Index UUID"); + throwExceptionIfInvalid(Objects.isNull(remoteStoreRepository), "Invalid/Missing Remote Store Repository"); + throwExceptionIfInvalid(Objects.isNull(repositoryBasePath), "Invalid/Missing Repository Base Path"); + throwExceptionIfInvalid( + isValidRemotePathConfiguration(version, pathType, pathHashAlgorithm) == false, + new ParameterizedMessage( + "Invalid combination of pathType={} pathHashAlgorithm={} for version={}", + pathType, + pathHashAlgorithm, + version + ).getFormattedMessage() + ); + } + + private void throwExceptionIfInvalid(boolean isInvalid, String exceptionStr) { + if (isInvalid) { throw new IllegalArgumentException(exceptionStr); } } + private boolean isValidRemotePathConfiguration(String version, PathType pathType, PathHashAlgorithm pathHashAlgorithm) { + switch (version) { + case "1": + return Objects.isNull(pathType) && Objects.isNull(pathHashAlgorithm); + case "2": + return Objects.nonNull(pathType) && RemoteStorePathStrategy.isCompatible(pathType, pathHashAlgorithm); + default: + return false; + } + } + /** * Creates a new instance which has a different name and zero incremental file counts but is identical to this instance in terms of the files * it references. @@ -433,7 +471,9 @@ public RemoteStoreShardShallowCopySnapshot asClone(String targetSnapshotName, lo indexUUID, remoteStoreRepository, repositoryBasePath, - fileNames + fileNames, + pathType, + pathHashAlgorithm ); } @@ -449,4 +489,63 @@ public IndexShardSnapshotStatus getIndexShardSnapshotStatus() { null ); // Not adding a real generation here as it doesn't matter to callers } + + public PathType getPathType() { + return pathType; + } + + public PathHashAlgorithm getPathHashAlgorithm() { + return pathHashAlgorithm; + } + + public RemoteStorePathStrategy getRemoteStorePathStrategy() { + if (Objects.nonNull(pathType)) { + return new RemoteStorePathStrategy(pathType, pathHashAlgorithm); + } + return new RemoteStorePathStrategy(PathType.FIXED); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + RemoteStoreShardShallowCopySnapshot that = (RemoteStoreShardShallowCopySnapshot) obj; + + return Objects.equals(this.snapshot, that.snapshot) + && Objects.equals(this.version, that.version) + && this.indexVersion == that.indexVersion + && this.startTime == that.startTime + && this.time == that.time + && this.totalFileCount == that.totalFileCount + && this.totalSize == that.totalSize + && this.primaryTerm == that.primaryTerm + && this.commitGeneration == that.commitGeneration + && Objects.equals(this.remoteStoreRepository, that.remoteStoreRepository) + && Objects.equals(this.repositoryBasePath, that.repositoryBasePath) + && Objects.equals(this.indexUUID, that.indexUUID) + && Objects.equals(this.fileNames, that.fileNames) + && Objects.equals(this.pathType, that.pathType) + && Objects.equals(this.pathHashAlgorithm, that.pathHashAlgorithm); + } + + @Override + public int hashCode() { + return Objects.hash( + snapshot, + version, + indexVersion, + startTime, + time, + totalFileCount, + totalSize, + primaryTerm, + commitGeneration, + remoteStoreRepository, + repositoryBasePath, + indexUUID, + fileNames, + pathType, + pathHashAlgorithm + ); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 82dd6301ef79f..a1d5041ff9aff 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -12,8 +12,10 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionRunnable; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; @@ -164,6 +166,12 @@ public InputStream downloadBlob(Iterable path, String fileName) throws I return blobStore.blobContainer((BlobPath) path).readBlob(fileName); } + @Override + @ExperimentalApi + public BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException { + return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName); + } + @Override public void deleteBlobs(Iterable path, List fileNames) throws IOException { blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index cfe833dde87eb..5fe8b02058383 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -8,6 +8,8 @@ package org.opensearch.index.translog.transfer; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.stream.write.WritePriority; @@ -125,6 +127,16 @@ void uploadBlobs( */ InputStream downloadBlob(Iterable path, String fileName) throws IOException; + /** + * + * @param path the remote path from where download should be made + * @param fileName the name of the file + * @return {@link BlobDownloadResponse} of the remote file + * @throws IOException the exception while reading the data + */ + @ExperimentalApi + BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException; + void listAllInSortedOrder(Iterable path, String filenamePrefix, int limit, ActionListener> listener); void listAllInSortedOrderAsync( diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 5aab02993db34..ce2ffd8bf3fb4 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -108,7 +108,6 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.mapper.MapperService; -import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.snapshots.IndexShardRestoreFailedException; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; @@ -672,8 +671,7 @@ public void cloneRemoteStoreIndexShardSnapshot( remoteStoreRepository, indexUUID, String.valueOf(shardId.shardId()), - new RemoteStorePathStrategy(PathType.FIXED) - // TODO - The path type needs to be obtained from RemoteStoreShardShallowCopySnapshot + remStoreBasedShardMetadata.getRemoteStorePathStrategy() ); remoteStoreMetadataLockManger.cloneLock( FileLockInfo.getLockInfoBuilder().withAcquirerId(source.getUUID()).build(), @@ -1154,8 +1152,7 @@ protected void releaseRemoteStoreLockAndCleanup( remoteStoreRepoForIndex, indexUUID, shardId, - new RemoteStorePathStrategy(PathType.FIXED) - // TODO - The path type needs to be obtained from RemoteStoreShardShallowCopySnapshot + remoteStoreShardShallowCopySnapshot.getRemoteStorePathStrategy() ); remoteStoreMetadataLockManager.release(FileLockInfo.getLockInfoBuilder().withAcquirerId(shallowSnapshotUUID).build()); logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID); @@ -1178,8 +1175,7 @@ protected void releaseRemoteStoreLockAndCleanup( indexUUID, new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)), ThreadPool.Names.REMOTE_PURGE, - new RemoteStorePathStrategy(PathType.FIXED) - // TODO - The path type needs to be obtained from RemoteStoreShardShallowCopySnapshot + remoteStoreShardShallowCopySnapshot.getRemoteStorePathStrategy() ); } } @@ -2694,6 +2690,7 @@ public void snapshotRemoteStoreIndexShard( // now create and write the commit point logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); try { + RemoteStorePathStrategy pathStrategy = store.indexSettings().getRemoteStorePathStrategy(); REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.write( new RemoteStoreShardShallowCopySnapshot( snapshotId.getName(), @@ -2707,7 +2704,9 @@ public void snapshotRemoteStoreIndexShard( store.indexSettings().getUUID(), store.indexSettings().getRemoteStoreRepository(), this.basePath().toString(), - fileNames + fileNames, + pathStrategy.getType(), + pathStrategy.getHashAlgorithm() ), shardContainer, snapshotId.getUUID(), diff --git a/server/src/main/java/org/opensearch/script/DerivedFieldScript.java b/server/src/main/java/org/opensearch/script/DerivedFieldScript.java index 7f5b991950ec6..e9988ec5aeef2 100644 --- a/server/src/main/java/org/opensearch/script/DerivedFieldScript.java +++ b/server/src/main/java/org/opensearch/script/DerivedFieldScript.java @@ -9,14 +9,17 @@ package org.opensearch.script; import org.apache.lucene.index.LeafReaderContext; -import org.opensearch.common.logging.DeprecationLogger; +import org.opensearch.common.collect.Tuple; import org.opensearch.index.fielddata.ScriptDocValues; import org.opensearch.search.lookup.LeafSearchLookup; import org.opensearch.search.lookup.SearchLookup; import org.opensearch.search.lookup.SourceLookup; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Function; @@ -30,7 +33,7 @@ public abstract class DerivedFieldScript { public static final String[] PARAMETERS = {}; public static final ScriptContext CONTEXT = new ScriptContext<>("derived_field", Factory.class); - private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(DynamicMap.class); + private static final int MAX_BYTE_SIZE = 1024 * 1024; // Maximum allowed byte size (1 MB) private static final Map> PARAMS_FUNCTIONS = Map.of( "doc", @@ -49,16 +52,27 @@ public abstract class DerivedFieldScript { */ private final LeafSearchLookup leafLookup; + /** + * The field values emitted from the script. + */ + private List emittedValues; + + private int totalByteSize; + public DerivedFieldScript(Map params, SearchLookup lookup, LeafReaderContext leafContext) { Map parameters = new HashMap<>(params); this.leafLookup = lookup.getLeafSearchLookup(leafContext); parameters.putAll(leafLookup.asMap()); this.params = new DynamicMap(parameters, PARAMS_FUNCTIONS); + this.emittedValues = new ArrayList<>(); + this.totalByteSize = 0; } - protected DerivedFieldScript() { - params = null; - leafLookup = null; + public DerivedFieldScript() { + this.params = null; + this.leafLookup = null; + this.emittedValues = new ArrayList<>(); + this.totalByteSize = 0; } /** @@ -75,14 +89,54 @@ public Map> getDoc() { return leafLookup.doc(); } + /** + * Return the emitted values from the script execution. + */ + public List getEmittedValues() { + return emittedValues; + } + /** * Set the current document to run the script on next. + * Clears the emittedValues as well since they should be scoped per document. */ public void setDocument(int docid) { + this.emittedValues = new ArrayList<>(); + this.totalByteSize = 0; leafLookup.setDocument(docid); } - public abstract Object execute(); + public void addEmittedValue(Object o) { + int byteSize = getObjectByteSize(o); + int newTotalByteSize = totalByteSize + byteSize; + if (newTotalByteSize <= MAX_BYTE_SIZE) { + emittedValues.add(o); + totalByteSize = newTotalByteSize; + } else { + throw new IllegalStateException("Exceeded maximum allowed byte size for emitted values"); + } + } + + private int getObjectByteSize(Object obj) { + if (obj instanceof String) { + return ((String) obj).getBytes(StandardCharsets.UTF_8).length; + } else if (obj instanceof Integer) { + return Integer.BYTES; + } else if (obj instanceof Long) { + return Long.BYTES; + } else if (obj instanceof Double) { + return Double.BYTES; + } else if (obj instanceof Boolean) { + return Byte.BYTES; // Assuming 1 byte for boolean + } else if (obj instanceof Tuple) { + // Assuming each element in the tuple is a double for GeoPoint case + return Double.BYTES * 2; + } else { + throw new IllegalArgumentException("Unsupported object type passed in emit()"); + } + } + + public void execute() {} /** * A factory to construct {@link DerivedFieldScript} instances. @@ -95,7 +149,6 @@ public interface LeafFactory { /** * A factory to construct stateful {@link DerivedFieldScript} factories for a specific index. - * * @opensearch.internal */ public interface Factory extends ScriptFactory { diff --git a/server/src/main/java/org/opensearch/script/ScriptEmitValues.java b/server/src/main/java/org/opensearch/script/ScriptEmitValues.java new file mode 100644 index 0000000000000..5d12f36442179 --- /dev/null +++ b/server/src/main/java/org/opensearch/script/ScriptEmitValues.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.script; + +import org.opensearch.common.collect.Tuple; + +/** + * Values that can be emitted in a derived field script context. + *

+ * The emit function can be called multiple times within a script definition + * so the function will handle collecting the values over the script execution. + */ +public final class ScriptEmitValues { + + /** + * Takes in a single value and emits it + * Could be a long, double, String, etc. + */ + public static final class EmitSingle { + + private final DerivedFieldScript derivedFieldScript; + + public EmitSingle(DerivedFieldScript derivedFieldScript) { + this.derivedFieldScript = derivedFieldScript; + } + + // TODO: Keeping this generic for the time being due to limitations with + // binding methods with the same name and arity. + // Ideally, we should have an emit signature per derived field type and try to scope + // that to the respective script execution so the other emits aren't allowed. + // One way to do this could be to create implementations of the DerivedFieldScript.LeafFactory + // per field type where they each define their own emit() method and then the engine that executes + // it can have custom compilation logic to perform class bindings on that emit implementation. + public void emit(Object val) { + derivedFieldScript.addEmittedValue(val); + } + + } + + /** + * Emits a GeoPoint value + */ + public static final class GeoPoint { + + private final DerivedFieldScript derivedFieldScript; + + public GeoPoint(DerivedFieldScript derivedFieldScript) { + this.derivedFieldScript = derivedFieldScript; + } + + public void emit(double lat, double lon) { + derivedFieldScript.addEmittedValue(new Tuple<>(lat, lon)); + } + + } + +} diff --git a/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java index 848df5f8e4979..5bab2ceca0988 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/shrink/TransportResizeActionTests.java @@ -51,10 +51,13 @@ import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.index.shard.DocsStats; import org.opensearch.index.store.StoreStats; +import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.snapshots.EmptySnapshotsInfoService; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.gateway.TestGatewayAllocator; @@ -65,7 +68,12 @@ import java.util.Set; import static java.util.Collections.emptyMap; -import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; +import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.hamcrest.CoreMatchers.equalTo; public class TransportResizeActionTests extends OpenSearchTestCase { @@ -95,6 +103,19 @@ private ClusterState createClusterState(String name, int numShards, int numRepli return clusterState; } + private ClusterSettings createClusterSettings( + CompatibilityMode compatibilityMode, + RemoteStoreNodeService.Direction migrationDirection + ) { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.applySettings( + (Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), compatibilityMode) + .put(MIGRATION_DIRECTION_SETTING.getKey(), migrationDirection)).build() + ); + return clusterSettings; + } + public void testErrorCondition() { ClusterState state = createClusterState( "source", @@ -102,6 +123,7 @@ public void testErrorCondition() { randomIntBetween(0, 10), Settings.builder().put("index.blocks.write", true).build() ); + ClusterSettings clusterSettings = createClusterSettings(CompatibilityMode.STRICT, RemoteStoreNodeService.Direction.NONE); assertTrue( expectThrows( IllegalStateException.class, @@ -110,6 +132,7 @@ public void testErrorCondition() { state, (i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000), between(1, 100)), new StoreStats(between(1, 10000), between(1, 10000)), + clusterSettings, "source", "target" ) @@ -125,6 +148,7 @@ public void testErrorCondition() { clusterState, (i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE / 2, between(1, 1000), between(1, 10000)) : null, new StoreStats(between(1, 10000), between(1, 10000)), + clusterSettings, "source", "target" ); @@ -144,6 +168,7 @@ public void testErrorCondition() { clusterState, (i) -> new DocsStats(between(10, 1000), between(1, 10), between(1, 10000)), new StoreStats(between(1, 10000), between(1, 10000)), + clusterSettings, "source", "target" ); @@ -173,6 +198,7 @@ public void testErrorCondition() { clusterState, (i) -> new DocsStats(between(1, 1000), between(1, 1000), between(0, 10000)), new StoreStats(between(1, 10000), between(1, 10000)), + clusterSettings, "source", "target" ); @@ -189,7 +215,7 @@ public void testPassNumRoutingShards() { EmptyClusterInfoService.INSTANCE, EmptySnapshotsInfoService.INSTANCE ); - + ClusterSettings clusterSettings = createClusterSettings(CompatibilityMode.STRICT, RemoteStoreNodeService.Direction.NONE); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); // now we start the shard @@ -204,6 +230,7 @@ public void testPassNumRoutingShards() { clusterState, null, new StoreStats(between(1, 10000), between(1, 10000)), + clusterSettings, "source", "target" ); @@ -217,6 +244,7 @@ public void testPassNumRoutingShards() { clusterState, null, new StoreStats(between(1, 10000), between(1, 10000)), + clusterSettings, "source", "target" ); @@ -235,6 +263,7 @@ public void testPassNumRoutingShardsAndFail() { EmptySnapshotsInfoService.INSTANCE ); + ClusterSettings clusterSettings = createClusterSettings(CompatibilityMode.STRICT, RemoteStoreNodeService.Direction.NONE); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); // now we start the shard @@ -249,6 +278,7 @@ public void testPassNumRoutingShardsAndFail() { clusterState, null, new StoreStats(between(1, 10000), between(1, 10000)), + clusterSettings, "source", "target" ); @@ -265,6 +295,7 @@ public void testPassNumRoutingShardsAndFail() { finalState, null, new StoreStats(between(1, 10000), between(1, 10000)), + clusterSettings, "source", "target" ) @@ -286,6 +317,7 @@ public void testShrinkIndexSettings() { EmptySnapshotsInfoService.INSTANCE ); + ClusterSettings clusterSettings = createClusterSettings(CompatibilityMode.STRICT, RemoteStoreNodeService.Direction.NONE); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); // now we start the shard @@ -301,6 +333,7 @@ public void testShrinkIndexSettings() { clusterState, (i) -> stats, new StoreStats(between(1, 10000), between(1, 10000)), + clusterSettings, indexName, "target" ); @@ -325,6 +358,8 @@ public void testShrinkWithMaxShardSize() { EmptyClusterInfoService.INSTANCE, EmptySnapshotsInfoService.INSTANCE ); + + ClusterSettings clusterSettings = createClusterSettings(CompatibilityMode.STRICT, RemoteStoreNodeService.Direction.NONE); RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); // now we start the shard @@ -345,6 +380,7 @@ public void testShrinkWithMaxShardSize() { clusterState, (i) -> stats, new StoreStats(100, between(1, 10000)), + clusterSettings, indexName, "target" ); @@ -366,6 +402,7 @@ public void testShrinkWithMaxShardSize() { clusterState, (i) -> stats, new StoreStats(100, between(1, 10000)), + clusterSettings, indexName, "target" ); @@ -387,6 +424,7 @@ public void testShrinkWithMaxShardSize() { clusterState, (i) -> stats, new StoreStats(100, between(1, 10000)), + clusterSettings, indexName, "target" ); @@ -477,6 +515,7 @@ public void testIndexBlocks() { createClusterState(indexName, 10, 0, 40, Settings.builder().put("index.blocks.read_only", true).build()) ).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); + ClusterSettings clusterSettings = createClusterSettings(CompatibilityMode.STRICT, RemoteStoreNodeService.Direction.NONE); // Target index will be blocked by [index.blocks.read_only=true] copied from the source index ResizeRequest resizeRequest = new ResizeRequest("target", indexName); ResizeType resizeType; @@ -500,6 +539,7 @@ public void testIndexBlocks() { finalState, null, new StoreStats(between(1, 10000), between(1, 10000)), + clusterSettings, indexName, "target" ) @@ -551,6 +591,7 @@ public void testIndexBlocks() { clusterState, (i) -> stats, new StoreStats(100, between(1, 10000)), + clusterSettings, indexName, "target" ); @@ -561,6 +602,127 @@ public void testIndexBlocks() { assertEquals(request.waitForActiveShards(), activeShardCount); } + public void testResizeFailuresDuringMigration() { + // We will keep all other settings correct for resize request, + // So we only need to test for the failures due to cluster setting validation while migration + final Settings directionEnabledNodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(directionEnabledNodeSettings); + boolean isRemoteStoreEnabled = randomBoolean(); + CompatibilityMode compatibilityMode = randomFrom(CompatibilityMode.values()); + RemoteStoreNodeService.Direction migrationDirection = randomFrom(RemoteStoreNodeService.Direction.values()); + // If not mixed mode, then migration direction is NONE. + if (!compatibilityMode.equals(CompatibilityMode.MIXED)) { + migrationDirection = RemoteStoreNodeService.Direction.NONE; + } + ClusterSettings clusterSettings = createClusterSettings(compatibilityMode, migrationDirection); + + ClusterState clusterState = ClusterState.builder( + createClusterState( + "source", + 10, + 0, + 40, + Settings.builder().put("index.blocks.write", true).put(SETTING_REMOTE_STORE_ENABLED, isRemoteStoreEnabled).build() + ) + ).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); + AllocationService service = new AllocationService( + new AllocationDeciders(Collections.singleton(new MaxRetryAllocationDecider())), + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + EmptyClusterInfoService.INSTANCE, + EmptySnapshotsInfoService.INSTANCE + ); + + RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + // now we start the shard + routingTable = OpenSearchAllocationTestCase.startInitializingShardsAndReroute(service, clusterState, "source").routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + DocsStats stats = new DocsStats(between(0, (IndexWriter.MAX_DOCS) / 10), between(1, 1000), between(1, 10000)); + ResizeRequest resizeRequest = new ResizeRequest("target", "source"); + ResizeType resizeType; + int expectedShardsNum; + String cause; + switch (randomIntBetween(0, 2)) { + case 0: + resizeType = ResizeType.SHRINK; + expectedShardsNum = 5; + cause = "shrink_index"; + break; + case 1: + resizeType = ResizeType.SPLIT; + expectedShardsNum = 20; + cause = "split_index"; + break; + default: + resizeType = ResizeType.CLONE; + expectedShardsNum = 10; + cause = "clone_index"; + } + resizeRequest.setResizeType(resizeType); + resizeRequest.getTargetIndexRequest() + .settings(Settings.builder().put("index.number_of_shards", expectedShardsNum).put("index.blocks.read_only", false).build()); + final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE; + resizeRequest.setWaitForActiveShards(activeShardCount); + + if (compatibilityMode == CompatibilityMode.MIXED) { + if ((migrationDirection == RemoteStoreNodeService.Direction.REMOTE_STORE && isRemoteStoreEnabled == false) + || migrationDirection == RemoteStoreNodeService.Direction.DOCREP && isRemoteStoreEnabled == true) { + ClusterState finalState = clusterState; + IllegalStateException ise = expectThrows( + IllegalStateException.class, + () -> TransportResizeAction.prepareCreateIndexRequest( + resizeRequest, + finalState, + (i) -> stats, + new StoreStats(between(1, 10000), between(1, 10000)), + clusterSettings, + "source", + "target" + ) + ); + assertEquals( + ise.getMessage(), + "Index " + + resizeType + + " is not allowed as remote migration mode is mixed" + + " and index is remote store " + + (isRemoteStoreEnabled ? "enabled" : "disabled") + ); + } else { + CreateIndexClusterStateUpdateRequest request = TransportResizeAction.prepareCreateIndexRequest( + resizeRequest, + clusterState, + (i) -> stats, + new StoreStats(100, between(1, 10000)), + clusterSettings, + "source", + "target" + ); + assertNotNull(request.recoverFrom()); + assertEquals("source", request.recoverFrom().getName()); + assertEquals(String.valueOf(expectedShardsNum), request.settings().get("index.number_of_shards")); + assertEquals(cause, request.cause()); + assertEquals(request.waitForActiveShards(), activeShardCount); + } + } else { + CreateIndexClusterStateUpdateRequest request = TransportResizeAction.prepareCreateIndexRequest( + resizeRequest, + clusterState, + (i) -> stats, + new StoreStats(100, between(1, 10000)), + clusterSettings, + "source", + "target" + ); + assertNotNull(request.recoverFrom()); + assertEquals("source", request.recoverFrom().getName()); + assertEquals(String.valueOf(expectedShardsNum), request.settings().get("index.number_of_shards")); + assertEquals(cause, request.cause()); + assertEquals(request.waitForActiveShards(), activeShardCount); + } + } + private DiscoveryNode newNode(String nodeId) { final Set roles = Collections.unmodifiableSet( new HashSet<>(Arrays.asList(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)) diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java index a2f19b8c694d0..fa71b77648d35 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -1593,12 +1593,9 @@ public void testRemoteCustomData() { // Case 2 - cluster.remote_store.index.path.prefix.optimised=fixed (default value) indexMetadata = testRemoteCustomData(true, PathType.FIXED); - validateRemoteCustomData(indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY), PathType.NAME, PathType.FIXED.name()); - validateRemoteCustomData( - indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY), - PathHashAlgorithm.NAME, - PathHashAlgorithm.FNV_1A.name() - ); + Map remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); + validateRemoteCustomData(remoteCustomData, PathType.NAME, PathType.FIXED.name()); + assertNull(remoteCustomData.get(PathHashAlgorithm.NAME)); // Case 3 - cluster.remote_store.index.path.prefix.optimised=hashed_prefix indexMetadata = testRemoteCustomData(true, PathType.HASHED_PREFIX); diff --git a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java index 1780819390052..aee4ae40d16a5 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java @@ -57,7 +57,7 @@ public void testReadBlobAsync() throws Exception { final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null); + final ReadContext readContext = new ReadContext.Builder(size, List.of(() -> streamContainerFuture)).build(); Mockito.doAnswer(invocation -> { ActionListener readContextActionListener = invocation.getArgument(1); @@ -103,7 +103,7 @@ public void testReadBlobAsyncException() throws Exception { final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null); + final ReadContext readContext = new ReadContext.Builder(size, List.of(() -> streamContainerFuture)).build(); Mockito.doAnswer(invocation -> { ActionListener readContextActionListener = invocation.getArgument(1); diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java index 0163c2275e7f4..c47874f3ba294 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java @@ -78,7 +78,7 @@ public void testReadContextListener() throws InterruptedException, IOException { UnaryOperator.identity(), MAX_CONCURRENT_STREAMS ); - ReadContext readContext = new ReadContext((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams, null); + ReadContext readContext = new ReadContext.Builder((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams).build(); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -125,7 +125,7 @@ public int available() { threadPool.generic() ) ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null); + ReadContext readContext = new ReadContext.Builder((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams).build(); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -178,7 +178,7 @@ public int read(byte[] b) throws IOException { threadPool.generic() ) ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams, null); + ReadContext readContext = new ReadContext.Builder((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams).build(); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -203,7 +203,7 @@ public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception { UnaryOperator.identity(), MAX_CONCURRENT_STREAMS ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null); + ReadContext readContext = new ReadContext.Builder((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams).build(); readContextListener.onResponse(readContext); countDownLatch.await(); diff --git a/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java b/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java index 64f4f1f3e083e..bd6d7b88ade28 100644 --- a/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java +++ b/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java @@ -197,8 +197,8 @@ public void setDocument(int docId) { } @Override - public Object execute() { - return raw_requests[docId][scriptIndex[0]]; + public void execute() { + addEmittedValue(raw_requests[docId][scriptIndex[0]]); } }; diff --git a/server/src/test/java/org/opensearch/index/query/DerivedFieldQueryTests.java b/server/src/test/java/org/opensearch/index/query/DerivedFieldQueryTests.java index 18d117fa8c0f5..1bb303a874b9a 100644 --- a/server/src/test/java/org/opensearch/index/query/DerivedFieldQueryTests.java +++ b/server/src/test/java/org/opensearch/index/query/DerivedFieldQueryTests.java @@ -67,8 +67,8 @@ public void testDerivedField() throws IOException { when(searchLookup.getLeafSearchLookup(ctx)).thenReturn(leafLookup); return new DerivedFieldScript(params, lookup, ctx) { @Override - public Object execute() { - return raw_requests[sourceLookup.docId()][2]; + public void execute() { + addEmittedValue(raw_requests[sourceLookup.docId()][2]); } }; }; diff --git a/server/src/test/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshotTests.java b/server/src/test/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshotTests.java index 38c4bb781ce06..e3259a3097278 100644 --- a/server/src/test/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshotTests.java +++ b/server/src/test/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshotTests.java @@ -14,6 +14,8 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; +import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -40,7 +42,10 @@ public void testToXContent() throws IOException { String repositoryBasePath = "test-repo-basepath"; List fileNames = new ArrayList<>(5); fileNames.addAll(Arrays.asList("file1", "file2", "file3", "file4", "file5")); + + // Case 1 - Without remote path type fields RemoteStoreShardShallowCopySnapshot shardShallowCopySnapshot = new RemoteStoreShardShallowCopySnapshot( + "1", snapshot, indexVersion, primaryTerm, @@ -52,7 +57,9 @@ public void testToXContent() throws IOException { indexUUID, remoteStoreRepository, repositoryBasePath, - fileNames + fileNames, + null, + null ); String actual; try (XContentBuilder builder = MediaTypeRegistry.JSON.contentBuilder()) { @@ -66,6 +73,67 @@ public void testToXContent() throws IOException { + "\"test-rs-repository\",\"commit_generation\":5,\"primary_term\":3,\"remote_store_repository_base_path\":" + "\"test-repo-basepath\",\"file_names\":[\"file1\",\"file2\",\"file3\",\"file4\",\"file5\"]}"; assert Objects.equals(actual, expectedXContent) : "xContent is " + actual; + + // Case 2 - with just fixed type + shardShallowCopySnapshot = new RemoteStoreShardShallowCopySnapshot( + snapshot, + indexVersion, + primaryTerm, + commitGeneration, + startTime, + time, + totalFileCount, + totalSize, + indexUUID, + remoteStoreRepository, + repositoryBasePath, + fileNames, + PathType.FIXED, + null + ); + try (XContentBuilder builder = MediaTypeRegistry.JSON.contentBuilder()) { + builder.startObject(); + shardShallowCopySnapshot.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + actual = builder.toString(); + } + + expectedXContent = "{\"version\":\"2\",\"name\":\"test-snapshot\",\"index_version\":1,\"start_time\":123,\"time\":123," + + "\"number_of_files\":5,\"total_size\":5,\"index_uuid\":\"syzhajds-ashdlfj\",\"remote_store_repository\":" + + "\"test-rs-repository\",\"commit_generation\":5,\"primary_term\":3,\"remote_store_repository_base_path\":" + + "\"test-repo-basepath\",\"file_names\":[\"file1\",\"file2\",\"file3\",\"file4\",\"file5\"],\"path_type\":0}"; + assert Objects.equals(actual, expectedXContent) : "xContent is " + actual; + + // Case 3 - with just hashed prefix type and hash algorithm + shardShallowCopySnapshot = new RemoteStoreShardShallowCopySnapshot( + snapshot, + indexVersion, + primaryTerm, + commitGeneration, + startTime, + time, + totalFileCount, + totalSize, + indexUUID, + remoteStoreRepository, + repositoryBasePath, + fileNames, + PathType.HASHED_PREFIX, + PathHashAlgorithm.FNV_1A + ); + try (XContentBuilder builder = MediaTypeRegistry.JSON.contentBuilder()) { + builder.startObject(); + shardShallowCopySnapshot.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + actual = builder.toString(); + } + + expectedXContent = "{\"version\":\"2\",\"name\":\"test-snapshot\",\"index_version\":1,\"start_time\":123,\"time\":123," + + "\"number_of_files\":5,\"total_size\":5,\"index_uuid\":\"syzhajds-ashdlfj\",\"remote_store_repository\":" + + "\"test-rs-repository\",\"commit_generation\":5,\"primary_term\":3,\"remote_store_repository_base_path\":" + + "\"test-repo-basepath\",\"file_names\":[\"file1\",\"file2\",\"file3\",\"file4\",\"file5\"],\"path_type\":1" + + ",\"path_hash_algorithm\":0}"; + assert Objects.equals(actual, expectedXContent) : "xContent is " + actual; } public void testFromXContent() throws IOException { @@ -83,6 +151,7 @@ public void testFromXContent() throws IOException { List fileNames = new ArrayList<>(5); fileNames.addAll(Arrays.asList("file1", "file2", "file3", "file4", "file5")); RemoteStoreShardShallowCopySnapshot expectedShardShallowCopySnapshot = new RemoteStoreShardShallowCopySnapshot( + "1", snapshot, indexVersion, primaryTerm, @@ -94,7 +163,9 @@ public void testFromXContent() throws IOException { indexUUID, remoteStoreRepository, repositoryBasePath, - fileNames + fileNames, + null, + null ); String xContent = "{\"version\":\"1\",\"name\":\"test-snapshot\",\"index_version\":1,\"start_time\":123,\"time\":123," + "\"number_of_files\":5,\"total_size\":5,\"index_uuid\":\"syzhajds-ashdlfj\",\"remote_store_repository\":" @@ -102,22 +173,66 @@ public void testFromXContent() throws IOException { + "\"test-repo-basepath\",\"file_names\":[\"file1\",\"file2\",\"file3\",\"file4\",\"file5\"]}"; try (XContentParser parser = createParser(JsonXContent.jsonXContent, xContent)) { RemoteStoreShardShallowCopySnapshot actualShardShallowCopySnapshot = RemoteStoreShardShallowCopySnapshot.fromXContent(parser); - assertEquals(actualShardShallowCopySnapshot.snapshot(), expectedShardShallowCopySnapshot.snapshot()); - assertEquals( - actualShardShallowCopySnapshot.getRemoteStoreRepository(), - expectedShardShallowCopySnapshot.getRemoteStoreRepository() - ); - assertEquals(actualShardShallowCopySnapshot.getCommitGeneration(), expectedShardShallowCopySnapshot.getCommitGeneration()); - assertEquals(actualShardShallowCopySnapshot.getPrimaryTerm(), expectedShardShallowCopySnapshot.getPrimaryTerm()); - assertEquals(actualShardShallowCopySnapshot.startTime(), expectedShardShallowCopySnapshot.startTime()); - assertEquals(actualShardShallowCopySnapshot.time(), expectedShardShallowCopySnapshot.time()); - assertEquals(actualShardShallowCopySnapshot.totalSize(), expectedShardShallowCopySnapshot.totalSize()); - assertEquals(actualShardShallowCopySnapshot.totalFileCount(), expectedShardShallowCopySnapshot.totalFileCount()); + assert Objects.equals(expectedShardShallowCopySnapshot, actualShardShallowCopySnapshot); + } + + // with pathType=PathType.FIXED + xContent = "{\"version\":\"2\",\"name\":\"test-snapshot\",\"index_version\":1,\"start_time\":123,\"time\":123," + + "\"number_of_files\":5,\"total_size\":5,\"index_uuid\":\"syzhajds-ashdlfj\",\"remote_store_repository\":" + + "\"test-rs-repository\",\"commit_generation\":5,\"primary_term\":3,\"remote_store_repository_base_path\":" + + "\"test-repo-basepath\",\"file_names\":[\"file1\",\"file2\",\"file3\",\"file4\",\"file5\"],\"path_type\":0}"; + expectedShardShallowCopySnapshot = new RemoteStoreShardShallowCopySnapshot( + "2", + snapshot, + indexVersion, + primaryTerm, + commitGeneration, + startTime, + time, + totalFileCount, + totalSize, + indexUUID, + remoteStoreRepository, + repositoryBasePath, + fileNames, + PathType.FIXED, + null + ); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, xContent)) { + RemoteStoreShardShallowCopySnapshot actualShardShallowCopySnapshot = RemoteStoreShardShallowCopySnapshot.fromXContent(parser); + assert Objects.equals(expectedShardShallowCopySnapshot, actualShardShallowCopySnapshot); + } + + // with pathType=PathType.HASHED_PREFIX and pathHashAlgorithm=PathHashAlgorithm.FNV_1A + xContent = "{\"version\":\"2\",\"name\":\"test-snapshot\",\"index_version\":1,\"start_time\":123,\"time\":123," + + "\"number_of_files\":5,\"total_size\":5,\"index_uuid\":\"syzhajds-ashdlfj\",\"remote_store_repository\":" + + "\"test-rs-repository\",\"commit_generation\":5,\"primary_term\":3,\"remote_store_repository_base_path\":" + + "\"test-repo-basepath\",\"file_names\":[\"file1\",\"file2\",\"file3\",\"file4\",\"file5\"],\"path_type\":1,\"path_hash_algorithm\":0}"; + expectedShardShallowCopySnapshot = new RemoteStoreShardShallowCopySnapshot( + "2", + snapshot, + indexVersion, + primaryTerm, + commitGeneration, + startTime, + time, + totalFileCount, + totalSize, + indexUUID, + remoteStoreRepository, + repositoryBasePath, + fileNames, + PathType.HASHED_PREFIX, + PathHashAlgorithm.FNV_1A + ); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, xContent)) { + RemoteStoreShardShallowCopySnapshot actualShardShallowCopySnapshot = RemoteStoreShardShallowCopySnapshot.fromXContent(parser); + assert Objects.equals(expectedShardShallowCopySnapshot, actualShardShallowCopySnapshot); } } public void testFromXContentInvalid() throws IOException { - final int iters = scaledRandomIntBetween(1, 10); + final int iters = 14; for (int iter = 0; iter < iters; iter++) { String snapshot = "test-snapshot"; long indexVersion = 1; @@ -133,10 +248,11 @@ public void testFromXContentInvalid() throws IOException { List fileNames = new ArrayList<>(5); fileNames.addAll(Arrays.asList("file1", "file2", "file3", "file4", "file5")); String failure = null; - String version = RemoteStoreShardShallowCopySnapshot.DEFAULT_VERSION; - long length = Math.max(0, Math.abs(randomLong())); + String version = "1"; + PathType pathType = null; + PathHashAlgorithm pathHashAlgorithm = null; // random corruption - switch (randomIntBetween(0, 8)) { + switch (iter) { case 0: snapshot = null; failure = "Invalid/Missing Snapshot Name"; @@ -170,6 +286,31 @@ public void testFromXContentInvalid() throws IOException { failure = "Invalid Version Provided"; break; case 8: + version = "2"; + failure = "Invalid combination of pathType=null pathHashAlgorithm=null for version=2"; + break; + case 9: + version = "1"; + pathType = PathType.FIXED; + failure = "Invalid combination of pathType=FIXED pathHashAlgorithm=null for version=1"; + break; + case 10: + version = "1"; + pathHashAlgorithm = PathHashAlgorithm.FNV_1A; + failure = "Invalid combination of pathType=null pathHashAlgorithm=FNV_1A for version=1"; + break; + case 11: + version = "2"; + pathType = PathType.FIXED; + pathHashAlgorithm = PathHashAlgorithm.FNV_1A; + failure = "Invalid combination of pathType=FIXED pathHashAlgorithm=FNV_1A for version=2"; + break; + case 12: + version = "2"; + pathType = PathType.HASHED_PREFIX; + pathHashAlgorithm = PathHashAlgorithm.FNV_1A; + break; + case 13: break; default: fail("shouldn't be here"); @@ -194,6 +335,14 @@ public void testFromXContentInvalid() throws IOException { builder.value(fileName); } builder.endArray(); + // We are handling NP check since a cluster can have indexes created earlier which do not have remote store + // path type and path hash algorithm in its custom data in index metadata. + if (Objects.nonNull(pathType)) { + builder.field(RemoteStoreShardShallowCopySnapshot.PATH_TYPE, pathType.getCode()); + } + if (Objects.nonNull(pathHashAlgorithm)) { + builder.field(RemoteStoreShardShallowCopySnapshot.PATH_HASH_ALGORITHM, pathHashAlgorithm.getCode()); + } builder.endObject(); byte[] xContent = BytesReference.toBytes(BytesReference.bytes(builder)); @@ -211,7 +360,8 @@ public void testFromXContentInvalid() throws IOException { assertEquals(remoteStoreShardShallowCopySnapshot.startTime(), startTime); assertEquals(remoteStoreShardShallowCopySnapshot.time(), time); assertEquals(remoteStoreShardShallowCopySnapshot.totalSize(), totalSize); - assertEquals(remoteStoreShardShallowCopySnapshot.totalFileCount(), totalFileCount); + assertEquals(remoteStoreShardShallowCopySnapshot.getPathType(), pathType); + assertEquals(remoteStoreShardShallowCopySnapshot.getPathHashAlgorithm(), pathHashAlgorithm); } else { try (XContentParser parser = createParser(JsonXContent.jsonXContent, xContent)) { parser.nextToken(); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 11b4eb078226f..44ddd2de9d007 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -704,9 +704,9 @@ public void testCleanupAsync() throws Exception { String repositoryName = "test-repository"; String indexUUID = "test-idx-uuid"; ShardId shardId = new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt("0")); - RemoteStorePathStrategy pathStrategy = new RemoteStorePathStrategy( - randomFrom(PathType.values()), - randomFrom(PathHashAlgorithm.values()) + RemoteStorePathStrategy pathStrategy = randomFrom( + new RemoteStorePathStrategy(PathType.FIXED), + new RemoteStorePathStrategy(PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A) ); RemoteSegmentStoreDirectory.remoteDirectoryCleanup( diff --git a/test/framework/src/main/java/org/opensearch/script/MockScriptEngine.java b/test/framework/src/main/java/org/opensearch/script/MockScriptEngine.java index 8c7e9718eb0cd..456b55883f91e 100644 --- a/test/framework/src/main/java/org/opensearch/script/MockScriptEngine.java +++ b/test/framework/src/main/java/org/opensearch/script/MockScriptEngine.java @@ -288,10 +288,10 @@ public double execute(Map params1, double[] values) { ctx ) { @Override - public Object execute() { + public void execute() { Map vars = new HashMap<>(derivedFieldsParams); vars.put("params", derivedFieldsParams); - return script.apply(vars); + script.apply(vars); } }; return context.factoryClazz.cast(factory); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 664314245530e..f0f5576713042 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2538,7 +2538,7 @@ public static Settings buildRemoteStoreNodeAttributes( ); } - public static Settings buildRemoteStoreNodeAttributes( + private static Settings buildRemoteStoreNodeAttributes( String segmentRepoName, Path segmentRepoPath, String segmentRepoType,