Skip to content

Commit

Permalink
[Remote Store] Add cumulative bytes lag to NodesStats (opensearch-pro…
Browse files Browse the repository at this point in the history
…ject#9393)

---------

Signed-off-by: Shourya Dutta Biswas <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shourya035 authored and shiv0408 committed Apr 25, 2024
1 parent 13238a8 commit 3ea31c0
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 20 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
- [Remote Store] Add Segment download stats to remotestore stats API ([#8718](https://github.com/opensearch-project/OpenSearch/pull/8718))
- [Remote Store] Add remote segment transfer stats on NodesStats API ([#9168](https://github.com/opensearch-project/OpenSearch/pull/9168))
- [Remote Store] Add remote segment transfer stats on NodesStats API ([#9168](https://github.com/opensearch-project/OpenSearch/pull/9168) [#9393](https://github.com/opensearch-project/OpenSearch/pull/9393))
- Return 409 Conflict HTTP status instead of 503 on failure to concurrently execute snapshots ([#8986](https://github.com/opensearch-project/OpenSearch/pull/5855))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1454,6 +1454,7 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(0, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
indexSingleDoc(secondIndex, true);

long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long max_bytes_lag = 0, max_time_lag = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin()
.cluster()
Expand All @@ -77,6 +77,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

Expand All @@ -85,9 +86,11 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
.prepareRemoteStoreStats(secondIndex, "0")
.setLocal(true)
.get();

cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

Expand All @@ -101,6 +104,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted());
assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
}
Expand Down Expand Up @@ -173,6 +177,7 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(0, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
}
Expand All @@ -181,7 +186,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
for (String dataNode : internalCluster().getDataNodeNames()) {
long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0;
long max_bytes_lag = 0, max_time_lag = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin()
.cluster()
Expand All @@ -197,6 +202,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

Expand All @@ -214,6 +220,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

Expand All @@ -230,6 +237,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
assertEquals(cumulativeDownloadsSucceeded, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.remote;

import org.opensearch.Version;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -61,6 +62,11 @@ public class RemoteSegmentStats implements Writeable, ToXContentFragment {
* Used to check for data freshness in the remote store
*/
private long maxRefreshBytesLag;
/**
* Total refresh lag (in bytes) between local and the remote store
* Used to check for data freshness in the remote store
*/
private long totalRefreshBytesLag;

public RemoteSegmentStats() {}

Expand All @@ -73,6 +79,19 @@ public RemoteSegmentStats(StreamInput in) throws IOException {
downloadBytesSucceeded = in.readLong();
maxRefreshTimeLag = in.readLong();
maxRefreshBytesLag = in.readLong();
/* TODO:
Adding version checks here since the base PR of adding remote store stats
in SegmentStats has already been merged and backported to 2.x branch.
Since this is a new field that is being added, we need to have this check in place
to ensure BWCs don't break.
This would have to be removed after the new field addition PRs are also backported to 2.x.
If possible we would need to ensure that all field addition PRs are backported at once
*/
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
totalRefreshBytesLag = in.readLong();
}
}

/**
Expand All @@ -91,7 +110,11 @@ public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) {
this.downloadBytesStarted = trackerStats.directoryFileTransferTrackerStats.transferredBytesStarted;
this.downloadBytesFailed = trackerStats.directoryFileTransferTrackerStats.transferredBytesFailed;
this.maxRefreshTimeLag = trackerStats.refreshTimeLagMs;
// Initializing both total and max bytes lag to the same `bytesLag`
// value from the tracker object
// Aggregations would be performed on the add method
this.maxRefreshBytesLag = trackerStats.bytesLag;
this.totalRefreshBytesLag = trackerStats.bytesLag;
}

// Getter and setters. All are visible for testing
Expand Down Expand Up @@ -155,8 +178,16 @@ public long getMaxRefreshBytesLag() {
return maxRefreshBytesLag;
}

public void setMaxRefreshBytesLag(long maxRefreshBytesLag) {
this.maxRefreshBytesLag = maxRefreshBytesLag;
public void addMaxRefreshBytesLag(long maxRefreshBytesLag) {
this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, maxRefreshBytesLag);
}

public long getTotalRefreshBytesLag() {
return totalRefreshBytesLag;
}

public void addTotalRefreshBytesLag(long totalRefreshBytesLag) {
this.totalRefreshBytesLag += totalRefreshBytesLag;
}

/**
Expand All @@ -174,6 +205,7 @@ public void add(RemoteSegmentStats existingStats) {
this.downloadBytesSucceeded += existingStats.getDownloadBytesSucceeded();
this.maxRefreshTimeLag = Math.max(this.maxRefreshTimeLag, existingStats.getMaxRefreshTimeLag());
this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, existingStats.getMaxRefreshBytesLag());
this.totalRefreshBytesLag += existingStats.getTotalRefreshBytesLag();
}
}

Expand All @@ -187,33 +219,53 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(downloadBytesSucceeded);
out.writeLong(maxRefreshTimeLag);
out.writeLong(maxRefreshBytesLag);
/* TODO:
Adding version checks here since the base PR of adding remote store stats
in SegmentStats has already been merged and backported to 2.x branch.
Since this is a new field that is being added, we need to have this check in place
to ensure BWCs don't break.
This would have to be removed after the new field addition PRs are also backported to 2.x.
If possible we would need to ensure that all field addition PRs are backported at once
*/
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeLong(totalRefreshBytesLag);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.REMOTE_STORE);
builder.startObject(Fields.UPLOAD);
buildUploadStats(builder);
builder.endObject();
builder.startObject(Fields.DOWNLOAD);
buildDownloadStats(builder);
builder.endObject();
builder.endObject();
return builder;
}

private void buildUploadStats(XContentBuilder builder) throws IOException {
builder.startObject(Fields.TOTAL_UPLOADS);
builder.humanReadableField(Fields.STARTED_BYTES, Fields.STARTED, new ByteSizeValue(uploadBytesStarted));
builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(uploadBytesSucceeded));
builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(uploadBytesFailed));
builder.endObject();
builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag));
builder.humanReadableField(
Fields.MAX_REFRESH_SIZE_LAG_IN_MILLIS,
Fields.MAX_REFRESH_SIZE_LAG,
new ByteSizeValue(maxRefreshBytesLag)
);
builder.startObject(Fields.REFRESH_SIZE_LAG);
builder.humanReadableField(Fields.TOTAL_BYTES, Fields.TOTAL, new ByteSizeValue(totalRefreshBytesLag));
builder.humanReadableField(Fields.MAX_BYTES, Fields.MAX, new ByteSizeValue(maxRefreshBytesLag));
builder.endObject();
builder.startObject(Fields.DOWNLOAD);
builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag));
}

private void buildDownloadStats(XContentBuilder builder) throws IOException {
builder.startObject(Fields.TOTAL_DOWNLOADS);
builder.humanReadableField(Fields.STARTED_BYTES, Fields.STARTED, new ByteSizeValue(downloadBytesStarted));
builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(downloadBytesSucceeded));
builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(downloadBytesFailed));
builder.endObject();
builder.endObject();
builder.endObject();
return builder;
}

static final class Fields {
Expand All @@ -222,15 +274,18 @@ static final class Fields {
static final String DOWNLOAD = "download";
static final String TOTAL_UPLOADS = "total_uploads";
static final String TOTAL_DOWNLOADS = "total_downloads";
static final String MAX_REFRESH_TIME_LAG = "max_refresh_time_lag";
static final String MAX_REFRESH_TIME_LAG_IN_MILLIS = "max_refresh_time_lag_in_millis";
static final String REFRESH_SIZE_LAG = "refresh_size_lag";
static final String STARTED = "started";
static final String STARTED_BYTES = "started_bytes";
static final String FAILED = "failed";
static final String FAILED_BYTES = "failed_bytes";
static final String SUCCEEDED = "succeeded";
static final String SUCCEEDED_BYTES = "succeeded_bytes";
static final String MAX_REFRESH_TIME_LAG = "max_refresh_time_lag";
static final String MAX_REFRESH_TIME_LAG_IN_MILLIS = "max_refresh_time_lag_in_millis";
static final String MAX_REFRESH_SIZE_LAG = "max_refresh_size_lag";
static final String MAX_REFRESH_SIZE_LAG_IN_MILLIS = "max_refresh_size_lag_in_bytes";
static final String TOTAL = "total";
static final String TOTAL_BYTES = "total_bytes";
static final String MAX = "max";
static final String MAX_BYTES = "max_bytes";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ public void testSerialization() throws IOException {
assertEquals(remoteSegmentStats.getUploadBytesFailed(), deserializedRemoteSegmentStats.getUploadBytesFailed());
assertEquals(remoteSegmentStats.getMaxRefreshTimeLag(), deserializedRemoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(remoteSegmentStats.getMaxRefreshBytesLag(), deserializedRemoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(remoteSegmentStats.getTotalRefreshBytesLag(), deserializedRemoteSegmentStats.getTotalRefreshBytesLag());
}
}
}
Expand Down Expand Up @@ -789,7 +790,8 @@ private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) {
remoteSegmentStats.addDownloadBytesStarted(10L);
remoteSegmentStats.addDownloadBytesSucceeded(10L);
remoteSegmentStats.addDownloadBytesFailed(1L);
remoteSegmentStats.setMaxRefreshBytesLag(5L);
remoteSegmentStats.addTotalRefreshBytesLag(5L);
remoteSegmentStats.addMaxRefreshBytesLag(2L);
remoteSegmentStats.setMaxRefreshTimeLag(2L);
}
return indicesStats;
Expand Down

0 comments on commit 3ea31c0

Please sign in to comment.