Skip to content

Commit

Permalink
Add asserting listener class
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Feb 28, 2024
1 parent 7cfd70a commit ff5bb5c
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
*
* @opensearch.internal
*/
final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {
class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {

private final SearchPhaseController searchPhaseController;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@
import java.util.function.BiFunction;
import java.util.stream.IntStream;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -98,7 +96,7 @@ public class AbstractSearchAsyncActionTests extends OpenSearchTestCase {
private final List<Tuple<String, String>> resolvedNodes = new ArrayList<>();
private final Set<ShardSearchContextId> releasedContexts = new CopyOnWriteArraySet<>();
private ExecutorService executor;
private SearchRequestOperationsListener assertingListener;
private SearchRequestOperationsListenerAssertingListener assertingListener;
ThreadPool threadPool;

@Before
Expand All @@ -107,27 +105,7 @@ public void setUp() throws Exception {
super.setUp();
executor = Executors.newFixedThreadPool(1);
threadPool = new TestThreadPool(getClass().getName());
assertingListener = new SearchRequestOperationsListener() {
private volatile SearchPhase phase;

@Override
protected void onPhaseStart(SearchPhaseContext context) {
assertThat(phase, is(nullValue()));
phase = context.getCurrentPhase();
}

@Override
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
assertThat(phase, is(context.getCurrentPhase()));
phase = null;
}

@Override
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
assertThat(phase, is(context.getCurrentPhase()));
phase = null;
}
};
assertingListener = new SearchRequestOperationsListenerAssertingListener();
}

@After
Expand All @@ -137,6 +115,7 @@ public void tearDown() throws Exception {
executor.shutdown();
assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS);
assertingListener.assertFinished();
}

private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
Expand Down Expand Up @@ -363,7 +342,7 @@ public void testOnPhaseFailureAndVerifyListeners() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
SearchRequestStats testListener = new SearchRequestStats(clusterSettings);

final List<SearchRequestOperationsListener> requestOperationListeners = List.of(testListener);
final List<SearchRequestOperationsListener> requestOperationListeners = List.of(testListener, assertingListener);
SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners);
action.start();
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName()));
Expand Down Expand Up @@ -395,6 +374,7 @@ public void run() {
SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE);
searchShardIterator.resetAndSkip();
action.skipShard(searchShardIterator);
action.start();
action.executeNextPhase(action, fetchPhase);
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
action.onPhaseFailure(new SearchPhase("test") {
Expand Down Expand Up @@ -626,7 +606,7 @@ public void onFailure(Exception e) {
public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedException {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
SearchRequestStats testListener = new SearchRequestStats(clusterSettings);
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener, assertingListener));

long delay = (randomIntBetween(1, 5));
delay = delay * 10;
Expand Down Expand Up @@ -676,7 +656,7 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx
public void testOnPhaseListenersWithDfsType() throws InterruptedException {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
SearchRequestStats testListener = new SearchRequestStats(clusterSettings);
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener, assertingListener));

SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(
requestOperationListeners
Expand All @@ -697,6 +677,10 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException {
assertThat(testListener.getPhaseMetric(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
searchDfsQueryThenFetchAsyncAction.sendSearchResponse(
mock(InternalSearchResponse.class),
mock(String.valueOf(QuerySearchResult.class))
);
}

private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAction(
Expand Down Expand Up @@ -750,7 +734,18 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct
searchRequest
),
NoopTracer.INSTANCE
);
) {
@Override
public void sendSearchResponse(InternalSearchResponse internalSearchResponse, AtomicArray<SearchPhaseResult> queryResults) {
new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger).onPhaseEnd(
this,
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger),
searchRequest
)
);
}
};
}

private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction(
Expand Down Expand Up @@ -812,7 +807,13 @@ ShardSearchFailure[] buildShardFailures() {

@Override
public void sendSearchResponse(InternalSearchResponse internalSearchResponse, AtomicArray<SearchPhaseResult> queryResults) {
start();
new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger).onPhaseEnd(
this,
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger),
searchRequest
)
);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -76,50 +75,29 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.IntStream;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.collection.IsEmptyCollection.empty;

public class CanMatchPreFilterSearchPhaseTests extends OpenSearchTestCase {
private SearchRequestOperationsListener assertingListener;
private Set<SearchPhase> phases;
private SearchRequestOperationsListenerAssertingListener assertingListener;

@Before
public void setUp() throws Exception {
super.setUp();

phases = Collections.newSetFromMap(new IdentityHashMap<>());
assertingListener = new SearchRequestOperationsListener() {
@Override
protected void onPhaseStart(SearchPhaseContext context) {
assertThat(phases.contains(context.getCurrentPhase()), is(false));
phases.add(context.getCurrentPhase());
}

@Override
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
assertThat(phases.contains(context.getCurrentPhase()), is(true));
phases.remove(context.getCurrentPhase());
}

@Override
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
assertThat(phases.contains(context.getCurrentPhase()), is(true));
phases.remove(context.getCurrentPhase());
}
};
assertingListener = new SearchRequestOperationsListenerAssertingListener();
}

@After
public void tearDown() throws Exception {
super.tearDown();
assertBusy(() -> assertThat(phases, empty()), 5, TimeUnit.SECONDS);

assertingListener.assertFinished();
}

public void testFilterShards() throws InterruptedException {
Expand Down Expand Up @@ -183,8 +161,8 @@ public void sendCanMatch(
@Override
public void run() throws IOException {
result.set(iter);
latch.countDown();
assertingListener.onPhaseEnd(new MockSearchPhaseContext(1, searchRequest, this), null);
latch.countDown();
}
},
SearchResponse.Clusters.EMPTY,
Expand Down Expand Up @@ -280,8 +258,8 @@ public void sendCanMatch(
@Override
public void run() throws IOException {
result.set(iter);
latch.countDown();
assertingListener.onPhaseEnd(new MockSearchPhaseContext(1, searchRequest, this), null);
latch.countDown();
}
},
SearchResponse.Clusters.EMPTY,
Expand Down Expand Up @@ -381,15 +359,15 @@ public void sendCanMatch(
randomIntBetween(1, 32),
SearchResponse.Clusters.EMPTY,
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()),
searchRequest
),
NoopTracer.INSTANCE
) {

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
return new SearchPhase("test") {
return new WrappingSearchAsyncActionPhase(this) {
@Override
public void run() {
latch.countDown();
Expand All @@ -413,15 +391,15 @@ protected void executePhaseOnShard(
},
SearchResponse.Clusters.EMPTY,
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()),
searchRequest
),
NoopTracer.INSTANCE
);

canMatchPhase.start();
latch.await();

assertingListener.onPhaseEnd(canMatchPhase, null);
executor.shutdown();
}

Expand Down Expand Up @@ -498,8 +476,8 @@ public void sendCanMatch(
@Override
public void run() {
result.set(iter);
latch.countDown();
assertingListener.onPhaseEnd(new MockSearchPhaseContext(1, searchRequest, this), null);
latch.countDown();
}
},
SearchResponse.Clusters.EMPTY,
Expand Down Expand Up @@ -604,8 +582,8 @@ public void sendCanMatch(
@Override
public void run() {
result.set(iter);
latch.countDown();
assertingListener.onPhaseEnd(new MockSearchPhaseContext(1, searchRequest, this), null);
latch.countDown();
}
},
SearchResponse.Clusters.EMPTY,
Expand Down
Loading

0 comments on commit ff5bb5c

Please sign in to comment.