From 9d9a143ff7a896bcfb116fb133f84718b73dec8b Mon Sep 17 00:00:00 2001 From: Ashish Date: Tue, 27 Jun 2023 15:16:21 +0530 Subject: [PATCH] [Remote Store] Add remote segment upload backpressure integ tests (#8197) Signed-off-by: Ashish Singh --- ...emoteStoreMockRepositoryIntegTestCase.java | 5 +- .../RemoteStoreBackpressureIT.java | 117 ++++++++++++++++-- .../snapshots/mockstore/MockRepository.java | 6 +- 3 files changed, 115 insertions(+), 13 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java index 2bcbf3f5b614d..f57c312aa2cd0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java @@ -70,7 +70,7 @@ protected void deleteRepo() { assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); } - protected void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) { + protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) { logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation); // The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in /// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the @@ -88,13 +88,14 @@ protected void setup(Path repoLocation, double ioFailureRate, String skipExcepti .put("max_failure_number", maxFailure) ); - internalCluster().startDataOnlyNodes(1); + String dataNodeName = internalCluster().startDataOnlyNodes(1).get(0); createIndex(INDEX_NAME); logger.info("--> Created index={}", INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); logger.info("--> Cluster is yellow with no initializing shards"); ensureGreen(INDEX_NAME); logger.info("--> Cluster is green"); + return dataNodeName; } /** diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java index c46eab6468c6b..64d5f06f061a9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java @@ -11,40 +11,111 @@ import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats; import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.ByteSizeUnit; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.snapshots.mockstore.MockRepository; import org.opensearch.test.OpenSearchIntegTestCase; import java.nio.file.Path; import java.util.Arrays; import java.util.List; import java.util.Locale; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT; import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { + public void testWritesRejectedDueToConsecutiveFailureBreach() throws Exception { + // Here the doc size of the request remains same throughout the test. After initial indexing, all remote store interactions + // fail leading to consecutive failure limit getting exceeded and leading to rejections. + validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 10, ByteSizeUnit.KB.toIntBytes(1), 15, "failure_streak_count"); + } + + public void testWritesRejectedDueToBytesLagBreach() throws Exception { + // Initially indexing happens with doc size of 2 bytes, then all remote store interactions start failing. Now, the + // indexing happens with doc size of 1KB leading to bytes lag limit getting exceeded and leading to rejections. + validateBackpressure(ByteSizeUnit.BYTES.toIntBytes(2), 30, ByteSizeUnit.KB.toIntBytes(1), 15, "bytes_lag"); + } - public void testWritesRejected() { + public void testWritesRejectedDueToTimeLagBreach() throws Exception { + // Initially indexing happens with doc size of 1KB, then all remote store interactions start failing. Now, the + // indexing happens with doc size of 1 byte leading to time lag limit getting exceeded and leading to rejections. + validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 15, "time_lag"); + } + + private void validateBackpressure( + int initialDocSize, + int initialDocsToIndex, + int onFailureDocSize, + int onFailureDocsToIndex, + String breachMode + ) throws Exception { Path location = randomRepoPath().toAbsolutePath(); - setup(location, 1d, "metadata", Long.MAX_VALUE); + String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE); - Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build(); + Settings request = Settings.builder() + .put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) + .put(MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 10) + .build(); ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin() .cluster() .prepareUpdateSettings() .setPersistentSettings(request) .get(); assertEquals(clusterUpdateResponse.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true"); + assertEquals(clusterUpdateResponse.getPersistentSettings().get(MIN_CONSECUTIVE_FAILURES_LIMIT.getKey()), "10"); logger.info("--> Indexing data"); + + String jsonString = generateString(initialDocSize); + BytesReference initialSource = new BytesArray(jsonString); + indexDocAndRefresh(initialSource, initialDocsToIndex); + + ((MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName).repository(REPOSITORY_NAME)) + .setRandomControlIOExceptionRate(1d); + + jsonString = generateString(onFailureDocSize); + BytesReference onFailureSource = new BytesArray(jsonString); OpenSearchRejectedExecutionException ex = assertThrows( OpenSearchRejectedExecutionException.class, - () -> indexData(randomIntBetween(10, 20), randomBoolean()) + () -> indexDocAndRefresh(onFailureSource, onFailureDocsToIndex) ); assertTrue(ex.getMessage().contains("rejected execution on primary shard")); + assertTrue(ex.getMessage().contains(breachMode)); + + RemoteRefreshSegmentTracker.Stats stats = stats(); + assertTrue(stats.bytesLag > 0); + assertTrue(stats.refreshTimeLagMs > 0); + assertTrue(stats.localRefreshNumber - stats.remoteRefreshNumber > 0); + assertTrue(stats.rejectionCount > 0); + + ((MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName).repository(REPOSITORY_NAME)) + .setRandomControlIOExceptionRate(0d); + + assertBusy(() -> { + RemoteRefreshSegmentTracker.Stats finalStats = stats(); + assertEquals(0, finalStats.bytesLag); + assertEquals(0, finalStats.refreshTimeLagMs); + assertEquals(0, finalStats.localRefreshNumber - finalStats.remoteRefreshNumber); + }, 30, TimeUnit.SECONDS); + + long rejectionCount = stats.rejectionCount; + stats = stats(); + indexDocAndRefresh(initialSource, initialDocsToIndex); + assertEquals(rejectionCount, stats.rejectionCount); + deleteRepo(); + } + + private RemoteRefreshSegmentTracker.Stats stats() { String shardId = "0"; RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get(); final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId); @@ -52,11 +123,37 @@ public void testWritesRejected() { .filter(stat -> indexShardId.equals(stat.getStats().shardId.toString())) .collect(Collectors.toList()); assertEquals(1, matches.size()); - RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats(); - assertTrue(stats.bytesLag > 0); - assertTrue(stats.refreshTimeLagMs > 0); - assertTrue(stats.localRefreshNumber - stats.remoteRefreshNumber > 0); - assertTrue(stats.rejectionCount > 0); - deleteRepo(); + return matches.get(0).getStats(); + } + + private void indexDocAndRefresh(BytesReference source, int iterations) { + for (int i = 0; i < iterations; i++) { + client().prepareIndex(INDEX_NAME).setSource(source, XContentType.JSON).get(); + refresh(INDEX_NAME); + } + } + + /** + * Generates string of given sizeInBytes + * + * @param sizeInBytes size of the string + * @return the generated string + */ + private String generateString(int sizeInBytes) { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + int i = 0; + // Based on local tests, 1 char is occupying 1 byte + while (sb.length() < sizeInBytes) { + String key = "field" + i; + String value = "value" + i; + sb.append("\"").append(key).append("\":\"").append(value).append("\","); + i++; + } + if (sb.length() > 1 && sb.charAt(sb.length() - 1) == ',') { + sb.setLength(sb.length() - 1); + } + sb.append("}"); + return sb.toString(); } } diff --git a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java index fcaf9f6c900d3..7a7c4bd448c55 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java @@ -114,7 +114,7 @@ public long getFailureCount() { return failureCounter.get(); } - private final double randomControlIOExceptionRate; + private volatile double randomControlIOExceptionRate; private final double randomDataFileIOExceptionRate; @@ -246,6 +246,10 @@ public synchronized void unblock() { this.notifyAll(); } + public void setRandomControlIOExceptionRate(double randomControlIOExceptionRate) { + this.randomControlIOExceptionRate = randomControlIOExceptionRate; + } + public void blockOnDataFiles(boolean blocked) { blockOnDataFiles = blocked; }