Skip to content

Commit

Permalink
Make markAsSystemContext package-private
Browse files Browse the repository at this point in the history
Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks committed Jul 23, 2024
1 parent e2ebb36 commit ed71247
Show file tree
Hide file tree
Showing 26 changed files with 62 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.opensearch.action.ActionType;
import org.opensearch.action.support.ContextPreservingActionListener;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.SystemContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
Expand All @@ -61,7 +61,7 @@ public final class OriginSettingClient extends FilterClient {
public OriginSettingClient(Client in, String origin) {
super(in);
this.origin = origin;
this.contextSwitcher = new InternalContextSwitcher(in().threadPool());
this.contextSwitcher = new SystemContextSwitcher(in().threadPool());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.SystemContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
Expand All @@ -438,13 +438,13 @@ public abstract class AbstractClient implements Client {
protected final Settings settings;
private final ThreadPool threadPool;
private final Admin admin;
private final InternalContextSwitcher contextSwitcher;
private final SystemContextSwitcher contextSwitcher;

public AbstractClient(Settings settings, ThreadPool threadPool) {
this.settings = settings;
this.threadPool = threadPool;
this.admin = new Admin(this);
this.contextSwitcher = new InternalContextSwitcher(threadPool);
this.contextSwitcher = new SystemContextSwitcher(threadPool);
this.logger = LogManager.getLogger(this.getClass());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
import org.opensearch.common.util.concurrent.SystemContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
Expand Down Expand Up @@ -142,7 +142,7 @@ public ClusterApplierService(
) {
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.contextSwitcher = new InternalContextSwitcher(threadPool);
this.contextSwitcher = new SystemContextSwitcher(threadPool);
this.state = new AtomicReference<>();
this.nodeName = nodeName;

Expand Down Expand Up @@ -400,7 +400,6 @@ private void submitStateUpdateTask(
final ThreadContext threadContext = threadPool.getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
try (ThreadContext.StoredContext ignore = contextSwitcher.switchContext()) {
threadContext.markAsSystemContext();
final UpdateTask updateTask = new UpdateTask(
config.priority(),
source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.CountDown;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
import org.opensearch.common.util.concurrent.SystemContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.Assertions;
import org.opensearch.core.common.text.Text;
Expand Down Expand Up @@ -171,7 +171,7 @@ public MasterService(
);
this.stateStats = new ClusterStateStats();
this.threadPool = threadPool;
this.contextSwitcher = new InternalContextSwitcher(threadPool);
this.contextSwitcher = new SystemContextSwitcher(threadPool);
this.clusterManagerMetrics = clusterManagerMetrics;
}

Expand Down Expand Up @@ -1013,7 +1013,6 @@ public <T> void submitStateUpdateTasks(
final ThreadContext threadContext = threadPool.getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
try (ThreadContext.StoredContext ignore = contextSwitcher.switchContext()) {
threadContext.markAsSystemContext();

List<Batcher.UpdateTask> safeTasks = tasks.entrySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
* @opensearch.internal
*/
@InternalApi
public class InternalContextSwitcher implements ContextSwitcher {
public class SystemContextSwitcher implements ContextSwitcher {
private final ThreadPool threadPool;

public InternalContextSwitcher(ThreadPool threadPool) {
public SystemContextSwitcher(ThreadPool threadPool) {
this.threadPool = threadPool;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ StoredContext stashContext() {
*/

ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putPersistent(context.persistentHeaders);
threadContextStruct.setSystemContext(propagators);

if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
threadContextStruct = threadContextStruct.putHeaders(
Expand Down Expand Up @@ -595,7 +596,7 @@ boolean isDefaultContext() {
* Marks this thread context as an internal system context. This signals that actions in this context are issued
* by the system itself rather than by a user action.
*/
public void markAsSystemContext() {
void markAsSystemContext() {
threadLocal.set(threadLocal.get().setSystemContext(propagators));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.opensearch.common.transport.PortsRange;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.SystemContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
Expand Down Expand Up @@ -133,7 +133,7 @@ protected AbstractHttpServerTransport(
this.networkService = networkService;
this.bigArrays = bigArrays;
this.threadPool = threadPool;
this.contextSwitcher = new InternalContextSwitcher(threadPool);
this.contextSwitcher = new SystemContextSwitcher(threadPool);
this.xContentRegistry = xContentRegistry;
this.dispatcher = dispatcher;
this.handlingSettings = HttpHandlingSettings.fromSettings(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.SystemContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -96,13 +96,11 @@ public GlobalCheckpointSyncAction(
Request::new,
ThreadPool.Names.MANAGEMENT
);
this.contextSwitcher = new InternalContextSwitcher(threadPool);
this.contextSwitcher = new SystemContextSwitcher(threadPool);
}

public void updateGlobalCheckpointForShard(final ShardId shardId) {
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = contextSwitcher.switchContext()) {
threadContext.markAsSystemContext();
execute(new Request(shardId), ActionListener.wrap(r -> {}, e -> {
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
logger.info(new ParameterizedMessage("{} global checkpoint sync failed", shardId), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.SystemContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -115,7 +115,7 @@ public RetentionLeaseBackgroundSyncAction(
Request::new,
ThreadPool.Names.MANAGEMENT
);
this.contextSwitcher = new InternalContextSwitcher(threadPool);
this.contextSwitcher = new SystemContextSwitcher(threadPool);
}

@Override
Expand All @@ -124,10 +124,7 @@ protected void doExecute(Task task, Request request, ActionListener<ReplicationR
}

final void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) {
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = contextSwitcher.switchContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
final Request request = new Request(shardId, retentionLeases);
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request);
transportService.sendChildRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.SystemContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -87,7 +87,7 @@ public class RetentionLeaseSyncAction extends TransportWriteAction<
public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync";
private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class);

private final InternalContextSwitcher contextSwitcher;
private final SystemContextSwitcher contextSwitcher;

protected Logger getLogger() {
return LOGGER;
Expand Down Expand Up @@ -123,7 +123,7 @@ public RetentionLeaseSyncAction(
systemIndices,
tracer
);
this.contextSwitcher = new InternalContextSwitcher(threadPool);
this.contextSwitcher = new SystemContextSwitcher(threadPool);
}

@Override
Expand All @@ -138,10 +138,7 @@ final void sync(
RetentionLeases retentionLeases,
ActionListener<ReplicationResponse> listener
) {
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = contextSwitcher.switchContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
final Request request = new Request(shardId, retentionLeases);
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request);
transportService.sendChildRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.metrics.MeanMetric;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.RunOnce;
import org.opensearch.common.util.concurrent.SystemContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.index.translog.Translog;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -110,7 +110,7 @@ public RefreshListeners(
this.forceRefresh = forceRefresh;
this.logger = logger;
this.threadPool = threadPool;
this.contextSwitcher = new InternalContextSwitcher(threadPool);
this.contextSwitcher = new SystemContextSwitcher(threadPool);
this.refreshMetric = refreshMetric;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.SystemContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -87,7 +87,7 @@ public PublishCheckpointAction(
ThreadPool.Names.REFRESH
);
this.replicationService = targetService;
this.contextSwitcher = new InternalContextSwitcher(threadPool);
this.contextSwitcher = new SystemContextSwitcher(threadPool);
}

@Override
Expand All @@ -114,10 +114,7 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) {
final void publish(IndexShard indexShard, ReplicationCheckpoint checkpoint) {
String primaryAllocationId = indexShard.routingEntry().allocationId().getId();
long primaryTerm = indexShard.getPendingPrimaryTerm();
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = contextSwitcher.switchContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint);
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
final ReplicationTimer timer = new ReplicationTimer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.SystemContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.io.stream.ByteBufferStreamInput;
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
Expand Down Expand Up @@ -102,7 +102,7 @@ public class NativeMessageHandler implements ProtocolMessageHandler {
TransportKeepAlive keepAlive
) {
this.threadPool = threadPool;
this.contextSwitcher = new InternalContextSwitcher(threadPool);
this.contextSwitcher = new SystemContextSwitcher(threadPool);
this.outboundHandler = new NativeOutboundHandler(nodeName, version, features, statsTracker, threadPool, bigArrays, outboundHandler);
this.namedWriteableRegistry = namedWriteableRegistry;
this.handshaker = handshaker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.opensearch.common.network.CloseableChannel;
import org.opensearch.common.transport.NetworkExceptionHelper;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.SystemContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.NotifyOnceListener;
Expand All @@ -66,7 +66,7 @@ public final class OutboundHandler {
public OutboundHandler(StatsTracker statsTracker, ThreadPool threadPool) {
this.statsTracker = statsTracker;
this.threadPool = threadPool;
this.contextSwitcher = new InternalContextSwitcher(threadPool);
this.contextSwitcher = new SystemContextSwitcher(threadPool);
}

void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.SystemContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -94,7 +94,7 @@ final class RemoteClusterConnection implements Closeable {
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(clusterAlias)
.get(settings);
this.threadPool = transportService.threadPool;
this.contextSwitcher = new InternalContextSwitcher(this.threadPool);
this.contextSwitcher = new SystemContextSwitcher(this.threadPool);
initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
}

Expand Down Expand Up @@ -139,8 +139,6 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
final ContextPreservingActionListener<Function<String, DiscoveryNode>> contextPreservingActionListener =
new ContextPreservingActionListener<>(threadContext.newRestorableContext(false), listener);
try (ThreadContext.StoredContext ignore = contextSwitcher.switchContext()) {
// we stash any context here since this is an internal execution and should not leak any existing context information
threadContext.markAsSystemContext();

final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ContextSwitcher;
import org.opensearch.common.util.concurrent.InternalContextSwitcher;
import org.opensearch.common.util.concurrent.SystemContextSwitcher;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -213,7 +213,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
this.nodePredicate = nodePredicate;
this.configuredSeedNodes = configuredSeedNodes;
this.seedNodes = seedNodes;
this.contextSwitcher = new InternalContextSwitcher(transportService.getThreadPool());
this.contextSwitcher = new SystemContextSwitcher(transportService.getThreadPool());
}

static Stream<Setting.AffixSetting<?>> enablementSettings() {
Expand Down Expand Up @@ -351,9 +351,6 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, Act
new SniffClusterStateResponseHandler(connection, listener, seedNodes)
);
try (ThreadContext.StoredContext ignore = contextSwitcher.switchContext()) {
// we stash any context here since this is an internal execution and should not leak any
// existing context information.
threadContext.markAsSystemContext();
transportService.sendRequest(
connection,
ClusterStateAction.NAME,
Expand Down
Loading

0 comments on commit ed71247

Please sign in to comment.