void transformSearchPhaseResults(
final String currentPhase,
final String nextPhase
) {
- pipeline.runSearchPhaseResultsTransformer(searchPhaseResult, searchPhaseContext, currentPhase, nextPhase);
+ pipeline.runSearchPhaseResultsTransformer(searchPhaseResult, searchPhaseContext, currentPhase, nextPhase, requestContext);
}
// Visible for testing
diff --git a/server/src/main/java/org/opensearch/search/pipeline/Processor.java b/server/src/main/java/org/opensearch/search/pipeline/Processor.java
index 0120d68ceb5aa..a06383fbe9cef 100644
--- a/server/src/main/java/org/opensearch/search/pipeline/Processor.java
+++ b/server/src/main/java/org/opensearch/search/pipeline/Processor.java
@@ -21,13 +21,6 @@
* @opensearch.internal
*/
public interface Processor {
- /**
- * Processor configuration key to let the factory know the context for pipeline creation.
- *
- * See {@link PipelineSource}.
- */
- String PIPELINE_SOURCE = "pipeline_source";
-
/**
* Gets the type of processor
*/
diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPhaseResultsProcessor.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPhaseResultsProcessor.java
index 772dc8758bace..a64266cfb2a2b 100644
--- a/server/src/main/java/org/opensearch/search/pipeline/SearchPhaseResultsProcessor.java
+++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPhaseResultsProcessor.java
@@ -32,6 +32,22 @@ void process(
final SearchPhaseContext searchPhaseContext
);
+ /**
+ * Processes the {@link SearchPhaseResults} obtained from a SearchPhase which will be returned to next
+ * SearchPhase. Receives the {@link PipelineProcessingContext} passed to other processors.
+ * @param searchPhaseResult {@link SearchPhaseResults}
+ * @param searchPhaseContext {@link SearchContext}
+ * @param requestContext {@link PipelineProcessingContext}
+ * @param {@link SearchPhaseResult}
+ */
+ default void process(
+ final SearchPhaseResults searchPhaseResult,
+ final SearchPhaseContext searchPhaseContext,
+ final PipelineProcessingContext requestContext
+ ) {
+ process(searchPhaseResult, searchPhaseContext);
+ }
+
/**
* The phase which should have run before, this processor can start executing.
* @return {@link SearchPhaseName}
diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java
index 580fe1b7c4216..2175b5d135394 100644
--- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java
+++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java
@@ -408,7 +408,8 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
pipeline = pipelineHolder.pipeline;
}
}
- return new PipelinedRequest(pipeline, searchRequest);
+ PipelineProcessingContext requestContext = new PipelineProcessingContext();
+ return new PipelinedRequest(pipeline, searchRequest, requestContext);
}
Map> getRequestProcessorFactories() {
diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java b/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java
index 427c9e4ab694c..30adc9b0afbe8 100644
--- a/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java
+++ b/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java
@@ -15,18 +15,27 @@
* Interface for a search pipeline processor that modifies a search request.
*/
public interface SearchRequestProcessor extends Processor {
-
/**
- * Transform a {@link SearchRequest}. Executed on the coordinator node before any {@link org.opensearch.action.search.SearchPhase}
- * executes.
- *
+ * Process a SearchRequest without receiving request-scoped state.
* Implement this method if the processor makes no asynchronous calls.
- * @param request the executed {@link SearchRequest}
- * @return a new {@link SearchRequest} (or the input {@link SearchRequest} if no changes)
- * @throws Exception if an error occurs during processing
+ * @param request the search request (which may have been modified by an earlier processor)
+ * @return the modified search request
+ * @throws Exception implementation-specific processing exception
*/
SearchRequest processRequest(SearchRequest request) throws Exception;
+ /**
+ * Process a SearchRequest, with request-scoped state shared across processors in the pipeline
+ * Implement this method if the processor makes no asynchronous calls.
+ * @param request the search request (which may have been modified by an earlier processor)
+ * @param requestContext request-scoped state shared across processors in the pipeline
+ * @return the modified search request
+ * @throws Exception implementation-specific processing exception
+ */
+ default SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) throws Exception {
+ return processRequest(request);
+ }
+
/**
* Transform a {@link SearchRequest}. Executed on the coordinator node before any {@link org.opensearch.action.search.SearchPhase}
* executes.
@@ -35,9 +44,13 @@ public interface SearchRequestProcessor extends Processor {
* @param request the executed {@link SearchRequest}
* @param requestListener callback to be invoked on successful processing or on failure
*/
- default void processRequestAsync(SearchRequest request, ActionListener requestListener) {
+ default void processRequestAsync(
+ SearchRequest request,
+ PipelineProcessingContext requestContext,
+ ActionListener requestListener
+ ) {
try {
- requestListener.onResponse(processRequest(request));
+ requestListener.onResponse(processRequest(request, requestContext));
} catch (Exception e) {
requestListener.onFailure(e);
}
diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchResponseProcessor.java b/server/src/main/java/org/opensearch/search/pipeline/SearchResponseProcessor.java
index 21136ce208fee..98591ab9d0def 100644
--- a/server/src/main/java/org/opensearch/search/pipeline/SearchResponseProcessor.java
+++ b/server/src/main/java/org/opensearch/search/pipeline/SearchResponseProcessor.java
@@ -21,24 +21,47 @@ public interface SearchResponseProcessor extends Processor {
* Transform a {@link SearchResponse}, possibly based on the executed {@link SearchRequest}.
*
* Implement this method if the processor makes no asynchronous calls.
- * @param request the executed {@link SearchRequest}
+ *
+ * @param request the executed {@link SearchRequest}
* @param response the current {@link SearchResponse}, possibly modified by earlier processors
* @return a modified {@link SearchResponse} (or the input {@link SearchResponse} if no changes)
* @throws Exception if an error occurs during processing
*/
SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception;
+ /**
+ * Process a SearchResponse, with request-scoped state shared across processors in the pipeline
+ *
+ * Implement this method if the processor makes no asynchronous calls.
+ *
+ * @param request the (maybe transformed) search request
+ * @param response the search response (which may have been modified by an earlier processor)
+ * @param requestContext request-scoped state shared across processors in the pipeline
+ * @return the modified search response
+ * @throws Exception implementation-specific processing exception
+ */
+ default SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelineProcessingContext requestContext)
+ throws Exception {
+ return processResponse(request, response);
+ }
+
/**
* Transform a {@link SearchResponse}, possibly based on the executed {@link SearchRequest}.
*
* Expert method: Implement this if the processor needs to make asynchronous calls. Otherwise, implement processResponse.
- * @param request the executed {@link SearchRequest}
- * @param response the current {@link SearchResponse}, possibly modified by earlier processors
+ *
+ * @param request the executed {@link SearchRequest}
+ * @param response the current {@link SearchResponse}, possibly modified by earlier processors
* @param responseListener callback to be invoked on successful processing or on failure
*/
- default void processResponseAsync(SearchRequest request, SearchResponse response, ActionListener responseListener) {
+ default void processResponseAsync(
+ SearchRequest request,
+ SearchResponse response,
+ PipelineProcessingContext requestContext,
+ ActionListener responseListener
+ ) {
try {
- responseListener.onResponse(processResponse(request, response));
+ responseListener.onResponse(processResponse(request, response, requestContext));
} catch (Exception e) {
responseListener.onFailure(e);
}
diff --git a/server/src/main/java/org/opensearch/search/pipeline/StatefulSearchRequestProcessor.java b/server/src/main/java/org/opensearch/search/pipeline/StatefulSearchRequestProcessor.java
new file mode 100644
index 0000000000000..67e1c1147cb87
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/pipeline/StatefulSearchRequestProcessor.java
@@ -0,0 +1,25 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.pipeline;
+
+import org.opensearch.action.search.SearchRequest;
+
+/**
+ * A specialization of {@link SearchRequestProcessor} that makes use of the request-scoped processor state.
+ * Implementors must implement the processRequest method that accepts request-scoped processor state.
+ */
+public interface StatefulSearchRequestProcessor extends SearchRequestProcessor {
+ @Override
+ default SearchRequest processRequest(SearchRequest request) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) throws Exception;
+}
diff --git a/server/src/main/java/org/opensearch/search/pipeline/StatefulSearchResponseProcessor.java b/server/src/main/java/org/opensearch/search/pipeline/StatefulSearchResponseProcessor.java
new file mode 100644
index 0000000000000..f0842d24e1b56
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/pipeline/StatefulSearchResponseProcessor.java
@@ -0,0 +1,27 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.pipeline;
+
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.action.search.SearchResponse;
+
+/**
+ * A specialization of {@link SearchResponseProcessor} that makes use of the request-scoped processor state.
+ * Implementors must implement the processResponse method that accepts request-scoped processor state.
+ */
+public interface StatefulSearchResponseProcessor extends SearchResponseProcessor {
+ @Override
+ default SearchResponse processResponse(SearchRequest request, SearchResponse response) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelineProcessingContext requestContext)
+ throws Exception;
+}
diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
index c825ecc8abe9f..12052598d3671 100644
--- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
+++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
@@ -231,6 +231,7 @@ public ThreadPool(
final Map builders = new HashMap<>();
final int allocatedProcessors = OpenSearchExecutors.allocatedProcessors(settings);
+ final int halfProc = halfAllocatedProcessors(allocatedProcessors);
final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors);
final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors);
final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
@@ -264,13 +265,13 @@ public ThreadPool(
builders.put(Names.SYSTEM_WRITE, new FixedExecutorBuilder(settings, Names.SYSTEM_WRITE, halfProcMaxAt5, 1000, false));
builders.put(
Names.TRANSLOG_TRANSFER,
- new ScalingExecutorBuilder(Names.TRANSLOG_TRANSFER, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))
+ new ScalingExecutorBuilder(Names.TRANSLOG_TRANSFER, 1, halfProc, TimeValue.timeValueMinutes(5))
);
builders.put(Names.TRANSLOG_SYNC, new FixedExecutorBuilder(settings, Names.TRANSLOG_SYNC, allocatedProcessors * 4, 10000));
- builders.put(Names.REMOTE_PURGE, new ScalingExecutorBuilder(Names.REMOTE_PURGE, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
+ builders.put(Names.REMOTE_PURGE, new ScalingExecutorBuilder(Names.REMOTE_PURGE, 1, halfProc, TimeValue.timeValueMinutes(5)));
builders.put(
Names.REMOTE_REFRESH_RETRY,
- new ScalingExecutorBuilder(Names.REMOTE_REFRESH_RETRY, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))
+ new ScalingExecutorBuilder(Names.REMOTE_REFRESH_RETRY, 1, halfProc, TimeValue.timeValueMinutes(5))
);
builders.put(
Names.REMOTE_RECOVERY,
@@ -555,6 +556,10 @@ static int boundedBy(int value, int min, int max) {
return Math.min(max, Math.max(min, value));
}
+ static int halfAllocatedProcessors(int allocatedProcessors) {
+ return (allocatedProcessors + 1) / 2;
+ }
+
static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
return boundedBy((allocatedProcessors + 1) / 2, 1, 5);
}
diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java
index e40826915c848..c4a782209421b 100644
--- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java
+++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java
@@ -117,6 +117,7 @@
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_BLOCK;
+import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY;
@@ -1217,6 +1218,27 @@ public void testvalidateIndexSettings() {
threadPool.shutdown();
}
+ public void testIndexTemplateReplicationType() {
+ Settings templateSettings = Settings.builder().put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build();
+
+ request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
+ final Settings.Builder requestSettings = Settings.builder();
+ request.settings(requestSettings.build());
+ Settings indexSettings = aggregateIndexSettings(
+ ClusterState.EMPTY_STATE,
+ request,
+ templateSettings,
+ null,
+ Settings.EMPTY,
+ IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
+ randomShardLimitService(),
+ Collections.emptySet(),
+ clusterSettings
+ );
+ assertNotEquals(ReplicationType.SEGMENT, clusterSettings.get(CLUSTER_REPLICATION_TYPE_SETTING));
+ assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(INDEX_REPLICATION_TYPE_SETTING.getKey()));
+ }
+
public void testRemoteStoreNoUserOverrideExceptReplicationTypeSegmentIndexSettings() {
Settings settings = Settings.builder()
.put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT)
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java
index 8f2db5db969d2..052c7877404a8 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java
@@ -200,4 +200,88 @@ public void testTargetPoolDedicatedSearchNodeAllocationDecisions() {
assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(localIdx, localOnlyNode.node(), globalAllocation).type());
assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(remoteIdx, remoteCapableNode.node(), globalAllocation).type());
}
+
+ public void testDebugMessage() {
+ ClusterState clusterState = createInitialCluster(3, 3, true, 2, 2);
+ AllocationService service = this.createRemoteCapableAllocationService();
+ clusterState = allocateShardsAndBalance(clusterState, service);
+
+ // Add an unassigned primary shard for force allocation checks
+ Metadata metadata = Metadata.builder(clusterState.metadata())
+ .put(IndexMetadata.builder("test_local_unassigned").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
+ .build();
+ RoutingTable routingTable = RoutingTable.builder(clusterState.routingTable())
+ .addAsNew(metadata.index("test_local_unassigned"))
+ .build();
+ clusterState = ClusterState.builder(clusterState).metadata(metadata).routingTable(routingTable).build();
+
+ // Add remote index unassigned primary
+ clusterState = createRemoteIndex(clusterState, "test_remote_unassigned");
+
+ RoutingNodes defaultRoutingNodes = clusterState.getRoutingNodes();
+ RoutingAllocation globalAllocation = getRoutingAllocation(clusterState, defaultRoutingNodes);
+ globalAllocation.setDebugMode(RoutingAllocation.DebugMode.ON);
+
+ ShardRouting localShard = clusterState.routingTable()
+ .allShards(getIndexName(0, false))
+ .stream()
+ .filter(ShardRouting::primary)
+ .collect(Collectors.toList())
+ .get(0);
+ ShardRouting remoteShard = clusterState.routingTable()
+ .allShards(getIndexName(0, true))
+ .stream()
+ .filter(ShardRouting::primary)
+ .collect(Collectors.toList())
+ .get(0);
+ ShardRouting unassignedLocalShard = clusterState.routingTable()
+ .allShards("test_local_unassigned")
+ .stream()
+ .filter(ShardRouting::primary)
+ .collect(Collectors.toList())
+ .get(0);
+ ShardRouting unassignedRemoteShard = clusterState.routingTable()
+ .allShards("test_remote_unassigned")
+ .stream()
+ .filter(ShardRouting::primary)
+ .collect(Collectors.toList())
+ .get(0);
+ IndexMetadata localIdx = globalAllocation.metadata().getIndexSafe(localShard.index());
+ IndexMetadata remoteIdx = globalAllocation.metadata().getIndexSafe(remoteShard.index());
+ String localNodeId = LOCAL_NODE_PREFIX;
+ for (RoutingNode routingNode : globalAllocation.routingNodes()) {
+ if (routingNode.nodeId().startsWith(LOCAL_NODE_PREFIX)) {
+ localNodeId = routingNode.nodeId();
+ break;
+ }
+ }
+ String remoteNodeId = remoteShard.currentNodeId();
+ RoutingNode localOnlyNode = defaultRoutingNodes.node(localNodeId);
+ RoutingNode remoteCapableNode = defaultRoutingNodes.node(remoteNodeId);
+
+ TargetPoolAllocationDecider targetPoolAllocationDecider = new TargetPoolAllocationDecider();
+ Decision decision = targetPoolAllocationDecider.canAllocate(localShard, remoteCapableNode, globalAllocation);
+ assertEquals(
+ "Routing pools are incompatible. Shard pool: [LOCAL_ONLY], node pool: [REMOTE_CAPABLE] without [data] role",
+ decision.getExplanation()
+ );
+
+ decision = targetPoolAllocationDecider.canAllocate(remoteShard, localOnlyNode, globalAllocation);
+ assertEquals("Routing pools are incompatible. Shard pool: [REMOTE_CAPABLE], node pool: [LOCAL_ONLY]", decision.getExplanation());
+
+ decision = targetPoolAllocationDecider.canAllocate(remoteShard, remoteCapableNode, globalAllocation);
+ assertEquals("Routing pools are compatible. Shard pool: [REMOTE_CAPABLE], node pool: [REMOTE_CAPABLE]", decision.getExplanation());
+
+ decision = targetPoolAllocationDecider.canAllocate(localIdx, remoteCapableNode, globalAllocation);
+ assertEquals(
+ "Routing pools are incompatible. Index pool: [LOCAL_ONLY], node pool: [REMOTE_CAPABLE] without [data] role",
+ decision.getExplanation()
+ );
+
+ decision = targetPoolAllocationDecider.canAllocate(remoteIdx, localOnlyNode, globalAllocation);
+ assertEquals("Routing pools are incompatible. Index pool: [REMOTE_CAPABLE], node pool: [LOCAL_ONLY]", decision.getExplanation());
+
+ decision = targetPoolAllocationDecider.canAllocate(remoteIdx, remoteCapableNode, globalAllocation);
+ assertEquals("Routing pools are compatible. Index pool: [REMOTE_CAPABLE], node pool: [REMOTE_CAPABLE]", decision.getExplanation());
+ }
}
diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java
index a33e5f453d1e1..074f659850c7b 100644
--- a/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java
+++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java
@@ -8,21 +8,36 @@
package org.opensearch.common.blobstore.transfer;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RateLimiter;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream;
+import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
+import org.opensearch.common.blobstore.transfer.stream.RateLimitingOffsetRangeInputStream;
import org.opensearch.common.blobstore.transfer.stream.ResettableCheckedInputStream;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
public class RemoteTransferContainerTests extends OpenSearchTestCase {
@@ -92,25 +107,37 @@ private void testSupplyStreamContext(
int partCount = streamContext.getNumberOfParts();
assertEquals(expectedPartCount, partCount);
Thread[] threads = new Thread[partCount];
+ InputStream[] streams = new InputStream[partCount];
long totalContentLength = remoteTransferContainer.getContentLength();
assert partSize * (partCount - 1) + lastPartSize == totalContentLength
: "part sizes and last part size don't add up to total content length";
logger.info("partSize: {}, lastPartSize: {}, partCount: {}", partSize, lastPartSize, streamContext.getNumberOfParts());
- for (int partIdx = 0; partIdx < partCount; partIdx++) {
- int finalPartIdx = partIdx;
- long expectedPartSize = (partIdx == partCount - 1) ? lastPartSize : partSize;
- threads[partIdx] = new Thread(() -> {
+ try {
+ for (int partIdx = 0; partIdx < partCount; partIdx++) {
+ int finalPartIdx = partIdx;
+ long expectedPartSize = (partIdx == partCount - 1) ? lastPartSize : partSize;
+ threads[partIdx] = new Thread(() -> {
+ try {
+ InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx);
+ streams[finalPartIdx] = inputStreamContainer.getInputStream();
+ assertEquals(expectedPartSize, inputStreamContainer.getContentLength());
+ } catch (IOException e) {
+ fail("IOException during stream creation");
+ }
+ });
+ threads[partIdx].start();
+ }
+ for (int i = 0; i < partCount; i++) {
+ threads[i].join();
+ }
+ } finally {
+ Arrays.stream(streams).forEach(stream -> {
try {
- InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx);
- assertEquals(expectedPartSize, inputStreamContainer.getContentLength());
+ stream.close();
} catch (IOException e) {
- fail("IOException during stream creation");
+ throw new RuntimeException(e);
}
});
- threads[partIdx].start();
- }
- for (int i = 0; i < partCount; i++) {
- threads[i].join();
}
}
@@ -182,6 +209,7 @@ public OffsetRangeInputStream get(long size, long position) throws IOException {
}
private void testTypeOfProvidedStreams(boolean isRemoteDataIntegritySupported) throws IOException {
+ InputStream inputStream = null;
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
testFile.getFileName().toString(),
@@ -201,12 +229,132 @@ public OffsetRangeInputStream get(long size, long position) throws IOException {
) {
StreamContext streamContext = remoteTransferContainer.supplyStreamContext(16);
InputStreamContainer inputStreamContainer = streamContext.provideStream(0);
+ inputStream = inputStreamContainer.getInputStream();
if (shouldOffsetInputStreamsBeChecked(isRemoteDataIntegritySupported)) {
assertTrue(inputStreamContainer.getInputStream() instanceof ResettableCheckedInputStream);
} else {
assertTrue(inputStreamContainer.getInputStream() instanceof OffsetRangeInputStream);
}
assertThrows(RuntimeException.class, () -> remoteTransferContainer.supplyStreamContext(16));
+ } finally {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ }
+ }
+
+ public void testCloseDuringOngoingReadOnStream() throws IOException, InterruptedException {
+ Supplier rateLimiterSupplier = Mockito.mock(Supplier.class);
+ Mockito.when(rateLimiterSupplier.get()).thenReturn(null);
+ CountDownLatch readInvokedLatch = new CountDownLatch(1);
+ AtomicBoolean readAfterClose = new AtomicBoolean();
+ CountDownLatch streamClosed = new CountDownLatch(1);
+ AtomicBoolean indexInputClosed = new AtomicBoolean();
+ AtomicInteger closedCount = new AtomicInteger();
+ try (
+ RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
+ testFile.getFileName().toString(),
+ testFile.getFileName().toString(),
+ TEST_FILE_SIZE_BYTES,
+ true,
+ WritePriority.NORMAL,
+ new RemoteTransferContainer.OffsetRangeInputStreamSupplier() {
+ @Override
+ public OffsetRangeInputStream get(long size, long position) throws IOException {
+ IndexInput indexInput = Mockito.mock(IndexInput.class);
+ Mockito.doAnswer(invocation -> {
+ indexInputClosed.set(true);
+ closedCount.incrementAndGet();
+ return null;
+ }).when(indexInput).close();
+ Mockito.when(indexInput.getFilePointer()).thenAnswer((Answer) invocation -> {
+ if (readAfterClose.get() == false) {
+ return 0L;
+ }
+ readInvokedLatch.countDown();
+ boolean closedSuccess = streamClosed.await(30, TimeUnit.SECONDS);
+ assertTrue(closedSuccess);
+ assertFalse(indexInputClosed.get());
+ return 0L;
+ });
+
+ OffsetRangeIndexInputStream offsetRangeIndexInputStream = new OffsetRangeIndexInputStream(
+ indexInput,
+ size,
+ position
+ );
+ return new RateLimitingOffsetRangeInputStream(offsetRangeIndexInputStream, rateLimiterSupplier, null);
+ }
+ },
+ 0,
+ true
+ )
+ ) {
+ StreamContext streamContext = remoteTransferContainer.supplyStreamContext(16);
+ InputStreamContainer inputStreamContainer = streamContext.provideStream(0);
+ assertTrue(inputStreamContainer.getInputStream() instanceof RateLimitingOffsetRangeInputStream);
+ CountDownLatch latch = new CountDownLatch(1);
+ new Thread(() -> {
+ try {
+ readAfterClose.set(true);
+ inputStreamContainer.getInputStream().readAllBytes();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ latch.countDown();
+ }
+ }).start();
+ boolean successReadWait = readInvokedLatch.await(30, TimeUnit.SECONDS);
+ assertTrue(successReadWait);
+ // Closing stream here. Test Multiple invocations of close. Shouldn't throw any exception
+ inputStreamContainer.getInputStream().close();
+ inputStreamContainer.getInputStream().close();
+ inputStreamContainer.getInputStream().close();
+ streamClosed.countDown();
+ boolean processed = latch.await(30, TimeUnit.SECONDS);
+ assertTrue(processed);
+ assertTrue(readAfterClose.get());
+ assertTrue(indexInputClosed.get());
+
+ // Test Multiple invocations of close. Close count should always be 1.
+ inputStreamContainer.getInputStream().close();
+ inputStreamContainer.getInputStream().close();
+ inputStreamContainer.getInputStream().close();
+ assertEquals(1, closedCount.get());
+
+ }
+ }
+
+ public void testReadAccessWhenStreamClosed() throws IOException {
+ Supplier rateLimiterSupplier = Mockito.mock(Supplier.class);
+ Mockito.when(rateLimiterSupplier.get()).thenReturn(null);
+ try (
+ RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
+ testFile.getFileName().toString(),
+ testFile.getFileName().toString(),
+ TEST_FILE_SIZE_BYTES,
+ true,
+ WritePriority.NORMAL,
+ new RemoteTransferContainer.OffsetRangeInputStreamSupplier() {
+ @Override
+ public OffsetRangeInputStream get(long size, long position) throws IOException {
+ IndexInput indexInput = Mockito.mock(IndexInput.class);
+ OffsetRangeIndexInputStream offsetRangeIndexInputStream = new OffsetRangeIndexInputStream(
+ indexInput,
+ size,
+ position
+ );
+ return new RateLimitingOffsetRangeInputStream(offsetRangeIndexInputStream, rateLimiterSupplier, null);
+ }
+ },
+ 0,
+ true
+ )
+ ) {
+ StreamContext streamContext = remoteTransferContainer.supplyStreamContext(16);
+ InputStreamContainer inputStreamContainer = streamContext.provideStream(0);
+ inputStreamContainer.getInputStream().close();
+ assertThrows(AlreadyClosedException.class, () -> inputStreamContainer.getInputStream().readAllBytes());
}
}
diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
index dc2111fdcfc56..46be10ce62840 100644
--- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
@@ -1416,7 +1416,7 @@ public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException,
indexShard,
indexShard.getPendingPrimaryTerm() + 1,
globalCheckpoint,
- randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo),
+ randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNoOfUpdatesOrDeletesBeforeRollback),
new ActionListener() {
@Override
public void onResponse(Releasable releasable) {
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
index 36cfd84ff960a..2c6c4afed69fd 100644
--- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
@@ -51,6 +51,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -122,6 +123,14 @@ public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase {
1,
"node-1"
);
+ private final String metadataFilename4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
+ 10,
+ 36,
+ 34,
+ 1,
+ 1,
+ "node-1"
+ );
@Before
public void setup() throws IOException {
@@ -979,6 +988,51 @@ public void testDeleteStaleCommitsActualDelete() throws Exception {
verify(remoteMetadataDirectory).deleteFile(metadataFilename3);
}
+ public void testDeleteStaleCommitsDeleteDedup() throws Exception {
+ Map> metadataFilenameContentMapping = new HashMap<>(populateMetadata());
+ metadataFilenameContentMapping.put(metadataFilename4, metadataFilenameContentMapping.get(metadataFilename3));
+
+ when(
+ remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+ RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX,
+ Integer.MAX_VALUE
+ )
+ ).thenReturn(new ArrayList<>(List.of(metadataFilename, metadataFilename2, metadataFilename3, metadataFilename4)));
+
+ when(remoteMetadataDirectory.getBlobStream(metadataFilename4)).thenAnswer(
+ I -> createMetadataFileBytes(
+ metadataFilenameContentMapping.get(metadataFilename4),
+ indexShard.getLatestReplicationCheckpoint(),
+ segmentInfos
+ )
+ );
+
+ remoteSegmentStoreDirectory.init();
+
+ // popluateMetadata() adds stub to return 4 metadata files
+ // We are passing lastNMetadataFilesToKeep=2 here so that oldest 2 metadata files will be deleted
+ remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2);
+
+ Set staleSegmentFiles = new HashSet<>();
+ for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) {
+ staleSegmentFiles.add(metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]);
+ }
+ for (String metadata : metadataFilenameContentMapping.get(metadataFilename4).values()) {
+ staleSegmentFiles.add(metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]);
+ }
+ staleSegmentFiles.forEach(file -> {
+ try {
+ // Even with the same files in 2 stale metadata files, delete should be called only once.
+ verify(remoteDataDirectory, times(1)).deleteFile(file);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true)));
+ verify(remoteMetadataDirectory).deleteFile(metadataFilename3);
+ verify(remoteMetadataDirectory).deleteFile(metadataFilename4);
+ }
+
public void testDeleteStaleCommitsActualDeleteIOException() throws Exception {
Map> metadataFilenameContentMapping = populateMetadata();
remoteSegmentStoreDirectory.init();
diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java
index 98d2a7e84d672..f5851e669a2da 100644
--- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java
+++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java
@@ -41,6 +41,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
+import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
@@ -68,6 +69,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.mockito.ArgumentMatchers.anyString;
@@ -1378,4 +1380,92 @@ public void testExtraParameterInProcessorConfig() {
fail("Wrong exception type: " + e.getClass());
}
}
+
+ private static class FakeStatefulRequestProcessor extends AbstractProcessor implements StatefulSearchRequestProcessor {
+ private final String type;
+ private final Consumer stateConsumer;
+
+ public FakeStatefulRequestProcessor(String type, Consumer stateConsumer) {
+ super(null, null, false);
+ this.type = type;
+ this.stateConsumer = stateConsumer;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) throws Exception {
+ stateConsumer.accept(requestContext);
+ return request;
+ }
+ }
+
+ private static class FakeStatefulResponseProcessor extends AbstractProcessor implements StatefulSearchResponseProcessor {
+ private final String type;
+ private final Consumer stateConsumer;
+
+ public FakeStatefulResponseProcessor(String type, Consumer stateConsumer) {
+ super(null, null, false);
+ this.type = type;
+ this.stateConsumer = stateConsumer;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelineProcessingContext requestContext)
+ throws Exception {
+ stateConsumer.accept(requestContext);
+ return response;
+ }
+ }
+
+ public void testStatefulProcessors() throws Exception {
+ AtomicReference contextHolder = new AtomicReference<>();
+ SearchPipelineService searchPipelineService = createWithProcessors(
+ Map.of(
+ "write_context",
+ (pf, t, d, igf, cfg, ctx) -> new FakeStatefulRequestProcessor("write_context", (c) -> c.setAttribute("a", "b"))
+ ),
+ Map.of(
+ "read_context",
+ (pf, t, d, igf, cfg, ctx) -> new FakeStatefulResponseProcessor(
+ "read_context",
+ (c) -> contextHolder.set((String) c.getAttribute("a"))
+ )
+ ),
+ Collections.emptyMap()
+ );
+
+ SearchPipelineMetadata metadata = new SearchPipelineMetadata(
+ Map.of(
+ "p1",
+ new PipelineConfiguration(
+ "p1",
+ new BytesArray(
+ "{\"request_processors\" : [ { \"write_context\": {} } ], \"response_processors\": [ { \"read_context\": {} }] }"
+ ),
+ XContentType.JSON
+ )
+ )
+ );
+ ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
+ ClusterState previousState = clusterState;
+ clusterState = ClusterState.builder(clusterState)
+ .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata))
+ .build();
+ searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState));
+
+ PipelinedRequest request = searchPipelineService.resolvePipeline(new SearchRequest().pipeline("p1"));
+ assertNull(contextHolder.get());
+ syncExecutePipeline(request, new SearchResponse(null, null, 0, 0, 0, 0, null, null));
+ assertNotNull(contextHolder.get());
+ assertEquals("b", contextHolder.get());
+ }
}
diff --git a/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java b/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java
index 3a98a67b53920..ee816aa5f596d 100644
--- a/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java
+++ b/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java
@@ -145,6 +145,87 @@ public void run() {
assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue()));
}
+ public void testNoThreadContextToPreserve() throws InterruptedException, ExecutionException, TimeoutException {
+ final Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue()));
+ assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue()));
+
+ final Span local1 = tracer.startSpan(SpanCreationContext.internal().name("test-local-1"));
+ try (SpanScope localScope = tracer.withSpanInScope(local1)) {
+ try (StoredContext ignored = threadContext.stashContext()) {
+ assertThat(local1.getParentSpan(), is(nullValue()));
+ assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(local1));
+ }
+ }
+
+ final Span local2 = tracer.startSpan(SpanCreationContext.internal().name("test-local-2"));
+ try (SpanScope localScope = tracer.withSpanInScope(local2)) {
+ try (StoredContext ignored = threadContext.stashContext()) {
+ assertThat(local2.getParentSpan(), is(nullValue()));
+ assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(local2));
+ }
+ }
+
+ final Span local3 = tracer.startSpan(SpanCreationContext.internal().name("test-local-3"));
+ try (SpanScope localScope = tracer.withSpanInScope(local3)) {
+ try (StoredContext ignored = threadContext.stashContext()) {
+ assertThat(local3.getParentSpan(), is(nullValue()));
+ assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(local3));
+ }
+ }
+ }
+ };
+
+ executorService.submit(threadContext.preserveContext(r)).get(1, TimeUnit.SECONDS);
+
+ assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue()));
+ }
+
+ public void testPreservingContextThreadContextMultipleSpans() throws InterruptedException, ExecutionException, TimeoutException {
+ final Span span = tracer.startSpan(SpanCreationContext.internal().name("test"));
+
+ try (SpanScope scope = tracer.withSpanInScope(span)) {
+ final Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue())));
+ assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(span));
+
+ final Span local1 = tracer.startSpan(SpanCreationContext.internal().name("test-local-1"));
+ try (SpanScope localScope = tracer.withSpanInScope(local1)) {
+ try (StoredContext ignored = threadContext.stashContext()) {
+ assertThat(local1.getParentSpan(), is(span));
+ assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(local1));
+ }
+ }
+
+ final Span local2 = tracer.startSpan(SpanCreationContext.internal().name("test-local-2"));
+ try (SpanScope localScope = tracer.withSpanInScope(local2)) {
+ try (StoredContext ignored = threadContext.stashContext()) {
+ assertThat(local2.getParentSpan(), is(span));
+ assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(local2));
+ }
+ }
+
+ final Span local3 = tracer.startSpan(SpanCreationContext.internal().name("test-local-3"));
+ try (SpanScope localScope = tracer.withSpanInScope(local3)) {
+ try (StoredContext ignored = threadContext.stashContext()) {
+ assertThat(local3.getParentSpan(), is(span));
+ assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(local3));
+ }
+ }
+ }
+ };
+
+ executorService.submit(threadContext.preserveContext(r)).get(1, TimeUnit.SECONDS);
+ }
+
+ assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue())));
+ assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue()));
+ }
+
public void testPreservingContextAndStashingThreadContext() throws InterruptedException, ExecutionException, TimeoutException {
final Span span = tracer.startSpan(SpanCreationContext.internal().name("test"));
diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java
index 19271bbf30e80..97326377ce245 100644
--- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java
+++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java
@@ -150,10 +150,10 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso
sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfAllocatedProcessorsMaxFive);
sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceAllocatedProcessors);
sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors);
- sizes.put(ThreadPool.Names.TRANSLOG_TRANSFER, ThreadPool::halfAllocatedProcessorsMaxTen);
+ sizes.put(ThreadPool.Names.TRANSLOG_TRANSFER, ThreadPool::halfAllocatedProcessors);
sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n);
- sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessorsMaxFive);
- sizes.put(ThreadPool.Names.REMOTE_REFRESH_RETRY, ThreadPool::halfAllocatedProcessorsMaxTen);
+ sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessors);
+ sizes.put(ThreadPool.Names.REMOTE_REFRESH_RETRY, ThreadPool::halfAllocatedProcessors);
sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::twiceAllocatedProcessors);
return sizes.get(threadPoolName).apply(numberOfProcessors);
}
diff --git a/test/fixtures/hdfs-fixture/build.gradle b/test/fixtures/hdfs-fixture/build.gradle
index ea677de632254..bb9d70ac39398 100644
--- a/test/fixtures/hdfs-fixture/build.gradle
+++ b/test/fixtures/hdfs-fixture/build.gradle
@@ -33,7 +33,7 @@ apply plugin: 'opensearch.java'
group = 'hdfs'
versions << [
- 'jetty': '9.4.52.v20230823'
+ 'jetty': '9.4.53.v20231009'
]
dependencies {
@@ -48,6 +48,9 @@ dependencies {
exclude group: "com.squareup.okhttp3"
exclude group: "org.xerial.snappy"
exclude module: "json-io"
+ exclude module: "logback-core"
+ exclude module: "logback-classic"
+ exclude module: "avro"
}
api "org.codehaus.jettison:jettison:${versions.jettison}"
api "org.apache.commons:commons-compress:${versions.commonscompress}"
@@ -66,7 +69,9 @@ dependencies {
api "org.eclipse.jetty.websocket:javax-websocket-server-impl:${versions.jetty}"
api 'org.apache.zookeeper:zookeeper:3.9.1'
api "org.apache.commons:commons-text:1.11.0"
- api "commons-net:commons-net:3.9.0"
+ api "commons-net:commons-net:3.10.0"
+ api "ch.qos.logback:logback-core:1.2.13"
+ api "ch.qos.logback:logback-classic:1.2.13"
runtimeOnly "com.google.guava:guava:${versions.guava}"
runtimeOnly("com.squareup.okhttp3:okhttp:4.12.0") {
exclude group: "com.squareup.okio"
diff --git a/test/framework/src/main/java/org/opensearch/bootstrap/BootstrapForTesting.java b/test/framework/src/main/java/org/opensearch/bootstrap/BootstrapForTesting.java
index 8f065de35aa8b..43881d0660e04 100644
--- a/test/framework/src/main/java/org/opensearch/bootstrap/BootstrapForTesting.java
+++ b/test/framework/src/main/java/org/opensearch/bootstrap/BootstrapForTesting.java
@@ -69,6 +69,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -161,12 +162,17 @@ public class BootstrapForTesting {
addClassCodebase(codebases, "opensearch-rest-client", "org.opensearch.client.RestClient");
}
final Policy testFramework = Security.readPolicy(Bootstrap.class.getResource("test-framework.policy"), codebases);
+ // Allow modules to define own test policy in ad-hoc fashion (if needed) that is not really applicable to other modules
+ final Optional testPolicy = Optional.ofNullable(Bootstrap.class.getResource("test.policy"))
+ .map(policy -> Security.readPolicy(policy, codebases));
final Policy opensearchPolicy = new OpenSearchPolicy(codebases, perms, getPluginPermissions(), true, new Permissions());
Policy.setPolicy(new Policy() {
@Override
public boolean implies(ProtectionDomain domain, Permission permission) {
// implements union
- return opensearchPolicy.implies(domain, permission) || testFramework.implies(domain, permission);
+ return opensearchPolicy.implies(domain, permission)
+ || testFramework.implies(domain, permission)
+ || testPolicy.map(policy -> policy.implies(domain, permission)).orElse(false /* no policy */);
}
});
// Create access control context for mocking
diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java
index 7614cd0e8f920..6215e84f42676 100644
--- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java
@@ -71,6 +71,7 @@
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
+import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
@@ -1646,6 +1647,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma
for (List segmented : partition) {
BulkRequestBuilder bulkBuilder = client().prepareBulk();
for (IndexRequestBuilder indexRequestBuilder : segmented) {
+ indexRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
bulkBuilder.add(indexRequestBuilder);
}
BulkResponse actionGet = bulkBuilder.execute().actionGet();