diff --git a/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java b/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java index dbde31ef1eb65..657d0f178e096 100644 --- a/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java @@ -189,7 +189,7 @@ private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Except int numRequests = scaledRandomIntBetween(32, 128); BulkRequest bulkRequest = new BulkRequest(); if (shouldSetBatchSize) { - bulkRequest.batchSize(numRequests); + bulkRequest.batchSize(scaledRandomIntBetween(2, numRequests)); } for (int i = 0; i < numRequests; i++) { IndexRequest indexRequest = new IndexRequest("index").id(Integer.toString(i)).setPipeline("_id"); @@ -214,6 +214,9 @@ private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Except ); assertThat(indexResponse, notNullValue()); assertThat(indexResponse.getId(), equalTo(Integer.toString(i))); + // verify field of successful doc + Map successDoc = client().prepareGet("index", indexResponse.getId()).get().getSourceAsMap(); + assertThat(successDoc.get("processed"), equalTo(true)); assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); } } @@ -223,51 +226,6 @@ private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Except assertTrue(deletePipelineResponse.isAcknowledged()); } - public void testBulkWithIngestFailuresBatch() throws Exception { - createIndex("index"); - - BytesReference source = BytesReference.bytes( - jsonBuilder().startObject() - .field("description", "my_pipeline") - .startArray("processors") - .startObject() - .startObject("test") - .endObject() - .endObject() - .endArray() - .endObject() - ); - PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON); - client().admin().cluster().putPipeline(putPipelineRequest).get(); - - BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.batchSize(2); - bulkRequest.add( - new IndexRequest("index").id("_fail").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true) - ); - bulkRequest.add( - new IndexRequest("index").id("_success").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", false) - ); - - BulkResponse response = client().bulk(bulkRequest).actionGet(); - MatcherAssert.assertThat(response.getItems().length, equalTo(bulkRequest.requests().size())); - - Map results = Arrays.stream(response.getItems()) - .collect(Collectors.toMap(BulkItemResponse::getId, r -> r)); - - MatcherAssert.assertThat(results.keySet(), containsInAnyOrder("_fail", "_success")); - assertNotNull(results.get("_fail").getFailure()); - assertNull(results.get("_success").getFailure()); - - // verify field of successful doc - Map successDoc = client().prepareGet("index", "_success").get().getSourceAsMap(); - assertThat(successDoc.get("processed"), equalTo(true)); - - // cleanup - AcknowledgedResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get(); - assertTrue(deletePipelineResponse.isAcknowledged()); - } - public void testBulkWithIngestFailuresAndDropBatch() throws Exception { createIndex("index"); diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index a32cd2c3cad3f..684297c11c140 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -81,6 +81,7 @@ import org.junit.Before; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -88,6 +89,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -97,7 +99,6 @@ import java.util.function.LongSupplier; import java.util.stream.Collectors; -import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; @@ -1923,27 +1924,21 @@ public void testExecuteBulkRequestInBatchWithExceptionAndDropInCallback() { return null; }).when(mockCompoundProcessor).batchExecute(any(), any()); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); - final IntConsumer dropHandler = mock(IntConsumer.class); + final Map failureHandler = new HashMap<>(); + final Map completionHandler = new HashMap<>(); + final List dropHandler = new ArrayList<>(); ingestService.executeBulkRequest( 3, bulkRequest.requests(), - failureHandler, - completionHandler, - dropHandler, + failureHandler::put, + completionHandler::put, + dropHandler::add, Names.WRITE, bulkRequest ); - ArgumentCaptor failureSlotCaptor = ArgumentCaptor.forClass(Integer.class); - verify(failureHandler, times(1)).accept(failureSlotCaptor.capture(), any()); - assertEquals(1, failureSlotCaptor.getValue().intValue()); - ArgumentCaptor dropSlotCaptor = ArgumentCaptor.forClass(Integer.class); - verify(dropHandler, times(1)).accept(dropSlotCaptor.capture()); - assertEquals(2, dropSlotCaptor.getValue().intValue()); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + assertEquals(Set.of(1), failureHandler.keySet()); + assertEquals(List.of(2), dropHandler); + assertEquals(Set.of(Thread.currentThread()), completionHandler.keySet()); verify(mockCompoundProcessor, times(1)).batchExecute(any(), any()); verify(mockCompoundProcessor, never()).execute(any(), any()); }