diff --git a/docs/src/main/asciidoc/opentelemetry-tracing.adoc b/docs/src/main/asciidoc/opentelemetry-tracing.adoc index ef62e35cbb4039..0b83a9587e61d8 100644 --- a/docs/src/main/asciidoc/opentelemetry-tracing.adoc +++ b/docs/src/main/asciidoc/opentelemetry-tracing.adoc @@ -573,6 +573,7 @@ See the main xref:opentelemetry.adoc#exporters[OpenTelemetry Guide exporters] se ** Kafka ** Pulsar * https://quarkus.io/guides/vertx[`quarkus-vertx`] (http requests) +* xref:websockets-next-reference.adoc[`websockets-next`] === Disable parts of the automatic tracing diff --git a/docs/src/main/asciidoc/websockets-next-reference.adoc b/docs/src/main/asciidoc/websockets-next-reference.adoc index 3d78f97ba32476..7935bb99743581 100644 --- a/docs/src/main/asciidoc/websockets-next-reference.adoc +++ b/docs/src/main/asciidoc/websockets-next-reference.adoc @@ -1131,6 +1131,19 @@ quarkus.log.category."io.quarkus.websockets.next.traffic".level=DEBUG <3> <2> Set the number of characters of a text message payload which will be logged. <3> Enable `DEBUG` level is for the logger `io.quarkus.websockets.next.traffic`. +[[telemetry]] +== Telemetry + +When the OpenTelemetry extension is present, traces for opened and closed WebSocket connections are collected by default. +If you do not require WebSocket traces, you can disable collecting of traces like in the example below: + +[source, properties] +---- +quarkus.websockets-next.server.traces.enabled=false +quarkus.websockets-next.client.traces.enabled=false +---- + +NOTE: Telemetry for the `BasicWebSocketConnector` is currently not supported. [[websocket-next-configuration-reference]] == Configuration reference diff --git a/extensions/websockets-next/deployment/pom.xml b/extensions/websockets-next/deployment/pom.xml index 7681fcf852e7bd..c6b47704d0f510 100644 --- a/extensions/websockets-next/deployment/pom.xml +++ b/extensions/websockets-next/deployment/pom.xml @@ -80,6 +80,17 @@ mutiny-kotlin test + + + io.opentelemetry + opentelemetry-sdk-testing + test + + + io.opentelemetry.semconv + opentelemetry-semconv + test + diff --git a/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketProcessor.java b/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketProcessor.java index bb2f699544ce44..aced79ae39472b 100644 --- a/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketProcessor.java +++ b/extensions/websockets-next/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketProcessor.java @@ -20,7 +20,9 @@ import java.util.stream.Collectors; import jakarta.enterprise.context.SessionScoped; +import jakarta.enterprise.inject.Instance; import jakarta.enterprise.invoke.Invoker; +import jakarta.inject.Singleton; import org.jboss.jandex.AnnotationInstance; import org.jboss.jandex.AnnotationTransformation; @@ -31,6 +33,7 @@ import org.jboss.jandex.DotName; import org.jboss.jandex.IndexView; import org.jboss.jandex.MethodInfo; +import org.jboss.jandex.ParameterizedType; import org.jboss.jandex.PrimitiveType; import org.jboss.jandex.Type; import org.jboss.jandex.Type.Kind; @@ -123,6 +126,10 @@ import io.quarkus.websockets.next.runtime.WebSocketSessionContext; import io.quarkus.websockets.next.runtime.kotlin.ApplicationCoroutineScope; import io.quarkus.websockets.next.runtime.kotlin.CoroutineInvoker; +import io.quarkus.websockets.next.runtime.telemetry.TracesBuilderCustomizer; +import io.quarkus.websockets.next.runtime.telemetry.WebSocketTelemetryProvider; +import io.quarkus.websockets.next.runtime.telemetry.WebSocketTelemetryProviderBuilder; +import io.quarkus.websockets.next.runtime.telemetry.WebSocketTelemetryRecorder; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.groups.UniCreate; @@ -459,7 +466,8 @@ public void registerRoutes(WebSocketServerRecorder recorder, List additionalBeanProducer) { + if (isTracesSupportEnabled(capabilities)) { + additionalBeanProducer.produce(AdditionalBeanBuildItem.unremovableOf(TracesBuilderCustomizer.class)); + } + } + + @BuildStep + @Record(RUNTIME_INIT) + void createTelemetryProvider(BuildProducer syntheticBeanProducer, + WebSocketTelemetryRecorder recorder, Capabilities capabilities) { + if (isTracesSupportEnabled(capabilities)) { + var syntheticBeanBuildItem = SyntheticBeanBuildItem + .configure(WebSocketTelemetryProvider.class) + .setRuntimeInit() // consumes runtime config: traces / metrics enabled + .unremovable() + // inject point type: List> + .addInjectionPoint( + ParameterizedType.create( + DotName.createSimple(Instance.class), + new Type[] { ParameterizedType.create(Consumer.class, ClassType.create( + DotName.createSimple(WebSocketTelemetryProviderBuilder.class))) }, + null)) + .createWith(recorder.createTelemetryProvider()) + .scope(Singleton.class) + .done(); + syntheticBeanProducer.produce(syntheticBeanBuildItem); + } + } + + private static boolean isTracesSupportEnabled(Capabilities capabilities) { + return capabilities.isPresent(Capability.OPENTELEMETRY_TRACER); + } + private static Map collectEndpointSecurityChecks(List endpoints, ClassSecurityCheckStorageBuildItem storage, IndexView index) { return endpoints diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/InMemorySpanExporterProducer.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/InMemorySpanExporterProducer.java new file mode 100644 index 00000000000000..dc4f4d3997c3a9 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/InMemorySpanExporterProducer.java @@ -0,0 +1,18 @@ +package io.quarkus.websockets.next.test.telemetry; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Singleton; + +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; + +@ApplicationScoped +public class InMemorySpanExporterProducer { + + @Produces + @Singleton + InMemorySpanExporter inMemorySpanExporter() { + return InMemorySpanExporter.create(); + } + +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/OpenTelemetryWebSocketsTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/OpenTelemetryWebSocketsTest.java new file mode 100644 index 00000000000000..c55bde343fa924 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/OpenTelemetryWebSocketsTest.java @@ -0,0 +1,214 @@ +package io.quarkus.websockets.next.test.telemetry; + +import static io.opentelemetry.semconv.UrlAttributes.URL_PATH; +import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_CLIENT_ATTR_KEY; +import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_ENDPOINT_ATTR_KEY; +import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_ID_ATTR_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.awaitility.Awaitility; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.quarkus.builder.Version; +import io.quarkus.maven.dependency.Dependency; +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.WebSocketConnector; +import io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage.BounceClient; +import io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage.BounceEndpoint; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.vertx.core.Vertx; +import io.vertx.core.http.WebSocketConnectOptions; + +public class OpenTelemetryWebSocketsTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> root + .addClasses(BounceEndpoint.class, WSClient.class, InMemorySpanExporterProducer.class, BounceClient.class) + .addAsResource(new StringAsset(""" + quarkus.otel.bsp.export.timeout=1s + quarkus.otel.bsp.schedule.delay=50 + """), "application.properties")) + .setForcedDependencies( + List.of(Dependency.of("io.quarkus", "quarkus-opentelemetry-deployment", Version.getVersion()))); + + @TestHTTPResource("bounce") + URI bounceUri; + + @TestHTTPResource("/") + URI baseUri; + + @Inject + Vertx vertx; + + @Inject + InMemorySpanExporter spanExporter; + + @Inject + WebSocketConnector connector; + + @BeforeEach + public void resetSpans() { + spanExporter.reset(); + BounceEndpoint.connectionId = null; + BounceEndpoint.endpointId = null; + BounceEndpoint.MESSAGES.clear(); + BounceClient.MESSAGES.clear(); + BounceClient.CLOSED_LATCH = new CountDownLatch(1); + BounceEndpoint.CLOSED_LATCH = new CountDownLatch(1); + } + + @Test + public void testServerEndpointTracesOnly() { + assertEquals(0, spanExporter.getFinishedSpanItems().size()); + try (WSClient client = new WSClient(vertx)) { + client.connect(new WebSocketConnectOptions(), bounceUri); + var response = client.sendAndAwaitReply("How U Livin'").toString(); + assertEquals("How U Livin'", response); + } + waitForTracesToArrive(3); + var initialRequestSpan = getSpanByName("GET /bounce", SpanKind.SERVER); + + var connectionOpenedSpan = getSpanByName("OPEN " + bounceUri.getPath(), SpanKind.SERVER); + assertEquals(bounceUri.getPath(), getUriAttrVal(connectionOpenedSpan)); + assertEquals(initialRequestSpan.getSpanId(), connectionOpenedSpan.getLinks().get(0).getSpanContext().getSpanId()); + + var connectionClosedSpan = getSpanByName("CLOSE " + bounceUri.getPath(), SpanKind.SERVER); + assertEquals(bounceUri.getPath(), getUriAttrVal(connectionClosedSpan)); + assertEquals(BounceEndpoint.connectionId, getConnectionIdAttrVal(connectionClosedSpan)); + assertEquals(BounceEndpoint.endpointId, getEndpointIdAttrVal(connectionClosedSpan)); + assertEquals(1, connectionClosedSpan.getLinks().size()); + assertEquals(connectionOpenedSpan.getSpanId(), connectionClosedSpan.getLinks().get(0).getSpanContext().getSpanId()); + } + + @Test + public void testClientAndServerEndpointTraces() throws InterruptedException { + var clientConn = connector.baseUri(baseUri).connectAndAwait(); + clientConn.sendTextAndAwait("Make It Bun Dem"); + + // assert client and server called + Awaitility.await().untilAsserted(() -> { + assertEquals(1, BounceEndpoint.MESSAGES.size()); + assertEquals("Make It Bun Dem", BounceEndpoint.MESSAGES.get(0)); + assertEquals(1, BounceClient.MESSAGES.size()); + assertEquals("Make It Bun Dem", BounceClient.MESSAGES.get(0)); + }); + + clientConn.closeAndAwait(); + // assert connection closed and client/server were notified + assertTrue(BounceClient.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(BounceEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + + waitForTracesToArrive(5); + + // server traces + var initialRequestSpan = getSpanByName("GET /bounce", SpanKind.SERVER); + var connectionOpenedSpan = getSpanByName("OPEN " + bounceUri.getPath(), SpanKind.SERVER); + assertEquals(bounceUri.getPath(), getUriAttrVal(connectionOpenedSpan)); + assertEquals(initialRequestSpan.getSpanId(), connectionOpenedSpan.getLinks().get(0).getSpanContext().getSpanId()); + var connectionClosedSpan = getSpanByName("CLOSE " + bounceUri.getPath(), SpanKind.SERVER); + assertEquals(bounceUri.getPath(), getUriAttrVal(connectionClosedSpan)); + assertEquals(BounceEndpoint.connectionId, getConnectionIdAttrVal(connectionClosedSpan)); + assertEquals(BounceEndpoint.endpointId, getEndpointIdAttrVal(connectionClosedSpan)); + assertEquals(1, connectionClosedSpan.getLinks().size()); + assertEquals(connectionOpenedSpan.getSpanId(), connectionClosedSpan.getLinks().get(0).getSpanContext().getSpanId()); + + // client traces + connectionOpenedSpan = getSpanByName("OPEN " + bounceUri.getPath(), SpanKind.CLIENT); + assertEquals(bounceUri.getPath(), getUriAttrVal(connectionOpenedSpan)); + assertTrue(connectionOpenedSpan.getLinks().isEmpty()); + connectionClosedSpan = getSpanByName("CLOSE " + bounceUri.getPath(), SpanKind.CLIENT); + assertEquals(bounceUri.getPath(), getUriAttrVal(connectionClosedSpan)); + assertNotNull(getConnectionIdAttrVal(connectionClosedSpan)); + assertNotNull(getClientIdAttrVal(connectionClosedSpan)); + assertEquals(1, connectionClosedSpan.getLinks().size()); + assertEquals(connectionOpenedSpan.getSpanId(), connectionClosedSpan.getLinks().get(0).getSpanContext().getSpanId()); + } + + @Test + public void testServerTracesWhenErrorOnMessage() { + assertEquals(0, spanExporter.getFinishedSpanItems().size()); + try (WSClient client = new WSClient(vertx)) { + client.connect(new WebSocketConnectOptions(), bounceUri); + var response = client.sendAndAwaitReply("It's Alright, Ma").toString(); + assertEquals("It's Alright, Ma", response); + response = client.sendAndAwaitReply("I'm Only Bleeding").toString(); + assertEquals("I'm Only Bleeding", response); + + client.sendAndAwait("throw-exception"); + Awaitility.await().atMost(Duration.ofSeconds(5)).until(client::isClosed); + assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), client.closeStatusCode()); + } + waitForTracesToArrive(3); + + // server traces + var initialRequestSpan = getSpanByName("GET /bounce", SpanKind.SERVER); + var connectionOpenedSpan = getSpanByName("OPEN " + bounceUri.getPath(), SpanKind.SERVER); + assertEquals(bounceUri.getPath(), getUriAttrVal(connectionOpenedSpan)); + assertEquals(initialRequestSpan.getSpanId(), connectionOpenedSpan.getLinks().get(0).getSpanContext().getSpanId()); + var connectionClosedSpan = getSpanByName("CLOSE " + bounceUri.getPath(), SpanKind.SERVER); + assertEquals(bounceUri.getPath(), getUriAttrVal(connectionClosedSpan)); + assertEquals(BounceEndpoint.connectionId, getConnectionIdAttrVal(connectionClosedSpan)); + assertEquals(BounceEndpoint.endpointId, getEndpointIdAttrVal(connectionClosedSpan)); + assertEquals(1, connectionClosedSpan.getLinks().size()); + assertEquals(connectionOpenedSpan.getSpanId(), connectionClosedSpan.getLinks().get(0).getSpanContext().getSpanId()); + } + + private String getConnectionIdAttrVal(SpanData connectionOpenedSpan) { + return connectionOpenedSpan + .getAttributes() + .get(AttributeKey.stringKey(CONNECTION_ID_ATTR_KEY)); + } + + private String getClientIdAttrVal(SpanData connectionOpenedSpan) { + return connectionOpenedSpan + .getAttributes() + .get(AttributeKey.stringKey(CONNECTION_CLIENT_ATTR_KEY)); + } + + private String getUriAttrVal(SpanData connectionOpenedSpan) { + return connectionOpenedSpan.getAttributes().get(URL_PATH); + } + + private String getEndpointIdAttrVal(SpanData connectionOpenedSpan) { + return connectionOpenedSpan + .getAttributes() + .get(AttributeKey.stringKey(CONNECTION_ENDPOINT_ATTR_KEY)); + } + + private void waitForTracesToArrive(int expectedTracesCount) { + Awaitility.await() + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertEquals(expectedTracesCount, spanExporter.getFinishedSpanItems().size())); + } + + private SpanData getSpanByName(String name, SpanKind kind) { + return spanExporter.getFinishedSpanItems() + .stream() + .filter(sd -> name.equals(sd.getName())) + .filter(sd -> sd.getKind() == kind) + .findFirst() + .orElseThrow(() -> new AssertionError( + "Expected span name '" + name + "' and kind '" + kind + "' not found: " + + spanExporter.getFinishedSpanItems())); + } +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/TracesDisabledWebSocketsTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/TracesDisabledWebSocketsTest.java new file mode 100644 index 00000000000000..f5fce20db188c0 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/TracesDisabledWebSocketsTest.java @@ -0,0 +1,138 @@ +package io.quarkus.websockets.next.test.telemetry; + +import static io.opentelemetry.semconv.UrlAttributes.URL_PATH; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.awaitility.Awaitility; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.quarkus.builder.Version; +import io.quarkus.maven.dependency.Dependency; +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.WebSocketConnector; +import io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage.BounceClient; +import io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage.BounceEndpoint; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.vertx.core.Vertx; +import io.vertx.core.http.WebSocketConnectOptions; + +public class TracesDisabledWebSocketsTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> root + .addClasses(BounceEndpoint.class, WSClient.class, InMemorySpanExporterProducer.class, BounceClient.class) + .addAsResource(new StringAsset(""" + quarkus.otel.bsp.export.timeout=1s + quarkus.otel.bsp.schedule.delay=50 + quarkus.websockets-next.server.traces.enabled=false + quarkus.websockets-next.client.traces.enabled=false + """), "application.properties")) + .setForcedDependencies( + List.of(Dependency.of("io.quarkus", "quarkus-opentelemetry-deployment", Version.getVersion()))); + + @TestHTTPResource("bounce") + URI bounceUri; + + @TestHTTPResource + URI baseUri; + + @Inject + Vertx vertx; + + @Inject + InMemorySpanExporter spanExporter; + + @Inject + WebSocketConnector connector; + + @BeforeEach + public void resetSpans() { + spanExporter.reset(); + BounceEndpoint.connectionId = null; + BounceEndpoint.endpointId = null; + BounceEndpoint.MESSAGES.clear(); + BounceClient.MESSAGES.clear(); + BounceClient.CLOSED_LATCH = new CountDownLatch(1); + BounceEndpoint.CLOSED_LATCH = new CountDownLatch(1); + } + + @Test + public void testServerEndpointTracesDisabled() { + assertEquals(0, spanExporter.getFinishedSpanItems().size()); + try (WSClient client = new WSClient(vertx)) { + client.connect(new WebSocketConnectOptions(), bounceUri); + var response = client.sendAndAwaitReply("How U Livin'").toString(); + assertEquals("How U Livin'", response); + } + waitForInitialRequestTrace(); + + // check HTTP server traces still enabled + var initialRequestSpan = getInitialRequestSpan(); + assertEquals(bounceUri.getPath(), initialRequestSpan.getAttributes().get(URL_PATH)); + + // check WebSocket server endpoint traces are disabled + assertEquals(1, spanExporter.getFinishedSpanItems().size()); + } + + @Test + public void testClientAndServerEndpointTracesDisabled() throws InterruptedException { + var clientConn = connector.baseUri(baseUri).connectAndAwait(); + clientConn.sendTextAndAwait("Make It Bun Dem"); + + // assert client and server called + Awaitility.await().untilAsserted(() -> { + assertEquals(1, BounceEndpoint.MESSAGES.size()); + assertEquals("Make It Bun Dem", BounceEndpoint.MESSAGES.get(0)); + assertEquals(1, BounceClient.MESSAGES.size()); + assertEquals("Make It Bun Dem", BounceClient.MESSAGES.get(0)); + }); + + clientConn.closeAndAwait(); + // assert connection closed and client/server were notified + assertTrue(BounceClient.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(BounceEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + + waitForInitialRequestTrace(); + + // check HTTP server traces still enabled + var initialRequestSpan = getInitialRequestSpan(); + assertEquals("", initialRequestSpan.getAttributes().get(URL_PATH)); + + // check both client and server WebSocket endpoint traces are disabled + assertEquals(1, spanExporter.getFinishedSpanItems().size()); + } + + private void waitForInitialRequestTrace() { + Awaitility.await() + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertEquals(1, spanExporter.getFinishedSpanItems().size())); + } + + private SpanData getInitialRequestSpan() { + return spanExporter.getFinishedSpanItems() + .stream() + .filter(sd -> "GET /bounce".equals(sd.getName())) + .filter(sd -> sd.getKind() == SpanKind.SERVER) + .findFirst() + .orElseThrow(() -> new AssertionError( + "Expected span name 'GET /bounce' and kind '" + SpanKind.SERVER + "' not found: " + + spanExporter.getFinishedSpanItems())); + } +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BounceClient.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BounceClient.java new file mode 100644 index 00000000000000..aba3c46ab0d794 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BounceClient.java @@ -0,0 +1,27 @@ +package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; + +import io.quarkus.websockets.next.OnClose; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.WebSocketClient; + +@WebSocketClient(path = "/bounce", clientId = "bounce-client-id") +public class BounceClient { + + public static List MESSAGES = new CopyOnWriteArrayList<>(); + public static CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + + @OnTextMessage + void echo(String message) { + MESSAGES.add(message); + } + + @OnClose + void onClose() { + CLOSED_LATCH.countDown(); + } + +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BounceEndpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BounceEndpoint.java new file mode 100644 index 00000000000000..6f5527583dc1a1 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/telemetry/endpoints/ontextmessage/BounceEndpoint.java @@ -0,0 +1,49 @@ +package io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; + +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import io.quarkus.websockets.next.OnClose; +import io.quarkus.websockets.next.OnOpen; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.WebSocketConnection; + +@WebSocket(path = "/bounce", endpointId = "bounce-server-endpoint-id") +public class BounceEndpoint { + + public static final List MESSAGES = new CopyOnWriteArrayList<>(); + public static CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + public static volatile String connectionId = null; + public static volatile String endpointId = null; + + @ConfigProperty(name = "bounce-endpoint.prefix-responses", defaultValue = "false") + boolean prefixResponses; + + @OnTextMessage + public String onMessage(String message) { + if (prefixResponses) { + message = "echo 0: " + message; + } + MESSAGES.add(message); + if (message.equals("throw-exception")) { + throw new RuntimeException("Failing 'onMessage' to test behavior when an exception was thrown"); + } + return message; + } + + @OnOpen + void open(WebSocketConnection connection) { + connectionId = connection.id(); + endpointId = connection.endpointId(); + } + + @OnClose + void onClose() { + CLOSED_LATCH.countDown(); + } + +} diff --git a/extensions/websockets-next/runtime/pom.xml b/extensions/websockets-next/runtime/pom.xml index 4f0487b5905997..b3199acdd0bd38 100644 --- a/extensions/websockets-next/runtime/pom.xml +++ b/extensions/websockets-next/runtime/pom.xml @@ -43,6 +43,17 @@ io.quarkus.security quarkus-security + + + io.opentelemetry + opentelemetry-api + true + + + io.opentelemetry.semconv + opentelemetry-semconv + true + org.junit.jupiter diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/TelemetryConfig.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/TelemetryConfig.java new file mode 100644 index 00000000000000..4bbdd21d268469 --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/TelemetryConfig.java @@ -0,0 +1,19 @@ +package io.quarkus.websockets.next; + +import io.smallrye.config.WithDefault; +import io.smallrye.config.WithName; + +/** + * Configures telemetry in the WebSockets extension. + */ +public interface TelemetryConfig { + + /** + * If collection of WebSocket traces is enabled. + * Only applicable when the OpenTelemetry extension is present. + */ + @WithName("traces.enabled") + @WithDefault("true") + boolean tracesEnabled(); + +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsClientRuntimeConfig.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsClientRuntimeConfig.java index 30ad1b84f474a4..4db5076d36cb6e 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsClientRuntimeConfig.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsClientRuntimeConfig.java @@ -8,6 +8,7 @@ import io.quarkus.runtime.annotations.ConfigRoot; import io.smallrye.config.ConfigMapping; import io.smallrye.config.WithDefault; +import io.smallrye.config.WithParentName; @ConfigMapping(prefix = "quarkus.websockets-next.client") @ConfigRoot(phase = ConfigPhase.RUN_TIME) @@ -66,4 +67,10 @@ public interface WebSocketsClientRuntimeConfig { */ TrafficLoggingConfig trafficLogging(); + /** + * Telemetry configuration. + */ + @WithParentName + TelemetryConfig telemetry(); + } diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java index a5df5c23dd1046..cfd403b4724681 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java @@ -9,6 +9,7 @@ import io.quarkus.runtime.annotations.ConfigRoot; import io.smallrye.config.ConfigMapping; import io.smallrye.config.WithDefault; +import io.smallrye.config.WithParentName; @ConfigMapping(prefix = "quarkus.websockets-next.server") @ConfigRoot(phase = ConfigPhase.RUN_TIME) @@ -69,6 +70,12 @@ public interface WebSocketsServerRuntimeConfig { */ TrafficLoggingConfig trafficLogging(); + /** + * Telemetry configuration. + */ + @WithParentName + TelemetryConfig telemetry(); + interface Security { /** diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java index 7ccc97539e7f28..6ff0bf6b1116f5 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java @@ -1,5 +1,6 @@ package io.quarkus.websockets.next.runtime; +import java.lang.reflect.InvocationTargetException; import java.time.Duration; import java.util.Optional; import java.util.function.Consumer; @@ -19,6 +20,7 @@ import io.quarkus.websockets.next.UnhandledFailureStrategy; import io.quarkus.websockets.next.WebSocketException; import io.quarkus.websockets.next.runtime.WebSocketSessionContext.SessionContextState; +import io.quarkus.websockets.next.runtime.telemetry.TelemetrySupport; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; import io.vertx.core.Context; @@ -34,7 +36,7 @@ class Endpoints { static void initialize(Vertx vertx, ArcContainer container, Codecs codecs, WebSocketConnectionBase connection, WebSocketBase ws, String generatedEndpointClass, Optional autoPingInterval, SecuritySupport securitySupport, UnhandledFailureStrategy unhandledFailureStrategy, TrafficLogger trafficLogger, - Runnable onClose, boolean activateRequestContext) { + Runnable onClose, boolean activateRequestContext, TelemetrySupport telemetrySupport) { Context context = vertx.getOrCreateContext(); @@ -48,7 +50,7 @@ static void initialize(Vertx vertx, ArcContainer container, Codecs codecs, WebSo // Create an endpoint that delegates callbacks to the endpoint bean WebSocketEndpoint endpoint = createEndpoint(generatedEndpointClass, context, connection, codecs, contextSupport, - securitySupport); + securitySupport, telemetrySupport); // A broadcast processor is only needed if Multi is consumed by the callback BroadcastProcessor textBroadcastProcessor = endpoint.consumedTextMultiType() != null @@ -374,7 +376,8 @@ public void handle(Void event) { } private static WebSocketEndpoint createEndpoint(String endpointClassName, Context context, - WebSocketConnectionBase connection, Codecs codecs, ContextSupport contextSupport, SecuritySupport securitySupport) { + WebSocketConnectionBase connection, Codecs codecs, ContextSupport contextSupport, SecuritySupport securitySupport, + TelemetrySupport telemetrySupport) { try { ClassLoader cl = Thread.currentThread().getContextClassLoader(); if (cl == null) { @@ -383,16 +386,28 @@ private static WebSocketEndpoint createEndpoint(String endpointClassName, Contex @SuppressWarnings("unchecked") Class endpointClazz = (Class) cl .loadClass(endpointClassName); - WebSocketEndpoint endpoint = (WebSocketEndpoint) endpointClazz - .getDeclaredConstructor(WebSocketConnectionBase.class, Codecs.class, ContextSupport.class, - SecuritySupport.class) - .newInstance(connection, codecs, contextSupport, securitySupport); - return endpoint; + + if (telemetrySupport != null) { + WebSocketEndpoint endpoint = createWebSocketEndpoint(connection, codecs, contextSupport, securitySupport, + endpointClazz); + return telemetrySupport.decorate(endpoint, connection); + } + + return createWebSocketEndpoint(connection, codecs, contextSupport, securitySupport, endpointClazz); } catch (Exception e) { throw new WebSocketException("Unable to create endpoint instance: " + endpointClassName, e); } } + private static WebSocketEndpoint createWebSocketEndpoint(WebSocketConnectionBase connection, Codecs codecs, + ContextSupport contextSupport, SecuritySupport securitySupport, Class endpointClazz) + throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException { + return (WebSocketEndpoint) endpointClazz + .getDeclaredConstructor(WebSocketConnectionBase.class, Codecs.class, ContextSupport.class, + SecuritySupport.class) + .newInstance(connection, codecs, contextSupport, securitySupport); + } + private static WebSocketSessionContext sessionContext(ArcContainer container) { for (InjectableContext injectableContext : container.getContexts(SessionScoped.class)) { if (WebSocketSessionContext.class.equals(injectableContext.getClass())) { diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java index 5d9fdb6c0f456c..427c9f8abe0877 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java @@ -10,6 +10,7 @@ import java.util.concurrent.atomic.AtomicReference; import jakarta.enterprise.context.Dependent; +import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.Typed; import jakarta.enterprise.inject.spi.InjectionPoint; @@ -23,6 +24,7 @@ import io.quarkus.websockets.next.WebSocketsClientRuntimeConfig; import io.quarkus.websockets.next.runtime.WebSocketClientRecorder.ClientEndpoint; import io.quarkus.websockets.next.runtime.WebSocketClientRecorder.ClientEndpointsContext; +import io.quarkus.websockets.next.runtime.telemetry.WebSocketTelemetryProvider; import io.smallrye.mutiny.Uni; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; @@ -42,11 +44,14 @@ public class WebSocketConnectorImpl extends WebSocketConnectorBase telemetryProvider) { super(vertx, codecs, connectionManager, config, tlsConfigurationRegistry); this.clientEndpoint = Objects.requireNonNull(endpointsContext.endpoint(getEndpointClass(injectionPoint))); + this.telemetryProvider = telemetryProvider.isResolvable() ? telemetryProvider.get() : null; setPath(clientEndpoint.path); } @@ -94,6 +99,8 @@ public Uni connect() { } subprotocols.forEach(connectOptions::addSubProtocol); + var telemetrySupport = telemetryProvider == null ? null + : telemetryProvider.createClientTelemetrySupport(clientEndpoint.path); Uni websocket = Uni.createFrom(). emitter(e -> { // Create a new event loop context for each client, otherwise the current context is used // We want to avoid a situation where if multiple clients/connections are created in a row, @@ -105,12 +112,18 @@ public Uni connect() { public void handle(Void event) { WebSocketClient c = vertx.createWebSocketClient(populateClientOptions()); client.setPlain(c); + if (telemetrySupport != null && telemetrySupport.interceptConnection()) { + telemetrySupport.connectionOpened(); + } c.connect(connectOptions, new Handler>() { @Override public void handle(AsyncResult r) { if (r.succeeded()) { e.complete(r.result()); } else { + if (telemetrySupport != null && telemetrySupport.interceptConnection()) { + telemetrySupport.connectionOpeningFailed(r.cause()); + } e.fail(r.cause()); } } @@ -135,7 +148,7 @@ public void handle(AsyncResult r) { () -> { connectionManager.remove(clientEndpoint.generatedEndpointClass, connection); client.get().close(); - }, true); + }, true, telemetrySupport); return connection; }); diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java index 077dca8885fee8..f738e265fae1ad 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java @@ -23,6 +23,7 @@ import io.quarkus.websockets.next.HttpUpgradeCheck.HttpUpgradeContext; import io.quarkus.websockets.next.WebSocketServerException; import io.quarkus.websockets.next.WebSocketsServerRuntimeConfig; +import io.quarkus.websockets.next.runtime.telemetry.WebSocketTelemetryProvider; import io.smallrye.common.vertx.VertxContext; import io.smallrye.mutiny.Uni; import io.vertx.core.Context; @@ -61,12 +62,13 @@ public Object get() { } public Handler createEndpointHandler(String generatedEndpointClass, String endpointId, - boolean activateRequestContext) { + boolean activateRequestContext, String endpointPath) { ArcContainer container = Arc.container(); ConnectionManager connectionManager = container.instance(ConnectionManager.class).get(); Codecs codecs = container.instance(Codecs.class).get(); HttpUpgradeCheck[] httpUpgradeChecks = getHttpUpgradeChecks(endpointId, container); TrafficLogger trafficLogger = TrafficLogger.forServer(config); + WebSocketTelemetryProvider telemetryProvider = container.instance(WebSocketTelemetryProvider.class).orElse(null); return new Handler() { @Override @@ -93,7 +95,21 @@ public void handle(RoutingContext ctx) { } private void httpUpgrade(RoutingContext ctx) { - Future future = ctx.request().toWebSocket(); + var telemetrySupport = telemetryProvider == null ? null + : telemetryProvider.createServerTelemetrySupport(endpointPath); + final Future future; + if (telemetrySupport != null && telemetrySupport.interceptConnection()) { + telemetrySupport.connectionOpened(); + future = ctx.request().toWebSocket().onFailure(new Handler() { + @Override + public void handle(Throwable throwable) { + telemetrySupport.connectionOpeningFailed(throwable); + } + }); + } else { + future = ctx.request().toWebSocket(); + } + future.onSuccess(ws -> { Vertx vertx = VertxCoreRecorder.getVertx().get(); @@ -108,7 +124,8 @@ private void httpUpgrade(RoutingContext ctx) { Endpoints.initialize(vertx, container, codecs, connection, ws, generatedEndpointClass, config.autoPingInterval(), securitySupport, config.unhandledFailureStrategy(), trafficLogger, - () -> connectionManager.remove(generatedEndpointClass, connection), activateRequestContext); + () -> connectionManager.remove(generatedEndpointClass, connection), activateRequestContext, + telemetrySupport); }); } diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/ConnectionInterceptor.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/ConnectionInterceptor.java new file mode 100644 index 00000000000000..de51e14cf9a046 --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/ConnectionInterceptor.java @@ -0,0 +1,53 @@ +package io.quarkus.websockets.next.runtime.telemetry; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public sealed interface ConnectionInterceptor permits TracesConnectionInterceptor, + ConnectionInterceptor.CompositeConnectionInterceptor { + + void connectionOpened(); + + void connectionOpeningFailed(Throwable cause); + + /** + * Way to pass a context between {@link ConnectionInterceptor} and telemetry endpoint decorators. + * + * @return unmodifiable map passed to decorators as {@link TelemetryWebSocketEndpointContext#contextData()} + */ + Map getContextData(); + + final class CompositeConnectionInterceptor implements ConnectionInterceptor { + + private final List leaves; + + CompositeConnectionInterceptor(List leaves) { + this.leaves = List.copyOf(leaves); + } + + @Override + public void connectionOpened() { + for (var leaf : leaves) { + leaf.connectionOpened(); + } + } + + @Override + public void connectionOpeningFailed(Throwable cause) { + for (var leaf : leaves) { + leaf.connectionOpeningFailed(cause); + } + } + + @Override + public Map getContextData() { + Map map = new HashMap<>(); + for (var leaf : leaves) { + map.putAll(leaf.getContextData()); + } + return Collections.unmodifiableMap(map); + } + } +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/ForwardingWebSocketEndpoint.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/ForwardingWebSocketEndpoint.java new file mode 100644 index 00000000000000..cf9f971c930400 --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/ForwardingWebSocketEndpoint.java @@ -0,0 +1,107 @@ +package io.quarkus.websockets.next.runtime.telemetry; + +import java.lang.reflect.Type; + +import io.quarkus.websockets.next.InboundProcessingMode; +import io.quarkus.websockets.next.runtime.WebSocketEndpoint; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Future; +import io.vertx.core.buffer.Buffer; + +/** + * {@link WebSocketEndpoint} wrapper that delegates all methods to {@link #delegate}. + * This way, subclasses can only override methods they need to intercept. + */ +abstract class ForwardingWebSocketEndpoint implements WebSocketEndpoint { + + protected final WebSocketEndpoint delegate; + + protected ForwardingWebSocketEndpoint(WebSocketEndpoint delegate) { + this.delegate = delegate; + } + + @Override + public InboundProcessingMode inboundProcessingMode() { + return delegate.inboundProcessingMode(); + } + + @Override + public Future onOpen() { + return delegate.onOpen(); + } + + @Override + public ExecutionModel onOpenExecutionModel() { + return delegate.onOpenExecutionModel(); + } + + @Override + public Future onTextMessage(Object message) { + return delegate.onTextMessage(message); + } + + @Override + public ExecutionModel onTextMessageExecutionModel() { + return delegate.onTextMessageExecutionModel(); + } + + @Override + public Type consumedTextMultiType() { + return delegate.consumedTextMultiType(); + } + + @Override + public Object decodeTextMultiItem(Object message) { + return delegate.decodeTextMultiItem(message); + } + + @Override + public Future onBinaryMessage(Object message) { + return delegate.onBinaryMessage(message); + } + + @Override + public ExecutionModel onBinaryMessageExecutionModel() { + return delegate.onBinaryMessageExecutionModel(); + } + + @Override + public Type consumedBinaryMultiType() { + return delegate.consumedBinaryMultiType(); + } + + @Override + public Object decodeBinaryMultiItem(Object message) { + return delegate.decodeBinaryMultiItem(message); + } + + @Override + public Future onPongMessage(Buffer message) { + return delegate.onPongMessage(message); + } + + @Override + public ExecutionModel onPongMessageExecutionModel() { + return delegate.onPongMessageExecutionModel(); + } + + @Override + public Future onClose() { + return delegate.onClose(); + } + + @Override + public ExecutionModel onCloseExecutionModel() { + return delegate.onCloseExecutionModel(); + } + + @Override + public Uni doOnError(Throwable t) { + return delegate.doOnError(t); + } + + @Override + public String beanIdentifier() { + return delegate.beanIdentifier(); + } +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TelemetryConstants.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TelemetryConstants.java new file mode 100644 index 00000000000000..b94665ed843890 --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TelemetryConstants.java @@ -0,0 +1,16 @@ +package io.quarkus.websockets.next.runtime.telemetry; + +public final class TelemetryConstants { + + private TelemetryConstants() { + // class with constants + } + + /** + * OpenTelemetry attributes added to spans created for opened and closed connections. + */ + public static final String CONNECTION_ID_ATTR_KEY = "connection.id"; + public static final String CONNECTION_ENDPOINT_ATTR_KEY = "connection.endpoint.id"; + public static final String CONNECTION_CLIENT_ATTR_KEY = "connection.client.id"; + +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TelemetrySupport.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TelemetrySupport.java new file mode 100644 index 00000000000000..c4e3de0161116d --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TelemetrySupport.java @@ -0,0 +1,44 @@ +package io.quarkus.websockets.next.runtime.telemetry; + +import java.util.Map; + +import io.quarkus.websockets.next.runtime.WebSocketConnectionBase; +import io.quarkus.websockets.next.runtime.WebSocketEndpoint; + +/** + * Integrates traces into WebSockets with {@link WebSocketEndpoint} decorator. + */ +public abstract class TelemetrySupport { + + private final ConnectionInterceptor connectionInterceptor; + + TelemetrySupport(ConnectionInterceptor connectionInterceptor) { + this.connectionInterceptor = connectionInterceptor; + } + + public abstract WebSocketEndpoint decorate(WebSocketEndpoint endpoint, WebSocketConnectionBase connection); + + public boolean interceptConnection() { + return connectionInterceptor != null; + } + + /** + * Collects telemetry when WebSocket connection is opened. + * Only supported when {@link #interceptConnection()}. + */ + public void connectionOpened() { + connectionInterceptor.connectionOpened(); + } + + /** + * Collects telemetry when WebSocket connection opening failed. + * Only supported when {@link #interceptConnection()}. + */ + public void connectionOpeningFailed(Throwable throwable) { + connectionInterceptor.connectionOpeningFailed(throwable); + } + + protected Map getContextData() { + return connectionInterceptor == null ? Map.of() : connectionInterceptor.getContextData(); + } +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TelemetryWebSocketEndpointContext.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TelemetryWebSocketEndpointContext.java new file mode 100644 index 00000000000000..cd472e7d55cfb9 --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TelemetryWebSocketEndpointContext.java @@ -0,0 +1,13 @@ +package io.quarkus.websockets.next.runtime.telemetry; + +import java.util.Map; + +import io.quarkus.websockets.next.runtime.WebSocketConnectionBase; +import io.quarkus.websockets.next.runtime.WebSocketEndpoint; + +/** + * Data carrier used to instantiate {@link TelemetrySupport}. + */ +record TelemetryWebSocketEndpointContext(WebSocketEndpoint endpoint, WebSocketConnectionBase connection, String path, + Map contextData) { +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TracesBuilderCustomizer.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TracesBuilderCustomizer.java new file mode 100644 index 00000000000000..f6780c9efb2506 --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TracesBuilderCustomizer.java @@ -0,0 +1,84 @@ +package io.quarkus.websockets.next.runtime.telemetry; + +import static io.quarkus.websockets.next.runtime.telemetry.TracesConnectionInterceptor.CONNECTION_OPENED_SPAN_CTX; + +import java.util.function.Consumer; +import java.util.function.Function; + +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; + +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; +import io.quarkus.websockets.next.WebSocketClientConnection; +import io.quarkus.websockets.next.WebSocketConnection; +import io.quarkus.websockets.next.WebSocketsClientRuntimeConfig; +import io.quarkus.websockets.next.WebSocketsServerRuntimeConfig; +import io.quarkus.websockets.next.runtime.WebSocketEndpoint; + +/** + * Installs traces support into the WebSockets extension. + */ +public final class TracesBuilderCustomizer implements Consumer { + + @Inject + WebSocketsServerRuntimeConfig serverRuntimeConfig; + + @Inject + WebSocketsClientRuntimeConfig clientRuntimeConfig; + + @Inject + Instance tracerInstance; + + @Override + public void accept(WebSocketTelemetryProviderBuilder builder) { + var serverTracesEnabled = serverRuntimeConfig.telemetry().tracesEnabled(); + var clientTracesEnabled = clientRuntimeConfig.telemetry().tracesEnabled(); + if (serverTracesEnabled || clientTracesEnabled) { + final Tracer tracer = tracerInstance.get(); + if (serverTracesEnabled) { + addServerTracesSupport(builder, tracer); + } + if (clientTracesEnabled) { + addClientTracesSupport(builder, tracer); + } + } + } + + private static void addServerTracesSupport(WebSocketTelemetryProviderBuilder builder, Tracer tracer) { + builder.serverEndpointDecorator(new Function<>() { + @Override + public WebSocketEndpoint apply(TelemetryWebSocketEndpointContext ctx) { + var onOpenSpanCtx = (SpanContext) ctx.contextData().get(CONNECTION_OPENED_SPAN_CTX); + return new TracesForwardingWebSocketEndpoint(ctx.endpoint(), tracer, (WebSocketConnection) ctx.connection(), + onOpenSpanCtx, ctx.path()); + } + }); + builder.pathToServerConnectionInterceptor(new Function<>() { + @Override + public ConnectionInterceptor apply(String path) { + return new TracesConnectionInterceptor(tracer, SpanKind.SERVER, path); + } + }); + } + + private static void addClientTracesSupport(WebSocketTelemetryProviderBuilder builder, Tracer tracer) { + builder.clientEndpointDecorator(new Function<>() { + @Override + public WebSocketEndpoint apply(TelemetryWebSocketEndpointContext ctx) { + var onOpenSpanCtx = (SpanContext) ctx.contextData().get(CONNECTION_OPENED_SPAN_CTX); + return new TracesForwardingWebSocketEndpoint(ctx.endpoint(), tracer, + (WebSocketClientConnection) ctx.connection(), + onOpenSpanCtx, ctx.path()); + } + }); + builder.pathToClientConnectionInterceptor(new Function<>() { + @Override + public ConnectionInterceptor apply(String path) { + return new TracesConnectionInterceptor(tracer, SpanKind.CLIENT, path); + } + }); + } + +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TracesConnectionInterceptor.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TracesConnectionInterceptor.java new file mode 100644 index 00000000000000..04e72f8977e7e3 --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TracesConnectionInterceptor.java @@ -0,0 +1,70 @@ +package io.quarkus.websockets.next.runtime.telemetry; + +import java.util.HashMap; +import java.util.Map; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.semconv.UrlAttributes; + +final class TracesConnectionInterceptor implements ConnectionInterceptor { + + static final String CONNECTION_OPENED_SPAN_CTX = "io.quarkus.websockets.next.connection-opened-span-ctx"; + + private final Tracer tracer; + private final String path; + private final Map contextData; + private final SpanKind spanKind; + + TracesConnectionInterceptor(Tracer tracer, SpanKind spanKind, String path) { + this.tracer = tracer; + this.path = path; + this.contextData = new HashMap<>(); + this.spanKind = spanKind; + } + + @Override + public void connectionOpened() { + var span = tracer + .spanBuilder("OPEN " + path) + .setSpanKind(spanKind) + .addLink(previousSpanContext()) + .setAttribute(UrlAttributes.URL_PATH, path) + .startSpan(); + try (var ignored = span.makeCurrent()) { + contextData.put(CONNECTION_OPENED_SPAN_CTX, span.getSpanContext()); + } finally { + span.end(); + } + } + + @Override + public void connectionOpeningFailed(Throwable cause) { + var span = tracer + .spanBuilder("OPEN " + path) + .setSpanKind(spanKind) + .addLink((SpanContext) contextData.get(CONNECTION_OPENED_SPAN_CTX)) + .setAttribute(UrlAttributes.URL_PATH, path) + .startSpan(); + try (var ignored = span.makeCurrent()) { + span.recordException(cause); + } finally { + span.end(); + } + } + + @Override + public Map getContextData() { + return contextData; + } + + private static SpanContext previousSpanContext() { + var span = Span.current(); + if (span.getSpanContext().isValid()) { + return span.getSpanContext(); + } + return null; + } +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TracesForwardingWebSocketEndpoint.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TracesForwardingWebSocketEndpoint.java new file mode 100644 index 00000000000000..64b24956b9a6d4 --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/TracesForwardingWebSocketEndpoint.java @@ -0,0 +1,98 @@ +package io.quarkus.websockets.next.runtime.telemetry; + +import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_CLIENT_ATTR_KEY; +import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_ENDPOINT_ATTR_KEY; +import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_ID_ATTR_KEY; + +import java.util.function.Function; + +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.semconv.UrlAttributes; +import io.quarkus.websockets.next.WebSocketClientConnection; +import io.quarkus.websockets.next.WebSocketConnection; +import io.quarkus.websockets.next.runtime.WebSocketEndpoint; +import io.vertx.core.Future; +import io.vertx.core.Handler; + +/** + * {@link WebSocketEndpoint} wrapper that produces OpenTelemetry spans for closed connection. + */ +final class TracesForwardingWebSocketEndpoint extends ForwardingWebSocketEndpoint { + + /** + * Target ID represents either endpoint id or client id. + */ + private final String targetIdKey; + private final String targetIdValue; + /** + * Span context for an HTTP request used to establish the WebSocket connection. + */ + private final Tracer tracer; + private final String connectionId; + private final String path; + private final SpanContext onOpenSpanContext; + private final SpanKind spanKind; + + TracesForwardingWebSocketEndpoint(WebSocketEndpoint delegate, Tracer tracer, WebSocketConnection connection, + SpanContext onOpenSpanContext, String path) { + super(delegate); + this.tracer = tracer; + this.onOpenSpanContext = onOpenSpanContext; + this.connectionId = connection.id(); + this.targetIdKey = CONNECTION_ENDPOINT_ATTR_KEY; + this.targetIdValue = connection.endpointId(); + this.path = path; + this.spanKind = SpanKind.SERVER; + } + + TracesForwardingWebSocketEndpoint(WebSocketEndpoint delegate, Tracer tracer, WebSocketClientConnection connection, + SpanContext onOpenSpanContext, String path) { + super(delegate); + this.tracer = tracer; + this.onOpenSpanContext = onOpenSpanContext; + this.connectionId = connection.id(); + this.targetIdKey = CONNECTION_CLIENT_ATTR_KEY; + this.targetIdValue = connection.clientId(); + this.path = path; + this.spanKind = SpanKind.CLIENT; + } + + @Override + public Future onClose() { + return delegate.onClose().map(new Function() { + @Override + public Void apply(Void unused) { + var span = tracer.spanBuilder("CLOSE " + path) + .setSpanKind(spanKind) + .addLink(onOpenSpanContext) + .setAttribute(CONNECTION_ID_ATTR_KEY, connectionId) + .setAttribute(UrlAttributes.URL_PATH, path) + .setAttribute(targetIdKey, targetIdValue) + .startSpan(); + try { + span.makeCurrent().close(); + } finally { + span.end(); + } + return null; + } + }).onFailure(new Handler() { + @Override + public void handle(Throwable throwable) { + var span = tracer.spanBuilder("CLOSE " + path) + .setSpanKind(spanKind) + .addLink(onOpenSpanContext) + .setAttribute(CONNECTION_ID_ATTR_KEY, connectionId) + .setAttribute(UrlAttributes.URL_PATH, path) + .setAttribute(targetIdKey, targetIdValue) + .startSpan(); + try (var ignored = span.makeCurrent()) { + span.recordException(throwable); + } + span.end(); + } + }); + } +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/WebSocketTelemetryProvider.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/WebSocketTelemetryProvider.java new file mode 100644 index 00000000000000..e324487ecde7ba --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/WebSocketTelemetryProvider.java @@ -0,0 +1,75 @@ +package io.quarkus.websockets.next.runtime.telemetry; + +import java.util.function.Function; + +import io.quarkus.websockets.next.runtime.WebSocketConnectionBase; +import io.quarkus.websockets.next.runtime.WebSocketEndpoint; + +public final class WebSocketTelemetryProvider { + + private final Function serverEndpointDecorator; + private final Function clientEndpointDecorator; + private final Function pathToClientConnectionInterceptor; + private final Function pathToServerConnectionInterceptor; + private final boolean clientTelemetryEnabled; + private final boolean serverTelemetryEnabled; + + WebSocketTelemetryProvider(Function serverEndpointDecorator, + Function clientEndpointDecorator, + Function pathToClientConnectionInterceptor, + Function pathToServerConnectionInterceptor) { + this.serverTelemetryEnabled = serverEndpointDecorator != null || pathToServerConnectionInterceptor != null; + this.serverEndpointDecorator = serverEndpointDecorator; + this.pathToServerConnectionInterceptor = pathToServerConnectionInterceptor; + this.clientTelemetryEnabled = clientEndpointDecorator != null || pathToClientConnectionInterceptor != null; + this.clientEndpointDecorator = clientEndpointDecorator; + this.pathToClientConnectionInterceptor = pathToClientConnectionInterceptor; + } + + /** + * This method may only be called on the Vert.x context of the initial HTTP request as it collects context data. + * + * @param path endpoint path with path param placeholders + * @return TelemetryDecorator + */ + public TelemetrySupport createServerTelemetrySupport(String path) { + if (serverTelemetryEnabled) { + return new TelemetrySupport(getServerConnectionInterceptor(path)) { + @Override + public WebSocketEndpoint decorate(WebSocketEndpoint endpoint, WebSocketConnectionBase connection) { + if (serverEndpointDecorator == null) { + return endpoint; + } + return serverEndpointDecorator + .apply(new TelemetryWebSocketEndpointContext(endpoint, connection, path, getContextData())); + } + }; + } + return null; + } + + public TelemetrySupport createClientTelemetrySupport(String path) { + if (clientTelemetryEnabled) { + return new TelemetrySupport(getClientConnectionInterceptor(path)) { + @Override + public WebSocketEndpoint decorate(WebSocketEndpoint endpoint, WebSocketConnectionBase connection) { + if (clientEndpointDecorator == null) { + return endpoint; + } + return clientEndpointDecorator + .apply(new TelemetryWebSocketEndpointContext(endpoint, connection, path, getContextData())); + } + }; + } + return null; + } + + private ConnectionInterceptor getServerConnectionInterceptor(String path) { + return pathToServerConnectionInterceptor == null ? null : pathToServerConnectionInterceptor.apply(path); + } + + private ConnectionInterceptor getClientConnectionInterceptor(String path) { + return pathToClientConnectionInterceptor == null ? null : pathToClientConnectionInterceptor.apply(path); + } + +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/WebSocketTelemetryProviderBuilder.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/WebSocketTelemetryProviderBuilder.java new file mode 100644 index 00000000000000..74c1cf4769b912 --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/WebSocketTelemetryProviderBuilder.java @@ -0,0 +1,98 @@ +package io.quarkus.websockets.next.runtime.telemetry; + +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +import io.quarkus.websockets.next.runtime.WebSocketEndpoint; +import io.quarkus.websockets.next.runtime.telemetry.ConnectionInterceptor.CompositeConnectionInterceptor; + +/** + * Quarkus uses this class internally to build {@link WebSocketTelemetryProvider}. + */ +public final class WebSocketTelemetryProviderBuilder { + + private Function pathToClientConnectionInterceptor; + private Function pathToServerConnectionInterceptor; + private Function serverEndpointDecorator; + private Function clientEndpointDecorator; + + WebSocketTelemetryProviderBuilder() { + serverEndpointDecorator = null; + clientEndpointDecorator = null; + } + + void clientEndpointDecorator(Function decorator) { + Objects.requireNonNull(decorator); + if (this.clientEndpointDecorator == null) { + this.clientEndpointDecorator = decorator; + } else { + this.clientEndpointDecorator = this.clientEndpointDecorator + .compose(new Function() { + @Override + public TelemetryWebSocketEndpointContext apply(TelemetryWebSocketEndpointContext ctx) { + var decorated = decorator.apply(ctx); + return new TelemetryWebSocketEndpointContext(decorated, ctx.connection(), ctx.path(), + ctx.contextData()); + } + }); + } + } + + void serverEndpointDecorator(Function decorator) { + Objects.requireNonNull(decorator); + if (this.serverEndpointDecorator == null) { + this.serverEndpointDecorator = decorator; + } else { + this.serverEndpointDecorator = this.serverEndpointDecorator + .compose(new Function() { + @Override + public TelemetryWebSocketEndpointContext apply(TelemetryWebSocketEndpointContext ctx) { + var decorated = decorator.apply(ctx); + return new TelemetryWebSocketEndpointContext(decorated, ctx.connection(), ctx.path(), + ctx.contextData()); + } + }); + } + } + + void pathToClientConnectionInterceptor(Function pathToInterceptor1) { + Objects.requireNonNull(pathToInterceptor1); + if (this.pathToClientConnectionInterceptor == null) { + this.pathToClientConnectionInterceptor = pathToInterceptor1; + } else { + var pathToInterceptor2 = this.pathToClientConnectionInterceptor; + this.pathToClientConnectionInterceptor = new Function<>() { + @Override + public ConnectionInterceptor apply(String path) { + var interceptor1 = pathToInterceptor1.apply(path); + var interceptor2 = pathToInterceptor2.apply(path); + return new CompositeConnectionInterceptor(List.of(interceptor1, interceptor2)); + } + }; + } + } + + void pathToServerConnectionInterceptor(Function pathToInterceptor1) { + Objects.requireNonNull(pathToInterceptor1); + if (this.pathToServerConnectionInterceptor == null) { + this.pathToServerConnectionInterceptor = pathToInterceptor1; + } else { + var pathToInterceptor2 = this.pathToServerConnectionInterceptor; + this.pathToServerConnectionInterceptor = new Function<>() { + @Override + public ConnectionInterceptor apply(String path) { + var interceptor1 = pathToInterceptor1.apply(path); + var interceptor2 = pathToInterceptor2.apply(path); + return new CompositeConnectionInterceptor(List.of(interceptor1, interceptor2)); + } + }; + } + } + + WebSocketTelemetryProvider build() { + return new WebSocketTelemetryProvider(serverEndpointDecorator, clientEndpointDecorator, + pathToClientConnectionInterceptor, pathToServerConnectionInterceptor); + } + +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/WebSocketTelemetryRecorder.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/WebSocketTelemetryRecorder.java new file mode 100644 index 00000000000000..7cdbe1e92bbfb9 --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/telemetry/WebSocketTelemetryRecorder.java @@ -0,0 +1,31 @@ +package io.quarkus.websockets.next.runtime.telemetry; + +import java.util.function.Consumer; +import java.util.function.Function; + +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.util.TypeLiteral; + +import io.quarkus.arc.SyntheticCreationalContext; +import io.quarkus.runtime.annotations.Recorder; + +@Recorder +public class WebSocketTelemetryRecorder { + + public Function, WebSocketTelemetryProvider> createTelemetryProvider() { + return new Function<>() { + @Override + public WebSocketTelemetryProvider apply(SyntheticCreationalContext ctx) { + Instance> builderCustomizers = ctx + .getInjectedReference(new TypeLiteral<>() { + }); + var builder = new WebSocketTelemetryProviderBuilder(); + for (Consumer customizer : builderCustomizers) { + customizer.accept(builder); + } + return builder.build(); + } + }; + } + +}