forked from opensearch-project/neural-search
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add BWC for batch ingestion (opensearch-project#769)
* Add BWC for batch ingestion Signed-off-by: Liyun Xiu <[email protected]>
* Update Changelog Signed-off-by: Liyun Xiu <[email protected]>
* Fix spotlessLicenseCheck Signed-off-by: Liyun Xiu <[email protected]>
* Fix comments Signed-off-by: Liyun Xiu <[email protected]>
* Reuse the same code Signed-off-by: Liyun Xiu <[email protected]>
* Rename some functions Signed-off-by: Liyun Xiu <[email protected]>
* Rename a function Signed-off-by: Liyun Xiu <[email protected]>
* Minor change to trigger rebuild Signed-off-by: Liyun Xiu <[email protected]>
---------
Signed-off-by: Liyun Xiu <[email protected]>
- Loading branch information
Showing 8 changed files with 235 additions and 19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
53 changes: 53 additions & 0 deletions
53
qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.neuralsearch.bwc; | ||
|
||
import org.opensearch.neuralsearch.util.TestUtils; | ||
|
||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static org.opensearch.neuralsearch.util.BatchIngestionUtils.prepareDataForBulkIngestion; | ||
import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER; | ||
import static org.opensearch.neuralsearch.util.TestUtils.SPARSE_ENCODING_PROCESSOR; | ||
|
||
public class BatchIngestionIT extends AbstractRestartUpgradeRestTestCase { | ||
private static final String PIPELINE_NAME = "pipeline-BatchIngestionIT"; | ||
private static final String TEXT_FIELD_NAME = "passage_text"; | ||
private static final String EMBEDDING_FIELD_NAME = "passage_embedding"; | ||
private static final int batchSize = 3; | ||
|
||
public void testBatchIngestionWithNeuralSparseProcessor_E2EFlow() throws Exception { | ||
waitForClusterHealthGreen(NODES_BWC_CLUSTER); | ||
String indexName = getIndexNameForTest(); | ||
if (isRunningAgainstOldCluster()) { | ||
String modelId = uploadSparseEncodingModel(); | ||
loadModel(modelId); | ||
createPipelineForSparseEncodingProcessor(modelId, PIPELINE_NAME); | ||
createIndexWithConfiguration( | ||
indexName, | ||
Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())), | ||
PIPELINE_NAME | ||
); | ||
List<Map<String, String>> docs = prepareDataForBulkIngestion(0, 5); | ||
bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs, batchSize); | ||
validateDocCountAndInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class); | ||
} else { | ||
String modelId = null; | ||
modelId = TestUtils.getModelId(getIngestionPipeline(PIPELINE_NAME), SPARSE_ENCODING_PROCESSOR); | ||
loadModel(modelId); | ||
try { | ||
List<Map<String, String>> docs = prepareDataForBulkIngestion(5, 5); | ||
bulkAddDocuments(indexName, TEXT_FIELD_NAME, PIPELINE_NAME, docs, batchSize); | ||
validateDocCountAndInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class); | ||
} finally { | ||
wipeOfTestResources(indexName, PIPELINE_NAME, modelId, null); | ||
} | ||
} | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
63 changes: 63 additions & 0 deletions
63
qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/BatchIngestionIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.neuralsearch.bwc; | ||
|
||
import org.opensearch.neuralsearch.util.TestUtils; | ||
|
||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static org.opensearch.neuralsearch.util.BatchIngestionUtils.prepareDataForBulkIngestion; | ||
import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER; | ||
import static org.opensearch.neuralsearch.util.TestUtils.SPARSE_ENCODING_PROCESSOR; | ||
|
||
public class BatchIngestionIT extends AbstractRollingUpgradeTestCase { | ||
private static final String SPARSE_PIPELINE = "BatchIngestionIT_sparse_pipeline_rolling"; | ||
private static final String TEXT_FIELD_NAME = "passage_text"; | ||
private static final String EMBEDDING_FIELD_NAME = "passage_embedding"; | ||
|
||
public void testBatchIngestion_SparseEncodingProcessor_E2EFlow() throws Exception { | ||
waitForClusterHealthGreen(NODES_BWC_CLUSTER); | ||
String indexName = getIndexNameForTest(); | ||
String sparseModelId = null; | ||
switch (getClusterType()) { | ||
case OLD: | ||
sparseModelId = uploadSparseEncodingModel(); | ||
loadModel(sparseModelId); | ||
createPipelineForSparseEncodingProcessor(sparseModelId, SPARSE_PIPELINE); | ||
createIndexWithConfiguration( | ||
indexName, | ||
Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())), | ||
SPARSE_PIPELINE | ||
); | ||
List<Map<String, String>> docs = prepareDataForBulkIngestion(0, 5); | ||
bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docs, 2); | ||
validateDocCountAndInfo(indexName, 5, () -> getDocById(indexName, "4"), EMBEDDING_FIELD_NAME, Map.class); | ||
break; | ||
case MIXED: | ||
sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_PIPELINE), SPARSE_ENCODING_PROCESSOR); | ||
loadModel(sparseModelId); | ||
List<Map<String, String>> docsForMixed = prepareDataForBulkIngestion(5, 5); | ||
bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForMixed, 3); | ||
validateDocCountAndInfo(indexName, 10, () -> getDocById(indexName, "9"), EMBEDDING_FIELD_NAME, Map.class); | ||
break; | ||
case UPGRADED: | ||
try { | ||
sparseModelId = TestUtils.getModelId(getIngestionPipeline(SPARSE_PIPELINE), SPARSE_ENCODING_PROCESSOR); | ||
loadModel(sparseModelId); | ||
List<Map<String, String>> docsForUpgraded = prepareDataForBulkIngestion(10, 5); | ||
bulkAddDocuments(indexName, TEXT_FIELD_NAME, SPARSE_PIPELINE, docsForUpgraded, 2); | ||
validateDocCountAndInfo(indexName, 15, () -> getDocById(indexName, "14"), EMBEDDING_FIELD_NAME, Map.class); | ||
} finally { | ||
wipeOfTestResources(indexName, SPARSE_PIPELINE, sparseModelId, null); | ||
} | ||
break; | ||
default: | ||
throw new IllegalStateException("Unexpected value: " + getClusterType()); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
40 changes: 40 additions & 0 deletions
40
src/testFixtures/java/org/opensearch/neuralsearch/util/BatchIngestionUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.neuralsearch.util; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
/**
 * Helper that builds the documents for the bulk requests used by batch ingestion tests.
 */
public class BatchIngestionUtils {
    // Fixed pool of sample texts; documents cycle through it by id.
    private static final List<String> TEXTS = Arrays.asList(
        "hello",
        "world",
        "an apple",
        "find me",
        "birdy",
        "flying piggy",
        "newspaper",
        "dynamic programming",
        "random text",
        "finally"
    );

    /**
     * Builds {@code count} documents with consecutive ids starting at {@code startId}.
     *
     * <p>Each document is a map with two entries: {@code "id"} (the decimal id) and
     * {@code "text"} (a sample text chosen by cycling through the fixed text pool by id).
     *
     * @param startId id of the first document; may be negative
     * @param count number of documents to build; a non-positive value yields an empty list
     * @return a new mutable list of documents, in ascending id order
     */
    public static List<Map<String, String>> prepareDataForBulkIngestion(int startId, int count) {
        List<Map<String, String>> docs = new ArrayList<>();
        for (int i = startId; i < startId + count; ++i) {
            Map<String, String> params = new HashMap<>();
            params.put("id", Integer.toString(i));
            // floorMod keeps the index non-negative; plain % would go negative for a
            // negative id and make TEXTS.get throw IndexOutOfBoundsException.
            params.put("text", TEXTS.get(Math.floorMod(i, TEXTS.size())));
            docs.add(params);
        }
        return docs;
    }
}