Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Add support for Remote Translog Store upload stats in _nodes/stats/ API #8908

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add support for Remote Translog Store stats in `_remotestore/stats/` API ([#9263](https://github.com/opensearch-project/OpenSearch/pull/9263))
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))
- Cleanup Unreferenced file on segment merge failure ([#9503](https://github.com/opensearch-project/OpenSearch/pull/9503))
- [Remote Store] Add support for Remote Translog Store upload stats in `_nodes/stats/` API ([#8908](https://github.com/opensearch-project/OpenSearch/pull/8908))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.RemoteTranslogStats;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
Expand Down Expand Up @@ -1436,30 +1437,35 @@ public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() {
.get()
.status()
);
ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).get().getShards()[0];
ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).setTranslog(true).get().getShards()[0];
RemoteSegmentStats remoteSegmentStatsFromIndexStats = shard.getStats().getSegments().getRemoteSegmentStats();
assertZeroRemoteSegmentStats(remoteSegmentStatsFromIndexStats);
RemoteTranslogStats remoteTranslogStatsFromIndexStats = shard.getStats().getTranslog().getRemoteTranslogStats();
assertZeroRemoteTranslogStats(remoteTranslogStatsFromIndexStats);

NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats(primaryNodeName(indexName)).get();
RemoteSegmentStats remoteSegmentStatsFromNodesStats = nodesStatsResponse.getNodes()
.get(0)
.getIndices()
.getSegments()
.getRemoteSegmentStats();
assertZeroRemoteSegmentStats(remoteSegmentStatsFromNodesStats);
RemoteTranslogStats remoteTranslogStatsFromNodesStats = nodesStatsResponse.getNodes()
.get(0)
.getIndices()
.getTranslog()
.getRemoteTranslogStats();
assertZeroRemoteTranslogStats(remoteTranslogStatsFromNodesStats);
}

/**
 * Asserts that the given remote segment stats carry only zero values.
 *
 * @param remoteSegmentStats stats object taken from an index-level or node-level stats response
 */
private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) {
    // Compare with fresh object because all values default to 0 in default fresh object.
    // RemoteSegmentStats#equals compares every upload, download, lag and time counter,
    // so a single equality check replaces the former per-getter assertions.
    assertEquals(new RemoteSegmentStats(), remoteSegmentStats);
}

/**
 * Asserts that the given remote translog stats are equal to a freshly constructed instance.
 *
 * @param remoteTranslogStats stats object taken from an index-level or node-level stats response
 */
private void assertZeroRemoteTranslogStats(RemoteTranslogStats remoteTranslogStats) {
    // A newly constructed stats object has all of its values defaulted to 0
    final RemoteTranslogStats zeroedStats = new RemoteTranslogStats();
    assertEquals(zeroedStats, remoteTranslogStats);
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.translog.RemoteTranslogStats;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.util.concurrent.TimeUnit;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreStatsFromNodesStatsIT extends RemoteStoreBaseIntegTestCase {
private static final String INDEX_NAME = "remote-index-1";
private static final int DATA_NODE_COUNT = 2;
private static final int CLUSTER_MANAGER_NODE_COUNT = 3;

/**
 * Runs before each test and delegates to {@link #setupCustomCluster()} to start the nodes.
 */
@Before
public void setup() {
    this.setupCustomCluster();
}

/**
 * Starts the cluster-manager-only nodes, then the data-only nodes, and finally calls
 * {@code ensureStableCluster} with the combined node count.
 */
private void setupCustomCluster() {
    internalCluster().startClusterManagerOnlyNodes(CLUSTER_MANAGER_NODE_COUNT);
    internalCluster().startDataOnlyNodes(DATA_NODE_COUNT);

    final int expectedNodeCount = DATA_NODE_COUNT + CLUSTER_MANAGER_NODE_COUNT;
    ensureStableCluster(expectedNodeCount);
}

/**
* - Creates two indices, each with a single primary shard, pinned to a single node.
* - Indexes documents in both of them and forces a refresh for both
* - Polls the _remotestore/stats API for individual index level stats
* - Adds up requisite fields from the API output, repeats this for the 2nd index
* - Polls _nodes/stats and verifies that the total values at node level add up
* to the values captured in the previous step
*/
public void testNodesStatsParityWithOnlyPrimaryShards() {
    final String[] candidateNodes = internalCluster().getDataNodeNames().toArray(String[]::new);
    final String pinnedNode = candidateNodes[randomIntBetween(0, candidateNodes.length - 1)];
    final String[] indexNames = { INDEX_NAME + "1", INDEX_NAME + "2" };

    // Create each index in turn, requiring allocation on the chosen node, then index one document into it
    for (String indexName : indexNames) {
        final Settings pinnedSettings = Settings.builder()
            .put(remoteStoreIndexSettings(0, 1))
            .put("index.routing.allocation.require._name", pinnedNode)
            .build();
        createIndex(indexName, pinnedSettings);
        ensureGreen(indexName);
        indexSingleDoc(indexName, true);
    }

    assertNodeStatsParityOnNode(pinnedNode, indexNames);
}

/**
* - Creates two indices, each with a single primary shard and a single replica
* - Indexes documents in both of them and forces a refresh for both
* - Polls the _remotestore/stats API for individual index level stats
* - Adds up requisite fields from the API output for both indices
* - Polls _nodes/stats and verifies that the total values at node level add up
* to the values captured in the previous step
* - Repeats the above 3 steps for the second node
*/
public void testNodesStatsParityWithReplicaShards() throws Exception {
    final String[] indexNames = { INDEX_NAME + "1", INDEX_NAME + "2" };

    // Create the indices one after the other and index a single document into each
    for (String indexName : indexNames) {
        final Settings indexSettings = Settings.builder().put(remoteStoreIndexSettings(1, 1)).build();
        createIndex(indexName, indexSettings);
        ensureGreen(indexName);
        indexSingleDoc(indexName, true);
    }

    // Retry the parity check on every data node for up to 15 seconds
    assertBusy(() -> assertNodeStatsParityAcrossNodes(indexNames), 15, TimeUnit.SECONDS);
}

/**
* Ensures that node stats show 0 values for dedicated cluster manager nodes
* since cluster manager nodes do not participate in indexing
*/
public void testZeroRemoteStatsOnNodesStatsForClusterManager() {
    createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
    ensureGreen(INDEX_NAME);
    indexSingleDoc(INDEX_NAME);
    refresh(INDEX_NAME);

    // --- Cluster manager node: remote segment and translog stats must both be zero ---
    final NodesStatsResponse clusterManagerResponse = client().admin()
        .cluster()
        .prepareNodesStats(internalCluster().getClusterManagerName())
        .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true).set(CommonStatsFlags.Flag.Translog, true))
        .get();

    // The queried node has to be a cluster manager that is not also a data node
    assertTrue(clusterManagerResponse.getNodes().get(0).getNode().isClusterManagerNode());
    assertFalse(clusterManagerResponse.getNodes().get(0).getNode().isDataNode());

    final RemoteSegmentStats clusterManagerSegmentStats = clusterManagerResponse.getNodes()
        .get(0)
        .getIndices()
        .getSegments()
        .getRemoteSegmentStats();
    assertZeroRemoteSegmentStats(clusterManagerSegmentStats);

    final RemoteTranslogStats clusterManagerTranslogStats = clusterManagerResponse.getNodes()
        .get(0)
        .getIndices()
        .getTranslog()
        .getRemoteTranslogStats();
    assertZeroRemoteTranslogStats(clusterManagerTranslogStats);

    // --- Node holding the primary shard: upload counters must be non-zero ---
    final NodesStatsResponse dataNodeResponse = client().admin()
        .cluster()
        .prepareNodesStats(primaryNodeName(INDEX_NAME))
        .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true).set(CommonStatsFlags.Flag.Translog, true))
        .get();
    assertTrue(dataNodeResponse.getNodes().get(0).getNode().isDataNode());

    final RemoteSegmentStats dataNodeSegmentStats = dataNodeResponse.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats();
    assertTrue(dataNodeSegmentStats.getUploadBytesStarted() > 0);
    assertTrue(dataNodeSegmentStats.getUploadBytesSucceeded() > 0);

    final RemoteTranslogStats dataNodeTranslogStats = dataNodeResponse.getNodes().get(0).getIndices().getTranslog().getRemoteTranslogStats();
    assertTrue(dataNodeTranslogStats.getUploadBytesStarted() > 0);
    assertTrue(dataNodeTranslogStats.getUploadBytesSucceeded() > 0);
}

/**
 * Asserts that the given remote segment stats are equal to a freshly constructed instance.
 *
 * @param remoteSegmentStats stats object extracted from a nodes stats response
 */
private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) {
    // A newly constructed stats object has all of its values defaulted to 0
    final RemoteSegmentStats zeroedStats = new RemoteSegmentStats();
    assertEquals(zeroedStats, remoteSegmentStats);
}

/**
 * Asserts that the given remote translog stats are equal to a freshly constructed instance.
 *
 * @param remoteTranslogStats stats object extracted from a nodes stats response
 */
private void assertZeroRemoteTranslogStats(RemoteTranslogStats remoteTranslogStats) {
    // A newly constructed stats object has all of its values defaulted to 0
    final RemoteTranslogStats zeroedStats = new RemoteTranslogStats();
    assertEquals(zeroedStats, remoteTranslogStats);
}

/**
 * Runs {@link #assertNodeStatsParityOnNode(String, String...)} once for every data node name
 * reported by the internal cluster.
 *
 * @param indices indices whose stats are summed up and compared on each node
 */
private static void assertNodeStatsParityAcrossNodes(String... indices) {
    internalCluster().getDataNodeNames().forEach(nodeName -> assertNodeStatsParityOnNode(nodeName, indices));
}

private static void assertNodeStatsParityOnNode(String dataNode, String... indices) {
RemoteSegmentStats remoteSegmentStatsCumulative = new RemoteSegmentStats();
RemoteTranslogStats remoteTranslogStatsCumulative = new RemoteTranslogStats();
for (String index : indices) {
// Fetch _remotestore/stats
RemoteStoreStatsResponse remoteStoreStats = client(dataNode).admin()
.cluster()
.prepareRemoteStoreStats(index, "0")
.setLocal(true)
.get();
remoteSegmentStatsCumulative.add(new RemoteSegmentStats(remoteStoreStats.getRemoteStoreStats()[0].getSegmentStats()));
remoteTranslogStatsCumulative.add(new RemoteTranslogStats(remoteStoreStats.getRemoteStoreStats()[0].getTranslogStats()));
}

// Fetch _nodes/stats
NodesStatsResponse nodesStatsResponse = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true).set(CommonStatsFlags.Flag.Translog, true))
.get();

// assert segment stats
RemoteSegmentStats remoteSegmentStatsFromNodesStats = nodesStatsResponse.getNodes()
.get(0)
.getIndices()
.getSegments()
.getRemoteSegmentStats();
assertEquals(remoteSegmentStatsCumulative, remoteSegmentStatsFromNodesStats);
// Ensure that total upload time has non-zero value if there has been segments uploaded from the node
if (remoteSegmentStatsCumulative.getUploadBytesStarted() > 0) {
assertTrue(remoteSegmentStatsCumulative.getTotalUploadTime() > 0);
}
// Ensure that total download time has non-zero value if there has been segments downloaded to the node
if (remoteSegmentStatsCumulative.getDownloadBytesStarted() > 0) {
assertTrue(remoteSegmentStatsCumulative.getTotalDownloadTime() > 0);
}
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved

// assert translog stats
RemoteTranslogStats remoteTranslogStatsFromNodesStats = nodesStatsResponse.getNodes()
.get(0)
.getIndices()
.getTranslog()
.getRemoteTranslogStats();
assertEquals(remoteTranslogStatsCumulative, remoteTranslogStatsFromNodesStats);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ static final class RoutingFields {
/**
* Fields for remote store stats response
*/
static final class UploadStatsFields {
public static final class UploadStatsFields {
/**
* Lag in terms of bytes b/w local and remote store
*/
Expand Down Expand Up @@ -294,12 +294,12 @@ static final class UploadStatsFields {
/**
* Count of files uploaded to remote store
*/
static final String TOTAL_UPLOADS = "total_uploads";
public static final String TOTAL_UPLOADS = "total_uploads";

/**
* Represents the total uploads to remote store in bytes
*/
static final String TOTAL_UPLOAD_SIZE = "total_upload_size";
public static final String TOTAL_UPLOAD_SIZE = "total_upload_size";

/**
* Total time spent on remote store uploads
Expand Down Expand Up @@ -367,17 +367,17 @@ static final class DownloadStatsFields {
/**
* Reusable sub fields for {@link UploadStatsFields} and {@link DownloadStatsFields}
*/
static final class SubFields {
static final String STARTED = "started";
static final String SUCCEEDED = "succeeded";
static final String FAILED = "failed";
public static final class SubFields {
public static final String STARTED = "started";
public static final String SUCCEEDED = "succeeded";
public static final String FAILED = "failed";

static final String STARTED_BYTES = "started_bytes";
static final String SUCCEEDED_BYTES = "succeeded_bytes";
static final String FAILED_BYTES = "failed_bytes";
public static final String STARTED_BYTES = "started_bytes";
public static final String SUCCEEDED_BYTES = "succeeded_bytes";
public static final String FAILED_BYTES = "failed_bytes";

static final String DOWNLOAD = "download";
static final String UPLOAD = "upload";
public static final String UPLOAD = "upload";

/**
* Moving avg over last N values stat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.index.shard.IndexShard;

import java.io.IOException;
import java.util.Objects;

/**
* Tracks remote store segment download and upload stats
Expand Down Expand Up @@ -300,4 +301,40 @@ static final class Fields {
static final String TOTAL_TIME_SPENT = "total_time_spent";
static final String TOTAL_TIME_SPENT_IN_MILLIS = "total_time_spent_in_millis";
}

/**
 * Two instances are equal when they are of exactly the same class and all of their upload,
 * download, refresh-lag and total-time fields match.
 */
@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null || o.getClass() != getClass()) {
        return false;
    }

    final RemoteSegmentStats other = (RemoteSegmentStats) o;

    final boolean sameUploadBytes = uploadBytesStarted == other.uploadBytesStarted
        && uploadBytesFailed == other.uploadBytesFailed
        && uploadBytesSucceeded == other.uploadBytesSucceeded;
    final boolean sameDownloadBytes = downloadBytesStarted == other.downloadBytesStarted
        && downloadBytesFailed == other.downloadBytesFailed
        && downloadBytesSucceeded == other.downloadBytesSucceeded;
    final boolean sameRefreshLag = maxRefreshTimeLag == other.maxRefreshTimeLag
        && maxRefreshBytesLag == other.maxRefreshBytesLag
        && totalRefreshBytesLag == other.totalRefreshBytesLag;
    final boolean sameTotalTimes = totalUploadTime == other.totalUploadTime && totalDownloadTime == other.totalDownloadTime;

    return sameUploadBytes && sameDownloadBytes && sameRefreshLag && sameTotalTimes;
}

/**
 * Computes the hash from the upload, download, refresh-lag and total-time fields listed below,
 * which are the same fields that {@link #equals(Object)} compares.
 */
@Override
public int hashCode() {
return Objects.hash(
uploadBytesStarted,
uploadBytesFailed,
uploadBytesSucceeded,
downloadBytesStarted,
downloadBytesFailed,
downloadBytesSucceeded,
maxRefreshTimeLag,
maxRefreshBytesLag,
totalRefreshBytesLag,
totalUploadTime,
totalDownloadTime
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.RemoteFsTranslog;
import org.opensearch.index.translog.RemoteTranslogStats;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogFactory;
Expand Down Expand Up @@ -1406,7 +1407,15 @@ public FieldDataStats fieldDataStats(String... fields) {
}

/**
 * Returns the translog stats obtained from the engine's translog manager. When the index is
 * remote store backed, the stats of this shard's remote translog transfer tracker are added
 * to the result before it is returned.
 *
 * @return translog stats for this shard
 */
public TranslogStats translogStats() {
    final TranslogStats translogStats = getEngine().translogManager().getTranslogStats();
    // Populate remote_store stats only if the index is remote store backed
    if (indexSettings.isRemoteStoreEnabled()) {
        translogStats.addRemoteTranslogStats(
            new RemoteTranslogStats(remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardId).stats())
        );
    }

    return translogStats;
}

public CompletionStats completionStats(String... fields) {
Expand Down
Loading