Skip to content

Commit

Permalink
Addressed andrross's comment
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <[email protected]>
  • Loading branch information
Peter Alfonsi committed Dec 9, 2024
1 parent e20a4de commit 60b90af
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 56 deletions.
15 changes: 10 additions & 5 deletions server/src/main/java/org/opensearch/action/search/SearchPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;

/**
* Base class for all individual search phases like collecting distributed frequencies, fetching documents, querying shards.
Expand Down Expand Up @@ -69,11 +70,15 @@ public String getName() {
}

/**
* Returns the SearchPhase name as {@link SearchPhaseName}. Exception will come if SearchPhase name is not defined
* in {@link SearchPhaseName}
* @return {@link SearchPhaseName}
* Returns an Optional of the SearchPhase name as {@link SearchPhaseName}. If there's not a matching SearchPhase,
* returns an empty Optional.
* @return {@link Optional<SearchPhaseName>}
*/
public SearchPhaseName getSearchPhaseName() {
return SearchPhaseName.valueOf(name.toUpperCase(Locale.ROOT));
public Optional<SearchPhaseName> getSearchPhaseName() {
try {
return Optional.of(SearchPhaseName.valueOf(name.toUpperCase(Locale.ROOT)));
} catch (IllegalArgumentException e) {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,32 +73,22 @@ public long getTookMetric() {

@Override
protected void onPhaseStart(SearchPhaseContext context) {
try {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc();
} catch (IllegalArgumentException ignored) {
// Do nothing if the phase isn't found in SearchPhaseName.
}
context.getCurrentPhase().getSearchPhaseName().ifPresent(name -> phaseStatsMap.get(name).current.inc());
}

@Override
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
try {
StatsHolder phaseStats = phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName());
context.getCurrentPhase().getSearchPhaseName().ifPresent(name -> {
StatsHolder phaseStats = phaseStatsMap.get(name);
phaseStats.current.dec();
phaseStats.total.inc();
phaseStats.timing.inc(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos()));
} catch (IllegalArgumentException ignored) {
// Do nothing if the phase isn't found in SearchPhaseName.
}
});
}

@Override
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
try {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
} catch (IllegalArgumentException ignored) {
// Do nothing if the phase isn't found in SearchPhaseName.
}
context.getCurrentPhase().getSearchPhaseName().ifPresent(name -> phaseStatsMap.get(name).current.dec());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,29 +399,29 @@ public void testOnPhaseFailureAndVerifyListeners() {
final List<SearchRequestOperationsListener> requestOperationListeners = List.of(testListener, assertingListener);
SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners);
action.start();
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName().get()));
action.onPhaseFailure(new SearchPhase("test") {
@Override
public void run() {

}
}, "message", null);
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName().get()));
assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName().get()));

SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(
requestOperationListeners
);
searchDfsQueryThenFetchAsyncAction.start();
assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName().get()));
searchDfsQueryThenFetchAsyncAction.onPhaseFailure(new SearchPhase("test") {
@Override
public void run() {

}
}, "message", null);
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName().get()));
assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName().get()));

FetchSearchPhase fetchPhase = createFetchSearchPhase();
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt());
Expand All @@ -430,15 +430,15 @@ public void run() {
action.skipShard(searchShardIterator);
action.start();
action.executeNextPhase(action, fetchPhase);
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName().get()));
action.onPhaseFailure(new SearchPhase("test") {
@Override
public void run() {

}
}, "message", null);
assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName().get()));
assertEquals(0, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName().get()));
}

public void testOnPhaseFailure() {
Expand Down Expand Up @@ -722,7 +722,7 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx
action.start();

// Verify queryPhase current metric
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName().get()));
TimeUnit.MILLISECONDS.sleep(delay);

FetchSearchPhase fetchPhase = createFetchSearchPhase();
Expand All @@ -733,31 +733,31 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx
action.executeNextPhase(action, fetchPhase);

// Verify queryPhase total, current and latency metrics
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName()));
assertThat(testListener.getPhaseMetric(action.getSearchPhaseName()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(action.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName().get()));
assertThat(testListener.getPhaseMetric(action.getSearchPhaseName().get()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(action.getSearchPhaseName().get()));

// Verify fetchPhase current metric
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName().get()));
TimeUnit.MILLISECONDS.sleep(delay);

ExpandSearchPhase expandPhase = createExpandSearchPhase();
action.executeNextPhase(fetchPhase, expandPhase);
TimeUnit.MILLISECONDS.sleep(delay);

// Verify fetchPhase total, current and latency metrics
assertThat(testListener.getPhaseMetric(fetchPhase.getSearchPhaseName()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
assertThat(testListener.getPhaseMetric(fetchPhase.getSearchPhaseName().get()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName().get()));
assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName().get()));

assertEquals(1, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName().get()));

action.executeNextPhase(expandPhase, fetchPhase);
action.onPhaseDone(); /* finish phase since we don't have reponse being sent */

assertThat(testListener.getPhaseMetric(expandPhase.getSearchPhaseName()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(expandPhase.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName()));
assertThat(testListener.getPhaseMetric(expandPhase.getSearchPhaseName().get()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(expandPhase.getSearchPhaseName().get()));
assertEquals(0, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName().get()));
}

public void testOnPhaseListenersWithDfsType() throws InterruptedException {
Expand All @@ -772,7 +772,7 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException {

FetchSearchPhase fetchPhase = createFetchSearchPhase();
searchDfsQueryThenFetchAsyncAction.start();
assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName().get()));
TimeUnit.MILLISECONDS.sleep(delay);
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt());
SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE);
Expand All @@ -786,9 +786,9 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException {
null
); /* finalizing the fetch phase since we do adhoc phase lifecycle calls */

assertThat(testListener.getPhaseMetric(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
assertEquals(0, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
assertThat(testListener.getPhaseMetric(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName().get()), greaterThanOrEqualTo(delay));
assertEquals(1, testListener.getPhaseTotal(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName().get()));
assertEquals(0, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName().get()));
}

private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -30,18 +31,18 @@ public void testListenersAreExecuted() {

@Override
public void onPhaseStart(SearchPhaseContext context) {
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName().get()).current.inc();
}

@Override
public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).total.inc();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName().get()).current.dec();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName().get()).total.inc();
}

@Override
public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName().get()).current.dec();
}
};

Expand All @@ -61,7 +62,7 @@ public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {

for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
when(ctx.getCurrentPhase()).thenReturn(searchPhase);
when(searchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(searchPhase.getSearchPhaseName()).thenReturn(Optional.of(searchPhaseName));
compositeListener.onPhaseStart(ctx);
assertEquals(totalListeners, searchPhaseMap.get(searchPhaseName).current.count());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -68,7 +69,7 @@ public void testSearchRequestPhaseFailure() {
when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);

for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(Optional.of(searchPhaseName));
testRequestStats.onPhaseStart(ctx);
assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName));
testRequestStats.onPhaseFailure(ctx, new Throwable());
Expand All @@ -85,7 +86,7 @@ public void testSearchRequestStats() {
when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);

for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(Optional.of(searchPhaseName));
long tookTimeInMillis = randomIntBetween(1, 10);
testRequestStats.onPhaseStart(ctx);
long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis);
Expand Down Expand Up @@ -116,7 +117,7 @@ public void testSearchRequestStatsOnPhaseStartConcurrently() throws InterruptedE
SearchPhaseContext ctx = mock(SearchPhaseContext.class);
SearchPhase mockSearchPhase = mock(SearchPhase.class);
when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(Optional.of(searchPhaseName));
for (int i = 0; i < numTasks; i++) {
threads[i] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
Expand Down Expand Up @@ -145,7 +146,7 @@ public void testSearchRequestStatsOnPhaseEndConcurrently() throws InterruptedExc
SearchPhaseContext ctx = mock(SearchPhaseContext.class);
SearchPhase mockSearchPhase = mock(SearchPhase.class);
when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(Optional.of(searchPhaseName));
long tookTimeInMillis = randomIntBetween(1, 10);
long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis);
when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime);
Expand Down Expand Up @@ -188,7 +189,7 @@ public void testSearchRequestStatsOnPhaseFailureConcurrently() throws Interrupte
SearchPhaseContext ctx = mock(SearchPhaseContext.class);
SearchPhase mockSearchPhase = mock(SearchPhase.class);
when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(Optional.of(searchPhaseName));
for (int i = 0; i < numTasks; i++) {
threads[i] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -86,7 +87,7 @@ public void testShardLevelSearchGroupStats() throws Exception {
SearchPhase mockSearchPhase = mock(SearchPhase.class);
when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);
when(mockSearchPhase.getStartTimeInNanos()).thenReturn(System.nanoTime() - TimeUnit.SECONDS.toNanos(paramValue));
when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
when(mockSearchPhase.getSearchPhaseName()).thenReturn(Optional.of(searchPhaseName));
for (int iterator = 0; iterator < paramValue; iterator++) {
onPhaseStart(testRequestStats, ctx);
onPhaseEnd(testRequestStats, ctx);
Expand Down

0 comments on commit 60b90af

Please sign in to comment.