add request labeling service/framework #14388

Open, wants to merge 4 commits into base: main
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
@@ -198,6 +198,7 @@
import org.opensearch.plugins.IdentityPlugin;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.plugins.LabelingPlugin;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.plugins.NetworkPlugin;
@@ -212,6 +213,7 @@
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.plugins.TelemetryAwarePlugin;
import org.opensearch.plugins.TelemetryPlugin;
import org.opensearch.querygroup.LabelingService;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
import org.opensearch.ratelimitting.admissioncontrol.transport.AdmissionControlTransportInterceptor;
import org.opensearch.repositories.RepositoriesModule;
@@ -1114,6 +1116,8 @@ protected Node(
transportService.getTaskManager()
);

final LabelingService labelingService = new LabelingService(pluginsService.filterPlugins(LabelingPlugin.class));

final SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService);
RepositoriesModule repositoriesModule = new RepositoriesModule(
this.environment,
30 changes: 30 additions & 0 deletions server/src/main/java/org/opensearch/plugins/LabelingPlugin.java
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugins;

import org.opensearch.action.IndicesRequest;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.querygroup.LabelingService.LabelingImplementationType;

/**
* This plugin introduces contracts on how should the incoming requests be labeled using implicit/explicit request attributes
*
*/
public interface LabelingPlugin {

LabelingImplementationType getImplementationName();

/**
* This method will compute label value/values and
* put these in the {@link ThreadContext} against {@link org.opensearch.querygroup.LabelingHeader}
* @param request
* @param threadContext
*/
void labelRequest(final IndicesRequest request, final ThreadContext threadContext);
Member
@ansjcy, Jun 18, 2024

Are we planning to create a new plugin for the query grouping feature?

public class QueryGroupPlugin extends Plugin implements LabelingPlugin {
    ...
}

Or are we planning to reuse any existing ones?

Contributor Author

I don't think this concerns the specific feature. Yes, this feature is supposed to be plugin-driven, to maintain segregation and strong cohesion with the features.

Member

Have you considered trying to use the ActionPlugin.getActionFilters extension point? This allows you to run a filter before a transport action is executed on a node.

Here's an example of how Security overrides it and registers a SecurityFilter. The filter needs to implement a method called apply that has access to the Request object. You can also instantiate a filter within createComponents where you have access to the threadPool that core passes to plugins.
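
A minimal sketch of the filter-based idea above, using simplified stand-in types rather than the real OpenSearch classes. `SketchContext`, `SketchFilter`, `LabelingFilter`, and the label value are hypothetical names chosen for illustration; the real `ActionFilter.apply` signature is richer (task, listener, chain), so treat this only as an outline of where labeling would happen.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a thread context: a header map that rejects overwrites.
final class SketchContext {
    private final Map<String, String> headers = new HashMap<>();

    void putHeader(String key, String value) {
        if (headers.putIfAbsent(key, value) != null) {
            throw new IllegalArgumentException("header [" + key + "] is already set");
        }
    }

    String getHeader(String key) {
        return headers.get(key);
    }
}

// Hypothetical stand-in for an action filter; "next" plays the role of the filter chain.
interface SketchFilter {
    int order();

    void apply(String action, SketchContext context, Runnable next);
}

// Labels search requests before handing control to the rest of the chain.
final class LabelingFilter implements SketchFilter {
    @Override
    public int order() {
        return 0; // assumed value; anything above Integer.MIN_VALUE runs after a security-style filter
    }

    @Override
    public void apply(String action, SketchContext context, Runnable next) {
        if (action.startsWith("indices:data/read/search")) {
            context.putHeader("queryGroupId", "example-group"); // assumed label value
        }
        next.run();
    }
}

final class LabelingFilterDemo {
    public static void main(String[] args) {
        SketchContext context = new SketchContext();
        new LabelingFilter().apply(
            "indices:data/read/search",
            context,
            () -> System.out.println("label = " + context.getHeader("queryGroupId"))
        );
    }
}
```

In this sketch the filter only touches requests whose action name starts with the search prefix, so other actions pass through unlabeled.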

Collaborator

💯 this is the way

Contributor Author
@kaushalmahi12, Jul 2, 2024

I see! I missed this comment. This could also work.
@cwperks Will the threadContext have the authN/authZ info?

Contributor Author

I see the following code, which sorts the filters based on the order() method implementation:

public ActionFilters(Set<ActionFilter> actionFilters) {
    this.filters = actionFilters.toArray(new ActionFilter[0]);
    Arrays.sort(filters, new Comparator<ActionFilter>() {
        @Override
        public int compare(ActionFilter o1, ActionFilter o2) {
            return Integer.compare(o1.order(), o2.order());
        }
    });
}

Collaborator

Security must be the first action filter to run, since that is where authz is performed and where the authenticated user info is serialized to the thread context.

Security uses RequestHandlerWrapper, if I am not mistaken.

Member

@reta Authentication is done in the RestHandlerWrapper and authorization is done as an action filter.

Since 2.11, authentication has been moved further up the netty pipeline into the header_verifier step.

Member
@cwperks, Jul 4, 2024

@kaushalmahi12 Security reserves the lowest order (first)

@Override
public int order() {
    return Integer.MIN_VALUE;
}

Any other filter should return a higher value so that it runs after the SecurityFilter.

The SecurityFilter calls PrivilegesEvaluator.evaluate, which is the method in which the authenticated user is serialized into the ThreadContext. This is the line in PrivilegesEvaluator.evaluate where setUserInfoIntoThreadContext is called.
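
To illustrate the ordering described above, here is a small sketch with stand-in types (not the real `ActionFilter` or `ThreadContext`). `OrderedFilter`, `OrderedFilterChain`, the `user` header key, and the label format are all assumptions made for the example. It sorts filters by `order()`, so the filter returning `Integer.MIN_VALUE` runs first and a later labeling filter can read what it wrote.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an action filter that works on a plain header map.
interface OrderedFilter {
    int order();

    void apply(Map<String, String> headers);
}

final class OrderedFilterChain {
    private final List<OrderedFilter> filters;

    OrderedFilterChain(List<OrderedFilter> unsorted) {
        this.filters = new ArrayList<>(unsorted);
        // Lowest order() runs first; comparingInt avoids overflow near Integer.MIN_VALUE.
        this.filters.sort(Comparator.comparingInt(OrderedFilter::order));
    }

    Map<String, String> run() {
        Map<String, String> headers = new HashMap<>();
        for (OrderedFilter filter : filters) {
            filter.apply(headers);
        }
        return headers;
    }
}

final class OrderedFilterDemo {
    // Plays the role of a security-style filter: runs first and records the user.
    static OrderedFilter securityLike() {
        return new OrderedFilter() {
            @Override
            public int order() {
                return Integer.MIN_VALUE;
            }

            @Override
            public void apply(Map<String, String> headers) {
                headers.put("user", "alice"); // assumed header key and value
            }
        };
    }

    // Plays the role of a labeling filter: derives a label from the recorded user.
    static OrderedFilter labeling() {
        return new OrderedFilter() {
            @Override
            public int order() {
                return 0;
            }

            @Override
            public void apply(Map<String, String> headers) {
                String user = headers.get("user");
                headers.put("queryGroupId", user == null ? "unknown" : "group-of-" + user);
            }
        };
    }

    public static void main(String[] args) {
        // Registered in the "wrong" order on purpose; sorting fixes the execution order.
        Map<String, String> headers = new OrderedFilterChain(List.of(labeling(), securityLike())).run();
        System.out.println(headers.get("queryGroupId")); // prints "group-of-alice"
    }
}
```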

Collaborator

> @reta Authentication is done in the RestHandlerWrapper and authorization is done as an action filter.

Ah, I see, thanks @cwperks. I think the authentication does set the principal, right?

}
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.querygroup;
Collaborator

We should use wlm as the package name. QueryGroup as an independent entity is not enough to warrant a plugin/top-level package.


/**
* Main enum define various headers introduced by {@link org.opensearch.plugins.LabelingPlugin}s
*/
public enum LabelingHeader {
Collaborator

As @ansjcy mentioned elsewhere, it might be good to add a Request prefix to the class names.

QUERY_GROUP_ID("queryGroupId");
Collaborator

Ideally this enum should be part of core, as it is used by the RequestLabelingService, but it has values specific to plugin implementations. Hence, I am slightly confused about whether this should be in core or part of the plugin.

Contributor Author

I think this should be part of core since these headers will be consumed in core.


private final String name;

private LabelingHeader(final String name) {
this.name = name;
}

public static LabelingHeader fromName(String name) {
for (final LabelingHeader header : values()) {
if (header.getName().equals(name)) {
return header;
}
}
throw new IllegalArgumentException(name + " is not a valid [LabelingHeader]");
}

private String getName() {
return name;
}
}
@@ -0,0 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.querygroup;

import org.opensearch.action.IndicesRequest;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.plugins.LabelingPlugin;

import java.util.EnumMap;
import java.util.List;

/**
* TODO: Don't know the right package to put this class, need suggestions for maintainers on this
* Main class to hold multiple implementations of {@link org.opensearch.plugins.LabelingPlugin}
* this class will facilitate access and interactions to different implementations
* Usage: This class should be used as a member to orchestrate the working of your {@link LabelingPlugin}
*/
public class LabelingService {
Member

I would still recommend adding the request keyword to all the related names, because essentially what we are doing is adding labels to requests (search for now, but bulk, index, etc. in the future). Maybe RequestLabelingService?

/**
* Enum to define what all are currently implementing the plugin
*/
public enum LabelingImplementationType {
Member
@ansjcy, Jun 18, 2024

Should this be moved to a separate file? Also, are we suggesting a 1:1 mapping here (a plugin can only be used for one purpose, and one implementation type can only be mapped to one plugin)? Can multiple plugins have the QUERY_GROUP_RESOURCE_MANAGEMENT type? If not, why?

Contributor Author

Yes, I want this to be present, since the points at which a feature triggers labeling might not be uniform. For example, for QueryGrouping we will need to label the request in two places, i.e. at the coordinator request and at the shard request.
Given that this falls on the hot path, it would be redundant to recompute the labels for all of the implementations.

Regarding whether there should be a 1:1 mapping between feature and implementation:
Do you think it makes sense to use the same implementation across multiple features? I am assuming each feature could be specific.

Member

I think it makes sense to allow each feature to have its own specific implementation.

QUERY_GROUP_RESOURCE_MANAGEMENT,
Collaborator

Again, if the LabelingImplementationType class is part of core, does the plugin developer need to modify core to add their specific value?

Contributor Author

This construct is a little debatable and probably needs more thought. Let me think a bit about this.

NOOP
}

EnumMap<LabelingImplementationType, LabelingPlugin> implementations;

public LabelingService(List<LabelingPlugin> loadedPlugins) {
implementations = new EnumMap<>(LabelingImplementationType.class);
for (LabelingPlugin plugin : loadedPlugins) {
if (implementations.containsKey(plugin.getImplementationName())) {
Member

This might introduce some confusion and inefficiency when implementing a new LabelingPlugin. In theory, if I'm developing a new LabelingPlugin, I shouldn't have to worry about whether someone else has already implemented a plugin for the same type.

Contributor Author

Why do you think so? Can you elaborate with the help of examples?

Member
@ansjcy, Jul 1, 2024

For example, suppose that in your feature/plugin A you already have a LabelingImplementationType1 implementation registered with the LabelingService. When I'm implementing my feature/plugin B, I might also want a LabelingImplementationType1 without knowing that one already exists. It is then possible that your implementation will override mine in prod, and I won't be able to catch that during development.
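
One way to make such a clash visible early is to fail at registration time with a message that names both implementations. The sketch below is only an illustration under assumptions: `NamedLabeler`, `LabelerRegistry`, and the string-typed `type()` are hypothetical and are not part of this PR, which uses an enum key and a generic error message.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical labeler contract: the type is a free-form string chosen by the plugin.
interface NamedLabeler {
    String type();
}

final class LabelerRegistry {
    private final Map<String, NamedLabeler> byType = new HashMap<>();

    LabelerRegistry(List<NamedLabeler> labelers) {
        for (NamedLabeler labeler : labelers) {
            NamedLabeler existing = byType.putIfAbsent(labeler.type(), labeler);
            if (existing != null) {
                // Fail fast and name both implementations so the conflict is easy to trace.
                throw new IllegalArgumentException(
                    "labeling type [" + labeler.type() + "] is registered by both "
                        + existing.getClass().getName() + " and " + labeler.getClass().getName()
                );
            }
        }
    }

    NamedLabeler get(String type) {
        NamedLabeler labeler = byType.get(type);
        if (labeler == null) {
            throw new IllegalArgumentException(type + " implementation is not enabled");
        }
        return labeler;
    }
}

// Two independent features that accidentally pick the same type name.
final class FeatureALabeler implements NamedLabeler {
    @Override
    public String type() {
        return "query_group";
    }
}

final class FeatureBLabeler implements NamedLabeler {
    @Override
    public String type() {
        return "query_group";
    }
}

final class LabelerRegistryDemo {
    public static void main(String[] args) {
        try {
            new LabelerRegistry(List.of(new FeatureALabeler(), new FeatureBLabeler()));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the check runs in the constructor, the conflict would surface when both plugins are loaded together rather than when a request is labeled.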

throw new IllegalArgumentException("There should not be two implementations of a LabelingImplementation type");
}
implementations.put(plugin.getImplementationName(), plugin);
}
}

/**
* populates the threadContext with the labels yielded by the {@param type} against the {@link LabelingHeader} keys
Member

We are delegating the logic to "add labels into thread context" to the plugin implementation, which IMO is a little bit dangerous. Plugins can add arbitrary labels into the thread context without complying with the LabelingHeader enum. I think the logic to check/validate and put labels into the thread context should live in this service; plugins should only compute the labels they want to add. Like: https://github.com/opensearch-project/OpenSearch/pull/14282/files#diff-f02ca59c8d99a70fdc277b1b311ba8fc0135920ef5a114578187e9388e922f6bR66

Contributor Author
@kaushalmahi12, Jun 18, 2024

Good point! I agree with this. I think with return types we can easily enforce the header types as keys.
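
A rough sketch of how a return type could enforce this, assuming the plugin returns a map keyed by the header enum and the service alone writes to the context. `SketchHeader`, `LabelComputingPlugin`, `ValidatingLabelingService`, and the plain `Map` standing in for the thread context are hypothetical names for illustration, not the API in this PR.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of the LabelingHeader enum from this PR.
enum SketchHeader {
    QUERY_GROUP_ID("queryGroupId");

    private final String name;

    SketchHeader(String name) {
        this.name = name;
    }

    String headerName() {
        return name;
    }
}

// Plugins only compute labels; the enum-keyed return type rules out arbitrary header names.
interface LabelComputingPlugin {
    Map<SketchHeader, String> computeLabels(String requestDescription);
}

final class ValidatingLabelingService {
    // The service, not the plugin, validates values and writes them to the header map.
    static void label(LabelComputingPlugin plugin, String request, Map<String, String> headers) {
        Map<SketchHeader, String> labels = plugin.computeLabels(request);
        for (Map.Entry<SketchHeader, String> entry : labels.entrySet()) {
            String value = entry.getValue();
            if (value == null || value.isEmpty()) {
                throw new IllegalArgumentException("empty label for " + entry.getKey());
            }
            headers.put(entry.getKey().headerName(), value);
        }
    }
}

final class ValidatingLabelingDemo {
    public static void main(String[] args) {
        LabelComputingPlugin plugin = request -> {
            Map<SketchHeader, String> labels = new EnumMap<>(SketchHeader.class);
            labels.put(SketchHeader.QUERY_GROUP_ID, "example-group"); // assumed label value
            return labels;
        };
        Map<String, String> headers = new HashMap<>();
        ValidatingLabelingService.label(plugin, "search request", headers);
        System.out.println(headers); // prints "{queryGroupId=example-group}"
    }
}
```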

* @param type
* @param request
* @param threadContext
*/
public void labelRequestFor(final LabelingImplementationType type, final IndicesRequest request, final ThreadContext threadContext) {
final LabelingPlugin plugin = implementations.get(type);
if (plugin == null) {
throw new IllegalArgumentException(type + " implementation is not enabled");
}
plugin.labelRequest(request, threadContext);
Member

I'm still in favor of having multiple plugins label a request at the same time, like https://github.com/opensearch-project/OpenSearch/pull/14282/files#diff-f02ca59c8d99a70fdc277b1b311ba8fc0135920ef5a114578187e9388e922f6bR60.
Can we define something like an "order/priority" in which plugins apply their labels, like we did in the ActionFilter?
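
One possible reading of the order/priority idea, sketched with stand-in types. `PrioritizedLabeler`, `PrioritizedLabeling`, and the first-writer-wins conflict policy are assumptions made for the example and are not taken from either PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical labeler that declares when it should run relative to others.
interface PrioritizedLabeler {
    int priority(); // lower values run first, mirroring ActionFilter.order()

    Map<String, String> labelsFor(String request);
}

final class PrioritizedLabeling {
    // Runs every labeler in priority order and merges their labels.
    static Map<String, String> labelAll(List<PrioritizedLabeler> labelers, String request) {
        List<PrioritizedLabeler> sorted = new ArrayList<>(labelers);
        sorted.sort(Comparator.comparingInt(PrioritizedLabeler::priority));
        Map<String, String> merged = new LinkedHashMap<>();
        for (PrioritizedLabeler labeler : sorted) {
            // Assumed policy: the first (highest-priority) writer of a key wins.
            labeler.labelsFor(request).forEach(merged::putIfAbsent);
        }
        return merged;
    }
}

final class PrioritizedLabelingDemo {
    static PrioritizedLabeler labeler(int prio, Map<String, String> labels) {
        return new PrioritizedLabeler() {
            @Override
            public int priority() {
                return prio;
            }

            @Override
            public Map<String, String> labelsFor(String request) {
                return labels;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> merged = PrioritizedLabeling.labelAll(
            List.of(
                labeler(10, Map.of("queryGroupId", "late", "tenant", "t1")),
                labeler(1, Map.of("queryGroupId", "early"))
            ),
            "search request"
        );
        System.out.println(merged.get("queryGroupId") + " " + merged.get("tenant")); // prints "early t1"
    }
}
```

Note that this runs every labeler on every call, which is the cost the on-demand, per-type lookup in `labelRequestFor` avoids.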

Contributor Author

I think you are looking at this from a different perspective. You are assuming that the needs of all consuming features are uniform, but they are not, as I explained above for the QueryGrouping feature.

So rather than a push-down-to-sync mechanism for applying this event, I am more inclined towards a pull-to-sync mechanism.

Member

I see. Could you also give some examples of where this function will be called? If we are using this on demand, we might need to pass the labelingService to all plugins that need it, possibly by adding it as a default constructor parameter for LabelingPlugin?

}
}
@@ -0,0 +1,9 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.querygroup;
@@ -0,0 +1,95 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.querygroup;

import org.opensearch.action.IndicesRequest;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.plugins.LabelingPlugin;
import org.opensearch.querygroup.LabelingService.LabelingImplementationType;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.util.List;

import static org.mockito.Mockito.mock;

public class LabelingServiceTests extends OpenSearchTestCase {

ThreadPool threadPool;

@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("QSB");
}

@Override
public void tearDown() throws Exception {
super.tearDown();
threadPool.shutdownNow();
}

public void testInvalidInstantiationOfLabelingService() {
assertThrows(
IllegalArgumentException.class,
() -> new LabelingService(
List.of(
getTestImplementation("", "", LabelingImplementationType.QUERY_GROUP_RESOURCE_MANAGEMENT),
getTestImplementation("", "", LabelingImplementationType.QUERY_GROUP_RESOURCE_MANAGEMENT)
)
)
);
}

public void testExistingImplementationExistingCase() {
LabelingService labelingService = new LabelingService(
List.of(
getTestImplementation("queryGroupId", "akfagagnaga232_2434t", LabelingImplementationType.QUERY_GROUP_RESOURCE_MANAGEMENT)
)
);
IndicesRequest request = mock(IndicesRequest.class);
// threadPool = new TestThreadPool("QSB");
ThreadContext threadContext = threadPool.getThreadContext();

labelingService.labelRequestFor(LabelingImplementationType.QUERY_GROUP_RESOURCE_MANAGEMENT, request, threadContext);
assertEquals(threadContext.getHeader("queryGroupId"), "akfagagnaga232_2434t");
}

public void testNonExistingImplementationExistingCase() {
LabelingService labelingService = new LabelingService(
List.of(
getTestImplementation("queryGroupId", "akfagagnaga232_2434t", LabelingImplementationType.QUERY_GROUP_RESOURCE_MANAGEMENT)
)
);
IndicesRequest request = mock(IndicesRequest.class);
// threadPool = new TestThreadPool("QSB");
ThreadContext threadContext = threadPool.getThreadContext();

assertThrows(
IllegalArgumentException.class,
() -> labelingService.labelRequestFor(LabelingImplementationType.NOOP, request, threadContext)
);

}

LabelingPlugin getTestImplementation(String header, String value, LabelingImplementationType type) {
return new LabelingPlugin() {
@Override
public LabelingImplementationType getImplementationName() {
return type;
}

@Override
public void labelRequest(IndicesRequest request, ThreadContext threadContext) {
threadContext.putHeader(header, value);
}
};
}
}