From cd163d448ac5831157f93ea663d9e37317e4a555 Mon Sep 17 00:00:00 2001 From: Kai Hudalla Date: Tue, 19 Nov 2024 15:51:25 +0100 Subject: [PATCH] [#2955] Fix MQTT5 Connect Reason Codes MQTT5 defines new reason codes to be included in CONNACK packets when connection establishment fails. The abstract adapter base class has been changed accordingly. Also added integration tests based on HiveMQ client for testing connection establishment. --- ...AbstractVertxBasedMqttProtocolAdapter.java | 67 +- bom/pom.xml | 6 + .../content/user-guide/mqtt-adapter.md | 44 +- tests/pom.xml | 5 + .../hono/tests/mqtt/MqttConnectionIT.java | 2 +- .../eclipse/hono/tests/mqtt/MqttTestBase.java | 6 +- .../hono/tests/mqtt5/MqttConnectionIT.java | 884 ++++++++++++++++++ .../hono/tests/mqtt5/MqttTestBase.java | 284 ++++++ 8 files changed, 1263 insertions(+), 35 deletions(-) create mode 100644 tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttConnectionIT.java create mode 100644 tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttTestBase.java diff --git a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java index a7c611a19e..1115bc8c11 100644 --- a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java +++ b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -41,6 +41,9 @@ import org.eclipse.hono.adapter.AbstractProtocolAdapterBase; import org.eclipse.hono.adapter.AdapterConnectionsExceededException; import org.eclipse.hono.adapter.AuthorizationException; +import org.eclipse.hono.adapter.ConnectionDurationExceededException; +import org.eclipse.hono.adapter.DataVolumeExceededException; +import org.eclipse.hono.adapter.TenantConnectionsExceededException; import org.eclipse.hono.adapter.auth.device.AuthHandler; import org.eclipse.hono.adapter.auth.device.ChainAuthHandler; import org.eclipse.hono.adapter.auth.device.CredentialsApiAuthProvider; @@ -90,6 +93,7 @@ import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttVersion; import io.opentracing.Span; import io.opentracing.SpanContext; import io.opentracing.log.Fields; @@ -518,7 +522,8 @@ final void handleEndpointConnection(final MqttEndpoint endpoint) { log.debug("rejecting connection request from client [clientId: {}], cause:", endpoint.clientIdentifier(), t); - final MqttConnectReturnCode code = getConnectReturnCode(t); + final boolean isPreMqtt5 = ((int) MqttVersion.MQTT_5.protocolLevel()) > endpoint.protocolVersion(); + final var code = isPreMqtt5 ? getMqtt3ConnackReturnCode(t) : getMqtt5ConnackReasonCode(t); rejectConnectionRequest(endpoint, code, span); TracingHelper.logError(span, t); } @@ -1106,18 +1111,18 @@ final MqttDeviceEndpoint createMqttDeviceEndpoint( return mqttDeviceEndpoint; } - private static MqttConnectReturnCode getConnectReturnCode(final Throwable e) { - - if (e instanceof MqttConnectionException) { - return ((MqttConnectionException) e).code(); - } else if (e instanceof AuthorizationException) { - if (e instanceof AdapterConnectionsExceededException) { - return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE; - } else { - return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED; - } - } else if (e instanceof ServiceInvocationException) { - switch (((ServiceInvocationException) e).getErrorCode()) { + private static MqttConnectReturnCode getMqtt3ConnackReturnCode(final Throwable e) { + if (e instanceof MqttConnectionException connectionException) { + return connectionException.code(); + } + if (e instanceof AdapterConnectionsExceededException) { + return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE; + } + if (e instanceof AuthorizationException) { + return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED; + } + if (e instanceof ServiceInvocationException exception) { + switch (exception.getErrorCode()) { case HttpURLConnection.HTTP_UNAUTHORIZED: case HttpURLConnection.HTTP_NOT_FOUND: return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD; @@ -1126,9 +1131,39 @@ private static MqttConnectReturnCode getConnectReturnCode(final Throwable e) { default: return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED; } - } else { - return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED; } + + return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED; + } + + private static MqttConnectReturnCode getMqtt5ConnackReasonCode(final Throwable e) { + + if (e instanceof MqttConnectionException connectionException) { + return connectionException.code(); + } + if (e instanceof AdapterConnectionsExceededException) { + return MqttConnectReturnCode.CONNECTION_REFUSED_USE_ANOTHER_SERVER; + } + if (e instanceof TenantConnectionsExceededException + || e instanceof DataVolumeExceededException + || e instanceof ConnectionDurationExceededException) { + return MqttConnectReturnCode.CONNECTION_REFUSED_QUOTA_EXCEEDED; + } + if (e instanceof AuthorizationException) { + return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5; + } + if (e instanceof ServiceInvocationException exception) { + switch (exception.getErrorCode()) { + case HttpURLConnection.HTTP_UNAUTHORIZED: + case HttpURLConnection.HTTP_NOT_FOUND: + return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD; + case HttpURLConnection.HTTP_UNAVAILABLE: + return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5; + default: + return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5; + } + } + return MqttConnectReturnCode.CONNECTION_REFUSED_UNSPECIFIED_ERROR; } /** diff --git a/bom/pom.xml b/bom/pom.xml index 94e3b60630..75a25f8be6 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -595,6 +595,12 @@ quarkus.vertx.max-event-loop-execute-time=${max.event-loop.execute-time:20000} + + com.hivemq + hivemq-mqtt-client + 1.3.3 + test + org.eclipse.hono core-test-utils diff --git a/site/documentation/content/user-guide/mqtt-adapter.md b/site/documentation/content/user-guide/mqtt-adapter.md index 842b007450..ab588ac45f 100644 --- a/site/documentation/content/user-guide/mqtt-adapter.md +++ b/site/documentation/content/user-guide/mqtt-adapter.md @@ -9,7 +9,7 @@ consumers and for receiving commands from applications and sending back response The MQTT adapter is **not** a general purpose MQTT broker. In particular the adapter -* supports MQTT 3.1.1 only. +* supports clients connecting using MQTT 3.1.1 or 5.0 only. * does not maintain session state for clients and thus always sets the *session present* flag in its CONNACK packet to `0`, regardless of the value of the *clean session* flag provided in a client's CONNECT packet. * ignores any *Will* included in a client's CONNECT packet. @@ -23,7 +23,7 @@ The MQTT adapter is **not** a general purpose MQTT broker. In particular the ada ## Authentication The MQTT adapter by default requires clients (devices or gateway components) to authenticate during connection -establishment. The adapter supports both the authentication based on the username/password provided in an MQTT CONNECT +establishment. The adapter supports both authentication based on the username/password provided in an MQTT CONNECT packet as well as client certificate based authentication as part of a TLS handshake for that purpose. The adapter tries to authenticate the device using these mechanisms in the following order @@ -46,7 +46,8 @@ in order to support this mechanism. The MQTT adapter supports authenticating clients based on credentials provided during MQTT connection establishment. This means that clients need to provide a *user* and a *password* field in their MQTT CONNECT packet as defined in -[MQTT Version 3.1.1, Section 3.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028) +[MQTT Version 3.1.1, Section 3.1.3](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718031) +and [MQTT Version 5.0, Section 3.1.3](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901058) when connecting to the MQTT adapter. The username provided in the *user* field must match the pattern *auth-id@tenant*, e.g. `sensor1@DEFAULT_TENANT`. @@ -68,8 +69,9 @@ concepts. The MQTT adapter supports authenticating clients based on a signed [JSON Web Token](https://www.rfc-editor.org/rfc/rfc7519) (JWT) provided during MQTT connection establishment. This requires a client to provide a *client identifier*, a *user* and a *password* field in its MQTT CONNECT packet as defined in -[MQTT Version 3.1.1, Section 3.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028) -when connecting to the MQTT adapter. The JWT must be sent in the password field. The content of the *user* field is +[MQTT Version 3.1.1, Section 3.1.3](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718031) +and [MQTT Version 5.0, Section 3.1.3](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901058) +when connecting to the MQTT adapter. The JWT must be sent in the *password* field. The content of the *user* field is ignored. The information about the tenant and the authentication identifier can be presented to the protocol adapter in one of two ways: @@ -107,26 +109,38 @@ a client tries to connect and/or send a message to the adapter. ### Connection Limits -The adapter rejects a client’s connection attempt with return code +The adapter rejects a client’s attempt to connect using -* `0x03` (*Connection Refused: server unavailable*), if the maximum number of connections per protocol adapter instance - is reached -* `0x05` (*Connection Refused: not authorized*), if the maximum number of simultaneously connected devices for the - tenant is reached. +* MQTT 3.1.1 with return code `0x03` (*Server Unavailable*), +* MQTT 5.0 with reason code `0x9C` (*Use Another Server*), + +if the maximum number of connections per protocol adapter instance is reached. + +The adapter rejects a client’s attempt to connect using + +* MQTT 3.1.1 with return code `0x05` (*Not Authorized*), +* MQTT 5.0 with reason code `0x97` (*Quota Exceeded*), + +if the maximum number of simultaneously connected devices for the tenant is reached. ### Connection Duration Limits -The adapter rejects a client’s connection attempt with return code `0x05` (*Connection Refused: not authorized*), if the -[connection duration limit]({{< relref "/concepts/resource-limits#connection-duration-limit" >}}) that has been -configured for the client’s tenant is exceeded. +The adapter rejects a client’s attempt to connect using + +* MQTT 3.1.1 with return code `0x05` (*Not Authorized*) +* MQTT 5.0 with reason code `0x97` (*Quota Exceeded*) + +if the [connection duration limit]({{< relref "/concepts/resource-limits#connection-duration-limit" >}}) +that has been configured for the client’s tenant is exceeded. ### Message Limits The adapter -* rejects a client's connection attempt with return code `0x05` (*Connection Refused: not authorized*), +* rejects a client's attempt to connect using MQTT 3.1.1 with return code `0x05` (*Not Authorized*), +* rejects a client's attempt to connect using MQTT 5.0 with reason code `0x97` (*Quota Exceeded*), * discards any MQTT PUBLISH packet containing telemetry data or an event that is sent by a client and -* rejects any AMQP 1.0 message containing a command sent by a north bound application +* rejects any command messages sent by a north bound application if the [message limit]({{< relref "/concepts/resource-limits.md" >}}) that has been configured for the device’s tenant is exceeded. diff --git a/tests/pom.xml b/tests/pom.xml index 6592ae6c7f..6a218d582a 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -319,6 +319,11 @@ vertx-mqtt test + + com.hivemq + hivemq-mqtt-client + test + org.eclipse.californium californium-core diff --git a/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttConnectionIT.java b/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttConnectionIT.java index b7ee51202e..2ccb3e0930 100644 --- a/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttConnectionIT.java +++ b/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttConnectionIT.java @@ -67,7 +67,7 @@ import io.vertx.mqtt.MqttConnectionException; /** - * Integration tests for checking connection to the MQTT adapter. + * Integration tests for checking MQTT 3.1.1 based connection to the MQTT adapter. * */ @ExtendWith(VertxExtension.class) diff --git a/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttTestBase.java b/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttTestBase.java index d1c2775edf..b6039291c3 100644 --- a/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttTestBase.java +++ b/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttTestBase.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -36,7 +36,7 @@ import io.vertx.mqtt.messages.MqttConnAckMessage; /** - * Base class for MQTT adapter integration tests. + * Base class for MQTT adapter integration tests using MQTT 3.1.1. * */ public abstract class MqttTestBase { @@ -70,7 +70,7 @@ public abstract class MqttTestBase { protected Context context; /** - * Creates default AMQP client options. + * Creates default MQTT client options. */ @BeforeAll public static void init() { diff --git a/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttConnectionIT.java b/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttConnectionIT.java new file mode 100644 index 0000000000..994a56dcb2 --- /dev/null +++ b/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttConnectionIT.java @@ -0,0 +1,884 @@ +/******************************************************************************* + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.tests.mqtt5; + +import static com.google.common.truth.Truth.assertThat; + +import java.net.HttpURLConnection; +import java.security.KeyPairGenerator; +import java.security.NoSuchAlgorithmException; +import java.security.cert.X509Certificate; +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import javax.security.auth.x500.X500Principal; + +import org.eclipse.hono.service.management.credentials.Credentials; +import org.eclipse.hono.service.management.credentials.PasswordCredential; +import org.eclipse.hono.service.management.credentials.X509CertificateCredential; +import org.eclipse.hono.service.management.credentials.X509CertificateSecret; +import org.eclipse.hono.service.management.device.Device; +import org.eclipse.hono.service.management.tenant.Tenant; +import org.eclipse.hono.tests.EnabledIfDnsRebindingIsSupported; +import org.eclipse.hono.tests.EnabledIfProtocolAdaptersAreRunning; +import org.eclipse.hono.tests.EnabledIfRegistrySupportsFeatures; +import org.eclipse.hono.tests.IntegrationTestSupport; +import org.eclipse.hono.tests.Tenants; +import org.eclipse.hono.util.Adapter; +import org.eclipse.hono.util.Constants; +import org.eclipse.hono.util.CredentialsConstants; +import org.eclipse.hono.util.IdentityTemplate; +import org.eclipse.hono.util.RegistrationConstants; +import org.eclipse.hono.util.RegistryManagementConstants; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import com.hivemq.client.mqtt.datatypes.MqttClientIdentifier; +import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext; +import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener; +import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5ConnAckException; +import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode; + +import io.jsonwebtoken.Jwts; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.SelfSignedCertificate; +import io.vertx.junit5.Timeout; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; + +/** + * Integration tests for checking MQTT 5.0 based connection to the MQTT adapter. + * + */ +@ExtendWith(VertxExtension.class) +@Timeout(timeUnit = TimeUnit.SECONDS, value = 15) +@EnabledIfProtocolAdaptersAreRunning(mqttAdapter = true) +public class MqttConnectionIT extends MqttTestBase { + + private SelfSignedCertificate deviceCert; + private String tenantId; + private String deviceId; + private String password; + + /** + * Sets up the fixture. + */ + @BeforeEach + public void setUp() { + tenantId = helper.getRandomTenantId(); + deviceId = helper.getRandomDeviceId(tenantId); + password = "secret"; + deviceCert = SelfSignedCertificate.create(UUID.randomUUID().toString()); + } + + /** + * Verifies that the adapter opens a connection to registered devices with credentials. + * + * @param tlsVersion The TLS protocol version to use for connecting to the adapter. + * @param ctx The test context + */ + @ParameterizedTest(name = IntegrationTestSupport.PARAMETERIZED_TEST_NAME_PATTERN) + @ValueSource(strings = { IntegrationTestSupport.TLS_VERSION_1_2, IntegrationTestSupport.TLS_VERSION_1_3 }) + public void testConnectSucceedsForRegisteredDevice(final String tlsVersion, final VertxTestContext ctx) { + + final Tenant tenant = new Tenant(); + + helper.registry + .addDeviceForTenant(tenantId, tenant, deviceId, password) + .compose(ok -> connectToAdapter( + tlsVersion, + IntegrationTestSupport.getUsername(deviceId, tenantId), + password, + null, + null)) + .onComplete(ctx.succeeding(conAckMsg -> { + ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS)); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter opens a connection to a registered device that authenticates + * using a JSON Web Token. + * + * @param ctx The test context + * @throws NoSuchAlgorithmException if the JVM does not support ECC cryptography. + */ + @Test + public void testConnectJwtSucceedsForRegisteredDevice(final VertxTestContext ctx) throws NoSuchAlgorithmException { + + final var generator = KeyPairGenerator.getInstance(CredentialsConstants.EC_ALG); + final var keyPair = generator.generateKeyPair(); + final var rpkCredential = Credentials.createRPKCredential(deviceId, keyPair.getPublic()); + + final var jws = Jwts.builder() + .header().type("JWT") + .and() + .audience().add(CredentialsConstants.AUDIENCE_HONO_ADAPTER) + .and() + .issuer(deviceId) + .subject(deviceId) + .claim(CredentialsConstants.CLAIM_TENANT_ID, tenantId) + .issuedAt(Date.from(Instant.now())) + .expiration(Date.from(Instant.now().plus(Duration.ofMinutes(10)))) + .signWith(keyPair.getPrivate()) + .compact(); + + helper.registry.addTenant(tenantId) + .compose(res -> helper.registry.registerDevice(tenantId, deviceId)) + .compose(res -> helper.registry.addCredentials(tenantId, deviceId, Set.of(rpkCredential))) + .compose(ok -> connectToAdapter("ignored", jws)) + .onComplete(ctx.succeeding(conAckMsg -> { + ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS)); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter opens a connection to a registered device that authenticates + * using a Google IoT Core style JSON Web Token in conjunction with an MQTT connection + * identifier that contains the tenant and authentication ID. + * + * @param ctx The test context + * @throws NoSuchAlgorithmException if the JVM does not support ECC cryptography. + */ + @Test + public void testConnectGoogleIoTCoreJwtSucceedsForRegisteredDevice(final VertxTestContext ctx) throws NoSuchAlgorithmException { + + final var generator = KeyPairGenerator.getInstance(CredentialsConstants.EC_ALG); + final var keyPair = generator.generateKeyPair(); + final var rpkCredential = Credentials.createRPKCredential(deviceId, keyPair.getPublic()); + + final var jws = Jwts.builder() + .header().type("JWT") + .and() + .issuedAt(Date.from(Instant.now())) + .expiration(Date.from(Instant.now().plus(Duration.ofMinutes(10)))) + .signWith(keyPair.getPrivate()) + .compact(); + final var clientId = MqttClientIdentifier.of("tenants/%s/devices/%s".formatted(tenantId, deviceId)); + + helper.registry.addTenant(tenantId) + .compose(res -> helper.registry.registerDevice(tenantId, deviceId)) + .compose(res -> helper.registry.addCredentials(tenantId, deviceId, Set.of(rpkCredential))) + .compose(ok -> connectToAdapter(IntegrationTestSupport.TLS_VERSION_1_2, "ignored", jws, clientId, null)) + .onComplete(ctx.succeeding(conAckMsg -> { + ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS)); + ctx.completeNow(); + })); + } + + /** + * Verifies that an attempt to open a connection using a valid X.509 client certificate succeeds. + * + * @param ctx The test context + */ + @Test + public void testConnectX509SucceedsForRegisteredDevice(final VertxTestContext ctx) { + + helper.getCertificate(deviceCert.certificatePath()) + .compose(cert -> { + final var tenant = Tenants.createTenantForTrustAnchor(cert); + return helper.registry.addDeviceForTenant(tenantId, tenant, deviceId, cert); + }) + .compose(ok -> connectToAdapter(deviceCert)) + .onComplete(ctx.succeeding(conAckMsg -> { + ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS)); + ctx.completeNow(); + })); + } + + /** + * Verifies that an attempt to open a connection using a valid X.509 client certificate succeeds + * for a device belonging to a tenant that uses the same trust anchor as another tenant. + * + * @param ctx The test context + */ + @Test + @EnabledIfDnsRebindingIsSupported + @EnabledIfRegistrySupportsFeatures(trustAnchorGroups = true) + public void testConnectX509SucceedsUsingSni(final VertxTestContext ctx) { + + helper.getCertificate(deviceCert.certificatePath()) + .compose(cert -> { + // GIVEN two tenants belonging to the same trust anchor group + final var tenant = Tenants.createTenantForTrustAnchor(cert) + .setTrustAnchorGroup("test-group"); + // which both use the same CA + return helper.registry.addTenant(helper.getRandomTenantId(), tenant) + // and a device belonging to one of the tenants + .compose(ok -> helper.registry.addDeviceForTenant(tenantId, tenant, deviceId, cert)); + }) + // WHEN the device connects to the adapter including its tenant ID in the host name + .compose(ok -> connectToAdapter( + deviceCert, + IntegrationTestSupport.getSniHostname(IntegrationTestSupport.MQTT_HOST, tenantId))) + .onComplete(ctx.succeeding(conAckMsg -> { + // THEN the connection attempt succeeds + ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS)); + ctx.completeNow(); + })); + } + + /** + * Verifies that an attempt to open a connection using a valid X.509 client certificate succeeds + * for a device belonging to a tenant with a tenant alias. + * + * @param ctx The test context + */ + @Test + @EnabledIfDnsRebindingIsSupported + @EnabledIfRegistrySupportsFeatures(trustAnchorGroups = true, tenantAlias = true) + public void testConnectX509SucceedsUsingSniWithTenantAlias(final VertxTestContext ctx) { + + helper.getCertificate(deviceCert.certificatePath()) + // GIVEN two tenants belonging to the same trust anchor group + // which both use the same CA + .compose(cert -> helper.registry.addTenant( + helper.getRandomTenantId(), + Tenants.createTenantForTrustAnchor(cert).setTrustAnchorGroup("test-group")) + .map(cert)) + // and a device belonging to one of the tenants which has an alias configured + .compose(cert -> helper.registry.addDeviceForTenant( + tenantId, + Tenants.createTenantForTrustAnchor(cert) + .setTrustAnchorGroup("test-group") + .setAlias("test-alias"), + deviceId, + cert)) + // WHEN the device connects to the adapter including the tenant alias in the host name + .compose(ok -> connectToAdapter( + deviceCert, + IntegrationTestSupport.getSniHostname(IntegrationTestSupport.MQTT_HOST, "test-alias"))) + .onComplete(ctx.succeeding(conAckMsg -> { + // THEN the connection attempt succeeds + ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS)); + ctx.completeNow(); + })); + } + + /** + * Verifies that an attempt to open a connection using a valid X.509 client certificate fails + * for a device belonging to a tenant using a non-existing tenant alias. + * + * @param ctx The test context + */ + @Test + @EnabledIfDnsRebindingIsSupported + @EnabledIfRegistrySupportsFeatures(trustAnchorGroups = true, tenantAlias = true) + public void testConnectX509FailsUsingSniWithNonExistingTenantAlias(final VertxTestContext ctx) { + + helper.getCertificate(deviceCert.certificatePath()) + // GIVEN two tenants belonging to the same trust anchor group + // which both use the same CA + .compose(cert -> helper.registry.addTenant( + helper.getRandomTenantId(), + Tenants.createTenantForTrustAnchor(cert).setTrustAnchorGroup("test-group")) + .map(cert)) + // and a device belonging to one of the tenants which has an alias configured + .compose(cert -> helper.registry.addDeviceForTenant( + tenantId, + Tenants.createTenantForTrustAnchor(cert) + .setTrustAnchorGroup("test-group") + .setAlias("test-alias"), + deviceId, + cert)) + // WHEN the device connects to the adapter including a wrong tenant alias in the host name + .compose(ok -> connectToAdapter( + deviceCert, + IntegrationTestSupport.getSniHostname(IntegrationTestSupport.MQTT_HOST, "wrong-alias"))) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter opens a connection if auto-provisioning is enabled for the device certificate, and it + * is not configured auto-provisioning-device-id template and auth-id template. + * + * @param ctx The test context + */ + @Test + public void testConnectSucceedsWithAutoProvisioningWithoutTemplate(final VertxTestContext ctx) { + + final Promise autoProvisionedDeviceId = Promise.promise(); + + final IdentityTemplate defaultTemplate = new IdentityTemplate( + RegistryManagementConstants.PLACEHOLDER_SUBJECT_DN); + + final Future certTracker = helper.getCertificate(deviceCert.certificatePath()); + final Future subjectDNTracker = certTracker + .map(cert -> cert.getSubjectX500Principal().getName(X500Principal.RFC2253)); + + helper.createAutoProvisioningNotificationConsumer(ctx, autoProvisionedDeviceId, tenantId) + .compose(ok -> certTracker) + .compose(cert -> { + final var tenant = Tenants.createTenantForTrustAnchor(cert); + tenant.getTrustedCertificateAuthorities().get(0).setAutoProvisioningEnabled(true); + return helper.registry.addTenant(tenantId, tenant); + }) + .compose(ok -> connectToAdapter(deviceCert)) + .compose(ok -> autoProvisionedDeviceId.future()) + .compose(deviceId -> { + // verify the device ID is not generated by subject-dn template + ctx.verify( + () -> assertThat(deviceId).isNotEqualTo(defaultTemplate.apply(subjectDNTracker.result()))); + return helper.registry.getRegistrationInfo(tenantId, deviceId); + }) + .compose(registrationResult -> { + ctx.verify(() -> { + final var infoRegistration = registrationResult.bodyAsJsonObject(); + IntegrationTestSupport.assertDeviceStatusProperties( + infoRegistration.getJsonObject(RegistryManagementConstants.FIELD_STATUS), + true); + }); + return helper.registry.getCredentials(tenantId, autoProvisionedDeviceId.future().result()); + }) + .onComplete(ctx.succeeding(credentialsResult -> { + ctx.verify(() -> { + final var infoCredentials = credentialsResult.bodyAsJsonArray(); + // verify the auth ID is generated by subject-dn template + assertThat(infoCredentials.getJsonObject(0) + .getString(RegistryManagementConstants.FIELD_AUTH_ID)) + .isEqualTo(defaultTemplate.apply(subjectDNTracker.result())); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter opens a connection if auto-provisioning is enabled for the device certificate, and it + * is configured auto-provisioning-device-id template and auth-id template. + * + * @param ctx The test context + */ + @Test + public void testConnectSucceedsWithAutoProvisioningWithTemplate(final VertxTestContext ctx) { + + final Promise autoProvisionedDeviceId = Promise.promise(); + + final IdentityTemplate deviceIdTemplate = new IdentityTemplate("{{subject-dn}}"); + final IdentityTemplate authIdTemplate = new IdentityTemplate("{{subject-cn}}"); + + final Future certTracker = helper.getCertificate(deviceCert.certificatePath()); + final Future subjectDNTracker = certTracker + .map(cert -> cert.getSubjectX500Principal().getName(X500Principal.RFC2253)); + + helper.createAutoProvisioningNotificationConsumer(ctx, autoProvisionedDeviceId, tenantId) + .compose(ok -> certTracker) + .compose(cert -> { + final var tenant = Tenants.createTenantForTrustAnchor(cert); + tenant.getTrustedCertificateAuthorities().get(0) + .setAutoProvisioningEnabled(true) + .setAutoProvisioningDeviceIdTemplate(deviceIdTemplate.toString()) + .setAuthIdTemplate(authIdTemplate.toString()); + return helper.registry.addTenant(tenantId, tenant); + }) + .compose(ok -> connectToAdapter(deviceCert)) + .compose(ok -> autoProvisionedDeviceId.future()) + .compose(deviceId -> { + // verify the device ID is generated by auto-provisioning-device-id template + ctx.verify(() -> assertThat(deviceId).isEqualTo(deviceIdTemplate.apply(subjectDNTracker.result()))); + return helper.registry.getRegistrationInfo(tenantId, deviceId); + }) + .compose(registrationResult -> { + ctx.verify(() -> { + final var infoRegistration = registrationResult.bodyAsJsonObject(); + IntegrationTestSupport.assertDeviceStatusProperties( + infoRegistration.getJsonObject(RegistryManagementConstants.FIELD_STATUS), + true); + }); + final var deviceId = deviceIdTemplate.apply(subjectDNTracker.result()); + return helper.registry.getCredentials(tenantId, deviceId); + }) + .onComplete(ctx.succeeding(credentialsResult -> { + ctx.verify(() -> { + final var infoCredentials = credentialsResult.bodyAsJsonArray(); + // verify the auth ID is generated by auth-id template + assertThat(infoCredentials.getJsonObject(0) + .getString(RegistryManagementConstants.FIELD_AUTH_ID)) + .isEqualTo(authIdTemplate.apply(subjectDNTracker.result())); + }); + ctx.completeNow(); + + })); + } + + /** + * Verifies that the adapter rejects connection attempts from an unknown device for which auto-provisioning is + * disabled. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsIfAutoProvisioningIsDisabled(final VertxTestContext ctx) { + + // GIVEN a tenant configured with a trust anchor that does not allow auto-provisioning + // WHEN an unknown device tries to connect + helper.getCertificate(deviceCert.certificatePath()) + .compose(cert -> { + final var tenant = Tenants.createTenantForTrustAnchor(cert); + tenant.getTrustedCertificateAuthorities().get(0).setAutoProvisioningEnabled(false); + return helper.registry.addTenant(tenantId, tenant); + }) + // WHEN a unknown device tries to connect to the adapter + // using a client certificate with the trust anchor registered for the device's tenant + .compose(ok -> connectToAdapter(deviceCert)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from unknown devices + * for which neither registration information nor credentials are on record. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsForNonExistingDevice(final VertxTestContext ctx) { + + // GIVEN an adapter + // WHEN an unknown device tries to connect + connectToAdapter(IntegrationTestSupport.getUsername("non-existing", Constants.DEFAULT_TENANT), "secret") + .onComplete(ctx.failing(t -> { + // THEN the connection is refused + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from unknown devices + * trying to authenticate using a client certificate but for which neither + * registration information nor credentials are on record. + * + * @param ctx The test context + */ + @Test + public void testConnectX509FailsForNonExistingDevice(final VertxTestContext ctx) { + + // GIVEN an adapter + // WHEN an unknown device tries to connect + connectToAdapter(deviceCert) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices + * using wrong credentials. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsForWrongCredentials(final VertxTestContext ctx) { + + // GIVEN a registered device + final Tenant tenant = new Tenant(); + + helper.registry + .addDeviceForTenant(tenantId, tenant, deviceId, password) + // WHEN the device tries to connect using a wrong password + .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), "wrong password")) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices + * using credentials that contain a non-existing tenant. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsForNonExistingTenant(final VertxTestContext ctx) { + + // GIVEN a registered device + final Tenant tenant = new Tenant(); + + helper.registry + .addDeviceForTenant(tenantId, tenant, deviceId, password) + // WHEN a device of a non-existing tenant tries to connect + .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, "nonExistingTenant"), "secret")) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices using a client certificate with an unknown + * subject DN. + * + * @param ctx The test context + */ + @Test + public void testConnectX509FailsForUnknownSubjectDN(final VertxTestContext ctx) { + + // GIVEN a registered device + + helper.getCertificate(deviceCert.certificatePath()) + .compose(cert -> { + final var tenant = Tenants.createTenantForTrustAnchor(cert); + return helper.registry.addTenant(tenantId, tenant); + }).compose(ok -> helper.registry.registerDevice(tenantId, deviceId)) + .compose(ok -> { + final String authId = new X500Principal("CN=4711").getName(X500Principal.RFC2253); + final var credential = X509CertificateCredential.fromAuthId(authId, List.of(new X509CertificateSecret())); + return helper.registry.addCredentials(tenantId, deviceId, Collections.singleton(credential)); + }) + // WHEN the device tries to connect using a client certificate with an unknown subject DN + .compose(ok -> connectToAdapter(deviceCert)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices belonging to a tenant for which the MQTT + * adapter has been disabled. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsForDisabledAdapter(final VertxTestContext ctx) { + + final Tenant tenant = new Tenant(); + tenant.addAdapterConfig(new Adapter(Constants.PROTOCOL_ADAPTER_TYPE_MQTT).setEnabled(false)); + + helper.registry + .addDeviceForTenant(tenantId, tenant, deviceId, password) + // WHEN a device that belongs to the tenant tries to connect to the adapter + .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused with a NOT_AUTHORIZED code + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices + * using a client certificate which belong to a tenant for which the + * MQTT adapter has been disabled. + * + * @param ctx The test context + */ + @Test + public void testConnectX509FailsForDisabledAdapter(final VertxTestContext ctx) { + helper.getCertificate(deviceCert.certificatePath()) + .compose(cert -> { + final var tenant = Tenants.createTenantForTrustAnchor(cert); + tenant.addAdapterConfig(new Adapter(Constants.PROTOCOL_ADAPTER_TYPE_MQTT).setEnabled(false)); + return helper.registry.addDeviceForTenant(tenantId, tenant, deviceId, cert); + }) + // WHEN a device that belongs to the tenant tries to connect to the adapter + .compose(ok -> connectToAdapter(deviceCert)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused with a NOT_AUTHORIZED code + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices for which credentials exist but are disabled. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsForDisabledCredentials(final VertxTestContext ctx) { + + helper.registry + .addTenant(tenantId) + .compose(ok -> { + return helper.registry.registerDevice(tenantId, deviceId); + }) + .compose(ok -> { + final PasswordCredential secret = Credentials.createPasswordCredential(deviceId, password); + secret.setEnabled(false); + return helper.registry.addCredentials(tenantId, deviceId, List.of(secret)); + }) + // WHEN a device connects using the correct credentials + .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused with a NOT_AUTHORIZED code + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices for which credentials exist but the device is + * disabled. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsForDisabledDevice(final VertxTestContext ctx) { + + final Tenant tenant = new Tenant(); + + helper.registry + .addTenant(tenantId, tenant) + .compose(ok -> { + final var device = new Device(); + device.setEnabled(false); + return helper.registry.registerDevice(tenantId, deviceId, device); + }) + .compose(ok -> { + final PasswordCredential secret = Credentials.createPasswordCredential(deviceId, password); + return helper.registry.addCredentials(tenantId, deviceId, Collections.singleton(secret)); + }) + // WHEN a device connects using the correct credentials + .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused with a NOT_AUTHORIZED code + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices that belong to a disabled tenant. + * + * @param ctx The test context + */ + @Test + public void testConnectFailsForDisabledTenant(final VertxTestContext ctx) { + + // Given a disabled tenant for which the MQTT adapter is enabled + final Tenant tenant = new Tenant(); + tenant.setEnabled(false); + + helper.registry + .addDeviceForTenant(tenantId, tenant, deviceId, password) + .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused with a NOT_AUTHORIZED code + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter rejects connection attempts from devices + * using a client certificate that belong to a disabled tenant. + * + * @param ctx The test context + */ + @Test + public void testConnectX509FailsForDisabledTenant(final VertxTestContext ctx) { + + // Given a disabled tenant for which the MQTT adapter is enabled + helper.getCertificate(deviceCert.certificatePath()) + .compose(cert -> { + final var tenant = Tenants.createTenantForTrustAnchor(cert); + tenant.setEnabled(false); + return helper.registry.addDeviceForTenant(tenantId, tenant, deviceId, cert); + }) + .compose(ok -> connectToAdapter(deviceCert)) + .onComplete(ctx.failing(t -> { + // THEN the connection is refused with a NOT_AUTHORIZED code + ctx.verify(() -> { + assertThat(t).isInstanceOf(Mqtt5ConnAckException.class); + final var error = ((Mqtt5ConnAckException) t).getMqttMessage(); + assertThat(error.getReasonCode()) + .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED); + }); + ctx.completeNow(); + })); + } + + /** + * Verifies that the adapter closes the connection to a device when the registration data of the device + * is deleted. + * + * @param ctx The vert.x test context. + */ + @Test + public void testDeviceConnectionIsClosedOnDeviceDeleted(final VertxTestContext ctx) { + testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx, + () -> helper.registry.deregisterDevice(tenantId, deviceId)); + } + + /** + * Verifies that the adapter closes the connection to a device when the registration data of the device + * is disabled. + * + * @param ctx The vert.x test context. + */ + @Test + public void testDeviceConnectionIsClosedOnDeviceDisabled(final VertxTestContext ctx) { + final JsonObject updatedDeviceData = new JsonObject() + .put(RegistrationConstants.FIELD_ENABLED, Boolean.FALSE); + testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx, + () -> helper.registry.updateDevice(tenantId, deviceId, updatedDeviceData)); + } + + /** + * Verifies that the adapter closes the connection to a device when the tenant of the device + * is deleted. + * + * @param ctx The vert.x test context. + */ + @Test + public void testDeviceConnectionIsClosedOnTenantDeleted(final VertxTestContext ctx) { + testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx, + () -> helper.registry.removeTenant(tenantId)); + } + + /** + * Verifies that the adapter closes the connection to a device when the tenant of the device + * is disabled. + * + * @param ctx The vert.x test context. + */ + @Test + public void testDeviceConnectionIsClosedOnTenantDisabled(final VertxTestContext ctx) { + final Tenant updatedTenant = new Tenant().setEnabled(false); + testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx, + () -> helper.registry.updateTenant(tenantId, updatedTenant, HttpURLConnection.HTTP_NO_CONTENT)); + } + + /** + * Verifies that the adapter closes the connection to a device when registration data for all devices of the device + * tenant is deleted. + * + * @param ctx The vert.x test context. + */ + @Test + public void testDeviceConnectionIsClosedOnAllDevicesOfTenantDeleted(final VertxTestContext ctx) { + testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx, + () -> helper.registry.deregisterDevicesOfTenant(tenantId)); + } + + private void testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted( + final VertxTestContext ctx, + final Supplier> deviceRegistryChangeOperation) { + + final Promise connectionClosedPromise = Promise.promise(); + final var disconnectedListener = new MqttClientDisconnectedListener() { + @Override + public void onDisconnected(@NotNull final MqttClientDisconnectedContext context) { + connectionClosedPromise.complete(); + } + }; + + // GIVEN a connected device + helper.registry + .addDeviceForTenant(tenantId, new Tenant(), deviceId, password) + .compose(ok -> connectToAdapter( + IntegrationTestSupport.TLS_VERSION_1_2, + IntegrationTestSupport.getUsername(deviceId, tenantId), + password, + null, + disconnectedListener)) + .compose(conAckMsg -> { + ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS)); + // WHEN corresponding device/tenant is removed/disabled + return deviceRegistryChangeOperation.get(); + }) + // THEN the device connection is closed + .compose(ok -> connectionClosedPromise.future()) + .onComplete(ctx.succeedingThenComplete()); + } +} diff --git a/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttTestBase.java b/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttTestBase.java new file mode 100644 index 0000000000..66af1cfb71 --- /dev/null +++ b/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttTestBase.java @@ -0,0 +1,284 @@ +/******************************************************************************* + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.tests.mqtt5; + +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLSession; +import javax.net.ssl.TrustManagerFactory; + +import org.eclipse.hono.tests.IntegrationTestSupport; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.hivemq.client.mqtt.MqttClientSslConfig; +import com.hivemq.client.mqtt.datatypes.MqttClientIdentifier; +import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.message.auth.Mqtt5SimpleAuth; +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect; +import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.net.PemTrustOptions; +import io.vertx.core.net.SelfSignedCertificate; +import io.vertx.junit5.VertxTestContext; + +/** + * Base class for MQTT adapter integration tests using MQTT 5.0. + * + */ +public abstract class MqttTestBase { + + private static final Vertx vertx = Vertx.vertx(); + + private static TrustManagerFactory trustManagerFactory; + private static HostnameVerifier acceptAllHostnames; + + /** + * A logger to be used by subclasses. + */ + protected final Logger LOGGER = LoggerFactory.getLogger(getClass()); + + /** + * A helper accessing the AMQP 1.0 Messaging Network and + * for managing tenants/devices/credentials. + */ + protected IntegrationTestSupport helper; + /** + * A client for publishing messages to the MQTT protocol adapter. + */ + protected Mqtt5AsyncClient mqttClient; + /** + * Creates default MQTT client TLS options. + * + * @throws Exception if the trust manager factory cannot be created. + */ + @BeforeAll + public static void init() throws Exception { + + trustManagerFactory = new PemTrustOptions() + .addCertPath(IntegrationTestSupport.TRUST_STORE_PATH) + .getTrustManagerFactory(vertx); + acceptAllHostnames = new HostnameVerifier() { + @Override + public boolean verify(final String hostname, final SSLSession session) { + // disable host name verification + return true; + } + }; + } + + /** + * Create integration test helper. + * + * @param testInfo The JUnit test info. + * @param ctx The vert.x test context. + */ + @BeforeEach + public void createHelper(final TestInfo testInfo, final VertxTestContext ctx) { + LOGGER.info("running {}", testInfo.getDisplayName()); + helper = new IntegrationTestSupport(vertx); + helper.init().onComplete(ctx.succeedingThenComplete()); + } + + /** + * Closes the connection to the AMQP 1.0 Messaging Network. + * + * @param ctx The vert.x test context. + */ + @AfterEach + public void closeConnectionToAmqpMessagingNetwork(final VertxTestContext ctx) { + + helper.disconnect().onComplete(r -> ctx.completeNow()); + } + + /** + * Closes the connection to the MQTT adapter and deletes device registry entries created during the test. + * + * @param ctx The vert.x test context. + */ + @AfterEach + public void closeMqttAdapterConnectionAndCleanupDeviceRegistry(final VertxTestContext ctx) { + + final Promise disconnectHandler = Promise.promise(); + if (mqttClient == null) { + disconnectHandler.complete(); + } else { + Future.fromCompletionStage(mqttClient.disconnect()).onComplete(disconnectHandler); + } + disconnectHandler.future().onComplete(closeAttempt -> { + LOGGER.info("connection to MQTT adapter closed"); + mqttClient = null; + // cleanup device registry - done after the adapter connection is closed because otherwise + // the adapter would close the connection from its end after having received the device deletion notification + helper.deleteObjects(ctx); + }); + } + + /** + * Opens a connection to the MQTT adapter using given credentials. + * + * @param username The username to use for authentication. + * @param password The password to use for authentication. + * @return A future that will be completed with the CONNACK packet received + * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException} + * if the connection could not be established. + */ + protected final Future connectToAdapter( + final String username, + final String password) { + return connectToAdapter(IntegrationTestSupport.TLS_VERSION_1_2, username, password, null, null); + } + + /** + * Opens a connection to the MQTT adapter using given credentials. + * + * @param tlsVersion The TLS protocol version to use for connecting to the adapter. + * @param username The username to use for authentication. + * @param password The password to use for authentication. + * @param mqttClientId MQTT client identifier to use when connecting to the MQTT adapter or + * {@code null}, if an arbitrary identifier should be used. + * @param disconnectedListener A listener to be invoked when the connection to the adapter is lost + * or {@code null}, if no listener should be registered. + * @return A future that will be completed with the CONNACK packet received + * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException} + * if the connection could not be established. + */ + protected final Future connectToAdapter( + final String tlsVersion, + final String username, + final String password, + final MqttClientIdentifier mqttClientId, + final MqttClientDisconnectedListener disconnectedListener) { + + final var sslConfig = MqttClientSslConfig.builder() + .trustManagerFactory(trustManagerFactory) + .hostnameVerifier(acceptAllHostnames) + .protocols(Set.of(tlsVersion)) + .build(); + final var auth = Mqtt5SimpleAuth.builder() + .username(username) + .password(password.getBytes(StandardCharsets.UTF_8)) + .build(); + return connectToAdapter(sslConfig, IntegrationTestSupport.MQTT_HOST, mqttClientId, auth, disconnectedListener); + } + + /** + * Opens a connection to the MQTT adapter using an X.509 client certificate. + * + * @param cert The client certificate to use for authentication. + * @return A future that will be completed with the CONNACK packet received + * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException} + * if the connection could not be established. + * @throws NullPointerException if client certificate is {@code null}. + * @throws IllegalArgumentException if the certificate cannot be used for authenticating to the MQTT adapter. + */ + protected final Future connectToAdapter(final SelfSignedCertificate cert) { + return connectToAdapter(cert, IntegrationTestSupport.MQTT_HOST); + } + + /** + * Opens a connection to the MQTT adapter using an X.509 client certificate. + * + * @param cert The client certificate to use for authentication. + * @param hostname The name of the host to connect to. + * @return A future that will be completed with the CONNACK packet received + * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException} + * if the connection could not be established. + * @throws NullPointerException if any of the parameters are {@code null}. + * @throws IllegalArgumentException if the certificate cannot be used for authenticating to the MQTT adapter. + */ + protected final Future connectToAdapter( + final SelfSignedCertificate cert, + final String hostname) { + + Objects.requireNonNull(cert); + Objects.requireNonNull(hostname); + + final KeyManagerFactory selfSignedKeyManagerFactory; + try { + selfSignedKeyManagerFactory = cert.keyCertOptions().getKeyManagerFactory(vertx); + } catch (final Exception e) { + throw new IllegalArgumentException(e.getMessage(), e.getCause()); + } + final var sslConfig = MqttClientSslConfig.builder() + .trustManagerFactory(trustManagerFactory) + .hostnameVerifier(acceptAllHostnames) + .protocols(Set.of(IntegrationTestSupport.TLS_VERSION_1_2)) + .keyManagerFactory(selfSignedKeyManagerFactory) + .build(); + + return connectToAdapter(sslConfig, hostname, null, null, null); + } + + /** + * Opens a connection to the MQTT adapter using given options. + * + * @param sslConfig The SSL options to use for connecting to the adapter. + * @param hostname The name of the host to connect to. + * @param mqttClientId MQTT client identifier to use when connecting to the MQTT adapter or + * {@code null}, if an arbitrary identifier should be used. + * @param auth The credentials to use for authenticating to the adapter or {@code null}, if + * a client certificate (set in the SSL configuration) should be used. + * @param disconnectedListener A listener to be invoked when the connection to the adapter is lost + * or {@code null}, if no listener should be registered. + * @return A future that will be completed with the CONNACK packet received + * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException} + * if the connection could not be established. + * @throws NullPointerException if any of SSL config or host name are {@code null}. + */ + protected final Future connectToAdapter( + final MqttClientSslConfig sslConfig, + final String hostname, + final MqttClientIdentifier mqttClientId, + final Mqtt5SimpleAuth auth, + final MqttClientDisconnectedListener disconnectedListener) { + + Objects.requireNonNull(sslConfig); + Objects.requireNonNull(hostname); + + final var clientId = Optional.ofNullable(mqttClientId).orElse(MqttClientIdentifier.of(UUID.randomUUID().toString())); + final var builder = Mqtt5Client.builder() + .identifier(clientId) + .sslConfig(sslConfig) + .serverHost(hostname) + .serverPort(IntegrationTestSupport.MQTTS_PORT); + Optional.ofNullable(disconnectedListener).ifPresent(builder::addDisconnectedListener); + mqttClient = builder.buildAsync(); + final var connect = Mqtt5Connect.builder() + .cleanStart(true) + .keepAlive(10) + .simpleAuth(auth) + .build(); + return Future.fromCompletionStage(mqttClient.connect(connect)).onSuccess(conAck -> { + LOGGER.info( + "MQTTS connection to adapter [host: {}, port: {}] established", + hostname, IntegrationTestSupport.MQTTS_PORT); + }); + } +}