Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Oct 23, 2024
1 parent 2e690d7 commit 1a550fe
Showing 1 changed file with 46 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@
import org.opensearch.action.search.SearchTask;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -58,14 +63,15 @@
import static org.opensearch.wlm.QueryGroupTask.QUERY_GROUP_ID_HEADER;
import static org.hamcrest.Matchers.instanceOf;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0)
import static org.opensearch.threadpool.ThreadPool.Names.SAME;

public class WorkloadManagementIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
final static String PUT = "PUT";
final static String MEMORY = "MEMORY";
final static String CPU = "CPU";
final static String ENABLED = "enabled";
final static String DELETE = "DELETE";
private static final TimeValue TIMEOUT = new TimeValue(10, TimeUnit.SECONDS);
private static final TimeValue TIMEOUT = new TimeValue(1, TimeUnit.SECONDS);

public WorkloadManagementIT(Settings nodeSettings) {
super(nodeSettings);
Expand Down Expand Up @@ -207,7 +213,7 @@ public void updateQueryGroupInClusterState(String method, QueryGroup queryGroup)
assertEquals(0, listener.getLatch().getCount());
}

public static class TestClusterUpdateRequest extends ActionRequest {
public static class TestClusterUpdateRequest extends ClusterManagerNodeRequest<TestClusterUpdateRequest> {
final private String method;
final private QueryGroup queryGroup;

Expand Down Expand Up @@ -243,22 +249,49 @@ public String getMethod() {
}
}

public static class TestClusterUpdateTransportAction extends HandledTransportAction<TestClusterUpdateRequest, TestResponse> {
public static class TestClusterUpdateTransportAction extends TransportClusterManagerNodeAction<TestClusterUpdateRequest, TestResponse> {
public static final ActionType<TestResponse> ACTION = new ActionType<>("internal::test_cluster_update_action", TestResponse::new);
private final ClusterService clusterService;

@Inject
public TestClusterUpdateTransportAction(
ThreadPool threadPool,
TransportService transportService,
ClusterService clusterService,
ActionFilters actionFilters
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService
) {
super(ACTION.name(), transportService, actionFilters, TestClusterUpdateRequest::new);
this.clusterService = clusterService;
super(
ACTION.name(),
transportService,
clusterService,
threadPool,
actionFilters,
TestClusterUpdateRequest::new,
indexNameExpressionResolver
);
}

@Override
protected void doExecute(Task task, TestClusterUpdateRequest request, ActionListener<TestResponse> listener) {
protected String executor() {
return SAME;
}

@Override
protected TestResponse read(StreamInput in) throws IOException {
return new TestResponse(in);
}

@Override
protected ClusterBlockException checkBlock(TestClusterUpdateRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
protected void clusterManagerOperation(
TestClusterUpdateRequest request,
ClusterState clusterState,
ActionListener<TestResponse> listener
) {
clusterService.submitStateUpdateTask("query-group-persistence-service", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
Expand Down Expand Up @@ -350,7 +383,6 @@ protected void doExecute(Task task, TestQueryGroupTaskRequest request, ActionLis
((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext());
assertEquals(request.getQueryGroupId(), ((QueryGroupTask) task).getQueryGroupId());
long startTime = System.nanoTime();

while (System.nanoTime() - startTime < TIMEOUT.getNanos()) {
doWork(request);
if (cancellableTask.isCancelled()) {
Expand Down Expand Up @@ -379,8 +411,9 @@ private void doWork(TestQueryGroupTaskRequest request) throws InterruptedExcepti
} while (i < iterations);
break;
case "MEMORY":
Byte[] bytes = new Byte[100000];
int[] ints = new int[1000000];
int bytesToAllocate = (int) (Runtime.getRuntime().totalMemory() * 0.01);
Byte[] bytes = new Byte[bytesToAllocate];
int[] ints = new int[bytesToAllocate];
break;
}
}
Expand Down

0 comments on commit 1a550fe

Please sign in to comment.