Skip to content

Commit

Permalink
Added changes for AdmissionControl Interceptor and AdmissionControlSe…
Browse files Browse the repository at this point in the history
…rvice for RateLimiting (#9286)

* Changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting (#9286)

Signed-off-by: Ajay Kumar Movva <[email protected]>
  • Loading branch information
ajaymovva authored Oct 21, 2023
1 parent 51626d0 commit 7c5a806
Show file tree
Hide file tree
Showing 28 changed files with 1,595 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))
- [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ public abstract class TransportReplicationAction<
Setting.Property.NodeScope
);

/**
* Making primary and replica actions suffixes as constant
*/
public static final String PRIMARY_ACTION_SUFFIX = "[p]";
public static final String REPLICA_ACTION_SUFFIX = "[r]";

protected final ThreadPool threadPool;
protected final TransportService transportService;
protected final ClusterService clusterService;
Expand Down Expand Up @@ -204,8 +210,8 @@ protected TransportReplicationAction(
this.shardStateAction = shardStateAction;
this.executor = executor;

this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
this.transportPrimaryAction = actionName + PRIMARY_ACTION_SUFFIX;
this.transportReplicaAction = actionName + REPLICA_ACTION_SUFFIX;

this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings);
this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public final class NetworkModule {

private final Map<String, Supplier<Transport>> transportFactories = new HashMap<>();
private final Map<String, Supplier<HttpServerTransport>> transportHttpFactories = new HashMap<>();
private final List<TransportInterceptor> transportIntercetors = new ArrayList<>();
private final List<TransportInterceptor> transportInterceptors = new ArrayList<>();

/**
* Creates a network module that custom networking classes can be plugged into.
Expand All @@ -149,9 +149,13 @@ public NetworkModule(
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings,
Tracer tracer
Tracer tracer,
List<TransportInterceptor> transportInterceptors
) {
this.settings = settings;
if (transportInterceptors != null) {
transportInterceptors.forEach(this::registerTransportInterceptor);
}
for (NetworkPlugin plugin : plugins) {
Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(
settings,
Expand Down Expand Up @@ -180,11 +184,11 @@ public NetworkModule(
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
registerTransport(entry.getKey(), entry.getValue());
}
List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(
List<TransportInterceptor> pluginTransportInterceptors = plugin.getTransportInterceptors(
namedWriteableRegistry,
threadPool.getThreadContext()
);
for (TransportInterceptor interceptor : transportInterceptors) {
for (TransportInterceptor interceptor : pluginTransportInterceptors) {
registerTransportInterceptor(interceptor);
}
}
Expand Down Expand Up @@ -264,15 +268,15 @@ public Supplier<Transport> getTransportSupplier() {
* Registers a new {@link TransportInterceptor}
*/
private void registerTransportInterceptor(TransportInterceptor interceptor) {
this.transportIntercetors.add(Objects.requireNonNull(interceptor, "interceptor must not be null"));
this.transportInterceptors.add(Objects.requireNonNull(interceptor, "interceptor must not be null"));
}

/**
* Returns a composite {@link TransportInterceptor} containing all registered interceptors
* @see #registerTransportInterceptor(TransportInterceptor)
*/
public TransportInterceptor getTransportInterceptor() {
return new CompositeTransportInterceptor(this.transportIntercetors);
return new CompositeTransportInterceptor(this.transportInterceptors);
}

static final class CompositeTransportInterceptor implements TransportInterceptor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@
import org.opensearch.persistent.PersistentTasksClusterService;
import org.opensearch.persistent.decider.EnableAssignmentDecider;
import org.opensearch.plugins.PluginsService;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings;
import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -682,7 +684,11 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT
)
)
);
Expand Down
19 changes: 18 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.plugins.TelemetryPlugin;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
import org.opensearch.ratelimitting.admissioncontrol.transport.AdmissionControlTransportInterceptor;
import org.opensearch.repositories.RepositoriesModule;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
Expand Down Expand Up @@ -820,6 +822,7 @@ protected Node(
remoteStoreStatsTrackerFactory,
recoverySettings
);

final AliasValidator aliasValidator = new AliasValidator();

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
Expand Down Expand Up @@ -891,6 +894,17 @@ protected Node(

final RestController restController = actionModule.getRestController();

final AdmissionControlService admissionControlService = new AdmissionControlService(
settings,
clusterService.getClusterSettings(),
threadPool
);

AdmissionControlTransportInterceptor admissionControlTransportInterceptor = new AdmissionControlTransportInterceptor(
admissionControlService
);

List<TransportInterceptor> transportInterceptors = List.of(admissionControlTransportInterceptor);
final NetworkModule networkModule = new NetworkModule(
settings,
pluginsService.filterPlugins(NetworkPlugin.class),
Expand All @@ -903,8 +917,10 @@ protected Node(
networkService,
restController,
clusterService.getClusterSettings(),
tracer
tracer,
transportInterceptors
);

Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders = pluginsService.filterPlugins(
Plugin.class
).stream().map(Plugin::getIndexTemplateMetadataUpgrader).collect(Collectors.toList());
Expand Down Expand Up @@ -1181,6 +1197,7 @@ protected Node(
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
b.bind(AdmissionControlService.class).toInstance(admissionControlService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ratelimitting.admissioncontrol;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER;

/**
* Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch.
*/
public class AdmissionControlService {
private final ThreadPool threadPool;
public final AdmissionControlSettings admissionControlSettings;
private final ConcurrentMap<String, AdmissionController> ADMISSION_CONTROLLERS;
private static final Logger logger = LogManager.getLogger(AdmissionControlService.class);
private final ClusterSettings clusterSettings;
private final Settings settings;

/**
*
* @param settings Immutable settings instance
* @param clusterSettings ClusterSettings Instance
* @param threadPool ThreadPool Instance
*/
public AdmissionControlService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this.threadPool = threadPool;
this.admissionControlSettings = new AdmissionControlSettings(clusterSettings, settings);
this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>();
this.clusterSettings = clusterSettings;
this.settings = settings;
this.initialise();
}

/**
* Initialise and Register all the admissionControllers
*/
private void initialise() {
// Initialise different type of admission controllers
registerAdmissionController(CPU_BASED_ADMISSION_CONTROLLER);
}

/**
* Handler to trigger registered admissionController
*/
public void applyTransportAdmissionControl(String action) {
this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.apply(action); });
}

/**
*
* @param admissionControllerName admissionControllerName to register into the service.
*/
public void registerAdmissionController(String admissionControllerName) {
AdmissionController admissionController = this.controllerFactory(admissionControllerName);
this.ADMISSION_CONTROLLERS.put(admissionControllerName, admissionController);
}

/**
* @return AdmissionController Instance
*/
private AdmissionController controllerFactory(String admissionControllerName) {
switch (admissionControllerName) {
case CPU_BASED_ADMISSION_CONTROLLER:
return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterSettings);
default:
throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName);
}
}

/**
*
* @return list of the registered admissionControllers
*/
public List<AdmissionController> getAdmissionControllers() {
return new ArrayList<>(this.ADMISSION_CONTROLLERS.values());
}

/**
*
* @param controllerName name of the admissionController
* @return instance of the AdmissionController Instance
*/
public AdmissionController getAdmissionController(String controllerName) {
return this.ADMISSION_CONTROLLERS.getOrDefault(controllerName, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ratelimitting.admissioncontrol;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;

/**
* Settings related to admission control.
* @opensearch.internal
*/
public final class AdmissionControlSettings {

/**
* Default parameters for the AdmissionControlSettings
*/
public static class Defaults {
public static final String MODE = "disabled";
}

/**
* Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set
* rejection will be performed, otherwise only rejection metrics will be populated.
*/
public static final Setting<AdmissionControlMode> ADMISSION_CONTROL_TRANSPORT_LAYER_MODE = new Setting<>(
"admission_control.transport.mode",
Defaults.MODE,
AdmissionControlMode::fromName,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private volatile AdmissionControlMode transportLayeradmissionControlMode;

/**
* @param clusterSettings clusterSettings Instance
* @param settings settings instance
*/
public AdmissionControlSettings(ClusterSettings clusterSettings, Settings settings) {
this.transportLayeradmissionControlMode = ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.get(settings);
clusterSettings.addSettingsUpdateConsumer(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, this::setAdmissionControlTransportLayerMode);
}

/**
*
* @param admissionControlMode update the mode of admission control feature
*/
private void setAdmissionControlTransportLayerMode(AdmissionControlMode admissionControlMode) {
this.transportLayeradmissionControlMode = admissionControlMode;
}

/**
*
* @return return the default mode of the admissionControl
*/
public AdmissionControlMode getAdmissionControlTransportLayerMode() {
return this.transportLayeradmissionControlMode;
}

/**
*
* @return true based on the admission control feature is enforced else false
*/
public Boolean isTransportLayerAdmissionControlEnforced() {
return this.transportLayeradmissionControlMode == AdmissionControlMode.ENFORCED;
}

/**
*
* @return true based on the admission control feature is enabled else false
*/
public Boolean isTransportLayerAdmissionControlEnabled() {
return this.transportLayeradmissionControlMode != AdmissionControlMode.DISABLED;
}
}
Loading

0 comments on commit 7c5a806

Please sign in to comment.