diff --git a/config/rca_cluster_manager.conf b/config/rca_cluster_manager.conf
index 20bec542e..8dc44cfc8 100644
--- a/config/rca_cluster_manager.conf
+++ b/config/rca_cluster_manager.conf
@@ -21,7 +21,7 @@
"max-flow-units-per-vertex-buffer": 200,
"tags": {
- "locus": "cluster_manager-node"
+ "locus": "cluster_manager-node,data-node"
},
"remote-peers": ["ip1", "ip2", "ip3"],
diff --git a/config/rca_idle_cluster_manager.conf b/config/rca_idle_cluster_manager.conf
index 947e4207d..3e7ab32f5 100644
--- a/config/rca_idle_cluster_manager.conf
+++ b/config/rca_idle_cluster_manager.conf
@@ -21,7 +21,7 @@
"max-flow-units-per-vertex-buffer": 200,
"tags": {
- "locus": "idle-cluster_manager-node"
+ "locus": "idle-cluster_manager-node,data-node"
},
"remote-peers": ["ip1", "ip2", "ip3"],
diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/RcaController.java b/src/main/java/org/opensearch/performanceanalyzer/rca/RcaController.java
index ab89f813e..241b9c7bd 100644
--- a/src/main/java/org/opensearch/performanceanalyzer/rca/RcaController.java
+++ b/src/main/java/org/opensearch/performanceanalyzer/rca/RcaController.java
@@ -177,7 +177,8 @@ private void start() {
return;
}
- subscriptionManager.setCurrentLocus(rcaConf.getTagMap().get("locus"));
+ String currentLocus = RcaUtil.getPriorityLocus(rcaConf.getTagMap().get("locus"));
+ subscriptionManager.setCurrentLocus(currentLocus);
this.connectedComponents = getRcaGraphComponents(rcaConf);
// Mute the rca nodes after the graph creation and before the scheduler start
diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java b/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java
index bfc85fcd3..402013cf7 100644
--- a/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java
+++ b/src/main/java/org/opensearch/performanceanalyzer/rca/Version.java
@@ -19,11 +19,14 @@ public final class Version {
* transferred packets should be dropped. Every increment here should be accompanied with a line
* describing the version bump.
*
- * Note: The RCA version is agnostic of OpenSearch version.
+ *
Note: The RCA version is agnostic of OpenSearch version.
*/
static final class Major {
- // Bumping this post the Commons Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2)
- // and Service Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8) change
+ // Bumping this post the Commons
+ // Lib(https://github.com/opensearch-project/performance-analyzer-commons/issues/2)
+ // and Service
+ // Metrics(https://github.com/opensearch-project/performance-analyzer-commons/issues/8)
+ // change
static final int RCA_MAJ_VERSION = 1;
}
diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/util/RcaUtil.java b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/util/RcaUtil.java
index 274c0c0f0..a4be64344 100644
--- a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/util/RcaUtil.java
+++ b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/util/RcaUtil.java
@@ -55,12 +55,40 @@ public static List getAnalysisGraphComponents(AnalysisGraph
return Stats.getInstance().getConnectedComponents();
}
+ /**
+ * As there is possibility for host locus tags to be hybrid, in terms of rca subscription we
+ * still have to identify the host with single tag, the most priority one.
+ */
+ public static String getPriorityLocus(String hostLocus) {
+ if (hostLocus == null || hostLocus.isEmpty()) {
+ return "";
+ }
+ List hostLociStrings =
+ Arrays.asList(hostLocus.split(RcaConsts.RcaTagConstants.SEPARATOR));
+ // Non-empty string was split -> guaranteed to be of size at least one.
+ return hostLociStrings.get(0);
+ }
+
+ public static boolean containsAny(List containerList, List containedList) {
+ for (String elem : containedList) {
+ if (containerList.contains(elem)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public static boolean doTagsMatch(Node> node, RcaConf conf) {
Map rcaTagMap = conf.getTagMap();
for (Map.Entry tag : node.getTags().entrySet()) {
- String rcaConfTagvalue = rcaTagMap.get(tag.getKey());
+ String rcaConfTag = rcaTagMap.get(tag.getKey());
+ if (rcaConfTag == null) {
+ return false;
+ }
+ List rcaConfTagStrings = Arrays.asList(rcaConfTag.split(","));
+
return tag.getValue() != null
- && Arrays.asList(tag.getValue().split(",")).contains(rcaConfTagvalue);
+ && containsAny(rcaConfTagStrings, Arrays.asList(tag.getValue().split(",")));
}
return true;
}
@@ -70,12 +98,14 @@ public static boolean shouldExecuteLocally(Node> node, RcaConf conf) {
final Map nodeTagMap = node.getTags();
if (confTagMap != null && nodeTagMap != null) {
- final String hostLocus = confTagMap.get(RcaConsts.RcaTagConstants.TAG_LOCUS);
+ final String hostLoci = confTagMap.get(RcaConsts.RcaTagConstants.TAG_LOCUS);
final String nodeLoci = nodeTagMap.get(RcaConsts.RcaTagConstants.TAG_LOCUS);
if (nodeLoci != null && !nodeLoci.isEmpty()) {
List nodeLociStrings =
Arrays.asList(nodeLoci.split(RcaConsts.RcaTagConstants.SEPARATOR));
- return nodeLociStrings.contains(hostLocus);
+ List hostLociStrings =
+ Arrays.asList(hostLoci.split(RcaConsts.RcaTagConstants.SEPARATOR));
+ return containsAny(hostLociStrings, nodeLociStrings);
}
}
diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java
index e144f2ee1..b878ee9d5 100644
--- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java
+++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java
@@ -183,7 +183,8 @@ public void construct() {
// Use EVALUATION_INTERVAL_SECONDS instead of RCA_PERIOD which resolved to 12 seconds.
// This is resulting in this RCA not getting executed in every 5 seconds.
Rca> threadMetricsRca =
- new ThreadMetricsRca(threadBlockedTime, threadWaitedTime, EVALUATION_INTERVAL_SECONDS);
+ new ThreadMetricsRca(
+ threadBlockedTime, threadWaitedTime, EVALUATION_INTERVAL_SECONDS);
threadMetricsRca.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE);
@@ -502,8 +503,7 @@ private void constructShardResourceUsageGraph() {
Metric cpuUtilization = new CPU_Utilization(EVALUATION_INTERVAL_SECONDS);
cpuUtilization.addTag(
- RcaConsts.RcaTagConstants.TAG_LOCUS,
- RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE);
+ RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);
addLeaf(cpuUtilization);
@@ -511,8 +511,7 @@ private void constructShardResourceUsageGraph() {
HotShardRca hotShardRca =
new HotShardRca(EVALUATION_INTERVAL_SECONDS, RCA_PERIOD, cpuUtilization);
hotShardRca.addTag(
- RcaConsts.RcaTagConstants.TAG_LOCUS,
- RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE);
+ RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);
hotShardRca.addAllUpstreams(Arrays.asList(cpuUtilization));
// Hot Shard Cluster RCA which consumes the above
diff --git a/src/test/java/org/opensearch/performanceanalyzer/rca/scheduler/RCASchedulerTaskTests.java b/src/test/java/org/opensearch/performanceanalyzer/rca/scheduler/RCASchedulerTaskTests.java
index 9763be045..9302234a8 100644
--- a/src/test/java/org/opensearch/performanceanalyzer/rca/scheduler/RCASchedulerTaskTests.java
+++ b/src/test/java/org/opensearch/performanceanalyzer/rca/scheduler/RCASchedulerTaskTests.java
@@ -6,6 +6,8 @@
package org.opensearch.performanceanalyzer.rca.scheduler;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
import java.util.ArrayList;
import java.util.Collections;
@@ -27,6 +29,7 @@
import org.opensearch.performanceanalyzer.rca.framework.core.Node;
import org.opensearch.performanceanalyzer.rca.framework.core.Queryable;
import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf;
+import org.opensearch.performanceanalyzer.rca.framework.util.RcaConsts;
import org.opensearch.performanceanalyzer.rca.framework.util.RcaUtil;
import org.opensearch.performanceanalyzer.rca.messages.DataMsg;
import org.opensearch.performanceanalyzer.rca.messages.IntentMsg;
@@ -398,4 +401,51 @@ public void mergeLists() {
AssertHelper.compareLists(l1.get(i), ret.get(i));
}
}
+
+ @Test
+ public void testHybridLocusTags() {
+ Node cpuUtilization = new CPU_Utilization(5);
+ cpuUtilization.addTag(
+ RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);
+
+ Node hotShardClusterRca = new CPU_Utilization(5);
+ hotShardClusterRca.addTag(
+ RcaConsts.RcaTagConstants.TAG_LOCUS,
+ RcaConsts.RcaTagConstants.LOCUS_CLUSTER_MANAGER_NODE);
+
+ RcaConf nonDedicatedClusterManagerConf =
+ new RcaConf() {
+ @Override
+ public Map getTagMap() {
+ return new HashMap() {
+ {
+ this.put(
+ RcaConsts.RcaTagConstants.TAG_LOCUS,
+ "cluster_manager-node,data-node");
+ }
+ };
+ }
+ };
+
+ assertTrue(RcaUtil.shouldExecuteLocally(cpuUtilization, nonDedicatedClusterManagerConf));
+ assertTrue(
+ RcaUtil.shouldExecuteLocally(hotShardClusterRca, nonDedicatedClusterManagerConf));
+
+ RcaConf dedicatedClusterManagerConf =
+ new RcaConf() {
+ @Override
+ public Map getTagMap() {
+ return new HashMap() {
+ {
+ this.put(
+ RcaConsts.RcaTagConstants.TAG_LOCUS,
+ "cluster_manager-node");
+ }
+ };
+ }
+ };
+
+ assertFalse(RcaUtil.shouldExecuteLocally(cpuUtilization, dedicatedClusterManagerConf));
+ assertTrue(RcaUtil.shouldExecuteLocally(hotShardClusterRca, dedicatedClusterManagerConf));
+ }
}
diff --git a/src/test/java/org/opensearch/performanceanalyzer/rca/store/ResourceHeatMapGraphTest.java b/src/test/java/org/opensearch/performanceanalyzer/rca/store/ResourceHeatMapGraphTest.java
index 54eb6d66d..710ca7a5d 100644
--- a/src/test/java/org/opensearch/performanceanalyzer/rca/store/ResourceHeatMapGraphTest.java
+++ b/src/test/java/org/opensearch/performanceanalyzer/rca/store/ResourceHeatMapGraphTest.java
@@ -139,7 +139,8 @@ private List createAndExecuteRcaGraph(AppContext appContext)
RcaConf rcaConf = new RcaConf(dataNodeRcaConf);
subscriptionManager = new SubscriptionManager(new GRPCConnectionManager(false));
- subscriptionManager.setCurrentLocus(rcaConf.getTagMap().get("locus"));
+ String currentLocus = RcaUtil.getPriorityLocus(rcaConf.getTagMap().get("locus"));
+ subscriptionManager.setCurrentLocus(currentLocus);
WireHopper wireHopper =
new WireHopper(
@@ -664,7 +665,8 @@ public void testHotShardClusterApiResponse() throws Exception {
RcaConf rcaConf = new RcaConf(dataNodeRcaConf);
SubscriptionManager subscriptionManager =
new SubscriptionManager(new GRPCConnectionManager(false));
- subscriptionManager.setCurrentLocus(rcaConf.getTagMap().get("locus"));
+ String currentLocus = RcaUtil.getPriorityLocus(rcaConf.getTagMap().get("locus"));
+ subscriptionManager.setCurrentLocus(currentLocus);
AppContext appContext = RcaTestHelper.setMyIp("192.168.0.1", AllMetrics.NodeRole.DATA);
@@ -697,7 +699,8 @@ public void testHotShardClusterApiResponse() throws Exception {
RcaConf rcaConf2 = new RcaConf(clusterManagerNodeRcaConf);
SubscriptionManager subscriptionManager2 =
new SubscriptionManager(new GRPCConnectionManager(false));
- subscriptionManager2.setCurrentLocus(rcaConf2.getTagMap().get("locus"));
+ String currentLocus2 = RcaUtil.getPriorityLocus(rcaConf2.getTagMap().get("locus"));
+ subscriptionManager2.setCurrentLocus(currentLocus2);
AppContext appContextClusterManager =
RcaTestHelper.setMyIp("192.168.0.4", AllMetrics.NodeRole.ELECTED_CLUSTER_MANAGER);