Skip to content

Commit

Permalink
feat: implemented plugin health status and store stats supplier
Browse files Browse the repository at this point in the history
Signed-off-by: Johannes Peter <[email protected]>
  • Loading branch information
JohannesDaniel committed Dec 5, 2024
1 parent 8217ea6 commit a85c223
Show file tree
Hide file tree
Showing 6 changed files with 477 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.opensearch.ltr.stats.suppliers;


import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.ltr.breaker.LTRCircuitBreakerService;
import org.opensearch.ltr.stats.suppliers.utils.StoreUtils;

import java.util.List;
import java.util.function.Supplier;

/**
* Supplier for an overall plugin health status, which is based on the
* aggregate store health and the circuit breaker state.
*/
public class PluginHealthStatusSupplier implements Supplier<String> {
private static final String STATUS_GREEN = "green";
private static final String STATUS_YELLOW = "yellow";
private static final String STATUS_RED = "red";

private final StoreUtils storeUtils;
private final LTRCircuitBreakerService ltrCircuitBreakerService;

protected PluginHealthStatusSupplier(StoreUtils storeUtils,
LTRCircuitBreakerService ltrCircuitBreakerService) {
this.storeUtils = storeUtils;
this.ltrCircuitBreakerService = ltrCircuitBreakerService;
}

@Override
public String get() {
if (ltrCircuitBreakerService.isOpen()) {
return STATUS_RED;
}
return getAggregateStoresStatus();
}

private String getAggregateStoresStatus() {
List<String> storeNames = storeUtils.getAllLtrStoreNames();
return storeNames.stream()
.map(storeUtils::getLtrStoreHealthStatus)
.reduce(STATUS_GREEN, this::combineStatuses);
}

private String combineStatuses(String s1, String s2) {
if (s2 == null || STATUS_RED.equals(s1) || STATUS_RED.equals(s2)) {
return STATUS_RED;
} else if (STATUS_YELLOW.equals(s1) || STATUS_YELLOW.equals(s2)) {
return STATUS_YELLOW;
} else {
return STATUS_GREEN;
}
}

public static PluginHealthStatusSupplier create(
final Client client,
final ClusterService clusterService,
LTRCircuitBreakerService ltrCircuitBreakerService) {
return new PluginHealthStatusSupplier(new StoreUtils(client, clusterService), ltrCircuitBreakerService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.opensearch.ltr.stats.suppliers;

import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.ltr.stats.suppliers.utils.StoreUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
* A supplier to provide stats on the LTR stores. It retrieves basic information
* on the store, such as the health of the underlying index and number of documents
* in the store grouped by their type.
*/
public class StoreStatsSupplier implements Supplier<Map<String, Map<String, Object>>> {
static final String LTR_STORE_STATUS = "status";
static final String LTR_STORE_FEATURE_COUNT = "feature_count";
static final String LTR_STORE_FEATURE_SET_COUNT = "featureset_count";
static final String LTR_STORE_MODEL_COUNT = "model_count";

private final StoreUtils storeUtils;

protected StoreStatsSupplier(final StoreUtils storeUtils) {
this.storeUtils = storeUtils;
}

@Override
public Map<String, Map<String, Object>> get() {
Map<String, Map<String, Object>> storeStats = new ConcurrentHashMap<>();
List<String> storeNames = storeUtils.getAllLtrStoreNames();
storeNames.forEach(s -> storeStats.put(s, getStoreStat(s)));
return storeStats;
}

private Map<String, Object> getStoreStat(String storeName) {
if (!storeUtils.checkLtrStoreExists(storeName)) {
throw new IllegalArgumentException("LTR Store [" + storeName + "] doesn't exist.");
}
Map<String, Object> storeStat = new HashMap<>();
storeStat.put(LTR_STORE_STATUS, storeUtils.getLtrStoreHealthStatus(storeName));
Map<String, Integer> featureSets = storeUtils.extractFeatureSetStats(storeName);
storeStat.put(LTR_STORE_FEATURE_COUNT, featureSets.values().stream().reduce(Integer::sum).orElse(0));
storeStat.put(LTR_STORE_FEATURE_SET_COUNT, featureSets.size());
storeStat.put(LTR_STORE_MODEL_COUNT, storeUtils.getModelCount(storeName));
return storeStat;
}

public static StoreStatsSupplier create(final Client client, final ClusterService clusterService) {
return new StoreStatsSupplier(new StoreUtils(client, clusterService));
}
}
115 changes: 115 additions & 0 deletions src/main/java/org/opensearch/ltr/stats/suppliers/utils/StoreUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package org.opensearch.ltr.stats.suppliers.utils;

import com.o19s.es.ltr.feature.store.StoredFeatureSet;
import com.o19s.es.ltr.feature.store.StoredLtrModel;
import com.o19s.es.ltr.feature.store.index.IndexFeatureStore;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.search.SearchType;
import org.opensearch.client.Client;
import org.opensearch.cluster.health.ClusterIndexHealth;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* A utility class to provide details on the LTR stores. It queries the underlying
* indices to get the details.
*/
public class StoreUtils {

private static final String FEATURE_SET_KEY = "featureset";
private static final String FEATURE_SET_NAME_KEY = "name";
private static final String FEATURES_KEY = "features";
private Client client;
private ClusterService clusterService;
private IndexNameExpressionResolver indexNameExpressionResolver;

public StoreUtils(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(new ThreadContext(clusterService.getSettings()));
}

public boolean checkLtrStoreExists(String storeName) {
return clusterService.state().getRoutingTable().hasIndex(storeName);
}

public List<String> getAllLtrStoreNames() {
String[] names = indexNameExpressionResolver.concreteIndexNames(clusterService.state(),
new ClusterStateRequest().indices(
IndexFeatureStore.DEFAULT_STORE, IndexFeatureStore.STORE_PREFIX + "*"));
return Arrays.asList(names);
}

public String getLtrStoreHealthStatus(String storeName) {
if (!checkLtrStoreExists(storeName)) {
throw new IndexNotFoundException(storeName);
}
ClusterIndexHealth indexHealth = new ClusterIndexHealth(
clusterService.state().metadata().index(storeName),
clusterService.state().getRoutingTable().index(storeName)
);

return indexHealth.getStatus().name().toLowerCase();
}

/**
* Returns a map of feaureset and the number of features in the featureset.
*
* @param storeName the name of the index for the LTR store.
* @return A map of (featureset, features count)
*/
@SuppressWarnings("unchecked")
public Map<String, Integer> extractFeatureSetStats(String storeName) {
final Map<String, Integer> featureSetStats = new HashMap<>();
final SearchHits featureSetHits = searchStore(storeName, StoredFeatureSet.TYPE);

for (final SearchHit featureSetHit : featureSetHits) {
extractFeatureSetFromFeatureSetHit(featureSetHit).ifPresent(featureSet -> {
final List<String> features = (List<String>) featureSet.get(FEATURES_KEY);
featureSetStats.put((String) featureSet.get(FEATURE_SET_NAME_KEY), features.size());
});
}
return featureSetStats;
}

@SuppressWarnings("unchecked")
private Optional<Map<String, Object>> extractFeatureSetFromFeatureSetHit(SearchHit featureSetHit) {
final Map<String, Object> featureSetMap = featureSetHit.getSourceAsMap();
if (featureSetMap != null && featureSetMap.containsKey(FEATURE_SET_KEY)) {
final Map<String, Object> featureSet = (Map<String, Object>) featureSetMap.get(FEATURE_SET_KEY);

if (featureSet != null && featureSet.containsKey(FEATURES_KEY) && featureSet.containsKey(FEATURE_SET_NAME_KEY)) {
return Optional.of(featureSet);
}
}

return Optional.empty();
}



// private

public long getModelCount(String storeName) {
return searchStore(storeName, StoredLtrModel.TYPE).getTotalHits().value;
}

private SearchHits searchStore(String storeName, String docType) {
return client.prepareSearch(storeName)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.termQuery("type", docType))
.get()
.getHits();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.opensearch.ltr.stats.suppliers;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opensearch.ltr.breaker.LTRCircuitBreakerService;
import org.opensearch.ltr.stats.suppliers.utils.StoreUtils;

import java.util.Arrays;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;

public class PluginHealthStatusSupplierTests {
private PluginHealthStatusSupplier pluginHealthStatusSupplier;

@Mock
private LTRCircuitBreakerService ltrCircuitBreakerService;

@Mock
StoreUtils storeUtils;

@Before
public void setup() {
MockitoAnnotations.openMocks(this);
pluginHealthStatusSupplier =
new PluginHealthStatusSupplier(storeUtils, ltrCircuitBreakerService);
}

@Test
public void testStatusGreen() {
when(ltrCircuitBreakerService.isOpen()).thenReturn(false);
when(storeUtils.getAllLtrStoreNames()).thenReturn(Arrays.asList("store1", "store2"));
when(storeUtils.getLtrStoreHealthStatus(Mockito.anyString())).thenReturn("green");

assertEquals("green", pluginHealthStatusSupplier.get());
}

@Test
public void testStatusYellowStoreStatusYellow() {
when(ltrCircuitBreakerService.isOpen()).thenReturn(false);
when(storeUtils.getAllLtrStoreNames()).thenReturn(Arrays.asList("store1", "store2"));
when(storeUtils.getLtrStoreHealthStatus("store1")).thenReturn("green");
when(storeUtils.getLtrStoreHealthStatus("store2")).thenReturn("yellow");
assertEquals("yellow", pluginHealthStatusSupplier.get());
}

@Test
public void testStatusRedStoreStatusRed() {
when(ltrCircuitBreakerService.isOpen()).thenReturn(false);
when(storeUtils.getAllLtrStoreNames()).thenReturn(Arrays.asList("store1", "store2"));
when(storeUtils.getLtrStoreHealthStatus("store1")).thenReturn("red");
when(storeUtils.getLtrStoreHealthStatus("store2")).thenReturn("green");

assertEquals("red", pluginHealthStatusSupplier.get());
}

@Test
public void testStatusRedCircuitBreakerOpen() {
when(ltrCircuitBreakerService.isOpen()).thenReturn(true);
assertEquals("red", pluginHealthStatusSupplier.get());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.opensearch.ltr.stats.suppliers;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.ltr.stats.suppliers.utils.StoreUtils;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.mockito.Mockito.when;

public class StoreStatsSupplierTests extends OpenSearchTestCase {
private static final String STORE_NAME = ".ltrstore";

@Mock
private StoreUtils storeUtils;

private StoreStatsSupplier storeStatsSupplier;

@Before
public void setup() {
MockitoAnnotations.openMocks(this);
storeStatsSupplier = new StoreStatsSupplier(storeUtils);
}

@Test
public void getStoreStats_NoLtrStore() {
when(storeUtils.getAllLtrStoreNames()).thenReturn(Collections.emptyList());
Map<String, Map<String, Object>> stats = storeStatsSupplier.get();
assertTrue(stats.isEmpty());
}

@Test
public void getStoreStats_Success() {
when(storeUtils.getAllLtrStoreNames()).thenReturn(Collections.singletonList(STORE_NAME));
when(storeUtils.checkLtrStoreExists(STORE_NAME)).thenReturn(true);
when(storeUtils.getLtrStoreHealthStatus(STORE_NAME)).thenReturn("green");
Map<String, Integer> featureSets = new HashMap<>();
featureSets.put("featureset_1", 10);
when(storeUtils.extractFeatureSetStats(STORE_NAME)).thenReturn(featureSets);
when(storeUtils.getModelCount(STORE_NAME)).thenReturn(5L);

Map<String, Map<String, Object>> stats = storeStatsSupplier.get();
Map<String, Object> ltrStoreStats = stats.get(STORE_NAME);

assertNotNull(ltrStoreStats);
assertEquals("green", ltrStoreStats.get(StoreStatsSupplier.LTR_STORE_STATUS));
assertEquals(10, ltrStoreStats.get(StoreStatsSupplier.LTR_STORE_FEATURE_COUNT));
assertEquals(1, ltrStoreStats.get(StoreStatsSupplier.LTR_STORE_FEATURE_SET_COUNT));
assertEquals(5L, ltrStoreStats.get(StoreStatsSupplier.LTR_STORE_MODEL_COUNT));
}
}
Loading

0 comments on commit a85c223

Please sign in to comment.