Skip to content

Commit

Permalink
81 supplier plugin health and store usage after revert (opensearch-pr…
Browse files Browse the repository at this point in the history
…oject#89)

Signed-off-by: Johannes Peter <[email protected]>
  • Loading branch information
JohannesDaniel authored Dec 16, 2024
1 parent c6c8595 commit aced4bd
Show file tree
Hide file tree
Showing 9 changed files with 586 additions and 2 deletions.
20 changes: 18 additions & 2 deletions src/main/java/com/o19s/es/ltr/LtrQueryParserPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.opensearch.ltr.stats.LTRStats;
import org.opensearch.ltr.stats.StatName;
import org.opensearch.ltr.stats.suppliers.CacheStatsOnNodeSupplier;
import org.opensearch.ltr.stats.suppliers.PluginHealthStatusSupplier;
import org.opensearch.ltr.stats.suppliers.StoreStatsSupplier;
import org.opensearch.ltr.stats.suppliers.CounterSupplier;
import com.o19s.es.explore.ExplorerQueryBuilder;
import com.o19s.es.ltr.action.AddFeaturesToSetAction;
Expand Down Expand Up @@ -125,7 +127,7 @@ public class LtrQueryParserPlugin extends Plugin implements SearchPlugin, Script
public static final String LTR_LEGACY_BASE_URI = "/_opendistro/_ltr";
private final LtrRankerParserFactory parserFactory;
private final Caches caches;
private LTRStats ltrStats;
private final LTRStats ltrStats;

public LtrQueryParserPlugin(Settings settings) {
caches = new Caches(settings);
Expand Down Expand Up @@ -281,9 +283,23 @@ public Collection<Object> createComponents(Client client,
final JvmService jvmService = new JvmService(environment.settings());
final LTRCircuitBreakerService ltrCircuitBreakerService = new LTRCircuitBreakerService(jvmService).init();

addStats(client, clusterService, ltrCircuitBreakerService);
return asList(caches, parserFactory, ltrCircuitBreakerService, ltrStats);
}

private void addStats(
final Client client,
final ClusterService clusterService,
final LTRCircuitBreakerService ltrCircuitBreakerService
) {
final StoreStatsSupplier storeStatsSupplier = StoreStatsSupplier.create(client, clusterService);
ltrStats.addStats(StatName.LTR_STORES_STATS.getName(), new LTRStat<>(true, storeStatsSupplier));

final PluginHealthStatusSupplier pluginHealthStatusSupplier = PluginHealthStatusSupplier.create(
client, clusterService, ltrCircuitBreakerService);
ltrStats.addStats(StatName.LTR_PLUGIN_STATUS.getName(), new LTRStat<>(true, pluginHealthStatusSupplier));
}

private LTRStats getInitialStats() {
Map<String, LTRStat<?>> stats = new HashMap<>();
stats.put(StatName.LTR_CACHE_STATS.getName(),
Expand All @@ -297,7 +313,7 @@ private LTRStats getInitialStats() {

protected FeatureStoreLoader getFeatureStoreLoader() {
return (storeName, clientSupplier) ->
new CachedFeatureStore(new IndexFeatureStore(storeName, clientSupplier, parserFactory), caches);
new CachedFeatureStore(new IndexFeatureStore(storeName, clientSupplier, parserFactory), caches);
}

// A simplified version of some token filters needed by the feature stores.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@

/**
* Supplier for an overall plugin health status.
* @deprecated This class is outdated since 3.0.0-3.0.0 and will be removed in the future.
* Please use the new stats framework in the {@link org.opensearch.ltr.stats} package.
*/
@Deprecated(since = "3.0.0-3.0.0", forRemoval = true)
public class PluginHealthStatusSupplier implements Supplier<String> {
private static final String STATUS_GREEN = "green";
private static final String STATUS_YELLOW = "yellow";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@
* A supplier which provides information on all feature stores. It provides basic
* information such as the index health and count of feature sets, features and
* models in the store.
* @deprecated This class is outdated since 3.0.0-3.0.0 and will be removed in the future.
* Please use the new stats framework in the {@link org.opensearch.ltr.stats} package.
*/
@Deprecated(since = "3.0.0-3.0.0", forRemoval = true)
public class StoreStatsSupplier implements Supplier<Map<String, Map<String, Object>>> {
private static final Logger LOG = LogManager.getLogger(StoreStatsSupplier.class);
private static final String AGG_FIELD = "type";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.opensearch.ltr.stats.suppliers;

import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.ltr.breaker.LTRCircuitBreakerService;
import org.opensearch.ltr.stats.suppliers.utils.StoreUtils;

import java.util.List;
import java.util.function.Supplier;

/**
* Supplier for an overall plugin health status, which is based on the
* aggregate store health and the circuit breaker state.
*/
public class PluginHealthStatusSupplier implements Supplier<String> {
private static final String STATUS_GREEN = "green";
private static final String STATUS_YELLOW = "yellow";
private static final String STATUS_RED = "red";

private final StoreUtils storeUtils;
private final LTRCircuitBreakerService ltrCircuitBreakerService;

protected PluginHealthStatusSupplier(StoreUtils storeUtils,
LTRCircuitBreakerService ltrCircuitBreakerService) {
this.storeUtils = storeUtils;
this.ltrCircuitBreakerService = ltrCircuitBreakerService;
}

@Override
public String get() {
if (ltrCircuitBreakerService.isOpen()) {
return STATUS_RED;
}
return getAggregateStoresStatus();
}

private String getAggregateStoresStatus() {
List<String> storeNames = storeUtils.getAllLtrStoreNames();
return storeNames.stream()
.map(storeUtils::getLtrStoreHealthStatus)
.reduce(STATUS_GREEN, this::combineStatuses);
}

private String combineStatuses(String status1, String status2) {
if (STATUS_RED.equals(status1) || STATUS_RED.equals(status2)) {
return STATUS_RED;
} else if (STATUS_YELLOW.equals(status1) || STATUS_YELLOW.equals(status2)) {
return STATUS_YELLOW;
} else {
return STATUS_GREEN;
}
}

public static PluginHealthStatusSupplier create(
final Client client,
final ClusterService clusterService,
LTRCircuitBreakerService ltrCircuitBreakerService) {
return new PluginHealthStatusSupplier(new StoreUtils(client, clusterService), ltrCircuitBreakerService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.opensearch.ltr.stats.suppliers;

import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.ltr.stats.suppliers.utils.StoreUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
* A supplier to provide stats on the LTR stores. It retrieves basic information
* on the store, such as the health of the underlying index and number of documents
* in the store grouped by their type.
*/
public class StoreStatsSupplier implements Supplier<Map<String, Map<String, Object>>> {
static final String LTR_STORE_STATUS = "status";
static final String LTR_STORE_FEATURE_COUNT = "feature_count";
static final String LTR_STORE_FEATURE_SET_COUNT = "featureset_count";
static final String LTR_STORE_MODEL_COUNT = "model_count";

private final StoreUtils storeUtils;

protected StoreStatsSupplier(final StoreUtils storeUtils) {
this.storeUtils = storeUtils;
}

@Override
public Map<String, Map<String, Object>> get() {
Map<String, Map<String, Object>> storeStats = new ConcurrentHashMap<>();
List<String> storeNames = storeUtils.getAllLtrStoreNames();
storeNames.forEach(s -> storeStats.put(s, getStoreStat(s)));
return storeStats;
}

private Map<String, Object> getStoreStat(String storeName) {
if (!storeUtils.checkLtrStoreExists(storeName)) {
throw new IllegalArgumentException("LTR Store [" + storeName + "] doesn't exist.");
}
Map<String, Object> storeStat = new HashMap<>();
storeStat.put(LTR_STORE_STATUS, storeUtils.getLtrStoreHealthStatus(storeName));
Map<String, Integer> featureSets = storeUtils.extractFeatureSetStats(storeName);
storeStat.put(LTR_STORE_FEATURE_COUNT, featureSets.values().stream().reduce(Integer::sum).orElse(0));
storeStat.put(LTR_STORE_FEATURE_SET_COUNT, featureSets.size());
storeStat.put(LTR_STORE_MODEL_COUNT, storeUtils.getModelCount(storeName));
return storeStat;
}

public static StoreStatsSupplier create(final Client client, final ClusterService clusterService) {
return new StoreStatsSupplier(new StoreUtils(client, clusterService));
}
}
126 changes: 126 additions & 0 deletions src/main/java/org/opensearch/ltr/stats/suppliers/utils/StoreUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.opensearch.ltr.stats.suppliers.utils;

import com.o19s.es.ltr.feature.store.StoredFeatureSet;
import com.o19s.es.ltr.feature.store.StoredLtrModel;
import com.o19s.es.ltr.feature.store.index.IndexFeatureStore;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.search.SearchType;
import org.opensearch.client.Client;
import org.opensearch.cluster.health.ClusterIndexHealth;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* A utility class to provide details on the LTR stores. It queries the underlying
* indices to get the details.
*/
public class StoreUtils {

private static final String FEATURE_SET_KEY = "featureset";
private static final String FEATURE_SET_NAME_KEY = "name";
private static final String FEATURES_KEY = "features";
private final Client client;
private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;

public StoreUtils(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(new ThreadContext(clusterService.getSettings()));
}

public boolean checkLtrStoreExists(String storeName) {
return clusterService.state().getRoutingTable().hasIndex(storeName);
}

public List<String> getAllLtrStoreNames() {
String[] names = indexNameExpressionResolver.concreteIndexNames(clusterService.state(),
new ClusterStateRequest().indices(
IndexFeatureStore.DEFAULT_STORE, IndexFeatureStore.STORE_PREFIX + "*"));
return Arrays.asList(names);
}

public String getLtrStoreHealthStatus(String storeName) {
if (!checkLtrStoreExists(storeName)) {
throw new IndexNotFoundException(storeName);
}
ClusterIndexHealth indexHealth = new ClusterIndexHealth(
clusterService.state().metadata().index(storeName),
clusterService.state().getRoutingTable().index(storeName)
);

return indexHealth.getStatus().name().toLowerCase();
}

/**
* Returns a map of feaureset and the number of features in the featureset.
*
* @param storeName the name of the index for the LTR store.
* @return A map of (featureset, features count)
*/
@SuppressWarnings("unchecked")
public Map<String, Integer> extractFeatureSetStats(String storeName) {
final Map<String, Integer> featureSetStats = new HashMap<>();
final SearchHits featureSetHits = searchStore(storeName, StoredFeatureSet.TYPE);

for (final SearchHit featureSetHit : featureSetHits) {
extractFeatureSetFromFeatureSetHit(featureSetHit).ifPresent(featureSet -> {
final List<String> features = (List<String>) featureSet.get(FEATURES_KEY);
featureSetStats.put((String) featureSet.get(FEATURE_SET_NAME_KEY), features.size());
});
}
return featureSetStats;
}

@SuppressWarnings("unchecked")
private Optional<Map<String, Object>> extractFeatureSetFromFeatureSetHit(SearchHit featureSetHit) {
final Map<String, Object> featureSetMap = featureSetHit.getSourceAsMap();
if (featureSetMap != null && featureSetMap.containsKey(FEATURE_SET_KEY)) {
final Map<String, Object> featureSet = (Map<String, Object>) featureSetMap.get(FEATURE_SET_KEY);

if (featureSet != null && featureSet.containsKey(FEATURES_KEY) && featureSet.containsKey(FEATURE_SET_NAME_KEY)) {
return Optional.of(featureSet);
}
}

return Optional.empty();
}

public long getModelCount(String storeName) {
return searchStore(storeName, StoredLtrModel.TYPE).getTotalHits().value;
}

private SearchHits searchStore(String storeName, String docType) {
return client.prepareSearch(storeName)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.termQuery("type", docType))
.get()
.getHits();
}
}
Loading

0 comments on commit aced4bd

Please sign in to comment.