diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java new file mode 100644 index 0000000000000..a081110e6c5a1 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.create; + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +import org.opensearch.Version; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.opensearch.action.admin.indices.shrink.ResizeType; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.index.query.TermsQueryBuilder; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.test.VersionUtils; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.equalTo; + +public class RemoteCloneIndexIT extends RemoteStoreBaseIntegTestCase { + + @Override + protected boolean forbidPrivateIndexSettings() { + return false; + } + + public void testCreateCloneIndex() { + Version version = VersionUtils.randomIndexCompatibleVersion(random()); + int numPrimaryShards = randomIntBetween(1, 5); + prepareCreate("source").setSettings( + Settings.builder().put(indexSettings()).put("number_of_shards", numPrimaryShards).put("index.version.created", version) + ).get(); + final int docs = randomIntBetween(0, 128); + for (int i = 0; i < docs; i++) { + client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get(); + } + internalCluster().ensureAtLeastNumDataNodes(2); + // ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node + // if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due + // to the require._name below. + ensureGreen(); + // relocate all shards to one node such that we can merge it. + client().admin().indices().prepareUpdateSettings("source").setSettings(Settings.builder().put("index.blocks.write", true)).get(); + ensureGreen(); + + final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").setSegments(true).get(); + + // disable rebalancing to be able to capture the right stats. balancing can move the target primary + // making it hard to pin point the source shards. + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")) + .get(); + try { + assertAcked( + client().admin() + .indices() + .prepareResizeIndex("source", "target") + .setResizeType(ResizeType.CLONE) + .setSettings(Settings.builder().put("index.number_of_replicas", 0).putNull("index.blocks.write").build()) + .get() + ); + ensureGreen(); + + final IndicesStatsResponse targetStats = client().admin().indices().prepareStats("target").get(); + assertThat(targetStats.getIndex("target").getIndexShards().keySet().size(), equalTo(numPrimaryShards)); + + final int size = docs > 0 ? 2 * docs : 1; + assertHitCount(client().prepareSearch("target").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs); + + for (int i = docs; i < 2 * docs; i++) { + client().prepareIndex("target").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get(); + } + flushAndRefresh(); + assertHitCount( + client().prepareSearch("target").setSize(2 * size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), + 2 * docs + ); + assertHitCount(client().prepareSearch("source").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs); + GetSettingsResponse target = client().admin().indices().prepareGetSettings("target").get(); + assertEquals(version, target.getIndexToSettings().get("target").getAsVersion("index.version.created", null)); + } finally { + // clean up + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null) + ) + .get(); + } + + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteShrinkIndexIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteShrinkIndexIT.java new file mode 100644 index 0000000000000..282eb9c6ad95e --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteShrinkIndexIT.java @@ -0,0 +1,545 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.create; + +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedSetSelector; +import org.apache.lucene.search.SortedSetSortField; +import org.apache.lucene.util.Constants; +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.support.ActiveShardCount; +import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterInfoService; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.InternalClusterInfoService; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.Murmur3HashFunction; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.opensearch.common.Priority; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.index.Index; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexService; +import org.opensearch.index.engine.SegmentsStats; +import org.opensearch.index.query.TermsQueryBuilder; +import org.opensearch.index.seqno.SeqNoStats; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.test.VersionUtils; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.IntStream; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +public class RemoteShrinkIndexIT extends RemoteStoreBaseIntegTestCase { + @Override + protected boolean forbidPrivateIndexSettings() { + return false; + } + + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + } + + public void testCreateShrinkIndexToN() { + + assumeFalse("https://github.com/elastic/elasticsearch/issues/34080", Constants.WINDOWS); + + int[][] possibleShardSplits = new int[][] { { 8, 4, 2 }, { 9, 3, 1 }, { 4, 2, 1 }, { 15, 5, 1 } }; + int[] shardSplits = randomFrom(possibleShardSplits); + assertEquals(shardSplits[0], (shardSplits[0] / shardSplits[1]) * shardSplits[1]); + assertEquals(shardSplits[1], (shardSplits[1] / shardSplits[2]) * shardSplits[2]); + internalCluster().ensureAtLeastNumDataNodes(2); + prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", shardSplits[0])).get(); + for (int i = 0; i < 20; i++) { + client().prepareIndex("source") + .setId(Integer.toString(i)) + .setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON) + .get(); + } + final Map dataNodes = client().admin().cluster().prepareState().get().getState().nodes().getDataNodes(); + assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2); + DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(new DiscoveryNode[0]); + String mergeNode = discoveryNodes[0].getName(); + // ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node + // if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due + // to the require._name below. + ensureGreen(); + // relocate all shards to one node such that we can merge it. + client().admin() + .indices() + .prepareUpdateSettings("source") + .setSettings(Settings.builder().put("index.routing.allocation.require._name", mergeNode).put("index.blocks.write", true)) + .get(); + ensureGreen(); + // now merge source into a 4 shard index + assertAcked( + client().admin() + .indices() + .prepareResizeIndex("source", "first_shrink") + .setSettings( + Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", shardSplits[1]) + .putNull("index.blocks.write") + .build() + ) + .get() + ); + ensureGreen(); + assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + + for (int i = 0; i < 20; i++) { // now update + client().prepareIndex("first_shrink") + .setId(Integer.toString(i)) + .setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON) + .get(); + } + flushAndRefresh(); + assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + + // relocate all shards to one node such that we can merge it. + client().admin() + .indices() + .prepareUpdateSettings("first_shrink") + .setSettings(Settings.builder().put("index.routing.allocation.require._name", mergeNode).put("index.blocks.write", true)) + .get(); + ensureGreen(); + // now merge source into a 2 shard index + assertAcked( + client().admin() + .indices() + .prepareResizeIndex("first_shrink", "second_shrink") + .setSettings( + Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", shardSplits[2]) + .putNull("index.blocks.write") + .putNull("index.routing.allocation.require._name") + .build() + ) + .get() + ); + ensureGreen(); + assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + // let it be allocated anywhere and bump replicas + client().admin() + .indices() + .prepareUpdateSettings("second_shrink") + .setSettings(Settings.builder().putNull("index.routing.allocation.include._id").put("index.number_of_replicas", 0)) + .get(); + ensureGreen(); + assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + + for (int i = 0; i < 20; i++) { // now update + client().prepareIndex("second_shrink") + .setId(Integer.toString(i)) + .setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON) + .get(); + } + flushAndRefresh(); + assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + } + + public void testShrinkIndexPrimaryTerm() throws Exception { + int numberOfShards = randomIntBetween(2, 20); + int numberOfTargetShards = randomValueOtherThanMany(n -> numberOfShards % n != 0, () -> randomIntBetween(1, numberOfShards - 1)); + internalCluster().ensureAtLeastNumDataNodes(2); + prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", numberOfShards)).get(); + + final Map dataNodes = client().admin().cluster().prepareState().get().getState().nodes().getDataNodes(); + assertThat(dataNodes.size(), greaterThanOrEqualTo(2)); + final DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(new DiscoveryNode[0]); + final String mergeNode = discoveryNodes[0].getName(); + // This needs more than the default timeout if a large number of shards were created. + ensureGreen(TimeValue.timeValueSeconds(120)); + + // fail random primary shards to force primary terms to increase + final Index source = resolveIndex("source"); + final int iterations = scaledRandomIntBetween(0, 16); + for (int i = 0; i < iterations; i++) { + final String node = randomSubsetOf(1, internalCluster().nodesInclude("source")).get(0); + final IndicesService indexServices = internalCluster().getInstance(IndicesService.class, node); + final IndexService indexShards = indexServices.indexServiceSafe(source); + for (final Integer shardId : indexShards.shardIds()) { + final IndexShard shard = indexShards.getShard(shardId); + if (shard.routingEntry().primary() && randomBoolean()) { + disableAllocation("source"); + shard.failShard("test", new Exception("test")); + // this can not succeed until the shard is failed and a replica is promoted + int id = 0; + while (true) { + // find an ID that routes to the right shard, we will only index to the shard that saw a primary failure + final String s = Integer.toString(id); + final int hash = Math.floorMod(Murmur3HashFunction.hash(s), numberOfShards); + if (hash == shardId) { + final IndexRequest request = new IndexRequest("source").id(s) + .source("{ \"f\": \"" + s + "\"}", MediaTypeRegistry.JSON); + client().index(request).get(); + break; + } else { + id++; + } + } + enableAllocation("source"); + ensureGreen(); + } + } + } + + // relocate all shards to one node such that we can merge it. + final Settings.Builder prepareShrinkSettings = Settings.builder() + .put("index.routing.allocation.require._name", mergeNode) + .put("index.blocks.write", true); + client().admin().indices().prepareUpdateSettings("source").setSettings(prepareShrinkSettings).get(); + ensureGreen(TimeValue.timeValueSeconds(120)); // needs more than the default to relocate many shards + + final IndexMetadata indexMetadata = indexMetadata(client(), "source"); + final long beforeShrinkPrimaryTerm = IntStream.range(0, numberOfShards).mapToLong(indexMetadata::primaryTerm).max().getAsLong(); + + // now merge source into target + final Settings shrinkSettings = Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", numberOfTargetShards) + .build(); + assertAcked(client().admin().indices().prepareResizeIndex("source", "target").setSettings(shrinkSettings).get()); + + ensureGreen(TimeValue.timeValueSeconds(120)); + + final IndexMetadata afterShrinkIndexMetadata = indexMetadata(client(), "target"); + for (int shardId = 0; shardId < numberOfTargetShards; shardId++) { + assertThat(afterShrinkIndexMetadata.primaryTerm(shardId), equalTo(beforeShrinkPrimaryTerm + 1)); + } + } + + private static IndexMetadata indexMetadata(final Client client, final String index) { + final ClusterStateResponse clusterStateResponse = client.admin().cluster().state(new ClusterStateRequest()).actionGet(); + return clusterStateResponse.getState().metadata().index(index); + } + + public void testCreateShrinkIndex() { + internalCluster().ensureAtLeastNumDataNodes(2); + Version version = VersionUtils.randomVersion(random()); + prepareCreate("source").setSettings( + Settings.builder().put(indexSettings()).put("number_of_shards", randomIntBetween(2, 7)).put("index.version.created", version) + ).get(); + final int docs = randomIntBetween(0, 128); + for (int i = 0; i < docs; i++) { + client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get(); + } + final Map dataNodes = client().admin().cluster().prepareState().get().getState().nodes().getDataNodes(); + assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2); + DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(new DiscoveryNode[0]); + // ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node + // if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due + // to the require._name below. + ensureGreen(); + // relocate all shards to one node such that we can merge it. + client().admin() + .indices() + .prepareUpdateSettings("source") + .setSettings( + Settings.builder() + .put("index.routing.allocation.require._name", discoveryNodes[0].getName()) + .put("index.blocks.write", true) + ) + .get(); + ensureGreen(); + + final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").setSegments(true).get(); + + // disable rebalancing to be able to capture the right stats. balancing can move the target primary + // making it hard to pin point the source shards. + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")) + .get(); + + // now merge source into a single shard index + assertAcked( + client().admin() + .indices() + .prepareResizeIndex("source", "target") + .setSettings( + Settings.builder() + .put("index.number_of_replicas", 0) + .putNull("index.blocks.write") + .putNull("index.routing.allocation.require._name") + .build() + ) + .get() + ); + ensureGreen(); + + // resolve true merge node - this is not always the node we required as all shards may be on another node + final ClusterState state = client().admin().cluster().prepareState().get().getState(); + DiscoveryNode mergeNode = state.nodes().get(state.getRoutingTable().index("target").shard(0).primaryShard().currentNodeId()); + logger.info("merge node {}", mergeNode); + + final long maxSeqNo = Arrays.stream(sourceStats.getShards()) + .filter(shard -> shard.getShardRouting().currentNodeId().equals(mergeNode.getId())) + .map(ShardStats::getSeqNoStats) + .mapToLong(SeqNoStats::getMaxSeqNo) + .max() + .getAsLong(); + final long maxUnsafeAutoIdTimestamp = Arrays.stream(sourceStats.getShards()) + .filter(shard -> shard.getShardRouting().currentNodeId().equals(mergeNode.getId())) + .map(ShardStats::getStats) + .map(CommonStats::getSegments) + .mapToLong(SegmentsStats::getMaxUnsafeAutoIdTimestamp) + .max() + .getAsLong(); + + final IndicesStatsResponse targetStats = client().admin().indices().prepareStats("target").get(); + for (final ShardStats shardStats : targetStats.getShards()) { + final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); + final ShardRouting shardRouting = shardStats.getShardRouting(); + assertThat("failed on " + shardRouting, seqNoStats.getMaxSeqNo(), equalTo(maxSeqNo)); + assertThat("failed on " + shardRouting, seqNoStats.getLocalCheckpoint(), equalTo(maxSeqNo)); + assertThat( + "failed on " + shardRouting, + shardStats.getStats().getSegments().getMaxUnsafeAutoIdTimestamp(), + equalTo(maxUnsafeAutoIdTimestamp) + ); + } + + final int size = docs > 0 ? 2 * docs : 1; + assertHitCount(client().prepareSearch("target").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs); + + for (int i = docs; i < 2 * docs; i++) { + client().prepareIndex("target").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get(); + } + flushAndRefresh(); + assertHitCount(client().prepareSearch("target").setSize(2 * size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 2 * docs); + assertHitCount(client().prepareSearch("source").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs); + GetSettingsResponse target = client().admin().indices().prepareGetSettings("target").get(); + assertEquals(version, target.getIndexToSettings().get("target").getAsVersion("index.version.created", null)); + + // clean up + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null) + ) + .get(); + } + + /** + * Tests that we can manually recover from a failed allocation due to shards being moved away etc. + */ + public void testCreateShrinkIndexFails() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + prepareCreate("source").setSettings( + Settings.builder().put(indexSettings()).put("number_of_shards", randomIntBetween(2, 7)).put("number_of_replicas", 0) + ).get(); + for (int i = 0; i < 20; i++) { + client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get(); + } + final Map dataNodes = client().admin().cluster().prepareState().get().getState().nodes().getDataNodes(); + assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2); + DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(new DiscoveryNode[0]); + String spareNode = discoveryNodes[0].getName(); + String mergeNode = discoveryNodes[1].getName(); + // ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node + // if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due + // to the require._name below. + ensureGreen(); + // relocate all shards to one node such that we can merge it. + client().admin() + .indices() + .prepareUpdateSettings("source") + .setSettings(Settings.builder().put("index.routing.allocation.require._name", mergeNode).put("index.blocks.write", true)) + .get(); + ensureGreen(); + + // now merge source into a single shard index + client().admin() + .indices() + .prepareResizeIndex("source", "target") + .setWaitForActiveShards(ActiveShardCount.NONE) + .setSettings( + Settings.builder() + .put("index.routing.allocation.exclude._name", mergeNode) // we manually exclude the merge node to forcefully fuck it up + .put("index.number_of_replicas", 0) + .put("index.allocation.max_retries", 1) + .build() + ) + .get(); + client().admin().cluster().prepareHealth("target").setWaitForEvents(Priority.LANGUID).get(); + + // now we move all shards away from the merge node + client().admin() + .indices() + .prepareUpdateSettings("source") + .setSettings(Settings.builder().put("index.routing.allocation.require._name", spareNode).put("index.blocks.write", true)) + .get(); + ensureGreen("source"); + + client().admin() + .indices() + .prepareUpdateSettings("target") // erase the forcefully fuckup! + .setSettings(Settings.builder().putNull("index.routing.allocation.exclude._name")) + .get(); + // wait until it fails + assertBusy(() -> { + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get(); + RoutingTable routingTables = clusterStateResponse.getState().routingTable(); + assertTrue(routingTables.index("target").shard(0).getShards().get(0).unassigned()); + assertEquals( + UnassignedInfo.Reason.ALLOCATION_FAILED, + routingTables.index("target").shard(0).getShards().get(0).unassignedInfo().getReason() + ); + assertEquals(1, routingTables.index("target").shard(0).getShards().get(0).unassignedInfo().getNumFailedAllocations()); + }); + client().admin() + .indices() + .prepareUpdateSettings("source") // now relocate them all to the right node + .setSettings(Settings.builder().put("index.routing.allocation.require._name", mergeNode)) + .get(); + ensureGreen("source"); + + final InternalClusterInfoService infoService = (InternalClusterInfoService) internalCluster().getInstance( + ClusterInfoService.class, + internalCluster().getClusterManagerName() + ); + infoService.refresh(); + // kick off a retry and wait until it's done! + ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + long expectedShardSize = clusterRerouteResponse.getState() + .routingTable() + .index("target") + .shard(0) + .getShards() + .get(0) + .getExpectedShardSize(); + // we support the expected shard size in the allocator to sum up over the source index shards + assertTrue("expected shard size must be set but wasn't: " + expectedShardSize, expectedShardSize > 0); + ensureGreen(); + assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); + } + + public void testCreateShrinkWithIndexSort() throws Exception { + SortField expectedSortField = new SortedSetSortField("id", true, SortedSetSelector.Type.MAX); + expectedSortField.setMissingValue(SortedSetSortField.STRING_FIRST); + Sort expectedIndexSort = new Sort(expectedSortField); + internalCluster().ensureAtLeastNumDataNodes(2); + prepareCreate("source").setSettings( + Settings.builder() + .put(indexSettings()) + .put("sort.field", "id") + .put("sort.order", "desc") + .put("number_of_shards", 8) + .put("number_of_replicas", 0) + ).setMapping("id", "type=keyword,doc_values=true").get(); + for (int i = 0; i < 20; i++) { + client().prepareIndex("source") + .setId(Integer.toString(i)) + .setSource("{\"foo\" : \"bar\", \"id\" : " + i + "}", MediaTypeRegistry.JSON) + .get(); + } + final Map dataNodes = client().admin().cluster().prepareState().get().getState().nodes().getDataNodes(); + assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2); + DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(new DiscoveryNode[0]); + String mergeNode = discoveryNodes[0].getName(); + // ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node + // if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due + // to the require._name below. + ensureGreen(); + + flushAndRefresh(); + assertSortedSegments("source", expectedIndexSort); + + // relocate all shards to one node such that we can merge it. + client().admin() + .indices() + .prepareUpdateSettings("source") + .setSettings(Settings.builder().put("index.routing.allocation.require._name", mergeNode).put("index.blocks.write", true)) + .get(); + ensureGreen(); + + // check that index sort cannot be set on the target index + IllegalArgumentException exc = expectThrows( + IllegalArgumentException.class, + () -> client().admin() + .indices() + .prepareResizeIndex("source", "target") + .setSettings( + Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", "2") + .put("index.sort.field", "foo") + .build() + ) + .get() + ); + assertThat(exc.getMessage(), containsString("can't override index sort when resizing an index")); + + // check that the index sort order of `source` is correctly applied to the `target` + assertAcked( + client().admin() + .indices() + .prepareResizeIndex("source", "target") + .setSettings( + Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", "2") + .putNull("index.blocks.write") + .build() + ) + .get() + ); + ensureGreen(); + flushAndRefresh(); + GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("target").execute().actionGet(); + assertEquals(settingsResponse.getSetting("target", "index.sort.field"), "id"); + assertEquals(settingsResponse.getSetting("target", "index.sort.order"), "desc"); + assertSortedSegments("target", expectedIndexSort); + + // ... and that the index sort is also applied to updates + for (int i = 20; i < 40; i++) { + client().prepareIndex("target").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get(); + } + flushAndRefresh(); + assertSortedSegments("target", expectedIndexSort); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteSplitIndexIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteSplitIndexIT.java new file mode 100644 index 0000000000000..dd4252d24f314 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteSplitIndexIT.java @@ -0,0 +1,506 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.action.admin.indices.create; + +import org.apache.lucene.search.join.ScoreMode; +import org.apache.lucene.util.Constants; +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.opensearch.action.admin.indices.shrink.ResizeType; +import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexRequestBuilder; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.MetadataCreateIndexService; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.Murmur3HashFunction; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.index.Index; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexService; +import org.opensearch.index.engine.SegmentsStats; +import org.opensearch.index.query.TermsQueryBuilder; +import org.opensearch.index.seqno.SeqNoStats; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.test.VersionUtils; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.stream.IntStream; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.index.query.QueryBuilders.nestedQuery; +import static org.opensearch.index.query.QueryBuilders.termQuery; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.equalTo; + +public class RemoteSplitIndexIT extends RemoteStoreBaseIntegTestCase { + + @Override + protected boolean forbidPrivateIndexSettings() { + return false; + } + + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + } + + public void testCreateSplitIndexToN() throws IOException { + int[][] possibleShardSplits = new int[][] { { 2, 4, 8 }, { 3, 6, 12 }, { 1, 2, 4 } }; + int[] shardSplits = randomFrom(possibleShardSplits); + splitToN(shardSplits[0], shardSplits[1], shardSplits[2]); + } + + public void testSplitFromOneToN() { + + assumeFalse("https://github.com/elastic/elasticsearch/issues/34080", Constants.WINDOWS); + + splitToN(1, 5, 10); + client().admin().indices().prepareDelete("*").get(); + int randomSplit = randomIntBetween(2, 6); + splitToN(1, randomSplit, randomSplit * 2); + } + + private void splitToN(int sourceShards, int firstSplitShards, int secondSplitShards) { + + assertEquals(sourceShards, (sourceShards * firstSplitShards) / firstSplitShards); + assertEquals(firstSplitShards, (firstSplitShards * secondSplitShards) / secondSplitShards); + internalCluster().ensureAtLeastNumDataNodes(2); + final boolean useRouting = randomBoolean(); + final boolean useNested = randomBoolean(); + final boolean useMixedRouting = useRouting ? randomBoolean() : false; + CreateIndexRequestBuilder createInitialIndex = prepareCreate("source"); + Settings.Builder settings = Settings.builder().put(indexSettings()).put("number_of_shards", sourceShards); + final boolean useRoutingPartition; + if (randomBoolean()) { + // randomly set the value manually + int routingShards = secondSplitShards * randomIntBetween(1, 10); + settings.put("index.number_of_routing_shards", routingShards); + useRoutingPartition = false; + } else { + useRoutingPartition = randomBoolean(); + } + if (useRouting && useMixedRouting == false && useRoutingPartition) { + int numRoutingShards = MetadataCreateIndexService.calculateNumRoutingShards(secondSplitShards, Version.CURRENT) - 1; + settings.put("index.routing_partition_size", randomIntBetween(1, numRoutingShards)); + if (useNested) { + createInitialIndex.setMapping("_routing", "required=true", "nested1", "type=nested"); + } else { + createInitialIndex.setMapping("_routing", "required=true"); + } + } else if (useNested) { + createInitialIndex.setMapping("nested1", "type=nested"); + } + logger.info("use routing {} use mixed routing {} use nested {}", useRouting, useMixedRouting, useNested); + createInitialIndex.setSettings(settings).get(); + + int numDocs = randomIntBetween(10, 50); + String[] routingValue = new String[numDocs]; + + BiFunction indexFunc = (index, id) -> { + try { + return client().prepareIndex(index) + .setId(Integer.toString(id)) + .setSource( + jsonBuilder().startObject() + .field("foo", "bar") + .field("i", id) + .startArray("nested1") + .startObject() + .field("n_field1", "n_value1_1") + .field("n_field2", "n_value2_1") + .endObject() + .startObject() + .field("n_field1", "n_value1_2") + .field("n_field2", "n_value2_2") + .endObject() + .endArray() + .endObject() + ); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }; + for (int i = 0; i < numDocs; i++) { + IndexRequestBuilder builder = indexFunc.apply("source", i); + if (useRouting) { + String routing = randomRealisticUnicodeOfCodepointLengthBetween(1, 10); + if (useMixedRouting && randomBoolean()) { + routingValue[i] = null; + } else { + routingValue[i] = routing; + } + builder.setRouting(routingValue[i]); + } + builder.get(); + } + + if (randomBoolean()) { + for (int i = 0; i < numDocs; i++) { // let's introduce some updates / deletes on the index + if (randomBoolean()) { + IndexRequestBuilder builder = indexFunc.apply("source", i); + if (useRouting) { + builder.setRouting(routingValue[i]); + } + builder.get(); + } + } + } + + ensureYellow(); + client().admin().indices().prepareUpdateSettings("source").setSettings(Settings.builder().put("index.blocks.write", true)).get(); + ensureGreen(); + Settings.Builder firstSplitSettingsBuilder = Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", firstSplitShards) + .putNull("index.blocks.write"); + if (sourceShards == 1 && useRoutingPartition == false && randomBoolean()) { // try to set it if we have a source index with 1 shard + firstSplitSettingsBuilder.put("index.number_of_routing_shards", secondSplitShards); + } + assertAcked( + client().admin() + .indices() + .prepareResizeIndex("source", "first_split") + .setResizeType(ResizeType.SPLIT) + .setSettings(firstSplitSettingsBuilder.build()) + .get() + ); + ensureGreen(); + assertHitCount(client().prepareSearch("first_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); + + for (int i = 0; i < numDocs; i++) { // now update + IndexRequestBuilder builder = indexFunc.apply("first_split", i); + if (useRouting) { + builder.setRouting(routingValue[i]); + } + builder.get(); + } + flushAndRefresh(); + assertHitCount(client().prepareSearch("first_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); + assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); + for (int i = 0; i < numDocs; i++) { + GetResponse getResponse = client().prepareGet("first_split", Integer.toString(i)).setRouting(routingValue[i]).get(); + assertTrue(getResponse.isExists()); + } + + client().admin() + .indices() + .prepareUpdateSettings("first_split") + .setSettings(Settings.builder().put("index.blocks.write", true)) + .get(); + ensureGreen(); + // now split source into a new index + assertAcked( + client().admin() + .indices() + .prepareResizeIndex("first_split", "second_split") + .setResizeType(ResizeType.SPLIT) + .setSettings( + Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", secondSplitShards) + .putNull("index.blocks.write") + .build() + ) + .get() + ); + ensureGreen(); + assertHitCount(client().prepareSearch("second_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); + // let it be allocated anywhere and bump replicas + client().admin() + .indices() + .prepareUpdateSettings("second_split") + .setSettings(Settings.builder().put("index.number_of_replicas", 0)) + .get(); + ensureGreen(); + assertHitCount(client().prepareSearch("second_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); + + for (int i = 0; i < numDocs; i++) { // now update + IndexRequestBuilder builder = indexFunc.apply("second_split", i); + if (useRouting) { + builder.setRouting(routingValue[i]); + } + builder.get(); + } + flushAndRefresh(); + for (int i = 0; i < numDocs; i++) { + GetResponse getResponse = client().prepareGet("second_split", Integer.toString(i)).setRouting(routingValue[i]).get(); + assertTrue(getResponse.isExists()); + } + assertHitCount(client().prepareSearch("second_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); + assertHitCount(client().prepareSearch("first_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); + assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); + if (useNested) { + assertNested("source", numDocs); + assertNested("first_split", numDocs); + assertNested("second_split", numDocs); + } + assertAllUniqueDocs( + client().prepareSearch("second_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), + numDocs + ); + assertAllUniqueDocs( + client().prepareSearch("first_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), + numDocs + ); + assertAllUniqueDocs(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); + } + + public void assertNested(String index, int numDocs) { + // now, do a nested query + SearchResponse searchResponse = client().prepareSearch(index) + .setQuery(nestedQuery("nested1", termQuery("nested1.n_field1", "n_value1_1"), ScoreMode.Avg)) + .get(); + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) numDocs)); + } + + public void assertAllUniqueDocs(SearchResponse response, int numDocs) { + Set ids = new HashSet<>(); + for (int i = 0; i < response.getHits().getHits().length; i++) { + String id = response.getHits().getHits()[i].getId(); + assertTrue("found ID " + id + " more than once", ids.add(id)); + } + assertEquals(numDocs, ids.size()); + } + + public void testSplitIndexPrimaryTerm() throws Exception { + int numberOfTargetShards = randomIntBetween(2, 20); + int numberOfShards = randomValueOtherThanMany(n -> numberOfTargetShards % n != 0, () -> between(1, numberOfTargetShards - 1)); + internalCluster().ensureAtLeastNumDataNodes(2); + prepareCreate("source").setSettings( + Settings.builder() + .put(indexSettings()) + .put("number_of_shards", numberOfShards) + .put("index.number_of_routing_shards", numberOfTargetShards) + ).get(); + ensureGreen(TimeValue.timeValueSeconds(120)); // needs more than the default to allocate many shards + + // fail random primary shards to force primary terms to increase + final Index source = resolveIndex("source"); + final int iterations = scaledRandomIntBetween(0, 16); + for (int i = 0; i < iterations; i++) { + final String node = randomSubsetOf(1, internalCluster().nodesInclude("source")).get(0); + final IndicesService indexServices = internalCluster().getInstance(IndicesService.class, node); + final IndexService indexShards = indexServices.indexServiceSafe(source); + for (final Integer shardId : indexShards.shardIds()) { + final IndexShard shard = indexShards.getShard(shardId); + if (shard.routingEntry().primary() && randomBoolean()) { + disableAllocation("source"); + shard.failShard("test", new Exception("test")); + // this can not succeed until the shard is failed and a replica is promoted + int id = 0; + while (true) { + // find an ID that routes to the right shard, we will only index to the shard that saw a primary failure + final String s = Integer.toString(id); + final int hash = Math.floorMod(Murmur3HashFunction.hash(s), numberOfShards); + if (hash == shardId) { + final IndexRequest request = new IndexRequest("source").id(s) + .source("{ \"f\": \"" + s + "\"}", MediaTypeRegistry.JSON); + client().index(request).get(); + break; + } else { + id++; + } + } + enableAllocation("source"); + ensureGreen(); + } + } + } + + final Settings.Builder prepareSplitSettings = Settings.builder().put("index.blocks.write", true); + client().admin().indices().prepareUpdateSettings("source").setSettings(prepareSplitSettings).get(); + ensureYellow(); + + final IndexMetadata indexMetadata = indexMetadata(client(), "source"); + final long beforeSplitPrimaryTerm = IntStream.range(0, numberOfShards).mapToLong(indexMetadata::primaryTerm).max().getAsLong(); + + // now split source into target + final Settings splitSettings = Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", numberOfTargetShards) + .putNull("index.blocks.write") + .build(); + assertAcked( + client().admin() + .indices() + .prepareResizeIndex("source", "target") + .setResizeType(ResizeType.SPLIT) + .setSettings(splitSettings) + .get() + ); + + ensureGreen(TimeValue.timeValueSeconds(120)); // needs more than the default to relocate many shards + + final IndexMetadata aftersplitIndexMetadata = indexMetadata(client(), "target"); + for (int shardId = 0; shardId < numberOfTargetShards; shardId++) { + assertThat(aftersplitIndexMetadata.primaryTerm(shardId), equalTo(beforeSplitPrimaryTerm + 1)); + } + } + + private static IndexMetadata indexMetadata(final Client client, final String index) { + final ClusterStateResponse clusterStateResponse = client.admin().cluster().state(new ClusterStateRequest()).actionGet(); + return clusterStateResponse.getState().metadata().index(index); + } + + public void testCreateSplitIndex() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + Version version = VersionUtils.randomIndexCompatibleVersion(random()); + prepareCreate("source").setSettings( + Settings.builder().put(indexSettings()).put("number_of_shards", 1).put("index.version.created", version) + ).get(); + final int docs = randomIntBetween(0, 128); + for (int i = 0; i < docs; i++) { + client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get(); + } + // ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node + // if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due + // to the require._name below. + ensureGreen(); + // relocate all shards to one node such that we can merge it. + client().admin().indices().prepareUpdateSettings("source").setSettings(Settings.builder().put("index.blocks.write", true)).get(); + ensureGreen(); + + final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").setSegments(true).get(); + + // disable rebalancing to be able to capture the right stats. balancing can move the target primary + // making it hard to pin point the source shards. + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")) + .get(); + try { + assertAcked( + client().admin() + .indices() + .prepareResizeIndex("source", "target") + .setResizeType(ResizeType.SPLIT) + .setSettings( + Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", 2) + .putNull("index.blocks.write") + .build() + ) + .get() + ); + ensureGreen(); + + final ClusterState state = client().admin().cluster().prepareState().get().getState(); + DiscoveryNode mergeNode = state.nodes().get(state.getRoutingTable().index("target").shard(0).primaryShard().currentNodeId()); + logger.info("split node {}", mergeNode); + + final long maxSeqNo = Arrays.stream(sourceStats.getShards()) + .filter(shard -> shard.getShardRouting().currentNodeId().equals(mergeNode.getId())) + .map(ShardStats::getSeqNoStats) + .mapToLong(SeqNoStats::getMaxSeqNo) + .max() + .getAsLong(); + final long maxUnsafeAutoIdTimestamp = Arrays.stream(sourceStats.getShards()) + .filter(shard -> shard.getShardRouting().currentNodeId().equals(mergeNode.getId())) + .map(ShardStats::getStats) + .map(CommonStats::getSegments) + .mapToLong(SegmentsStats::getMaxUnsafeAutoIdTimestamp) + .max() + .getAsLong(); + + final IndicesStatsResponse targetStats = client().admin().indices().prepareStats("target").get(); + for (final ShardStats shardStats : targetStats.getShards()) { + final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); + final ShardRouting shardRouting = shardStats.getShardRouting(); + assertThat("failed on " + shardRouting, seqNoStats.getMaxSeqNo(), equalTo(maxSeqNo)); + assertThat("failed on " + shardRouting, seqNoStats.getLocalCheckpoint(), equalTo(maxSeqNo)); + assertThat( + "failed on " + shardRouting, + shardStats.getStats().getSegments().getMaxUnsafeAutoIdTimestamp(), + equalTo(maxUnsafeAutoIdTimestamp) + ); + } + + final int size = docs > 0 ? 2 * docs : 1; + assertHitCount(client().prepareSearch("target").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs); + + for (int i = docs; i < 2 * docs; i++) { + client().prepareIndex("target").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get(); + } + flushAndRefresh(); + assertHitCount( + client().prepareSearch("target").setSize(2 * size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), + 2 * docs + ); + assertHitCount(client().prepareSearch("source").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs); + GetSettingsResponse target = client().admin().indices().prepareGetSettings("target").get(); + assertEquals(version, target.getIndexToSettings().get("target").getAsVersion("index.version.created", null)); + } finally { + // clean up + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null) + ) + .get(); + } + + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index 9e0b2a66467de..ad78c503a4a19 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -330,6 +330,8 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { assertEquals(restoreSnapshotResponse1.status(), RestStatus.ACCEPTED); assertEquals(restoreSnapshotResponse2.status(), RestStatus.ACCEPTED); ensureGreen(indexName1, restoredIndexName2); + + assertRemoteSegmentsAndTranslogUploaded(restoredIndexName2); assertDocsPresentInIndex(client, indexName1, numDocsInIndex1); assertDocsPresentInIndex(client, restoredIndexName2, numDocsInIndex2); // indexing some new docs and validating @@ -355,6 +357,29 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 4); } + void assertRemoteSegmentsAndTranslogUploaded(String idx) throws IOException { + String indexUUID = client().admin().indices().prepareGetSettings(idx).get().getSetting(idx, IndexMetadata.SETTING_INDEX_UUID); + + Path remoteTranslogMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/metadata"); + Path remoteTranslogDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/data"); + Path segmentMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/segments/metadata"); + Path segmentDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/segments/data"); + + try ( + Stream translogMetadata = Files.list(remoteTranslogMetadataPath); + Stream translogData = Files.list(remoteTranslogDataPath); + Stream segmentMetadata = Files.list(segmentMetadataPath); + Stream segmentData = Files.list(segmentDataPath); + + ) { + assertTrue(translogData.count() > 0); + assertTrue(translogMetadata.count() > 0); + assertTrue(segmentMetadata.count() > 0); + assertTrue(segmentData.count() > 0); + } + + } + public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, ExecutionException, InterruptedException { internalCluster().startClusterManagerOnlyNode(); internalCluster().startDataOnlyNodes(2); @@ -395,23 +420,7 @@ public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, Exe ensureGreen(indexName1); assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1); - // Make sure remote translog is empty - String indexUUID = client().admin() - .indices() - .prepareGetSettings(indexName1) - .get() - .getSetting(indexName1, IndexMetadata.SETTING_INDEX_UUID); - - Path remoteTranslogMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/metadata"); - Path remoteTranslogDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/data"); - - try ( - Stream translogMetadata = Files.list(remoteTranslogMetadataPath); - Stream translogData = Files.list(remoteTranslogDataPath) - ) { - assertTrue(translogData.count() > 0); - assertTrue(translogMetadata.count() > 0); - } + assertRemoteSegmentsAndTranslogUploaded(indexName1); // Clear the local data before stopping the node. This will make sure that remote translog is empty. IndexShard indexShard = getIndexShard(primaryNodeName(indexName1), indexName1); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index bccca283ba772..8b4981a15433a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -56,7 +56,7 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { protected static final String REPOSITORY_NAME = "test-remote-store-repo"; protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2"; protected static final int SHARD_COUNT = 1; - protected static final int REPLICA_COUNT = 1; + protected static int REPLICA_COUNT = 1; protected static final String TOTAL_OPERATIONS = "total-operations"; protected static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations"; protected static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total"; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 352d4efc95269..32396f1a3df2e 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -203,6 +203,7 @@ import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; @@ -2006,6 +2007,29 @@ private RemoteSegmentStoreDirectory getRemoteDirectory() { return ((RemoteSegmentStoreDirectory) remoteDirectory); } + /** + Returns true iff it is able to verify that remote segment store + is in sync with local + */ + boolean isRemoteSegmentStoreInSync() { + assert indexSettings.isRemoteStoreEnabled(); + try { + RemoteSegmentStoreDirectory directory = getRemoteDirectory(); + if (directory.readLatestMetadataFile() != null) { + // verifying that all files except EXCLUDE_FILES are uploaded to the remote + Collection uploadFiles = directory.getSegmentsUploadedToRemoteStore().keySet(); + SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); + Collection localFiles = segmentInfos.files(true); + if (uploadFiles.containsAll(localFiles)) { + return true; + } + } + } catch (IOException e) { + logger.error("Exception while reading latest metadata", e); + } + return false; + } + public void preRecovery() { final IndexShardState currentState = this.state; // single volatile read if (currentState == IndexShardState.CLOSED) { diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 464adc88ae16f..dd40327298874 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -20,6 +20,7 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.logging.Loggers; import org.opensearch.common.unit.TimeValue; @@ -179,6 +180,9 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) { return this.primaryTerm != indexShard.getOperationPrimaryTerm(); } + /* + @return false if retry is needed + */ private boolean syncSegments() { if (isReadyForUpload() == false) { // Following check is required to enable retry and make sure that we do not lose this refresh event @@ -485,7 +489,9 @@ private void initializeRemoteDirectoryOnTermUpdate() throws IOException { * @return true iff primaryMode is true and index shard is not in closed state. */ private boolean isReadyForUpload() { - boolean isReady = indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED; + boolean isReady = (indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED) + || isLocalOrSnapshotRecovery(); + if (isReady == false) { StringBuilder sb = new StringBuilder("Skipped syncing segments with"); if (indexShard.getReplicationTracker() != null) { @@ -497,11 +503,25 @@ private boolean isReadyForUpload() { if (indexShard.getEngineOrNull() != null) { sb.append(" engineType=").append(indexShard.getEngine().getClass().getSimpleName()); } + if (isLocalOrSnapshotRecovery() == false) { + sb.append(" recoverySourceType=").append(indexShard.recoveryState().getRecoverySource().getType()); + sb.append(" primary=").append(indexShard.shardRouting.primary()); + } logger.trace(sb.toString()); } return isReady; } + private boolean isLocalOrSnapshotRecovery() { + // In this case when the primary mode is false, we need to upload segments to Remote Store + // This is required in case of snapshots/shrink/ split/clone where we need to durable persist + // all segments to remote before completing the recovery to ensure durability. + + return (indexShard.state() == IndexShardState.RECOVERING && indexShard.shardRouting.primary()) + && (indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS + || indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT); + } + /** * Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events */ diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index c0211e1257c8e..e823401e5ef7e 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -191,6 +191,15 @@ void recoverFromLocalShards( // just trigger a merge to do housekeeping on the // copied segments - we will also see them in stats etc. indexShard.getEngine().forceMerge(false, -1, false, false, false, UUIDs.randomBase64UUID()); + if (indexShard.isRemoteTranslogEnabled()) { + if (indexShard.isRemoteSegmentStoreInSync() == false) { + throw new IndexShardRecoveryException( + indexShard.shardId(), + "failed to upload to remote", + new IOException("Failed to upload to remote segment store") + ); + } + } return true; } catch (IOException ex) { throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex); @@ -418,6 +427,12 @@ void recoverFromSnapshotAndRemoteStore( } indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); indexShard.finalizeRecovery(); + if (indexShard.isRemoteTranslogEnabled()) { + if (indexShard.isRemoteSegmentStoreInSync() == false) { + listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store")); + return; + } + } indexShard.postRecovery("restore done"); listener.onResponse(true); @@ -697,6 +712,12 @@ private void restore( } indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); indexShard.finalizeRecovery(); + if (indexShard.isRemoteTranslogEnabled()) { + if (indexShard.isRemoteSegmentStoreInSync() == false) { + listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store")); + return; + } + } indexShard.postRecovery("restore done"); listener.onResponse(true); }, e -> listener.onFailure(new IndexShardRestoreFailedException(shardId, "restore failed", e))); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index fa3cf7676f55c..dc2111fdcfc56 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2850,6 +2850,7 @@ public void testSyncSegmentsFromGivenRemoteSegmentStore() throws IOException { indexDoc(source, "_doc", "1"); indexDoc(source, "_doc", "2"); source.refresh("test"); + assertTrue("At lease one remote sync should have been completed", source.isRemoteSegmentStoreInSync()); assertDocs(source, "1", "2"); indexDoc(source, "_doc", "3"); source.refresh("test");