diff --git a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java
index a7c611a19e..1115bc8c11 100644
--- a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java
+++ b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation
+ * Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
@@ -41,6 +41,9 @@
import org.eclipse.hono.adapter.AbstractProtocolAdapterBase;
import org.eclipse.hono.adapter.AdapterConnectionsExceededException;
import org.eclipse.hono.adapter.AuthorizationException;
+import org.eclipse.hono.adapter.ConnectionDurationExceededException;
+import org.eclipse.hono.adapter.DataVolumeExceededException;
+import org.eclipse.hono.adapter.TenantConnectionsExceededException;
import org.eclipse.hono.adapter.auth.device.AuthHandler;
import org.eclipse.hono.adapter.auth.device.ChainAuthHandler;
import org.eclipse.hono.adapter.auth.device.CredentialsApiAuthProvider;
@@ -90,6 +93,7 @@
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttVersion;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.log.Fields;
@@ -518,7 +522,8 @@ final void handleEndpointConnection(final MqttEndpoint endpoint) {
log.debug("rejecting connection request from client [clientId: {}], cause:",
endpoint.clientIdentifier(), t);
- final MqttConnectReturnCode code = getConnectReturnCode(t);
+ final boolean isPreMqtt5 = ((int) MqttVersion.MQTT_5.protocolLevel()) > endpoint.protocolVersion();
+ final var code = isPreMqtt5 ? getMqtt3ConnackReturnCode(t) : getMqtt5ConnackReasonCode(t);
rejectConnectionRequest(endpoint, code, span);
TracingHelper.logError(span, t);
}
@@ -1106,18 +1111,18 @@ final MqttDeviceEndpoint createMqttDeviceEndpoint(
return mqttDeviceEndpoint;
}
- private static MqttConnectReturnCode getConnectReturnCode(final Throwable e) {
-
- if (e instanceof MqttConnectionException) {
- return ((MqttConnectionException) e).code();
- } else if (e instanceof AuthorizationException) {
- if (e instanceof AdapterConnectionsExceededException) {
- return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
- } else {
- return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
- }
- } else if (e instanceof ServiceInvocationException) {
- switch (((ServiceInvocationException) e).getErrorCode()) {
+ private static MqttConnectReturnCode getMqtt3ConnackReturnCode(final Throwable e) {
+ if (e instanceof MqttConnectionException connectionException) {
+ return connectionException.code();
+ }
+ if (e instanceof AdapterConnectionsExceededException) {
+ return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
+ }
+ if (e instanceof AuthorizationException) {
+ return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
+ }
+ if (e instanceof ServiceInvocationException exception) {
+ switch (exception.getErrorCode()) {
case HttpURLConnection.HTTP_UNAUTHORIZED:
case HttpURLConnection.HTTP_NOT_FOUND:
return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
@@ -1126,9 +1131,39 @@ private static MqttConnectReturnCode getConnectReturnCode(final Throwable e) {
default:
return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
}
- } else {
- return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
}
+
+ return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
+ }
+
+ private static MqttConnectReturnCode getMqtt5ConnackReasonCode(final Throwable e) {
+
+ if (e instanceof MqttConnectionException connectionException) {
+ return connectionException.code();
+ }
+ if (e instanceof AdapterConnectionsExceededException) {
+ return MqttConnectReturnCode.CONNECTION_REFUSED_USE_ANOTHER_SERVER;
+ }
+ if (e instanceof TenantConnectionsExceededException
+ || e instanceof DataVolumeExceededException
+ || e instanceof ConnectionDurationExceededException) {
+ return MqttConnectReturnCode.CONNECTION_REFUSED_QUOTA_EXCEEDED;
+ }
+ if (e instanceof AuthorizationException) {
+ return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5;
+ }
+ if (e instanceof ServiceInvocationException exception) {
+ switch (exception.getErrorCode()) {
+ case HttpURLConnection.HTTP_UNAUTHORIZED:
+ case HttpURLConnection.HTTP_NOT_FOUND:
+ return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD;
+ case HttpURLConnection.HTTP_UNAVAILABLE:
+ return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5;
+ default:
+ return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5;
+ }
+ }
+ return MqttConnectReturnCode.CONNECTION_REFUSED_UNSPECIFIED_ERROR;
}
/**
diff --git a/bom/pom.xml b/bom/pom.xml
index 94e3b60630..75a25f8be6 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -595,6 +595,12 @@ quarkus.vertx.max-event-loop-execute-time=${max.event-loop.execute-time:20000}
+
+ com.hivemq
+ hivemq-mqtt-client
+ 1.3.3
+ test
+
org.eclipse.hono
core-test-utils
diff --git a/site/documentation/content/user-guide/mqtt-adapter.md b/site/documentation/content/user-guide/mqtt-adapter.md
index 842b007450..ab588ac45f 100644
--- a/site/documentation/content/user-guide/mqtt-adapter.md
+++ b/site/documentation/content/user-guide/mqtt-adapter.md
@@ -9,7 +9,7 @@ consumers and for receiving commands from applications and sending back response
The MQTT adapter is **not** a general purpose MQTT broker. In particular the adapter
-* supports MQTT 3.1.1 only.
+* supports clients connecting using MQTT 3.1.1 or 5.0 only.
* does not maintain session state for clients and thus always sets the *session present* flag in its CONNACK packet
to `0`, regardless of the value of the *clean session* flag provided in a client's CONNECT packet.
* ignores any *Will* included in a client's CONNECT packet.
@@ -23,7 +23,7 @@ The MQTT adapter is **not** a general purpose MQTT broker. In particular the ada
## Authentication
The MQTT adapter by default requires clients (devices or gateway components) to authenticate during connection
-establishment. The adapter supports both the authentication based on the username/password provided in an MQTT CONNECT
+establishment. The adapter supports both authentication based on the username/password provided in an MQTT CONNECT
packet as well as client certificate based authentication as part of a TLS handshake for that purpose.
The adapter tries to authenticate the device using these mechanisms in the following order
@@ -46,7 +46,8 @@ in order to support this mechanism.
The MQTT adapter supports authenticating clients based on credentials provided during MQTT connection establishment.
This means that clients need to provide a *user* and a *password* field in their MQTT CONNECT packet as defined in
-[MQTT Version 3.1.1, Section 3.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028)
+[MQTT Version 3.1.1, Section 3.1.3](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718031)
+and [MQTT Version 5.0, Section 3.1.3](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901058)
when connecting to the MQTT adapter. The username provided in the *user* field must match the pattern
*auth-id@tenant*, e.g. `sensor1@DEFAULT_TENANT`.
@@ -68,8 +69,9 @@ concepts.
The MQTT adapter supports authenticating clients based on a signed
[JSON Web Token](https://www.rfc-editor.org/rfc/rfc7519) (JWT) provided during MQTT connection establishment. This requires
a client to provide a *client identifier*, a *user* and a *password* field in its MQTT CONNECT packet as defined in
-[MQTT Version 3.1.1, Section 3.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028)
-when connecting to the MQTT adapter. The JWT must be sent in the password field. The content of the *user* field is
+[MQTT Version 3.1.1, Section 3.1.3](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718031)
+and [MQTT Version 5.0, Section 3.1.3](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901058)
+when connecting to the MQTT adapter. The JWT must be sent in the *password* field. The content of the *user* field is
ignored. The information about the tenant and the authentication identifier can be presented to the protocol adapter in
one of two ways:
@@ -107,26 +109,38 @@ a client tries to connect and/or send a message to the adapter.
### Connection Limits
-The adapter rejects a client’s connection attempt with return code
+The adapter rejects a client’s attempt to connect using
-* `0x03` (*Connection Refused: server unavailable*), if the maximum number of connections per protocol adapter instance
- is reached
-* `0x05` (*Connection Refused: not authorized*), if the maximum number of simultaneously connected devices for the
- tenant is reached.
+* MQTT 3.1.1 with return code `0x03` (*Server Unavailable*),
+* MQTT 5.0 with reason code `0x9C` (*Use Another Server*),
+
+if the maximum number of connections per protocol adapter instance is reached.
+
+The adapter rejects a client’s attempt to connect using
+
+* MQTT 3.1.1 with return code `0x05` (*Not Authorized*),
+* MQTT 5.0 with reason code `0x97` (*Quota Exceeded*),
+
+if the maximum number of simultaneously connected devices for the tenant is reached.
### Connection Duration Limits
-The adapter rejects a client’s connection attempt with return code `0x05` (*Connection Refused: not authorized*), if the
-[connection duration limit]({{< relref "/concepts/resource-limits#connection-duration-limit" >}}) that has been
-configured for the client’s tenant is exceeded.
+The adapter rejects a client’s attempt to connect using
+
+* MQTT 3.1.1 with return code `0x05` (*Not Authorized*)
+* MQTT 5.0 with reason code `0x97` (*Quota Exceeded*)
+
+if the [connection duration limit]({{< relref "/concepts/resource-limits#connection-duration-limit" >}})
+that has been configured for the client’s tenant is exceeded.
### Message Limits
The adapter
-* rejects a client's connection attempt with return code `0x05` (*Connection Refused: not authorized*),
+* rejects a client's attempt to connect using MQTT 3.1.1 with return code `0x05` (*Not Authorized*),
+* rejects a client's attempt to connect using MQTT 5.0 with reason code `0x97` (*Quota Exceeded*),
* discards any MQTT PUBLISH packet containing telemetry data or an event that is sent by a client and
-* rejects any AMQP 1.0 message containing a command sent by a north bound application
+* rejects any command messages sent by a north bound application
if the [message limit]({{< relref "/concepts/resource-limits.md" >}}) that has been configured for the device’s tenant
is exceeded.
diff --git a/tests/pom.xml b/tests/pom.xml
index 6592ae6c7f..6a218d582a 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -319,6 +319,11 @@
vertx-mqtt
test
+
+ com.hivemq
+ hivemq-mqtt-client
+ test
+
org.eclipse.californium
californium-core
diff --git a/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttConnectionIT.java b/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttConnectionIT.java
index b7ee51202e..2ccb3e0930 100644
--- a/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttConnectionIT.java
+++ b/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttConnectionIT.java
@@ -67,7 +67,7 @@
import io.vertx.mqtt.MqttConnectionException;
/**
- * Integration tests for checking connection to the MQTT adapter.
+ * Integration tests for checking MQTT 3.1.1 based connection to the MQTT adapter.
*
*/
@ExtendWith(VertxExtension.class)
diff --git a/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttTestBase.java b/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttTestBase.java
index d1c2775edf..b6039291c3 100644
--- a/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttTestBase.java
+++ b/tests/src/test/java/org/eclipse/hono/tests/mqtt/MqttTestBase.java
@@ -1,5 +1,5 @@
/*******************************************************************************
- * Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation
+ * Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
@@ -36,7 +36,7 @@
import io.vertx.mqtt.messages.MqttConnAckMessage;
/**
- * Base class for MQTT adapter integration tests.
+ * Base class for MQTT adapter integration tests using MQTT 3.1.1.
*
*/
public abstract class MqttTestBase {
@@ -70,7 +70,7 @@ public abstract class MqttTestBase {
protected Context context;
/**
- * Creates default AMQP client options.
+ * Creates default MQTT client options.
*/
@BeforeAll
public static void init() {
diff --git a/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttConnectionIT.java b/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttConnectionIT.java
new file mode 100644
index 0000000000..994a56dcb2
--- /dev/null
+++ b/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttConnectionIT.java
@@ -0,0 +1,884 @@
+/*******************************************************************************
+ * Copyright (c) 2024 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ *******************************************************************************/
+
+package org.eclipse.hono.tests.mqtt5;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import java.net.HttpURLConnection;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.X509Certificate;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import javax.security.auth.x500.X500Principal;
+
+import org.eclipse.hono.service.management.credentials.Credentials;
+import org.eclipse.hono.service.management.credentials.PasswordCredential;
+import org.eclipse.hono.service.management.credentials.X509CertificateCredential;
+import org.eclipse.hono.service.management.credentials.X509CertificateSecret;
+import org.eclipse.hono.service.management.device.Device;
+import org.eclipse.hono.service.management.tenant.Tenant;
+import org.eclipse.hono.tests.EnabledIfDnsRebindingIsSupported;
+import org.eclipse.hono.tests.EnabledIfProtocolAdaptersAreRunning;
+import org.eclipse.hono.tests.EnabledIfRegistrySupportsFeatures;
+import org.eclipse.hono.tests.IntegrationTestSupport;
+import org.eclipse.hono.tests.Tenants;
+import org.eclipse.hono.util.Adapter;
+import org.eclipse.hono.util.Constants;
+import org.eclipse.hono.util.CredentialsConstants;
+import org.eclipse.hono.util.IdentityTemplate;
+import org.eclipse.hono.util.RegistrationConstants;
+import org.eclipse.hono.util.RegistryManagementConstants;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import com.hivemq.client.mqtt.datatypes.MqttClientIdentifier;
+import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
+import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener;
+import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5ConnAckException;
+import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode;
+
+import io.jsonwebtoken.Jwts;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.json.JsonObject;
+import io.vertx.core.net.SelfSignedCertificate;
+import io.vertx.junit5.Timeout;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+
+/**
+ * Integration tests for checking MQTT 5.0 based connection to the MQTT adapter.
+ *
+ */
+@ExtendWith(VertxExtension.class)
+@Timeout(timeUnit = TimeUnit.SECONDS, value = 15)
+@EnabledIfProtocolAdaptersAreRunning(mqttAdapter = true)
+public class MqttConnectionIT extends MqttTestBase {
+
+ private SelfSignedCertificate deviceCert;
+ private String tenantId;
+ private String deviceId;
+ private String password;
+
+ /**
+ * Sets up the fixture.
+ */
+ @BeforeEach
+ public void setUp() {
+ tenantId = helper.getRandomTenantId();
+ deviceId = helper.getRandomDeviceId(tenantId);
+ password = "secret";
+ deviceCert = SelfSignedCertificate.create(UUID.randomUUID().toString());
+ }
+
+ /**
+ * Verifies that the adapter opens a connection to registered devices with credentials.
+ *
+ * @param tlsVersion The TLS protocol version to use for connecting to the adapter.
+ * @param ctx The test context
+ */
+ @ParameterizedTest(name = IntegrationTestSupport.PARAMETERIZED_TEST_NAME_PATTERN)
+ @ValueSource(strings = { IntegrationTestSupport.TLS_VERSION_1_2, IntegrationTestSupport.TLS_VERSION_1_3 })
+ public void testConnectSucceedsForRegisteredDevice(final String tlsVersion, final VertxTestContext ctx) {
+
+ final Tenant tenant = new Tenant();
+
+ helper.registry
+ .addDeviceForTenant(tenantId, tenant, deviceId, password)
+ .compose(ok -> connectToAdapter(
+ tlsVersion,
+ IntegrationTestSupport.getUsername(deviceId, tenantId),
+ password,
+ null,
+ null))
+ .onComplete(ctx.succeeding(conAckMsg -> {
+ ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS));
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter opens a connection to a registered device that authenticates
+ * using a JSON Web Token.
+ *
+ * @param ctx The test context
+ * @throws NoSuchAlgorithmException if the JVM does not support ECC cryptography.
+ */
+ @Test
+ public void testConnectJwtSucceedsForRegisteredDevice(final VertxTestContext ctx) throws NoSuchAlgorithmException {
+
+ final var generator = KeyPairGenerator.getInstance(CredentialsConstants.EC_ALG);
+ final var keyPair = generator.generateKeyPair();
+ final var rpkCredential = Credentials.createRPKCredential(deviceId, keyPair.getPublic());
+
+ final var jws = Jwts.builder()
+ .header().type("JWT")
+ .and()
+ .audience().add(CredentialsConstants.AUDIENCE_HONO_ADAPTER)
+ .and()
+ .issuer(deviceId)
+ .subject(deviceId)
+ .claim(CredentialsConstants.CLAIM_TENANT_ID, tenantId)
+ .issuedAt(Date.from(Instant.now()))
+ .expiration(Date.from(Instant.now().plus(Duration.ofMinutes(10))))
+ .signWith(keyPair.getPrivate())
+ .compact();
+
+ helper.registry.addTenant(tenantId)
+ .compose(res -> helper.registry.registerDevice(tenantId, deviceId))
+ .compose(res -> helper.registry.addCredentials(tenantId, deviceId, Set.of(rpkCredential)))
+ .compose(ok -> connectToAdapter("ignored", jws))
+ .onComplete(ctx.succeeding(conAckMsg -> {
+ ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS));
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter opens a connection to a registered device that authenticates
+ * using a Google IoT Core style JSON Web Token in conjunction with an MQTT connection
+ * identifier that contains the tenant and authentication ID.
+ *
+ * @param ctx The test context
+ * @throws NoSuchAlgorithmException if the JVM does not support ECC cryptography.
+ */
+ @Test
+ public void testConnectGoogleIoTCoreJwtSucceedsForRegisteredDevice(final VertxTestContext ctx) throws NoSuchAlgorithmException {
+
+ final var generator = KeyPairGenerator.getInstance(CredentialsConstants.EC_ALG);
+ final var keyPair = generator.generateKeyPair();
+ final var rpkCredential = Credentials.createRPKCredential(deviceId, keyPair.getPublic());
+
+ final var jws = Jwts.builder()
+ .header().type("JWT")
+ .and()
+ .issuedAt(Date.from(Instant.now()))
+ .expiration(Date.from(Instant.now().plus(Duration.ofMinutes(10))))
+ .signWith(keyPair.getPrivate())
+ .compact();
+ final var clientId = MqttClientIdentifier.of("tenants/%s/devices/%s".formatted(tenantId, deviceId));
+
+ helper.registry.addTenant(tenantId)
+ .compose(res -> helper.registry.registerDevice(tenantId, deviceId))
+ .compose(res -> helper.registry.addCredentials(tenantId, deviceId, Set.of(rpkCredential)))
+ .compose(ok -> connectToAdapter(IntegrationTestSupport.TLS_VERSION_1_2, "ignored", jws, clientId, null))
+ .onComplete(ctx.succeeding(conAckMsg -> {
+ ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS));
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that an attempt to open a connection using a valid X.509 client certificate succeeds.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectX509SucceedsForRegisteredDevice(final VertxTestContext ctx) {
+
+ helper.getCertificate(deviceCert.certificatePath())
+ .compose(cert -> {
+ final var tenant = Tenants.createTenantForTrustAnchor(cert);
+ return helper.registry.addDeviceForTenant(tenantId, tenant, deviceId, cert);
+ })
+ .compose(ok -> connectToAdapter(deviceCert))
+ .onComplete(ctx.succeeding(conAckMsg -> {
+ ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS));
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that an attempt to open a connection using a valid X.509 client certificate succeeds
+ * for a device belonging to a tenant that uses the same trust anchor as another tenant.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ @EnabledIfDnsRebindingIsSupported
+ @EnabledIfRegistrySupportsFeatures(trustAnchorGroups = true)
+ public void testConnectX509SucceedsUsingSni(final VertxTestContext ctx) {
+
+ helper.getCertificate(deviceCert.certificatePath())
+ .compose(cert -> {
+ // GIVEN two tenants belonging to the same trust anchor group
+ final var tenant = Tenants.createTenantForTrustAnchor(cert)
+ .setTrustAnchorGroup("test-group");
+ // which both use the same CA
+ return helper.registry.addTenant(helper.getRandomTenantId(), tenant)
+ // and a device belonging to one of the tenants
+ .compose(ok -> helper.registry.addDeviceForTenant(tenantId, tenant, deviceId, cert));
+ })
+ // WHEN the device connects to the adapter including its tenant ID in the host name
+ .compose(ok -> connectToAdapter(
+ deviceCert,
+ IntegrationTestSupport.getSniHostname(IntegrationTestSupport.MQTT_HOST, tenantId)))
+ .onComplete(ctx.succeeding(conAckMsg -> {
+ // THEN the connection attempt succeeds
+ ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS));
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that an attempt to open a connection using a valid X.509 client certificate succeeds
+ * for a device belonging to a tenant with a tenant alias.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ @EnabledIfDnsRebindingIsSupported
+ @EnabledIfRegistrySupportsFeatures(trustAnchorGroups = true, tenantAlias = true)
+ public void testConnectX509SucceedsUsingSniWithTenantAlias(final VertxTestContext ctx) {
+
+ helper.getCertificate(deviceCert.certificatePath())
+ // GIVEN two tenants belonging to the same trust anchor group
+ // which both use the same CA
+ .compose(cert -> helper.registry.addTenant(
+ helper.getRandomTenantId(),
+ Tenants.createTenantForTrustAnchor(cert).setTrustAnchorGroup("test-group"))
+ .map(cert))
+ // and a device belonging to one of the tenants which has an alias configured
+ .compose(cert -> helper.registry.addDeviceForTenant(
+ tenantId,
+ Tenants.createTenantForTrustAnchor(cert)
+ .setTrustAnchorGroup("test-group")
+ .setAlias("test-alias"),
+ deviceId,
+ cert))
+ // WHEN the device connects to the adapter including the tenant alias in the host name
+ .compose(ok -> connectToAdapter(
+ deviceCert,
+ IntegrationTestSupport.getSniHostname(IntegrationTestSupport.MQTT_HOST, "test-alias")))
+ .onComplete(ctx.succeeding(conAckMsg -> {
+ // THEN the connection attempt succeeds
+ ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS));
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that an attempt to open a connection using a valid X.509 client certificate fails
+ * for a device belonging to a tenant using a non-existing tenant alias.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ @EnabledIfDnsRebindingIsSupported
+ @EnabledIfRegistrySupportsFeatures(trustAnchorGroups = true, tenantAlias = true)
+ public void testConnectX509FailsUsingSniWithNonExistingTenantAlias(final VertxTestContext ctx) {
+
+ helper.getCertificate(deviceCert.certificatePath())
+ // GIVEN two tenants belonging to the same trust anchor group
+ // which both use the same CA
+ .compose(cert -> helper.registry.addTenant(
+ helper.getRandomTenantId(),
+ Tenants.createTenantForTrustAnchor(cert).setTrustAnchorGroup("test-group"))
+ .map(cert))
+ // and a device belonging to one of the tenants which has an alias configured
+ .compose(cert -> helper.registry.addDeviceForTenant(
+ tenantId,
+ Tenants.createTenantForTrustAnchor(cert)
+ .setTrustAnchorGroup("test-group")
+ .setAlias("test-alias"),
+ deviceId,
+ cert))
+ // WHEN the device connects to the adapter including a wrong tenant alias in the host name
+ .compose(ok -> connectToAdapter(
+ deviceCert,
+ IntegrationTestSupport.getSniHostname(IntegrationTestSupport.MQTT_HOST, "wrong-alias")))
+ .onComplete(ctx.failing(t -> {
+ // THEN the connection is refused
+ ctx.verify(() -> {
+ assertThat(t).isInstanceOf(Mqtt5ConnAckException.class);
+ final var error = ((Mqtt5ConnAckException) t).getMqttMessage();
+ assertThat(error.getReasonCode())
+ .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter opens a connection if auto-provisioning is enabled for the device certificate, and it
+ * is not configured auto-provisioning-device-id template and auth-id template.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectSucceedsWithAutoProvisioningWithoutTemplate(final VertxTestContext ctx) {
+
+ final Promise autoProvisionedDeviceId = Promise.promise();
+
+ final IdentityTemplate defaultTemplate = new IdentityTemplate(
+ RegistryManagementConstants.PLACEHOLDER_SUBJECT_DN);
+
+ final Future certTracker = helper.getCertificate(deviceCert.certificatePath());
+ final Future subjectDNTracker = certTracker
+ .map(cert -> cert.getSubjectX500Principal().getName(X500Principal.RFC2253));
+
+ helper.createAutoProvisioningNotificationConsumer(ctx, autoProvisionedDeviceId, tenantId)
+ .compose(ok -> certTracker)
+ .compose(cert -> {
+ final var tenant = Tenants.createTenantForTrustAnchor(cert);
+ tenant.getTrustedCertificateAuthorities().get(0).setAutoProvisioningEnabled(true);
+ return helper.registry.addTenant(tenantId, tenant);
+ })
+ .compose(ok -> connectToAdapter(deviceCert))
+ .compose(ok -> autoProvisionedDeviceId.future())
+ .compose(deviceId -> {
+ // verify the device ID is not generated by subject-dn template
+ ctx.verify(
+ () -> assertThat(deviceId).isNotEqualTo(defaultTemplate.apply(subjectDNTracker.result())));
+ return helper.registry.getRegistrationInfo(tenantId, deviceId);
+ })
+ .compose(registrationResult -> {
+ ctx.verify(() -> {
+ final var infoRegistration = registrationResult.bodyAsJsonObject();
+ IntegrationTestSupport.assertDeviceStatusProperties(
+ infoRegistration.getJsonObject(RegistryManagementConstants.FIELD_STATUS),
+ true);
+ });
+ return helper.registry.getCredentials(tenantId, autoProvisionedDeviceId.future().result());
+ })
+ .onComplete(ctx.succeeding(credentialsResult -> {
+ ctx.verify(() -> {
+ final var infoCredentials = credentialsResult.bodyAsJsonArray();
+ // verify the auth ID is generated by subject-dn template
+ assertThat(infoCredentials.getJsonObject(0)
+ .getString(RegistryManagementConstants.FIELD_AUTH_ID))
+ .isEqualTo(defaultTemplate.apply(subjectDNTracker.result()));
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter opens a connection if auto-provisioning is enabled for the device certificate, and it
+ * is configured auto-provisioning-device-id template and auth-id template.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectSucceedsWithAutoProvisioningWithTemplate(final VertxTestContext ctx) {
+
+ final Promise autoProvisionedDeviceId = Promise.promise();
+
+ final IdentityTemplate deviceIdTemplate = new IdentityTemplate("{{subject-dn}}");
+ final IdentityTemplate authIdTemplate = new IdentityTemplate("{{subject-cn}}");
+
+ final Future certTracker = helper.getCertificate(deviceCert.certificatePath());
+ final Future subjectDNTracker = certTracker
+ .map(cert -> cert.getSubjectX500Principal().getName(X500Principal.RFC2253));
+
+ helper.createAutoProvisioningNotificationConsumer(ctx, autoProvisionedDeviceId, tenantId)
+ .compose(ok -> certTracker)
+ .compose(cert -> {
+ final var tenant = Tenants.createTenantForTrustAnchor(cert);
+ tenant.getTrustedCertificateAuthorities().get(0)
+ .setAutoProvisioningEnabled(true)
+ .setAutoProvisioningDeviceIdTemplate(deviceIdTemplate.toString())
+ .setAuthIdTemplate(authIdTemplate.toString());
+ return helper.registry.addTenant(tenantId, tenant);
+ })
+ .compose(ok -> connectToAdapter(deviceCert))
+ .compose(ok -> autoProvisionedDeviceId.future())
+ .compose(deviceId -> {
+ // verify the device ID is generated by auto-provisioning-device-id template
+ ctx.verify(() -> assertThat(deviceId).isEqualTo(deviceIdTemplate.apply(subjectDNTracker.result())));
+ return helper.registry.getRegistrationInfo(tenantId, deviceId);
+ })
+ .compose(registrationResult -> {
+ ctx.verify(() -> {
+ final var infoRegistration = registrationResult.bodyAsJsonObject();
+ IntegrationTestSupport.assertDeviceStatusProperties(
+ infoRegistration.getJsonObject(RegistryManagementConstants.FIELD_STATUS),
+ true);
+ });
+ final var deviceId = deviceIdTemplate.apply(subjectDNTracker.result());
+ return helper.registry.getCredentials(tenantId, deviceId);
+ })
+ .onComplete(ctx.succeeding(credentialsResult -> {
+ ctx.verify(() -> {
+ final var infoCredentials = credentialsResult.bodyAsJsonArray();
+ // verify the auth ID is generated by auth-id template
+ assertThat(infoCredentials.getJsonObject(0)
+ .getString(RegistryManagementConstants.FIELD_AUTH_ID))
+ .isEqualTo(authIdTemplate.apply(subjectDNTracker.result()));
+ });
+ ctx.completeNow();
+
+ }));
+ }
+
+ /**
+ * Verifies that the adapter rejects connection attempts from an unknown device for which auto-provisioning is
+ * disabled.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectFailsIfAutoProvisioningIsDisabled(final VertxTestContext ctx) {
+
+ // GIVEN a tenant configured with a trust anchor that does not allow auto-provisioning
+ // WHEN an unknown device tries to connect
+ helper.getCertificate(deviceCert.certificatePath())
+ .compose(cert -> {
+ final var tenant = Tenants.createTenantForTrustAnchor(cert);
+ tenant.getTrustedCertificateAuthorities().get(0).setAutoProvisioningEnabled(false);
+ return helper.registry.addTenant(tenantId, tenant);
+ })
+ // WHEN a unknown device tries to connect to the adapter
+ // using a client certificate with the trust anchor registered for the device's tenant
+ .compose(ok -> connectToAdapter(deviceCert))
+ .onComplete(ctx.failing(t -> {
+ // THEN the connection is refused
+ ctx.verify(() -> {
+ assertThat(t).isInstanceOf(Mqtt5ConnAckException.class);
+ final var error = ((Mqtt5ConnAckException) t).getMqttMessage();
+ assertThat(error.getReasonCode())
+ .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter rejects connection attempts from unknown devices
+ * for which neither registration information nor credentials are on record.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectFailsForNonExistingDevice(final VertxTestContext ctx) {
+
+ // GIVEN an adapter
+ // WHEN an unknown device tries to connect
+ connectToAdapter(IntegrationTestSupport.getUsername("non-existing", Constants.DEFAULT_TENANT), "secret")
+ .onComplete(ctx.failing(t -> {
+ // THEN the connection is refused
+ ctx.verify(() -> {
+ assertThat(t).isInstanceOf(Mqtt5ConnAckException.class);
+ final var error = ((Mqtt5ConnAckException) t).getMqttMessage();
+ assertThat(error.getReasonCode())
+ .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter rejects connection attempts from unknown devices
+ * trying to authenticate using a client certificate but for which neither
+ * registration information nor credentials are on record.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectX509FailsForNonExistingDevice(final VertxTestContext ctx) {
+
+ // GIVEN an adapter
+ // WHEN an unknown device tries to connect
+ connectToAdapter(deviceCert)
+ .onComplete(ctx.failing(t -> {
+ // THEN the connection is refused
+ ctx.verify(() -> {
+ assertThat(t).isInstanceOf(Mqtt5ConnAckException.class);
+ final var error = ((Mqtt5ConnAckException) t).getMqttMessage();
+ assertThat(error.getReasonCode())
+ .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter rejects connection attempts from devices
+ * using wrong credentials.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectFailsForWrongCredentials(final VertxTestContext ctx) {
+
+ // GIVEN a registered device
+ final Tenant tenant = new Tenant();
+
+ helper.registry
+ .addDeviceForTenant(tenantId, tenant, deviceId, password)
+ // WHEN the device tries to connect using a wrong password
+ .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), "wrong password"))
+ .onComplete(ctx.failing(t -> {
+ // THEN the connection is refused
+ ctx.verify(() -> {
+ assertThat(t).isInstanceOf(Mqtt5ConnAckException.class);
+ final var error = ((Mqtt5ConnAckException) t).getMqttMessage();
+ assertThat(error.getReasonCode())
+ .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter rejects connection attempts from devices
+ * using credentials that contain a non-existing tenant.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectFailsForNonExistingTenant(final VertxTestContext ctx) {
+
+ // GIVEN a registered device
+ final Tenant tenant = new Tenant();
+
+ helper.registry
+ .addDeviceForTenant(tenantId, tenant, deviceId, password)
+ // WHEN a device of a non-existing tenant tries to connect
+ .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, "nonExistingTenant"), "secret"))
+ .onComplete(ctx.failing(t -> {
+ // THEN the connection is refused
+ ctx.verify(() -> {
+ assertThat(t).isInstanceOf(Mqtt5ConnAckException.class);
+ final var error = ((Mqtt5ConnAckException) t).getMqttMessage();
+ assertThat(error.getReasonCode())
+ .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter rejects connection attempts from devices using a client certificate with an unknown
+ * subject DN.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectX509FailsForUnknownSubjectDN(final VertxTestContext ctx) {
+
+ // GIVEN a registered device
+
+ helper.getCertificate(deviceCert.certificatePath())
+ .compose(cert -> {
+ final var tenant = Tenants.createTenantForTrustAnchor(cert);
+ return helper.registry.addTenant(tenantId, tenant);
+ }).compose(ok -> helper.registry.registerDevice(tenantId, deviceId))
+ .compose(ok -> {
+ final String authId = new X500Principal("CN=4711").getName(X500Principal.RFC2253);
+ final var credential = X509CertificateCredential.fromAuthId(authId, List.of(new X509CertificateSecret()));
+ return helper.registry.addCredentials(tenantId, deviceId, Collections.singleton(credential));
+ })
+ // WHEN the device tries to connect using a client certificate with an unknown subject DN
+ .compose(ok -> connectToAdapter(deviceCert))
+ .onComplete(ctx.failing(t -> {
+ // THEN the connection is refused
+ ctx.verify(() -> {
+ assertThat(t).isInstanceOf(Mqtt5ConnAckException.class);
+ final var error = ((Mqtt5ConnAckException) t).getMqttMessage();
+ assertThat(error.getReasonCode())
+ .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter rejects connection attempts from devices belonging to a tenant for which the MQTT
+ * adapter has been disabled.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectFailsForDisabledAdapter(final VertxTestContext ctx) {
+
+ final Tenant tenant = new Tenant();
+ tenant.addAdapterConfig(new Adapter(Constants.PROTOCOL_ADAPTER_TYPE_MQTT).setEnabled(false));
+
+ helper.registry
+ .addDeviceForTenant(tenantId, tenant, deviceId, password)
+ // WHEN a device that belongs to the tenant tries to connect to the adapter
+ .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password))
+ .onComplete(ctx.failing(t -> {
+ // THEN the connection is refused with a NOT_AUTHORIZED code
+ ctx.verify(() -> {
+ assertThat(t).isInstanceOf(Mqtt5ConnAckException.class);
+ final var error = ((Mqtt5ConnAckException) t).getMqttMessage();
+ assertThat(error.getReasonCode())
+ .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter rejects connection attempts from devices
+ * using a client certificate which belong to a tenant for which the
+ * MQTT adapter has been disabled.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectX509FailsForDisabledAdapter(final VertxTestContext ctx) {
+ helper.getCertificate(deviceCert.certificatePath())
+ .compose(cert -> {
+ final var tenant = Tenants.createTenantForTrustAnchor(cert);
+ tenant.addAdapterConfig(new Adapter(Constants.PROTOCOL_ADAPTER_TYPE_MQTT).setEnabled(false));
+ return helper.registry.addDeviceForTenant(tenantId, tenant, deviceId, cert);
+ })
+ // WHEN a device that belongs to the tenant tries to connect to the adapter
+ .compose(ok -> connectToAdapter(deviceCert))
+ .onComplete(ctx.failing(t -> {
+ // THEN the connection is refused with a NOT_AUTHORIZED code
+ ctx.verify(() -> {
+ assertThat(t).isInstanceOf(Mqtt5ConnAckException.class);
+ final var error = ((Mqtt5ConnAckException) t).getMqttMessage();
+ assertThat(error.getReasonCode())
+ .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter rejects connection attempts from devices for which credentials exist but are disabled.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectFailsForDisabledCredentials(final VertxTestContext ctx) {
+
+ helper.registry
+ .addTenant(tenantId)
+ .compose(ok -> {
+ return helper.registry.registerDevice(tenantId, deviceId);
+ })
+ .compose(ok -> {
+ final PasswordCredential secret = Credentials.createPasswordCredential(deviceId, password);
+ secret.setEnabled(false);
+ return helper.registry.addCredentials(tenantId, deviceId, List.of(secret));
+ })
+ // WHEN a device connects using the correct credentials
+ .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password))
+ .onComplete(ctx.failing(t -> {
+ // THEN the connection is refused with a NOT_AUTHORIZED code
+ ctx.verify(() -> {
+ assertThat(t).isInstanceOf(Mqtt5ConnAckException.class);
+ final var error = ((Mqtt5ConnAckException) t).getMqttMessage();
+ assertThat(error.getReasonCode())
+ .isEqualTo(Mqtt5ConnAckReasonCode.BAD_USER_NAME_OR_PASSWORD);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter rejects connection attempts from devices for which credentials exist but the device is
+ * disabled.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectFailsForDisabledDevice(final VertxTestContext ctx) {
+
+ final Tenant tenant = new Tenant();
+
+ helper.registry
+ .addTenant(tenantId, tenant)
+ .compose(ok -> {
+ final var device = new Device();
+ device.setEnabled(false);
+ return helper.registry.registerDevice(tenantId, deviceId, device);
+ })
+ .compose(ok -> {
+ final PasswordCredential secret = Credentials.createPasswordCredential(deviceId, password);
+ return helper.registry.addCredentials(tenantId, deviceId, Collections.singleton(secret));
+ })
+ // WHEN a device connects using the correct credentials
+ .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password))
+ .onComplete(ctx.failing(t -> {
+ // THEN the connection is refused with a NOT_AUTHORIZED code
+ ctx.verify(() -> {
+ assertThat(t).isInstanceOf(Mqtt5ConnAckException.class);
+ final var error = ((Mqtt5ConnAckException) t).getMqttMessage();
+ assertThat(error.getReasonCode())
+ .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter rejects connection attempts from devices that belong to a disabled tenant.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectFailsForDisabledTenant(final VertxTestContext ctx) {
+
+ // Given a disabled tenant for which the MQTT adapter is enabled
+ final Tenant tenant = new Tenant();
+ tenant.setEnabled(false);
+
+ helper.registry
+ .addDeviceForTenant(tenantId, tenant, deviceId, password)
+ .compose(ok -> connectToAdapter(IntegrationTestSupport.getUsername(deviceId, tenantId), password))
+ .onComplete(ctx.failing(t -> {
+ // THEN the connection is refused with a NOT_AUTHORIZED code
+ ctx.verify(() -> {
+ assertThat(t).isInstanceOf(Mqtt5ConnAckException.class);
+ final var error = ((Mqtt5ConnAckException) t).getMqttMessage();
+ assertThat(error.getReasonCode())
+ .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter rejects connection attempts from devices
+ * using a client certificate that belong to a disabled tenant.
+ *
+ * @param ctx The test context
+ */
+ @Test
+ public void testConnectX509FailsForDisabledTenant(final VertxTestContext ctx) {
+
+ // Given a disabled tenant for which the MQTT adapter is enabled
+ helper.getCertificate(deviceCert.certificatePath())
+ .compose(cert -> {
+ final var tenant = Tenants.createTenantForTrustAnchor(cert);
+ tenant.setEnabled(false);
+ return helper.registry.addDeviceForTenant(tenantId, tenant, deviceId, cert);
+ })
+ .compose(ok -> connectToAdapter(deviceCert))
+ .onComplete(ctx.failing(t -> {
+ // THEN the connection is refused with a NOT_AUTHORIZED code
+ ctx.verify(() -> {
+ assertThat(t).isInstanceOf(Mqtt5ConnAckException.class);
+ final var error = ((Mqtt5ConnAckException) t).getMqttMessage();
+ assertThat(error.getReasonCode())
+ .isEqualTo(Mqtt5ConnAckReasonCode.NOT_AUTHORIZED);
+ });
+ ctx.completeNow();
+ }));
+ }
+
+ /**
+ * Verifies that the adapter closes the connection to a device when the registration data of the device
+ * is deleted.
+ *
+ * @param ctx The vert.x test context.
+ */
+ @Test
+ public void testDeviceConnectionIsClosedOnDeviceDeleted(final VertxTestContext ctx) {
+ testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx,
+ () -> helper.registry.deregisterDevice(tenantId, deviceId));
+ }
+
+ /**
+ * Verifies that the adapter closes the connection to a device when the registration data of the device
+ * is disabled.
+ *
+ * @param ctx The vert.x test context.
+ */
+ @Test
+ public void testDeviceConnectionIsClosedOnDeviceDisabled(final VertxTestContext ctx) {
+ final JsonObject updatedDeviceData = new JsonObject()
+ .put(RegistrationConstants.FIELD_ENABLED, Boolean.FALSE);
+ testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx,
+ () -> helper.registry.updateDevice(tenantId, deviceId, updatedDeviceData));
+ }
+
+ /**
+ * Verifies that the adapter closes the connection to a device when the tenant of the device
+ * is deleted.
+ *
+ * @param ctx The vert.x test context.
+ */
+ @Test
+ public void testDeviceConnectionIsClosedOnTenantDeleted(final VertxTestContext ctx) {
+ testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx,
+ () -> helper.registry.removeTenant(tenantId));
+ }
+
+ /**
+ * Verifies that the adapter closes the connection to a device when the tenant of the device
+ * is disabled.
+ *
+ * @param ctx The vert.x test context.
+ */
+ @Test
+ public void testDeviceConnectionIsClosedOnTenantDisabled(final VertxTestContext ctx) {
+ final Tenant updatedTenant = new Tenant().setEnabled(false);
+ testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx,
+ () -> helper.registry.updateTenant(tenantId, updatedTenant, HttpURLConnection.HTTP_NO_CONTENT));
+ }
+
+ /**
+ * Verifies that the adapter closes the connection to a device when registration data for all devices of the device
+ * tenant is deleted.
+ *
+ * @param ctx The vert.x test context.
+ */
+ @Test
+ public void testDeviceConnectionIsClosedOnAllDevicesOfTenantDeleted(final VertxTestContext ctx) {
+ testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(ctx,
+ () -> helper.registry.deregisterDevicesOfTenant(tenantId));
+ }
+
+ private void testDeviceConnectionIsClosedOnDeviceOrTenantDisabledOrDeleted(
+ final VertxTestContext ctx,
+ final Supplier> deviceRegistryChangeOperation) {
+
+ final Promise connectionClosedPromise = Promise.promise();
+ final var disconnectedListener = new MqttClientDisconnectedListener() {
+ @Override
+ public void onDisconnected(@NotNull final MqttClientDisconnectedContext context) {
+ connectionClosedPromise.complete();
+ }
+ };
+
+ // GIVEN a connected device
+ helper.registry
+ .addDeviceForTenant(tenantId, new Tenant(), deviceId, password)
+ .compose(ok -> connectToAdapter(
+ IntegrationTestSupport.TLS_VERSION_1_2,
+ IntegrationTestSupport.getUsername(deviceId, tenantId),
+ password,
+ null,
+ disconnectedListener))
+ .compose(conAckMsg -> {
+ ctx.verify(() -> assertThat(conAckMsg.getReasonCode()).isEqualTo(Mqtt5ConnAckReasonCode.SUCCESS));
+ // WHEN corresponding device/tenant is removed/disabled
+ return deviceRegistryChangeOperation.get();
+ })
+ // THEN the device connection is closed
+ .compose(ok -> connectionClosedPromise.future())
+ .onComplete(ctx.succeedingThenComplete());
+ }
+}
diff --git a/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttTestBase.java b/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttTestBase.java
new file mode 100644
index 0000000000..66af1cfb71
--- /dev/null
+++ b/tests/src/test/java/org/eclipse/hono/tests/mqtt5/MqttTestBase.java
@@ -0,0 +1,284 @@
+/*******************************************************************************
+ * Copyright (c) 2024 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ *******************************************************************************/
+
+package org.eclipse.hono.tests.mqtt5;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.eclipse.hono.tests.IntegrationTestSupport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hivemq.client.mqtt.MqttClientSslConfig;
+import com.hivemq.client.mqtt.datatypes.MqttClientIdentifier;
+import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
+import com.hivemq.client.mqtt.mqtt5.message.auth.Mqtt5SimpleAuth;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
+import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.net.PemTrustOptions;
+import io.vertx.core.net.SelfSignedCertificate;
+import io.vertx.junit5.VertxTestContext;
+
+/**
+ * Base class for MQTT adapter integration tests using MQTT 5.0.
+ *
+ */
+public abstract class MqttTestBase {
+
+ private static final Vertx vertx = Vertx.vertx();
+
+ private static TrustManagerFactory trustManagerFactory;
+ private static HostnameVerifier acceptAllHostnames;
+
+ /**
+ * A logger to be used by subclasses.
+ */
+ protected final Logger LOGGER = LoggerFactory.getLogger(getClass());
+
+ /**
+ * A helper accessing the AMQP 1.0 Messaging Network and
+ * for managing tenants/devices/credentials.
+ */
+ protected IntegrationTestSupport helper;
+ /**
+ * A client for publishing messages to the MQTT protocol adapter.
+ */
+ protected Mqtt5AsyncClient mqttClient;
+ /**
+ * Creates default MQTT client TLS options.
+ *
+ * @throws Exception if the trust manager factory cannot be created.
+ */
+ @BeforeAll
+ public static void init() throws Exception {
+
+ trustManagerFactory = new PemTrustOptions()
+ .addCertPath(IntegrationTestSupport.TRUST_STORE_PATH)
+ .getTrustManagerFactory(vertx);
+ acceptAllHostnames = new HostnameVerifier() {
+ @Override
+ public boolean verify(final String hostname, final SSLSession session) {
+ // disable host name verification
+ return true;
+ }
+ };
+ }
+
+ /**
+ * Create integration test helper.
+ *
+ * @param testInfo The JUnit test info.
+ * @param ctx The vert.x test context.
+ */
+ @BeforeEach
+ public void createHelper(final TestInfo testInfo, final VertxTestContext ctx) {
+ LOGGER.info("running {}", testInfo.getDisplayName());
+ helper = new IntegrationTestSupport(vertx);
+ helper.init().onComplete(ctx.succeedingThenComplete());
+ }
+
+ /**
+ * Closes the connection to the AMQP 1.0 Messaging Network.
+ *
+ * @param ctx The vert.x test context.
+ */
+ @AfterEach
+ public void closeConnectionToAmqpMessagingNetwork(final VertxTestContext ctx) {
+
+ helper.disconnect().onComplete(r -> ctx.completeNow());
+ }
+
+ /**
+ * Closes the connection to the MQTT adapter and deletes device registry entries created during the test.
+ *
+ * @param ctx The vert.x test context.
+ */
+ @AfterEach
+ public void closeMqttAdapterConnectionAndCleanupDeviceRegistry(final VertxTestContext ctx) {
+
+ final Promise disconnectHandler = Promise.promise();
+ if (mqttClient == null) {
+ disconnectHandler.complete();
+ } else {
+ Future.fromCompletionStage(mqttClient.disconnect()).onComplete(disconnectHandler);
+ }
+ disconnectHandler.future().onComplete(closeAttempt -> {
+ LOGGER.info("connection to MQTT adapter closed");
+ mqttClient = null;
+ // cleanup device registry - done after the adapter connection is closed because otherwise
+ // the adapter would close the connection from its end after having received the device deletion notification
+ helper.deleteObjects(ctx);
+ });
+ }
+
+ /**
+ * Opens a connection to the MQTT adapter using given credentials.
+ *
+ * @param username The username to use for authentication.
+ * @param password The password to use for authentication.
+ * @return A future that will be completed with the CONNACK packet received
+ * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException}
+ * if the connection could not be established.
+ */
+ protected final Future connectToAdapter(
+ final String username,
+ final String password) {
+ return connectToAdapter(IntegrationTestSupport.TLS_VERSION_1_2, username, password, null, null);
+ }
+
+ /**
+ * Opens a connection to the MQTT adapter using given credentials.
+ *
+ * @param tlsVersion The TLS protocol version to use for connecting to the adapter.
+ * @param username The username to use for authentication.
+ * @param password The password to use for authentication.
+ * @param mqttClientId MQTT client identifier to use when connecting to the MQTT adapter or
+ * {@code null}, if an arbitrary identifier should be used.
+ * @param disconnectedListener A listener to be invoked when the connection to the adapter is lost
+ * or {@code null}, if no listener should be registered.
+ * @return A future that will be completed with the CONNACK packet received
+ * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException}
+ * if the connection could not be established.
+ */
+ protected final Future connectToAdapter(
+ final String tlsVersion,
+ final String username,
+ final String password,
+ final MqttClientIdentifier mqttClientId,
+ final MqttClientDisconnectedListener disconnectedListener) {
+
+ final var sslConfig = MqttClientSslConfig.builder()
+ .trustManagerFactory(trustManagerFactory)
+ .hostnameVerifier(acceptAllHostnames)
+ .protocols(Set.of(tlsVersion))
+ .build();
+ final var auth = Mqtt5SimpleAuth.builder()
+ .username(username)
+ .password(password.getBytes(StandardCharsets.UTF_8))
+ .build();
+ return connectToAdapter(sslConfig, IntegrationTestSupport.MQTT_HOST, mqttClientId, auth, disconnectedListener);
+ }
+
+ /**
+ * Opens a connection to the MQTT adapter using an X.509 client certificate.
+ *
+ * @param cert The client certificate to use for authentication.
+ * @return A future that will be completed with the CONNACK packet received
+ * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException}
+ * if the connection could not be established.
+ * @throws NullPointerException if client certificate is {@code null}.
+ * @throws IllegalArgumentException if the certificate cannot be used for authenticating to the MQTT adapter.
+ */
+ protected final Future connectToAdapter(final SelfSignedCertificate cert) {
+ return connectToAdapter(cert, IntegrationTestSupport.MQTT_HOST);
+ }
+
+ /**
+ * Opens a connection to the MQTT adapter using an X.509 client certificate.
+ *
+ * @param cert The client certificate to use for authentication.
+ * @param hostname The name of the host to connect to.
+ * @return A future that will be completed with the CONNACK packet received
+ * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException}
+ * if the connection could not be established.
+ * @throws NullPointerException if any of the parameters are {@code null}.
+ * @throws IllegalArgumentException if the certificate cannot be used for authenticating to the MQTT adapter.
+ */
+ protected final Future connectToAdapter(
+ final SelfSignedCertificate cert,
+ final String hostname) {
+
+ Objects.requireNonNull(cert);
+ Objects.requireNonNull(hostname);
+
+ final KeyManagerFactory selfSignedKeyManagerFactory;
+ try {
+ selfSignedKeyManagerFactory = cert.keyCertOptions().getKeyManagerFactory(vertx);
+ } catch (final Exception e) {
+ throw new IllegalArgumentException(e.getMessage(), e.getCause());
+ }
+ final var sslConfig = MqttClientSslConfig.builder()
+ .trustManagerFactory(trustManagerFactory)
+ .hostnameVerifier(acceptAllHostnames)
+ .protocols(Set.of(IntegrationTestSupport.TLS_VERSION_1_2))
+ .keyManagerFactory(selfSignedKeyManagerFactory)
+ .build();
+
+ return connectToAdapter(sslConfig, hostname, null, null, null);
+ }
+
+ /**
+ * Opens a connection to the MQTT adapter using given options.
+ *
+ * @param sslConfig The SSL options to use for connecting to the adapter.
+ * @param hostname The name of the host to connect to.
+ * @param mqttClientId MQTT client identifier to use when connecting to the MQTT adapter or
+ * {@code null}, if an arbitrary identifier should be used.
+ * @param auth The credentials to use for authenticating to the adapter or {@code null}, if
+ * a client certificate (set in the SSL configuration) should be used.
+ * @param disconnectedListener A listener to be invoked when the connection to the adapter is lost
+ * or {@code null}, if no listener should be registered.
+ * @return A future that will be completed with the CONNACK packet received
+ * from the adapter or failed with a {@link io.vertx.mqtt.MqttConnectionException}
+ * if the connection could not be established.
+ * @throws NullPointerException if any of SSL config or host name are {@code null}.
+ */
+ protected final Future connectToAdapter(
+ final MqttClientSslConfig sslConfig,
+ final String hostname,
+ final MqttClientIdentifier mqttClientId,
+ final Mqtt5SimpleAuth auth,
+ final MqttClientDisconnectedListener disconnectedListener) {
+
+ Objects.requireNonNull(sslConfig);
+ Objects.requireNonNull(hostname);
+
+ final var clientId = Optional.ofNullable(mqttClientId).orElse(MqttClientIdentifier.of(UUID.randomUUID().toString()));
+ final var builder = Mqtt5Client.builder()
+ .identifier(clientId)
+ .sslConfig(sslConfig)
+ .serverHost(hostname)
+ .serverPort(IntegrationTestSupport.MQTTS_PORT);
+ Optional.ofNullable(disconnectedListener).ifPresent(builder::addDisconnectedListener);
+ mqttClient = builder.buildAsync();
+ final var connect = Mqtt5Connect.builder()
+ .cleanStart(true)
+ .keepAlive(10)
+ .simpleAuth(auth)
+ .build();
+ return Future.fromCompletionStage(mqttClient.connect(connect)).onSuccess(conAck -> {
+ LOGGER.info(
+ "MQTTS connection to adapter [host: {}, port: {}] established",
+ hostname, IntegrationTestSupport.MQTTS_PORT);
+ });
+ }
+}