Skip to content

Commit

Permalink
[Remote Store] Integrate Remote Translog Store upload stats with _nod…
Browse files Browse the repository at this point in the history
…es/stats API

Signed-off-by: Bhumika Saini <[email protected]>
  • Loading branch information
Bhumika Saini committed Sep 6, 2023
1 parent 9119b6d commit c85623e
Show file tree
Hide file tree
Showing 13 changed files with 733 additions and 326 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add support for Remote Translog Store stats in `_remotestore/stats/` API ([#9263](https://github.com/opensearch-project/OpenSearch/pull/9263))
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))
- Cleanup Unreferenced file on segment merge failure ([#9503](https://github.com/opensearch-project/OpenSearch/pull/9503))
- [Remote Store] Add support for Remote Translog Store upload stats in `_nodes/stats/` API ([#8908](https://github.com/opensearch-project/OpenSearch/pull/8908))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.RemoteTranslogStats;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
Expand Down Expand Up @@ -1436,30 +1437,35 @@ public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() {
.get()
.status()
);
ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).get().getShards()[0];
ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).setTranslog(true).get().getShards()[0];
RemoteSegmentStats remoteSegmentStatsFromIndexStats = shard.getStats().getSegments().getRemoteSegmentStats();
assertZeroRemoteSegmentStats(remoteSegmentStatsFromIndexStats);
RemoteTranslogStats remoteTranslogStatsFromIndexStats = shard.getStats().getTranslog().getRemoteTranslogStats();
assertZeroRemoteTranslogStats(remoteTranslogStatsFromIndexStats);

NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats(primaryNodeName(indexName)).get();
RemoteSegmentStats remoteSegmentStatsFromNodesStats = nodesStatsResponse.getNodes()
.get(0)
.getIndices()
.getSegments()
.getRemoteSegmentStats();
assertZeroRemoteSegmentStats(remoteSegmentStatsFromNodesStats);
RemoteTranslogStats remoteTranslogStatsFromNodesStats = nodesStatsResponse.getNodes()
.get(0)
.getIndices()
.getTranslog()
.getRemoteTranslogStats();
assertZeroRemoteTranslogStats(remoteTranslogStatsFromNodesStats);
}

private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) {
assertEquals(0, remoteSegmentStats.getUploadBytesStarted());
assertEquals(0, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(0, remoteSegmentStats.getUploadBytesFailed());
assertEquals(0, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(0, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(0, remoteSegmentStats.getTotalUploadTime());
assertEquals(0, remoteSegmentStats.getTotalDownloadTime());
// Compare with fresh object because all values default to 0 in default fresh object
assertEquals(new RemoteSegmentStats(), remoteSegmentStats);
}

private void assertZeroRemoteTranslogStats(RemoteTranslogStats remoteTranslogStats) {
// Compare with fresh object because all values default to 0 in default fresh object
assertEquals(new RemoteTranslogStats(), remoteTranslogStats);
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.translog.RemoteTranslogStats;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.util.concurrent.TimeUnit;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreStatsFromNodesStatsIT extends RemoteStoreBaseIntegTestCase {
private static final String INDEX_NAME = "remote-index-1";
private static final int DATA_NODE_COUNT = 2;
private static final int CLUSTER_MANAGER_NODE_COUNT = 3;

@Before
public void setup() {
setupCustomCluster();
}

private void setupCustomCluster() {
internalCluster().startClusterManagerOnlyNodes(CLUSTER_MANAGER_NODE_COUNT);
internalCluster().startDataOnlyNodes(DATA_NODE_COUNT);
ensureStableCluster(DATA_NODE_COUNT + CLUSTER_MANAGER_NODE_COUNT);
}

/**
* - Creates two indices with single primary shard, pinned to a single node.
* - Index documents in both of them and forces a fresh for both
* - Polls the _remotestore/stats API for individual index level stats
* - Adds up requisite fields from the API output, repeats this for the 2nd index
* - Polls _nodes/stats and verifies that the total values at node level adds up
* to the values capture in the previous step
*/
public void testNodesStatsParityWithOnlyPrimaryShards() {
String[] dataNodes = internalCluster().getDataNodeNames().toArray(String[]::new);
String randomDataNode = dataNodes[randomIntBetween(0, dataNodes.length - 1)];
String firstIndex = INDEX_NAME + "1";
String secondIndex = INDEX_NAME + "2";

// Create first index
createIndex(
firstIndex,
Settings.builder().put(remoteStoreIndexSettings(0, 1)).put("index.routing.allocation.require._name", randomDataNode).build()
);
ensureGreen(firstIndex);
indexSingleDoc(firstIndex, true);

// Create second index
createIndex(
secondIndex,
Settings.builder().put(remoteStoreIndexSettings(0, 1)).put("index.routing.allocation.require._name", randomDataNode).build()
);
ensureGreen(secondIndex);
indexSingleDoc(secondIndex, true);

assertNodeStatsParityOnNode(randomDataNode, firstIndex, secondIndex);
}

/**
* - Creates two indices with single primary shard and single replica
* - Index documents in both of them and forces a fresh for both
* - Polls the _remotestore/stats API for individual index level stats
* - Adds up requisite fields from the API output for both indices
* - Polls _nodes/stats and verifies that the total values at node level adds up
* to the values capture in the previous step
* - Repeats the above 3 steps for the second node
*/
public void testNodesStatsParityWithReplicaShards() throws Exception {
String firstIndex = INDEX_NAME + "1";
String secondIndex = INDEX_NAME + "2";

createIndex(firstIndex, Settings.builder().put(remoteStoreIndexSettings(1, 1)).build());
ensureGreen(firstIndex);
indexSingleDoc(firstIndex, true);

// Create second index
createIndex(secondIndex, Settings.builder().put(remoteStoreIndexSettings(1, 1)).build());
ensureGreen(secondIndex);
indexSingleDoc(secondIndex, true);

assertBusy(() -> assertNodeStatsParityAcrossNodes(firstIndex, secondIndex), 15, TimeUnit.SECONDS);
}

/**
* Ensures that node stats shows 0 values for dedicated cluster manager nodes
* since cluster manager nodes does not participate in indexing
*/
public void testZeroRemoteStatsOnNodesStatsForClusterManager() {
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
ensureGreen(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
refresh(INDEX_NAME);

NodesStatsResponse nodesStatsResponseForClusterManager = client().admin()
.cluster()
.prepareNodesStats(internalCluster().getClusterManagerName())
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true).set(CommonStatsFlags.Flag.Translog, true))
.get();

assertTrue(
nodesStatsResponseForClusterManager.getNodes().get(0).getNode().isClusterManagerNode()
&& !nodesStatsResponseForClusterManager.getNodes().get(0).getNode().isDataNode()
);
assertZeroRemoteSegmentStats(
nodesStatsResponseForClusterManager.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats()
);
assertZeroRemoteTranslogStats(
nodesStatsResponseForClusterManager.getNodes().get(0).getIndices().getTranslog().getRemoteTranslogStats()
);

NodesStatsResponse nodesStatsResponseForDataNode = client().admin()
.cluster()
.prepareNodesStats(primaryNodeName(INDEX_NAME))
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true).set(CommonStatsFlags.Flag.Translog, true))
.get();

assertTrue(nodesStatsResponseForDataNode.getNodes().get(0).getNode().isDataNode());
RemoteSegmentStats remoteSegmentStats = nodesStatsResponseForDataNode.getNodes()
.get(0)
.getIndices()
.getSegments()
.getRemoteSegmentStats();
assertTrue(remoteSegmentStats.getUploadBytesStarted() > 0);
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);

RemoteTranslogStats remoteTranslogStats = nodesStatsResponseForDataNode.getNodes()
.get(0)
.getIndices()
.getTranslog()
.getRemoteTranslogStats();
assertTrue(remoteTranslogStats.getUploadBytesStarted() > 0);
assertTrue(remoteTranslogStats.getUploadBytesSucceeded() > 0);
}

private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) {
// Compare with fresh object because all values default to 0 in default fresh object
assertEquals(new RemoteSegmentStats(), remoteSegmentStats);
}

private void assertZeroRemoteTranslogStats(RemoteTranslogStats remoteTranslogStats) {
// Compare with fresh object because all values default to 0 in default fresh object
assertEquals(new RemoteTranslogStats(), remoteTranslogStats);
}

private static void assertNodeStatsParityAcrossNodes(String... indices) {
for (String dataNode : internalCluster().getDataNodeNames()) {
assertNodeStatsParityOnNode(dataNode, indices);
}
}

private static void assertNodeStatsParityOnNode(String dataNode, String... indices) {
RemoteSegmentStats remoteSegmentStatsCumulative = new RemoteSegmentStats();
RemoteTranslogStats remoteTranslogStatsCumulative = new RemoteTranslogStats();
for (String index : indices) {
// Fetch _remotestore/stats
RemoteStoreStatsResponse remoteStoreStats = client(dataNode).admin()
.cluster()
.prepareRemoteStoreStats(index, "0")
.setLocal(true)
.get();
remoteSegmentStatsCumulative.add(new RemoteSegmentStats(remoteStoreStats.getRemoteStoreStats()[0].getSegmentStats()));
remoteTranslogStatsCumulative.add(new RemoteTranslogStats(remoteStoreStats.getRemoteStoreStats()[0].getTranslogStats()));
}

// Fetch _nodes/stats
NodesStatsResponse nodesStatsResponse = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true).set(CommonStatsFlags.Flag.Translog, true))
.get();

// assert segment stats
RemoteSegmentStats remoteSegmentStatsFromNodesStats = nodesStatsResponse.getNodes()
.get(0)
.getIndices()
.getSegments()
.getRemoteSegmentStats();
assertEquals(remoteSegmentStatsCumulative, remoteSegmentStatsFromNodesStats);
// Ensure that total upload time has non-zero value if there has been segments uploaded from the node
if (remoteSegmentStatsCumulative.getUploadBytesStarted() > 0) {
assertTrue(remoteSegmentStatsCumulative.getTotalUploadTime() > 0);
}
// Ensure that total download time has non-zero value if there has been segments downloaded to the node
if (remoteSegmentStatsCumulative.getDownloadBytesStarted() > 0) {
assertTrue(remoteSegmentStatsCumulative.getTotalDownloadTime() > 0);
}

// assert translog stats
RemoteTranslogStats remoteTranslogStatsFromNodesStats = nodesStatsResponse.getNodes()
.get(0)
.getIndices()
.getTranslog()
.getRemoteTranslogStats();
assertEquals(remoteTranslogStatsCumulative, remoteTranslogStatsFromNodesStats);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ static final class RoutingFields {
/**
* Fields for remote store stats response
*/
static final class UploadStatsFields {
public static final class UploadStatsFields {
/**
* Lag in terms of bytes b/w local and remote store
*/
Expand Down Expand Up @@ -294,12 +294,12 @@ static final class UploadStatsFields {
/**
* Count of files uploaded to remote store
*/
static final String TOTAL_UPLOADS = "total_uploads";
public static final String TOTAL_UPLOADS = "total_uploads";

/**
* Represents the total uploads to remote store in bytes
*/
static final String TOTAL_UPLOAD_SIZE = "total_upload_size";
public static final String TOTAL_UPLOAD_SIZE = "total_upload_size";

/**
* Total time spent on remote store uploads
Expand Down Expand Up @@ -367,17 +367,17 @@ static final class DownloadStatsFields {
/**
* Reusable sub fields for {@link UploadStatsFields} and {@link DownloadStatsFields}
*/
static final class SubFields {
static final String STARTED = "started";
static final String SUCCEEDED = "succeeded";
static final String FAILED = "failed";
public static final class SubFields {
public static final String STARTED = "started";
public static final String SUCCEEDED = "succeeded";
public static final String FAILED = "failed";

static final String STARTED_BYTES = "started_bytes";
static final String SUCCEEDED_BYTES = "succeeded_bytes";
static final String FAILED_BYTES = "failed_bytes";
public static final String STARTED_BYTES = "started_bytes";
public static final String SUCCEEDED_BYTES = "succeeded_bytes";
public static final String FAILED_BYTES = "failed_bytes";

static final String DOWNLOAD = "download";
static final String UPLOAD = "upload";
public static final String UPLOAD = "upload";

/**
* Moving avg over last N values stat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.index.shard.IndexShard;

import java.io.IOException;
import java.util.Objects;

/**
* Tracks remote store segment download and upload stats
Expand Down Expand Up @@ -300,4 +301,40 @@ static final class Fields {
static final String TOTAL_TIME_SPENT = "total_time_spent";
static final String TOTAL_TIME_SPENT_IN_MILLIS = "total_time_spent_in_millis";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

RemoteSegmentStats that = (RemoteSegmentStats) o;
return uploadBytesStarted == that.uploadBytesStarted
&& uploadBytesFailed == that.uploadBytesFailed
&& uploadBytesSucceeded == that.uploadBytesSucceeded
&& downloadBytesStarted == that.downloadBytesStarted
&& downloadBytesFailed == that.downloadBytesFailed
&& downloadBytesSucceeded == that.downloadBytesSucceeded
&& maxRefreshTimeLag == that.maxRefreshTimeLag
&& maxRefreshBytesLag == that.maxRefreshBytesLag
&& totalRefreshBytesLag == that.totalRefreshBytesLag
&& totalUploadTime == that.totalUploadTime
&& totalDownloadTime == that.totalDownloadTime;
}

@Override
public int hashCode() {
return Objects.hash(
uploadBytesStarted,
uploadBytesFailed,
uploadBytesSucceeded,
downloadBytesStarted,
downloadBytesFailed,
downloadBytesSucceeded,
maxRefreshTimeLag,
maxRefreshBytesLag,
totalRefreshBytesLag,
totalUploadTime,
totalDownloadTime
);
}
}
11 changes: 10 additions & 1 deletion server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.RemoteFsTranslog;
import org.opensearch.index.translog.RemoteTranslogStats;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogFactory;
Expand Down Expand Up @@ -1406,7 +1407,15 @@ public FieldDataStats fieldDataStats(String... fields) {
}

public TranslogStats translogStats() {
return getEngine().translogManager().getTranslogStats();
TranslogStats translogStats = getEngine().translogManager().getTranslogStats();
// Populate remote_store stats only if the index is remote store backed
if (indexSettings.isRemoteStoreEnabled()) {
translogStats.addRemoteTranslogStats(
new RemoteTranslogStats(remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardId).stats())
);
}

return translogStats;
}

public CompletionStats completionStats(String... fields) {
Expand Down
Loading

0 comments on commit c85623e

Please sign in to comment.