Skip to content

Commit

Permalink
Implemented Point In Time support (#217)
Browse files Browse the repository at this point in the history
* Bump org.apache.logging.log4j:log4j-to-slf4j from 2.22.0 to 2.22.1 (#208)

Bumps org.apache.logging.log4j:log4j-to-slf4j from 2.22.0 to 2.22.1.

---
updated-dependencies:
- dependency-name: org.apache.logging.log4j:log4j-to-slf4j
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: gmamakis-thinkanalytics <[email protected]>

* Update dependency ch.qos.logback:logback-classic to v1.4.14 (#212)

Co-authored-by: mend-for-github-com[bot] <50673670+mend-for-github-com[bot]@users.noreply.github.com>
Signed-off-by: gmamakis-thinkanalytics <[email protected]>

* Dependency updates: Spring Boot 3.2.1 (#213)

Signed-off-by: Andriy Redko <[email protected]>
Signed-off-by: gmamakis-thinkanalytics <[email protected]>

* Bump io.specto:hoverfly-java-junit5 from 0.15.0 to 0.16.0 (#215)

Bumps [io.specto:hoverfly-java-junit5](https://github.com/SpectoLabs/hoverfly-java) from 0.15.0 to 0.16.0.
- [Commits](SpectoLabs/hoverfly-java@0.15.0...0.16.0)

---
updated-dependencies:
- dependency-name: io.specto:hoverfly-java-junit5
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: Georgios Mamakis <[email protected]>
Signed-off-by: gmamakis-thinkanalytics <[email protected]>

* Implemented point-in-time support

Signed-off-by: Georgios Mamakis <[email protected]>
Signed-off-by: gmamakis-thinkanalytics <[email protected]>

* Implemented point-in-time support

Signed-off-by: Georgios Mamakis <[email protected]>
Signed-off-by: gmamakis-thinkanalytics <[email protected]>

* Resolved review comments

Signed-off-by: gmamakis-thinkanalytics <[email protected]>

* Rearranged imports

Signed-off-by: gmamakis-thinkanalytics <[email protected]>

* Tuned timeout to 70 seconds

Signed-off-by: gmamakis-thinkanalytics <[email protected]>

* Reverted timeout

Signed-off-by: gmamakis-thinkanalytics <[email protected]>

---------

Signed-off-by: dependabot[bot] <[email protected]>
Signed-off-by: gmamakis-thinkanalytics <[email protected]>
Signed-off-by: Andriy Redko <[email protected]>
Signed-off-by: Georgios Mamakis <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: mend-for-github-com[bot] <50673670+mend-for-github-com[bot]@users.noreply.github.com>
Co-authored-by: Andriy Redko <[email protected]>
  • Loading branch information
4 people authored Jan 11, 2024
1 parent 6de54fb commit fc05e07
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.opensearch.data.client.orhlc;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -32,6 +33,8 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -443,6 +446,28 @@ public void searchScrollClear(List<String> scrollIds) {
}
}

@Override
public String openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) {
CreatePitRequest createPitRequest = new CreatePitRequest(TimeValue.timeValueMillis(keepAlive.toMillis()),
true,
index.getIndexName());
return execute(client -> client.createPit(createPitRequest, RequestOptions.DEFAULT)).getId();
}

@Override
public Boolean closePointInTime(String pit) {
try {
DeletePitRequest deletePitRequest = new DeletePitRequest(pit);
return execute(client -> client.deletePit(deletePitRequest, RequestOptions.DEFAULT))
.getDeletePitResults()
.get(0)
.isSuccessful();
} catch (Exception e) {
LOGGER.warn(String.format("Could not clear pit: %s", e.getMessage()));
}
return false;
}

public SearchResponse suggest(SuggestBuilder suggestion, IndexCoordinates index) {
SearchRequest searchRequest = requestFactory.searchRequest(suggestion, index);
return execute(client -> client.search(searchRequest, RequestOptions.DEFAULT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.opensearch.index.reindex.RemoteInfo;
import org.opensearch.index.reindex.UpdateByQueryRequest;
import org.opensearch.script.Script;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder;
Expand Down Expand Up @@ -775,13 +776,21 @@ public SearchRequest searchRequest(

private SearchRequest prepareSearchRequest(
Query query, @Nullable String routing, @Nullable Class<?> clazz, IndexCoordinates indexCoordinates) {

String[] indexNames = indexCoordinates.getIndexNames();
Assert.notNull(indexNames, "No index defined for Query");
Assert.notEmpty(indexNames, "No index defined for Query");

SearchRequest request = new SearchRequest(indexNames);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
SearchRequest request;
// Point in time requests do not allow index definition as PIT id references specific indices
if(query.getPointInTime()== null) {
String[] indexNames = indexCoordinates.getIndexNames();
Assert.notNull(indexNames, "No index defined for Query");
Assert.notEmpty(indexNames, "No index defined for Query");
request = new SearchRequest(indexNames);
} else {
request = new SearchRequest();
PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder(query.getPointInTime().id());
pointInTimeBuilder.setKeepAlive(TimeValue.timeValueMillis(query.getPointInTime().keepAlive().toMillis()));
sourceBuilder.pointInTimeBuilder(pointInTimeBuilder);
}

sourceBuilder.version(true);
sourceBuilder.trackScores(query.getTrackScores());
if (hasSeqNoPrimaryTermProperty(clazz)) {
Expand Down Expand Up @@ -878,6 +887,7 @@ private SearchRequest prepareSearchRequest(
return request;
}


private void prepareNativeSearch(NativeSearchQuery query, SearchSourceBuilder sourceBuilder) {

if (!query.getScriptFields().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.lang.Integer;
import java.lang.Long;
import java.lang.Object;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -44,10 +45,12 @@

import org.assertj.core.api.SoftAssertions;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.opensearch.data.client.EnabledIfOpenSearchVersion;
import org.opensearch.data.client.orhlc.NativeSearchQueryBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
Expand All @@ -59,6 +62,7 @@
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
Expand Down Expand Up @@ -2605,6 +2609,159 @@ public void shouldRespectSourceFilterWithScanAndScrollForGivenSearchQuery() {
.containsOnly((String) null);
}

@Test
@EnabledIfOpenSearchVersion(
onOrAfter = "2.3.0",
reason = "https://github.com/opensearch-project/OpenSearch/issues/1147")
public void testPointInTimeCreateAndDestroy(){
// given
// first document
String documentId = nextIdAsString();
SampleEntity sampleEntity1 = SampleEntity.builder().id(documentId).message("abc").rate(10)
.version(System.currentTimeMillis()).build();

// second document
String documentId2 = nextIdAsString();
SampleEntity sampleEntity2 = SampleEntity.builder().id(documentId2).message("xyz").rate(5)
.version(System.currentTimeMillis()).build();

// third document
String documentId3 = nextIdAsString();
SampleEntity sampleEntity3 = SampleEntity.builder().id(documentId3).message("xyzg").rate(10)
.version(System.currentTimeMillis()).build();

List<IndexQuery> indexQueries = getIndexQueries(Arrays.asList(sampleEntity1, sampleEntity2, sampleEntity3));

operations.bulkIndex(indexQueries, IndexCoordinates.of(indexNameProvider.indexName()));
String pit = operations.openPointInTime(IndexCoordinates.of(indexNameProvider.indexName()),
Duration.ofMinutes(1));
Assertions.assertNotNull(pit);
Query.PointInTime qpit = new Query.PointInTime(pit,Duration.ofMinutes(1));
Query query = getBuilderWithMatchAllQuery() //
.withSort(Sort.by(Sort.Order.desc("message"))) //
.withPageable(Pageable.ofSize(2))
.withPointInTime(qpit).build();
SearchHits<SampleEntity> results = operations.search(query,SampleEntity.class);
assertThat(results.getSearchHits().size()).isEqualTo(2);
query = getBuilderWithMatchAllQuery() //
.withSort(Sort.by(Sort.Order.desc("message"))) //
.withPointInTime(qpit)
.withSearchAfter(List.of(Objects.requireNonNull(results.getSearchHit(1).getContent().getMessage())))
.build();
SearchHits<SampleEntity> searchAfterResults = operations.search(query,SampleEntity.class);
assertThat(searchAfterResults.getSearchHits().size()).isEqualTo(1);
assertThat(searchAfterResults.getSearchHits().contains(results.getSearchHit(0))).isFalse();
assertThat(searchAfterResults.getSearchHits().contains(results.getSearchHit(1))).isFalse();
Boolean pitResult = operations.closePointInTime(pit);
Assertions.assertTrue(pitResult);
}

@Test
@EnabledIfOpenSearchVersion(
onOrAfter = "2.3.0",
reason = "https://github.com/opensearch-project/OpenSearch/issues/1147")
public void testPointInTimeNewDataUnavailable(){
// given
// first document
String documentId = nextIdAsString();
SampleEntity sampleEntity1 = SampleEntity.builder().id(documentId).message("abc").rate(10)
.version(System.currentTimeMillis()).build();

// second document
String documentId2 = nextIdAsString();
SampleEntity sampleEntity2 = SampleEntity.builder().id(documentId2).message("xyz").rate(5)
.version(System.currentTimeMillis()).build();

// third document
String documentId3 = nextIdAsString();
SampleEntity sampleEntity3 = SampleEntity.builder().id(documentId3).message("xyzg").rate(10)
.version(System.currentTimeMillis()).build();

List<IndexQuery> indexQueries = getIndexQueries(Arrays.asList(sampleEntity1, sampleEntity2, sampleEntity3));

operations.bulkIndex(indexQueries, IndexCoordinates.of(indexNameProvider.indexName()));
String pit = operations.openPointInTime(IndexCoordinates.of(indexNameProvider.indexName()),
Duration.ofMinutes(1));
Assertions.assertNotNull(pit);
Query.PointInTime qpit = new Query.PointInTime(pit,Duration.ofMinutes(1));
Query query = getBuilderWithMatchAllQuery() //
.withSort(Sort.by(Sort.Order.desc("message"))) //
.withPageable(Pageable.ofSize(2))
.withPointInTime(qpit).build();
SearchHits<SampleEntity> results = operations.search(query,SampleEntity.class);
assertThat(results.getSearchHits().size()).isEqualTo(2);

// fourth document
String documentId4 = nextIdAsString();
SampleEntity sampleEntity4 = SampleEntity.builder().id(documentId4).message("abcd").rate(10)
.version(System.currentTimeMillis()).build();
indexQueries = getIndexQueries(Arrays.asList(sampleEntity4));

operations.bulkIndex(indexQueries, IndexCoordinates.of(indexNameProvider.indexName()));
query = getBuilderWithMatchAllQuery() //
.withSort(Sort.by(Sort.Order.desc("message"))) //
.withPointInTime(qpit)
.withSearchAfter(List.of(Objects.requireNonNull(results.getSearchHit(1).getContent().getMessage())))
.build();
SearchHits<SampleEntity> searchAfterResults = operations.search(query,SampleEntity.class);
assertThat(searchAfterResults.getSearchHits().size()).isEqualTo(1);
assertThat(searchAfterResults.getSearchHits().contains(results.getSearchHit(0))).isFalse();
assertThat(searchAfterResults.getSearchHits().contains(results.getSearchHit(1))).isFalse();
Boolean pitResult = operations.closePointInTime(pit);
Assertions.assertTrue(pitResult);
}

@Test
@EnabledIfOpenSearchVersion(
onOrAfter = "2.3.0",
reason = "https://github.com/opensearch-project/OpenSearch/issues/1147")
public void testPointInTimeKeepAliveExpired() throws InterruptedException {
// given
// first document
String documentId = nextIdAsString();
SampleEntity sampleEntity1 = SampleEntity.builder().id(documentId).message("abc").rate(10)
.version(System.currentTimeMillis()).build();

// second document
String documentId2 = nextIdAsString();
SampleEntity sampleEntity2 = SampleEntity.builder().id(documentId2).message("xyz").rate(5)
.version(System.currentTimeMillis()).build();

// third document
String documentId3 = nextIdAsString();
SampleEntity sampleEntity3 = SampleEntity.builder().id(documentId3).message("xyzg").rate(10)
.version(System.currentTimeMillis()).build();

List<IndexQuery> indexQueries = getIndexQueries(Arrays.asList(sampleEntity1, sampleEntity2, sampleEntity3));

operations.bulkIndex(indexQueries, IndexCoordinates.of(indexNameProvider.indexName()));
String pit = operations.openPointInTime(IndexCoordinates.of(indexNameProvider.indexName()),
Duration.ofMillis(10));
Assertions.assertNotNull(pit);
Query.PointInTime qpit = new Query.PointInTime(pit,Duration.ofMillis(10));
Query query = getBuilderWithMatchAllQuery() //
.withSort(Sort.by(Sort.Order.desc("message"))) //
.withPageable(Pageable.ofSize(2))
.withPointInTime(qpit).build();
SearchHits<SampleEntity> results = operations.search(query,SampleEntity.class);
assertThat(results.getSearchHits().size()).isEqualTo(2);

// There may be a better way to do it, but Opensearch by default waits for up-to a minute to clear expired pits
Thread.sleep(120000);
final Query searchAfterQuery = getBuilderWithMatchAllQuery() //
.withSort(Sort.by(Sort.Order.desc("message"))) //
.withPointInTime(qpit)
.withSearchAfter(List.of(Objects.requireNonNull(results.getSearchHit(1).getContent().getMessage())))
.build();
assertThatExceptionOfType(UncategorizedElasticsearchException.class).isThrownBy(
()-> operations.search(searchAfterQuery,SampleEntity.class)
);
Boolean pitResult = operations.closePointInTime(pit);
Assertions.assertTrue(pitResult);
}



@Test // DATAES-457
public void shouldSortResultsGivenSortCriteriaWithScanAndScroll() {

Expand Down

0 comments on commit fc05e07

Please sign in to comment.