diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java index d5f29503cb325..0d72c554caee1 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java @@ -213,7 +213,7 @@ public Collection createComponents( final NamedWriteableRegistry namedWriteableRegistry, final IndexNameExpressionResolver expressionResolver, final Supplier repositoriesServiceSupplier, - ContextSwitcher contextSwitcher + final ContextSwitcher contextSwitcher ) { int urgentEventLoopThreads = urgentPoolCount(clusterService.getSettings()); int priorityEventLoopThreads = priorityPoolCount(clusterService.getSettings()); diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java index 316e2104ed7f0..27879c6d2a6c7 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java @@ -16,6 +16,7 @@ import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; @@ -26,7 +27,6 @@ import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptService; -import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestGetExecutionContextRestAction.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestGetExecutionContextRestAction.java index 0fa3861226cdf..ca9a8fc295c67 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestGetExecutionContextRestAction.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestGetExecutionContextRestAction.java @@ -9,13 +9,13 @@ package org.opensearch.http; import org.opensearch.client.node.NodeClient; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.rest.RestStatus; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestResponse; -import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.threadpool.ThreadPool; import java.util.List; diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/SimpleClusterStateIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/SimpleClusterStateIT.java index 1a5898f58d871..3f57ec9e421c8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/SimpleClusterStateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/SimpleClusterStateIT.java @@ -471,7 +471,7 @@ public Collection createComponents( final NamedWriteableRegistry namedWriteableRegistry, final IndexNameExpressionResolver expressionResolver, final Supplier repositoriesServiceSupplier, - ContextSwitcher contextSwitcher + final ContextSwitcher contextSwitcher ) { clusterService.addListener(event -> { final ClusterState state = event.state(); diff --git a/server/src/internalClusterTest/java/org/opensearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/opensearch/index/FinalPipelineIT.java index cff2992653f1c..2b6931c3ade4a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/FinalPipelineIT.java @@ -375,7 +375,7 @@ public Collection createComponents( final NamedWriteableRegistry namedWriteableRegistry, final IndexNameExpressionResolver expressionResolver, final Supplier repositoriesServiceSupplier, - ContextSwitcher contextSwitcher + final ContextSwitcher contextSwitcher ) { return Collections.emptyList(); } diff --git a/server/src/main/java/org/opensearch/client/OriginSettingClient.java b/server/src/main/java/org/opensearch/client/OriginSettingClient.java index 1b0e08cc489c4..58efbaaab1e9a 100644 --- a/server/src/main/java/org/opensearch/client/OriginSettingClient.java +++ b/server/src/main/java/org/opensearch/client/OriginSettingClient.java @@ -35,16 +35,20 @@ import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionType; import org.opensearch.action.support.ContextPreservingActionListener; +import org.opensearch.common.util.concurrent.ContextSwitcher; +import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; import java.util.function.Supplier; +import static org.opensearch.common.util.concurrent.ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME; + /** * A {@linkplain Client} that sends requests with the - * {@link ThreadContext#stashWithOrigin origin} set to a particular - * value and calls its {@linkplain ActionListener} in its original + * origin set to a particular value and calls its {@linkplain ActionListener} + * in its original * {@link ThreadContext}. * * @opensearch.internal @@ -52,10 +56,12 @@ public final class OriginSettingClient extends FilterClient { private final String origin; + private final ContextSwitcher contextSwitcher; public OriginSettingClient(Client in, String origin) { super(in); this.origin = origin; + this.contextSwitcher = new InternalContextSwitcher(in().threadPool()); } @Override @@ -65,7 +71,8 @@ protected void ActionListener listener ) { final Supplier supplier = in().threadPool().getThreadContext().newRestorableContext(false); - try (ThreadContext.StoredContext ignore = in().threadPool().getThreadContext().stashWithOrigin(origin)) { + try (ThreadContext.StoredContext ignore = contextSwitcher.switchContext()) { + in().threadPool().getThreadContext().putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin); super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener)); } } diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 6c6049f04231b..caee1658b16e6 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -415,6 +415,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; @@ -437,11 +438,13 @@ public abstract class AbstractClient implements Client { protected final Settings settings; private final ThreadPool threadPool; private final Admin admin; + private final InternalContextSwitcher contextSwitcher; public AbstractClient(Settings settings, ThreadPool threadPool) { this.settings = settings; this.threadPool = threadPool; this.admin = new Admin(this); + this.contextSwitcher = new InternalContextSwitcher(threadPool); this.logger = LogManager.getLogger(this.getClass()); } @@ -2147,8 +2150,7 @@ protected void Request request, ActionListener listener ) { - ThreadContext threadContext = threadPool().getThreadContext(); - try (ThreadContext.StoredContext ctx = threadContext.stashAndMergeHeaders(headers)) { + try (ThreadContext.StoredContext ctx = contextSwitcher.stashAndMergeHeaders(headers)) { super.doExecute(action, request, listener); } } diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java index 4ef014bd08621..0bd84e8381afe 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java @@ -58,6 +58,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor; @@ -105,7 +106,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements private final ClusterSettings clusterSettings; protected final ThreadPool threadPool; - protected final InternalContextSwitcher contextSwitcher; + protected final ContextSwitcher contextSwitcher; private volatile TimeValue slowTaskLoggingThreshold; diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index c515c3b13494a..036766f981b36 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -61,6 +61,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.CountDown; import org.opensearch.common.util.concurrent.FutureUtils; import org.opensearch.common.util.concurrent.InternalContextSwitcher; @@ -134,7 +135,7 @@ public class MasterService extends AbstractLifecycleComponent { private volatile TimeValue slowTaskLoggingThreshold; protected final ThreadPool threadPool; - protected final InternalContextSwitcher contextSwitcher; + protected final ContextSwitcher contextSwitcher; private volatile PrioritizedOpenSearchThreadPoolExecutor threadPoolExecutor; private volatile Batcher taskBatcher; diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ContextSwitcher.java b/server/src/main/java/org/opensearch/common/util/concurrent/ContextSwitcher.java index efa92dceb2124..c1976add3f05c 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ContextSwitcher.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ContextSwitcher.java @@ -9,27 +9,13 @@ package org.opensearch.common.util.concurrent; import org.opensearch.common.annotation.PublicApi; -import org.opensearch.plugins.Plugin; -import org.opensearch.threadpool.ThreadPool; /** - * ContextSwitcher that is passed to plugins in order to switch to a fresh context - * from any existing context + * ContextSwitcher interface * * @opensearch.api */ @PublicApi(since = "2.17.0") -public class ContextSwitcher { - - private final ThreadPool threadPool; - private final Plugin plugin; - - public ContextSwitcher(ThreadPool threadPool, Plugin plugin) { - this.threadPool = threadPool; - this.plugin = plugin; - } - - public ThreadContext.StoredContext switchContext() { - return threadPool.getThreadContext().stashContext(plugin.getClass()); - } +public interface ContextSwitcher { + ThreadContext.StoredContext switchContext(); } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/InternalContextSwitcher.java b/server/src/main/java/org/opensearch/common/util/concurrent/InternalContextSwitcher.java index 47aeda6fc0f3c..162b13de52d56 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/InternalContextSwitcher.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/InternalContextSwitcher.java @@ -11,6 +11,8 @@ import org.opensearch.common.annotation.InternalApi; import org.opensearch.threadpool.ThreadPool; +import java.util.Map; + /** * InternalContextSwitcher is an internal class used to switch into a fresh * internal system context @@ -18,14 +20,19 @@ * @opensearch.internal */ @InternalApi -public class InternalContextSwitcher { +public class InternalContextSwitcher implements ContextSwitcher { private final ThreadPool threadPool; public InternalContextSwitcher(ThreadPool threadPool) { this.threadPool = threadPool; } + @Override public ThreadContext.StoredContext switchContext() { return threadPool.getThreadContext().stashContext(); } + + public ThreadContext.StoredContext stashAndMergeHeaders(Map headers) { + return threadPool.getThreadContext().stashAndMergeHeaders(headers); + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/PluginContextSwitcher.java b/server/src/main/java/org/opensearch/common/util/concurrent/PluginContextSwitcher.java new file mode 100644 index 0000000000000..7a2c245a27cf2 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/concurrent/PluginContextSwitcher.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util.concurrent; + +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.plugins.Plugin; +import org.opensearch.threadpool.ThreadPool; + +/** + * ContextSwitcher that is passed to plugins in order to switch to a fresh context + * from any existing context + * + * @opensearch.api + */ +@PublicApi(since = "2.17.0") +public class PluginContextSwitcher implements ContextSwitcher { + + private final ThreadPool threadPool; + private final Plugin plugin; + + public PluginContextSwitcher(ThreadPool threadPool, Plugin plugin) { + this.threadPool = threadPool; + this.plugin = plugin; + } + + @Override + public ThreadContext.StoredContext switchContext() { + return threadPool.getThreadContext().stashContext(plugin.getClass()); + } +} diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java index 446459e7d62e8..3f03c8b57aeed 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java @@ -182,7 +182,7 @@ StoredContext stashContext() { * Removes the current context and resets a default context. Retains information about plugin stashing the context. * The removed context can be restored by closing the returned {@link StoredContext}. */ - public StoredContext stashContext(Class pluginClass) { + StoredContext stashContext(Class pluginClass) { final ThreadContextStruct context = threadLocal.get(); /* X-Opaque-ID should be preserved in a threadContext in order to propagate this across threads. @@ -246,7 +246,7 @@ public Writeable captureAsWriteable() { * but the tasks API will perform a get on their behalf using this method * if it can't find the task in memory. */ - public StoredContext stashWithOrigin(String origin) { + StoredContext stashWithOrigin(String origin) { final ThreadContext.StoredContext storedContext = stashContext(); putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin); return storedContext; @@ -257,7 +257,7 @@ public StoredContext stashWithOrigin(String origin) { * The removed context can be restored when closing the returned {@link StoredContext}. The merge strategy is that headers * that are already existing are preserved unless they are defaults. */ - public StoredContext stashAndMergeHeaders(Map headers) { + StoredContext stashAndMergeHeaders(Map headers) { final ThreadContextStruct context = threadLocal.get(); Map newHeader = new HashMap<>(headers); newHeader.putAll(context.requestHeaders); diff --git a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java index c9386ad9c491b..712713f83023b 100644 --- a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java @@ -45,6 +45,7 @@ import org.opensearch.common.transport.NetworkExceptionHelper; import org.opensearch.common.transport.PortsRange; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; @@ -100,7 +101,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo protected final NetworkService networkService; protected final BigArrays bigArrays; protected final ThreadPool threadPool; - protected final InternalContextSwitcher contextSwitcher; + protected final ContextSwitcher contextSwitcher; protected final Dispatcher dispatcher; protected final CorsHandler corsHandler; private final NamedXContentRegistry xContentRegistry; diff --git a/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java index eaf6d7c85220d..dcac7fac54def 100644 --- a/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/opensearch/index/seqno/GlobalCheckpointSyncAction.java @@ -43,6 +43,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; @@ -70,7 +71,7 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction< ReplicationResponse> { public static String ACTION_NAME = "indices:admin/seq_no/global_checkpoint_sync"; - private final InternalContextSwitcher contextSwitcher; + private final ContextSwitcher contextSwitcher; @Inject public GlobalCheckpointSyncAction( diff --git a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index 7506740fe84bf..9b3d984285621 100644 --- a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -47,6 +47,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; @@ -85,7 +86,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync"; private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class); - private final InternalContextSwitcher contextSwitcher; + private final ContextSwitcher contextSwitcher; protected Logger getLogger() { return LOGGER; diff --git a/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java b/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java index 9d99c749832aa..ef4b9f33b786c 100644 --- a/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java +++ b/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java @@ -37,6 +37,7 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.common.lease.Releasable; import org.opensearch.common.metrics.MeanMetric; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.RunOnce; import org.opensearch.common.util.concurrent.ThreadContext; @@ -66,7 +67,7 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener, private final Runnable forceRefresh; private final Logger logger; private final ThreadPool threadPool; - private final InternalContextSwitcher contextSwitcher; + private final ContextSwitcher contextSwitcher; private final MeanMetric refreshMetric; /** diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index 551fcf2b18b87..03b7c048c66da 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -23,6 +23,7 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; @@ -59,7 +60,7 @@ public class PublishCheckpointAction extends TransportReplicationAction< protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class); private final SegmentReplicationTargetService replicationService; - private final InternalContextSwitcher contextSwitcher; + private final ContextSwitcher contextSwitcher; @Inject public PublishCheckpointAction( diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 9b04d7d16bec7..a8873aceff3b0 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -115,6 +115,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.common.util.concurrent.ContextSwitcher; +import org.opensearch.common.util.concurrent.PluginContextSwitcher; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.Assertions; import org.opensearch.core.common.breaker.CircuitBreaker; @@ -932,7 +933,7 @@ protected Node( final ViewService viewService = new ViewService(clusterService, client, null); Collection pluginComponents = pluginsService.filterPlugins(Plugin.class).stream().flatMap(p -> { - ContextSwitcher contextSwitcher = new ContextSwitcher(threadPool, p); + ContextSwitcher contextSwitcher = new PluginContextSwitcher(threadPool, p); return p.createComponents( client, clusterService, @@ -952,7 +953,7 @@ protected Node( Collection telemetryAwarePluginComponents = pluginsService.filterPlugins(TelemetryAwarePlugin.class) .stream() .flatMap(p -> { - ContextSwitcher contextSwitcher = new ContextSwitcher(threadPool, (Plugin) p); + ContextSwitcher contextSwitcher = new PluginContextSwitcher(threadPool, (Plugin) p); return p.createComponents( client, clusterService, diff --git a/server/src/main/java/org/opensearch/transport/NativeMessageHandler.java b/server/src/main/java/org/opensearch/transport/NativeMessageHandler.java index fb90e6761df76..835734974ca63 100644 --- a/server/src/main/java/org/opensearch/transport/NativeMessageHandler.java +++ b/server/src/main/java/org/opensearch/transport/NativeMessageHandler.java @@ -39,6 +39,7 @@ import org.opensearch.Version; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.concurrent.AbstractRunnable; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.common.io.stream.ByteBufferStreamInput; @@ -75,7 +76,7 @@ public class NativeMessageHandler implements ProtocolMessageHandler { private static final Logger logger = LogManager.getLogger(NativeMessageHandler.class); private final ThreadPool threadPool; - private final InternalContextSwitcher contextSwitcher; + private final ContextSwitcher contextSwitcher; private final NativeOutboundHandler outboundHandler; private final NamedWriteableRegistry namedWriteableRegistry; private final TransportHandshaker handshaker; diff --git a/server/src/main/java/org/opensearch/transport/OutboundHandler.java b/server/src/main/java/org/opensearch/transport/OutboundHandler.java index 837f65b12e7b2..8ab283fbbef8f 100644 --- a/server/src/main/java/org/opensearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/OutboundHandler.java @@ -40,6 +40,7 @@ import org.opensearch.common.lease.Releasables; import org.opensearch.common.network.CloseableChannel; import org.opensearch.common.transport.NetworkExceptionHelper; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; @@ -60,7 +61,7 @@ public final class OutboundHandler { private final StatsTracker statsTracker; private final ThreadPool threadPool; - private final InternalContextSwitcher contextSwitcher; + private final ContextSwitcher contextSwitcher; public OutboundHandler(StatsTracker statsTracker, ThreadPool threadPool) { this.statsTracker = statsTracker; diff --git a/server/src/main/java/org/opensearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/opensearch/transport/RemoteClusterConnection.java index 559a8bd48c26a..f27ab8132f26c 100644 --- a/server/src/main/java/org/opensearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/opensearch/transport/RemoteClusterConnection.java @@ -39,6 +39,7 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.io.IOUtils; @@ -72,7 +73,7 @@ final class RemoteClusterConnection implements Closeable { private final RemoteConnectionStrategy connectionStrategy; private final String clusterAlias; private final ThreadPool threadPool; - private final InternalContextSwitcher contextSwitcher; + private final ContextSwitcher contextSwitcher; private volatile boolean skipUnavailable; private final TimeValue initialConnectionTimeout; diff --git a/server/src/main/java/org/opensearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/opensearch/transport/SniffConnectionStrategy.java index 915235d4825a7..57b44d9dd4ccd 100644 --- a/server/src/main/java/org/opensearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/opensearch/transport/SniffConnectionStrategy.java @@ -46,6 +46,7 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.io.IOUtils; @@ -150,7 +151,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { private final Predicate nodePredicate; private final SetOnce remoteClusterName = new SetOnce<>(); private final String proxyAddress; - private final InternalContextSwitcher contextSwitcher; + private final ContextSwitcher contextSwitcher; SniffConnectionStrategy( String clusterAlias, diff --git a/server/src/test/java/org/opensearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/opensearch/action/bulk/BulkProcessorTests.java index 648c97da5fb65..31585d2b7a134 100644 --- a/server/src/test/java/org/opensearch/action/bulk/BulkProcessorTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/BulkProcessorTests.java @@ -37,6 +37,7 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.action.index.IndexRequest; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; @@ -69,7 +70,7 @@ public class BulkProcessorTests extends OpenSearchTestCase { private ThreadPool threadPool; - private InternalContextSwitcher contextSwitcher; + private ContextSwitcher contextSwitcher; private final Logger logger = LogManager.getLogger(BulkProcessorTests.class); @Before diff --git a/server/src/test/java/org/opensearch/action/support/ContextPreservingActionListenerTests.java b/server/src/test/java/org/opensearch/action/support/ContextPreservingActionListenerTests.java index 7e283be59fdf6..f93f66c3aec40 100644 --- a/server/src/test/java/org/opensearch/action/support/ContextPreservingActionListenerTests.java +++ b/server/src/test/java/org/opensearch/action/support/ContextPreservingActionListenerTests.java @@ -31,6 +31,7 @@ package org.opensearch.action.support; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; @@ -48,7 +49,7 @@ public class ContextPreservingActionListenerTests extends OpenSearchTestCase { public void testOriginalContextIsPreservedAfterOnResponse() throws IOException { ThreadPool threadPool = new TestThreadPool("ContextPreservingActionListenerTests.testOriginalContextIsPreservedAfterOnResponse"); ThreadContext threadContext = threadPool.getThreadContext(); - InternalContextSwitcher contextSwitcher = new InternalContextSwitcher(threadPool); + ContextSwitcher contextSwitcher = new InternalContextSwitcher(threadPool); final boolean nonEmptyContext = randomBoolean(); if (nonEmptyContext) { threadContext.putHeader("not empty", "value"); @@ -87,7 +88,7 @@ public void onFailure(Exception e) { public void testOriginalContextIsPreservedAfterOnFailure() throws Exception { ThreadPool threadPool = new TestThreadPool("ContextPreservingActionListenerTests.testOriginalContextIsPreservedAfterOnFailure"); ThreadContext threadContext = threadPool.getThreadContext(); - InternalContextSwitcher contextSwitcher = new InternalContextSwitcher(threadPool); + ContextSwitcher contextSwitcher = new InternalContextSwitcher(threadPool); final boolean nonEmptyContext = randomBoolean(); if (nonEmptyContext) { threadContext.putHeader("not empty", "value"); @@ -128,7 +129,7 @@ public void onFailure(Exception e) { public void testOriginalContextIsWhenListenerThrows() throws Exception { ThreadPool threadPool = new TestThreadPool("ContextPreservingActionListenerTests.testOriginalContextIsWhenListenerThrows"); ThreadContext threadContext = threadPool.getThreadContext(); - InternalContextSwitcher contextSwitcher = new InternalContextSwitcher(threadPool); + ContextSwitcher contextSwitcher = new InternalContextSwitcher(threadPool); final boolean nonEmptyContext = randomBoolean(); if (nonEmptyContext) { threadContext.putHeader("not empty", "value"); @@ -178,7 +179,7 @@ public void onFailure(Exception e) { public void testToStringIncludesDelegate() { ThreadPool threadPool = new TestThreadPool("ContextPreservingActionListenerTests.testToStringIncludesDelegate"); ThreadContext threadContext = threadPool.getThreadContext(); - InternalContextSwitcher contextSwitcher = new InternalContextSwitcher(threadPool); + ContextSwitcher contextSwitcher = new InternalContextSwitcher(threadPool); final ContextPreservingActionListener actionListener; try (ThreadContext.StoredContext ignore = contextSwitcher.switchContext()) { final ActionListener delegate = new ActionListener() { diff --git a/server/src/test/java/org/opensearch/cluster/metadata/TemplateUpgradeServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/TemplateUpgradeServiceTests.java index 5935dd0544615..bd61596084374 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/TemplateUpgradeServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/TemplateUpgradeServiceTests.java @@ -46,6 +46,7 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; @@ -91,7 +92,7 @@ public class TemplateUpgradeServiceTests extends OpenSearchTestCase { private ThreadPool threadPool; private ClusterService clusterService; - private InternalContextSwitcher contextSwitcher; + private ContextSwitcher contextSwitcher; @Before public void setUpTest() throws Exception { diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java index 475c2f12fcc7f..8de8099b9ffd1 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java @@ -51,6 +51,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.telemetry.metrics.Histogram; @@ -92,7 +93,7 @@ public class ClusterApplierServiceTests extends OpenSearchTestCase { private static ThreadPool threadPool; - private static InternalContextSwitcher contextSwitcher; + private static ContextSwitcher contextSwitcher; private TimedClusterApplierService clusterApplierService; private static MetricsRegistry metricsRegistry; private static Histogram applierslatencyHistogram; diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java index 846990d32c97b..41b0ef955fef4 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -60,6 +60,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.BaseFuture; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.node.Node; @@ -112,7 +113,7 @@ public class MasterServiceTests extends OpenSearchTestCase { private static ThreadPool threadPool; - private static InternalContextSwitcher contextSwitcher; + private static ContextSwitcher contextSwitcher; private static long timeDiffInMillis; @BeforeClass diff --git a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java index 226bc88cddb9c..d5add99741080 100644 --- a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java @@ -49,6 +49,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.io.IOUtils; @@ -106,7 +107,7 @@ public class RefreshListenersTests extends OpenSearchTestCase { private Engine engine; private volatile int maxListeners; private ThreadPool threadPool; - private InternalContextSwitcher contextSwitcher; + private ContextSwitcher contextSwitcher; private Store store; private MeanMetric refreshMetric; diff --git a/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java b/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java index bfd43c8804fd4..339c85084fb41 100644 --- a/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java +++ b/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java @@ -10,6 +10,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.concurrent.ThreadContext.StoredContext; @@ -43,7 +44,7 @@ public class ThreadContextBasedTracerContextStorageTests extends OpenSearchTestCase { private Tracer tracer; private ThreadPool threadPool; - private InternalContextSwitcher contextSwitcher; + private ContextSwitcher contextSwitcher; private ThreadContext threadContext; private TracerContextStorage threadContextStorage; private ExecutorService executorService; diff --git a/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolClusterManagerService.java b/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolClusterManagerService.java index cf20c34cca660..0575790a508f4 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolClusterManagerService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolClusterManagerService.java @@ -41,6 +41,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ContextSwitcher; import org.opensearch.common.util.concurrent.InternalContextSwitcher; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor; @@ -64,7 +65,7 @@ public class FakeThreadPoolClusterManagerService extends ClusterManagerService { private final String name; private final List pendingTasks = new ArrayList<>(); private final Consumer onTaskAvailableToRun; - private final InternalContextSwitcher contextSwitcher; + private final ContextSwitcher contextSwitcher; private boolean scheduledNextTask = false; private boolean taskInProgress = false; private boolean waitForPublish = false; diff --git a/test/framework/src/main/java/org/opensearch/common/util/concurrent/TestContextSwitcher.java b/test/framework/src/main/java/org/opensearch/common/util/concurrent/TestContextSwitcher.java new file mode 100644 index 0000000000000..6e0ffb610d202 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/common/util/concurrent/TestContextSwitcher.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util.concurrent; + +import org.opensearch.threadpool.ThreadPool; + +/** + * Test Context Switcher + */ +public class TestContextSwitcher implements ContextSwitcher { + private final ThreadPool threadPool; + private final Class pluginClass; + + public TestContextSwitcher(ThreadPool threadPool, Class pluginClass) { + this.threadPool = threadPool; + this.pluginClass = pluginClass; + } + + @Override + public ThreadContext.StoredContext switchContext() { + return threadPool.getThreadContext().stashContext(pluginClass); + } +}