Skip to content

Commit

Permalink
Adding judgments id param, not using .get() in places.
Browse files Browse the repository at this point in the history
  • Loading branch information
jzonthemtn committed Dec 2, 2024
1 parent c770fd3 commit e484582
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -139,10 +140,25 @@ public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionC
job.put("invocation", "scheduled");
job.put("max_rank", searchQualityEvaluationJobParameter.getMaxRank());

final IndexRequest indexRequest = new IndexRequest().index(SearchQualityEvaluationPlugin.COMPLETED_JOBS_INDEX_NAME)
.id(UUID.randomUUID().toString()).source(job).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

client.index(indexRequest).get();
final String judgmentsId = UUID.randomUUID().toString();

final IndexRequest indexRequest = new IndexRequest()
.index(SearchQualityEvaluationPlugin.COMPLETED_JOBS_INDEX_NAME)
.id(judgmentsId)
.source(job)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

client.index(indexRequest, new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
LOGGER.info("Successfully indexed implicit judgments {}", judgmentsId);
}

@Override
public void onFailure(Exception ex) {
LOGGER.error("Unable to index implicit judgments", ex);
}
});

}, exception -> { throw new IllegalStateException("Failed to acquire lock."); }));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

public class SearchQualityEvaluationRestHandler extends BaseRestHandler {

Expand Down Expand Up @@ -148,11 +149,16 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
} else if(QUERYSET_RUN_URL.equalsIgnoreCase(request.path())) {

final String querySetId = request.param("id");
final String judgmentsId = request.param("judgments");

if(querySetId == null || judgmentsId == null) {
return restChannel -> restChannel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "{\"error\": \"Missing required parameters.\"}"));
}

try {

final OpenSearchQuerySetRunner openSearchQuerySetRunner = new OpenSearchQuerySetRunner(client);
final QuerySetRunResult querySetRunResult = openSearchQuerySetRunner.run(querySetId);
final QuerySetRunResult querySetRunResult = openSearchQuerySetRunner.run(querySetId, judgmentsId);
openSearchQuerySetRunner.save(querySetRunResult);

} catch (Exception ex) {
Expand Down Expand Up @@ -195,16 +201,35 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
job.put("invocation", "on_demand");
job.put("max_rank", maxRank);

final IndexRequest indexRequest = new IndexRequest().index(SearchQualityEvaluationPlugin.COMPLETED_JOBS_INDEX_NAME)
.id(UUID.randomUUID().toString()).source(job).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
final String judgmentsId = UUID.randomUUID().toString();

try {
client.index(indexRequest).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
final IndexRequest indexRequest = new IndexRequest()
.index(SearchQualityEvaluationPlugin.COMPLETED_JOBS_INDEX_NAME)
.id(judgmentsId)
.source(job)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

final AtomicBoolean success = new AtomicBoolean(false);

return restChannel -> restChannel.sendResponse(new BytesRestResponse(RestStatus.OK, "{\"message\": \"Implicit judgment generation initiated.\"}"));
client.index(indexRequest, new ActionListener<>() {
@Override
public void onResponse(final IndexResponse indexResponse) {
LOGGER.debug("Judgments indexed: {}", judgmentsId);
success.set(true);
}

@Override
public void onFailure(final Exception ex) {
LOGGER.error("Unable to index judgment with ID {}", judgmentsId, ex);
success.set(false);
}
});

if(success.get()) {
return restChannel -> restChannel.sendResponse(new BytesRestResponse(RestStatus.OK, "{\"judgments_id\": \"" + judgmentsId + "\"}"));
} else {
return restChannel -> restChannel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,"Unable to index judgments."));
}

} else {
return restChannel -> restChannel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "{\"error\": \"Invalid click model.\"}"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public OpenSearchQuerySetRunner(final Client client) {
}

@Override
public QuerySetRunResult run(final String querySetId) {
public QuerySetRunResult run(final String querySetId, final String judgmentsId) {

// Get the query set.
final SearchSourceBuilder getQuerySetSearchSourceBuilder = new SearchSourceBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ public interface QuerySetRunner {
/**
* Runs the query set.
* @param querySetId The ID of the query set to run.
* @param judgmentsId The ID of the judgments set to use for search metric calcuation.
* @return The query set {@link QuerySetRunResult results} and calculated metrics.
*/
QuerySetRunResult run(String querySetId);
QuerySetRunResult run(String querySetId, final String judgmentsId);

/**
* Saves the query set results to a persistent store, which may be the search engine itself.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.eval.SearchQualityEvaluationPlugin;
import org.opensearch.eval.SearchQualityEvaluationRestHandler;
import org.opensearch.eval.judgments.model.QuerySetQuery;

import javax.management.Query;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -60,7 +59,6 @@ protected String indexQuerySet(final NodeClient client, final String name, final
final Map<String, Long> querySetQuery = new HashMap<>();
querySetQuery.put(query, queries.get(query));

final long frequency = queries.get(query);
querySetQueries.add(querySetQuery);

}
Expand All @@ -80,7 +78,18 @@ protected String indexQuerySet(final NodeClient client, final String name, final
.source(querySet)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

client.index(indexRequest).get();
client.index(indexRequest, new ActionListener<>() {

@Override
public void onResponse(IndexResponse indexResponse) {
LOGGER.info("Indexed query set {} having name {}", querySetId, name);
}

@Override
public void onFailure(Exception e) {
LOGGER.error("Unable to index query set {}", querySetId, e);
}
});

return querySetId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public String sample() throws Exception {
searchRequest.scroll(scroll);
searchRequest.source(searchSourceBuilder);

// TODO: Don't use .get()
SearchResponse searchResponse = client.search(searchRequest).get();

String scrollId = searchResponse.getScrollId();
Expand All @@ -93,6 +94,7 @@ public String sample() throws Exception {
final SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(scroll);

// TODO: Don't use .get()
searchResponse = client.searchScroll(scrollRequest).get();

scrollId = searchResponse.getScrollId();
Expand Down

0 comments on commit e484582

Please sign in to comment.