diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index 0a593e6149ddd..ce5e0989b622f 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -20,6 +20,7 @@
 import org.apache.lucene.index.StandardDirectoryReader;
 import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.BytesRef;
+import org.junit.Before;
 import org.opensearch.action.ActionFuture;
 import org.opensearch.action.admin.indices.flush.FlushRequest;
 import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
@@ -95,11 +96,16 @@
 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class SegmentReplicationIT extends SegmentReplicationBaseIT {
 
+    @Before
+    private void setup() {
+        internalCluster().startClusterManagerOnlyNode();
+    }
+
     public void testPrimaryStopped_ReplicaPromoted() throws Exception {
-        final String primary = internalCluster().startNode();
+        final String primary = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
         ensureYellowAndNoInitializingShards(INDEX_NAME);
-        final String replica = internalCluster().startNode();
+        final String replica = internalCluster().startDataOnlyNode();
         ensureGreen(INDEX_NAME);
 
         client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
@@ -125,7 +131,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
         assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
 
         // start another node, index another doc and replicate.
-        String nodeC = internalCluster().startNode();
+        String nodeC = internalCluster().startDataOnlyNode();
         ensureGreen(INDEX_NAME);
         client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
         refresh(INDEX_NAME);
@@ -134,10 +140,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
     }
 
     public void testRestartPrimary() throws Exception {
-        final String primary = internalCluster().startNode();
+        final String primary = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
         ensureYellowAndNoInitializingShards(INDEX_NAME);
-        final String replica = internalCluster().startNode();
+        final String replica = internalCluster().startDataOnlyNode();
         ensureGreen(INDEX_NAME);
 
         assertEquals(getNodeContainingPrimaryShard().getName(), primary);
@@ -160,10 +166,10 @@ public void testRestartPrimary() throws Exception {
 
     public void testCancelPrimaryAllocation() throws Exception {
         // this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica.
-        final String primary = internalCluster().startNode();
+        final String primary = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
         ensureYellowAndNoInitializingShards(INDEX_NAME);
-        final String replica = internalCluster().startNode();
+        final String replica = internalCluster().startDataOnlyNode();
         ensureGreen(INDEX_NAME);
 
         final int initialDocCount = 1;
@@ -190,8 +196,8 @@ public void testCancelPrimaryAllocation() throws Exception {
     }
 
     public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
-        final String nodeA = internalCluster().startNode();
-        final String nodeB = internalCluster().startNode();
+        final String nodeA = internalCluster().startDataOnlyNode();
+        final String nodeB = internalCluster().startDataOnlyNode();
         final Settings settings = Settings.builder()
             .put(indexSettings())
             .put(
@@ -233,8 +239,8 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
     }
 
     public void testIndexReopenClose() throws Exception {
-        final String primary = internalCluster().startNode();
-        final String replica = internalCluster().startNode();
+        final String primary = internalCluster().startDataOnlyNode();
+        final String replica = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
         ensureGreen(INDEX_NAME);
 
@@ -274,8 +280,8 @@ public void testMultipleShards() throws Exception {
             .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
             .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
             .build();
-        final String nodeA = internalCluster().startNode();
-        final String nodeB = internalCluster().startNode();
+        final String nodeA = internalCluster().startDataOnlyNode();
+        final String nodeB = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME, indexSettings);
         ensureGreen(INDEX_NAME);
 
@@ -310,8 +316,8 @@ public void testMultipleShards() throws Exception {
     }
 
     public void testReplicationAfterForceMerge() throws Exception {
-        final String nodeA = internalCluster().startNode();
-        final String nodeB = internalCluster().startNode();
+        final String nodeA = internalCluster().startDataOnlyNode();
+        final String nodeB = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
         ensureGreen(INDEX_NAME);
 
@@ -351,14 +357,13 @@ public void testReplicationAfterForceMerge() throws Exception {
      * This test verifies that segment replication does not fail for closed indices
      */
     public void testClosedIndices() {
-        internalCluster().startClusterManagerOnlyNode();
         List<String> nodes = new ArrayList<>();
         // start 1st node so that it contains the primary
-        nodes.add(internalCluster().startNode());
+        nodes.add(internalCluster().startDataOnlyNode());
         createIndex(INDEX_NAME, super.indexSettings());
         ensureYellowAndNoInitializingShards(INDEX_NAME);
         // start 2nd node so that it contains the replica
-        nodes.add(internalCluster().startNode());
+        nodes.add(internalCluster().startDataOnlyNode());
         ensureGreen(INDEX_NAME);
 
         logger.info("--> Close index");
@@ -373,8 +378,7 @@ public void testClosedIndices() {
      * @throws Exception when issue is encountered
      */
     public void testNodeDropWithOngoingReplication() throws Exception {
-        internalCluster().startClusterManagerOnlyNode();
-        final String primaryNode = internalCluster().startNode();
+        final String primaryNode = internalCluster().startDataOnlyNode();
         createIndex(
             INDEX_NAME,
             Settings.builder()
@@ -385,7 +389,7 @@ public void testNodeDropWithOngoingReplication() throws Exception {
                 .build()
         );
         ensureYellow(INDEX_NAME);
-        final String replicaNode = internalCluster().startNode();
+        final String replicaNode = internalCluster().startDataOnlyNode();
         ensureGreen(INDEX_NAME);
         ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
         // Get replica allocation id
@@ -447,11 +451,11 @@ public void testNodeDropWithOngoingReplication() throws Exception {
     }
 
     public void testCancellation() throws Exception {
-        final String primaryNode = internalCluster().startNode();
+        final String primaryNode = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
         ensureYellow(INDEX_NAME);
 
-        final String replicaNode = internalCluster().startNode();
+        final String replicaNode = internalCluster().startDataOnlyNode();
 
         final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance(
             SegmentReplicationSourceService.class,
@@ -506,7 +510,7 @@ public void testCancellation() throws Exception {
     }
 
     public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
-        final String primaryNode = internalCluster().startNode();
+        final String primaryNode = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
         ensureGreen(INDEX_NAME);
 
@@ -529,7 +533,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
                 .prepareUpdateSettings(INDEX_NAME)
                 .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
         );
-        final String replicaNode = internalCluster().startNode();
+        final String replicaNode = internalCluster().startDataOnlyNode();
         ensureGreen(INDEX_NAME);
 
         assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
@@ -544,8 +548,8 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
     }
 
     public void testDeleteOperations() throws Exception {
-        final String nodeA = internalCluster().startNode();
-        final String nodeB = internalCluster().startNode();
+        final String nodeA = internalCluster().startDataOnlyNode();
+        final String nodeB = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
         ensureGreen(INDEX_NAME);
 
@@ -591,9 +595,9 @@
      */
     public void testReplicationPostDeleteAndForceMerge() throws Exception {
         assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
-        final String primary = internalCluster().startNode();
+        final String primary = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
-        final String replica = internalCluster().startNode();
+        final String replica = internalCluster().startDataOnlyNode();
         ensureGreen(INDEX_NAME);
         final int initialDocCount = scaledRandomIntBetween(10, 200);
         for (int i = 0; i < initialDocCount; i++) {
@@ -648,7 +652,6 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception {
     }
 
     public void testUpdateOperations() throws Exception {
-        internalCluster().startClusterManagerOnlyNode();
         final String primary = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
         ensureYellow(INDEX_NAME);
@@ -702,7 +705,6 @@ public void testDropPrimaryDuringReplication() throws Exception {
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replica_count)
             .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
             .build();
-        final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
         final String primaryNode = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME, settings);
         final List<String> dataNodes = internalCluster().startDataOnlyNodes(6);
@@ -742,11 +744,10 @@ public void testDropPrimaryDuringReplication() throws Exception {
     }
 
     public void testReplicaHasDiffFilesThanPrimary() throws Exception {
-        internalCluster().startClusterManagerOnlyNode();
-        final String primaryNode = internalCluster().startNode();
+        final String primaryNode = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
         ensureYellow(INDEX_NAME);
-        final String replicaNode = internalCluster().startNode();
+        final String replicaNode = internalCluster().startDataOnlyNode();
         ensureGreen(INDEX_NAME);
 
         final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
@@ -796,9 +797,9 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
     }
 
     public void testPressureServiceStats() throws Exception {
-        final String primaryNode = internalCluster().startNode();
+        final String primaryNode = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
-        final String replicaNode = internalCluster().startNode();
+        final String replicaNode = internalCluster().startDataOnlyNode();
         ensureGreen(INDEX_NAME);
 
         int initialDocCount = scaledRandomIntBetween(100, 200);
@@ -848,7 +849,7 @@ public void testPressureServiceStats() throws Exception {
         assertEquals(0, replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats().size());
 
         // start another replica.
-        String replicaNode_2 = internalCluster().startNode();
+        String replicaNode_2 = internalCluster().startDataOnlyNode();
         ensureGreen(INDEX_NAME);
         String docId = String.valueOf(initialDocCount + 1);
         client().prepareIndex(INDEX_NAME).setId(docId).setSource("foo", "bar").get();
@@ -887,10 +888,10 @@ public void testPressureServiceStats() throws Exception {
     public void testScrollCreatedOnReplica() throws Exception {
         assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
         // create the cluster with one primary node containing primary shard and replica node containing replica shard
-        final String primary = internalCluster().startNode();
+        final String primary = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
         ensureYellowAndNoInitializingShards(INDEX_NAME);
-        final String replica = internalCluster().startNode();
+        final String replica = internalCluster().startDataOnlyNode();
         ensureGreen(INDEX_NAME);
 
         // index 100 docs
@@ -981,7 +982,7 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
         );
 
         // create the cluster with one primary node containing primary shard and replica node containing replica shard
-        final String primary = internalCluster().startNode();
+        final String primary = internalCluster().startDataOnlyNode();
         prepareCreate(
             INDEX_NAME,
             Settings.builder()
@@ -989,7 +990,7 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
                 .put("index.refresh_interval", -1)
         ).get();
         ensureYellowAndNoInitializingShards(INDEX_NAME);
-        final String replica = internalCluster().startNode();
+        final String replica = internalCluster().startDataOnlyNode();
         ensureGreen(INDEX_NAME);
 
         final int initialDocCount = 10;
@@ -1104,10 +1105,10 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
     }
 
     public void testPitCreatedOnReplica() throws Exception {
-        final String primary = internalCluster().startNode();
+        final String primary = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
         ensureYellowAndNoInitializingShards(INDEX_NAME);
-        final String replica = internalCluster().startNode();
+        final String replica = internalCluster().startDataOnlyNode();
         ensureGreen(INDEX_NAME);
         client().prepareIndex(INDEX_NAME)
             .setId("1")
@@ -1234,13 +1235,13 @@
      */
     public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
         final List<String> nodes = new ArrayList<>();
-        final String primaryNode = internalCluster().startNode();
+        final String primaryNode = internalCluster().startDataOnlyNode();
         nodes.add(primaryNode);
         final Settings settings = Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build();
         createIndex(INDEX_NAME, settings);
         ensureGreen(INDEX_NAME);
         // start a replica node, initially will be empty with no shard assignment.
-        final String replicaNode = internalCluster().startNode();
+        final String replicaNode = internalCluster().startDataOnlyNode();
         nodes.add(replicaNode);
 
         // index a doc.
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java
index ab0c0cc3aec77..7e79812fcfeea 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java
@@ -8,7 +8,6 @@
 
 package org.opensearch.remotestore;
 
-import org.apache.lucene.tests.util.LuceneTestCase;
 import org.junit.After;
 import org.junit.Before;
 import org.opensearch.cluster.metadata.IndexMetadata;
@@ -26,7 +25,6 @@
  * This makes sure that the constructs/flows that are being tested with Segment Replication, holds true after enabling
  * remote store.
  */
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/7643")
 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT {
 
@@ -49,7 +47,7 @@ protected Settings featureFlagSettings() {
     }
 
     @Before
-    public void setup() {
+    private void setup() {
         internalCluster().startClusterManagerOnlyNode();
         Path absolutePath = randomRepoPath().toAbsolutePath();
         assertAcked(
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 93d4d4bbfec8b..9938d11caca13 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1579,13 +1579,21 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
         if (indexSettings.isSegRepEnabled() == false) {
             return null;
         }
+
+        Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> nullSegmentInfosEmptyCheckpoint = new Tuple<>(
+            new GatedCloseable<>(null, () -> {}),
+            ReplicationCheckpoint.empty(shardId, getDefaultCodecName())
+        );
+
         if (getEngineOrNull() == null) {
-            return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName()));
+            return nullSegmentInfosEmptyCheckpoint;
         }
         // do not close the snapshot - caller will close it.
-        final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot();
-        return Optional.ofNullable(snapshot.get()).map(segmentInfos -> {
-            try {
+        GatedCloseable<SegmentInfos> snapshot = null;
+        try {
+            snapshot = getSegmentInfosSnapshot();
+            if (snapshot.get() != null) {
+                SegmentInfos segmentInfos = snapshot.get();
                 return new Tuple<>(
                     snapshot,
                     new ReplicationCheckpoint(
@@ -1601,10 +1609,18 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
                         getEngine().config().getCodec().getName()
                     )
                 );
-            } catch (IOException e) {
-                throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e);
             }
-        }).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName())));
+        } catch (IOException | AlreadyClosedException e) {
+            logger.error("Error Fetching SegmentInfos and latest checkpoint", e);
+            if (snapshot != null) {
+                try {
+                    snapshot.close();
+                } catch (IOException ex) {
+                    throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e);
+                }
+            }
+        }
+        return nullSegmentInfosEmptyCheckpoint;
     }
 
     /**
diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
index 1c2ddf43f8274..688f29fa1f4bf 100644
--- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
@@ -282,7 +282,6 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception {
     /**
      * Tests retry flow after snapshot and metadata files have been uploaded to remote store in the failed attempt.
      * Snapshot and metadata files created in failed attempt should not break retry.
-     * @throws Exception
      */
     public void testRefreshSuccessAfterFailureInFirstAttemptAfterSnapshotAndMetadataUpload() throws Exception {
         int succeedOnAttempt = 1;