From 020199d804e80aa3ffc13c9f65ab88538cac8d12 Mon Sep 17 00:00:00 2001 From: David Venable Date: Thu, 28 Apr 2022 17:41:07 -0500 Subject: [PATCH 1/2] Updated the BulkRetryStrategyTests to rely less on specific details from the the implementation of the bulk client in OpenSearch. This change works for both OpenSearch 1 and 2. Updated to use JUnit 5 as well, and some other refactoring. Signed-off-by: David Venable --- .../opensearch/BulkRetryStrategyTests.java | 117 +++++++++++------- 1 file changed, 69 insertions(+), 48 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java index 9b23e9d1ac..1f68c30a89 100644 --- a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java @@ -10,28 +10,35 @@ import com.amazon.dataprepper.metrics.PluginMetrics; import com.amazon.dataprepper.model.configuration.PluginSetting; import io.micrometer.core.instrument.Measurement; +import org.hamcrest.MatcherAssert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.opensearch.OpenSearchException; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.index.IndexResponse; import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; -import org.opensearch.index.Index; -import org.opensearch.index.shard.ShardId; -import org.junit.Before; -import org.junit.Test; +import org.opensearch.rest.RestStatus; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.StringJoiner; -import java.util.UUID; +import java.util.function.BiConsumer; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class BulkRetryStrategyTests { private static final String PLUGIN_NAME = "opensearch"; @@ -40,12 +47,18 @@ public class BulkRetryStrategyTests { setPipelineName(PIPELINE_NAME); }}; private static final PluginMetrics PLUGIN_METRICS = PluginMetrics.fromPluginSetting(PLUGIN_SETTING); + private BiConsumer, Throwable> logFailureConsumer; - @Before + @BeforeEach public void metricsInit() { MetricsTestUtil.initMetrics(); } + @BeforeEach + public void setUp() { + logFailureConsumer = mock(BiConsumer.class); + } + @Test public void testCanRetry() { final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( @@ -74,10 +87,9 @@ public void testExecuteSuccessOnFirstAttempt() throws Exception { final String testIndex = "bar"; final FakeClient client = new FakeClient(testIndex); client.successOnFirstAttempt = true; - final FakeLogger logger = new FakeLogger(); final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logger::logFailure, PLUGIN_METRICS, BulkRequest::new); + client::bulk, logFailureConsumer, PLUGIN_METRICS, BulkRequest::new); final BulkRequest testBulkRequest = new BulkRequest(); testBulkRequest.add(new IndexRequest(testIndex).id("1")); testBulkRequest.add(new IndexRequest(testIndex).id("2")); @@ -105,10 +117,9 @@ public void testExecuteSuccessOnFirstAttempt() throws Exception { public void testExecuteRetryable() throws Exception { final String testIndex = "bar"; final FakeClient client = new FakeClient(testIndex); - final FakeLogger logger = new FakeLogger(); final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logger::logFailure, PLUGIN_METRICS, BulkRequest::new); + client::bulk, logFailureConsumer, PLUGIN_METRICS, BulkRequest::new); final BulkRequest testBulkRequest = new BulkRequest(); testBulkRequest.add(new IndexRequest(testIndex).id("1")); testBulkRequest.add(new IndexRequest(testIndex).id("2")); @@ -122,9 +133,14 @@ public void testExecuteRetryable() throws Exception { assertFalse(client.finalResponse.hasFailures()); assertEquals("3", client.finalRequest.requests().get(0).id()); assertEquals("4", client.finalRequest.requests().get(1).id()); - final String logging = logger.msg.toString(); - assertTrue(logging.contains("[bar][_doc][2]")); - assertFalse(logging.contains("[bar][_doc][1]")); + + ArgumentCaptor loggerWriteRequestArgCaptor = ArgumentCaptor.forClass(DocWriteRequest.class); + ArgumentCaptor loggerThrowableArgCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(logFailureConsumer).accept(loggerWriteRequestArgCaptor.capture(), loggerThrowableArgCaptor.capture()); + MatcherAssert.assertThat(loggerWriteRequestArgCaptor.getValue(), notNullValue()); + MatcherAssert.assertThat(loggerWriteRequestArgCaptor.getValue().index(), equalTo(testIndex)); + MatcherAssert.assertThat(loggerWriteRequestArgCaptor.getValue().id(), equalTo("2")); + MatcherAssert.assertThat(loggerThrowableArgCaptor.getValue(), notNullValue()); // verify metrics final List documentsSuccessFirstAttemptMeasurements = MetricsTestUtil.getMeasurementList( @@ -149,10 +165,9 @@ public void testExecuteNonRetryableException() throws Exception { final String testIndex = "bar"; final FakeClient client = new FakeClient(testIndex); client.retryable = false; - final FakeLogger logger = new FakeLogger(); final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logger::logFailure, PLUGIN_METRICS, BulkRequest::new); + client::bulk, logFailureConsumer, PLUGIN_METRICS, BulkRequest::new); final BulkRequest testBulkRequest = new BulkRequest(); testBulkRequest.add(new IndexRequest(testIndex).id("1")); testBulkRequest.add(new IndexRequest(testIndex).id("2")); @@ -162,9 +177,17 @@ public void testExecuteNonRetryableException() throws Exception { bulkRetryStrategy.execute(testBulkRequest); assertEquals(1, client.attempt); - final String logging = logger.msg.toString(); - for (int i = 1; i <= 4; i++) { - assertTrue(logging.contains(String.format("[bar][_doc][%d]", i))); + + ArgumentCaptor loggerWriteRequestArgCaptor = ArgumentCaptor.forClass(DocWriteRequest.class); + ArgumentCaptor loggerExceptionArgCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(logFailureConsumer, times(4)) + .accept(loggerWriteRequestArgCaptor.capture(), isA(IllegalArgumentException.class)); + final List allLoggerWriteRequests = loggerWriteRequestArgCaptor.getAllValues(); + for (int i = 0; i < allLoggerWriteRequests.size(); i++) { + final DocWriteRequest actualFailedWrite = allLoggerWriteRequests.get(i); + MatcherAssert.assertThat(actualFailedWrite.index(), equalTo(testIndex)); + String expectedIndexName = Integer.toString(i+1); + MatcherAssert.assertThat(actualFailedWrite.id(), equalTo(expectedIndexName)); } // verify metrics @@ -186,10 +209,9 @@ public void testExecuteNonRetryableResponse() throws Exception { final FakeClient client = new FakeClient(testIndex); client.retryable = false; client.nonRetryableException = false; - final FakeLogger logger = new FakeLogger(); final BulkRetryStrategy bulkRetryStrategy = new BulkRetryStrategy( - client::bulk, logger::logFailure, PLUGIN_METRICS, BulkRequest::new); + client::bulk, logFailureConsumer, PLUGIN_METRICS, BulkRequest::new); final BulkRequest testBulkRequest = new BulkRequest(); testBulkRequest.add(new IndexRequest(testIndex).id("1")); testBulkRequest.add(new IndexRequest(testIndex).id("2")); @@ -199,9 +221,17 @@ public void testExecuteNonRetryableResponse() throws Exception { bulkRetryStrategy.execute(testBulkRequest); assertEquals(1, client.attempt); - final String logging = logger.msg.toString(); - for (int i = 2; i <= 4; i++) { - assertTrue(logging.contains(String.format("[bar][_doc][%d]", i))); + + ArgumentCaptor loggerWriteRequestArgCaptor = ArgumentCaptor.forClass(DocWriteRequest.class); + ArgumentCaptor loggerExceptionArgCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(logFailureConsumer, times(3)) + .accept(loggerWriteRequestArgCaptor.capture(), isA(IllegalArgumentException.class)); + final List allLoggerWriteRequests = loggerWriteRequestArgCaptor.getAllValues(); + for (int i = 0; i < allLoggerWriteRequests.size(); i++) { + final DocWriteRequest actualFailedWrite = allLoggerWriteRequests.get(i); + MatcherAssert.assertThat(actualFailedWrite.index(), equalTo(testIndex)); + String expectedIndexName = Integer.toString(i+2); + MatcherAssert.assertThat(actualFailedWrite.id(), equalTo(expectedIndexName)); } // verify metrics @@ -218,31 +248,30 @@ public void testExecuteNonRetryableResponse() throws Exception { } private static BulkItemResponse successItemResponse(final String index) { - final String docId = UUID.randomUUID().toString(); - return new BulkItemResponse(1, DocWriteRequest.OpType.INDEX, - new IndexResponse(new ShardId(new Index(index, "fakeUUID"), 1), - "_doc", docId, 1, 1, 1, true)); + return mock(BulkItemResponse.class); } private static BulkItemResponse badRequestItemResponse(final String index) { - final String docId = UUID.randomUUID().toString(); - return new BulkItemResponse(1, DocWriteRequest.OpType.INDEX, - new BulkItemResponse.Failure(index, "_doc", docId, - new IllegalArgumentException())); + return customBulkFailureResponse(index, RestStatus.BAD_REQUEST, new IllegalArgumentException()); } private static BulkItemResponse tooManyRequestItemResponse(final String index) { - final String docId = UUID.randomUUID().toString(); - return new BulkItemResponse(1, DocWriteRequest.OpType.INDEX, - new BulkItemResponse.Failure(index, "_doc", docId, - new OpenSearchRejectedExecutionException())); + return customBulkFailureResponse(index, RestStatus.TOO_MANY_REQUESTS, new OpenSearchRejectedExecutionException()); } private static BulkItemResponse internalServerErrorItemResponse(final String index) { - final String docId = UUID.randomUUID().toString(); - return new BulkItemResponse(1, DocWriteRequest.OpType.INDEX, - new BulkItemResponse.Failure(index, "_doc", docId, - new IllegalAccessException())); + return customBulkFailureResponse(index, RestStatus.INTERNAL_SERVER_ERROR, new IllegalAccessException()); + } + + private static BulkItemResponse customBulkFailureResponse(final String index, final RestStatus restStatus, final Exception cause) { + final BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class); + when(failure.getStatus()).thenReturn(restStatus); + when(failure.getCause()).thenReturn(cause); + final BulkItemResponse badResponse = mock(BulkItemResponse.class); + when(badResponse.isFailed()).thenReturn(true); + when(badResponse.status()).thenReturn(restStatus); + when(badResponse.getFailure()).thenReturn(failure); + return badResponse; } private static class FakeClient { @@ -327,12 +356,4 @@ private BulkResponse bulkSuccessResponse(final BulkRequest bulkRequest) { return new BulkResponse(bulkItemResponses, 10); } } - - private static class FakeLogger { - StringBuilder msg = new StringBuilder(); - - public void logFailure(final DocWriteRequest docWriteRequest, final Throwable t) { - msg.append(String.format("Document [%s] has failure: %s", docWriteRequest.toString(), t)); - } - } } From 833e0a6ab5483815ecec7ab393024af1cf5ca88e Mon Sep 17 00:00:00 2001 From: David Venable Date: Fri, 29 Apr 2022 10:46:50 -0500 Subject: [PATCH 2/2] Combined the @BeforeEach methods in BulkRetryStrategyTests per a PR comment. Signed-off-by: David Venable --- .../plugins/sink/opensearch/BulkRetryStrategyTests.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java index 1f68c30a89..271868fbff 100644 --- a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java @@ -49,13 +49,9 @@ public class BulkRetryStrategyTests { private static final PluginMetrics PLUGIN_METRICS = PluginMetrics.fromPluginSetting(PLUGIN_SETTING); private BiConsumer, Throwable> logFailureConsumer; - @BeforeEach - public void metricsInit() { - MetricsTestUtil.initMetrics(); - } - @BeforeEach public void setUp() { + MetricsTestUtil.initMetrics(); logFailureConsumer = mock(BiConsumer.class); }