diff --git a/data-prepper-core/integrationTest.gradle b/data-prepper-core/integrationTest.gradle index bad36c7e1..f10c0b3e2 100644 --- a/data-prepper-core/integrationTest.gradle +++ b/data-prepper-core/integrationTest.gradle @@ -48,6 +48,9 @@ task removeDataPrepperNetwork(type: DockerRemoveNetwork) { networkId = createDataPrepperNetwork.getNetworkId() } +def RAW_SPAN_PIPELINE_YAML = "raw-span-e2e-pipeline.yml" +def SERVICE_MAP_PIPELINE_YAML = "service-map-e2e-pipeline.yml" + /** * DataPrepper Docker tasks */ @@ -59,9 +62,10 @@ task createDataPrepperDockerFile(type: Dockerfile) { exposePort(4900) workingDir("/app") copyFile("build/libs/${jar.archiveName}", "/app/data-prepper.jar") - copyFile("src/integrationTest/resources/pipeline.yml", "/app/pipeline.yml") + copyFile("src/integrationTest/resources/${RAW_SPAN_PIPELINE_YAML}", "/app/${RAW_SPAN_PIPELINE_YAML}") + copyFile("src/integrationTest/resources/${SERVICE_MAP_PIPELINE_YAML}", "/app/${SERVICE_MAP_PIPELINE_YAML}") copyFile("src/integrationTest/resources/data_prepper.yml", "/app/data_prepper.yml") - defaultCommand("java", "-jar", "data-prepper.jar", "/app/pipeline.yml", "/app/data_prepper.yml") + defaultCommand("java", "-jar", "data-prepper.jar", "/app/${RAW_SPAN_PIPELINE_YAML}", "/app/data_prepper.yml") } task buildDataPrepperDockerImage(type: DockerBuildImage) { @@ -71,54 +75,36 @@ task buildDataPrepperDockerImage(type: DockerBuildImage) { images.add("integ-test-pipeline-image") } -task createDataPrepper1DockerContainer(type: DockerCreateContainer) { - dependsOn buildDataPrepperDockerImage - dependsOn createDataPrepperNetwork - containerName = "data-prepper1" - hostConfig.portBindings = ['21890:21890', '4900:4900'] - hostConfig.network = createDataPrepperNetwork.getNetworkName() - targetImageId buildDataPrepperDockerImage.getImageId() -} - -task createDataPrepper2DockerContainer(type: DockerCreateContainer) { - dependsOn buildDataPrepperDockerImage - dependsOn createDataPrepperNetwork - containerName = "data-prepper2" - hostConfig.portBindings = ['21891:21890', '4901:4900'] - hostConfig.network = createDataPrepperNetwork.getNetworkName() - targetImageId buildDataPrepperDockerImage.getImageId() -} - -task startDataPrepper1DockerContainer(type: DockerStartContainer) { - dependsOn createDataPrepper1DockerContainer - targetContainerId createDataPrepper1DockerContainer.getContainerId() - doLast { - sleep(10*1000) +def createDataPrepperDockerContainer(final String taskBaseName, final String dataPrepperName, final int grpcPort, + final int serverPort, final String pipelineConfigYAML) { + return tasks.create("create${taskBaseName}", DockerCreateContainer) { + dependsOn buildDataPrepperDockerImage + dependsOn createDataPrepperNetwork + containerName = dataPrepperName + hostConfig.portBindings = ['%d:21890'.formatted(grpcPort), '%d:4900'.formatted(serverPort)] + hostConfig.network = createDataPrepperNetwork.getNetworkName() + cmd = ["java", "-jar", "data-prepper.jar", pipelineConfigYAML, "/app/data_prepper.yml"] + targetImageId buildDataPrepperDockerImage.getImageId() } } -task startDataPrepper2DockerContainer(type: DockerStartContainer) { - dependsOn createDataPrepper2DockerContainer - targetContainerId createDataPrepper2DockerContainer.getContainerId() - doLast { - sleep(10*1000) +def startDataPrepperDockerContainer(final DockerCreateContainer createDataPrepperDockerContainerTask) { + return tasks.create("start${createDataPrepperDockerContainerTask.getName()}", DockerStartContainer) { + dependsOn createDataPrepperDockerContainerTask + targetContainerId createDataPrepperDockerContainerTask.getContainerId() } } -task stopDataPrepper1DockerContainer(type: DockerStopContainer) { - targetContainerId createDataPrepper1DockerContainer.getContainerId() -} - -task stopDataPrepper2DockerContainer(type: DockerStopContainer) { - targetContainerId createDataPrepper2DockerContainer.getContainerId() -} - -task removeDataPrepper1DockerContainer(type: DockerRemoveContainer) { - targetContainerId stopDataPrepper1DockerContainer.getContainerId() +def stopDataPrepperDockerContainer(final DockerStartContainer startDataPrepperDockerContainerTask) { + return tasks.create("stop${startDataPrepperDockerContainerTask.getName()}", DockerStopContainer) { + targetContainerId startDataPrepperDockerContainerTask.getContainerId() + } } -task removeDataPrepper2DockerContainer(type: DockerRemoveContainer) { - targetContainerId stopDataPrepper2DockerContainer.getContainerId() +def removeDataPrepperDockerContainer(final DockerStopContainer stopDataPrepperDockerContainerTask) { + return tasks.create("remove${stopDataPrepperDockerContainerTask.getName()}", DockerRemoveContainer) { + targetContainerId stopDataPrepperDockerContainerTask.getContainerId() + } } /** @@ -159,8 +145,20 @@ task stopOdfeDockerContainer(type: DockerStopContainer) { task rawSpanEndToEndTest(type: Test) { dependsOn build dependsOn startOdfeDockerContainer - dependsOn startDataPrepper1DockerContainer - startDataPrepper1DockerContainer.mustRunAfter 'startOdfeDockerContainer' + def createDataPrepper1Task = createDataPrepperDockerContainer( + "rawSpanDataPrepper1", "data-prepper1", 21890, 4900, "/app/${RAW_SPAN_PIPELINE_YAML}") + def createDataPrepper2Task = createDataPrepperDockerContainer( + "rawSpanDataPrepper2", "data-prepper2", 21891, 4901, "/app/${RAW_SPAN_PIPELINE_YAML}") + def startDataPrepper1Task = startDataPrepperDockerContainer(createDataPrepper1Task as DockerCreateContainer) + def startDataPrepper2Task = startDataPrepperDockerContainer(createDataPrepper2Task as DockerCreateContainer) + dependsOn startDataPrepper1Task + dependsOn startDataPrepper2Task + startDataPrepper1Task.mustRunAfter 'startOdfeDockerContainer' + startDataPrepper2Task.mustRunAfter 'startOdfeDockerContainer' + // wait for data-preppers to be ready + doFirst { + sleep(10*1000) + } description = 'Runs the raw span integration tests.' group = 'verification' @@ -172,20 +170,34 @@ task rawSpanEndToEndTest(type: Test) { } finalizedBy stopOdfeDockerContainer - finalizedBy stopDataPrepper1DockerContainer - finalizedBy removeDataPrepper1DockerContainer + def stopDataPrepper1Task = stopDataPrepperDockerContainer(startDataPrepper1Task as DockerStartContainer) + def stopDataPrepper2Task = stopDataPrepperDockerContainer(startDataPrepper2Task as DockerStartContainer) + finalizedBy stopDataPrepper1Task + finalizedBy stopDataPrepper2Task + finalizedBy removeDataPrepperDockerContainer(stopDataPrepper1Task as DockerStopContainer) + finalizedBy removeDataPrepperDockerContainer(stopDataPrepper2Task as DockerStopContainer) finalizedBy removeDataPrepperNetwork } task serviceMapEndToEndTest(type: Test) { dependsOn build dependsOn startOdfeDockerContainer - dependsOn startDataPrepper1DockerContainer - dependsOn startDataPrepper2DockerContainer - startDataPrepper1DockerContainer.mustRunAfter 'startOdfeDockerContainer' - startDataPrepper2DockerContainer.mustRunAfter 'startOdfeDockerContainer' + def createDataPrepper1Task = createDataPrepperDockerContainer( + "serviceMapDataPrepper1", "data-prepper1", 21890, 4900, "/app/${SERVICE_MAP_PIPELINE_YAML}") + def createDataPrepper2Task = createDataPrepperDockerContainer( + "serviceMapDataPrepper2", "data-prepper2", 21891, 4901, "/app/${SERVICE_MAP_PIPELINE_YAML}") + def startDataPrepper1Task = startDataPrepperDockerContainer(createDataPrepper1Task as DockerCreateContainer) + def startDataPrepper2Task = startDataPrepperDockerContainer(createDataPrepper2Task as DockerCreateContainer) + dependsOn startDataPrepper1Task + dependsOn startDataPrepper2Task + startDataPrepper1Task.mustRunAfter 'startOdfeDockerContainer' + startDataPrepper2Task.mustRunAfter 'startOdfeDockerContainer' + // wait for data-preppers to be ready + doFirst { + sleep(10*1000) + } - description = 'Runs the raw span integration tests.' + description = 'Runs the service-map integration tests.' group = 'verification' testClassesDirs = sourceSets.integrationTest.output.classesDirs classpath = sourceSets.integrationTest.runtimeClasspath @@ -195,9 +207,11 @@ task serviceMapEndToEndTest(type: Test) { } finalizedBy stopOdfeDockerContainer - finalizedBy stopDataPrepper1DockerContainer - finalizedBy stopDataPrepper2DockerContainer - finalizedBy removeDataPrepper1DockerContainer - finalizedBy removeDataPrepper2DockerContainer + def stopDataPrepper1Task = stopDataPrepperDockerContainer(startDataPrepper1Task as DockerStartContainer) + def stopDataPrepper2Task = stopDataPrepperDockerContainer(startDataPrepper2Task as DockerStartContainer) + finalizedBy stopDataPrepper1Task + finalizedBy stopDataPrepper2Task + finalizedBy removeDataPrepperDockerContainer(stopDataPrepper1Task as DockerStopContainer) + finalizedBy removeDataPrepperDockerContainer(stopDataPrepper2Task as DockerStopContainer) finalizedBy removeDataPrepperNetwork } diff --git a/data-prepper-core/src/integrationTest/java/com/amazon/dataprepper/integration/EndToEndRawSpanTest.java b/data-prepper-core/src/integrationTest/java/com/amazon/dataprepper/integration/EndToEndRawSpanTest.java index 070e6e163..08a8911bd 100644 --- a/data-prepper-core/src/integrationTest/java/com/amazon/dataprepper/integration/EndToEndRawSpanTest.java +++ b/data-prepper-core/src/integrationTest/java/com/amazon/dataprepper/integration/EndToEndRawSpanTest.java @@ -13,7 +13,6 @@ import io.opentelemetry.proto.trace.v1.InstrumentationLibrarySpans; import io.opentelemetry.proto.trace.v1.ResourceSpans; import io.opentelemetry.proto.trace.v1.Span; -import io.opentelemetry.proto.trace.v1.Status; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -25,49 +24,68 @@ import org.junit.Test; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; -import java.util.UUID; import java.util.concurrent.TimeUnit; import static org.awaitility.Awaitility.await; public class EndToEndRawSpanTest { - - private static final Random RANDOM = new Random(); - private static final List SPAN_KINDS = - Arrays.asList(Span.SpanKind.SPAN_KIND_CLIENT, Span.SpanKind.SPAN_KIND_CONSUMER, Span.SpanKind.SPAN_KIND_INTERNAL, Span.SpanKind.SPAN_KIND_PRODUCER, Span.SpanKind.SPAN_KIND_SERVER); + private static final int DATA_PREPPER_PORT_1 = 21890; + private static final int DATA_PREPPER_PORT_2 = 21891; + + private static final Map TEST_TRACEID_TO_TRACE_GROUP = new HashMap() {{ + put(Hex.toHexString(EndToEndTestSpan.TRACE_1_ROOT_SPAN.traceId.getBytes()), EndToEndTestSpan.TRACE_1_ROOT_SPAN.name); + put(Hex.toHexString(EndToEndTestSpan.TRACE_2_ROOT_SPAN.traceId.getBytes()), EndToEndTestSpan.TRACE_2_ROOT_SPAN.name); + }}; + private static final List TEST_SPAN_SET_1_WITH_ROOT_SPAN = Arrays.asList( + EndToEndTestSpan.TRACE_1_ROOT_SPAN, EndToEndTestSpan.TRACE_1_SPAN_2, EndToEndTestSpan.TRACE_1_SPAN_3, + EndToEndTestSpan.TRACE_1_SPAN_4, EndToEndTestSpan.TRACE_1_SPAN_5, EndToEndTestSpan.TRACE_1_SPAN_6); + private static final List TEST_SPAN_SET_1_WITHOUT_ROOT_SPAN = Arrays.asList( + EndToEndTestSpan.TRACE_1_SPAN_7, EndToEndTestSpan.TRACE_1_SPAN_8, EndToEndTestSpan.TRACE_1_SPAN_9, + EndToEndTestSpan.TRACE_1_SPAN_10, EndToEndTestSpan.TRACE_1_SPAN_11); + private static final List TEST_SPAN_SET_2_WITH_ROOT_SPAN = Arrays.asList( + EndToEndTestSpan.TRACE_2_ROOT_SPAN, EndToEndTestSpan.TRACE_2_SPAN_2, EndToEndTestSpan.TRACE_2_SPAN_3); + private static final List TEST_SPAN_SET_2_WITHOUT_ROOT_SPAN = Arrays.asList( + EndToEndTestSpan.TRACE_2_SPAN_4, EndToEndTestSpan.TRACE_2_SPAN_5); private static final String INDEX_NAME = "otel-v1-apm-span-000001"; @Test - public void testPipelineEndToEnd() throws IOException, InterruptedException { + public void testPipelineEndToEnd() throws InterruptedException { //Send data to otel trace source - final ExportTraceServiceRequest exportTraceServiceRequest1 = getExportTraceServiceRequest( - getRandomResourceSpans(10) + final ExportTraceServiceRequest exportTraceServiceRequestTrace1BatchWithRoot = getExportTraceServiceRequest( + getResourceSpansBatch(TEST_SPAN_SET_1_WITH_ROOT_SPAN) ); - - final ExportTraceServiceRequest exportTraceServiceRequest2 = getExportTraceServiceRequest( - getRandomResourceSpans(10) + final ExportTraceServiceRequest exportTraceServiceRequestTrace1BatchNoRoot = getExportTraceServiceRequest( + getResourceSpansBatch(TEST_SPAN_SET_1_WITHOUT_ROOT_SPAN) + ); + final ExportTraceServiceRequest exportTraceServiceRequestTrace2BatchWithRoot = getExportTraceServiceRequest( + getResourceSpansBatch(TEST_SPAN_SET_2_WITH_ROOT_SPAN) + ); + final ExportTraceServiceRequest exportTraceServiceRequestTrace2BatchNoRoot = getExportTraceServiceRequest( + getResourceSpansBatch(TEST_SPAN_SET_2_WITHOUT_ROOT_SPAN) ); - sendExportTraceServiceRequestToSource(exportTraceServiceRequest1); - sendExportTraceServiceRequestToSource(exportTraceServiceRequest2); + sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_1, exportTraceServiceRequestTrace1BatchWithRoot); + sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_1, exportTraceServiceRequestTrace2BatchNoRoot); + sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_2, exportTraceServiceRequestTrace2BatchWithRoot); + sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_2, exportTraceServiceRequestTrace1BatchNoRoot); //Verify data in elasticsearch sink - final List> expectedDocuments = getExpectedDocuments(exportTraceServiceRequest1, exportTraceServiceRequest2); + final List> expectedDocuments = getExpectedDocuments( + exportTraceServiceRequestTrace1BatchWithRoot, exportTraceServiceRequestTrace1BatchNoRoot, + exportTraceServiceRequestTrace2BatchWithRoot, exportTraceServiceRequestTrace2BatchNoRoot); final ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder( Collections.singletonList("https://127.0.0.1:9200")); builder.withUsername("admin"); builder.withPassword("admin"); final RestHighLevelClient restHighLevelClient = builder.build().createClient(); - // Wait for otel-trace-raw-prepper by trace_flush_interval - Thread.sleep(3000); + // Wait for otel-trace-raw-prepper by at least trace_flush_interval + Thread.sleep(6000); // Wait for data to flow through pipeline and be indexed by ES await().atMost(10, TimeUnit.SECONDS).untilAsserted( () -> { @@ -88,7 +106,7 @@ public void testPipelineEndToEnd() throws IOException, InterruptedException { */ expectedDocuments.forEach(expectedDoc -> { Assert.assertTrue(foundSources.stream() - .filter(i -> i.get("traceId").equals(expectedDoc.get("traceId"))) + .filter(i -> i.get("spanId").equals(expectedDoc.get("spanId"))) .findFirst().get() .entrySet().containsAll(expectedDoc.entrySet())); }); @@ -101,10 +119,9 @@ private void refreshIndices(final RestHighLevelClient restHighLevelClient) throw restHighLevelClient.indices().refresh(requestAll, RequestOptions.DEFAULT); } - private void sendExportTraceServiceRequestToSource(ExportTraceServiceRequest request) { - final TraceServiceGrpc.TraceServiceBlockingStub client = Clients.newClient( - "gproto+http://127.0.0.1:21890/", TraceServiceGrpc.TraceServiceBlockingStub.class); - client.export(request); + private void sendExportTraceServiceRequestToSource(final int port, final ExportTraceServiceRequest request) { + Clients.newClient(String.format("gproto+http://127.0.0.1:%d/", port), + TraceServiceGrpc.TraceServiceBlockingStub.class).export(request); } private List> getSourcesFromSearchHits(final SearchHits searchHits) { @@ -113,15 +130,9 @@ private List> getSourcesFromSearchHits(final SearchHits sear return sources; } - public static ResourceSpans getResourceSpans(final String serviceName, final String spanName, final byte[] - spanId, final byte[] parentId, final byte[] traceId, final Span.SpanKind spanKind, final int statusCode, final String statusMessage) { + spanId, final byte[] parentId, final byte[] traceId, final Span.SpanKind spanKind) { final ByteString parentSpanId = parentId != null ? ByteString.copyFrom(parentId) : ByteString.EMPTY; - final Status.Builder statusBuilder = Status.newBuilder().setCodeValue(statusCode); - if (statusMessage != null) { - statusBuilder.setMessage(statusMessage); - } - final Status status = statusBuilder.build(); return ResourceSpans.newBuilder() .setResource( Resource.newBuilder() @@ -139,7 +150,6 @@ public static ResourceSpans getResourceSpans(final String serviceName, final Str .setKind(spanKind) .setSpanId(ByteString.copyFrom(spanId)) .setParentSpanId(parentSpanId) - .setStatus(status) .setTraceId(ByteString.copyFrom(traceId)) .build() ) @@ -154,6 +164,28 @@ public static ExportTraceServiceRequest getExportTraceServiceRequest(List getResourceSpansBatch(final List testSpanList) { + final ArrayList spansList = new ArrayList<>(); + for(final EndToEndTestSpan testSpan : testSpanList) { + final String traceId = testSpan.traceId; + final String parentId = testSpan.parentId; + final String spanId = testSpan.spanId; + final String serviceName = testSpan.serviceName; + final String spanName = testSpan.name; + final Span.SpanKind spanKind = testSpan.spanKind; + final ResourceSpans rs = getResourceSpans( + serviceName, + spanName, + spanId.getBytes(), + parentId != null ? parentId.getBytes() : null, + traceId.getBytes(), + spanKind + ); + spansList.add(rs); + } + return spansList; + } + private List> getExpectedDocuments(ExportTraceServiceRequest...exportTraceServiceRequests) { final List> expectedDocuments = new ArrayList<>(); for(int i=0; i> getExpectedDocuments(ExportTraceServiceRequest private Map getExpectedEsDocumentSource(final Span span, final String serviceName) { final Map esDocSource = new HashMap<>(); - esDocSource.put("traceId", Hex.toHexString(span.getTraceId().toByteArray())); + final String traceId = Hex.toHexString(span.getTraceId().toByteArray()); + esDocSource.put("traceId", traceId); esDocSource.put("spanId", Hex.toHexString(span.getSpanId().toByteArray())); esDocSource.put("parentSpanId", Hex.toHexString(span.getParentSpanId().toByteArray())); esDocSource.put("name", span.getName()); esDocSource.put("kind", span.getKind().name()); esDocSource.put("status.code", span.getStatus().getCodeValue()); - esDocSource.put("status.message", span.getStatus().getMessage()); esDocSource.put("serviceName", serviceName); - if (span.getParentSpanId().isEmpty()) { - esDocSource.put("traceGroup", span.getName()); - } + final String traceGroup = TEST_TRACEID_TO_TRACE_GROUP.get(traceId); + esDocSource.put("traceGroup", traceGroup); return esDocSource; } - private byte[] getRandomBytes(int len) { - final byte[] bytes = new byte[len]; - RANDOM.nextBytes(bytes); - return bytes; - } - - private List getRandomResourceSpans(int len) throws UnsupportedEncodingException { - final ArrayList spansList = new ArrayList<>(); - for(int i=0; i kv.getKey().equals("service.name")) .findFirst().get().getValue().getStringValue(); diff --git a/data-prepper-core/src/integrationTest/java/com/amazon/dataprepper/integration/EndToEndServiceMapTest.java b/data-prepper-core/src/integrationTest/java/com/amazon/dataprepper/integration/EndToEndServiceMapTest.java index 78d656a81..d2dbb3f55 100644 --- a/data-prepper-core/src/integrationTest/java/com/amazon/dataprepper/integration/EndToEndServiceMapTest.java +++ b/data-prepper-core/src/integrationTest/java/com/amazon/dataprepper/integration/EndToEndServiceMapTest.java @@ -41,32 +41,32 @@ public class EndToEndServiceMapTest { private static final String TEST_TRACEID_2 = "CBA"; private static final int DATA_PREPPER_PORT_1 = 21890; private static final int DATA_PREPPER_PORT_2 = 21891; - private static final List testDataSet11 = Arrays.asList( - ServiceMapTestData.DATA_100, ServiceMapTestData.DATA_200, ServiceMapTestData.DATA_500, ServiceMapTestData.DATA_600, - ServiceMapTestData.DATA_700, ServiceMapTestData.DATA_1000); - private static final List testDataSet12 = Arrays.asList( - ServiceMapTestData.DATA_300, ServiceMapTestData.DATA_400, ServiceMapTestData.DATA_800, - ServiceMapTestData.DATA_900, ServiceMapTestData.DATA_1100); - private static final List testDataSet21 = Arrays.asList( - ServiceMapTestData.DATA_101, ServiceMapTestData.DATA_201, ServiceMapTestData.DATA_401, ServiceMapTestData.DATA_501); - private static final List testDataSet22 = Collections.singletonList(ServiceMapTestData.DATA_301); + private static final List TEST_TRACE_1_BATCH_1 = Arrays.asList( + EndToEndTestSpan.TRACE_1_ROOT_SPAN, EndToEndTestSpan.TRACE_1_SPAN_2, EndToEndTestSpan.TRACE_1_SPAN_5, + EndToEndTestSpan.TRACE_1_SPAN_6, EndToEndTestSpan.TRACE_1_SPAN_7, EndToEndTestSpan.TRACE_1_SPAN_10); + private static final List TEST_TRACE_1_BATCH_2 = Arrays.asList( + EndToEndTestSpan.TRACE_1_SPAN_3, EndToEndTestSpan.TRACE_1_SPAN_4, EndToEndTestSpan.TRACE_1_SPAN_8, + EndToEndTestSpan.TRACE_1_SPAN_9, EndToEndTestSpan.TRACE_1_SPAN_11); + private static final List TEST_TRACE_2_BATCH_1 = Arrays.asList(EndToEndTestSpan.TRACE_2_ROOT_SPAN, + EndToEndTestSpan.TRACE_2_SPAN_2, EndToEndTestSpan.TRACE_2_SPAN_4, EndToEndTestSpan.TRACE_2_SPAN_5); + private static final List TEST_TRACE_2_BATCH_2 = Collections.singletonList(EndToEndTestSpan.TRACE_2_SPAN_3); private static final String SERVICE_MAP_INDEX_NAME = "otel-v1-apm-service-map"; @Test public void testPipelineEndToEnd() throws IOException, InterruptedException { // Send test trace group 1 final ExportTraceServiceRequest exportTraceServiceRequest11 = getExportTraceServiceRequest( - getResourceSpansBatch(TEST_TRACEID_1, testDataSet11) + getResourceSpansBatch(TEST_TRACEID_1, TEST_TRACE_1_BATCH_1) ); final ExportTraceServiceRequest exportTraceServiceRequest12 = getExportTraceServiceRequest( - getResourceSpansBatch(TEST_TRACEID_1, testDataSet12) + getResourceSpansBatch(TEST_TRACEID_1, TEST_TRACE_1_BATCH_2) ); sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_1, exportTraceServiceRequest11); sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_2, exportTraceServiceRequest12); //Verify data in elasticsearch sink - final List testDataSet1 = Stream.of(testDataSet11, testDataSet12) + final List testDataSet1 = Stream.of(TEST_TRACE_1_BATCH_1, TEST_TRACE_1_BATCH_2) .flatMap(Collection::stream).collect(Collectors.toList()); final List> possibleEdges = getPossibleEdges(TEST_TRACEID_1, testDataSet1); final ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder( @@ -91,16 +91,16 @@ public void testPipelineEndToEnd() throws IOException, InterruptedException { sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_2, exportTraceServiceRequest12); // Send test trace group 2 final ExportTraceServiceRequest exportTraceServiceRequest21 = getExportTraceServiceRequest( - getResourceSpansBatch(TEST_TRACEID_2, testDataSet21) + getResourceSpansBatch(TEST_TRACEID_2, TEST_TRACE_2_BATCH_1) ); final ExportTraceServiceRequest exportTraceServiceRequest22 = getExportTraceServiceRequest( - getResourceSpansBatch(TEST_TRACEID_2, testDataSet22) + getResourceSpansBatch(TEST_TRACEID_2, TEST_TRACE_2_BATCH_2) ); sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_1, exportTraceServiceRequest21); sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_2, exportTraceServiceRequest22); - final List testDataSet2 = Stream.of(testDataSet21, testDataSet22) + final List testDataSet2 = Stream.of(TEST_TRACE_2_BATCH_1, TEST_TRACE_2_BATCH_2) .flatMap(Collection::stream).collect(Collectors.toList()); possibleEdges.addAll(getPossibleEdges(TEST_TRACEID_2, testDataSet2)); // Wait for service map prepper by 2 * window_duration @@ -176,10 +176,10 @@ public static ExportTraceServiceRequest getExportTraceServiceRequest(List getResourceSpansBatch(final String traceId, final List data) { + private List getResourceSpansBatch(final String traceId, final List data) { final ArrayList spansList = new ArrayList<>(); for(int i=0; i < data.size(); i++) { - final ServiceMapTestData currData = data.get(i); + final EndToEndTestSpan currData = data.get(i); final String parentId = currData.parentId; final String spanId = currData.spanId; final String serviceName = currData.serviceName; @@ -198,14 +198,14 @@ private List getResourceSpansBatch(final String traceId, final Li return spansList; } - private List> getPossibleEdges(final String traceId, final List data) { - final Map spanIdToServiceMapTestData = data.stream() - .collect(Collectors.toMap(serviceMapTestData -> serviceMapTestData.spanId, serviceMapTestData -> serviceMapTestData)); + private List> getPossibleEdges(final String traceId, final List data) { + final Map spanIdToServiceMapTestData = data.stream() + .collect(Collectors.toMap(endToEndTestSpan -> endToEndTestSpan.spanId, endToEndTestSpan -> endToEndTestSpan)); final List> possibleEdges = new ArrayList<>(); - for (final ServiceMapTestData currData : data) { + for (final EndToEndTestSpan currData : data) { final String parentId = currData.parentId; if (parentId != null) { - final ServiceMapTestData parentData = spanIdToServiceMapTestData.get(parentId); + final EndToEndTestSpan parentData = spanIdToServiceMapTestData.get(parentId); if (parentData != null && !parentData.serviceName.equals(currData.serviceName)) { String rootSpanName = getRootSpanName(parentId, spanIdToServiceMapTestData); possibleEdges.addAll(getEdgeMaps(rootSpanName, currData, parentData)); @@ -216,16 +216,16 @@ private List> getPossibleEdges(final String traceId, final L return possibleEdges; } - private String getRootSpanName(String spanId, final Map spanIdToServiceMapTestData) { - ServiceMapTestData rootServiceMapTestData = spanIdToServiceMapTestData.get(spanId); - while (rootServiceMapTestData.parentId != null) { - rootServiceMapTestData = spanIdToServiceMapTestData.get(rootServiceMapTestData.parentId); + private String getRootSpanName(String spanId, final Map spanIdToServiceMapTestData) { + EndToEndTestSpan rootEndToEndTestSpan = spanIdToServiceMapTestData.get(spanId); + while (rootEndToEndTestSpan.parentId != null) { + rootEndToEndTestSpan = spanIdToServiceMapTestData.get(rootEndToEndTestSpan.parentId); } - return rootServiceMapTestData.name; + return rootEndToEndTestSpan.name; } private List> getEdgeMaps( - final String rootSpanName, final ServiceMapTestData currData, final ServiceMapTestData parentData) { + final String rootSpanName, final EndToEndTestSpan currData, final EndToEndTestSpan parentData) { final List> edges = new ArrayList<>(); Map destination = new HashMap<>(); diff --git a/data-prepper-core/src/integrationTest/java/com/amazon/dataprepper/integration/EndToEndTestSpan.java b/data-prepper-core/src/integrationTest/java/com/amazon/dataprepper/integration/EndToEndTestSpan.java new file mode 100644 index 000000000..58abea1ca --- /dev/null +++ b/data-prepper-core/src/integrationTest/java/com/amazon/dataprepper/integration/EndToEndTestSpan.java @@ -0,0 +1,40 @@ +package com.amazon.dataprepper.integration; + +import io.opentelemetry.proto.trace.v1.Span; + +public enum EndToEndTestSpan { + + TRACE_1_ROOT_SPAN("TRACE_1", "100", "ServiceA", null, "FRUITS", Span.SpanKind.SPAN_KIND_INTERNAL), + TRACE_1_SPAN_2("TRACE_1", "200", "ServiceA", "100", "CALL_SERVICE_B_APPLE", Span.SpanKind.SPAN_KIND_CLIENT), + TRACE_1_SPAN_3("TRACE_1", "300", "ServiceB", "200", "/APPLE", Span.SpanKind.SPAN_KIND_SERVER), + TRACE_1_SPAN_4("TRACE_1", "400", "ServiceB", "300", "CALL_SERVICE_C_ORANGE", Span.SpanKind.SPAN_KIND_CLIENT), + TRACE_1_SPAN_5("TRACE_1", "500", "ServiceC", "400", "/ORANGE", Span.SpanKind.SPAN_KIND_SERVER), + TRACE_1_SPAN_6("TRACE_1", "600", "ServiceC", "500", "SOME_INTERNAL", Span.SpanKind.SPAN_KIND_INTERNAL), + TRACE_1_SPAN_7("TRACE_1", "700", "ServiceA", "100", "CALL_SERVICE_B_JACKFRUIT", Span.SpanKind.SPAN_KIND_CLIENT), + TRACE_1_SPAN_8("TRACE_1", "800", "ServiceB", "700", "/JACKFRUIT", Span.SpanKind.SPAN_KIND_SERVER), + TRACE_1_SPAN_9("TRACE_1", "900", "ServiceB", "800", "SOME_INTERNAL", Span.SpanKind.SPAN_KIND_INTERNAL), + TRACE_1_SPAN_10("TRACE_1", "1000", "ServiceA", "100", "CALL_SERVICE_D_PEAR", Span.SpanKind.SPAN_KIND_CLIENT), + TRACE_1_SPAN_11("TRACE_1", "1100", "ServiceD", "1000", "/PEAR", Span.SpanKind.SPAN_KIND_SERVER), + + TRACE_2_ROOT_SPAN("TRACE_2", "101", "ServiceA", null, "VEGGIES", Span.SpanKind.SPAN_KIND_INTERNAL), + TRACE_2_SPAN_2("TRACE_2", "201", "ServiceA", "101", "CALL_SERVICE_B_ONION", Span.SpanKind.SPAN_KIND_CLIENT), + TRACE_2_SPAN_3("TRACE_2", "301", "ServiceB", "201", "/ONION", Span.SpanKind.SPAN_KIND_SERVER), + TRACE_2_SPAN_4("TRACE_2", "401", "ServiceA", "101", "CALL_SERVICE_E_POTATO", Span.SpanKind.SPAN_KIND_CLIENT), + TRACE_2_SPAN_5("TRACE_2", "501", "ServiceE", "401", "/POTATO", Span.SpanKind.SPAN_KIND_SERVER); + + public final String traceId; + public final String spanId; + public final String serviceName; + public final String parentId; + public final String name; + public final Span.SpanKind spanKind; + + EndToEndTestSpan(String traceId, String spanId, String serviceName, String parentId, String name, Span.SpanKind kind) { + this.traceId = traceId; + this.spanId = spanId; + this.serviceName = serviceName; + this.parentId = parentId; + this.name = name; + this.spanKind = kind; + } +} diff --git a/data-prepper-core/src/integrationTest/java/com/amazon/dataprepper/integration/ServiceMapTestData.java b/data-prepper-core/src/integrationTest/java/com/amazon/dataprepper/integration/ServiceMapTestData.java deleted file mode 100644 index ad849f620..000000000 --- a/data-prepper-core/src/integrationTest/java/com/amazon/dataprepper/integration/ServiceMapTestData.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.amazon.dataprepper.integration; - -import io.opentelemetry.proto.trace.v1.Span; - -public enum ServiceMapTestData { - - DATA_100("100", "ServiceA", null, "FRUITS", Span.SpanKind.SPAN_KIND_INTERNAL), - DATA_200("200", "ServiceA", "100", "CALL_SERVICE_B_APPLE", Span.SpanKind.SPAN_KIND_CLIENT), - DATA_300("300", "ServiceB", "200", "/APPLE", Span.SpanKind.SPAN_KIND_SERVER), - DATA_400("400", "ServiceB", "300", "CALL_SERVICE_C_ORANGE", Span.SpanKind.SPAN_KIND_CLIENT), - DATA_500("500", "ServiceC", "400", "/ORANGE", Span.SpanKind.SPAN_KIND_SERVER), - DATA_600("600", "ServiceC", "500", "SOME_INTERNAL", Span.SpanKind.SPAN_KIND_INTERNAL), - DATA_700("700", "ServiceA", "100", "CALL_SERVICE_B_JACKFRUIT", Span.SpanKind.SPAN_KIND_CLIENT), - DATA_800("800", "ServiceB", "700", "/JACKFRUIT", Span.SpanKind.SPAN_KIND_SERVER), - DATA_900("900", "ServiceB", "800", "SOME_INTERNAL", Span.SpanKind.SPAN_KIND_INTERNAL), - DATA_1000("1000", "ServiceA", "100", "CALL_SERVICE_D_PEAR", Span.SpanKind.SPAN_KIND_CLIENT), - DATA_1100("1100", "ServiceD", "1000", "/PEAR", Span.SpanKind.SPAN_KIND_SERVER), - - DATA_101("101", "ServiceA", null, "VEGGIES", Span.SpanKind.SPAN_KIND_INTERNAL), - DATA_201("201", "ServiceA", "101", "CALL_SERVICE_B_ONION", Span.SpanKind.SPAN_KIND_CLIENT), - DATA_301("301", "ServiceB", "201", "/ONION", Span.SpanKind.SPAN_KIND_SERVER), - DATA_401("401", "ServiceA", "101", "CALL_SERVICE_E_POTATO", Span.SpanKind.SPAN_KIND_CLIENT), - DATA_501("501", "ServiceE", "401", "/POTATO", Span.SpanKind.SPAN_KIND_SERVER); - - public final String spanId; - public final String serviceName; - public final String parentId; - public final String name; - public final Span.SpanKind spanKind; - - ServiceMapTestData(String spanId, String serviceName, String parentId, String name, Span.SpanKind kind) { - this.spanId = spanId; - this.serviceName = serviceName; - this.parentId = parentId; - this.name = name; - this.spanKind = kind; - } -} diff --git a/data-prepper-core/src/integrationTest/resources/raw-span-e2e-pipeline.yml b/data-prepper-core/src/integrationTest/resources/raw-span-e2e-pipeline.yml new file mode 100644 index 000000000..6b93c3039 --- /dev/null +++ b/data-prepper-core/src/integrationTest/resources/raw-span-e2e-pipeline.yml @@ -0,0 +1,25 @@ +entry-pipeline: + source: + otel_trace_source: + ssl: false + sink: + - pipeline: + name: "raw-pipeline" +raw-pipeline: + source: + pipeline: + name: "entry-pipeline" + prepper: + - otel_trace_raw_prepper: + root_span_flush_delay: 1 + trace_flush_interval: 5 + - otel_trace_group_prepper: + hosts: [ "https://node-0.example.com:9200" ] + username: "admin" + password: "admin" + sink: + - elasticsearch: + hosts: [ "https://node-0.example.com:9200" ] + username: "admin" + password: "admin" + trace_analytics_raw: true \ No newline at end of file diff --git a/data-prepper-core/src/integrationTest/resources/pipeline.yml b/data-prepper-core/src/integrationTest/resources/service-map-e2e-pipeline.yml similarity index 52% rename from data-prepper-core/src/integrationTest/resources/pipeline.yml rename to data-prepper-core/src/integrationTest/resources/service-map-e2e-pipeline.yml index 04d339899..9850b20a8 100644 --- a/data-prepper-core/src/integrationTest/resources/pipeline.yml +++ b/data-prepper-core/src/integrationTest/resources/service-map-e2e-pipeline.yml @@ -8,28 +8,8 @@ entry-pipeline: static_endpoints: ["data-prepper1", "data-prepper2"] ssl: false sink: - - pipeline: - name: "raw-pipeline" - pipeline: name: "service-map-pipeline" -raw-pipeline: - source: - pipeline: - name: "entry-pipeline" - prepper: - - otel_trace_raw_prepper: - root_span_flush_delay: 1 - trace_flush_interval: 3 - - otel_trace_group_prepper: - hosts: [ "https://node-0.example.com:9200" ] - username: "admin" - password: "admin" - sink: - - elasticsearch: - hosts: [ "https://node-0.example.com:9200" ] - username: "admin" - password: "admin" - trace_analytics_raw: true service-map-pipeline: source: pipeline: