Skip to content

Commit

Permalink
[Remote Store] Add cumulative bytes lag to NodesStats (opensearch-pro…
Browse files Browse the repository at this point in the history
…ject#9393)

---------

Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 authored and kaushalmahi12 committed Sep 12, 2023
1 parent 660a535 commit 61f7cf0
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1454,6 +1454,7 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(0, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
indexSingleDoc(secondIndex, true);

long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long max_bytes_lag = 0, max_time_lag = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin()
.cluster()
Expand All @@ -77,6 +77,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

Expand All @@ -85,9 +86,11 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
.prepareRemoteStoreStats(secondIndex, "0")
.setLocal(true)
.get();

cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded;
cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted;
cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

Expand All @@ -101,6 +104,7 @@ public void testNodesStatsParityWithOnlyPrimaryShards() {
assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted());
assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
}
Expand Down Expand Up @@ -173,6 +177,7 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats)
assertEquals(0, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(0, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag());
}
Expand All @@ -181,7 +186,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
for (String dataNode : internalCluster().getDataNodeNames()) {
long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0;
long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0;
long max_bytes_lag = 0, max_time_lag = 0;
long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0;
// Fetch upload stats
RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin()
.cluster()
Expand All @@ -197,6 +202,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

Expand All @@ -214,6 +220,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted;
cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0]
.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed;
total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag;
max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag);
max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs);

Expand All @@ -230,6 +237,7 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s
assertEquals(cumulativeDownloadsSucceeded, remoteSegmentStats.getDownloadBytesSucceeded());
assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted());
assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed());
assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag());
assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.remote;

import org.opensearch.Version;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -61,6 +62,11 @@ public class RemoteSegmentStats implements Writeable, ToXContentFragment {
* Used to check for data freshness in the remote store
*/
private long maxRefreshBytesLag;
/**
* Total refresh lag (in bytes) between local and the remote store
* Used to check for data freshness in the remote store
*/
private long totalRefreshBytesLag;

public RemoteSegmentStats() {}

Expand All @@ -73,6 +79,19 @@ public RemoteSegmentStats(StreamInput in) throws IOException {
downloadBytesSucceeded = in.readLong();
maxRefreshTimeLag = in.readLong();
maxRefreshBytesLag = in.readLong();
/* TODO:
Adding version checks here since the base PR of adding remote store stats
in SegmentStats has already been merged and backported to 2.x branch.
Since this is a new field that is being added, we need to have this check in place
to ensure BWCs don't break.
This would have to be removed after the new field addition PRs are also backported to 2.x.
If possible we would need to ensure that all field addition PRs are backported at once
*/
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
totalRefreshBytesLag = in.readLong();
}
}

/**
Expand All @@ -91,7 +110,11 @@ public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) {
this.downloadBytesStarted = trackerStats.directoryFileTransferTrackerStats.transferredBytesStarted;
this.downloadBytesFailed = trackerStats.directoryFileTransferTrackerStats.transferredBytesFailed;
this.maxRefreshTimeLag = trackerStats.refreshTimeLagMs;
// Initializing both total and max bytes lag to the same `bytesLag`
// value from the tracker object
// Aggregations would be performed on the add method
this.maxRefreshBytesLag = trackerStats.bytesLag;
this.totalRefreshBytesLag = trackerStats.bytesLag;
}

// Getter and setters. All are visible for testing
Expand Down Expand Up @@ -155,8 +178,16 @@ public long getMaxRefreshBytesLag() {
return maxRefreshBytesLag;
}

public void setMaxRefreshBytesLag(long maxRefreshBytesLag) {
this.maxRefreshBytesLag = maxRefreshBytesLag;
public void addMaxRefreshBytesLag(long maxRefreshBytesLag) {
this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, maxRefreshBytesLag);
}

public long getTotalRefreshBytesLag() {
return totalRefreshBytesLag;
}

public void addTotalRefreshBytesLag(long totalRefreshBytesLag) {
this.totalRefreshBytesLag += totalRefreshBytesLag;
}

/**
Expand All @@ -174,6 +205,7 @@ public void add(RemoteSegmentStats existingStats) {
this.downloadBytesSucceeded += existingStats.getDownloadBytesSucceeded();
this.maxRefreshTimeLag = Math.max(this.maxRefreshTimeLag, existingStats.getMaxRefreshTimeLag());
this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, existingStats.getMaxRefreshBytesLag());
this.totalRefreshBytesLag += existingStats.getTotalRefreshBytesLag();
}
}

Expand All @@ -187,33 +219,53 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(downloadBytesSucceeded);
out.writeLong(maxRefreshTimeLag);
out.writeLong(maxRefreshBytesLag);
/* TODO:
Adding version checks here since the base PR of adding remote store stats
in SegmentStats has already been merged and backported to 2.x branch.
Since this is a new field that is being added, we need to have this check in place
to ensure BWCs don't break.
This would have to be removed after the new field addition PRs are also backported to 2.x.
If possible we would need to ensure that all field addition PRs are backported at once
*/
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeLong(totalRefreshBytesLag);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.REMOTE_STORE);
builder.startObject(Fields.UPLOAD);
buildUploadStats(builder);
builder.endObject();
builder.startObject(Fields.DOWNLOAD);
buildDownloadStats(builder);
builder.endObject();
builder.endObject();
return builder;
}

private void buildUploadStats(XContentBuilder builder) throws IOException {
builder.startObject(Fields.TOTAL_UPLOADS);
builder.humanReadableField(Fields.STARTED_BYTES, Fields.STARTED, new ByteSizeValue(uploadBytesStarted));
builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(uploadBytesSucceeded));
builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(uploadBytesFailed));
builder.endObject();
builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag));
builder.humanReadableField(
Fields.MAX_REFRESH_SIZE_LAG_IN_MILLIS,
Fields.MAX_REFRESH_SIZE_LAG,
new ByteSizeValue(maxRefreshBytesLag)
);
builder.startObject(Fields.REFRESH_SIZE_LAG);
builder.humanReadableField(Fields.TOTAL_BYTES, Fields.TOTAL, new ByteSizeValue(totalRefreshBytesLag));
builder.humanReadableField(Fields.MAX_BYTES, Fields.MAX, new ByteSizeValue(maxRefreshBytesLag));
builder.endObject();
builder.startObject(Fields.DOWNLOAD);
builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag));
}

private void buildDownloadStats(XContentBuilder builder) throws IOException {
builder.startObject(Fields.TOTAL_DOWNLOADS);
builder.humanReadableField(Fields.STARTED_BYTES, Fields.STARTED, new ByteSizeValue(downloadBytesStarted));
builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(downloadBytesSucceeded));
builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(downloadBytesFailed));
builder.endObject();
builder.endObject();
builder.endObject();
return builder;
}

static final class Fields {
Expand All @@ -222,15 +274,18 @@ static final class Fields {
static final String DOWNLOAD = "download";
static final String TOTAL_UPLOADS = "total_uploads";
static final String TOTAL_DOWNLOADS = "total_downloads";
static final String MAX_REFRESH_TIME_LAG = "max_refresh_time_lag";
static final String MAX_REFRESH_TIME_LAG_IN_MILLIS = "max_refresh_time_lag_in_millis";
static final String REFRESH_SIZE_LAG = "refresh_size_lag";
static final String STARTED = "started";
static final String STARTED_BYTES = "started_bytes";
static final String FAILED = "failed";
static final String FAILED_BYTES = "failed_bytes";
static final String SUCCEEDED = "succeeded";
static final String SUCCEEDED_BYTES = "succeeded_bytes";
static final String MAX_REFRESH_TIME_LAG = "max_refresh_time_lag";
static final String MAX_REFRESH_TIME_LAG_IN_MILLIS = "max_refresh_time_lag_in_millis";
static final String MAX_REFRESH_SIZE_LAG = "max_refresh_size_lag";
static final String MAX_REFRESH_SIZE_LAG_IN_MILLIS = "max_refresh_size_lag_in_bytes";
static final String TOTAL = "total";
static final String TOTAL_BYTES = "total_bytes";
static final String MAX = "max";
static final String MAX_BYTES = "max_bytes";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ public void testSerialization() throws IOException {
assertEquals(remoteSegmentStats.getUploadBytesFailed(), deserializedRemoteSegmentStats.getUploadBytesFailed());
assertEquals(remoteSegmentStats.getMaxRefreshTimeLag(), deserializedRemoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(remoteSegmentStats.getMaxRefreshBytesLag(), deserializedRemoteSegmentStats.getMaxRefreshBytesLag());
assertEquals(remoteSegmentStats.getTotalRefreshBytesLag(), deserializedRemoteSegmentStats.getTotalRefreshBytesLag());
}
}
}
Expand Down Expand Up @@ -789,7 +790,8 @@ private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) {
remoteSegmentStats.addDownloadBytesStarted(10L);
remoteSegmentStats.addDownloadBytesSucceeded(10L);
remoteSegmentStats.addDownloadBytesFailed(1L);
remoteSegmentStats.setMaxRefreshBytesLag(5L);
remoteSegmentStats.addTotalRefreshBytesLag(5L);
remoteSegmentStats.addMaxRefreshBytesLag(2L);
remoteSegmentStats.setMaxRefreshTimeLag(2L);
}
return indicesStats;
Expand Down

0 comments on commit 61f7cf0

Please sign in to comment.