Skip to content

Commit

Permalink
Rest high level clients
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed May 17, 2022
1 parent 1dd62b4 commit af9b143
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.search.*;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
Expand Down Expand Up @@ -477,6 +473,17 @@ static Request createPit(CreatePitRequest createPitRequest) throws IOException {
return request;
}

static Request deletePit(DeletePitRequest deletePitRequest) throws IOException {
Request request = new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time");
request.setEntity(createEntity(deletePitRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deleteAllPits(DeletePitRequest deletePitRequest) {
Request request = new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time/_all");
return request;
}

static Request clearScroll(ClearScrollRequest clearScrollRequest) throws IOException {
Request request = new Request(HttpDelete.METHOD_NAME, "/_search/scroll");
request.setEntity(createEntity(clearScrollRequest, REQUEST_BODY_CONTENT_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,7 @@
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.search.*;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
Expand Down Expand Up @@ -1298,6 +1290,86 @@ public final Cancellable createPitAsync(
);
}

/**
* Delete PIT context using delete PIT API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final DeletePitResponse deletePit(DeletePitRequest deletePitRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
deletePitRequest,
RequestConverters::deletePit,
options,
DeletePitResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously Delete PIT context using delete PIT API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable deletePitAsync(
DeletePitRequest deletePitRequest,
RequestOptions options,
ActionListener<DeletePitResponse> listener
) {
return performRequestAsyncAndParseEntity(
deletePitRequest,
RequestConverters::deletePit,
options,
DeletePitResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Delete all PIT contexts using delete all PITs API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final DeletePitResponse deleteAllPits(DeletePitRequest deletePitRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
deletePitRequest,
RequestConverters::deleteAllPits,
options,
DeletePitResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously Delete all PIT contexts using delete all PITs API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable deleteAllPitsAsync(
DeletePitRequest deletePitRequest,
RequestOptions options,
ActionListener<DeletePitResponse> listener
) {
return performRequestAsyncAndParseEntity(
deletePitRequest,
RequestConverters::deleteAllPits,
options,
DeletePitResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Clears one or more scroll ids using the Clear Scroll API.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
import org.junit.Before;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -51,8 +55,28 @@ public void testCreatePit() throws IOException {
assertEquals(1, pitResponse.getSuccessfulShards());
assertEquals(0, pitResponse.getFailedShards());
assertEquals(0, pitResponse.getSkippedShards());
List<String> pitIds = new ArrayList<>();
pitIds.add(pitResponse.getId());
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync);
assertTrue(deletePitResponse.isSucceeded());
}
/**
* Todo: add deletion logic and test cluster settings
*/

public void testDeleteAllPits() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
CreatePitResponse pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertTrue(pitResponse1.getId() != null);
List<String> pitIds = new ArrayList<>();
pitIds.add("_all");
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(
deletePitRequest,
highLevelClient()::deleteAllPits,
highLevelClient()::deleteAllPitsAsync
);
assertTrue(deletePitResponse.isSucceeded());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,7 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.search.*;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
Expand Down Expand Up @@ -1326,6 +1321,28 @@ public void testCreatePit() throws IOException {
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}

public void testDeletePit() throws IOException {
List<String> pitIds = new ArrayList<>();
pitIds.add("pitid1");
pitIds.add("pitid2");
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
Request request = RequestConverters.deletePit(deletePitRequest);
String endpoint = "/_search/point_in_time";
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals(endpoint, request.getEndpoint());
assertToXContentBody(deletePitRequest, request.getEntity());
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}

public void testDeleteAllPits() throws IOException {
DeletePitRequest deletePitRequest = new DeletePitRequest();
Request request = RequestConverters.deletePit(deletePitRequest);
String endpoint = "/_search/point_in_time/_all";
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals(endpoint, request.getEndpoint());
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}

public void testSearchTemplate() throws Exception {
// Create a random request.
String[] indices = randomIndicesNames(0, 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,7 @@
import org.opensearch.action.fieldcaps.FieldCapabilities;
import org.opensearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.opensearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.search.*;
import org.opensearch.client.core.CountRequest;
import org.opensearch.client.core.CountResponse;
import org.opensearch.common.Strings;
Expand Down Expand Up @@ -103,11 +95,7 @@
import org.junit.Before;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.TimeUnit;

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
Expand Down Expand Up @@ -774,16 +762,13 @@ public void testSearchWithPit() throws Exception {
client().performRequest(doc);
}
client().performRequest(new Request(HttpPost.METHOD_NAME, "/test/_refresh"));

CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "test");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(35)
.sort("field", SortOrder.ASC)
.pointInTimeBuilder(new PointInTimeBuilder(pitResponse.getId()));
SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder);
SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync);

try {
long counter = 0;
assertSearchHeader(searchResponse);
Expand All @@ -793,7 +778,15 @@ public void testSearchWithPit() throws Exception {
assertThat(((Number) hit.getSortValues()[0]).longValue(), equalTo(counter++));
}
} finally {
// TODO : Delete PIT
List<String> pitIds = new ArrayList<>();
pitIds.add(pitResponse.getId());
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(
deletePitRequest,
highLevelClient()::deletePit,
highLevelClient()::deletePitAsync
);
assertTrue(deletePitResponse.isSucceeded());
}
}

Expand Down
28 changes: 28 additions & 0 deletions rest-api-spec/src/main/resources/rest-api-spec/api/delete_pit.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"delete_pit":{
"documentation":{
"url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/",
"description":"Deletes point in time context."
},
"stability":"stable",
"url":{
"paths":[
{
"path":"/_search/point_in_time",
"methods":[
"DELETE"
]
},
{
"path":"/_search/point_in_time/_all",
"methods":[
"DELETE"
]
}
]
},
"body":{
"description":"A comma-separated list of pit IDs to clear"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
Expand Down Expand Up @@ -99,17 +98,17 @@ void deleteAllPits(ActionListener<DeletePitResponse> listener) {
new ActionListener<>() {
@Override
public void onResponse(final Collection<SearchTransportService.SearchFreeContextResponse> responses) {
//final SetOnce<Boolean> succeeded = new SetOnce<>();
boolean hasFailures = responses.stream().anyMatch(r-> !r.isFreed());
// final SetOnce<Boolean> succeeded = new SetOnce<>();
boolean hasFailures = responses.stream().anyMatch(r -> !r.isFreed());
listener.onResponse(new DeletePitResponse(!hasFailures));
// for (SearchTransportService.SearchFreeContextResponse response : responses) {
// if (!response.isFreed()) {
// succeeded.set(false);
// break;
// }
// }
// succeeded.trySet(true);
// listener.onResponse(new DeletePitResponse(succeeded.get()));
// for (SearchTransportService.SearchFreeContextResponse response : responses) {
// if (!response.isFreed()) {
// succeeded.set(false);
// break;
// }
// }
// succeeded.trySet(true);
// listener.onResponse(new DeletePitResponse(succeeded.get()));
}

@Override
Expand Down

0 comments on commit af9b143

Please sign in to comment.