Skip to content

Commit

Permalink
WebSockets Next integration wiht OTel and Micrometer
Browse files Browse the repository at this point in the history
  • Loading branch information
michalvavrik committed Jul 21, 2024
1 parent c466d16 commit 7c325ce
Show file tree
Hide file tree
Showing 90 changed files with 3,283 additions and 62 deletions.
18 changes: 18 additions & 0 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,24 @@ quarkus.log.category."io.quarkus.websockets.next.traffic".level=DEBUG <3>
<2> Set the number of characters of a text message payload which will be logged.

Check warning on line 1016 in docs/src/main/asciidoc/websockets-next-reference.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using ', which (non restrictive clause preceded by a comma)' or 'that (restrictive clause without a comma)' rather than 'which'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using ', which (non restrictive clause preceded by a comma)' or 'that (restrictive clause without a comma)' rather than 'which'.", "location": {"path": "docs/src/main/asciidoc/websockets-next-reference.adoc", "range": {"start": {"line": 1016, "column": 48}}}, "severity": "INFO"}
<3> Enable `DEBUG` level is for the logger `io.quarkus.websockets.next.traffic`.

[[telemetry]]
== Telemetry

When the OpenTelemetry extension is present, traces for opened and closed WebSocket connections are collected by default.
If you do not require WebSocket traces, you can disable tracing like in the example below:

[source, properties]
----
quarkus.websockets-next.tracing.enabled=false
----

When the Micrometer extension is present, metrics for messages, errors and bytes transferred are collected.
If you do not require WebSocket metrics, you can disable metrics like in the example below:

[source, properties]
----
quarkus.websockets-next.metrics.enabled=false
----

[[websocket-next-configuration-reference]]
== Configuration reference
Expand Down
6 changes: 6 additions & 0 deletions extensions/websockets-next/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@
<artifactId>mutiny-kotlin</artifactId>
<scope>test</scope>
</dependency>
<!-- Needed for InMemorySpanExporter to verify captured traces -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,349 @@
package io.quarkus.websockets.next.test.telemetry;

import static io.quarkus.websockets.next.test.telemetry.Connection.sendAndAssertResponses;
import static io.quarkus.websockets.next.test.telemetry.MetricsAsserter.stringToBytes;

import java.net.URI;
import java.util.List;

import jakarta.inject.Inject;

import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.junit.jupiter.api.Test;

import io.quarkus.builder.Version;
import io.quarkus.maven.dependency.Dependency;
import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.WebSocketConnector;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.WebSocketConnectOptions;

public abstract class AbstractWebSocketsOnMessageTest {

static QuarkusUnitTest createQuarkusUnitTest(String endpointsPackage) {
return new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addPackage(endpointsPackage)
.addClasses(WSClient.class, Connection.class, MetricsAsserter.class,
AbstractWebSocketsOnMessageTest.class)
.addAsResource(new StringAsset("""
bounce-endpoint.prefix-responses=true
"""), "application.properties"))
.setForcedDependencies(
List.of(Dependency.of("io.quarkus", "quarkus-micrometer-registry-prometheus-deployment",
Version.getVersion())));
}

protected final MetricsAsserter asserter = new MetricsAsserter();

@TestHTTPResource("bounce")
URI bounceUri;

@TestHTTPResource("/")
URI baseUri;

@TestHTTPResource("received-single-text-response-none")
URI singleTextReceived_NoResponse_Uri;

@TestHTTPResource("received-single-text-response-multi-text")
URI singleTextReceived_MultiTextResponse_Uri;

@TestHTTPResource("received-multi-text-response-none")
URI multiTextReceived_NoResponse_Uri;

@TestHTTPResource("received-multi-text-response-single-text")
URI multiTextReceived_SingleTextResponse_Uri;

@TestHTTPResource("received-multi-text-response-multi-text")
URI multiTextReceived_MultiTextResponse_Uri;

@TestHTTPResource("received-single-text-response-uni-text")
URI singleTextReceived_UniTextResponse_Uri;

@TestHTTPResource("received-single-dto-response-single-dto")
URI singleDtoReceived_SingleDtoResponse_Uri;

@TestHTTPResource("received-single-dto-response-none")
URI singleDtoReceived_NoResponse_Uri;

@TestHTTPResource("received-single-dto-response-uni-dto")
URI singleDtoReceived_UniDtoResponse_Uri;

@TestHTTPResource("received-single-dto-response-multi-dto")
URI singleDtoReceived_MultiDtoResponse_Uri;

@TestHTTPResource("received-multi-dto-response-none")
URI multiDtoReceived_NoResponse_Uri;

@TestHTTPResource("received-multi-dto-response-single-dto")
URI multiDtoReceived_SingleDtoResponse_Uri;

@TestHTTPResource("received-multi-dto-response-multi-dto")
URI multiDtoReceived_MultiDtoResponse_Uri;

@TestHTTPResource("broadcast")
URI broadcast_Uri;

@Inject
Vertx vertx;

protected abstract boolean binaryMode();

protected abstract WebSocketConnector<?> bounceClientConnector();

protected abstract WebSocketConnector<?> multiClientConnector();

@Test
public void testServerEndpoint_SingleTextReceived_NoSent() {
// endpoint: void onMessage(String message)
var connection = Connection.of(singleTextReceived_NoResponse_Uri, false, 0, binaryMode(), "Ballad of a Prodigal Son");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 1, connection);
}

@Test
public void testServerEndpoint_SingleTextReceived_SingleTextSent() {
// endpoint: String onMessage(String message)
var connection = Connection.of(bounceUri, false, 1, binaryMode(), "Can't Find My Way Home");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 1, connection);
}

@Test
public void testServerEndpoint_SingleTextReceived_MultiTextSent() {
// endpoint: Multi<String> onMessage(String message)
var connection = Connection.of(singleTextReceived_MultiTextResponse_Uri, false, 2, binaryMode(),
"Always take a banana to a party");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 1, connection);
}

@Test
public void testServerEndpoint_MultiTextReceived_NoSent() {
// endpoint: void onMessage(Multi<String> message)
var connection = Connection.of(multiTextReceived_NoResponse_Uri, false, 0, binaryMode(), "When I go",
"don't cry for me",
"In my Father's arms I'll be", "The wounds this world left on my soul");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 4, connection);
}

@Test
public void testServerEndpoint_MultiTextReceived_SingleTextSent() {
// endpoint: String onMessage(Multi<String> message)
var connection = Connection.of(multiTextReceived_SingleTextResponse_Uri, false, 1, "Alpha Shallows", binaryMode(),
"Msg1", "Msg2", "Msg3", "Msg4");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 4, connection);
}

@Test
public void testServerEndpoint_MultiTextReceived_MultiTextSent() {
// endpoint: Multi<String> onMessage(Multi<String> message)
var connection = Connection.of(multiTextReceived_MultiTextResponse_Uri, false, 2, binaryMode(), "Msg1", "Msg2", "Msg3");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 3, connection);
}

@Test
public void testServerEndpoint_SingleTextReceived_UniTextSent() {
// endpoint: Uni<String> onMessage(String message)
var connection = Connection.of(singleTextReceived_UniTextResponse_Uri, false, 1, binaryMode(), "Bernie Sanders");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 1, connection);
}

@Test
public void testServerEndpoint_SingleDtoReceived_NoSent() {
// endpoint: void onMessage(Dto dto)
var connection = Connection.of(singleDtoReceived_NoResponse_Uri, false, 0, binaryMode(),
"major disappointment speaking");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 1, connection);
}

@Test
public void testServerEndpoint_SingleDtoReceived_SingleDtoSent() {
// endpoint: Dto onMessage(Dto dto)
var connection = Connection.of(singleDtoReceived_SingleDtoResponse_Uri, false, 1, binaryMode(), "abcd123456");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 1, connection);
}

@Test
public void testServerEndpoint_SingleDtoReceived_UniDtoSent() {
// endpoint: Uni<Dto> onMessage(Dto dto)
var connection = Connection.of(singleDtoReceived_UniDtoResponse_Uri, false, 1, binaryMode(),
"Shot heard round the world");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 1, connection);
}

@Test
public void testServerEndpoint_SingleDtoReceived_MultiDtoSent() {
// endpoint: Multi<Dto> onMessage(Dto dto)
var connection = Connection.of(singleDtoReceived_MultiDtoResponse_Uri, false, 2, binaryMode(),
"Bananas are good");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 1, connection);
}

@Test
public void testServerEndpoint_MultiDtoReceived_NoSent() {
// endpoint: void onMessage(Multi<Dto> dto)
var connection = Connection.of(multiDtoReceived_NoResponse_Uri, false, 0, binaryMode(), "Tell me how ya livin",
"Soljie what ya got givin");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 2, connection);
}

@Test
public void testServerEndpoint_MultiDtoReceived_SingleDtoSent() {
// endpoint: Dto onMessage(Multi<Dto> message)
var connection = Connection.of(multiDtoReceived_SingleDtoResponse_Uri, false, 1, "ut labore et dolore magna aliqua",
binaryMode(), "Lorem ipsum dolor sit amet", "consectetur adipiscing elit", "sed do eiusmod tempor incididunt");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 3, connection);
}

@Test
public void testServerEndpoint_MultiDtoReceived_MultiDtoSent() {
// endpoint: Multi<Dto> onMessage(Multi<Dto> dto)
var connection = Connection.of(multiDtoReceived_MultiDtoResponse_Uri, false, 2, binaryMode(), "Right", "Left");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 2, connection);
}

@Test
public void testClientEndpoint_SingleTextReceived_NoSent() {
var clientConn = bounceClientConnector().baseUri(baseUri).connectAndAwait();
var msg1 = "Ut enim ad minim veniam";
sendClientMessageAndWait(clientConn, msg1);
// 'clientConn' sends 'Ut enim ad minim veniam'
// 'BounceEndpoint' -> 'String onMessage(String message)' sends 'Response 0: Ut enim ad minim veniam'
// 'BounceClient' -> 'void echo(String message)' receives 'Response 0: Ut enim ad minim veniam'
// that is received 2 messages and sent 2 messages
int clientBytesReceived = stringToBytes("Response 0: " + msg1);
int clientBytesSent = stringToBytes(msg1);
int serverBytesReceived = clientBytesSent;
int serverBytesSent = clientBytesReceived;
asserter.assertMetrics(0, 0, 1, serverBytesReceived, serverBytesSent, 1, clientBytesSent, clientBytesReceived);

msg1 = "quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat";
var msg2 = "Duis aute irure dolor in reprehenderit";
sendClientMessageAndWait(clientConn, msg1);
sendClientMessageAndWait(clientConn, msg2);

clientBytesReceived = stringToBytes("Response 0: " + msg1, "Response 0: " + msg2);
clientBytesSent = stringToBytes(msg1, msg2);
serverBytesReceived = clientBytesSent;
serverBytesSent = clientBytesReceived;
asserter.assertMetrics(0, 0, 2, serverBytesReceived, serverBytesSent, 2, clientBytesSent, clientBytesReceived);

clientConn.closeAndAwait();
}

@Test
public void testClientEndpoint_MultiTextReceived_MultiTextSent() {
var clientConn = multiClientConnector().baseUri(baseUri).connectAndAwait();
var msg1 = "in voluptate velit esse cillum dolore eu fugiat nulla pariatur";
var msg2 = "Excepteur sint occaecat cupidatat non proident";
sendClientMessageAndWait(clientConn, msg1);
sendClientMessageAndWait(clientConn, msg2);

// 2 sent: 'clientConn' sends 2 messages
// 2 sent, 2 received: 'MultiEndpoint' -> 'Multi<String> echo(Multi<String> messages)' -> accepts and receives message
// 2 sent, 2 received: 'MultiClient' -> 'Multi<String> echo(Multi<String> messages)' -> accepts, receives, adds "Response 0: "
// 2 received: 'MultiEndpoint' -> accepts and returns empty Multi
int clientBytesReceived = stringToBytes(msg1, msg2);
int clientBytesSent = stringToBytes(msg1, msg2, msg1 + "Response 0: ", msg2 + "Response 0: ");
int serverBytesReceived = clientBytesSent;
int serverBytesSent = clientBytesReceived;

asserter.assertMetrics(0, 0, 4, serverBytesReceived, serverBytesSent, 4, clientBytesSent, clientBytesReceived);

clientConn.closeAndAwait();
}

@Test
public void testServerEndpoint_broadcasting() {
// broadcast = true
// endpoint: String onMessage(String message)

var msg1 = "It's alright ma";
// expected metrics:
// endpoint receives msg1
// 2 connections are opened so 2 responses are expected
int sentBytes = stringToBytes("Response 0: " + msg1, "Response 0: " + msg1);
int receivedBytes = stringToBytes(msg1);
var connection1 = Connection.of(broadcast_Uri, true, 1, binaryMode(), msg1);

var msg2 = "I'm Only Bleeding";
// expected metrics:
// endpoint receives msg2
// 2 connections are opened so 2 responses are expected
sentBytes += stringToBytes("Response 0: " + msg2, "Response 0: " + msg2);
receivedBytes += stringToBytes(msg2);
var connection2 = Connection.of(broadcast_Uri, true, 1, binaryMode(), msg2);
sendAndAssertResponses(vertx, connection1, connection2);
asserter.assertMetrics(0, 2, sentBytes, receivedBytes);
}

@Test
public void testServerEndpoint_SingleTextReceived_SingleTextSent_MultipleConnections() {
// endpoint: String onMessage(String message)
// testing multiple connections because we need to know that same counter endpoint counter is used by connections
var msg = "Can't Find My Way Home";

try (var client1 = new WSClient(vertx)) {
var connection1 = Connection.of(bounceUri, false, 1, binaryMode(), msg);
client1.connect(new WebSocketConnectOptions(), bounceUri);
sendClientMessageAndWait(client1, msg);
asserter.assertMetrics(0, 1, connection1);

var connection2 = Connection.of(bounceUri, false, 1, binaryMode(), msg);
sendAndAssertResponses(vertx, connection2);
asserter.assertMetrics(0, 1, connection2);

var connection3 = Connection.of(bounceUri, false, 1, binaryMode(), msg);
sendAndAssertResponses(vertx, connection3);
asserter.assertMetrics(0, 1, connection3);

// --- try different endpoint - start
// endpoint: void onMessage(Multi<String> message)
var connection = Connection.of(multiTextReceived_NoResponse_Uri, false, 0, binaryMode(), "I get up in the evening",
"I ain't nothing but tired", "I could use just a little help");
sendAndAssertResponses(vertx, connection);
asserter.assertMetrics(0, 3, connection);
// --- try different endpoint - end

var connection4 = Connection.of(bounceUri, false, 1, binaryMode(), msg);
sendAndAssertResponses(vertx, connection4);
asserter.assertMetrics(0, 1, connection4);

// send again message via the first connection that is still open
sendClientMessageAndWait(client1, msg);
asserter.assertMetrics(0, 1, connection1);
}
}

private void sendClientMessageAndWait(WSClient client, String msg) {
if (binaryMode()) {
client.sendAndAwait(Buffer.buffer(msg));
} else {
client.sendAndAwait(msg);
}
}

protected void sendClientMessageAndWait(WebSocketClientConnection clientConn, String msg1) {
if (binaryMode()) {
clientConn.sendBinaryAndAwait(Buffer.buffer(msg1));
} else {
clientConn.sendTextAndAwait(msg1);
}
}
}
Loading

0 comments on commit 7c325ce

Please sign in to comment.