From eb8ca0dd858eb0c218e51d1832a0beaf96e7a51f Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Fri, 21 Jun 2024 16:39:04 -0400 Subject: [PATCH] Enhance Point In Time support with APIs to list active point-in-time searches (#218) Signed-off-by: Andriy Redko --- .../orhlc/OpenSearchExceptionTranslator.java | 34 +- .../client/orhlc/OpenSearchRestTemplate.java | 10 +- .../data/client/osc/OpenSearchTemplate.java | 12 +- .../data/core/OpenSearchOperations.java | 26 ++ ...enSearchORHLCSpecificIntegrationTests.java | 40 ++ ...OpenSearchOSCSpecificIntegrationTests.java | 50 +++ .../OpenSearchSpecificIntegrationTests.java | 341 ++++++++++++++++++ .../core/ElasticsearchIntegrationTests.java | 54 +-- 8 files changed, 502 insertions(+), 65 deletions(-) create mode 100644 spring-data-opensearch/src/main/java/org/opensearch/data/core/OpenSearchOperations.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/orhlc/OpenSearchORHLCSpecificIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/osc/OpenSearchOSCSpecificIntegrationTests.java create mode 100755 spring-data-opensearch/src/test/java/org/opensearch/data/core/OpenSearchSpecificIntegrationTests.java diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchExceptionTranslator.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchExceptionTranslator.java index c01a002..4945139 100644 --- a/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchExceptionTranslator.java +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchExceptionTranslator.java @@ -23,7 +23,9 @@ import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.dao.support.PersistenceExceptionTranslator; import org.springframework.data.elasticsearch.NoSuchIndexException; +import org.springframework.data.elasticsearch.ResourceNotFoundException; import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; +import org.springframework.data.elasticsearch.VersionConflictException; /** * Simple {@link PersistenceExceptionTranslator} for OpenSearch. Convert the given runtime exception to an @@ -60,22 +62,36 @@ public DataAccessException translateExceptionIfPossible(RuntimeException ex) { if (ex instanceof OpenSearchStatusException) { OpenSearchStatusException statusException = (OpenSearchStatusException) ex; - if (statusException.status() == RestStatus.NOT_FOUND - && statusException.getMessage().contains("index_not_found_exception")) { - - Pattern pattern = Pattern.compile(".*no such index \\[(.*)\\]"); - String index = ""; - Matcher matcher = pattern.matcher(statusException.getMessage()); - if (matcher.matches()) { - index = matcher.group(1); + if (statusException.status() == RestStatus.NOT_FOUND) { + if (statusException.getMessage().contains("index_not_found_exception")) { + Pattern pattern = Pattern.compile(".*no such index \\[(.*)\\]"); + String index = ""; + Matcher matcher = pattern.matcher(statusException.getMessage()); + if (matcher.matches()) { + index = matcher.group(1); + } + + return new NoSuchIndexException(index); + } else { + return new ResourceNotFoundException(statusException.getMessage()); } - return new NoSuchIndexException(index); } if (statusException.getMessage().contains("validation_exception")) { return new DataIntegrityViolationException(statusException.getMessage()); } + if (statusException.status() != null && statusException.getMessage() != null) { + final Integer status = statusException.status().getStatus(); + final String message = statusException.getMessage(); + + if (status == 409 && message.contains("type=version_conflict_engine_exception")) { + if (message.contains("version conflict, current version [")) { + throw new VersionConflictException("Version conflict", statusException); + } + } + } + return new UncategorizedElasticsearchException( ex.getMessage(), statusException.status().getStatus(), null, ex); } diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchRestTemplate.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchRestTemplate.java index 5217e1c..316f0d2 100644 --- a/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchRestTemplate.java +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchRestTemplate.java @@ -45,6 +45,7 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; import org.opensearch.common.unit.TimeValue; +import org.opensearch.data.core.OpenSearchOperations; import org.opensearch.index.query.MoreLikeThisQueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.reindex.BulkByScrollResponse; @@ -80,7 +81,7 @@ * OpenSearchRestTemplate * @since 0.1 */ -public class OpenSearchRestTemplate extends AbstractElasticsearchTemplate { +public class OpenSearchRestTemplate extends AbstractElasticsearchTemplate implements OpenSearchOperations { private static final Log LOGGER = LogFactory.getLog(OpenSearchRestTemplate.class); @@ -468,6 +469,13 @@ public Boolean closePointInTime(String pit) { return false; } + @Override + public List listPointInTime() { + return execute(client -> client.getAllPits(RequestOptions.DEFAULT)) + .getPitInfos().stream().map(pit -> new PitInfo(pit.getPitId(), pit.getCreationTime(), null)) + .toList(); + } + public SearchResponse suggest(SuggestBuilder suggestion, IndexCoordinates index) { SearchRequest searchRequest = requestFactory.searchRequest(suggestion, index); return execute(client -> client.search(searchRequest, RequestOptions.DEFAULT)); diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/OpenSearchTemplate.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/OpenSearchTemplate.java index 819cb9a..0b0c69f 100644 --- a/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/OpenSearchTemplate.java +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/OpenSearchTemplate.java @@ -37,6 +37,7 @@ import org.opensearch.client.opensearch.core.pit.DeletePitRequest; import org.opensearch.client.opensearch.core.search.SearchResult; import org.opensearch.client.transport.Version; +import org.opensearch.data.core.OpenSearchOperations; import org.springframework.data.elasticsearch.BulkFailureException; import org.springframework.data.elasticsearch.client.UnsupportedBackendOperation; import org.springframework.data.elasticsearch.core.AbstractElasticsearchTemplate; @@ -75,7 +76,7 @@ * @author Haibo Liu * @since 4.4 */ -public class OpenSearchTemplate extends AbstractElasticsearchTemplate { +public class OpenSearchTemplate extends AbstractElasticsearchTemplate implements OpenSearchOperations { private static final Log LOGGER = LogFactory.getLog(OpenSearchTemplate.class); @@ -615,6 +616,14 @@ public Boolean closePointInTime(String pit) { return !response.pits().isEmpty(); } + @Override + public List listPointInTime() { + return execute(client -> client.listAllPit()).pits() + .stream() + .map(pit -> new PitInfo(pit.pitId(), pit.creationTime(), pit.keepAlive() == null ? null : Duration.ofMillis(pit.keepAlive()))) + .toList(); + } + // endregion // region script methods @@ -719,5 +728,4 @@ protected List checkForBulkOperationFailure(BulkRespon } // endregion - } diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/core/OpenSearchOperations.java b/spring-data-opensearch/src/main/java/org/opensearch/data/core/OpenSearchOperations.java new file mode 100644 index 0000000..525eee8 --- /dev/null +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/core/OpenSearchOperations.java @@ -0,0 +1,26 @@ +package org.opensearch.data.core; + +import java.time.Duration; +import java.util.List; +import org.springframework.data.elasticsearch.core.ElasticsearchOperations; + +/** + * The extension over {@link ElasticsearchOperations} with OpenSearch specific operations. + */ +public interface OpenSearchOperations extends ElasticsearchOperations { + /** + * Return all active point in time searches + * @return all active point in time searches + */ + List listPointInTime(); + + /** + * Describes the point in time entry + * + * @param id the point in time id + * @param creationTime the time this point in time was created + * @param keepAlive the new keep alive value for this point in time + */ + record PitInfo(String id, long creationTime, Duration keepAlive) { + } +} diff --git a/spring-data-opensearch/src/test/java/org/opensearch/data/client/orhlc/OpenSearchORHLCSpecificIntegrationTests.java b/spring-data-opensearch/src/test/java/org/opensearch/data/client/orhlc/OpenSearchORHLCSpecificIntegrationTests.java new file mode 100644 index 0000000..edbd4a7 --- /dev/null +++ b/spring-data-opensearch/src/test/java/org/opensearch/data/client/orhlc/OpenSearchORHLCSpecificIntegrationTests.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.data.client.orhlc; + +import static org.opensearch.index.query.QueryBuilders.matchAllQuery; + +import org.junit.jupiter.api.DisplayName; +import org.opensearch.data.client.junit.jupiter.OpenSearchRestTemplateConfiguration; +import org.opensearch.data.core.OpenSearchSpecificIntegrationTests; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder; +import org.springframework.data.elasticsearch.utils.IndexNameProvider; +import org.springframework.test.context.ContextConfiguration; + +@ContextConfiguration(classes = {OpenSearchORHLCSpecificIntegrationTests.Config.class}) +@DisplayName("Using OpenSearch RestHighLevelClient") +public class OpenSearchORHLCSpecificIntegrationTests extends OpenSearchSpecificIntegrationTests { + + @Configuration + @Import({OpenSearchRestTemplateConfiguration.class}) + static class Config { + @Bean + IndexNameProvider indexNameProvider() { + return new IndexNameProvider("integration-specific-os"); + } + } + @Override + protected BaseQueryBuilder getBuilderWithMatchAllQuery() { + return new NativeSearchQueryBuilder().withQuery(matchAllQuery()); + } +} diff --git a/spring-data-opensearch/src/test/java/org/opensearch/data/client/osc/OpenSearchOSCSpecificIntegrationTests.java b/spring-data-opensearch/src/test/java/org/opensearch/data/client/osc/OpenSearchOSCSpecificIntegrationTests.java new file mode 100644 index 0000000..3b05f44 --- /dev/null +++ b/spring-data-opensearch/src/test/java/org/opensearch/data/client/osc/OpenSearchOSCSpecificIntegrationTests.java @@ -0,0 +1,50 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.opensearch.data.client.osc; + +import org.junit.jupiter.api.DisplayName; +import org.opensearch.data.client.junit.jupiter.OpenSearchTemplateConfiguration; +import org.opensearch.data.core.OpenSearchSpecificIntegrationTests; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder; +import org.springframework.data.elasticsearch.utils.IndexNameProvider; +import org.springframework.test.context.ContextConfiguration; + +/** + * @author Farid Faoudi + * @author Sascha Woo + * @since 4.4 + */ +@ContextConfiguration(classes = { OpenSearchOSCSpecificIntegrationTests.Config.class }) +@DisplayName("Using OpenSearch Client") +public class OpenSearchOSCSpecificIntegrationTests extends OpenSearchSpecificIntegrationTests { + + @Configuration + @Import({ OpenSearchTemplateConfiguration.class }) + static class Config { + @Bean + IndexNameProvider indexNameProvider() { + return new IndexNameProvider("integration-specific-os"); + } + } + + @Override + protected BaseQueryBuilder getBuilderWithMatchAllQuery() { + return Queries.getBuilderWithMatchAllQuery(); + } +} diff --git a/spring-data-opensearch/src/test/java/org/opensearch/data/core/OpenSearchSpecificIntegrationTests.java b/spring-data-opensearch/src/test/java/org/opensearch/data/core/OpenSearchSpecificIntegrationTests.java new file mode 100755 index 0000000..605aa2a --- /dev/null +++ b/spring-data-opensearch/src/test/java/org/opensearch/data/core/OpenSearchSpecificIntegrationTests.java @@ -0,0 +1,341 @@ +/* + * Copyright 2014-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.opensearch.data.core; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.springframework.data.elasticsearch.annotations.FieldType.Text; +import static org.springframework.data.elasticsearch.utils.IdGenerator.nextIdAsString; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.opensearch.data.client.EnabledIfOpenSearchVersion; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.annotation.Id; +import org.springframework.data.annotation.Version; +import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Sort; +import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; +import org.springframework.data.elasticsearch.annotations.Document; +import org.springframework.data.elasticsearch.annotations.Field; +import org.springframework.data.elasticsearch.annotations.ScriptedField; +import org.springframework.data.elasticsearch.annotations.Setting; +import org.springframework.data.elasticsearch.core.IndexOperations; +import org.springframework.data.elasticsearch.core.SearchHits; +import org.springframework.data.elasticsearch.core.geo.GeoPoint; +import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder; +import org.springframework.data.elasticsearch.core.query.IndexQuery; +import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder; +import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; +import org.springframework.data.elasticsearch.utils.IndexNameProvider; +import org.springframework.lang.Nullable; + +/** + * All the integration tests that are not in separate files. + */ +@SpringIntegrationTest +public abstract class OpenSearchSpecificIntegrationTests { + private static final String MULTI_INDEX_PREFIX = "os-test-index"; + private static final String MULTI_INDEX_ALL = MULTI_INDEX_PREFIX + "*"; + + @Autowired protected OpenSearchOperations operations; + private IndexOperations indexOperations; + + @Autowired protected IndexNameProvider indexNameProvider; + + @BeforeEach + public void before() { + + indexNameProvider.increment(); + indexOperations = operations.indexOps(SampleEntity.class); + indexOperations.createWithMapping(); + } + + @Test + @Order(java.lang.Integer.MAX_VALUE) + void cleanup() { + operations.indexOps(IndexCoordinates.of(indexNameProvider.getPrefix() + "*")).delete(); + operations.indexOps(IndexCoordinates.of(MULTI_INDEX_ALL)).delete(); + } + + protected abstract BaseQueryBuilder getBuilderWithMatchAllQuery(); + + @Test + @EnabledIfOpenSearchVersion( + onOrAfter = "2.3.0", + reason = "https://github.com/opensearch-project/OpenSearch/issues/1147") + public void testPointInTimeKeepAliveExpired() throws InterruptedException { + // given + // first document + String documentId = nextIdAsString(); + SampleEntity sampleEntity1 = SampleEntity.builder().id(documentId).message("abc").rate(10) + .version(System.currentTimeMillis()).build(); + + // second document + String documentId2 = nextIdAsString(); + SampleEntity sampleEntity2 = SampleEntity.builder().id(documentId2).message("xyz").rate(5) + .version(System.currentTimeMillis()).build(); + + // third document + String documentId3 = nextIdAsString(); + SampleEntity sampleEntity3 = SampleEntity.builder().id(documentId3).message("xyzg").rate(10) + .version(System.currentTimeMillis()).build(); + + List indexQueries = getIndexQueries(Arrays.asList(sampleEntity1, sampleEntity2, sampleEntity3)); + + operations.bulkIndex(indexQueries, IndexCoordinates.of(indexNameProvider.indexName())); + String pit = operations.openPointInTime(IndexCoordinates.of(indexNameProvider.indexName()), + Duration.ofMillis(10)); + Assertions.assertNotNull(pit); + Query.PointInTime qpit = new Query.PointInTime(pit,Duration.ofMillis(10)); + Query query = getBuilderWithMatchAllQuery() // + .withSort(Sort.by(Sort.Order.desc("message"))) // + .withPageable(Pageable.ofSize(2)) + .withPointInTime(qpit).build(); + SearchHits results = operations.search(query,SampleEntity.class); + assertThat(results.getSearchHits().size()).isEqualTo(2); + + final Query searchAfterQuery = getBuilderWithMatchAllQuery() // + .withSort(Sort.by(Sort.Order.desc("message"))) // + .withPointInTime(qpit) + .withSearchAfter(List.of(Objects.requireNonNull(results.getSearchHit(1).getContent().getMessage()))) + .build(); + + final long started = System.nanoTime(); + while ((System.nanoTime() - started) < TimeUnit.SECONDS.toNanos(120)) { + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); + if (operations.listPointInTime().isEmpty()) { + break; + } + } + + assertThatExceptionOfType(UncategorizedElasticsearchException.class) + .isThrownBy(()-> operations.search(searchAfterQuery,SampleEntity.class)); + + Boolean pitResult = operations.closePointInTime(pit); + Assertions.assertTrue(pitResult); + } + + private IndexQuery getIndexQuery(SampleEntity sampleEntity) { + return new IndexQueryBuilder().withId(sampleEntity.getId()).withObject(sampleEntity) + .withVersion(sampleEntity.getVersion()).build(); + } + + private List getIndexQueries(List sampleEntities) { + List indexQueries = new ArrayList<>(); + for (SampleEntity sampleEntity : sampleEntities) { + indexQueries.add(getIndexQuery(sampleEntity)); + } + return indexQueries; + } + + // region entities + @Document(indexName = "#{@indexNameProvider.indexName()}") + @Setting(shards = 1, replicas = 0, refreshInterval = "-1") + protected static class SampleEntity { + @Nullable + @Id private String id; + @Nullable + @Field(type = Text, store = true, fielddata = true) private String type; + @Nullable + @Field(type = Text, store = true, fielddata = true) private String message; + private int rate; + @Nullable + @ScriptedField private Double scriptedRate; + private boolean available; + @Nullable private GeoPoint location; + @Nullable + @Version private Long version; + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + @Nullable private String id; + @Nullable private String type; + @Nullable private String message; + @Nullable private Long version; + private int rate; + @Nullable private GeoPoint location; + + public Builder id(String id) { + this.id = id; + return this; + } + + public Builder type(String type) { + this.type = type; + return this; + } + + public Builder message(String message) { + this.message = message; + return this; + } + + public Builder version(Long version) { + this.version = version; + return this; + } + + public Builder rate(int rate) { + this.rate = rate; + return this; + } + + public Builder location(GeoPoint location) { + this.location = location; + return this; + } + + public SampleEntity build() { + SampleEntity sampleEntity = new SampleEntity(); + sampleEntity.setId(id); + sampleEntity.setType(type); + sampleEntity.setMessage(message); + sampleEntity.setRate(rate); + sampleEntity.setVersion(version); + sampleEntity.setLocation(location); + return sampleEntity; + } + } + + public SampleEntity() {} + + @Nullable + public String getId() { + return id; + } + + public void setId(@Nullable String id) { + this.id = id; + } + + @Nullable + public String getType() { + return type; + } + + public void setType(@Nullable String type) { + this.type = type; + } + + @Nullable + public String getMessage() { + return message; + } + + public void setMessage(@Nullable String message) { + this.message = message; + } + + public int getRate() { + return rate; + } + + public void setRate(int rate) { + this.rate = rate; + } + + @Nullable + public java.lang.Double getScriptedRate() { + return scriptedRate; + } + + public void setScriptedRate(@Nullable java.lang.Double scriptedRate) { + this.scriptedRate = scriptedRate; + } + + public boolean isAvailable() { + return available; + } + + public void setAvailable(boolean available) { + this.available = available; + } + + @Nullable + public GeoPoint getLocation() { + return location; + } + + public void setLocation(@Nullable GeoPoint location) { + this.location = location; + } + + @Nullable + public java.lang.Long getVersion() { + return version; + } + + public void setVersion(@Nullable java.lang.Long version) { + this.version = version; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + SampleEntity that = (SampleEntity) o; + + if (rate != that.rate) + return false; + if (available != that.available) + return false; + if (!Objects.equals(id, that.id)) + return false; + if (!Objects.equals(type, that.type)) + return false; + if (!Objects.equals(message, that.message)) + return false; + if (!Objects.equals(scriptedRate, that.scriptedRate)) + return false; + if (!Objects.equals(location, that.location)) + return false; + return Objects.equals(version, that.version); + } + + @Override + public int hashCode() { + int result = id != null ? id.hashCode() : 0; + result = 31 * result + (type != null ? type.hashCode() : 0); + result = 31 * result + (message != null ? message.hashCode() : 0); + result = 31 * result + rate; + result = 31 * result + (scriptedRate != null ? scriptedRate.hashCode() : 0); + result = 31 * result + (available ? 1 : 0); + result = 31 * result + (location != null ? location.hashCode() : 0); + result = 31 * result + (version != null ? version.hashCode() : 0); + return result; + } + } + // endregion +} diff --git a/spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java b/spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java index 90ce42a..cf183fd 100755 --- a/spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java +++ b/spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -62,7 +62,6 @@ import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; -import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.annotations.FieldType; @@ -2711,57 +2710,6 @@ public void testPointInTimeNewDataUnavailable(){ Assertions.assertTrue(pitResult); } - @Test - @EnabledIfOpenSearchVersion( - onOrAfter = "2.3.0", - reason = "https://github.com/opensearch-project/OpenSearch/issues/1147") - public void testPointInTimeKeepAliveExpired() throws InterruptedException { - // given - // first document - String documentId = nextIdAsString(); - SampleEntity sampleEntity1 = SampleEntity.builder().id(documentId).message("abc").rate(10) - .version(System.currentTimeMillis()).build(); - - // second document - String documentId2 = nextIdAsString(); - SampleEntity sampleEntity2 = SampleEntity.builder().id(documentId2).message("xyz").rate(5) - .version(System.currentTimeMillis()).build(); - - // third document - String documentId3 = nextIdAsString(); - SampleEntity sampleEntity3 = SampleEntity.builder().id(documentId3).message("xyzg").rate(10) - .version(System.currentTimeMillis()).build(); - - List indexQueries = getIndexQueries(Arrays.asList(sampleEntity1, sampleEntity2, sampleEntity3)); - - operations.bulkIndex(indexQueries, IndexCoordinates.of(indexNameProvider.indexName())); - String pit = operations.openPointInTime(IndexCoordinates.of(indexNameProvider.indexName()), - Duration.ofMillis(10)); - Assertions.assertNotNull(pit); - Query.PointInTime qpit = new Query.PointInTime(pit,Duration.ofMillis(10)); - Query query = getBuilderWithMatchAllQuery() // - .withSort(Sort.by(Sort.Order.desc("message"))) // - .withPageable(Pageable.ofSize(2)) - .withPointInTime(qpit).build(); - SearchHits results = operations.search(query,SampleEntity.class); - assertThat(results.getSearchHits().size()).isEqualTo(2); - - // There may be a better way to do it, but Opensearch by default waits for up-to a minute to clear expired pits - Thread.sleep(120000); - final Query searchAfterQuery = getBuilderWithMatchAllQuery() // - .withSort(Sort.by(Sort.Order.desc("message"))) // - .withPointInTime(qpit) - .withSearchAfter(List.of(Objects.requireNonNull(results.getSearchHit(1).getContent().getMessage()))) - .build(); - assertThatExceptionOfType(UncategorizedElasticsearchException.class).isThrownBy( - ()-> operations.search(searchAfterQuery,SampleEntity.class) - ); - Boolean pitResult = operations.closePointInTime(pit); - Assertions.assertTrue(pitResult); - } - - - @Test // DATAES-457 public void shouldSortResultsGivenSortCriteriaWithScanAndScroll() {