Skip to content

Commit

Permalink
Integrate OTel with WS Next
Browse files Browse the repository at this point in the history
  • Loading branch information
michalvavrik committed Oct 25, 2024
1 parent 449be2a commit 09fe1fd
Show file tree
Hide file tree
Showing 27 changed files with 1,305 additions and 14 deletions.
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/opentelemetry-tracing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ See the main xref:opentelemetry.adoc#exporters[OpenTelemetry Guide exporters] se
** Kafka
** Pulsar
* https://quarkus.io/guides/vertx[`quarkus-vertx`] (http requests)
* xref:websockets-next-reference.adoc[`websockets-next`]


=== Disable parts of the automatic tracing
Expand Down
13 changes: 13 additions & 0 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,19 @@ quarkus.log.category."io.quarkus.websockets.next.traffic".level=DEBUG <3>
<2> Set the number of characters of a text message payload which will be logged.
<3> Enable `DEBUG` level is for the logger `io.quarkus.websockets.next.traffic`.

[[telemetry]]
== Telemetry

When the OpenTelemetry extension is present, traces for opened and closed WebSocket connections are collected by default.
If you do not require WebSocket traces, you can disable collecting of traces like in the example below:

[source, properties]
----
quarkus.websockets-next.server.traces.enabled=false
quarkus.websockets-next.client.traces.enabled=false
----

NOTE: Telemetry for the `BasicWebSocketConnector` is currently not supported.

[[websocket-next-configuration-reference]]
== Configuration reference
Expand Down
11 changes: 11 additions & 0 deletions extensions/websockets-next/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,17 @@
<artifactId>mutiny-kotlin</artifactId>
<scope>test</scope>
</dependency>
<!-- Needed for InMemorySpanExporter to verify captured traces -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import java.util.stream.Collectors;

import jakarta.enterprise.context.SessionScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.invoke.Invoker;
import jakarta.inject.Singleton;

import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationTransformation;
Expand All @@ -31,6 +33,7 @@
import org.jboss.jandex.DotName;
import org.jboss.jandex.IndexView;
import org.jboss.jandex.MethodInfo;
import org.jboss.jandex.ParameterizedType;
import org.jboss.jandex.PrimitiveType;
import org.jboss.jandex.Type;
import org.jboss.jandex.Type.Kind;
Expand Down Expand Up @@ -123,6 +126,10 @@
import io.quarkus.websockets.next.runtime.WebSocketSessionContext;
import io.quarkus.websockets.next.runtime.kotlin.ApplicationCoroutineScope;
import io.quarkus.websockets.next.runtime.kotlin.CoroutineInvoker;
import io.quarkus.websockets.next.runtime.telemetry.TracesBuilderCustomizer;
import io.quarkus.websockets.next.runtime.telemetry.WebSocketTelemetryProvider;
import io.quarkus.websockets.next.runtime.telemetry.WebSocketTelemetryProviderBuilder;
import io.quarkus.websockets.next.runtime.telemetry.WebSocketTelemetryRecorder;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
Expand Down Expand Up @@ -459,7 +466,8 @@ public void registerRoutes(WebSocketServerRecorder recorder, List<WebSocketEndpo
.displayOnNotFoundPage("WebSocket Endpoint")
.handlerType(HandlerType.NORMAL)
.handler(recorder.createEndpointHandler(endpoint.generatedClassName, endpoint.endpointId,
activateRequestContext(config, endpoint.endpointId, endpoints, validationPhase.getBeanResolver())));
activateRequestContext(config, endpoint.endpointId, endpoints, validationPhase.getBeanResolver()),
endpoint.path));
routes.produce(builder.build());
}
}
Expand Down Expand Up @@ -636,6 +644,40 @@ void createSecurityHttpUpgradeCheck(Capabilities capabilities, BuildProducer<Syn
}
}

@BuildStep
void addTracesSupport(Capabilities capabilities, BuildProducer<AdditionalBeanBuildItem> additionalBeanProducer) {
if (isTracesSupportEnabled(capabilities)) {
additionalBeanProducer.produce(AdditionalBeanBuildItem.unremovableOf(TracesBuilderCustomizer.class));
}
}

@BuildStep
@Record(RUNTIME_INIT)
void createTelemetryProvider(BuildProducer<SyntheticBeanBuildItem> syntheticBeanProducer,
WebSocketTelemetryRecorder recorder, Capabilities capabilities) {
if (isTracesSupportEnabled(capabilities)) {
var syntheticBeanBuildItem = SyntheticBeanBuildItem
.configure(WebSocketTelemetryProvider.class)
.setRuntimeInit() // consumes runtime config: traces / metrics enabled
.unremovable()
// inject point type: List<Consumer<WebSocketTelemetryProviderBuilder>>
.addInjectionPoint(
ParameterizedType.create(
DotName.createSimple(Instance.class),
new Type[] { ParameterizedType.create(Consumer.class, ClassType.create(
DotName.createSimple(WebSocketTelemetryProviderBuilder.class))) },
null))
.createWith(recorder.createTelemetryProvider())
.scope(Singleton.class)
.done();
syntheticBeanProducer.produce(syntheticBeanBuildItem);
}
}

private static boolean isTracesSupportEnabled(Capabilities capabilities) {
return capabilities.isPresent(Capability.OPENTELEMETRY_TRACER);
}

private static Map<String, SecurityCheck> collectEndpointSecurityChecks(List<WebSocketEndpointBuildItem> endpoints,
ClassSecurityCheckStorageBuildItem storage, IndexView index) {
return endpoints
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.websockets.next.test.telemetry;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Singleton;

import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;

@ApplicationScoped
public class InMemorySpanExporterProducer {

@Produces
@Singleton
InMemorySpanExporter inMemorySpanExporter() {
return InMemorySpanExporter.create();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package io.quarkus.websockets.next.test.telemetry;

import static io.opentelemetry.semconv.UrlAttributes.URL_PATH;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_CLIENT_ATTR_KEY;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_ENDPOINT_ATTR_KEY;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CONNECTION_ID_ATTR_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.awaitility.Awaitility;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.quarkus.builder.Version;
import io.quarkus.maven.dependency.Dependency;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.WebSocketConnector;
import io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage.BounceClient;
import io.quarkus.websockets.next.test.telemetry.endpoints.ontextmessage.BounceEndpoint;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;
import io.vertx.core.http.WebSocketConnectOptions;

public class OpenTelemetryWebSocketsTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addClasses(BounceEndpoint.class, WSClient.class, InMemorySpanExporterProducer.class, BounceClient.class)
.addAsResource(new StringAsset("""
quarkus.otel.bsp.export.timeout=1s
quarkus.otel.bsp.schedule.delay=50
"""), "application.properties"))
.setForcedDependencies(
List.of(Dependency.of("io.quarkus", "quarkus-opentelemetry-deployment", Version.getVersion())));

@TestHTTPResource("bounce")
URI bounceUri;

@TestHTTPResource("/")
URI baseUri;

@Inject
Vertx vertx;

@Inject
InMemorySpanExporter spanExporter;

@Inject
WebSocketConnector<BounceClient> connector;

@BeforeEach
public void resetSpans() {
spanExporter.reset();
BounceEndpoint.connectionId = null;
BounceEndpoint.endpointId = null;
BounceEndpoint.MESSAGES.clear();
BounceClient.MESSAGES.clear();
BounceClient.CLOSED_LATCH = new CountDownLatch(1);
BounceEndpoint.CLOSED_LATCH = new CountDownLatch(1);
}

@Test
public void testServerEndpointTracesOnly() {
assertEquals(0, spanExporter.getFinishedSpanItems().size());
try (WSClient client = new WSClient(vertx)) {
client.connect(new WebSocketConnectOptions(), bounceUri);
var response = client.sendAndAwaitReply("How U Livin'").toString();
assertEquals("How U Livin'", response);
}
waitForTracesToArrive(3);
var initialRequestSpan = getSpanByName("GET /bounce", SpanKind.SERVER);

var connectionOpenedSpan = getSpanByName("OPEN " + bounceUri.getPath(), SpanKind.SERVER);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionOpenedSpan));
assertEquals(initialRequestSpan.getSpanId(), connectionOpenedSpan.getLinks().get(0).getSpanContext().getSpanId());

var connectionClosedSpan = getSpanByName("CLOSE " + bounceUri.getPath(), SpanKind.SERVER);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionClosedSpan));
assertEquals(BounceEndpoint.connectionId, getConnectionIdAttrVal(connectionClosedSpan));
assertEquals(BounceEndpoint.endpointId, getEndpointIdAttrVal(connectionClosedSpan));
assertEquals(1, connectionClosedSpan.getLinks().size());
assertEquals(connectionOpenedSpan.getSpanId(), connectionClosedSpan.getLinks().get(0).getSpanContext().getSpanId());
}

@Test
public void testClientAndServerEndpointTraces() throws InterruptedException {
var clientConn = connector.baseUri(baseUri).connectAndAwait();
clientConn.sendTextAndAwait("Make It Bun Dem");

// assert client and server called
Awaitility.await().untilAsserted(() -> {
assertEquals(1, BounceEndpoint.MESSAGES.size());
assertEquals("Make It Bun Dem", BounceEndpoint.MESSAGES.get(0));
assertEquals(1, BounceClient.MESSAGES.size());
assertEquals("Make It Bun Dem", BounceClient.MESSAGES.get(0));
});

clientConn.closeAndAwait();
// assert connection closed and client/server were notified
assertTrue(BounceClient.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(BounceEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));

waitForTracesToArrive(5);

// server traces
var initialRequestSpan = getSpanByName("GET /bounce", SpanKind.SERVER);
var connectionOpenedSpan = getSpanByName("OPEN " + bounceUri.getPath(), SpanKind.SERVER);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionOpenedSpan));
assertEquals(initialRequestSpan.getSpanId(), connectionOpenedSpan.getLinks().get(0).getSpanContext().getSpanId());
var connectionClosedSpan = getSpanByName("CLOSE " + bounceUri.getPath(), SpanKind.SERVER);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionClosedSpan));
assertEquals(BounceEndpoint.connectionId, getConnectionIdAttrVal(connectionClosedSpan));
assertEquals(BounceEndpoint.endpointId, getEndpointIdAttrVal(connectionClosedSpan));
assertEquals(1, connectionClosedSpan.getLinks().size());
assertEquals(connectionOpenedSpan.getSpanId(), connectionClosedSpan.getLinks().get(0).getSpanContext().getSpanId());

// client traces
connectionOpenedSpan = getSpanByName("OPEN " + bounceUri.getPath(), SpanKind.CLIENT);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionOpenedSpan));
assertTrue(connectionOpenedSpan.getLinks().isEmpty());
connectionClosedSpan = getSpanByName("CLOSE " + bounceUri.getPath(), SpanKind.CLIENT);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionClosedSpan));
assertNotNull(getConnectionIdAttrVal(connectionClosedSpan));
assertNotNull(getClientIdAttrVal(connectionClosedSpan));
assertEquals(1, connectionClosedSpan.getLinks().size());
assertEquals(connectionOpenedSpan.getSpanId(), connectionClosedSpan.getLinks().get(0).getSpanContext().getSpanId());
}

@Test
public void testServerTracesWhenErrorOnMessage() {
assertEquals(0, spanExporter.getFinishedSpanItems().size());
try (WSClient client = new WSClient(vertx)) {
client.connect(new WebSocketConnectOptions(), bounceUri);
var response = client.sendAndAwaitReply("It's Alright, Ma").toString();
assertEquals("It's Alright, Ma", response);
response = client.sendAndAwaitReply("I'm Only Bleeding").toString();
assertEquals("I'm Only Bleeding", response);

client.sendAndAwait("throw-exception");
Awaitility.await().atMost(Duration.ofSeconds(5)).until(client::isClosed);
assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), client.closeStatusCode());
}
waitForTracesToArrive(3);

// server traces
var initialRequestSpan = getSpanByName("GET /bounce", SpanKind.SERVER);
var connectionOpenedSpan = getSpanByName("OPEN " + bounceUri.getPath(), SpanKind.SERVER);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionOpenedSpan));
assertEquals(initialRequestSpan.getSpanId(), connectionOpenedSpan.getLinks().get(0).getSpanContext().getSpanId());
var connectionClosedSpan = getSpanByName("CLOSE " + bounceUri.getPath(), SpanKind.SERVER);
assertEquals(bounceUri.getPath(), getUriAttrVal(connectionClosedSpan));
assertEquals(BounceEndpoint.connectionId, getConnectionIdAttrVal(connectionClosedSpan));
assertEquals(BounceEndpoint.endpointId, getEndpointIdAttrVal(connectionClosedSpan));
assertEquals(1, connectionClosedSpan.getLinks().size());
assertEquals(connectionOpenedSpan.getSpanId(), connectionClosedSpan.getLinks().get(0).getSpanContext().getSpanId());
}

private String getConnectionIdAttrVal(SpanData connectionOpenedSpan) {
return connectionOpenedSpan
.getAttributes()
.get(AttributeKey.stringKey(CONNECTION_ID_ATTR_KEY));
}

private String getClientIdAttrVal(SpanData connectionOpenedSpan) {
return connectionOpenedSpan
.getAttributes()
.get(AttributeKey.stringKey(CONNECTION_CLIENT_ATTR_KEY));
}

private String getUriAttrVal(SpanData connectionOpenedSpan) {
return connectionOpenedSpan.getAttributes().get(URL_PATH);
}

private String getEndpointIdAttrVal(SpanData connectionOpenedSpan) {
return connectionOpenedSpan
.getAttributes()
.get(AttributeKey.stringKey(CONNECTION_ENDPOINT_ATTR_KEY));
}

private void waitForTracesToArrive(int expectedTracesCount) {
Awaitility.await()
.atMost(Duration.ofSeconds(5))
.untilAsserted(() -> assertEquals(expectedTracesCount, spanExporter.getFinishedSpanItems().size()));
}

private SpanData getSpanByName(String name, SpanKind kind) {
return spanExporter.getFinishedSpanItems()
.stream()
.filter(sd -> name.equals(sd.getName()))
.filter(sd -> sd.getKind() == kind)
.findFirst()
.orElseThrow(() -> new AssertionError(
"Expected span name '" + name + "' and kind '" + kind + "' not found: "
+ spanExporter.getFinishedSpanItems()));
}
}
Loading

0 comments on commit 09fe1fd

Please sign in to comment.