From 3506973761972df391cabef40b9858e6c05239f0 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Thu, 30 May 2024 16:33:24 -0400 Subject: [PATCH 1/3] Create ExecutionContext and show example with ActionPluginProxy Signed-off-by: Craig Perkins --- .../util/concurrent/ExecutionContext.java | 25 ++++++++++ .../common/util/concurrent/ThreadContext.java | 14 ++++++ .../main/java/org/opensearch/node/Node.java | 6 ++- .../opensearch/plugins/ActionPluginProxy.java | 49 +++++++++++++++++++ 4 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java create mode 100644 server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java new file mode 100644 index 0000000000000..bff824173df65 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util.concurrent; + +public class ExecutionContext { + private final ThreadLocal context = new ThreadLocal<>(); + + public void set(String value) { + context.set(value); + } + + public String get() { + return context.get(); + } + + public void clear() { + context.remove(); + } +} diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java index 6580b0e0085ef..1505be8d8f891 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java @@ -115,6 +115,7 @@ public final class ThreadContext implements Writeable { private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct(); private final Map defaultHeader; private final ThreadLocal threadLocal; + private final ExecutionContext executionContext; private final int maxWarningHeaderCount; private final long maxWarningHeaderSize; private final List propagators; @@ -126,6 +127,7 @@ public final class ThreadContext implements Writeable { public ThreadContext(Settings settings) { this.defaultHeader = buildDefaultHeaders(settings); this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT); + this.executionContext = new ExecutionContext(); this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings); this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes(); this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator())); @@ -139,6 +141,18 @@ public void unregisterThreadContextStatePropagator(final ThreadContextStatePropa propagators.remove(Objects.requireNonNull(propagator)); } + public void setExecutionContext(String pluginName) { + this.executionContext.set(pluginName); + } + + public String getExecutionContext() { + return this.executionContext.get(); + } + + public void clearExecutionContext() { + this.executionContext.clear(); + } + /** * Removes the current context and resets a default context. The removed context can be * restored by closing the returned {@link StoredContext}. diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 04bd31e6a5809..d10b4d38720fd 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -185,6 +185,7 @@ import org.opensearch.persistent.PersistentTasksExecutorRegistry; import org.opensearch.persistent.PersistentTasksService; import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.ActionPluginProxy; import org.opensearch.plugins.AnalysisPlugin; import org.opensearch.plugins.CachePlugin; import org.opensearch.plugins.CircuitBreakerPlugin; @@ -979,7 +980,10 @@ protected Node( settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool, - pluginsService.filterPlugins(ActionPlugin.class), + pluginsService.filterPlugins(ActionPlugin.class) + .stream() + .map(p -> ActionPluginProxy.newInstance(p, threadPool)) + .collect(Collectors.toList()), client, circuitBreakerService, usageService, diff --git a/server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java b/server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java new file mode 100644 index 0000000000000..93027af54d66c --- /dev/null +++ b/server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.threadpool.ThreadPool; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +public class ActionPluginProxy implements InvocationHandler { + private final ActionPlugin actionPlugin; + private final ThreadPool threadPool; + + public static ActionPlugin newInstance(ActionPlugin obj, ThreadPool threadPool) { + return (ActionPlugin) Proxy.newProxyInstance( + obj.getClass().getClassLoader(), + new Class[] { ActionPlugin.class }, + new ActionPluginProxy(obj, threadPool) + ); + } + + private ActionPluginProxy(ActionPlugin actionPlugin, ThreadPool threadPool) { + this.actionPlugin = actionPlugin; + this.threadPool = threadPool; + } + + @Override + public Object invoke(Object proxy, Method m, Object[] args) throws Throwable { + Object result; + try { + threadPool.getThreadContext().setExecutionContext(((Plugin) actionPlugin).getClass().getName()); + result = m.invoke(actionPlugin, args); + threadPool.getThreadContext().clearExecutionContext(); + } catch (InvocationTargetException e) { + throw e.getTargetException(); + } catch (Exception e) { + throw new RuntimeException("unexpected invocation exception: " + e.getMessage()); + } + return result; + } +} From f8cf238079ba58969c01cdfdd6a9c5c977f6ec2c Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Thu, 30 May 2024 16:46:45 -0400 Subject: [PATCH 2/3] Only allow core to set the ExecutionContext Signed-off-by: Craig Perkins --- .../opensearch/common/util/concurrent/ExecutionContext.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java index bff824173df65..251454d8ec868 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ExecutionContext.java @@ -12,6 +12,9 @@ public class ExecutionContext { private final ThreadLocal context = new ThreadLocal<>(); public void set(String value) { + if (context.get() != null) { + throw new IllegalArgumentException("ExecutionContext already present"); + } context.set(value); } From 580f4af5f19e6033aa5e31df4056d5f523fdba0b Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Wed, 5 Jun 2024 18:47:47 -0400 Subject: [PATCH 3/3] RestHandlerProxy + ActionPluginProxy Signed-off-by: Craig Perkins --- .../http/ExecutionContextPluginIT.java | 81 +++++++++++++++++ .../http/TestExecutionContextPlugin.java | 91 +++++++++++++++++++ .../http/TestExecutionContextRestAction.java | 72 +++++++++++++++ .../org/opensearch/action/ActionModule.java | 4 +- .../main/java/org/opensearch/node/Node.java | 6 +- .../org/opensearch/rest/RestHandlerProxy.java | 52 +++++++++++ 6 files changed, 300 insertions(+), 6 deletions(-) create mode 100644 qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java create mode 100644 qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java create mode 100644 qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextRestAction.java create mode 100644 server/src/main/java/org/opensearch/rest/RestHandlerProxy.java diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java new file mode 100644 index 0000000000000..f6c6e82888bff --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/ExecutionContextPluginIT.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.http; + +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import org.opensearch.test.OpenSearchIntegTestCase.Scope; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +/** + * Test a rest action that sets special response headers + */ +@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1) +public class ExecutionContextPluginIT extends HttpSmokeTestCase { + + @Override + protected boolean addMockHttpTransport() { + return false; // enable http + } + + @Override + protected Collection> nodePlugins() { + ArrayList> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(TestExecutionContextPlugin.class); + return plugins; + } + + public void testThatPluginCannotOverrideExecutionContext() throws IOException { + ensureGreen(); + try { + Response response = getRestClient().performRequest(new Request("GET", "/_execution_context")); + fail("request should have failed"); + } catch(ResponseException e) { + Response response = e.getResponse(); + String responseBody = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8); + assertThat(response.getStatusLine().getStatusCode(), equalTo(400)); + assertThat(responseBody, containsString("ExecutionContext already present")); + } + } +} diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java new file mode 100644 index 0000000000000..5fa301a271308 --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextPlugin.java @@ -0,0 +1,91 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.http; + +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +import static java.util.Collections.singletonList; + +public class TestExecutionContextPlugin extends Plugin implements ActionPlugin { + + private ThreadPool threadPool; + + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver expressionResolver, + Supplier repositoriesServiceSupplier + ) { + this.threadPool = threadPool; + return Collections.emptyList(); + } + + @Override + public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster) { + return singletonList(new TestExecutionContextRestAction(threadPool)); + } +} diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextRestAction.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextRestAction.java new file mode 100644 index 0000000000000..a2562c8efa027 --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/TestExecutionContextRestAction.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.http; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.threadpool.ThreadPool; + +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.rest.RestRequest.Method.GET; + +public class TestExecutionContextRestAction extends BaseRestHandler { + + private final ThreadPool threadPool; + + public TestExecutionContextRestAction(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + @Override + public List routes() { + return singletonList(new Route(GET, "/_execution_context")); + } + + @Override + public String getName() { + return "test_execution_context_action"; + } + + @Override + public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + System.out.println("Plugin execution context: " + threadPool.getThreadContext().getExecutionContext()); + threadPool.getThreadContext().setExecutionContext("should-not-allow-plugin-to-set-execution-context"); + RestResponse response = new BytesRestResponse(RestStatus.OK, "Should not happen"); + return channel -> channel.sendResponse(response); + } +} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 5e2b62614fc47..893dadd432f8f 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -322,6 +322,7 @@ import org.opensearch.rest.NamedRoute; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestHandlerProxy; import org.opensearch.rest.RestHeaderDefinition; import org.opensearch.rest.action.RestFieldCapabilitiesAction; import org.opensearch.rest.action.RestMainAction; @@ -993,7 +994,8 @@ public void initRestHandlers(Supplier nodesInCluster) { indexNameExpressionResolver, nodesInCluster )) { - registerHandler.accept(handler); + RestHandler handlerProxy = RestHandlerProxy.newInstance(handler, threadPool, plugin); + registerHandler.accept(handlerProxy); } } registerHandler.accept(new RestCatAction(catActions)); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d10b4d38720fd..04bd31e6a5809 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -185,7 +185,6 @@ import org.opensearch.persistent.PersistentTasksExecutorRegistry; import org.opensearch.persistent.PersistentTasksService; import org.opensearch.plugins.ActionPlugin; -import org.opensearch.plugins.ActionPluginProxy; import org.opensearch.plugins.AnalysisPlugin; import org.opensearch.plugins.CachePlugin; import org.opensearch.plugins.CircuitBreakerPlugin; @@ -980,10 +979,7 @@ protected Node( settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool, - pluginsService.filterPlugins(ActionPlugin.class) - .stream() - .map(p -> ActionPluginProxy.newInstance(p, threadPool)) - .collect(Collectors.toList()), + pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, diff --git a/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java b/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java new file mode 100644 index 0000000000000..945e927c40732 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/RestHandlerProxy.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest; + +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.threadpool.ThreadPool; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +public class RestHandlerProxy implements InvocationHandler { + private final RestHandler restHandler; + private final ThreadPool threadPool; + private final ActionPlugin plugin; + + public static RestHandler newInstance(RestHandler obj, ThreadPool threadPool, ActionPlugin plugin) { + return (RestHandler) Proxy.newProxyInstance( + obj.getClass().getClassLoader(), + new Class[] { RestHandler.class }, + new RestHandlerProxy(obj, threadPool, plugin) + ); + } + + private RestHandlerProxy(RestHandler restHandler, ThreadPool threadPool, ActionPlugin plugin) { + this.restHandler = restHandler; + this.threadPool = threadPool; + this.plugin = plugin; + } + + @Override + public Object invoke(Object proxy, Method m, Object[] args) throws Throwable { + Object result; + try { + threadPool.getThreadContext().setExecutionContext(plugin.getClass().getName()); + result = m.invoke(restHandler, args); + threadPool.getThreadContext().clearExecutionContext(); + } catch (InvocationTargetException e) { + throw e.getTargetException(); + } catch (Exception e) { + throw new RuntimeException("unexpected invocation exception: " + e.getMessage()); + } + return result; + } +}