diff --git a/CHANGELOG.md b/CHANGELOG.md index 52bf4b996204b..220da86716090 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028)) - Add support for Google Application Default Credentials in repository-gcs ([#8394](https://github.com/opensearch-project/OpenSearch/pull/8394)) - New DateTime format for RFC3339 compatible date fields ([#11465](https://github.com/opensearch-project/OpenSearch/pull/11465)) +- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286)) ### Dependencies - Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822)) diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index b68bd13cfed80..ddebdc5530e70 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -134,6 +134,12 @@ public abstract class TransportReplicationAction< Setting.Property.NodeScope ); + /** + * Making primary and replica actions suffixes as constant + */ + public static final String PRIMARY_ACTION_SUFFIX = "[p]"; + public static final String REPLICA_ACTION_SUFFIX = "[r]"; + protected final ThreadPool threadPool; protected final TransportService transportService; protected final ClusterService clusterService; @@ -204,8 +210,8 @@ protected TransportReplicationAction( this.shardStateAction = shardStateAction; this.executor = executor; - this.transportPrimaryAction = actionName + "[p]"; - this.transportReplicaAction = actionName + "[r]"; + this.transportPrimaryAction = actionName + PRIMARY_ACTION_SUFFIX; + this.transportReplicaAction = actionName + REPLICA_ACTION_SUFFIX; this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings); this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings); diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 0734659d8ee72..821d48fccf48c 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -131,7 +131,7 @@ public final class NetworkModule { private final Map> transportFactories = new HashMap<>(); private final Map> transportHttpFactories = new HashMap<>(); - private final List transportIntercetors = new ArrayList<>(); + private final List transportInterceptors = new ArrayList<>(); /** * Creates a network module that custom networking classes can be plugged into. @@ -149,9 +149,13 @@ public NetworkModule( NetworkService networkService, HttpServerTransport.Dispatcher dispatcher, ClusterSettings clusterSettings, - Tracer tracer + Tracer tracer, + List transportInterceptors ) { this.settings = settings; + if (transportInterceptors != null) { + transportInterceptors.forEach(this::registerTransportInterceptor); + } for (NetworkPlugin plugin : plugins) { Map> httpTransportFactory = plugin.getHttpTransports( settings, @@ -180,11 +184,11 @@ public NetworkModule( for (Map.Entry> entry : transportFactory.entrySet()) { registerTransport(entry.getKey(), entry.getValue()); } - List transportInterceptors = plugin.getTransportInterceptors( + List pluginTransportInterceptors = plugin.getTransportInterceptors( namedWriteableRegistry, threadPool.getThreadContext() ); - for (TransportInterceptor interceptor : transportInterceptors) { + for (TransportInterceptor interceptor : pluginTransportInterceptors) { registerTransportInterceptor(interceptor); } } @@ -264,7 +268,7 @@ public Supplier getTransportSupplier() { * Registers a new {@link TransportInterceptor} */ private void registerTransportInterceptor(TransportInterceptor interceptor) { - this.transportIntercetors.add(Objects.requireNonNull(interceptor, "interceptor must not be null")); + this.transportInterceptors.add(Objects.requireNonNull(interceptor, "interceptor must not be null")); } /** @@ -272,7 +276,7 @@ private void registerTransportInterceptor(TransportInterceptor interceptor) { * @see #registerTransportInterceptor(TransportInterceptor) */ public TransportInterceptor getTransportInterceptor() { - return new CompositeTransportInterceptor(this.transportIntercetors); + return new CompositeTransportInterceptor(this.transportInterceptors); } static final class CompositeTransportInterceptor implements TransportInterceptor { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index b321af264f097..40af0cb915349 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -136,6 +136,8 @@ import org.opensearch.persistent.PersistentTasksClusterService; import org.opensearch.persistent.decider.EnableAssignmentDecider; import org.opensearch.plugins.PluginsService; +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings; +import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.rest.BaseRestHandler; import org.opensearch.script.ScriptService; @@ -698,7 +700,11 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, - IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING + IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, + AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, + CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT ) ) ); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 171ea2c80808c..16faeadfb0880 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -198,6 +198,8 @@ import org.opensearch.plugins.SearchPlugin; import org.opensearch.plugins.SystemIndexPlugin; import org.opensearch.plugins.TelemetryPlugin; +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; +import org.opensearch.ratelimitting.admissioncontrol.transport.AdmissionControlTransportInterceptor; import org.opensearch.repositories.RepositoriesModule; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; @@ -896,6 +898,17 @@ protected Node( final RestController restController = actionModule.getRestController(); + final AdmissionControlService admissionControlService = new AdmissionControlService( + settings, + clusterService.getClusterSettings(), + threadPool + ); + + AdmissionControlTransportInterceptor admissionControlTransportInterceptor = new AdmissionControlTransportInterceptor( + admissionControlService + ); + + List transportInterceptors = List.of(admissionControlTransportInterceptor); final NetworkModule networkModule = new NetworkModule( settings, pluginsService.filterPlugins(NetworkPlugin.class), @@ -908,8 +921,10 @@ protected Node( networkService, restController, clusterService.getClusterSettings(), - tracer + tracer, + transportInterceptors ); + Collection>> indexTemplateMetadataUpgraders = pluginsService.filterPlugins( Plugin.class ).stream().map(Plugin::getIndexTemplateMetadataUpgrader).collect(Collectors.toList()); @@ -1186,6 +1201,7 @@ protected Node( b.bind(IndexingPressureService.class).toInstance(indexingPressureService); b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService); b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService); + b.bind(AdmissionControlService.class).toInstance(admissionControlService); b.bind(UsageService.class).toInstance(usageService); b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService()); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java new file mode 100644 index 0000000000000..2cc409b0e4465 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java @@ -0,0 +1,104 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER; + +/** + * Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch. + */ +public class AdmissionControlService { + private final ThreadPool threadPool; + public final AdmissionControlSettings admissionControlSettings; + private final ConcurrentMap ADMISSION_CONTROLLERS; + private static final Logger logger = LogManager.getLogger(AdmissionControlService.class); + private final ClusterSettings clusterSettings; + private final Settings settings; + + /** + * + * @param settings Immutable settings instance + * @param clusterSettings ClusterSettings Instance + * @param threadPool ThreadPool Instance + */ + public AdmissionControlService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + this.threadPool = threadPool; + this.admissionControlSettings = new AdmissionControlSettings(clusterSettings, settings); + this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>(); + this.clusterSettings = clusterSettings; + this.settings = settings; + this.initialise(); + } + + /** + * Initialise and Register all the admissionControllers + */ + private void initialise() { + // Initialise different type of admission controllers + registerAdmissionController(CPU_BASED_ADMISSION_CONTROLLER); + } + + /** + * Handler to trigger registered admissionController + */ + public void applyTransportAdmissionControl(String action) { + this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.apply(action); }); + } + + /** + * + * @param admissionControllerName admissionControllerName to register into the service. + */ + public void registerAdmissionController(String admissionControllerName) { + AdmissionController admissionController = this.controllerFactory(admissionControllerName); + this.ADMISSION_CONTROLLERS.put(admissionControllerName, admissionController); + } + + /** + * @return AdmissionController Instance + */ + private AdmissionController controllerFactory(String admissionControllerName) { + switch (admissionControllerName) { + case CPU_BASED_ADMISSION_CONTROLLER: + return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterSettings); + default: + throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName); + } + } + + /** + * + * @return list of the registered admissionControllers + */ + public List getAdmissionControllers() { + return new ArrayList<>(this.ADMISSION_CONTROLLERS.values()); + } + + /** + * + * @param controllerName name of the admissionController + * @return instance of the AdmissionController Instance + */ + public AdmissionController getAdmissionController(String controllerName) { + return this.ADMISSION_CONTROLLERS.getOrDefault(controllerName, null); + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettings.java new file mode 100644 index 0000000000000..b557190ab54ac --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettings.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; + +/** + * Settings related to admission control. + * @opensearch.internal + */ +public final class AdmissionControlSettings { + + /** + * Default parameters for the AdmissionControlSettings + */ + public static class Defaults { + public static final String MODE = "disabled"; + } + + /** + * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set + * rejection will be performed, otherwise only rejection metrics will be populated. + */ + public static final Setting ADMISSION_CONTROL_TRANSPORT_LAYER_MODE = new Setting<>( + "admission_control.transport.mode", + Defaults.MODE, + AdmissionControlMode::fromName, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private volatile AdmissionControlMode transportLayeradmissionControlMode; + + /** + * @param clusterSettings clusterSettings Instance + * @param settings settings instance + */ + public AdmissionControlSettings(ClusterSettings clusterSettings, Settings settings) { + this.transportLayeradmissionControlMode = ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.get(settings); + clusterSettings.addSettingsUpdateConsumer(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, this::setAdmissionControlTransportLayerMode); + } + + /** + * + * @param admissionControlMode update the mode of admission control feature + */ + private void setAdmissionControlTransportLayerMode(AdmissionControlMode admissionControlMode) { + this.transportLayeradmissionControlMode = admissionControlMode; + } + + /** + * + * @return return the default mode of the admissionControl + */ + public AdmissionControlMode getAdmissionControlTransportLayerMode() { + return this.transportLayeradmissionControlMode; + } + + /** + * + * @return true based on the admission control feature is enforced else false + */ + public Boolean isTransportLayerAdmissionControlEnforced() { + return this.transportLayeradmissionControlMode == AdmissionControlMode.ENFORCED; + } + + /** + * + * @return true based on the admission control feature is enabled else false + */ + public Boolean isTransportLayerAdmissionControlEnabled() { + return this.transportLayeradmissionControlMode != AdmissionControlMode.DISABLED; + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java new file mode 100644 index 0000000000000..00564a9967f31 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.controllers; + +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Abstract class for Admission Controller in OpenSearch, which aims to provide resource based request admission control. + * It provides methods for any tracking-object that can be incremented (such as memory size), + * and admission control can be applied if configured limit has been reached + */ +public abstract class AdmissionController { + + private final AtomicLong rejectionCount; + private final String admissionControllerName; + + /** + * + * @param rejectionCount initialised rejectionCount value for AdmissionController + * @param admissionControllerName name of the admissionController + */ + public AdmissionController(AtomicLong rejectionCount, String admissionControllerName) { + this.rejectionCount = rejectionCount; + this.admissionControllerName = admissionControllerName; + } + + /** + * Return the current state of the admission controller + * @return true if admissionController is enabled for the transport layer else false + */ + public boolean isEnabledForTransportLayer(AdmissionControlMode admissionControlMode) { + return admissionControlMode != AdmissionControlMode.DISABLED; + } + + /** + * Increment the tracking-objects and apply the admission control if threshold is breached. + * Mostly applicable while applying admission controller + */ + public abstract void apply(String action); + + /** + * @return name of the admission-controller + */ + public String getName() { + return this.admissionControllerName; + } + + /** + * Adds the rejection count for the controller. Primarily used when copying controller states. + * @param count To add the value of the tracking resource object as the provided count + */ + public void addRejectionCount(long count) { + this.rejectionCount.addAndGet(count); + } + + /** + * @return current value of the rejection count metric tracked by the admission-controller. + */ + public long getRejectionCount() { + return this.rejectionCount.get(); + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java new file mode 100644 index 0000000000000..3a8956b2cce87 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.controllers; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Class for CPU Based Admission Controller in OpenSearch, which aims to provide CPU utilisation admission control. + * It provides methods to apply admission control if configured limit has been reached + */ +public class CPUBasedAdmissionController extends AdmissionController { + private static final Logger LOGGER = LogManager.getLogger(CPUBasedAdmissionController.class); + public CPUBasedAdmissionControllerSettings settings; + + /** + * + * @param admissionControllerName State of the admission controller + */ + public CPUBasedAdmissionController(String admissionControllerName, Settings settings, ClusterSettings clusterSettings) { + super(new AtomicLong(0), admissionControllerName); + this.settings = new CPUBasedAdmissionControllerSettings(clusterSettings, settings); + } + + /** + * This function will take of applying admission controller based on CPU usage + * @param action is the transport action + */ + @Override + public void apply(String action) { + // TODO Will extend this logic further currently just incrementing rejectionCount + if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) { + this.applyForTransportLayer(action); + } + } + + private void applyForTransportLayer(String actionName) { + // currently incrementing counts to evaluate the controller triggering as expected and using in testing so limiting to 10 + // TODO will update rejection logic further in next PR's + if (this.getRejectionCount() < 10) { + this.addRejectionCount(1); + } + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/package-info.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/package-info.java new file mode 100644 index 0000000000000..23746cc61a203 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains classes related to the different admission controllers + */ +package org.opensearch.ratelimitting.admissioncontrol.controllers; diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlMode.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlMode.java new file mode 100644 index 0000000000000..2ae2436ba84e7 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlMode.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.enums; + +import java.util.Locale; + +/** + * Defines the AdmissionControlMode + */ +public enum AdmissionControlMode { + /** + * AdmissionController is completely disabled. + */ + DISABLED("disabled"), + + /** + * AdmissionController only monitors the rejection criteria for the requests. + */ + MONITOR("monitor_only"), + + /** + * AdmissionController monitors and rejects tasks that exceed resource usage thresholds. + */ + ENFORCED("enforced"); + + private final String mode; + + /** + * @param mode update mode of the admission controller + */ + AdmissionControlMode(String mode) { + this.mode = mode; + } + + /** + * + * @return mode of the admission controller + */ + public String getMode() { + return this.mode; + } + + /** + * + * @param name is the mode of the current + * @return Enum of AdmissionControlMode based on the mode + */ + public static AdmissionControlMode fromName(String name) { + switch (name.toLowerCase(Locale.ROOT)) { + case "disabled": + return DISABLED; + case "monitor_only": + return MONITOR; + case "enforced": + return ENFORCED; + default: + throw new IllegalArgumentException("Invalid AdmissionControlMode: " + name); + } + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionType.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionType.java new file mode 100644 index 0000000000000..f2fdca0cfe49b --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionType.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.enums; + +import java.util.Locale; + +/** + * Enums that defines the type of the transport requests + */ +public enum TransportActionType { + INDEXING("indexing"), + SEARCH("search"); + + private final String type; + + TransportActionType(String uriType) { + this.type = uriType; + } + + /** + * + * @return type of the request + */ + public String getType() { + return type; + } + + public static TransportActionType fromName(String name) { + name = name.toLowerCase(Locale.ROOT); + switch (name) { + case "indexing": + return INDEXING; + case "search": + return SEARCH; + default: + throw new IllegalArgumentException("Not Supported TransportAction Type: " + name); + } + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/package-info.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/package-info.java new file mode 100644 index 0000000000000..98b08ebd0a7bf --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains enums related to the different admission controller feature + */ +package org.opensearch.ratelimitting.admissioncontrol.enums; diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/package-info.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/package-info.java new file mode 100644 index 0000000000000..b3dc229f86fb6 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains base classes needed for the admissionController Feature + */ +package org.opensearch.ratelimitting.admissioncontrol; diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java new file mode 100644 index 0000000000000..141e9b68db145 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.settings; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; + +import java.util.Arrays; +import java.util.List; + +/** + * Settings related to cpu based admission controller. + * @opensearch.internal + */ +public class CPUBasedAdmissionControllerSettings { + public static final String CPU_BASED_ADMISSION_CONTROLLER = "global_cpu_usage"; + + /** + * Default parameters for the CPUBasedAdmissionControllerSettings + */ + public static class Defaults { + public static final long CPU_USAGE = 95; + public static List TRANSPORT_LAYER_DEFAULT_URI_TYPE = Arrays.asList("indexing", "search"); + } + + private AdmissionControlMode transportLayerMode; + private Long searchCPULimit; + private Long indexingCPULimit; + + private final List transportActionsList; + /** + * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set + * rejection will be performed, otherwise only rejection metrics will be populated. + */ + public static final Setting CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE = new Setting<>( + "admission_control.transport.cpu_usage.mode_override", + AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, + AdmissionControlMode::fromName, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * This setting used to set the CPU Limits for the search requests by default it will use default IO usage limit + */ + public static final Setting SEARCH_CPU_USAGE_LIMIT = Setting.longSetting( + "admission_control.search.cpu_usage.limit", + Defaults.CPU_USAGE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * This setting used to set the CPU limits for the indexing requests by default it will use default IO usage limit + */ + public static final Setting INDEXING_CPU_USAGE_LIMIT = Setting.longSetting( + "admission_control.indexing.cpu_usage.limit", + Defaults.CPU_USAGE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + // currently limited to one setting will add further more settings in follow-up PR's + public CPUBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) { + this.transportLayerMode = CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings); + clusterSettings.addSettingsUpdateConsumer(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode); + this.searchCPULimit = SEARCH_CPU_USAGE_LIMIT.get(settings); + this.indexingCPULimit = INDEXING_CPU_USAGE_LIMIT.get(settings); + this.transportActionsList = Defaults.TRANSPORT_LAYER_DEFAULT_URI_TYPE; + clusterSettings.addSettingsUpdateConsumer(INDEXING_CPU_USAGE_LIMIT, this::setIndexingCPULimit); + clusterSettings.addSettingsUpdateConsumer(SEARCH_CPU_USAGE_LIMIT, this::setSearchCPULimit); + } + + private void setTransportLayerMode(AdmissionControlMode admissionControlMode) { + this.transportLayerMode = admissionControlMode; + } + + public AdmissionControlMode getTransportLayerAdmissionControllerMode() { + return transportLayerMode; + } + + public Long getSearchCPULimit() { + return searchCPULimit; + } + + public Long getIndexingCPULimit() { + return indexingCPULimit; + } + + public void setIndexingCPULimit(Long indexingCPULimit) { + this.indexingCPULimit = indexingCPULimit; + } + + public void setSearchCPULimit(Long searchCPULimit) { + this.searchCPULimit = searchCPULimit; + } + + public List getTransportActionsList() { + return transportActionsList; + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/package-info.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/package-info.java new file mode 100644 index 0000000000000..a024ccc756745 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/package-info.java @@ -0,0 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +/** + * This package contains settings related classes for the different admission controllers + */ +package org.opensearch.ratelimitting.admissioncontrol.settings; diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java new file mode 100644 index 0000000000000..79f88d4ad3478 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * AdmissionControl Handler to intercept Transport Requests. + * @param Transport Request + */ +public class AdmissionControlTransportHandler implements TransportRequestHandler { + + private final String action; + private final TransportRequestHandler actualHandler; + protected final Logger log = LogManager.getLogger(this.getClass()); + AdmissionControlService admissionControlService; + boolean forceExecution; + + public AdmissionControlTransportHandler( + String action, + TransportRequestHandler actualHandler, + AdmissionControlService admissionControlService, + boolean forceExecution + ) { + super(); + this.action = action; + this.actualHandler = actualHandler; + this.admissionControlService = admissionControlService; + this.forceExecution = forceExecution; + } + + /** + * @param request Transport Request that landed on the node + * @param channel Transport channel allows to send a response to a request + * @param task Current task that is executing + * @throws Exception when admission control rejected the requests + */ + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + // intercept all the transport requests here and apply admission control + if (!forceExecution) { + try { + this.admissionControlService.applyTransportAdmissionControl(this.action); + } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { + log.warn(openSearchRejectedExecutionException.getMessage()); + channel.sendResponse(openSearchRejectedExecutionException); + return; + } + } + actualHandler.messageReceived(request, channel, task); + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java new file mode 100644 index 0000000000000..01cfcbd780006 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.transport; + +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; +import org.opensearch.transport.TransportInterceptor; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * This class allows throttling to intercept requests on both the sender and the receiver side. + */ +public class AdmissionControlTransportInterceptor implements TransportInterceptor { + + AdmissionControlService admissionControlService; + + public AdmissionControlTransportInterceptor(AdmissionControlService admissionControlService) { + this.admissionControlService = admissionControlService; + } + + /** + * + * @return admissionController handler to intercept transport requests + */ + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + return new AdmissionControlTransportHandler<>(action, actualHandler, this.admissionControlService, forceExecution); + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/package-info.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/package-info.java new file mode 100644 index 0000000000000..f97f31bc7b1db --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/package-info.java @@ -0,0 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +/** + * This package contains transport related classes for the admissionController Feature + */ +package org.opensearch.ratelimitting.admissioncontrol.transport; diff --git a/server/src/main/java/org/opensearch/ratelimitting/package-info.java b/server/src/main/java/org/opensearch/ratelimitting/package-info.java new file mode 100644 index 0000000000000..c04358e14284f --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Base OpenSearch Throttling package + */ +package org.opensearch.ratelimitting; diff --git a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java index 0ca118fe422a5..ab51cafb039c2 100644 --- a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java @@ -57,6 +57,7 @@ import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportRequestHandler; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -124,7 +125,7 @@ public Map> getTransports( return Collections.singletonMap("custom", custom); } }; - NetworkModule module = newNetworkModule(settings, plugin); + NetworkModule module = newNetworkModule(settings, null, plugin); assertSame(custom, module.getTransportSupplier()); } @@ -135,7 +136,7 @@ public void testRegisterHttpTransport() { .build(); Supplier custom = FakeHttpTransport::new; - NetworkModule module = newNetworkModule(settings, new NetworkPlugin() { + NetworkModule module = newNetworkModule(settings, null, new NetworkPlugin() { @Override public Map> getHttpTransports( Settings settings, @@ -155,7 +156,7 @@ public Map> getHttpTransports( assertSame(custom, module.getHttpServerTransportSupplier()); settings = Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "local").build(); - NetworkModule newModule = newNetworkModule(settings); + NetworkModule newModule = newNetworkModule(settings, null); expectThrows(IllegalStateException.class, () -> newModule.getHttpServerTransportSupplier()); } @@ -169,7 +170,7 @@ public void testOverrideDefault() { Supplier customTransport = () -> null; // content doesn't matter we check reference equality Supplier custom = FakeHttpTransport::new; Supplier def = FakeHttpTransport::new; - NetworkModule module = newNetworkModule(settings, new NetworkPlugin() { + NetworkModule module = newNetworkModule(settings, null, new NetworkPlugin() { @Override public Map> getTransports( Settings settings, @@ -214,7 +215,7 @@ public void testDefaultKeys() { Supplier custom = FakeHttpTransport::new; Supplier def = FakeHttpTransport::new; Supplier customTransport = () -> null; - NetworkModule module = newNetworkModule(settings, new NetworkPlugin() { + NetworkModule module = newNetworkModule(settings, null, new NetworkPlugin() { @Override public Map> getTransports( Settings settings, @@ -273,7 +274,7 @@ public TransportRequestHandler interceptHandler( return actualHandler; } }; - NetworkModule module = newNetworkModule(settings, new NetworkPlugin() { + NetworkModule module = newNetworkModule(settings, null, new NetworkPlugin() { @Override public List getTransportInterceptors( NamedWriteableRegistry namedWriteableRegistry, @@ -295,7 +296,7 @@ public List getTransportInterceptors( assertSame(((NetworkModule.CompositeTransportInterceptor) transportInterceptor).transportInterceptors.get(0), interceptor); NullPointerException nullPointerException = expectThrows(NullPointerException.class, () -> { - newNetworkModule(settings, new NetworkPlugin() { + newNetworkModule(settings, null, new NetworkPlugin() { @Override public List getTransportInterceptors( NamedWriteableRegistry namedWriteableRegistry, @@ -309,7 +310,186 @@ public List getTransportInterceptors( assertEquals("interceptor must not be null", nullPointerException.getMessage()); } - private NetworkModule newNetworkModule(Settings settings, NetworkPlugin... plugins) { + public void testRegisterCoreInterceptor() { + Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "local").build(); + AtomicInteger called = new AtomicInteger(0); + + TransportInterceptor interceptor = new TransportInterceptor() { + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + called.incrementAndGet(); + if ("foo/bar/boom".equals(action)) { + assertTrue(forceExecution); + } else { + assertFalse(forceExecution); + } + return actualHandler; + } + }; + + List coreTransportInterceptors = new ArrayList<>(); + coreTransportInterceptors.add(interceptor); + + NetworkModule module = newNetworkModule(settings, coreTransportInterceptors); + + TransportInterceptor transportInterceptor = module.getTransportInterceptor(); + assertEquals(0, called.get()); + transportInterceptor.interceptHandler("foo/bar/boom", null, true, null); + assertEquals(1, called.get()); + transportInterceptor.interceptHandler("foo/baz/boom", null, false, null); + assertEquals(2, called.get()); + assertTrue(transportInterceptor instanceof NetworkModule.CompositeTransportInterceptor); + assertEquals(((NetworkModule.CompositeTransportInterceptor) transportInterceptor).transportInterceptors.size(), 1); + assertSame(((NetworkModule.CompositeTransportInterceptor) transportInterceptor).transportInterceptors.get(0), interceptor); + } + + public void testInterceptorOrder() { + Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "local").build(); + AtomicInteger called = new AtomicInteger(0); + AtomicInteger called1 = new AtomicInteger(0); + + TransportInterceptor interceptor = new TransportInterceptor() { + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + called.incrementAndGet(); + if ("foo/bar/boom".equals(action)) { + assertTrue(forceExecution); + } else { + assertFalse(forceExecution); + } + return actualHandler; + } + }; + + TransportInterceptor interceptor1 = new TransportInterceptor() { + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + called1.incrementAndGet(); + if ("foo/bar/boom".equals(action)) { + assertTrue(forceExecution); + } else { + assertFalse(forceExecution); + } + return actualHandler; + } + }; + + List coreTransportInterceptors = new ArrayList<>(); + coreTransportInterceptors.add(interceptor1); + + NetworkModule module = newNetworkModule(settings, coreTransportInterceptors, new NetworkPlugin() { + @Override + public List getTransportInterceptors( + NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext + ) { + assertNotNull(threadContext); + return Collections.singletonList(interceptor); + } + }); + + TransportInterceptor transportInterceptor = module.getTransportInterceptor(); + assertEquals(((NetworkModule.CompositeTransportInterceptor) transportInterceptor).transportInterceptors.size(), 2); + + assertEquals(0, called.get()); + assertEquals(0, called1.get()); + transportInterceptor.interceptHandler("foo/bar/boom", null, true, null); + assertEquals(1, called.get()); + assertEquals(1, called1.get()); + transportInterceptor.interceptHandler("foo/baz/boom", null, false, null); + assertEquals(2, called.get()); + assertEquals(2, called1.get()); + } + + public void testInterceptorOrderException() { + Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "local").build(); + AtomicInteger called = new AtomicInteger(0); + AtomicInteger called1 = new AtomicInteger(0); + + TransportInterceptor interceptor = new TransportInterceptor() { + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + called.incrementAndGet(); + if ("foo/bar/boom".equals(action)) { + assertTrue(forceExecution); + } else { + assertFalse(forceExecution); + } + return actualHandler; + } + }; + + TransportInterceptor interceptor1 = new TransportInterceptor() { + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + called1.incrementAndGet(); + throw new RuntimeException("Handler Invoke Failed"); + } + }; + + List coreTransportInterceptors = new ArrayList<>(); + coreTransportInterceptors.add(interceptor1); + + NetworkModule module = newNetworkModule(settings, coreTransportInterceptors, new NetworkPlugin() { + @Override + public List getTransportInterceptors( + NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext + ) { + assertNotNull(threadContext); + return Collections.singletonList(interceptor); + } + }); + + TransportInterceptor transportInterceptor = module.getTransportInterceptor(); + assertEquals(((NetworkModule.CompositeTransportInterceptor) transportInterceptor).transportInterceptors.size(), 2); + + assertEquals(0, called.get()); + assertEquals(0, called1.get()); + try { + transportInterceptor.interceptHandler("foo/bar/boom", null, true, null); + } catch (Exception e) { + assertEquals(0, called.get()); + assertEquals(1, called1.get()); + } + try { + transportInterceptor.interceptHandler("foo/baz/boom", null, false, null); + } catch (Exception e) { + assertEquals(0, called.get()); + assertEquals(2, called1.get()); + } + } + + private NetworkModule newNetworkModule( + Settings settings, + List coreTransportInterceptors, + NetworkPlugin... plugins + ) { return new NetworkModule( settings, Arrays.asList(plugins), @@ -322,7 +502,8 @@ private NetworkModule newNetworkModule(Settings settings, NetworkPlugin... plugi null, new NullDispatcher(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - NoopTracer.INSTANCE + NoopTracer.INSTANCE, + coreTransportInterceptors ); } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java new file mode 100644 index 0000000000000..bac4eaf3fd677 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java @@ -0,0 +1,140 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.List; + +public class AdmissionControlServiceTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + private AdmissionControlService admissionControlService; + private String action = ""; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + action = "indexing"; + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testWhenAdmissionControllerRegistered() { + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + assertEquals(admissionControlService.getAdmissionControllers().size(), 1); + } + + public void testRegisterInvalidAdmissionController() { + String test = "TEST"; + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + assertEquals(admissionControlService.getAdmissionControllers().size(), 1); + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> admissionControlService.registerAdmissionController(test) + ); + assertEquals(ex.getMessage(), "Not Supported AdmissionController : " + test); + } + + public void testAdmissionControllerSettings() { + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + AdmissionControlSettings admissionControlSettings = admissionControlService.admissionControlSettings; + List admissionControllerList = admissionControlService.getAdmissionControllers(); + assertEquals(admissionControllerList.size(), 1); + CPUBasedAdmissionController cpuBasedAdmissionController = (CPUBasedAdmissionController) admissionControlService + .getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); + assertEquals( + admissionControlSettings.isTransportLayerAdmissionControlEnabled(), + cpuBasedAdmissionController.isEnabledForTransportLayer( + cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode() + ) + ); + + Settings settings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode()) + .build(); + clusterService.getClusterSettings().applySettings(settings); + assertEquals( + admissionControlSettings.isTransportLayerAdmissionControlEnabled(), + cpuBasedAdmissionController.isEnabledForTransportLayer( + cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode() + ) + ); + assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnabled()); + + Settings newSettings = Settings.builder() + .put(settings) + .put( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(newSettings); + assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnabled()); + assertTrue( + cpuBasedAdmissionController.isEnabledForTransportLayer( + cpuBasedAdmissionController.settings.getTransportLayerAdmissionControllerMode() + ) + ); + } + + public void testApplyAdmissionControllerDisabled() { + this.action = "indices:data/write/bulk[s][p]"; + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + admissionControlService.applyTransportAdmissionControl(this.action); + List admissionControllerList = admissionControlService.getAdmissionControllers(); + admissionControllerList.forEach(admissionController -> { assertEquals(admissionController.getRejectionCount(), 0); }); + } + + public void testApplyAdmissionControllerEnabled() { + this.action = "indices:data/write/bulk[s][p]"; + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + admissionControlService.applyTransportAdmissionControl(this.action); + assertEquals( + admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER) + .getRejectionCount(), + 0 + ); + + Settings settings = Settings.builder() + .put( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + admissionControlService.applyTransportAdmissionControl(this.action); + List admissionControllerList = admissionControlService.getAdmissionControllers(); + assertEquals(admissionControllerList.size(), 1); + assertEquals( + admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER) + .getRejectionCount(), + 1 + ); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettingsTests.java new file mode 100644 index 0000000000000..c11ee1cc608f6 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettingsTests.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.Set; + +public class AdmissionControlSettingsTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testSettingsExists() { + Set> settings = ClusterSettings.BUILT_IN_CLUSTER_SETTINGS; + assertTrue( + "All the admission controller settings should be supported built in settings", + settings.containsAll(List.of(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE)) + ); + } + + public void testDefaultSettings() { + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings( + clusterService.getClusterSettings(), + Settings.EMPTY + ); + + assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnabled()); + assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnforced()); + assertEquals(admissionControlSettings.getAdmissionControlTransportLayerMode().getMode(), AdmissionControlSettings.Defaults.MODE); + } + + public void testGetConfiguredSettings() { + Settings settings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode()) + .build(); + + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings); + + assertTrue(admissionControlSettings.isTransportLayerAdmissionControlEnabled()); + assertTrue(admissionControlSettings.isTransportLayerAdmissionControlEnforced()); + } + + public void testUpdateAfterGetDefaultSettings() { + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings( + clusterService.getClusterSettings(), + Settings.EMPTY + ); + Settings settings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()) + .build(); + clusterService.getClusterSettings().applySettings(settings); + assertTrue(admissionControlSettings.isTransportLayerAdmissionControlEnabled()); + assertFalse(admissionControlSettings.isTransportLayerAdmissionControlEnforced()); + } + + public void testUpdateAfterGetConfiguredSettings() { + Settings settings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()) + .build(); + + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings); + + Settings newSettings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode()) + .build(); + + clusterService.getClusterSettings().applySettings(newSettings); + + assertTrue(admissionControlSettings.isTransportLayerAdmissionControlEnabled()); + assertTrue(admissionControlSettings.isTransportLayerAdmissionControlEnforced()); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java new file mode 100644 index 0000000000000..af6ec0749e709 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java @@ -0,0 +1,109 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.controllers; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +public class CPUBasedAdmissionControllerTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + CPUBasedAdmissionController admissionController = null; + + String action = "TEST_ACTION"; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testCheckDefaultParameters() { + admissionController = new CPUBasedAdmissionController( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, + Settings.EMPTY, + clusterService.getClusterSettings() + ); + assertEquals(admissionController.getName(), CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); + assertEquals(admissionController.getRejectionCount(), 0); + assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); + assertFalse( + admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode()) + ); + } + + public void testCheckUpdateSettings() { + admissionController = new CPUBasedAdmissionController( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, + Settings.EMPTY, + clusterService.getClusterSettings() + ); + Settings settings = Settings.builder() + .put( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + + assertEquals(admissionController.getName(), CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); + assertEquals(admissionController.getRejectionCount(), 0); + assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); + assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); + } + + public void testApplyControllerWithDefaultSettings() { + admissionController = new CPUBasedAdmissionController( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, + Settings.EMPTY, + clusterService.getClusterSettings() + ); + assertEquals(admissionController.getRejectionCount(), 0); + assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); + action = "indices:data/write/bulk[s][p]"; + admissionController.apply(action); + assertEquals(admissionController.getRejectionCount(), 0); + } + + public void testApplyControllerWhenSettingsEnabled() { + Settings settings = Settings.builder() + .put( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + admissionController = new CPUBasedAdmissionController( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, + settings, + clusterService.getClusterSettings() + ); + assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); + assertEquals(admissionController.getRejectionCount(), 0); + action = "indices:data/write/bulk[s][p]"; + admissionController.apply(action); + assertEquals(admissionController.getRejectionCount(), 1); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlModeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlModeTests.java new file mode 100644 index 0000000000000..98c0f3c7cf24c --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlModeTests.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.enums; + +import org.opensearch.test.OpenSearchTestCase; + +public class AdmissionControlModeTests extends OpenSearchTestCase { + + public void testValidActionType() { + assertEquals(AdmissionControlMode.DISABLED.getMode(), "disabled"); + assertEquals(AdmissionControlMode.ENFORCED.getMode(), "enforced"); + assertEquals(AdmissionControlMode.MONITOR.getMode(), "monitor_only"); + assertEquals(AdmissionControlMode.fromName("disabled"), AdmissionControlMode.DISABLED); + assertEquals(AdmissionControlMode.fromName("enforced"), AdmissionControlMode.ENFORCED); + assertEquals(AdmissionControlMode.fromName("monitor_only"), AdmissionControlMode.MONITOR); + } + + public void testInValidActionType() { + String name = "TEST"; + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> AdmissionControlMode.fromName(name)); + assertEquals(ex.getMessage(), "Invalid AdmissionControlMode: " + name); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java new file mode 100644 index 0000000000000..02f582c26f54e --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.enums; + +import org.opensearch.test.OpenSearchTestCase; + +public class TransportActionTypeTests extends OpenSearchTestCase { + + public void testValidActionType() { + assertEquals(TransportActionType.SEARCH.getType(), "search"); + assertEquals(TransportActionType.INDEXING.getType(), "indexing"); + assertEquals(TransportActionType.fromName("search"), TransportActionType.SEARCH); + assertEquals(TransportActionType.fromName("indexing"), TransportActionType.INDEXING); + } + + public void testInValidActionType() { + String name = "test"; + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> TransportActionType.fromName(name)); + assertEquals(ex.getMessage(), "Not Supported TransportAction Type: " + name); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java new file mode 100644 index 0000000000000..43103926a69a2 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java @@ -0,0 +1,153 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.settings; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.Arrays; +import java.util.Set; + +public class CPUBasedAdmissionControlSettingsTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testSettingsExists() { + Set> settings = ClusterSettings.BUILT_IN_CLUSTER_SETTINGS; + assertTrue( + "All the cpu based admission controller settings should be supported built in settings", + settings.containsAll( + Arrays.asList( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, + CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT + ) + ) + ); + } + + public void testDefaultSettings() { + CPUBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CPUBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + Settings.EMPTY + ); + long percent = 95; + assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); + assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), percent); + assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), percent); + assertEquals(cpuBasedAdmissionControllerSettings.getTransportActionsList(), Arrays.asList("indexing", "search")); + } + + public void testGetConfiguredSettings() { + long percent = 95; + long indexingPercent = 85; + Settings settings = Settings.builder() + .put( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .put(CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT.getKey(), indexingPercent) + .build(); + + CPUBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CPUBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + settings + ); + assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); + assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), percent); + assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), indexingPercent); + } + + public void testUpdateAfterGetDefaultSettings() { + long percent = 95; + long searchPercent = 80; + CPUBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CPUBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + Settings.EMPTY + ); + Settings settings = Settings.builder() + .put( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .put(CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT.getKey(), searchPercent) + .build(); + clusterService.getClusterSettings().applySettings(settings); + assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); + assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), searchPercent); + assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), percent); + } + + public void testUpdateAfterGetConfiguredSettings() { + long percent = 95; + long indexingPercent = 85; + long searchPercent = 80; + Settings settings = Settings.builder() + .put( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .put(CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT.getKey(), searchPercent) + .build(); + + CPUBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CPUBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + settings + ); + assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); + assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), searchPercent); + assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), percent); + + Settings updatedSettings = Settings.builder() + .put( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + .put(CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT.getKey(), indexingPercent) + .build(); + clusterService.getClusterSettings().applySettings(updatedSettings); + assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.MONITOR); + assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), searchPercent); + assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), indexingPercent); + + searchPercent = 70; + + updatedSettings = Settings.builder() + .put(updatedSettings) + .put(CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT.getKey(), searchPercent) + .build(); + clusterService.getClusterSettings().applySettings(updatedSettings); + + assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), searchPercent); + assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), indexingPercent); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandlerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandlerTests.java new file mode 100644 index 0000000000000..15d60157bda29 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandlerTests.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.transport; + +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +public class AdmissionControlTransportHandlerTests extends OpenSearchTestCase { + AdmissionControlTransportHandler admissionControlTransportHandler; + + public void testHandlerInvoked() throws Exception { + String action = "TEST"; + InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); + admissionControlTransportHandler = new AdmissionControlTransportHandler( + action, + handler, + mock(AdmissionControlService.class), + false + ); + admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + assertEquals(1, handler.count); + } + + public void testHandlerInvokedRejectedException() throws Exception { + String action = "TEST"; + AdmissionControlService admissionControlService = mock(AdmissionControlService.class); + doThrow(new OpenSearchRejectedExecutionException()).when(admissionControlService).applyTransportAdmissionControl(action); + InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); + admissionControlTransportHandler = new AdmissionControlTransportHandler( + action, + handler, + admissionControlService, + false + ); + admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + assertEquals(0, handler.count); + handler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + assertEquals(1, handler.count); + } + + public void testHandlerInvokedRandomException() throws Exception { + String action = "TEST"; + AdmissionControlService admissionControlService = mock(AdmissionControlService.class); + doThrow(new NullPointerException()).when(admissionControlService).applyTransportAdmissionControl(action); + InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); + admissionControlTransportHandler = new AdmissionControlTransportHandler( + action, + handler, + admissionControlService, + false + ); + try { + admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + } catch (Exception exception) { + assertEquals(0, handler.count); + handler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + } + assertEquals(1, handler.count); + } + + private class InterceptingRequestHandler implements TransportRequestHandler { + private final String action; + public int count; + + public InterceptingRequestHandler(String action) { + this.action = action; + this.count = 0; + } + + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + this.count = this.count + 1; + } + } +}