Skip to content

Commit

Permalink
[Backport 2.x] [Snapshot Interop] Fix Flakiness in Snapshot Interop C…
Browse files Browse the repository at this point in the history
…ode (#9795)

Signed-off-by: Harish Bhakuni <[email protected]>
Co-authored-by: Harish Bhakuni <[email protected]>
  • Loading branch information
harishbhakuni and Harish Bhakuni committed Feb 6, 2024
1 parent 49c0c50 commit f502ce5
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
Expand Down Expand Up @@ -140,32 +139,21 @@ public void testRestoreOperationsShallowCopyEnabled() throws IOException, Execut

internalCluster().startDataOnlyNode();
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));

SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(Arrays.asList(indexName1, indexName2)));
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));

updateRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, false));
CreateSnapshotResponse createSnapshotResponse2 = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse2.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards())
SnapshotInfo snapshotInfo2 = createSnapshot(
snapshotRepoName,
snapshotName2,
new ArrayList<>(Arrays.asList(indexName1, indexName2))
);
assertThat(createSnapshotResponse2.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo2.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo2.successfulShards(), greaterThan(0));
assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards()));

DeleteResponse deleteResponse = client().prepareDelete(indexName1, "0").execute().actionGet();
assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED);
Expand Down Expand Up @@ -269,7 +257,6 @@ public void testRestoreOperationsShallowCopyEnabled() throws IOException, Execut
assertDocsPresentInIndex(client, restoredIndexName1Doc, numDocsInIndex1 + 2);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9326")
public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
String primary = internalCluster().startDataOnlyNode();
Expand Down Expand Up @@ -300,32 +287,24 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {

internalCluster().startDataOnlyNode();
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
SnapshotInfo snapshotInfo1 = createSnapshot(
snapshotRepoName,
snapshotName1,
new ArrayList<>(Arrays.asList(indexName1, indexName2))
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

updateRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, false));
CreateSnapshotResponse createSnapshotResponse2 = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse2.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards())
SnapshotInfo snapshotInfo2 = createSnapshot(
snapshotRepoName,
snapshotName2,
new ArrayList<>(Arrays.asList(indexName1, indexName2))
);
assertThat(createSnapshotResponse2.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo2.successfulShards(), greaterThan(0));
assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards()));
assertThat(snapshotInfo2.state(), equalTo(SnapshotState.SUCCESS));

DeleteResponse deleteResponse = client().prepareDelete(indexName1, "0").execute().actionGet();
assertEquals(deleteResponse.getResult(), DocWriteResponse.Result.DELETED);
Expand Down Expand Up @@ -355,6 +334,10 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
assertRemoteSegmentsAndTranslogUploaded(restoredIndexName2);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1);
assertDocsPresentInIndex(client, restoredIndexName2, numDocsInIndex2);
// indexing some new docs and validating
indexDocuments(client, indexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(indexName1);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 2);

// deleting data for restoredIndexName1 and restoring from remote store.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
Expand All @@ -369,9 +352,9 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
ensureGreen(indexName1);
assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1);
// indexing some new docs and validating
indexDocuments(client, indexName1, numDocsInIndex1, numDocsInIndex1 + 2);
indexDocuments(client, indexName1, numDocsInIndex1 + 2, numDocsInIndex1 + 4);
ensureGreen(indexName1);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 2);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 4);
}

void assertRemoteSegmentsAndTranslogUploaded(String idx) throws IOException {
Expand Down Expand Up @@ -496,18 +479,14 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException
internalCluster().startDataOnlyNode();

logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1, indexName2)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
SnapshotInfo snapshotInfo1 = createSnapshot(
snapshotRepoName,
snapshotName1,
new ArrayList<>(Arrays.asList(indexName1, indexName2))
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

Settings remoteStoreIndexSettings = Settings.builder()
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStoreRepo2Name)
Expand Down Expand Up @@ -583,18 +562,10 @@ public void testRestoreShallowSnapshotRepository() throws ExecutionException, In
ensureGreen(indexName1);

logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.setIndices(indexName1)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
SnapshotInfo snapshotInfo1 = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(List.of(indexName1)));
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

client().admin().indices().close(Requests.closeIndexRequest(indexName1)).get();
createRepository(remoteStoreRepoNameUpdated, "fs", remoteRepoPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

package org.opensearch.repositories.blobstore;

import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.index.IndexModule;
Expand All @@ -29,16 +31,21 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

public class BlobStoreRepositoryHelperTests extends OpenSearchSingleNodeTestCase {

Expand Down Expand Up @@ -124,13 +131,28 @@ protected Settings getRemoteStoreBackedIndexSettings() {
.build();
}

protected SnapshotInfo createSnapshot(String repositoryName, String snapshot, List<String> indices) {
logger.info("--> creating snapshot [{}] of {} in [{}]", snapshot, indices, repositoryName);

final CreateSnapshotResponse response = client().admin()
.cluster()
.prepareCreateSnapshot(repositoryName, snapshot)
.setIndices(indices.toArray(Strings.EMPTY_ARRAY))
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo = response.getSnapshotInfo();
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.failedShards(), equalTo(0));
return snapshotInfo;
}

protected void indexDocuments(Client client, String indexName) {
int numDocs = randomIntBetween(10, 20);
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(i);
client.prepareIndex(indexName).setId(id).setSource("text", "sometext").get();
}
client.admin().indices().prepareFlush(indexName).get();
}

protected IndexSettings getIndexSettings(String indexName) {
Expand Down
Loading

0 comments on commit f502ce5

Please sign in to comment.