Skip to content

Commit

Permalink
spotlessapply
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Oct 17, 2024
1 parent 867e315 commit 1643aa4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public void testSearchCancellationWithBackpressureDisabled() throws InterruptedE
assertNull("SearchShardTask shouldn't have cancelled for monitor_only mode", caughtException);
}

private static class ExceptionCatchingListener implements ActionListener<TestResponse> {
public static class ExceptionCatchingListener implements ActionListener<TestResponse> {
private final CountDownLatch latch;
private Exception exception = null;

Expand All @@ -336,6 +336,10 @@ public void onFailure(Exception e) {
private Exception getException() {
return exception;
}

public CountDownLatch getLatch() {
return latch;
}
}

enum RequestType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.wlm;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.ActionType;
Expand All @@ -31,6 +32,8 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.backpressure.SearchBackpressureIT.ExceptionCatchingListener;
import org.opensearch.search.backpressure.SearchBackpressureIT.TestResponse;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
Expand All @@ -43,7 +46,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -170,10 +172,10 @@ public void testWlmStatsWithIdAndBreach() throws Exception {
}

public void updateQueryGroupInClusterState(String method, QueryGroup queryGroup) throws InterruptedException {
TestClusterUpdateListener listener = new TestClusterUpdateListener();
ExceptionCatchingListener listener = new ExceptionCatchingListener();
client().execute(TestClusterUpdateTransportAction.ACTION, new TestClusterUpdateRequest(queryGroup, method), listener);
assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS));
assertEquals(0, listener.latch.getCount());
assertTrue(listener.getLatch().await(TIMEOUT.getSeconds(), TimeUnit.SECONDS));
assertEquals(0, listener.getLatch().getCount());
}

public WlmStatsResponse getWlmStatsResponse(String[] queryGroupIds, Boolean breach) throws ExecutionException, InterruptedException {
Expand All @@ -196,26 +198,6 @@ public void validateResponse(WlmStatsResponse response, String[] validIds, Strin
}
}

private static class TestClusterUpdateListener implements ActionListener<TestClusterUpdateResponse> {
private final CountDownLatch latch;
private Exception exception = null;

public TestClusterUpdateListener() {
this.latch = new CountDownLatch(1);
}

@Override
public void onResponse(TestClusterUpdateResponse r) {
latch.countDown();
}

@Override
public void onFailure(Exception e) {
this.exception = e;
latch.countDown();
}
}

public static class TestClusterUpdateRequest extends ActionRequest {
final private String method;
final private QueryGroup queryGroup;
Expand Down Expand Up @@ -252,22 +234,8 @@ public String getMethod() {
}
}

public static class TestClusterUpdateResponse extends ActionResponse {
public TestClusterUpdateResponse() {}

public TestClusterUpdateResponse(StreamInput in) {}

@Override
public void writeTo(StreamOutput out) throws IOException {}
}

public static class TestClusterUpdateTransportAction extends HandledTransportAction<
TestClusterUpdateRequest,
TestClusterUpdateResponse> {
public static final ActionType<TestClusterUpdateResponse> ACTION = new ActionType<>(
"internal::test_cluster_update_action",
TestClusterUpdateResponse::new
);
public static class TestClusterUpdateTransportAction extends HandledTransportAction<TestClusterUpdateRequest, TestResponse> {
public static final ActionType<TestResponse> ACTION = new ActionType<>("internal::test_cluster_update_action", TestResponse::new);
private final ClusterService clusterService;

@Inject
Expand All @@ -281,7 +249,7 @@ public TestClusterUpdateTransportAction(
}

@Override
protected void doExecute(Task task, TestClusterUpdateRequest request, ActionListener<TestClusterUpdateResponse> listener) {
protected void doExecute(Task task, TestClusterUpdateRequest request, ActionListener<TestResponse> listener) {
clusterService.submitStateUpdateTask("query-group-persistence-service", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
Expand All @@ -305,7 +273,7 @@ public void onFailure(String source, Exception e) {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new TestClusterUpdateResponse());
listener.onResponse(new TestResponse());
}
});
}
Expand Down

0 comments on commit 1643aa4

Please sign in to comment.