Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

81 supplier plugin health and store usage #84

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions src/main/java/com/o19s/es/ltr/LtrQueryParserPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.opensearch.ltr.stats.LTRStats;
import org.opensearch.ltr.stats.StatName;
import org.opensearch.ltr.stats.suppliers.CacheStatsOnNodeSupplier;
import org.opensearch.ltr.stats.suppliers.PluginHealthStatusSupplier;
import org.opensearch.ltr.stats.suppliers.StoreStatsSupplier;
import org.opensearch.ltr.stats.suppliers.CounterSupplier;
import com.o19s.es.explore.ExplorerQueryBuilder;
import com.o19s.es.ltr.action.AddFeaturesToSetAction;
Expand Down Expand Up @@ -125,7 +127,7 @@ public class LtrQueryParserPlugin extends Plugin implements SearchPlugin, Script
public static final String LTR_LEGACY_BASE_URI = "/_opendistro/_ltr";
private final LtrRankerParserFactory parserFactory;
private final Caches caches;
private LTRStats ltrStats;
private final LTRStats ltrStats;

public LtrQueryParserPlugin(Settings settings) {
caches = new Caches(settings);
Expand Down Expand Up @@ -278,9 +280,23 @@ public Collection<Object> createComponents(Client client,
final JvmService jvmService = new JvmService(environment.settings());
final LTRCircuitBreakerService ltrCircuitBreakerService = new LTRCircuitBreakerService(jvmService).init();

addStats(client, clusterService, ltrCircuitBreakerService);
return asList(caches, parserFactory, ltrCircuitBreakerService, ltrStats);
}

private void addStats(
final Client client,
final ClusterService clusterService,
final LTRCircuitBreakerService ltrCircuitBreakerService
) {
final StoreStatsSupplier storeStatsSupplier = StoreStatsSupplier.create(client, clusterService);
ltrStats.addStats(StatName.LTR_STORES_STATS.getName(), new LTRStat<>(true, storeStatsSupplier));

final PluginHealthStatusSupplier pluginHealthStatusSupplier = PluginHealthStatusSupplier.create(
client, clusterService, ltrCircuitBreakerService);
ltrStats.addStats(StatName.LTR_PLUGIN_STATUS.getName(), new LTRStat<>(true, pluginHealthStatusSupplier));
}

private LTRStats getInitialStats() {
Map<String, LTRStat<?>> stats = new HashMap<>();
stats.put(StatName.LTR_CACHE_STATS.getName(),
Expand All @@ -294,7 +310,7 @@ private LTRStats getInitialStats() {

protected FeatureStoreLoader getFeatureStoreLoader() {
return (storeName, clientSupplier) ->
new CachedFeatureStore(new IndexFeatureStore(storeName, clientSupplier, parserFactory), caches);
new CachedFeatureStore(new IndexFeatureStore(storeName, clientSupplier, parserFactory), caches);
}

// A simplified version of some token filters needed by the feature stores.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@

/**
* Supplier for an overall plugin health status.
* @deprecated This class is outdated since 3.0.0-3.0.0 and will be removed in the future.
* Please use the new stats framework in the {@link org.opensearch.ltr.stats} package.
*/
@Deprecated(since = "3.0.0-3.0.0", forRemoval = true)
public class PluginHealthStatusSupplier implements Supplier<String> {
private static final String STATUS_GREEN = "green";
private static final String STATUS_YELLOW = "yellow";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@
* A supplier which provides information on all feature stores. It provides basic
* information such as the index health and count of feature sets, features and
* models in the store.
* @deprecated This class is outdated since 3.0.0-3.0.0 and will be removed in the future.
* Please use the new stats framework in the {@link org.opensearch.ltr.stats} package.
*/
@Deprecated(since = "3.0.0-3.0.0", forRemoval = true)
public class StoreStatsSupplier implements Supplier<Map<String, Map<String, Object>>> {
private static final Logger LOG = LogManager.getLogger(StoreStatsSupplier.class);
private static final String AGG_FIELD = "type";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.opensearch.ltr.stats.suppliers;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add license header



import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.ltr.breaker.LTRCircuitBreakerService;
import org.opensearch.ltr.stats.suppliers.utils.StoreUtils;

import java.util.List;
import java.util.function.Supplier;

/**
* Supplier for an overall plugin health status, which is based on the
* aggregate store health and the circuit breaker state.
*/
public class PluginHealthStatusSupplier implements Supplier<String> {
private static final String STATUS_GREEN = "green";
private static final String STATUS_YELLOW = "yellow";
private static final String STATUS_RED = "red";

private final StoreUtils storeUtils;
private final LTRCircuitBreakerService ltrCircuitBreakerService;

protected PluginHealthStatusSupplier(StoreUtils storeUtils,
LTRCircuitBreakerService ltrCircuitBreakerService) {
this.storeUtils = storeUtils;
this.ltrCircuitBreakerService = ltrCircuitBreakerService;
}

@Override
public String get() {
if (ltrCircuitBreakerService.isOpen()) {
return STATUS_RED;
}
return getAggregateStoresStatus();
}

private String getAggregateStoresStatus() {
List<String> storeNames = storeUtils.getAllLtrStoreNames();
return storeNames.stream()
.map(storeUtils::getLtrStoreHealthStatus)
.reduce(STATUS_GREEN, this::combineStatuses);
}

private String combineStatuses(String s1, String s2) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the variable name more readable: s1, s2

if (s2 == null || STATUS_RED.equals(s1) || STATUS_RED.equals(s2)) {
return STATUS_RED;
} else if (STATUS_YELLOW.equals(s1) || STATUS_YELLOW.equals(s2)) {
return STATUS_YELLOW;
} else {
return STATUS_GREEN;
}
}

public static PluginHealthStatusSupplier create(
final Client client,
final ClusterService clusterService,
LTRCircuitBreakerService ltrCircuitBreakerService) {
return new PluginHealthStatusSupplier(new StoreUtils(client, clusterService), ltrCircuitBreakerService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.opensearch.ltr.stats.suppliers;

import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.ltr.stats.suppliers.utils.StoreUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
* A supplier to provide stats on the LTR stores. It retrieves basic information
* on the store, such as the health of the underlying index and number of documents
* in the store grouped by their type.
*/
public class StoreStatsSupplier implements Supplier<Map<String, Map<String, Object>>> {
static final String LTR_STORE_STATUS = "status";
static final String LTR_STORE_FEATURE_COUNT = "feature_count";
static final String LTR_STORE_FEATURE_SET_COUNT = "featureset_count";
static final String LTR_STORE_MODEL_COUNT = "model_count";

private final StoreUtils storeUtils;

protected StoreStatsSupplier(final StoreUtils storeUtils) {
this.storeUtils = storeUtils;
}

@Override
public Map<String, Map<String, Object>> get() {
Map<String, Map<String, Object>> storeStats = new ConcurrentHashMap<>();
List<String> storeNames = storeUtils.getAllLtrStoreNames();
storeNames.forEach(s -> storeStats.put(s, getStoreStat(s)));
return storeStats;
}

private Map<String, Object> getStoreStat(String storeName) {
if (!storeUtils.checkLtrStoreExists(storeName)) {
throw new IllegalArgumentException("LTR Store [" + storeName + "] doesn't exist.");
}
Map<String, Object> storeStat = new HashMap<>();
storeStat.put(LTR_STORE_STATUS, storeUtils.getLtrStoreHealthStatus(storeName));
Map<String, Integer> featureSets = storeUtils.extractFeatureSetStats(storeName);
storeStat.put(LTR_STORE_FEATURE_COUNT, featureSets.values().stream().reduce(Integer::sum).orElse(0));
storeStat.put(LTR_STORE_FEATURE_SET_COUNT, featureSets.size());
storeStat.put(LTR_STORE_MODEL_COUNT, storeUtils.getModelCount(storeName));
return storeStat;
}

public static StoreStatsSupplier create(final Client client, final ClusterService clusterService) {
return new StoreStatsSupplier(new StoreUtils(client, clusterService));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package org.opensearch.ltr.stats.suppliers.utils;

import com.o19s.es.ltr.feature.store.StoredFeatureSet;
import com.o19s.es.ltr.feature.store.StoredLtrModel;
import com.o19s.es.ltr.feature.store.index.IndexFeatureStore;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.search.SearchType;
import org.opensearch.client.Client;
import org.opensearch.cluster.health.ClusterIndexHealth;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* A utility class to provide details on the LTR stores. It queries the underlying
* indices to get the details.
*/
public class StoreUtils {

private static final String FEATURE_SET_KEY = "featureset";
private static final String FEATURE_SET_NAME_KEY = "name";
private static final String FEATURES_KEY = "features";
private Client client;
private ClusterService clusterService;
private IndexNameExpressionResolver indexNameExpressionResolver;

public StoreUtils(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(new ThreadContext(clusterService.getSettings()));
}

public boolean checkLtrStoreExists(String storeName) {
return clusterService.state().getRoutingTable().hasIndex(storeName);
}

public List<String> getAllLtrStoreNames() {
String[] names = indexNameExpressionResolver.concreteIndexNames(clusterService.state(),
new ClusterStateRequest().indices(
IndexFeatureStore.DEFAULT_STORE, IndexFeatureStore.STORE_PREFIX + "*"));
return Arrays.asList(names);
}

public String getLtrStoreHealthStatus(String storeName) {
if (!checkLtrStoreExists(storeName)) {
throw new IndexNotFoundException(storeName);
}
ClusterIndexHealth indexHealth = new ClusterIndexHealth(
clusterService.state().metadata().index(storeName),
clusterService.state().getRoutingTable().index(storeName)
);

return indexHealth.getStatus().name().toLowerCase();
}

/**
* Returns a map of feaureset and the number of features in the featureset.
*
* @param storeName the name of the index for the LTR store.
* @return A map of (featureset, features count)
*/
@SuppressWarnings("unchecked")
public Map<String, Integer> extractFeatureSetStats(String storeName) {
final Map<String, Integer> featureSetStats = new HashMap<>();
final SearchHits featureSetHits = searchStore(storeName, StoredFeatureSet.TYPE);

for (final SearchHit featureSetHit : featureSetHits) {
extractFeatureSetFromFeatureSetHit(featureSetHit).ifPresent(featureSet -> {
final List<String> features = (List<String>) featureSet.get(FEATURES_KEY);
featureSetStats.put((String) featureSet.get(FEATURE_SET_NAME_KEY), features.size());
});
}
return featureSetStats;
}

@SuppressWarnings("unchecked")
private Optional<Map<String, Object>> extractFeatureSetFromFeatureSetHit(SearchHit featureSetHit) {
final Map<String, Object> featureSetMap = featureSetHit.getSourceAsMap();
if (featureSetMap != null && featureSetMap.containsKey(FEATURE_SET_KEY)) {
final Map<String, Object> featureSet = (Map<String, Object>) featureSetMap.get(FEATURE_SET_KEY);

if (featureSet != null && featureSet.containsKey(FEATURES_KEY) && featureSet.containsKey(FEATURE_SET_NAME_KEY)) {
return Optional.of(featureSet);
}
}

return Optional.empty();
}



// private

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remote this line ?


public long getModelCount(String storeName) {
return searchStore(storeName, StoredLtrModel.TYPE).getTotalHits().value;
}

private SearchHits searchStore(String storeName, String docType) {
return client.prepareSearch(storeName)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.termQuery("type", docType))
.get()
.getHits();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.opensearch.ltr.stats.suppliers;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opensearch.ltr.breaker.LTRCircuitBreakerService;
import org.opensearch.ltr.stats.suppliers.utils.StoreUtils;

import java.util.Arrays;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;

public class PluginHealthStatusSupplierTests {
private PluginHealthStatusSupplier pluginHealthStatusSupplier;

@Mock
private LTRCircuitBreakerService ltrCircuitBreakerService;

@Mock
StoreUtils storeUtils;

@Before
public void setup() {
MockitoAnnotations.openMocks(this);
pluginHealthStatusSupplier =
new PluginHealthStatusSupplier(storeUtils, ltrCircuitBreakerService);
}

@Test
public void testStatusGreen() {
when(ltrCircuitBreakerService.isOpen()).thenReturn(false);
when(storeUtils.getAllLtrStoreNames()).thenReturn(Arrays.asList("store1", "store2"));
when(storeUtils.getLtrStoreHealthStatus(Mockito.anyString())).thenReturn("green");

assertEquals("green", pluginHealthStatusSupplier.get());
}

@Test
public void testStatusYellowStoreStatusYellow() {
when(ltrCircuitBreakerService.isOpen()).thenReturn(false);
when(storeUtils.getAllLtrStoreNames()).thenReturn(Arrays.asList("store1", "store2"));
when(storeUtils.getLtrStoreHealthStatus("store1")).thenReturn("green");
when(storeUtils.getLtrStoreHealthStatus("store2")).thenReturn("yellow");
assertEquals("yellow", pluginHealthStatusSupplier.get());
}

@Test
public void testStatusRedStoreStatusRed() {
when(ltrCircuitBreakerService.isOpen()).thenReturn(false);
when(storeUtils.getAllLtrStoreNames()).thenReturn(Arrays.asList("store1", "store2"));
when(storeUtils.getLtrStoreHealthStatus("store1")).thenReturn("red");
when(storeUtils.getLtrStoreHealthStatus("store2")).thenReturn("green");

assertEquals("red", pluginHealthStatusSupplier.get());
}

@Test
public void testStatusRedCircuitBreakerOpen() {
when(ltrCircuitBreakerService.isOpen()).thenReturn(true);
assertEquals("red", pluginHealthStatusSupplier.get());
}
}
Loading
Loading