Skip to content

Commit

Permalink
81 supplier plugin health and store usage (opensearch-project#84)
Browse files Browse the repository at this point in the history
Signed-off-by: Johannes Peter <[email protected]>
  • Loading branch information
JohannesDaniel authored Dec 12, 2024
1 parent 2271478 commit e58e20d
Show file tree
Hide file tree
Showing 9 changed files with 501 additions and 2 deletions.
20 changes: 18 additions & 2 deletions src/main/java/com/o19s/es/ltr/LtrQueryParserPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.opensearch.ltr.stats.LTRStats;
import org.opensearch.ltr.stats.StatName;
import org.opensearch.ltr.stats.suppliers.CacheStatsOnNodeSupplier;
import org.opensearch.ltr.stats.suppliers.PluginHealthStatusSupplier;
import org.opensearch.ltr.stats.suppliers.StoreStatsSupplier;
import org.opensearch.ltr.stats.suppliers.CounterSupplier;
import com.o19s.es.explore.ExplorerQueryBuilder;
import com.o19s.es.ltr.action.AddFeaturesToSetAction;
Expand Down Expand Up @@ -125,7 +127,7 @@ public class LtrQueryParserPlugin extends Plugin implements SearchPlugin, Script
public static final String LTR_LEGACY_BASE_URI = "/_opendistro/_ltr";
private final LtrRankerParserFactory parserFactory;
private final Caches caches;
private LTRStats ltrStats;
private final LTRStats ltrStats;

public LtrQueryParserPlugin(Settings settings) {
caches = new Caches(settings);
Expand Down Expand Up @@ -281,9 +283,23 @@ public Collection<Object> createComponents(Client client,
final JvmService jvmService = new JvmService(environment.settings());
final LTRCircuitBreakerService ltrCircuitBreakerService = new LTRCircuitBreakerService(jvmService).init();

addStats(client, clusterService, ltrCircuitBreakerService);
return asList(caches, parserFactory, ltrCircuitBreakerService, ltrStats);
}

private void addStats(
final Client client,
final ClusterService clusterService,
final LTRCircuitBreakerService ltrCircuitBreakerService
) {
final StoreStatsSupplier storeStatsSupplier = StoreStatsSupplier.create(client, clusterService);
ltrStats.addStats(StatName.LTR_STORES_STATS.getName(), new LTRStat<>(true, storeStatsSupplier));

final PluginHealthStatusSupplier pluginHealthStatusSupplier = PluginHealthStatusSupplier.create(
client, clusterService, ltrCircuitBreakerService);
ltrStats.addStats(StatName.LTR_PLUGIN_STATUS.getName(), new LTRStat<>(true, pluginHealthStatusSupplier));
}

private LTRStats getInitialStats() {
Map<String, LTRStat<?>> stats = new HashMap<>();
stats.put(StatName.LTR_CACHE_STATS.getName(),
Expand All @@ -297,7 +313,7 @@ private LTRStats getInitialStats() {

protected FeatureStoreLoader getFeatureStoreLoader() {
return (storeName, clientSupplier) ->
new CachedFeatureStore(new IndexFeatureStore(storeName, clientSupplier, parserFactory), caches);
new CachedFeatureStore(new IndexFeatureStore(storeName, clientSupplier, parserFactory), caches);
}

// A simplified version of some token filters needed by the feature stores.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@

/**
* Supplier for an overall plugin health status.
* @deprecated This class is outdated since 3.0.0-3.0.0 and will be removed in the future.
* Please use the new stats framework in the {@link org.opensearch.ltr.stats} package.
*/
@Deprecated(since = "3.0.0-3.0.0", forRemoval = true)
public class PluginHealthStatusSupplier implements Supplier<String> {
private static final String STATUS_GREEN = "green";
private static final String STATUS_YELLOW = "yellow";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@
* A supplier which provides information on all feature stores. It provides basic
* information such as the index health and count of feature sets, features and
* models in the store.
* @deprecated This class is outdated since 3.0.0-3.0.0 and will be removed in the future.
* Please use the new stats framework in the {@link org.opensearch.ltr.stats} package.
*/
@Deprecated(since = "3.0.0-3.0.0", forRemoval = true)
public class StoreStatsSupplier implements Supplier<Map<String, Map<String, Object>>> {
private static final Logger LOG = LogManager.getLogger(StoreStatsSupplier.class);
private static final String AGG_FIELD = "type";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.opensearch.ltr.stats.suppliers;


import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.ltr.breaker.LTRCircuitBreakerService;
import org.opensearch.ltr.stats.suppliers.utils.StoreUtils;

import java.util.List;
import java.util.function.Supplier;

/**
* Supplier for an overall plugin health status, which is based on the
* aggregate store health and the circuit breaker state.
*/
public class PluginHealthStatusSupplier implements Supplier<String> {
private static final String STATUS_GREEN = "green";
private static final String STATUS_YELLOW = "yellow";
private static final String STATUS_RED = "red";

private final StoreUtils storeUtils;
private final LTRCircuitBreakerService ltrCircuitBreakerService;

protected PluginHealthStatusSupplier(StoreUtils storeUtils,
LTRCircuitBreakerService ltrCircuitBreakerService) {
this.storeUtils = storeUtils;
this.ltrCircuitBreakerService = ltrCircuitBreakerService;
}

@Override
public String get() {
if (ltrCircuitBreakerService.isOpen()) {
return STATUS_RED;
}
return getAggregateStoresStatus();
}

private String getAggregateStoresStatus() {
List<String> storeNames = storeUtils.getAllLtrStoreNames();
return storeNames.stream()
.map(storeUtils::getLtrStoreHealthStatus)
.reduce(STATUS_GREEN, this::combineStatuses);
}

private String combineStatuses(String s1, String s2) {
if (s2 == null || STATUS_RED.equals(s1) || STATUS_RED.equals(s2)) {
return STATUS_RED;
} else if (STATUS_YELLOW.equals(s1) || STATUS_YELLOW.equals(s2)) {
return STATUS_YELLOW;
} else {
return STATUS_GREEN;
}
}

public static PluginHealthStatusSupplier create(
final Client client,
final ClusterService clusterService,
LTRCircuitBreakerService ltrCircuitBreakerService) {
return new PluginHealthStatusSupplier(new StoreUtils(client, clusterService), ltrCircuitBreakerService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.opensearch.ltr.stats.suppliers;

import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.ltr.stats.suppliers.utils.StoreUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
* A supplier to provide stats on the LTR stores. It retrieves basic information
* on the store, such as the health of the underlying index and number of documents
* in the store grouped by their type.
*/
public class StoreStatsSupplier implements Supplier<Map<String, Map<String, Object>>> {
static final String LTR_STORE_STATUS = "status";
static final String LTR_STORE_FEATURE_COUNT = "feature_count";
static final String LTR_STORE_FEATURE_SET_COUNT = "featureset_count";
static final String LTR_STORE_MODEL_COUNT = "model_count";

private final StoreUtils storeUtils;

protected StoreStatsSupplier(final StoreUtils storeUtils) {
this.storeUtils = storeUtils;
}

@Override
public Map<String, Map<String, Object>> get() {
Map<String, Map<String, Object>> storeStats = new ConcurrentHashMap<>();
List<String> storeNames = storeUtils.getAllLtrStoreNames();
storeNames.forEach(s -> storeStats.put(s, getStoreStat(s)));
return storeStats;
}

private Map<String, Object> getStoreStat(String storeName) {
if (!storeUtils.checkLtrStoreExists(storeName)) {
throw new IllegalArgumentException("LTR Store [" + storeName + "] doesn't exist.");
}
Map<String, Object> storeStat = new HashMap<>();
storeStat.put(LTR_STORE_STATUS, storeUtils.getLtrStoreHealthStatus(storeName));
Map<String, Integer> featureSets = storeUtils.extractFeatureSetStats(storeName);
storeStat.put(LTR_STORE_FEATURE_COUNT, featureSets.values().stream().reduce(Integer::sum).orElse(0));
storeStat.put(LTR_STORE_FEATURE_SET_COUNT, featureSets.size());
storeStat.put(LTR_STORE_MODEL_COUNT, storeUtils.getModelCount(storeName));
return storeStat;
}

public static StoreStatsSupplier create(final Client client, final ClusterService clusterService) {
return new StoreStatsSupplier(new StoreUtils(client, clusterService));
}
}
115 changes: 115 additions & 0 deletions src/main/java/org/opensearch/ltr/stats/suppliers/utils/StoreUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package org.opensearch.ltr.stats.suppliers.utils;

import com.o19s.es.ltr.feature.store.StoredFeatureSet;
import com.o19s.es.ltr.feature.store.StoredLtrModel;
import com.o19s.es.ltr.feature.store.index.IndexFeatureStore;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.search.SearchType;
import org.opensearch.client.Client;
import org.opensearch.cluster.health.ClusterIndexHealth;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* A utility class to provide details on the LTR stores. It queries the underlying
* indices to get the details.
*/
public class StoreUtils {

private static final String FEATURE_SET_KEY = "featureset";
private static final String FEATURE_SET_NAME_KEY = "name";
private static final String FEATURES_KEY = "features";
private Client client;
private ClusterService clusterService;
private IndexNameExpressionResolver indexNameExpressionResolver;

public StoreUtils(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(new ThreadContext(clusterService.getSettings()));
}

public boolean checkLtrStoreExists(String storeName) {
return clusterService.state().getRoutingTable().hasIndex(storeName);
}

public List<String> getAllLtrStoreNames() {
String[] names = indexNameExpressionResolver.concreteIndexNames(clusterService.state(),
new ClusterStateRequest().indices(
IndexFeatureStore.DEFAULT_STORE, IndexFeatureStore.STORE_PREFIX + "*"));
return Arrays.asList(names);
}

public String getLtrStoreHealthStatus(String storeName) {
if (!checkLtrStoreExists(storeName)) {
throw new IndexNotFoundException(storeName);
}
ClusterIndexHealth indexHealth = new ClusterIndexHealth(
clusterService.state().metadata().index(storeName),
clusterService.state().getRoutingTable().index(storeName)
);

return indexHealth.getStatus().name().toLowerCase();
}

/**
* Returns a map of feaureset and the number of features in the featureset.
*
* @param storeName the name of the index for the LTR store.
* @return A map of (featureset, features count)
*/
@SuppressWarnings("unchecked")
public Map<String, Integer> extractFeatureSetStats(String storeName) {
final Map<String, Integer> featureSetStats = new HashMap<>();
final SearchHits featureSetHits = searchStore(storeName, StoredFeatureSet.TYPE);

for (final SearchHit featureSetHit : featureSetHits) {
extractFeatureSetFromFeatureSetHit(featureSetHit).ifPresent(featureSet -> {
final List<String> features = (List<String>) featureSet.get(FEATURES_KEY);
featureSetStats.put((String) featureSet.get(FEATURE_SET_NAME_KEY), features.size());
});
}
return featureSetStats;
}

@SuppressWarnings("unchecked")
private Optional<Map<String, Object>> extractFeatureSetFromFeatureSetHit(SearchHit featureSetHit) {
final Map<String, Object> featureSetMap = featureSetHit.getSourceAsMap();
if (featureSetMap != null && featureSetMap.containsKey(FEATURE_SET_KEY)) {
final Map<String, Object> featureSet = (Map<String, Object>) featureSetMap.get(FEATURE_SET_KEY);

if (featureSet != null && featureSet.containsKey(FEATURES_KEY) && featureSet.containsKey(FEATURE_SET_NAME_KEY)) {
return Optional.of(featureSet);
}
}

return Optional.empty();
}



// private

public long getModelCount(String storeName) {
return searchStore(storeName, StoredLtrModel.TYPE).getTotalHits().value;
}

private SearchHits searchStore(String storeName, String docType) {
return client.prepareSearch(storeName)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.termQuery("type", docType))
.get()
.getHits();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.opensearch.ltr.stats.suppliers;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opensearch.ltr.breaker.LTRCircuitBreakerService;
import org.opensearch.ltr.stats.suppliers.utils.StoreUtils;

import java.util.Arrays;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;

public class PluginHealthStatusSupplierTests {
private PluginHealthStatusSupplier pluginHealthStatusSupplier;

@Mock
private LTRCircuitBreakerService ltrCircuitBreakerService;

@Mock
StoreUtils storeUtils;

@Before
public void setup() {
MockitoAnnotations.openMocks(this);
pluginHealthStatusSupplier =
new PluginHealthStatusSupplier(storeUtils, ltrCircuitBreakerService);
}

@Test
public void testStatusGreen() {
when(ltrCircuitBreakerService.isOpen()).thenReturn(false);
when(storeUtils.getAllLtrStoreNames()).thenReturn(Arrays.asList("store1", "store2"));
when(storeUtils.getLtrStoreHealthStatus(Mockito.anyString())).thenReturn("green");

assertEquals("green", pluginHealthStatusSupplier.get());
}

@Test
public void testStatusYellowStoreStatusYellow() {
when(ltrCircuitBreakerService.isOpen()).thenReturn(false);
when(storeUtils.getAllLtrStoreNames()).thenReturn(Arrays.asList("store1", "store2"));
when(storeUtils.getLtrStoreHealthStatus("store1")).thenReturn("green");
when(storeUtils.getLtrStoreHealthStatus("store2")).thenReturn("yellow");
assertEquals("yellow", pluginHealthStatusSupplier.get());
}

@Test
public void testStatusRedStoreStatusRed() {
when(ltrCircuitBreakerService.isOpen()).thenReturn(false);
when(storeUtils.getAllLtrStoreNames()).thenReturn(Arrays.asList("store1", "store2"));
when(storeUtils.getLtrStoreHealthStatus("store1")).thenReturn("red");
when(storeUtils.getLtrStoreHealthStatus("store2")).thenReturn("green");

assertEquals("red", pluginHealthStatusSupplier.get());
}

@Test
public void testStatusRedCircuitBreakerOpen() {
when(ltrCircuitBreakerService.isOpen()).thenReturn(true);
assertEquals("red", pluginHealthStatusSupplier.get());
}
}
Loading

0 comments on commit e58e20d

Please sign in to comment.