diff --git a/CHANGELOG.md b/CHANGELOG.md index b6af9b7041db3..3b15a28c55c43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,10 +45,10 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402)) - [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948)) - [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318)) +- PUT api for weighted shard routing ([#4272](https://github.com/opensearch-project/OpenSearch/pull/4272)) - Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580)) - Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084)) - ### Deprecated ### Removed diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java index ad8da7244eae0..c0eb344a64dba 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java @@ -888,7 +888,8 @@ public void testApiNamingConventions() throws Exception { "nodes.usage", "nodes.reload_secure_settings", "search_shards", - "remote_store.restore", }; + "remote_store.restore", + "cluster.put_weighted_routing", }; List booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password"); Set deprecatedMethods = new HashSet<>(); deprecatedMethods.add("indices.force_merge"); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.put_weighted_routing.json b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.put_weighted_routing.json new file mode 100644 index 0000000000000..88498517ba336 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.put_weighted_routing.json @@ -0,0 +1,25 @@ +{ + "cluster.put_weighted_routing": { + "documentation": { + "url": "https://opensearch.org/docs/latest/opensearch/rest-api/weighted-routing/put", + "description": "Updates weighted shard routing weights" + }, + "stability": "stable", + "url": { + "paths": [ + { + "path": "/_cluster/routing/awareness/{attribute}/weights", + "methods": [ + "PUT" + ], + "parts": { + "attribute": { + "type": "string", + "description": "Awareness attribute name" + } + } + } + ] + } + } +} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index c5fcfdd047a09..e745e505d9fe5 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -79,6 +79,8 @@ import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction; import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.TransportAddWeightedRoutingAction; import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.clone.TransportCloneSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotAction; @@ -294,6 +296,7 @@ import org.opensearch.rest.action.admin.cluster.RestClusterAllocationExplainAction; import org.opensearch.rest.action.admin.cluster.RestClusterGetSettingsAction; import org.opensearch.rest.action.admin.cluster.RestClusterHealthAction; +import org.opensearch.rest.action.admin.cluster.RestClusterPutWeightedRoutingAction; import org.opensearch.rest.action.admin.cluster.RestClusterRerouteAction; import org.opensearch.rest.action.admin.cluster.RestClusterSearchShardsAction; import org.opensearch.rest.action.admin.cluster.RestClusterStateAction; @@ -563,6 +566,7 @@ public void reg actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class); actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class); + actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class); actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class); @@ -744,6 +748,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestCloseIndexAction()); registerHandler.accept(new RestOpenIndexAction()); registerHandler.accept(new RestAddIndexBlockAction()); + registerHandler.accept(new RestClusterPutWeightedRoutingAction()); registerHandler.accept(new RestUpdateSettingsAction()); registerHandler.accept(new RestGetSettingsAction()); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterAddWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterAddWeightedRoutingAction.java new file mode 100644 index 0000000000000..65c5ccca71461 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterAddWeightedRoutingAction.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.put; + +import org.opensearch.action.ActionType; + +/** + * Action to update weights for weighted round-robin shard routing policy. + * + * @opensearch.internal + */ +public final class ClusterAddWeightedRoutingAction extends ActionType { + + public static final ClusterAddWeightedRoutingAction INSTANCE = new ClusterAddWeightedRoutingAction(); + public static final String NAME = "cluster:admin/routing/awareness/weights/put"; + + private ClusterAddWeightedRoutingAction() { + super(NAME, ClusterPutWeightedRoutingResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java new file mode 100644 index 0000000000000..af229fb12b4f0 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java @@ -0,0 +1,173 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.put; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchGenerationException; +import org.opensearch.OpenSearchParseException; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.cluster.routing.WeightedRouting; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.DeprecationHandler; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; + +import static org.opensearch.action.ValidateActions.addValidationError; + +/** + * Request to update weights for weighted round-robin shard routing policy. + * + * @opensearch.internal + */ +public class ClusterPutWeightedRoutingRequest extends ClusterManagerNodeRequest { + private static final Logger logger = LogManager.getLogger(ClusterPutWeightedRoutingRequest.class); + + private WeightedRouting weightedRouting; + private String attributeName; + + public ClusterPutWeightedRoutingRequest() {} + + public WeightedRouting getWeightedRouting() { + return weightedRouting; + } + + public ClusterPutWeightedRoutingRequest setWeightedRouting(WeightedRouting weightedRouting) { + this.weightedRouting = weightedRouting; + return this; + } + + public void attributeName(String attributeName) { + this.attributeName = attributeName; + } + + public ClusterPutWeightedRoutingRequest(StreamInput in) throws IOException { + super(in); + weightedRouting = new WeightedRouting(in); + } + + public ClusterPutWeightedRoutingRequest(String attributeName) { + this.attributeName = attributeName; + } + + public void setWeightedRouting(Map source) { + try { + if (source.isEmpty()) { + throw new OpenSearchParseException(("Empty request body")); + } + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.map(source); + setWeightedRouting(BytesReference.bytes(builder), builder.contentType()); + } catch (IOException e) { + throw new OpenSearchGenerationException("Failed to generate [" + source + "]", e); + } + } + + public void setWeightedRouting(BytesReference source, XContentType contentType) { + try ( + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + source, + contentType + ) + ) { + String attrValue = null; + Map weights = new HashMap<>(); + Double attrWeight = null; + XContentParser.Token token; + // move to the first alias + parser.nextToken(); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + attrValue = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + attrWeight = Double.parseDouble(parser.text()); + weights.put(attrValue, attrWeight); + } else { + throw new OpenSearchParseException( + "failed to parse weighted routing request attribute [{}], " + "unknown type", + attrWeight + ); + } + } + this.weightedRouting = new WeightedRouting(this.attributeName, weights); + } catch (IOException e) { + logger.error("error while parsing put for weighted routing request object", e); + } + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (weightedRouting == null) { + validationException = addValidationError("Weighted routing request object is null", validationException); + } + if (weightedRouting.attributeName() == null || weightedRouting.attributeName().isEmpty()) { + validationException = addValidationError("Attribute name is missing", validationException); + } + if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) { + validationException = addValidationError("Weights are missing", validationException); + } + int countValueWithZeroWeights = 0; + double weight; + try { + for (Object value : weightedRouting.weights().values()) { + if (value == null) { + validationException = addValidationError(("Weight is null"), validationException); + } else { + weight = Double.parseDouble(value.toString()); + countValueWithZeroWeights = (weight == 0) ? countValueWithZeroWeights + 1 : countValueWithZeroWeights; + } + } + } catch (NumberFormatException e) { + validationException = addValidationError(("Weight is not a number"), validationException); + } + if (countValueWithZeroWeights > 1) { + validationException = addValidationError( + (String.format(Locale.ROOT, "More than one [%d] value has weight set as 0", countValueWithZeroWeights)), + validationException + ); + } + return validationException; + } + + /** + * @param source weights definition from request body + * @return this request + */ + public ClusterPutWeightedRoutingRequest source(Map source) { + setWeightedRouting(source); + return this; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + weightedRouting.writeTo(out); + } + + @Override + public String toString() { + return "ClusterPutWeightedRoutingRequest{" + "weightedRouting= " + weightedRouting.toString() + "}"; + } + +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java new file mode 100644 index 0000000000000..b437f4c54d8d6 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.put; + +import org.opensearch.action.support.clustermanager.ClusterManagerNodeOperationRequestBuilder; +import org.opensearch.client.OpenSearchClient; +import org.opensearch.cluster.routing.WeightedRouting; + +/** + * Request builder to update weights for weighted round-robin shard routing policy. + * + * @opensearch.internal + */ +public class ClusterPutWeightedRoutingRequestBuilder extends ClusterManagerNodeOperationRequestBuilder< + ClusterPutWeightedRoutingRequest, + ClusterPutWeightedRoutingResponse, + ClusterPutWeightedRoutingRequestBuilder> { + public ClusterPutWeightedRoutingRequestBuilder(OpenSearchClient client, ClusterAddWeightedRoutingAction action) { + super(client, action, new ClusterPutWeightedRoutingRequest()); + } + + public ClusterPutWeightedRoutingRequestBuilder setWeightedRouting(WeightedRouting weightedRouting) { + request.setWeightedRouting(weightedRouting); + return this; + } + +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingResponse.java new file mode 100644 index 0000000000000..b0154aceef0c2 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingResponse.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.put; + +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.io.stream.StreamInput; + +import java.io.IOException; + +/** + * Response from updating weights for weighted round-robin search routing policy. + * + * @opensearch.internal + */ +public class ClusterPutWeightedRoutingResponse extends AcknowledgedResponse { + public ClusterPutWeightedRoutingResponse(boolean acknowledged) { + super(acknowledged); + } + + public ClusterPutWeightedRoutingResponse(StreamInput in) throws IOException { + super(in); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/TransportAddWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/TransportAddWeightedRoutingAction.java new file mode 100644 index 0000000000000..8c29ab2199848 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/TransportAddWeightedRoutingAction.java @@ -0,0 +1,128 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.put; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.WeightedRoutingService; +import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +import static org.opensearch.action.ValidateActions.addValidationError; + +/** + * Transport action for updating weights for weighted round-robin search routing policy + * + * @opensearch.internal + */ +public class TransportAddWeightedRoutingAction extends TransportClusterManagerNodeAction< + ClusterPutWeightedRoutingRequest, + ClusterPutWeightedRoutingResponse> { + + private final WeightedRoutingService weightedRoutingService; + private volatile List awarenessAttributes; + + @Inject + public TransportAddWeightedRoutingAction( + Settings settings, + ClusterSettings clusterSettings, + TransportService transportService, + ClusterService clusterService, + WeightedRoutingService weightedRoutingService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + ClusterAddWeightedRoutingAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + ClusterPutWeightedRoutingRequest::new, + indexNameExpressionResolver + ); + this.weightedRoutingService = weightedRoutingService; + this.awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer( + AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, + this::setAwarenessAttributes + ); + } + + List getAwarenessAttributes() { + return awarenessAttributes; + } + + private void setAwarenessAttributes(List awarenessAttributes) { + this.awarenessAttributes = awarenessAttributes; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected ClusterPutWeightedRoutingResponse read(StreamInput in) throws IOException { + return new ClusterPutWeightedRoutingResponse(in); + } + + @Override + protected void clusterManagerOperation( + ClusterPutWeightedRoutingRequest request, + ClusterState state, + ActionListener listener + ) throws Exception { + verifyAwarenessAttribute(request.getWeightedRouting().attributeName()); + weightedRoutingService.registerWeightedRoutingMetadata( + request, + ActionListener.delegateFailure( + listener, + (delegatedListener, response) -> { + delegatedListener.onResponse(new ClusterPutWeightedRoutingResponse(response.isAcknowledged())); + } + ) + ); + } + + private void verifyAwarenessAttribute(String attributeName) { + if (getAwarenessAttributes().contains(attributeName) == false) { + ActionRequestValidationException validationException = null; + + validationException = addValidationError( + String.format(Locale.ROOT, "invalid awareness attribute %s requested for updating weighted routing", attributeName), + validationException + ); + throw validationException; + } + } + + @Override + protected ClusterBlockException checkBlock(ClusterPutWeightedRoutingRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/package-info.java new file mode 100644 index 0000000000000..4f18b220cd343 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** add/update weighted-round robin shard routing weights. */ +package org.opensearch.action.admin.cluster.shards.routing.weighted.put; diff --git a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java index 7a7b98bf724f6..2f62ab13b131c 100644 --- a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java @@ -86,6 +86,9 @@ import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequestBuilder; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequestBuilder; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; @@ -791,4 +794,20 @@ public interface ClusterAdminClient extends OpenSearchClient { * Delete specified dangling indices. */ ActionFuture deleteDanglingIndex(DeleteDanglingIndexRequest request); + + /** + * Updates weights for weighted round-robin search routing policy. + */ + ActionFuture putWeightedRouting(ClusterPutWeightedRoutingRequest request); + + /** + * Updates weights for weighted round-robin search routing policy. + */ + void putWeightedRouting(ClusterPutWeightedRoutingRequest request, ActionListener listener); + + /** + * Updates weights for weighted round-robin search routing policy. + */ + ClusterPutWeightedRoutingRequestBuilder prepareWeightedRouting(); + } diff --git a/server/src/main/java/org/opensearch/client/Requests.java b/server/src/main/java/org/opensearch/client/Requests.java index b04de7830a780..7154742de04fb 100644 --- a/server/src/main/java/org/opensearch/client/Requests.java +++ b/server/src/main/java/org/opensearch/client/Requests.java @@ -47,6 +47,7 @@ import org.opensearch.action.admin.cluster.reroute.ClusterRerouteRequest; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; @@ -548,4 +549,13 @@ public static DeleteSnapshotRequest deleteSnapshotRequest(String repository, Str public static SnapshotsStatusRequest snapshotsStatusRequest(String repository) { return new SnapshotsStatusRequest(repository); } + + /** + * Updates weights for weighted round-robin search routing policy + * + * @return update weight request + */ + public static ClusterPutWeightedRoutingRequest putWeightedRoutingRequest(String attributeName) { + return new ClusterPutWeightedRoutingRequest(attributeName); + } } diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 21cd01bf65a45..efd18dd4947ad 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -110,6 +110,10 @@ import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequestBuilder; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequestBuilder; @@ -1272,6 +1276,24 @@ public ActionFuture deleteDanglingIndex(DeleteDanglingInde return execute(DeleteDanglingIndexAction.INSTANCE, request); } + @Override + public ActionFuture putWeightedRouting(ClusterPutWeightedRoutingRequest request) { + return execute(ClusterAddWeightedRoutingAction.INSTANCE, request); + } + + @Override + public void putWeightedRouting( + ClusterPutWeightedRoutingRequest request, + ActionListener listener + ) { + execute(ClusterAddWeightedRoutingAction.INSTANCE, request, listener); + } + + @Override + public ClusterPutWeightedRoutingRequestBuilder prepareWeightedRouting() { + return new ClusterPutWeightedRoutingRequestBuilder(this, ClusterAddWeightedRoutingAction.INSTANCE); + } + @Override public void deleteDanglingIndex(DeleteDanglingIndexRequest request, ActionListener listener) { execute(DeleteDanglingIndexAction.INSTANCE, request, listener); diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java new file mode 100644 index 0000000000000..da454865ac866 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.ack.ClusterStateUpdateResponse; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.inject.Inject; + +import org.opensearch.threadpool.ThreadPool; + +/** + * * Service responsible for updating cluster state metadata with weighted routing weights + */ +public class WeightedRoutingService { + private static final Logger logger = LogManager.getLogger(WeightedRoutingService.class); + private final ClusterService clusterService; + private final ThreadPool threadPool; + + @Inject + public WeightedRoutingService(ClusterService clusterService, ThreadPool threadPool) { + this.clusterService = clusterService; + this.threadPool = threadPool; + } + + public void registerWeightedRoutingMetadata( + final ClusterPutWeightedRoutingRequest request, + final ActionListener listener + ) { + final WeightedRoutingMetadata newWeightedRoutingMetadata = new WeightedRoutingMetadata(request.getWeightedRouting()); + clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) { + @Override + public ClusterState execute(ClusterState currentState) { + Metadata metadata = currentState.metadata(); + Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); + WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); + if (weightedRoutingMetadata == null) { + logger.info("put weighted routing weights in metadata [{}]", request.getWeightedRouting()); + weightedRoutingMetadata = new WeightedRoutingMetadata(request.getWeightedRouting()); + } else { + if (!checkIfSameWeightsInMetadata(newWeightedRoutingMetadata, weightedRoutingMetadata)) { + logger.info("updated weighted routing weights [{}] in metadata", request.getWeightedRouting()); + weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRoutingMetadata.getWeightedRouting()); + } else { + return currentState; + } + } + mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + logger.info("building cluster state with weighted routing weights [{}]", request.getWeightedRouting()); + return ClusterState.builder(currentState).metadata(mdBuilder).build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> new ParameterizedMessage("failed to update cluster state for weighted routing weights [{}]", e)); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.debug("cluster weighted routing weights metadata change is processed by all the nodes"); + listener.onResponse(new ClusterStateUpdateResponse(true)); + } + }); + } + + private boolean checkIfSameWeightsInMetadata( + WeightedRoutingMetadata newWeightedRoutingMetadata, + WeightedRoutingMetadata oldWeightedRoutingMetadata + ) { + return newWeightedRoutingMetadata.getWeightedRouting().equals(oldWeightedRoutingMetadata.getWeightedRouting()); + } +} diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java new file mode 100644 index 0000000000000..1cf44e665cf84 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.admin.cluster; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; +import org.opensearch.client.Requests; +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.rest.RestRequest.Method.PUT; + +/** + * Update Weighted Round Robin based shard routing weights + * + * @opensearch.api + * + */ +public class RestClusterPutWeightedRoutingAction extends BaseRestHandler { + + private static final Logger logger = LogManager.getLogger(RestClusterPutWeightedRoutingAction.class); + + @Override + public List routes() { + return singletonList(new Route(PUT, "/_cluster/routing/awareness/{attribute}/weights")); + } + + @Override + public String getName() { + return "put_weighted_routing_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + ClusterPutWeightedRoutingRequest putWeightedRoutingRequest = createRequest(request); + return channel -> client.admin().cluster().putWeightedRouting(putWeightedRoutingRequest, new RestToXContentListener<>(channel)); + } + + public static ClusterPutWeightedRoutingRequest createRequest(RestRequest request) throws IOException { + ClusterPutWeightedRoutingRequest putWeightedRoutingRequest = Requests.putWeightedRoutingRequest(request.param("attribute")); + request.applyContentParser(p -> putWeightedRoutingRequest.source(p.mapStrings())); + return putWeightedRoutingRequest; + } + +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java new file mode 100644 index 0000000000000..186e7e8638f17 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.put; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.cluster.routing.WeightedRouting; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Map; + +public class ClusterPutWeightedRoutingRequestTests extends OpenSearchTestCase { + + public void testSetWeightedRoutingWeight() { + String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"1\",\"us-east-1a\":\"1\"}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); + Map weights = Map.of("us-east-1a", 1.0, "us-east-1b", 1.0, "us-east-1c", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + assertEquals(request.getWeightedRouting(), weightedRouting); + } + + public void testValidate_ValuesAreProper() { + String reqString = "{\"us-east-1c\" : \"1\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNull(actionRequestValidationException); + } + + public void testValidate_TwoZonesWithZeroWeight() { + String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNotNull(actionRequestValidationException); + assertTrue(actionRequestValidationException.getMessage().contains("More than one [2] value has weight set as " + "0")); + } + + public void testValidate_MissingWeights() { + String reqString = "{}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNotNull(actionRequestValidationException); + assertTrue(actionRequestValidationException.getMessage().contains("Weights are missing")); + } + + public void testValidate_AttributeMissing() { + String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"1\",\"us-east-1a\":\"1\"}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest(); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNotNull(actionRequestValidationException); + assertTrue(actionRequestValidationException.getMessage().contains("Attribute name is missing")); + } + +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java new file mode 100644 index 0000000000000..e5cca998d3f06 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -0,0 +1,234 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequestBuilder; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ack.ClusterStateUpdateResponse; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.ClusterServiceUtils; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.MockTransport; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class WeightedRoutingServiceTests extends OpenSearchTestCase { + private ThreadPool threadPool; + private ClusterService clusterService; + private TransportService transportService; + private WeightedRoutingService weightedRoutingService; + private ClusterSettings clusterSettings; + NodeClient client; + + final private static Set CLUSTER_MANAGER_ROLE = Collections.unmodifiableSet( + new HashSet<>(Collections.singletonList(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + ); + + final private static Set DATA_ROLE = Collections.unmodifiableSet( + new HashSet<>(Collections.singletonList(DiscoveryNodeRole.DATA_ROLE)) + ); + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("test", Settings.EMPTY); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + } + + @Before + public void setUpService() { + ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build(); + clusterState = addClusterManagerNodes(clusterState); + clusterState = addDataNodes(clusterState); + clusterState = setLocalNode(clusterState, "nodeA1"); + + ClusterState.Builder builder = ClusterState.builder(clusterState); + ClusterServiceUtils.setState(clusterService, builder); + + final MockTransport transport = new MockTransport(); + transportService = transport.createTransportService( + Settings.EMPTY, + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundTransportAddress -> clusterService.state().nodes().get("nodes1"), + null, + Collections.emptySet() + + ); + + Settings.Builder settingsBuilder = Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone"); + + clusterSettings = new ClusterSettings(settingsBuilder.build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + transportService.start(); + transportService.acceptIncomingRequests(); + + this.weightedRoutingService = new WeightedRoutingService(clusterService, threadPool); + client = new NodeClient(Settings.EMPTY, threadPool); + } + + @After + public void shutdown() { + clusterService.stop(); + threadPool.shutdown(); + } + + private ClusterState addDataNodes(ClusterState clusterState) { + clusterState = addDataNodeForAZone(clusterState, "zone_A", "nodeA1", "nodeA2", "nodeA3"); + clusterState = addDataNodeForAZone(clusterState, "zone_B", "nodeB1", "nodeB2", "nodeB3"); + clusterState = addDataNodeForAZone(clusterState, "zone_C", "nodeC1", "nodeC2", "nodeC3"); + return clusterState; + } + + private ClusterState addClusterManagerNodes(ClusterState clusterState) { + clusterState = addClusterManagerNodeForAZone(clusterState, "zone_A", "nodeMA"); + clusterState = addClusterManagerNodeForAZone(clusterState, "zone_B", "nodeMB"); + clusterState = addClusterManagerNodeForAZone(clusterState, "zone_C", "nodeMC"); + return clusterState; + } + + private ClusterState addDataNodeForAZone(ClusterState clusterState, String zone, String... nodeIds) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + org.opensearch.common.collect.List.of(nodeIds) + .forEach( + nodeId -> nodeBuilder.add( + new DiscoveryNode( + nodeId, + buildNewFakeTransportAddress(), + Collections.singletonMap("zone", zone), + DATA_ROLE, + Version.CURRENT + ) + ) + ); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return clusterState; + } + + private ClusterState addClusterManagerNodeForAZone(ClusterState clusterState, String zone, String... nodeIds) { + + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + org.opensearch.common.collect.List.of(nodeIds) + .forEach( + nodeId -> nodeBuilder.add( + new DiscoveryNode( + nodeId, + buildNewFakeTransportAddress(), + Collections.singletonMap("zone", zone), + CLUSTER_MANAGER_ROLE, + Version.CURRENT + ) + ) + ); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return clusterState; + } + + private ClusterState setLocalNode(ClusterState clusterState, String nodeId) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + nodeBuilder.localNodeId(nodeId); + nodeBuilder.clusterManagerNodeId(nodeId); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return clusterState; + } + + private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); + return clusterState; + } + + public void testRegisterWeightedRoutingMetadataWithChangedWeights() throws InterruptedException { + Map weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0); + ClusterState state = clusterService.state(); + state = setWeightedRoutingWeights(state, weights); + ClusterState.Builder builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder( + client, + ClusterAddWeightedRoutingAction.INSTANCE + ); + WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 0.0)); + request.setWeightedRouting(updatedWeightedRouting); + final CountDownLatch countDownLatch = new CountDownLatch(1); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + assertTrue(clusterStateUpdateResponse.isAcknowledged()); + assertEquals(updatedWeightedRouting, clusterService.state().metadata().weightedRoutingMetadata().getWeightedRouting()); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("request should not fail"); + } + }; + weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } + + public void testRegisterWeightedRoutingMetadataWithSameWeights() throws InterruptedException { + Map weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0); + ClusterState state = clusterService.state(); + state = setWeightedRoutingWeights(state, weights); + ClusterState.Builder builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder( + client, + ClusterAddWeightedRoutingAction.INSTANCE + ); + WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", weights); + request.setWeightedRouting(updatedWeightedRouting); + final CountDownLatch countDownLatch = new CountDownLatch(1); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + assertTrue(clusterStateUpdateResponse.isAcknowledged()); + assertEquals(updatedWeightedRouting, clusterService.state().metadata().weightedRoutingMetadata().getWeightedRouting()); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("request should not fail"); + } + }; + weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } +} diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java new file mode 100644 index 0000000000000..a4cd6224217b7 --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.admin.cluster; + +import com.fasterxml.jackson.core.JsonParseException; +import org.junit.Before; +import org.opensearch.OpenSearchParseException; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.test.rest.RestActionTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static java.util.Collections.singletonMap; + +public class RestClusterAddWeightedRoutingActionTests extends RestActionTestCase { + private RestClusterPutWeightedRoutingAction action; + + @Before + public void setupAction() { + action = new RestClusterPutWeightedRoutingAction(); + controller().registerHandler(action); + } + + public void testCreateRequest_SupportedRequestBody() throws IOException { + String req = "{\"us-east-1c\" : \"1\", \"us-east-1d\":\"1.0\", \"us-east-1a\":\"0.0\"}"; + RestRequest restRequest = buildRestRequest(req); + ClusterPutWeightedRoutingRequest clusterPutWeightedRoutingRequest = RestClusterPutWeightedRoutingAction.createRequest(restRequest); + assertEquals("zone", clusterPutWeightedRoutingRequest.getWeightedRouting().attributeName()); + assertNotNull(clusterPutWeightedRoutingRequest.getWeightedRouting().weights()); + assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1c").toString()); + assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1d").toString()); + assertEquals("0.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1a").toString()); + } + + public void testCreateRequest_UnsupportedRequestBody() throws IOException { + Map params = new HashMap<>(); + String req = "[\"us-east-1c\" : \"1\", \"us-east-1d\":\"1\", \"us-east-1a\":\"0\"]"; + RestRequest restRequest = buildRestRequest(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterPutWeightedRoutingAction.createRequest(restRequest)); + } + + public void testCreateRequest_MalformedRequestBody() throws IOException { + Map params = new HashMap<>(); + + String req = "{\"us-east-1c\" : \1\", \"us-east-1d\":\"1\", \"us-east-1a\":\"0\"}"; + RestRequest restRequest = buildRestRequest(req); + assertThrows(JsonParseException.class, () -> RestClusterPutWeightedRoutingAction.createRequest(restRequest)); + } + + public void testCreateRequest_EmptyRequestBody() throws IOException { + String req = "{}"; + RestRequest restRequest = buildRestRequest(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterPutWeightedRoutingAction.createRequest(restRequest)); + } + + private RestRequest buildRestRequest(String content) { + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath("/_cluster/routing/awareness/zone/weights") + .withParams(singletonMap("attribute", "zone")) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + } + +}