From fa1f2cd22a8c7557b51ff879184288580b2686fb Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Wed, 4 May 2022 09:45:24 +0530 Subject: [PATCH 1/4] Adding java docs and addressing comments Signed-off-by: Bharathwaj G --- .../action/search/DeletePITAction.java | 3 + .../action/search/DeletePITResponse.java | 3 + .../action/search/RestDeletePITAction.java | 30 +++++---- .../search/TransportDeletePITActionTests.java | 3 + .../search/DeletePitMultiNodeTests.java | 3 + .../search/pit/RestDeletePitActionTests.java | 66 ++++++++++++++++--- 6 files changed, 87 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/DeletePITAction.java b/server/src/main/java/org/opensearch/action/search/DeletePITAction.java index 7f043a365c403..6048996037bbe 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePITAction.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePITAction.java @@ -10,6 +10,9 @@ import org.opensearch.action.ActionType; +/** + * Action type for deleting PIT reader contexts + */ public class DeletePITAction extends ActionType { public static final DeletePITAction INSTANCE = new DeletePITAction(); diff --git a/server/src/main/java/org/opensearch/action/search/DeletePITResponse.java b/server/src/main/java/org/opensearch/action/search/DeletePITResponse.java index 220f5377bc1ce..5c3f66b0ad293 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePITResponse.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePITResponse.java @@ -26,6 +26,9 @@ import static org.opensearch.rest.RestStatus.NOT_FOUND; import static org.opensearch.rest.RestStatus.OK; +/** + * Response class for delete pit flow which returns if the contexts are freed + */ public class DeletePITResponse extends ActionResponse implements StatusToXContentObject { /** diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestDeletePITAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestDeletePITAction.java index 26739d3749f92..5fdf8a9d7a6da 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestDeletePITAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestDeletePITAction.java @@ -11,7 +11,6 @@ import org.opensearch.action.search.DeletePITRequest; import org.opensearch.action.search.DeletePITResponse; import org.opensearch.client.node.NodeClient; -import org.opensearch.common.Strings; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.RestStatusToXContentListener; @@ -23,6 +22,9 @@ import static java.util.Collections.unmodifiableList; import static org.opensearch.rest.RestRequest.Method.DELETE; +/** + * Rest action for deleting PIT contexts + */ public class RestDeletePITAction extends BaseRestHandler { @Override @@ -32,24 +34,26 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - String pitIds = request.param("pit_id"); + String allPitIdsQualifier = "_all"; DeletePITRequest deletePITRequest = new DeletePITRequest(); - deletePITRequest.setPitIds(asList(Strings.splitStringByCommaToArray(pitIds))); - request.withContentOrSourceParamParserOrNull((xContentParser -> { - if (xContentParser != null) { - // NOTE: if rest request with xcontent body has request parameters, values parsed from request body have the precedence - try { - deletePITRequest.fromXContent(xContentParser); - } catch (IOException e) { - throw new IllegalArgumentException("Failed to parse request body", e); + if (request.path().contains(allPitIdsQualifier)) { + deletePITRequest.setPitIds(asList(allPitIdsQualifier)); + } else { + request.withContentOrSourceParamParserOrNull((xContentParser -> { + if (xContentParser != null) { + try { + deletePITRequest.fromXContent(xContentParser); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to parse request body", e); + } } - } - })); + })); + } return channel -> client.deletePit(deletePITRequest, new RestStatusToXContentListener(channel)); } @Override public List routes() { - return unmodifiableList(asList(new Route(DELETE, "/_search/_point_in_time"), new Route(DELETE, "/_search/_point_in_time/{id}"))); + return unmodifiableList(asList(new Route(DELETE, "/_search/_point_in_time"), new Route(DELETE, "/_search/_point_in_time/_all"))); } } diff --git a/server/src/test/java/org/opensearch/action/search/TransportDeletePITActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportDeletePITActionTests.java index 347c5a11630de..abe2f55917969 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportDeletePITActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportDeletePITActionTests.java @@ -51,6 +51,9 @@ import static org.mockito.Mockito.when; import static org.opensearch.action.support.PlainActionFuture.newFuture; +/** + * Functional tests for transport delete pit action + */ public class TransportDeletePITActionTests extends OpenSearchTestCase { DiscoveryNode node1 = null; diff --git a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java index 9bfb3a76220bc..2836594fa4d37 100644 --- a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java @@ -31,6 +31,9 @@ import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; +/** + * Multi node integration tests for delete PIT use cases + */ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2) public class DeletePitMultiNodeTests extends OpenSearchIntegTestCase { diff --git a/server/src/test/java/org/opensearch/search/pit/RestDeletePitActionTests.java b/server/src/test/java/org/opensearch/search/pit/RestDeletePitActionTests.java index 1798dcdf100df..3c59fe259074a 100644 --- a/server/src/test/java/org/opensearch/search/pit/RestDeletePitActionTests.java +++ b/server/src/test/java/org/opensearch/search/pit/RestDeletePitActionTests.java @@ -27,6 +27,9 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +/** + * Tests to verify the behavior of rest delete pit action for list delete and delete all PIT endpoints + */ public class RestDeletePitActionTests extends OpenSearchTestCase { public void testParseDeletePitRequestWithInvalidJsonThrowsException() throws Exception { RestDeletePITAction action = new RestDeletePITAction(); @@ -38,7 +41,7 @@ public void testParseDeletePitRequestWithInvalidJsonThrowsException() throws Exc assertThat(e.getMessage(), equalTo("Failed to parse request body")); } - public void testBodyParamsOverrideQueryStringParams() throws Exception { + public void testDeletePitWithBody() throws Exception { SetOnce pitCalled = new SetOnce<>(); try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName()) { @Override @@ -49,9 +52,29 @@ public void deletePit(DeletePITRequest request, ActionListener pitCalled = new SetOnce<>(); + try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName()) { + @Override + public void deletePit(DeletePITRequest request, ActionListener listener) { + pitCalled.set(true); + assertThat(request.getPitIds(), hasSize(1)); + assertThat(request.getPitIds().get(0), equalTo("_all")); + } + }) { + RestDeletePITAction action = new RestDeletePITAction(); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("/_all").build(); FakeRestChannel channel = new FakeRestChannel(request, false, 0); action.handleRequest(request, channel, nodeClient); @@ -59,7 +82,32 @@ public void deletePit(DeletePITRequest request, ActionListener pitCalled = new SetOnce<>(); + try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName()) { + @Override + public void deletePit(DeletePITRequest request, ActionListener listener) { + pitCalled.set(true); + assertThat(request.getPitIds(), hasSize(1)); + assertThat(request.getPitIds().get(0), equalTo("_all")); + } + }) { + RestDeletePITAction action = new RestDeletePITAction(); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent( + new BytesArray("{\"pit_id\": [\"BODY\"]}"), + XContentType.JSON + ).withPath("/_all").build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 0); + + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> action.handleRequest(request, channel, nodeClient) + ); + assertTrue(ex.getMessage().contains("request [GET /_all] does not support having a body")); + } + } + + public void testDeletePitQueryStringParamsShouldThrowException() { SetOnce pitCalled = new SetOnce<>(); try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName()) { @Override @@ -75,9 +123,11 @@ public void deletePit(DeletePITRequest request, ActionListener action.handleRequest(request, channel, nodeClient) + ); + assertTrue(ex.getMessage().contains("unrecognized param")); } } } From da3d6eebc96b2c2dfd0d82fa0116f0691bbe0866 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Thu, 5 May 2022 13:24:22 +0530 Subject: [PATCH 2/4] List all PITs API Signed-off-by: Bharathwaj G --- .../org/opensearch/action/ActionModule.java | 7 ++ .../action/search/GetAllPITNodeRequest.java | 39 +++++++++ .../action/search/GetAllPITNodeResponse.java | 67 ++++++++++++++ .../action/search/GetAllPITNodesRequest.java | 36 ++++++++ .../action/search/GetAllPITNodesResponse.java | 78 +++++++++++++++++ .../action/search/GetAllPITsAction.java | 23 +++++ .../org/opensearch/action/search/PitInfo.java | 58 +++++++++++++ .../search/TransportGetAllPITsAction.java | 87 +++++++++++++++++++ .../action/search/RestGetAllPITsAction.java | 81 +++++++++++++++++ .../org/opensearch/search/SearchService.java | 17 ++++ .../opensearch/search/PitMultiNodeTests.java | 40 +++++++++ 11 files changed, 533 insertions(+) create mode 100644 server/src/main/java/org/opensearch/action/search/GetAllPITNodeRequest.java create mode 100644 server/src/main/java/org/opensearch/action/search/GetAllPITNodeResponse.java create mode 100644 server/src/main/java/org/opensearch/action/search/GetAllPITNodesRequest.java create mode 100644 server/src/main/java/org/opensearch/action/search/GetAllPITNodesResponse.java create mode 100644 server/src/main/java/org/opensearch/action/search/GetAllPITsAction.java create mode 100644 server/src/main/java/org/opensearch/action/search/PitInfo.java create mode 100644 server/src/main/java/org/opensearch/action/search/TransportGetAllPITsAction.java create mode 100644 server/src/main/java/org/opensearch/rest/action/search/RestGetAllPITsAction.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 350d91a560182..ef2b2789a0c3e 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -238,12 +238,14 @@ import org.opensearch.action.search.ClearScrollAction; import org.opensearch.action.search.CreatePITAction; import org.opensearch.action.search.DeletePITAction; +import org.opensearch.action.search.GetAllPITsAction; import org.opensearch.action.search.MultiSearchAction; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchScrollAction; import org.opensearch.action.search.TransportClearScrollAction; import org.opensearch.action.search.TransportCreatePITAction; import org.opensearch.action.search.TransportDeletePITAction; +import org.opensearch.action.search.TransportGetAllPITsAction; import org.opensearch.action.search.TransportMultiSearchAction; import org.opensearch.action.search.TransportSearchAction; import org.opensearch.action.search.TransportSearchScrollAction; @@ -407,6 +409,7 @@ import org.opensearch.rest.action.search.RestCreatePITAction; import org.opensearch.rest.action.search.RestDeletePITAction; import org.opensearch.rest.action.search.RestExplainAction; +import org.opensearch.rest.action.search.RestGetAllPITsAction; import org.opensearch.rest.action.search.RestMultiSearchAction; import org.opensearch.rest.action.search.RestSearchAction; import org.opensearch.rest.action.search.RestSearchScrollAction; @@ -666,8 +669,11 @@ public void reg actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class); actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class); actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class); + + // Point in time actions actions.register(CreatePITAction.INSTANCE, TransportCreatePITAction.class); actions.register(DeletePITAction.INSTANCE, TransportDeletePITAction.class); + actions.register(GetAllPITsAction.INSTANCE, TransportGetAllPITsAction.class); return unmodifiableMap(actions.getRegistry()); } @@ -844,6 +850,7 @@ public void initRestHandlers(Supplier nodesInCluster) { // Point in time API registerHandler.accept(new RestCreatePITAction()); registerHandler.accept(new RestDeletePITAction()); + registerHandler.accept(new RestGetAllPITsAction()); for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers( settings, diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITNodeRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPITNodeRequest.java new file mode 100644 index 0000000000000..eb6b9bf797180 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPITNodeRequest.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.support.nodes.BaseNodeRequest; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Request to get all active PITs in a node + */ +public class GetAllPITNodeRequest extends BaseNodeRequest { + GetAllPITNodesRequest request; + + @Inject + public GetAllPITNodeRequest(GetAllPITNodesRequest request) { + this.request = request; + } + + public GetAllPITNodeRequest(StreamInput in) throws IOException { + super(in); + request = new GetAllPITNodesRequest(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + request.writeTo(out); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITNodeResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPITNodeResponse.java new file mode 100644 index 0000000000000..82599c78e14c8 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPITNodeResponse.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Response which holds information about all PIT contexts in a node + */ +public class GetAllPITNodeResponse extends BaseNodeResponse implements ToXContentFragment { + private List pitsInfo; + + @Inject + public GetAllPITNodeResponse(StreamInput in, List pitsInfo) throws IOException { + super(in); + this.pitsInfo = pitsInfo; + } + + public GetAllPITNodeResponse(DiscoveryNode node, List pitsInfo) { + super(node); + this.pitsInfo = pitsInfo; + } + + public GetAllPITNodeResponse(StreamInput in) throws IOException { + super(in); + this.pitsInfo = Collections.unmodifiableList(in.readList(PitInfo::new)); + } + + public List getPitsInfo() { + return pitsInfo; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(pitsInfo); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray("pitsInfo"); + for (PitInfo pit : pitsInfo) { + pit.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITNodesRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPITNodesRequest.java new file mode 100644 index 0000000000000..873dcc32b6557 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPITNodesRequest.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Request to get all active PIT IDs in set of nodes + */ +public class GetAllPITNodesRequest extends BaseNodesRequest { + @Inject + public GetAllPITNodesRequest(DiscoveryNode... concreteNodes) { + super(concreteNodes); + } + + public GetAllPITNodesRequest(StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITNodesResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPITNodesResponse.java new file mode 100644 index 0000000000000..9fabf9d2b6fdf --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPITNodesResponse.java @@ -0,0 +1,78 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Response structure to hold all active PIT contexts information from all nodes + */ +public class GetAllPITNodesResponse extends BaseNodesResponse implements ToXContentObject { + + List pitsInfo = new ArrayList<>(); + + @Inject + public GetAllPITNodesResponse(StreamInput in) throws IOException { + super(in); + } + + public GetAllPITNodesResponse( + ClusterName clusterName, + List getAllPITNodeResponses, + List failures + ) { + super(clusterName, getAllPITNodeResponses, failures); + Set uniquePitIds = new HashSet<>(); + pitsInfo.addAll( + getAllPITNodeResponses.stream() + .flatMap(p -> p.getPitsInfo().stream().filter(t -> uniquePitIds.add(t.getPitId()))) + .collect(Collectors.toList()) + ); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray("pitsInfo"); + for (PitInfo pit : pitsInfo) { + pit.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public List readNodesFrom(StreamInput in) throws IOException { + return in.readList(GetAllPITNodeResponse::new); + } + + @Override + public void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + + public List getPITIDs() { + return new ArrayList<>(pitsInfo); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITsAction.java b/server/src/main/java/org/opensearch/action/search/GetAllPITsAction.java new file mode 100644 index 0000000000000..75f0db71af445 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPITsAction.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionType; + +/** + * Action type for listing all PIT reader contexts + */ +public class GetAllPITsAction extends ActionType { + public static final GetAllPITsAction INSTANCE = new GetAllPITsAction(); + public static final String NAME = "indices:data/readall/pit"; + + private GetAllPITsAction() { + super(NAME, GetAllPITNodesResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/PitInfo.java b/server/src/main/java/org/opensearch/action/search/PitInfo.java new file mode 100644 index 0000000000000..45c2cec44233f --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/PitInfo.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * This holds information about pit reader context such as pit id and creation time + */ +public class PitInfo implements ToXContentFragment, Writeable { + private final String pitId; + private final long creationTime; + + public PitInfo(String pitId, long creationTime) { + this.pitId = pitId; + this.creationTime = creationTime; + } + + public PitInfo(StreamInput in) throws IOException { + this.pitId = in.readString(); + this.creationTime = in.readLong(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("pitId", pitId); + builder.field("creationTime", creationTime); + builder.endObject(); + return builder; + } + + public String getPitId() { + return pitId; + } + + public long getCreationTime() { + return creationTime; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(pitId); + out.writeLong(creationTime); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/TransportGetAllPITsAction.java b/server/src/main/java/org/opensearch/action/search/TransportGetAllPITsAction.java new file mode 100644 index 0000000000000..a011ae6fe63fb --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/TransportGetAllPITsAction.java @@ -0,0 +1,87 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.search.SearchService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +/** + * Transport action to get all PIT contexts + */ +public class TransportGetAllPITsAction extends TransportNodesAction< + GetAllPITNodesRequest, + GetAllPITNodesResponse, + GetAllPITNodeRequest, + GetAllPITNodeResponse> { + + private final SearchService searchService; + + @Inject + public TransportGetAllPITsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + SearchService searchService + ) { + super( + GetAllPITsAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + GetAllPITNodesRequest::new, + GetAllPITNodeRequest::new, + ThreadPool.Names.SAME, + GetAllPITNodeResponse.class + ); + this.searchService = searchService; + } + + @Override + protected GetAllPITNodesResponse newResponse( + GetAllPITNodesRequest request, + List getAllPITNodeResponses, + List failures + ) { + return new GetAllPITNodesResponse(clusterService.getClusterName(), getAllPITNodeResponses, failures); + } + + @Override + protected GetAllPITNodeRequest newNodeRequest(GetAllPITNodesRequest request) { + return new GetAllPITNodeRequest(request); + } + + @Override + protected GetAllPITNodeResponse newNodeResponse(StreamInput in) throws IOException { + return new GetAllPITNodeResponse(in); + } + + /** + * This node specific operation retrieves all node specific information + */ + @Override + protected GetAllPITNodeResponse nodeOperation(GetAllPITNodeRequest request) { + GetAllPITNodeResponse nodeResponse = new GetAllPITNodeResponse( + transportService.getLocalNode(), + searchService.getAllPITReaderContexts() + ); + return nodeResponse; + } +} diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPITsAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPITsAction.java new file mode 100644 index 0000000000000..a55a659ded71e --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPITsAction.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.search; + +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.search.GetAllPITNodesRequest; +import org.opensearch.action.search.GetAllPITNodesResponse; +import org.opensearch.action.search.GetAllPITsAction; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.RestStatus; +import org.opensearch.rest.action.RestActionListener; +import org.opensearch.rest.action.RestResponseListener; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Rest action for retrieving all active PIT IDs across all nodes + */ +public class RestGetAllPITsAction extends BaseRestHandler { + @Override + public String getName() { + return "get_all_pit_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout())); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener(channel) { + @Override + public void processResponse(final ClusterStateResponse clusterStateResponse) throws IOException { + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + GetAllPITNodesRequest getAllPITNodesRequest = new GetAllPITNodesRequest(disNodesArr); + client.execute(GetAllPITsAction.INSTANCE, getAllPITNodesRequest, new RestResponseListener(channel) { + @Override + public RestResponse buildResponse(final GetAllPITNodesResponse getAllPITNodesResponse) throws Exception { + try (XContentBuilder builder = channel.newBuilder()) { + builder.startObject(); + builder.field("pitIds", getAllPITNodesResponse.getPITIDs()); + builder.endObject(); + return new BytesRestResponse(RestStatus.OK, builder); + } + } + }); + } + }); + } + + @Override + public List routes() { + return unmodifiableList(Collections.singletonList(new Route(GET, "/_search/point_in_time/all"))); + } +} diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index c196f68e28baa..eaa741c5bb54e 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -42,6 +42,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; import org.opensearch.action.OriginalIndices; +import org.opensearch.action.search.PitInfo; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchShardTask; import org.opensearch.action.search.SearchType; @@ -139,6 +140,7 @@ import org.opensearch.transport.TransportRequest; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -1438,6 +1440,21 @@ public PitReaderContext getPitReaderContext(ShardSearchContextId id) { return null; } + /** + * This method returns all active PIT reader contexts + */ + public List getAllPITReaderContexts() { + final List pitContextsInfo = new ArrayList<>(); + for (ReaderContext ctx : activeReaders.values()) { + if (ctx instanceof PitReaderContext) { + final PitReaderContext context = (PitReaderContext) ctx; + PitInfo pitInfo = new PitInfo(context.getPitId(), context.getCreationTime()); + pitContextsInfo.add(pitInfo); + } + } + return pitContextsInfo; + } + class Reaper implements Runnable { @Override public void run() { diff --git a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java index 3efdcadb8e09f..fe5f8343d60e2 100644 --- a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java @@ -8,20 +8,31 @@ package org.opensearch.search; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.search.CreatePITAction; import org.opensearch.action.search.CreatePITRequest; import org.opensearch.action.search.CreatePITResponse; +import org.opensearch.action.search.GetAllPITNodesRequest; +import org.opensearch.action.search.GetAllPITNodesResponse; +import org.opensearch.action.search.GetAllPITsAction; import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; @@ -55,6 +66,35 @@ public void testPit() throws Exception { assertEquals(2, searchResponse.getTotalShards()); } + public void testGetAllPits() throws Exception { + client().admin().indices().prepareCreate("index1").get(); + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index", "index1" }); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + CreatePITResponse pitResponse = execute.get(); + CreatePITResponse pitResponse1 = client().execute(CreatePITAction.INSTANCE, request).get(); + CreatePITResponse pitResponse2 = client().execute(CreatePITAction.INSTANCE, request).get(); + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + ClusterStateResponse clusterStateResponse = client().admin().cluster().state(clusterStateRequest).get(); + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + GetAllPITNodesRequest getAllPITNodesRequest = new GetAllPITNodesRequest(disNodesArr); + ActionFuture execute1 = client().execute(GetAllPITsAction.INSTANCE, getAllPITNodesRequest); + GetAllPITNodesResponse getPitResponse = execute1.get(); + assertEquals(3, getPitResponse.getPITIDs().size()); + List resultPitIds = getPitResponse.getPITIDs().stream().map(p -> p.getPitId()).collect(Collectors.toList()); + Assert.assertTrue(resultPitIds.contains(pitResponse.getId())); + Assert.assertTrue(resultPitIds.contains(pitResponse1.getId())); + Assert.assertTrue(resultPitIds.contains(pitResponse2.getId())); + } + public void testCreatePitWhileNodeDropWithAllowPartialCreationFalse() throws Exception { CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), false); request.setIndices(new String[] { "index" }); From faebba7a71463ecbb765398806e5face0497d256 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Wed, 11 May 2022 12:10:59 +0530 Subject: [PATCH 3/4] Making naming uniform and adding tests Signed-off-by: Bharathwaj G --- .../org/opensearch/action/ActionModule.java | 10 ++-- ...Request.java => GetAllPitNodeRequest.java} | 10 ++-- ...sponse.java => GetAllPitNodeResponse.java} | 8 +-- ...equest.java => GetAllPitNodesRequest.java} | 6 +- ...ponse.java => GetAllPitNodesResponse.java} | 18 +++--- ...lPITsAction.java => GetAllPitsAction.java} | 8 +-- ...on.java => TransportGetAllPitsAction.java} | 40 ++++++------- ...sAction.java => RestGetAllPitsAction.java} | 14 ++--- .../search/TransportDeletePitActionTests.java | 4 +- .../search/DeletePitMultiNodeTests.java | 26 ++++---- .../opensearch/search/PitMultiNodeTests.java | 60 +++++++++++++++---- .../opensearch/search/PitSingleNodeTests.java | 4 ++ 12 files changed, 124 insertions(+), 84 deletions(-) rename server/src/main/java/org/opensearch/action/search/{GetAllPITNodeRequest.java => GetAllPitNodeRequest.java} (73%) rename server/src/main/java/org/opensearch/action/search/{GetAllPITNodeResponse.java => GetAllPitNodeResponse.java} (87%) rename server/src/main/java/org/opensearch/action/search/{GetAllPITNodesRequest.java => GetAllPitNodesRequest.java} (78%) rename server/src/main/java/org/opensearch/action/search/{GetAllPITNodesResponse.java => GetAllPitNodesResponse.java} (77%) rename server/src/main/java/org/opensearch/action/search/{GetAllPITsAction.java => GetAllPitsAction.java} (65%) rename server/src/main/java/org/opensearch/action/search/{TransportGetAllPITsAction.java => TransportGetAllPitsAction.java} (63%) rename server/src/main/java/org/opensearch/rest/action/search/{RestGetAllPITsAction.java => RestGetAllPitsAction.java} (85%) diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 90cbf6e960e60..57623a9b4faff 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -238,14 +238,14 @@ import org.opensearch.action.search.ClearScrollAction; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.DeletePitAction; -import org.opensearch.action.search.GetAllPITsAction; +import org.opensearch.action.search.GetAllPitsAction; import org.opensearch.action.search.MultiSearchAction; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchScrollAction; import org.opensearch.action.search.TransportClearScrollAction; import org.opensearch.action.search.TransportCreatePitAction; import org.opensearch.action.search.TransportDeletePitAction; -import org.opensearch.action.search.TransportGetAllPITsAction; +import org.opensearch.action.search.TransportGetAllPitsAction; import org.opensearch.action.search.TransportMultiSearchAction; import org.opensearch.action.search.TransportSearchAction; import org.opensearch.action.search.TransportSearchScrollAction; @@ -409,7 +409,7 @@ import org.opensearch.rest.action.search.RestCreatePitAction; import org.opensearch.rest.action.search.RestDeletePitAction; import org.opensearch.rest.action.search.RestExplainAction; -import org.opensearch.rest.action.search.RestGetAllPITsAction; +import org.opensearch.rest.action.search.RestGetAllPitsAction; import org.opensearch.rest.action.search.RestMultiSearchAction; import org.opensearch.rest.action.search.RestSearchAction; import org.opensearch.rest.action.search.RestSearchScrollAction; @@ -673,7 +673,7 @@ public void reg // Point in time actions actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class); actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class); - actions.register(GetAllPITsAction.INSTANCE, TransportGetAllPITsAction.class); + actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class); return unmodifiableMap(actions.getRegistry()); } @@ -850,7 +850,7 @@ public void initRestHandlers(Supplier nodesInCluster) { // Point in time API registerHandler.accept(new RestCreatePitAction()); registerHandler.accept(new RestDeletePitAction()); - registerHandler.accept(new RestGetAllPITsAction()); + registerHandler.accept(new RestGetAllPitsAction()); for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers( settings, diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITNodeRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeRequest.java similarity index 73% rename from server/src/main/java/org/opensearch/action/search/GetAllPITNodeRequest.java rename to server/src/main/java/org/opensearch/action/search/GetAllPitNodeRequest.java index eb6b9bf797180..940380d68ba11 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPITNodeRequest.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeRequest.java @@ -18,17 +18,17 @@ /** * Request to get all active PITs in a node */ -public class GetAllPITNodeRequest extends BaseNodeRequest { - GetAllPITNodesRequest request; +public class GetAllPitNodeRequest extends BaseNodeRequest { + GetAllPitNodesRequest request; @Inject - public GetAllPITNodeRequest(GetAllPITNodesRequest request) { + public GetAllPitNodeRequest(GetAllPitNodesRequest request) { this.request = request; } - public GetAllPITNodeRequest(StreamInput in) throws IOException { + public GetAllPitNodeRequest(StreamInput in) throws IOException { super(in); - request = new GetAllPITNodesRequest(in); + request = new GetAllPitNodesRequest(in); } @Override diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITNodeResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeResponse.java similarity index 87% rename from server/src/main/java/org/opensearch/action/search/GetAllPITNodeResponse.java rename to server/src/main/java/org/opensearch/action/search/GetAllPitNodeResponse.java index 82599c78e14c8..e1716e3135713 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPITNodeResponse.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeResponse.java @@ -23,21 +23,21 @@ /** * Response which holds information about all PIT contexts in a node */ -public class GetAllPITNodeResponse extends BaseNodeResponse implements ToXContentFragment { +public class GetAllPitNodeResponse extends BaseNodeResponse implements ToXContentFragment { private List pitsInfo; @Inject - public GetAllPITNodeResponse(StreamInput in, List pitsInfo) throws IOException { + public GetAllPitNodeResponse(StreamInput in, List pitsInfo) throws IOException { super(in); this.pitsInfo = pitsInfo; } - public GetAllPITNodeResponse(DiscoveryNode node, List pitsInfo) { + public GetAllPitNodeResponse(DiscoveryNode node, List pitsInfo) { super(node); this.pitsInfo = pitsInfo; } - public GetAllPITNodeResponse(StreamInput in) throws IOException { + public GetAllPitNodeResponse(StreamInput in) throws IOException { super(in); this.pitsInfo = Collections.unmodifiableList(in.readList(PitInfo::new)); } diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITNodesRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java similarity index 78% rename from server/src/main/java/org/opensearch/action/search/GetAllPITNodesRequest.java rename to server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java index 873dcc32b6557..0073d4308c174 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPITNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java @@ -19,13 +19,13 @@ /** * Request to get all active PIT IDs in set of nodes */ -public class GetAllPITNodesRequest extends BaseNodesRequest { +public class GetAllPitNodesRequest extends BaseNodesRequest { @Inject - public GetAllPITNodesRequest(DiscoveryNode... concreteNodes) { + public GetAllPitNodesRequest(DiscoveryNode... concreteNodes) { super(concreteNodes); } - public GetAllPITNodesRequest(StreamInput in) throws IOException { + public GetAllPitNodesRequest(StreamInput in) throws IOException { super(in); } diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITNodesResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java similarity index 77% rename from server/src/main/java/org/opensearch/action/search/GetAllPITNodesResponse.java rename to server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java index 9fabf9d2b6fdf..d732a3eac4b95 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPITNodesResponse.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java @@ -27,24 +27,24 @@ /** * Response structure to hold all active PIT contexts information from all nodes */ -public class GetAllPITNodesResponse extends BaseNodesResponse implements ToXContentObject { +public class GetAllPitNodesResponse extends BaseNodesResponse implements ToXContentObject { List pitsInfo = new ArrayList<>(); @Inject - public GetAllPITNodesResponse(StreamInput in) throws IOException { + public GetAllPitNodesResponse(StreamInput in) throws IOException { super(in); } - public GetAllPITNodesResponse( + public GetAllPitNodesResponse( ClusterName clusterName, - List getAllPITNodeResponses, + List getAllPitNodeRespons, List failures ) { - super(clusterName, getAllPITNodeResponses, failures); + super(clusterName, getAllPitNodeRespons, failures); Set uniquePitIds = new HashSet<>(); pitsInfo.addAll( - getAllPITNodeResponses.stream() + getAllPitNodeRespons.stream() .flatMap(p -> p.getPitsInfo().stream().filter(t -> uniquePitIds.add(t.getPitId()))) .collect(Collectors.toList()) ); @@ -63,12 +63,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } @Override - public List readNodesFrom(StreamInput in) throws IOException { - return in.readList(GetAllPITNodeResponse::new); + public List readNodesFrom(StreamInput in) throws IOException { + return in.readList(GetAllPitNodeResponse::new); } @Override - public void writeNodesTo(StreamOutput out, List nodes) throws IOException { + public void writeNodesTo(StreamOutput out, List nodes) throws IOException { out.writeList(nodes); } diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITsAction.java b/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java similarity index 65% rename from server/src/main/java/org/opensearch/action/search/GetAllPITsAction.java rename to server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java index 75f0db71af445..4589172ef492d 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPITsAction.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java @@ -13,11 +13,11 @@ /** * Action type for listing all PIT reader contexts */ -public class GetAllPITsAction extends ActionType { - public static final GetAllPITsAction INSTANCE = new GetAllPITsAction(); +public class GetAllPitsAction extends ActionType { + public static final GetAllPitsAction INSTANCE = new GetAllPitsAction(); public static final String NAME = "indices:data/readall/pit"; - private GetAllPITsAction() { - super(NAME, GetAllPITNodesResponse::new); + private GetAllPitsAction() { + super(NAME, GetAllPitNodesResponse::new); } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportGetAllPITsAction.java b/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java similarity index 63% rename from server/src/main/java/org/opensearch/action/search/TransportGetAllPITsAction.java rename to server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java index a011ae6fe63fb..7ac156fcbce08 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportGetAllPITsAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java @@ -24,16 +24,16 @@ /** * Transport action to get all PIT contexts */ -public class TransportGetAllPITsAction extends TransportNodesAction< - GetAllPITNodesRequest, - GetAllPITNodesResponse, - GetAllPITNodeRequest, - GetAllPITNodeResponse> { +public class TransportGetAllPitsAction extends TransportNodesAction< + GetAllPitNodesRequest, + GetAllPitNodesResponse, + GetAllPitNodeRequest, + GetAllPitNodeResponse> { private final SearchService searchService; @Inject - public TransportGetAllPITsAction( + public TransportGetAllPitsAction( ThreadPool threadPool, ClusterService clusterService, TransportService transportService, @@ -41,44 +41,44 @@ public TransportGetAllPITsAction( SearchService searchService ) { super( - GetAllPITsAction.NAME, + GetAllPitsAction.NAME, threadPool, clusterService, transportService, actionFilters, - GetAllPITNodesRequest::new, - GetAllPITNodeRequest::new, + GetAllPitNodesRequest::new, + GetAllPitNodeRequest::new, ThreadPool.Names.SAME, - GetAllPITNodeResponse.class + GetAllPitNodeResponse.class ); this.searchService = searchService; } @Override - protected GetAllPITNodesResponse newResponse( - GetAllPITNodesRequest request, - List getAllPITNodeResponses, + protected GetAllPitNodesResponse newResponse( + GetAllPitNodesRequest request, + List getAllPitNodeRespons, List failures ) { - return new GetAllPITNodesResponse(clusterService.getClusterName(), getAllPITNodeResponses, failures); + return new GetAllPitNodesResponse(clusterService.getClusterName(), getAllPitNodeRespons, failures); } @Override - protected GetAllPITNodeRequest newNodeRequest(GetAllPITNodesRequest request) { - return new GetAllPITNodeRequest(request); + protected GetAllPitNodeRequest newNodeRequest(GetAllPitNodesRequest request) { + return new GetAllPitNodeRequest(request); } @Override - protected GetAllPITNodeResponse newNodeResponse(StreamInput in) throws IOException { - return new GetAllPITNodeResponse(in); + protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOException { + return new GetAllPitNodeResponse(in); } /** * This node specific operation retrieves all node specific information */ @Override - protected GetAllPITNodeResponse nodeOperation(GetAllPITNodeRequest request) { - GetAllPITNodeResponse nodeResponse = new GetAllPITNodeResponse( + protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) { + GetAllPitNodeResponse nodeResponse = new GetAllPitNodeResponse( transportService.getLocalNode(), searchService.getAllPITReaderContexts() ); diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPITsAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java similarity index 85% rename from server/src/main/java/org/opensearch/rest/action/search/RestGetAllPITsAction.java rename to server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java index a55a659ded71e..82a96dbeaa03a 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPITsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java @@ -11,9 +11,9 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; -import org.opensearch.action.search.GetAllPITNodesRequest; -import org.opensearch.action.search.GetAllPITNodesResponse; -import org.opensearch.action.search.GetAllPITsAction; +import org.opensearch.action.search.GetAllPitNodesRequest; +import org.opensearch.action.search.GetAllPitNodesResponse; +import org.opensearch.action.search.GetAllPitsAction; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.xcontent.XContentBuilder; @@ -36,7 +36,7 @@ /** * Rest action for retrieving all active PIT IDs across all nodes */ -public class RestGetAllPITsAction extends BaseRestHandler { +public class RestGetAllPitsAction extends BaseRestHandler { @Override public String getName() { return "get_all_pit_action"; @@ -58,10 +58,10 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) thr } DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; nodes.toArray(disNodesArr); - GetAllPITNodesRequest getAllPITNodesRequest = new GetAllPITNodesRequest(disNodesArr); - client.execute(GetAllPITsAction.INSTANCE, getAllPITNodesRequest, new RestResponseListener(channel) { + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(disNodesArr); + client.execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest, new RestResponseListener(channel) { @Override - public RestResponse buildResponse(final GetAllPITNodesResponse getAllPITNodesResponse) throws Exception { + public RestResponse buildResponse(final GetAllPitNodesResponse getAllPITNodesResponse) throws Exception { try (XContentBuilder builder = channel.newBuilder()) { builder.startObject(); builder.field("pitIds", getAllPITNodesResponse.getPITIDs()); diff --git a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java index 32c1094a9cce5..767a86a9e2827 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java @@ -102,9 +102,7 @@ public void setupData() { clusterServiceMock = mock(ClusterService.class); ClusterState state = mock(ClusterState.class); - final Settings keepAliveSettings = Settings.builder() - .put(CreatePitController.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.getKey(), 30000) - .build(); + final Settings keepAliveSettings = Settings.builder().put(CreatePitController.PIT_CREATE_PHASE_KEEP_ALIVE.getKey(), 30000).build(); when(clusterServiceMock.getSettings()).thenReturn(keepAliveSettings); when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA); diff --git a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java index b0fc991fe4ecb..f6bd765ba4dc7 100644 --- a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java @@ -11,9 +11,9 @@ import org.junit.After; import org.junit.Before; import org.opensearch.action.ActionFuture; -import org.opensearch.action.search.CreatePITAction; -import org.opensearch.action.search.CreatePITRequest; -import org.opensearch.action.search.CreatePITResponse; +import org.opensearch.action.search.CreatePitAction; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; import org.opensearch.action.search.DeletePitAction; import org.opensearch.action.search.DeletePitRequest; import org.opensearch.action.search.DeletePitResponse; @@ -49,21 +49,21 @@ public void clearIndex() { client().admin().indices().prepareDelete("index").get(); } - private CreatePITResponse createPitOnIndex(String index) throws ExecutionException, InterruptedException { - CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + private CreatePitResponse createPitOnIndex(String index) throws ExecutionException, InterruptedException { + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { index }); - ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); return execute.get(); } public void testDeletePit() throws Exception { - CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); - CreatePITResponse pitResponse = execute.get(); + ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); + CreatePitResponse pitResponse = execute.get(); List pitIds = new ArrayList<>(); pitIds.add(pitResponse.getId()); - execute = client().execute(CreatePITAction.INSTANCE, request); + execute = client().execute(CreatePitAction.INSTANCE, request); pitResponse = execute.get(); pitIds.add(pitResponse.getId()); DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); @@ -98,13 +98,13 @@ public void testDeleteAllPits() throws Exception { } public void testDeletePitWhileNodeDrop() throws Exception { - CreatePITResponse pitResponse = createPitOnIndex("index"); + CreatePitResponse pitResponse = createPitOnIndex("index"); createIndex("index1", Settings.builder().put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build()); client().prepareIndex("index1").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).execute().get(); ensureGreen(); List pitIds = new ArrayList<>(); pitIds.add(pitResponse.getId()); - CreatePITResponse pitResponse1 = createPitOnIndex("index1"); + CreatePitResponse pitResponse1 = createPitOnIndex("index1"); pitIds.add(pitResponse1.getId()); DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { @@ -156,7 +156,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { } public void testDeleteWhileSearch() throws Exception { - CreatePITResponse pitResponse = createPitOnIndex("index"); + CreatePitResponse pitResponse = createPitOnIndex("index"); ensureGreen(); List pitIds = new ArrayList<>(); pitIds.add(pitResponse.getId()); diff --git a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java index 7b4a9712d836a..022e75560e0ad 100644 --- a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java @@ -18,9 +18,9 @@ import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; -import org.opensearch.action.search.GetAllPITNodesRequest; -import org.opensearch.action.search.GetAllPITNodesResponse; -import org.opensearch.action.search.GetAllPITsAction; +import org.opensearch.action.search.GetAllPitNodesRequest; +import org.opensearch.action.search.GetAllPitNodesResponse; +import org.opensearch.action.search.GetAllPitsAction; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; @@ -71,12 +71,12 @@ public void testPit() throws Exception { public void testGetAllPits() throws Exception { client().admin().indices().prepareCreate("index1").get(); - CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index", "index1" }); - ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); - CreatePITResponse pitResponse = execute.get(); - CreatePITResponse pitResponse1 = client().execute(CreatePITAction.INSTANCE, request).get(); - CreatePITResponse pitResponse2 = client().execute(CreatePITAction.INSTANCE, request).get(); + ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); + CreatePitResponse pitResponse = execute.get(); + CreatePitResponse pitResponse1 = client().execute(CreatePitAction.INSTANCE, request).get(); + CreatePitResponse pitResponse2 = client().execute(CreatePitAction.INSTANCE, request).get(); final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); clusterStateRequest.local(false); clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); @@ -88,14 +88,52 @@ public void testGetAllPits() throws Exception { } DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; nodes.toArray(disNodesArr); - GetAllPITNodesRequest getAllPITNodesRequest = new GetAllPITNodesRequest(disNodesArr); - ActionFuture execute1 = client().execute(GetAllPITsAction.INSTANCE, getAllPITNodesRequest); - GetAllPITNodesResponse getPitResponse = execute1.get(); + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(disNodesArr); + ActionFuture execute1 = client().execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest); + GetAllPitNodesResponse getPitResponse = execute1.get(); assertEquals(3, getPitResponse.getPITIDs().size()); List resultPitIds = getPitResponse.getPITIDs().stream().map(p -> p.getPitId()).collect(Collectors.toList()); + // asserting that we get all unique PIT IDs Assert.assertTrue(resultPitIds.contains(pitResponse.getId())); Assert.assertTrue(resultPitIds.contains(pitResponse1.getId())); Assert.assertTrue(resultPitIds.contains(pitResponse2.getId())); + client().admin().indices().prepareDelete("index1").get(); + } + + public void testGetAllPitsDuringNodeDrop() throws Exception { + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); + CreatePitResponse pitResponse = execute.get(); + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(getDiscoveryNodes()); + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + ActionFuture execute1 = client().execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest); + GetAllPitNodesResponse getPitResponse = execute1.get(); + // we still get a pit id from the data node which is up + assertEquals(1, getPitResponse.getPITIDs().size()); + // failure for node drop + assertEquals(1, getPitResponse.failures().size()); + assertTrue(getPitResponse.failures().get(0).getMessage().contains("Failed node")); + return super.onNodeStopped(nodeName); + } + }); + } + + private DiscoveryNode[] getDiscoveryNodes() throws ExecutionException, InterruptedException { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + ClusterStateResponse clusterStateResponse = client().admin().cluster().state(clusterStateRequest).get(); + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + return disNodesArr; } public void testCreatePitWhileNodeDropWithAllowPartialCreationFalse() throws Exception { diff --git a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java index 53ab3e0de401b..f28f18c9de755 100644 --- a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java @@ -237,6 +237,10 @@ public void testSearchWithFirstPhaseKeepAliveExpiry() throws ExecutionException, service.doClose(); } + private void assertWithGetAllPits(int size) { + + } + public void testSearchWithPitSecondPhaseKeepAliveExpiry() throws ExecutionException, InterruptedException { createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); From a10074a716e0402abc13cfb5db204e1a51ffde00 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Fri, 29 Jul 2022 13:46:57 +0530 Subject: [PATCH 4/4] changes to security Signed-off-by: Bharathwaj G --- .../java/org/opensearch/action/search/CreatePitAction.java | 2 +- .../java/org/opensearch/action/search/DeletePitAction.java | 2 +- .../org/opensearch/action/search/GetAllPitsAction.java | 2 +- .../main/java/org/opensearch/action/search/PitInfo.java | 7 ++++++- .../src/main/java/org/opensearch/search/SearchService.java | 2 +- .../org/opensearch/search/internal/PitReaderContext.java | 3 +++ .../java/org/opensearch/search/internal/ReaderContext.java | 2 +- 7 files changed, 14 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitAction.java b/server/src/main/java/org/opensearch/action/search/CreatePitAction.java index 1af56a044205b..7ffa30a182458 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitAction.java @@ -15,7 +15,7 @@ */ public class CreatePitAction extends ActionType { public static final CreatePitAction INSTANCE = new CreatePitAction(); - public static final String NAME = "indices:data/read/point_in_time"; + public static final String NAME = "indices:data/read/point_in_time/create"; private CreatePitAction() { super(NAME, CreatePitResponse::new); diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitAction.java b/server/src/main/java/org/opensearch/action/search/DeletePitAction.java index 2774fb5ef7648..4bacc8619d914 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePitAction.java @@ -16,7 +16,7 @@ public class DeletePitAction extends ActionType { public static final DeletePitAction INSTANCE = new DeletePitAction(); - public static final String NAME = "indices:admin/read/pit/delete"; + public static final String NAME = "indices:data/read/point_in_time/delete"; private DeletePitAction() { super(NAME, DeletePitResponse::new); diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java index 4589172ef492d..16e65cb785a7d 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java @@ -15,7 +15,7 @@ */ public class GetAllPitsAction extends ActionType { public static final GetAllPitsAction INSTANCE = new GetAllPitsAction(); - public static final String NAME = "indices:data/readall/pit"; + public static final String NAME = "indices:data/read/point_in_time/readall"; private GetAllPitsAction() { super(NAME, GetAllPitNodesResponse::new); diff --git a/server/src/main/java/org/opensearch/action/search/PitInfo.java b/server/src/main/java/org/opensearch/action/search/PitInfo.java index 45c2cec44233f..feb54dea7a9dd 100644 --- a/server/src/main/java/org/opensearch/action/search/PitInfo.java +++ b/server/src/main/java/org/opensearch/action/search/PitInfo.java @@ -22,15 +22,18 @@ public class PitInfo implements ToXContentFragment, Writeable { private final String pitId; private final long creationTime; + private final long keepAlive; - public PitInfo(String pitId, long creationTime) { + public PitInfo(String pitId, long creationTime,long keepAlive) { this.pitId = pitId; this.creationTime = creationTime; + this.keepAlive = keepAlive; } public PitInfo(StreamInput in) throws IOException { this.pitId = in.readString(); this.creationTime = in.readLong(); + this.keepAlive = in.readLong(); } @Override @@ -38,6 +41,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field("pitId", pitId); builder.field("creationTime", creationTime); + builder.field("keepALive", keepAlive); builder.endObject(); return builder; } @@ -54,5 +58,6 @@ public long getCreationTime() { public void writeTo(StreamOutput out) throws IOException { out.writeString(pitId); out.writeLong(creationTime); + out.writeLong(keepAlive); } } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 0552f0110db4b..a5a27ae7aa37a 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -1447,7 +1447,7 @@ public List getAllPITReaderContexts() { for (ReaderContext ctx : activeReaders.values()) { if (ctx instanceof PitReaderContext) { final PitReaderContext context = (PitReaderContext) ctx; - PitInfo pitInfo = new PitInfo(context.getPitId(), context.getCreationTime()); + PitInfo pitInfo = new PitInfo(context.getPitId(), context.getCreationTime(), context.getKeepAlive()); pitContextsInfo.add(pitInfo); } } diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java index 43ca7e0ebd823..782b651510899 100644 --- a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -40,6 +40,9 @@ public String getPitId() { return this.pitId.get(); } + public long getKeepAlive() { return this.keepAlive.get(); } + + public void setPitId(final String pitId) { this.pitId.set(pitId); } diff --git a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java index 04791e05f603c..772a4e8fc04d9 100644 --- a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java @@ -67,7 +67,7 @@ public class ReaderContext implements Releasable { private final AtomicBoolean closed = new AtomicBoolean(false); private final boolean singleSession; - private final AtomicLong keepAlive; + final AtomicLong keepAlive; private final AtomicLong lastAccessTime; // For reference why we use RefCounted here see https://github.com/elastic/elasticsearch/pull/20095. private final AbstractRefCounted refCounted;