Skip to content

Commit

Permalink
Enhance Point In Time support with APIs to list active point-in-time …
Browse files Browse the repository at this point in the history
…searches (opensearch-project#218)

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta committed Jun 22, 2024
1 parent c90869d commit eb8ca0d
Show file tree
Hide file tree
Showing 8 changed files with 502 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.ResourceNotFoundException;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.VersionConflictException;

/**
* Simple {@link PersistenceExceptionTranslator} for OpenSearch. Convert the given runtime exception to an
Expand Down Expand Up @@ -60,22 +62,36 @@ public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
if (ex instanceof OpenSearchStatusException) {
OpenSearchStatusException statusException = (OpenSearchStatusException) ex;

if (statusException.status() == RestStatus.NOT_FOUND
&& statusException.getMessage().contains("index_not_found_exception")) {

Pattern pattern = Pattern.compile(".*no such index \\[(.*)\\]");
String index = "";
Matcher matcher = pattern.matcher(statusException.getMessage());
if (matcher.matches()) {
index = matcher.group(1);
if (statusException.status() == RestStatus.NOT_FOUND) {
if (statusException.getMessage().contains("index_not_found_exception")) {
Pattern pattern = Pattern.compile(".*no such index \\[(.*)\\]");
String index = "";
Matcher matcher = pattern.matcher(statusException.getMessage());
if (matcher.matches()) {
index = matcher.group(1);
}

return new NoSuchIndexException(index);
} else {
return new ResourceNotFoundException(statusException.getMessage());
}
return new NoSuchIndexException(index);
}

if (statusException.getMessage().contains("validation_exception")) {
return new DataIntegrityViolationException(statusException.getMessage());
}

if (statusException.status() != null && statusException.getMessage() != null) {
final Integer status = statusException.status().getStatus();
final String message = statusException.getMessage();

if (status == 409 && message.contains("type=version_conflict_engine_exception")) {
if (message.contains("version conflict, current version [")) {
throw new VersionConflictException("Version conflict", statusException);
}
}
}

return new UncategorizedElasticsearchException(
ex.getMessage(), statusException.status().getStatus(), null, ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.data.core.OpenSearchOperations;
import org.opensearch.index.query.MoreLikeThisQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.reindex.BulkByScrollResponse;
Expand Down Expand Up @@ -80,7 +81,7 @@
* OpenSearchRestTemplate
* @since 0.1
*/
public class OpenSearchRestTemplate extends AbstractElasticsearchTemplate {
public class OpenSearchRestTemplate extends AbstractElasticsearchTemplate implements OpenSearchOperations {

private static final Log LOGGER = LogFactory.getLog(OpenSearchRestTemplate.class);

Expand Down Expand Up @@ -468,6 +469,13 @@ public Boolean closePointInTime(String pit) {
return false;
}

@Override
public List<PitInfo> listPointInTime() {
return execute(client -> client.getAllPits(RequestOptions.DEFAULT))
.getPitInfos().stream().map(pit -> new PitInfo(pit.getPitId(), pit.getCreationTime(), null))
.toList();
}

public SearchResponse suggest(SuggestBuilder suggestion, IndexCoordinates index) {
SearchRequest searchRequest = requestFactory.searchRequest(suggestion, index);
return execute(client -> client.search(searchRequest, RequestOptions.DEFAULT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.client.opensearch.core.pit.DeletePitRequest;
import org.opensearch.client.opensearch.core.search.SearchResult;
import org.opensearch.client.transport.Version;
import org.opensearch.data.core.OpenSearchOperations;
import org.springframework.data.elasticsearch.BulkFailureException;
import org.springframework.data.elasticsearch.client.UnsupportedBackendOperation;
import org.springframework.data.elasticsearch.core.AbstractElasticsearchTemplate;
Expand Down Expand Up @@ -75,7 +76,7 @@
* @author Haibo Liu
* @since 4.4
*/
public class OpenSearchTemplate extends AbstractElasticsearchTemplate {
public class OpenSearchTemplate extends AbstractElasticsearchTemplate implements OpenSearchOperations {

private static final Log LOGGER = LogFactory.getLog(OpenSearchTemplate.class);

Expand Down Expand Up @@ -615,6 +616,14 @@ public Boolean closePointInTime(String pit) {
return !response.pits().isEmpty();
}

@Override
public List<PitInfo> listPointInTime() {
return execute(client -> client.listAllPit()).pits()
.stream()
.map(pit -> new PitInfo(pit.pitId(), pit.creationTime(), pit.keepAlive() == null ? null : Duration.ofMillis(pit.keepAlive())))
.toList();
}

// endregion

// region script methods
Expand Down Expand Up @@ -719,5 +728,4 @@ protected List<IndexedObjectInformation> checkForBulkOperationFailure(BulkRespon

}
// endregion

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.opensearch.data.core;

import java.time.Duration;
import java.util.List;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;

/**
* The extension over {@link ElasticsearchOperations} with OpenSearch specific operations.
*/
public interface OpenSearchOperations extends ElasticsearchOperations {
/**
* Return all active point in time searches
* @return all active point in time searches
*/
List<PitInfo> listPointInTime();

/**
* Describes the point in time entry
*
* @param id the point in time id
* @param creationTime the time this point in time was created
* @param keepAlive the new keep alive value for this point in time
*/
record PitInfo(String id, long creationTime, Duration keepAlive) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.data.client.orhlc;

import static org.opensearch.index.query.QueryBuilders.matchAllQuery;

import org.junit.jupiter.api.DisplayName;
import org.opensearch.data.client.junit.jupiter.OpenSearchRestTemplateConfiguration;
import org.opensearch.data.core.OpenSearchSpecificIntegrationTests;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.test.context.ContextConfiguration;

@ContextConfiguration(classes = {OpenSearchORHLCSpecificIntegrationTests.Config.class})
@DisplayName("Using OpenSearch RestHighLevelClient")
public class OpenSearchORHLCSpecificIntegrationTests extends OpenSearchSpecificIntegrationTests {

@Configuration
@Import({OpenSearchRestTemplateConfiguration.class})
static class Config {
@Bean
IndexNameProvider indexNameProvider() {
return new IndexNameProvider("integration-specific-os");
}
}
@Override
protected BaseQueryBuilder<?, ?> getBuilderWithMatchAllQuery() {
return new NativeSearchQueryBuilder().withQuery(matchAllQuery());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opensearch.data.client.osc;

import org.junit.jupiter.api.DisplayName;
import org.opensearch.data.client.junit.jupiter.OpenSearchTemplateConfiguration;
import org.opensearch.data.core.OpenSearchSpecificIntegrationTests;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.test.context.ContextConfiguration;

/**
* @author Farid Faoudi
* @author Sascha Woo
* @since 4.4
*/
@ContextConfiguration(classes = { OpenSearchOSCSpecificIntegrationTests.Config.class })
@DisplayName("Using OpenSearch Client")
public class OpenSearchOSCSpecificIntegrationTests extends OpenSearchSpecificIntegrationTests {

@Configuration
@Import({ OpenSearchTemplateConfiguration.class })
static class Config {
@Bean
IndexNameProvider indexNameProvider() {
return new IndexNameProvider("integration-specific-os");
}
}

@Override
protected BaseQueryBuilder<?, ?> getBuilderWithMatchAllQuery() {
return Queries.getBuilderWithMatchAllQuery();
}
}
Loading

0 comments on commit eb8ca0d

Please sign in to comment.