Skip to content

Commit

Permalink
Make stashContext package-private
Browse files Browse the repository at this point in the history
Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks committed Jul 23, 2024
1 parent 711d542 commit e2ebb36
Show file tree
Hide file tree
Showing 32 changed files with 142 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public Collection<Object> createComponents(
final NamedWriteableRegistry namedWriteableRegistry,
final IndexNameExpressionResolver expressionResolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier,
ContextSwitcher contextSwitcher
final ContextSwitcher contextSwitcher
) {
int urgentEventLoopThreads = urgentPoolCount(clusterService.getSettings());
int priorityEventLoopThreads = priorityPoolCount(clusterService.getSettings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
Expand All @@ -26,7 +27,6 @@
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
package org.opensearch.http;

import org.opensearch.client.node.NodeClient;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.threadpool.ThreadPool;

import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ public Collection<Object> createComponents(
final NamedWriteableRegistry namedWriteableRegistry,
final IndexNameExpressionResolver expressionResolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier,
ContextSwitcher contextSwitcher
final ContextSwitcher contextSwitcher
) {
clusterService.addListener(event -> {
final ClusterState state = event.state();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ public Collection<Object> createComponents(
final NamedWriteableRegistry namedWriteableRegistry,
final IndexNameExpressionResolver expressionResolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier,
ContextSwitcher contextSwitcher
final ContextSwitcher contextSwitcher
) {
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,33 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.action.support.ContextPreservingActionListener;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;

import java.util.function.Supplier;

import static org.opensearch.common.util.concurrent.ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME;

/**
* A {@linkplain Client} that sends requests with the
* {@link ThreadContext#stashWithOrigin origin} set to a particular
* value and calls its {@linkplain ActionListener} in its original
* origin set to a particular value and calls its {@linkplain ActionListener}
* in its original
* {@link ThreadContext}.
*
* @opensearch.internal
*/
public final class OriginSettingClient extends FilterClient {

private final String origin;
private final ContextSwitcher contextSwitcher;

public OriginSettingClient(Client in, String origin) {
super(in);
this.origin = origin;
this.contextSwitcher = new InternalContextSwitcher(in().threadPool());
}

@Override
Expand All @@ -65,7 +71,8 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
ActionListener<Response> listener
) {
final Supplier<ThreadContext.StoredContext> supplier = in().threadPool().getThreadContext().newRestorableContext(false);
try (ThreadContext.StoredContext ignore = in().threadPool().getThreadContext().stashWithOrigin(origin)) {
try (ThreadContext.StoredContext ignore = contextSwitcher.switchContext()) {
in().threadPool().getThreadContext().putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
Expand All @@ -437,11 +438,13 @@ public abstract class AbstractClient implements Client {
protected final Settings settings;
private final ThreadPool threadPool;
private final Admin admin;
private final InternalContextSwitcher contextSwitcher;

public AbstractClient(Settings settings, ThreadPool threadPool) {
this.settings = settings;
this.threadPool = threadPool;
this.admin = new Admin(this);
this.contextSwitcher = new InternalContextSwitcher(threadPool);
this.logger = LogManager.getLogger(this.getClass());
}

Expand Down Expand Up @@ -2147,8 +2150,7 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
Request request,
ActionListener<Response> listener
) {
ThreadContext threadContext = threadPool().getThreadContext();
try (ThreadContext.StoredContext ctx = threadContext.stashAndMergeHeaders(headers)) {
try (ThreadContext.StoredContext ctx = contextSwitcher.stashAndMergeHeaders(headers)) {
super.doExecute(action, request, listener);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
Expand Down Expand Up @@ -105,7 +106,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private final ClusterSettings clusterSettings;
protected final ThreadPool threadPool;
protected final InternalContextSwitcher contextSwitcher;
protected final ContextSwitcher contextSwitcher;

private volatile TimeValue slowTaskLoggingThreshold;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.CountDown;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
Expand Down Expand Up @@ -134,7 +135,7 @@ public class MasterService extends AbstractLifecycleComponent {
private volatile TimeValue slowTaskLoggingThreshold;

protected final ThreadPool threadPool;
protected final InternalContextSwitcher contextSwitcher;
protected final ContextSwitcher contextSwitcher;

private volatile PrioritizedOpenSearchThreadPoolExecutor threadPoolExecutor;
private volatile Batcher taskBatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,13 @@
package org.opensearch.common.util.concurrent;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.plugins.Plugin;
import org.opensearch.threadpool.ThreadPool;

/**
* ContextSwitcher that is passed to plugins in order to switch to a fresh context
* from any existing context
* ContextSwitcher interface
*
* @opensearch.api
*/
@PublicApi(since = "2.17.0")
public class ContextSwitcher {

private final ThreadPool threadPool;
private final Plugin plugin;

public ContextSwitcher(ThreadPool threadPool, Plugin plugin) {
this.threadPool = threadPool;
this.plugin = plugin;
}

public ThreadContext.StoredContext switchContext() {
return threadPool.getThreadContext().stashContext(plugin.getClass());
}
public interface ContextSwitcher {
ThreadContext.StoredContext switchContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,28 @@
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.threadpool.ThreadPool;

import java.util.Map;

/**
* InternalContextSwitcher is an internal class used to switch into a fresh
* internal system context
*
* @opensearch.internal
*/
@InternalApi
public class InternalContextSwitcher {
public class InternalContextSwitcher implements ContextSwitcher {
private final ThreadPool threadPool;

public InternalContextSwitcher(ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public ThreadContext.StoredContext switchContext() {
return threadPool.getThreadContext().stashContext();
}

public ThreadContext.StoredContext stashAndMergeHeaders(Map<String, String> headers) {
return threadPool.getThreadContext().stashAndMergeHeaders(headers);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util.concurrent;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.plugins.Plugin;
import org.opensearch.threadpool.ThreadPool;

/**
* ContextSwitcher that is passed to plugins in order to switch to a fresh context
* from any existing context
*
* @opensearch.api
*/
@PublicApi(since = "2.17.0")
public class PluginContextSwitcher implements ContextSwitcher {

private final ThreadPool threadPool;
private final Plugin plugin;

public PluginContextSwitcher(ThreadPool threadPool, Plugin plugin) {
this.threadPool = threadPool;
this.plugin = plugin;
}

@Override
public ThreadContext.StoredContext switchContext() {
return threadPool.getThreadContext().stashContext(plugin.getClass());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ StoredContext stashContext() {
* Removes the current context and resets a default context. Retains information about plugin stashing the context.
* The removed context can be restored by closing the returned {@link StoredContext}.
*/
public StoredContext stashContext(Class<?> pluginClass) {
StoredContext stashContext(Class<?> pluginClass) {
final ThreadContextStruct context = threadLocal.get();
/*
X-Opaque-ID should be preserved in a threadContext in order to propagate this across threads.
Expand Down Expand Up @@ -246,7 +246,7 @@ public Writeable captureAsWriteable() {
* but the tasks API will perform a get on their behalf using this method
* if it can't find the task in memory.
*/
public StoredContext stashWithOrigin(String origin) {
StoredContext stashWithOrigin(String origin) {
final ThreadContext.StoredContext storedContext = stashContext();
putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
return storedContext;
Expand All @@ -257,7 +257,7 @@ public StoredContext stashWithOrigin(String origin) {
* The removed context can be restored when closing the returned {@link StoredContext}. The merge strategy is that headers
* that are already existing are preserved unless they are defaults.
*/
public StoredContext stashAndMergeHeaders(Map<String, String> headers) {
StoredContext stashAndMergeHeaders(Map<String, String> headers) {
final ThreadContextStruct context = threadLocal.get();
Map<String, String> newHeader = new HashMap<>(headers);
newHeader.putAll(context.requestHeaders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.common.transport.NetworkExceptionHelper;
import org.opensearch.common.transport.PortsRange;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -100,7 +101,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
protected final NetworkService networkService;
protected final BigArrays bigArrays;
protected final ThreadPool threadPool;
protected final InternalContextSwitcher contextSwitcher;
protected final ContextSwitcher contextSwitcher;
protected final Dispatcher dispatcher;
protected final CorsHandler corsHandler;
private final NamedXContentRegistry xContentRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -70,7 +71,7 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
ReplicationResponse> {

public static String ACTION_NAME = "indices:admin/seq_no/global_checkpoint_sync";
private final InternalContextSwitcher contextSwitcher;
private final ContextSwitcher contextSwitcher;

@Inject
public GlobalCheckpointSyncAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -85,7 +86,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync";
private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class);

private final InternalContextSwitcher contextSwitcher;
private final ContextSwitcher contextSwitcher;

protected Logger getLogger() {
return LOGGER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.metrics.MeanMetric;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.RunOnce;
import org.opensearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -66,7 +67,7 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
private final Runnable forceRefresh;
private final Logger logger;
private final ThreadPool threadPool;
private final InternalContextSwitcher contextSwitcher;
private final ContextSwitcher contextSwitcher;
private final MeanMetric refreshMetric;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -59,7 +60,7 @@ public class PublishCheckpointAction extends TransportReplicationAction<
protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class);

private final SegmentReplicationTargetService replicationService;
private final InternalContextSwitcher contextSwitcher;
private final ContextSwitcher contextSwitcher;

@Inject
public PublishCheckpointAction(
Expand Down
Loading

0 comments on commit e2ebb36

Please sign in to comment.