diff --git a/CHANGELOG.md b/CHANGELOG.md index 109af054e..ba04febcb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Bug Fixes - Fix for missing HybridQuery results when concurrent segment search is enabled ([#800](https://github.com/opensearch-project/neural-search/pull/800)) ### Infrastructure +- Add BWC for batch ingestion ([#769](https://github.com/opensearch-project/neural-search/pull/769)) ### Documentation ### Maintenance ### Refactoring diff --git a/qa/restart-upgrade/build.gradle b/qa/restart-upgrade/build.gradle index ce29c77ca..dc6e4504e 100644 --- a/qa/restart-upgrade/build.gradle +++ b/qa/restart-upgrade/build.gradle @@ -90,10 +90,11 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the k-NN radial search tests because we introduce this feature in 2.14 + // Excluding the k-NN radial search tests and batch ingestion tests because we introduce these features in 2.14 if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" } } @@ -146,10 +147,11 @@ task testAgainstNewCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the k-NN radial search tests because we introduce this feature in 2.14 + // Excluding the k-NN radial search tests and batch ingestion tests because we introduce these features in 2.14 if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" } } diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java new file mode 100644 index 000000000..0e490e2e4 --- /dev/null +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.bwc; + +import org.opensearch.neuralsearch.util.TestUtils; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; + +import static org.opensearch.neuralsearch.util.BatchIngestionUtils.prepareDataForBulkIngestion; +import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER; +import static org.opensearch.neuralsearch.util.TestUtils.SPARSE_ENCODING_PROCESSOR; + +public class BatchIngestionIT extends AbstractRestartUpgradeRestTestCase { + private static final String PIPELINE_NAME = "pipeline-BatchIngestionIT"; + private static final String TEXT_FIELD_NAME = "passage_text"; + private static final String EMBEDDING_FIELD_NAME = "passage_embedding"; + private static final int batchSize = 3; + + public void testBatchIngestionWithNeuralSparseProcessor_E2EFlow() throws Exception { + waitForClusterHealthGreen(NODES_BWC_CLUSTER); + String indexName = getIndexNameForTest(); + if (isRunningAgainstOldCluster()) { + String modelId = uploadSparseEncodingModel(); + loadModel(modelId); + createPipelineForSparseEncodingProcessor(modelId, PIPELINE_NAME); + createIndexWithConfiguration( + indexName, + Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())), + PIPELINE_NAME + ); + List> docs = prepareDataForBulkIngestion(0, 5); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs, batchSize); + validateDocCountAndInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class); + } else { + String modelId = null; + modelId = TestUtils.getModelId(getIngestionPipeline(PIPELINE_NAME), SPARSE_ENCODING_PROCESSOR); + loadModel(modelId); + try { + List> docs = prepareDataForBulkIngestion(5, 5); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs, batchSize); + validateDocCountAndInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class); + } finally { + wipeOfTestResources(indexName, PIPELINE_NAME, modelId, null); + } + } + } + +} diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/TextChunkingProcessorIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/TextChunkingProcessorIT.java index ca314300c..ba44eba9a 100644 --- a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/TextChunkingProcessorIT.java +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/TextChunkingProcessorIT.java @@ -56,20 +56,21 @@ private void createChunkingIndex(String indexName) throws Exception { createIndexWithConfiguration(indexName, indexSetting, PIPELINE_NAME); } - private void validateTestIndex(String indexName, String fieldName, int documentCount, Object expected) { - int docCount = getDocCount(indexName); - assertEquals(documentCount, docCount); + private Map getFirstDocumentInQuery(String indexName, int resultSize) { MatchAllQueryBuilder query = new MatchAllQueryBuilder(); - Map searchResults = search(indexName, query, 10); + Map searchResults = search(indexName, query, resultSize); assertNotNull(searchResults); - Map document = getFirstInnerHit(searchResults); - assertNotNull(document); - Object documentSource = document.get("_source"); - assert (documentSource instanceof Map); - @SuppressWarnings("unchecked") - Map documentSourceMap = (Map) documentSource; - assert (documentSourceMap).containsKey(fieldName); - Object ingestOutputs = documentSourceMap.get(fieldName); - assertEquals(expected, ingestOutputs); + return getFirstInnerHit(searchResults); + } + + private void validateTestIndex(String indexName, String fieldName, int documentCount, Object expected) { + Object outputs = validateDocCountAndInfo( + indexName, + documentCount, + () -> getFirstDocumentInQuery(indexName, 10), + fieldName, + List.class + ); + assertEquals(expected, outputs); } } diff --git a/qa/rolling-upgrade/build.gradle b/qa/rolling-upgrade/build.gradle index c2c03b824..1146bac19 100644 --- a/qa/rolling-upgrade/build.gradle +++ b/qa/rolling-upgrade/build.gradle @@ -90,10 +90,11 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the k-NN radial search tests because we introduce this feature in 2.14 + // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14 if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" } } @@ -147,10 +148,11 @@ task testAgainstOneThirdUpgradedCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the k-NN radial search tests because we introduce this feature in 2.14 + // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14 if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" } } @@ -203,10 +205,11 @@ task testAgainstTwoThirdsUpgradedCluster(type: StandaloneRestIntegTestTask) { } } - // Excluding the k-NN radial search tests because we introduce this feature in 2.14 + // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14 if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" } } @@ -259,10 +262,11 @@ task testRollingUpgrade(type: StandaloneRestIntegTestTask) { } } - // Excluding the k-NN radial search tests because we introduce this feature in 2.14 + // Excluding the k-NN radial search and batch ingestion tests because we introduce these features in 2.14 if (ext.neural_search_bwc_version.startsWith("2.9") || ext.neural_search_bwc_version.startsWith("2.10") || ext.neural_search_bwc_version.startsWith("2.11") || ext.neural_search_bwc_version.startsWith("2.12") || ext.neural_search_bwc_version.startsWith("2.13")){ filter { excludeTestsMatching "org.opensearch.neuralsearch.bwc.KnnRadialSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.BatchIngestionIT.*" } } diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java new file mode 100644 index 000000000..3052b48cd --- /dev/null +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java @@ -0,0 +1,63 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.bwc; + +import org.opensearch.neuralsearch.util.TestUtils; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; + +import static org.opensearch.neuralsearch.util.BatchIngestionUtils.prepareDataForBulkIngestion; +import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER; +import static org.opensearch.neuralsearch.util.TestUtils.SPARSE_ENCODING_PROCESSOR; + +public class BatchIngestionIT extends AbstractRollingUpgradeTestCase { + private static final String SPARSE_PIPELINE = "BatchIngestionIT_sparse_pipeline_rolling"; + private static final String TEXT_FIELD_NAME = "passage_text"; + private static final String EMBEDDING_FIELD_NAME = "passage_embedding"; + + public void testBatchIngestion_SparseEncodingProcessor_E2EFlow() throws Exception { + waitForClusterHealthGreen(NODES_BWC_CLUSTER); + String indexName = getIndexNameForTest(); + String sparseModelId = null; + switch (getClusterType()) { + case OLD: + sparseModelId = uploadSparseEncodingModel(); + loadModel(sparseModelId); + createPipelineForSparseEncodingProcessor(sparseModelId, SPARSE_PIPELINE); + createIndexWithConfiguration( + indexName, + Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())), + SPARSE_PIPELINE + ); + List> docs = prepareDataForBulkIngestion(0, 5); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docs, 2); + validateDocCountAndInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class); + break; + case MIXED: + sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_PIPELINE), SPARSE_ENCODING_PROCESSOR); + loadModel(sparseModelId); + List> docsForMixed = prepareDataForBulkIngestion(5, 5); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForMixed, 3); + validateDocCountAndInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class); + break; + case UPGRADED: + try { + sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_PIPELINE), SPARSE_ENCODING_PROCESSOR); + loadModel(sparseModelId); + List> docsForUpgraded = prepareDataForBulkIngestion(10, 5); + bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForUpgraded, 2); + validateDocCountAndInfo(indexName, 15, () -> getDocById(indexName, "14"), EMBEDDING_FIELD_NAME, Map.class); + } finally { + wipeOfTestResources(indexName, SPARSE_PIPELINE, sparseModelId, null); + } + break; + default: + throw new IllegalStateException("Unexpected value: " + getClusterType()); + } + } +} diff --git a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java index 689e4bf98..d85a70a1d 100644 --- a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java +++ b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java @@ -19,6 +19,7 @@ import java.util.Set; import java.util.ArrayList; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -722,6 +723,36 @@ protected void addSparseEncodingDoc( assertEquals(request.getEndpoint() + ": failed", RestStatus.CREATED, RestStatus.fromCode(response.getStatusLine().getStatusCode())); } + protected void bulkAddDocuments( + final String index, + final String textField, + final String pipeline, + final List> docs, + final int batchSize + ) throws IOException, ParseException { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < docs.size(); ++i) { + String doc = String.format( + Locale.ROOT, + "{ \"index\": { \"_index\": \"%s\", \"_id\": \"%s\" } },\n" + "{ \"%s\": \"%s\"}", + index, + docs.get(i).get("id"), + textField, + docs.get(i).get("text") + ); + builder.append(doc); + builder.append("\n"); + } + Request request = new Request( + "POST", + String.format(Locale.ROOT, "/_bulk?refresh=true&pipeline=%s&batch_size=%d", pipeline, batchSize) + ); + request.setJsonEntity(builder.toString()); + + Response response = client().performRequest(request); + assertEquals(request.getEndpoint() + ": failed", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + } + /** * Parse the first returned hit from a search response as a map * @@ -1286,6 +1317,27 @@ protected void wipeOfTestResources( } } + protected Object validateDocCountAndInfo( + String indexName, + int expectedDocCount, + Supplier> documentSupplier, + final String field, + final Class valueType + ) { + int count = getDocCount(indexName); + assertEquals(expectedDocCount, count); + Map document = documentSupplier.get(); + assertNotNull(document); + Object documentSource = document.get("_source"); + assertTrue(documentSource instanceof Map); + @SuppressWarnings("unchecked") + Map documentSourceMap = (Map) documentSource; + assertTrue(documentSourceMap.containsKey(field)); + Object outputs = documentSourceMap.get(field); + assertTrue(valueType.isAssignableFrom(outputs.getClass())); + return outputs; + } + /** * Enumeration for types of pipeline processors, used to lookup resources like create * processor request as those are type specific diff --git a/src/testFixtures/java/org/opensearch/neuralsearch/util/BatchIngestionUtils.java b/src/testFixtures/java/org/opensearch/neuralsearch/util/BatchIngestionUtils.java new file mode 100644 index 000000000..ed12d864b --- /dev/null +++ b/src/testFixtures/java/org/opensearch/neuralsearch/util/BatchIngestionUtils.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.util; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A helper class to build docs for bulk request which is used by batch ingestion tests. + */ +public class BatchIngestionUtils { + private static final List TEXTS = Arrays.asList( + "hello", + "world", + "an apple", + "find me", + "birdy", + "flying piggy", + "newspaper", + "dynamic programming", + "random text", + "finally" + ); + + public static List> prepareDataForBulkIngestion(int startId, int count) { + List> docs = new ArrayList<>(); + for (int i = startId; i < startId + count; ++i) { + Map params = new HashMap<>(); + params.put("id", Integer.toString(i)); + params.put("text", TEXTS.get(i % TEXTS.size())); + docs.add(params); + } + return docs; + } +}