Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create ExecutionContext and show example with ActionPluginProxy #173

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.http;

import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

/**
* Test a rest action that sets special response headers
*/
@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1)
public class ExecutionContextPluginIT extends HttpSmokeTestCase {

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(TestExecutionContextPlugin.class);
return plugins;
}

public void testThatPluginCannotOverrideExecutionContext() throws IOException {
ensureGreen();
try {
Response response = getRestClient().performRequest(new Request("GET", "/_execution_context"));
fail("request should have failed");
} catch(ResponseException e) {
Response response = e.getResponse();
String responseBody = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
assertThat(response.getStatusLine().getStatusCode(), equalTo(400));
assertThat(responseBody, containsString("ExecutionContext already present"));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.http;

import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

import static java.util.Collections.singletonList;

public class TestExecutionContextPlugin extends Plugin implements ActionPlugin {

private ThreadPool threadPool;

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver expressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.threadPool = threadPool;
return Collections.emptyList();
}

@Override
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
return singletonList(new TestExecutionContextRestAction(threadPool));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.http;

import org.opensearch.client.node.NodeClient;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
import org.opensearch.threadpool.ThreadPool;

import java.util.List;

import static java.util.Collections.singletonList;
import static org.opensearch.rest.RestRequest.Method.GET;

public class TestExecutionContextRestAction extends BaseRestHandler {

private final ThreadPool threadPool;

public TestExecutionContextRestAction(ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public List<Route> routes() {
return singletonList(new Route(GET, "/_execution_context"));
}

@Override
public String getName() {
return "test_execution_context_action";
}

@Override
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
System.out.println("Plugin execution context: " + threadPool.getThreadContext().getExecutionContext());
threadPool.getThreadContext().setExecutionContext("should-not-allow-plugin-to-set-execution-context");
RestResponse response = new BytesRestResponse(RestStatus.OK, "Should not happen");
return channel -> channel.sendResponse(response);
}
}
4 changes: 3 additions & 1 deletion server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@
import org.opensearch.rest.NamedRoute;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestHandlerProxy;
import org.opensearch.rest.RestHeaderDefinition;
import org.opensearch.rest.action.RestFieldCapabilitiesAction;
import org.opensearch.rest.action.RestMainAction;
Expand Down Expand Up @@ -995,7 +996,8 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
indexNameExpressionResolver,
nodesInCluster
)) {
registerHandler.accept(handler);
RestHandler handlerProxy = RestHandlerProxy.newInstance(handler, threadPool, plugin);
registerHandler.accept(handlerProxy);
}
}
registerHandler.accept(new RestCatAction(catActions));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util.concurrent;

public class ExecutionContext {
private final ThreadLocal<String> context = new ThreadLocal<>();

public void set(String value) {
if (context.get() != null) {
throw new IllegalArgumentException("ExecutionContext already present");
}
context.set(value);
}

public String get() {
return context.get();
}

public void clear() {
context.remove();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public final class ThreadContext implements Writeable {
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct();
private final Map<String, String> defaultHeader;
private final ThreadLocal<ThreadContextStruct> threadLocal;
private final ExecutionContext executionContext;
private final int maxWarningHeaderCount;
private final long maxWarningHeaderSize;
private final List<ThreadContextStatePropagator> propagators;
Expand All @@ -126,6 +127,7 @@ public final class ThreadContext implements Writeable {
public ThreadContext(Settings settings) {
this.defaultHeader = buildDefaultHeaders(settings);
this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT);
this.executionContext = new ExecutionContext();
this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings);
this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator()));
Expand All @@ -139,6 +141,18 @@ public void unregisterThreadContextStatePropagator(final ThreadContextStatePropa
propagators.remove(Objects.requireNonNull(propagator));
}

public void setExecutionContext(String pluginName) {
this.executionContext.set(pluginName);
}

public String getExecutionContext() {
return this.executionContext.get();
}

public void clearExecutionContext() {
this.executionContext.clear();
}

/**
* Removes the current context and resets a default context. The removed context can be
* restored by closing the returned {@link StoredContext}.
Expand Down
49 changes: 49 additions & 0 deletions server/src/main/java/org/opensearch/plugins/ActionPluginProxy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugins;

import org.opensearch.threadpool.ThreadPool;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ActionPluginProxy implements InvocationHandler {
private final ActionPlugin actionPlugin;
private final ThreadPool threadPool;

public static ActionPlugin newInstance(ActionPlugin obj, ThreadPool threadPool) {
return (ActionPlugin) Proxy.newProxyInstance(
obj.getClass().getClassLoader(),
new Class<?>[] { ActionPlugin.class },
new ActionPluginProxy(obj, threadPool)
);
}

private ActionPluginProxy(ActionPlugin actionPlugin, ThreadPool threadPool) {
this.actionPlugin = actionPlugin;
this.threadPool = threadPool;
}

@Override
public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
Object result;
try {
threadPool.getThreadContext().setExecutionContext(((Plugin) actionPlugin).getClass().getName());
result = m.invoke(actionPlugin, args);
threadPool.getThreadContext().clearExecutionContext();
} catch (InvocationTargetException e) {
throw e.getTargetException();
} catch (Exception e) {
throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
}
return result;
}
}
Loading
Loading