diff --git a/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java b/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java deleted file mode 100644 index 604c233ca49c4..0000000000000 --- a/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.codec; - -import org.opensearch.action.admin.indices.flush.FlushResponse; -import org.opensearch.action.admin.indices.refresh.RefreshResponse; -import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; -import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; -import org.opensearch.action.support.ActiveShardCount; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.settings.Settings; -import org.opensearch.index.engine.Segment; -import org.opensearch.index.reindex.BulkByScrollResponse; -import org.opensearch.index.reindex.ReindexAction; -import org.opensearch.index.reindex.ReindexModulePlugin; -import org.opensearch.index.reindex.ReindexRequestBuilder; -import org.opensearch.index.reindex.ReindexTestCase; -import org.opensearch.plugins.Plugin; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static java.util.stream.Collectors.toList; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_METADATA; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_READ; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_WRITE; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; - -public class MultiCodecReindexIT extends ReindexTestCase { - - @Override - protected Collection> nodePlugins() { - return List.of(ReindexModulePlugin.class); - } - - public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException { - internalCluster().ensureAtLeastNumDataNodes(1); - Map codecMap = Map.of( - "best_compression", - "BEST_COMPRESSION", - "zlib", - "BEST_COMPRESSION", - "default", - "BEST_SPEED", - "lz4", - "BEST_SPEED" - ); - - for (Map.Entry codec : codecMap.entrySet()) { - assertReindexingWithMultipleCodecs(codec.getKey(), codec.getValue(), codecMap); - } - - } - - private void assertReindexingWithMultipleCodecs(String destCodec, String destCodecMode, Map codecMap) - throws ExecutionException, InterruptedException { - - final String index = "test-index" + destCodec; - final String destIndex = "dest-index" + destCodec; - - // creating source index - createIndex( - index, - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("index.codec", "default") - .put("index.merge.policy.max_merged_segment", "1b") - .build() - ); - ensureGreen(index); - - final int nbDocs = randomIntBetween(2, 5); - - // indexing with all 4 codecs - for (Map.Entry codec : codecMap.entrySet()) { - useCodec(index, codec.getKey()); - ingestDocs(index, nbDocs); - } - - assertTrue( - getSegments(index).stream() - .flatMap(s -> s.getAttributes().values().stream()) - .collect(Collectors.toSet()) - .containsAll(codecMap.values()) - ); - - // creating destination index with destination codec - createIndex( - destIndex, - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("index.codec", destCodec) - .build() - ); - - BulkByScrollResponse bulkResponse = new ReindexRequestBuilder(client(), ReindexAction.INSTANCE).source(index) - .destination(destIndex) - .refresh(true) - .waitForActiveShards(ActiveShardCount.ONE) - .get(); - - assertEquals(codecMap.size() * nbDocs, bulkResponse.getCreated()); - assertEquals(codecMap.size() * nbDocs, bulkResponse.getTotal()); - assertEquals(0, bulkResponse.getDeleted()); - assertEquals(0, bulkResponse.getNoops()); - assertEquals(0, bulkResponse.getVersionConflicts()); - assertEquals(1, bulkResponse.getBatches()); - assertTrue(bulkResponse.getTook().getMillis() > 0); - assertEquals(0, bulkResponse.getBulkFailures().size()); - assertEquals(0, bulkResponse.getSearchFailures().size()); - assertTrue(getSegments(destIndex).stream().allMatch(segment -> segment.attributes.containsValue(destCodecMode))); - } - - private void useCodec(String index, String codec) throws ExecutionException, InterruptedException { - assertAcked(client().admin().indices().prepareClose(index).setWaitForActiveShards(1)); - - assertAcked( - client().admin() - .indices() - .updateSettings(new UpdateSettingsRequest(index).settings(Settings.builder().put("index.codec", codec))) - .get() - ); - - assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1)); - } - - private void flushAndRefreshIndex(String index) { - - // Request is not blocked - for (String blockSetting : Arrays.asList( - SETTING_BLOCKS_READ, - SETTING_BLOCKS_WRITE, - SETTING_READ_ONLY, - SETTING_BLOCKS_METADATA, - SETTING_READ_ONLY_ALLOW_DELETE - )) { - try { - enableIndexBlock(index, blockSetting); - // flush - FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).execute().actionGet(); - assertNoFailures(flushResponse); - - // refresh - RefreshResponse refreshResponse = client().admin().indices().prepareRefresh(index).execute().actionGet(); - assertNoFailures(refreshResponse); - } finally { - disableIndexBlock(index, blockSetting); - } - } - } - - private void ingestDocs(String index, int nbDocs) throws InterruptedException { - - indexRandom( - randomBoolean(), - false, - randomBoolean(), - IntStream.range(0, nbDocs) - .mapToObj(i -> client().prepareIndex(index).setId(UUID.randomUUID().toString()).setSource("num", i)) - .collect(toList()) - ); - flushAndRefreshIndex(index); - } - - private ArrayList getSegments(String index) { - - return new ArrayList<>( - client().admin() - .indices() - .segments(new IndicesSegmentsRequest(index)) - .actionGet() - .getIndices() - .get(index) - .getShards() - .get(0) - .getShards()[0].getSegments() - ); - } - -} diff --git a/modules/reindex/src/test/java/org/opensearch/index/reindex/MultiCodecReindexTests.java b/modules/reindex/src/test/java/org/opensearch/index/reindex/MultiCodecReindexTests.java new file mode 100644 index 0000000000000..53a0545fd2ff7 --- /dev/null +++ b/modules/reindex/src/test/java/org/opensearch/index/reindex/MultiCodecReindexTests.java @@ -0,0 +1,160 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.reindex; + +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.opensearch.action.support.ActiveShardCount; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.MergePolicyProvider; +import org.opensearch.index.engine.Segment; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.InternalSettingsPlugin; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.stream.Collectors.toList; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +public class MultiCodecReindexTests extends ReindexTestCase { + final static Map codecMap = Map.of( + "best_compression", + "BEST_COMPRESSION", + "zlib", + "BEST_COMPRESSION", + "default", + "BEST_SPEED", + "lz4", + "BEST_SPEED" + ); + final static String[] codecChoices = codecMap.keySet().toArray(String[]::new); + + @Override + protected Collection> nodePlugins() { + return List.of(InternalSettingsPlugin.class, ReindexModulePlugin.class); + } + + public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException { + for (Map.Entry candidate : codecMap.entrySet()) { + final int nbDocs = randomIntBetween(2, 5); + + final String destCodec = candidate.getKey(); + final String destCodecMode = candidate.getValue(); + + final String index = "test-index-" + destCodec; + final String destIndex = "dest-index-" + destCodec; + + // create source index + createIndex( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.codec", randomFrom(codecChoices)) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) + .build() + ); + ensureGreen(index); + + // index using all codecs + for (String codec : codecMap.keySet()) { + useCodec(index, codec); + ingestDocs(index, nbDocs); + } + + assertTrue( + getSegments(index).stream() + .flatMap(s -> s.getAttributes().values().stream()) + .collect(Collectors.toSet()) + .containsAll(codecMap.values()) + ); + + // create destination index with destination codec + createIndex( + destIndex, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.codec", destCodec) + .build() + ); + ensureGreen(destIndex); + + // perform reindex + BulkByScrollResponse response = reindex().source(index) + .destination(destIndex) + .refresh(true) + .waitForActiveShards(ActiveShardCount.ONE) + .get(); + final int expectedResponseSize = codecMap.size() * nbDocs; + + // assertions + assertEquals(0, response.getNoops()); + assertEquals(1, response.getBatches()); + assertEquals(0, response.getDeleted()); + assertEquals(0, response.getVersionConflicts()); + assertEquals(0, response.getBulkFailures().size()); + assertEquals(0, response.getSearchFailures().size()); + + assertEquals(expectedResponseSize, response.getTotal()); + assertEquals(expectedResponseSize, response.getCreated()); + + assertTrue(response.getTook().getMillis() > 0); + assertTrue(getSegments(destIndex).stream().allMatch(segment -> segment.attributes.containsValue(destCodecMode))); + } + } + + private void useCodec(String index, String codec) throws ExecutionException, InterruptedException { + assertAcked(client().admin().indices().prepareClose(index).setWaitForActiveShards(1)); + + assertAcked( + client().admin() + .indices() + .updateSettings(new UpdateSettingsRequest(index).settings(Settings.builder().put("index.codec", codec))) + .get() + ); + + assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1)); + } + + private void ingestDocs(String index, int nbDocs) throws InterruptedException { + indexRandom( + randomBoolean(), + false, + randomBoolean(), + IntStream.range(0, nbDocs) + .mapToObj(i -> client().prepareIndex(index).setId(UUID.randomUUID().toString()).setSource("num", i)) + .collect(toList()) + ); + + flushAndRefresh(index); + } + + private ArrayList getSegments(String index) { + return new ArrayList<>( + client().admin() + .indices() + .segments(new IndicesSegmentsRequest(index)) + .actionGet() + .getIndices() + .get(index) + .getShards() + .get(0) + .getShards()[0].getSegments() + ); + } +}