From aabafcef5571302dae30c3751c885eaa17a3891e Mon Sep 17 00:00:00 2001 From: Matthias Kaemmer Date: Fri, 2 Feb 2024 13:57:01 +0100 Subject: [PATCH 1/2] [#3585] Add acknowledgement required command feature for Google Pub/Sub based commands Signed-off-by: Matthias Kaemmer --- ...AbstractVertxBasedMqttProtocolAdapter.java | 18 +++- .../command/amqp/ProtonBasedCommand.java | 7 +- .../command/kafka/KafkaBasedCommand.java | 7 +- .../command/pubsub/PubSubBasedCommand.java | 18 +++- .../command/AbstractCommandContext.java | 84 +++++++++++++++---- .../eclipse/hono/client/command/Command.java | 9 +- .../client/pubsub/PubSubMessageHelper.java | 17 +++- .../eclipse/hono/util/CommandConstants.java | 7 +- .../api/command-and-control-pubsub/index.md | 21 +++-- 9 files changed, 153 insertions(+), 35 deletions(-) diff --git a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java index d35eafd549..b02fb04c22 100644 --- a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java +++ b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -59,6 +59,7 @@ import org.eclipse.hono.client.NoConsumerException; import org.eclipse.hono.client.ServerErrorException; import org.eclipse.hono.client.ServiceInvocationException; +import org.eclipse.hono.client.command.AbstractCommandContext; import org.eclipse.hono.client.command.Command; import org.eclipse.hono.client.command.CommandContext; import org.eclipse.hono.client.command.CommandResponse; @@ -1734,8 +1735,19 @@ private void afterCommandPublished( reportPublishedCommand(tenantObject, subscription, commandContext, ProcessingOutcome.FORWARDED); log.debug("received PUBACK [packet-id: {}] for command [tenant-id: {}, device-id: {}, MQTT client-id: {}]", msgId, subscription.getTenant(), subscription.getDeviceId(), endpoint.clientIdentifier()); - commandContext.getTracingSpan().log("received PUBACK from device"); - commandContext.accept(); + final Span span = commandContext.getTracingSpan(); + span.log("received PUBACK from device"); + final Command command = commandContext.getCommand(); + if (command.isAckRequired() && command.isValid() + && commandContext instanceof AbstractCommandContext abstractCommandContext) { + abstractCommandContext + .sendDeliverySuccessCommandResponseMessage(HttpURLConnection.HTTP_ACCEPTED, + "Command successfully received", span, command.getCorrelationId(), + command.getMessagingType()) + .onComplete(v -> commandContext.accept()); + } else { + commandContext.accept(); + } }; final Handler onAckTimeoutHandler = v -> { diff --git a/clients/command-amqp/src/main/java/org/eclipse/hono/client/command/amqp/ProtonBasedCommand.java b/clients/command-amqp/src/main/java/org/eclipse/hono/client/command/amqp/ProtonBasedCommand.java index 3f9f65383a..c31def5fda 100644 --- a/clients/command-amqp/src/main/java/org/eclipse/hono/client/command/amqp/ProtonBasedCommand.java +++ b/clients/command-amqp/src/main/java/org/eclipse/hono/client/command/amqp/ProtonBasedCommand.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -193,6 +193,11 @@ public boolean isOneWay() { return replyToId == null; } + @Override + public boolean isAckRequired() { + return false; + } + @Override public boolean isValid() { return !validationError.isPresent(); diff --git a/clients/command-kafka/src/main/java/org/eclipse/hono/client/command/kafka/KafkaBasedCommand.java b/clients/command-kafka/src/main/java/org/eclipse/hono/client/command/kafka/KafkaBasedCommand.java index a8c5d2fd25..0bfb3ea937 100644 --- a/clients/command-kafka/src/main/java/org/eclipse/hono/client/command/kafka/KafkaBasedCommand.java +++ b/clients/command-kafka/src/main/java/org/eclipse/hono/client/command/kafka/KafkaBasedCommand.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2021, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -213,6 +213,11 @@ public boolean isOneWay() { return !responseRequired; } + @Override + public boolean isAckRequired() { + return false; + } + @Override public boolean isValid() { return !validationError.isPresent(); diff --git a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommand.java b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommand.java index 84f0d289ce..8bb6d29d7c 100644 --- a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommand.java +++ b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommand.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -47,6 +47,7 @@ public final class PubSubBasedCommand implements Command { private final String contentType; private final String requestId; private final boolean responseRequired; + private final boolean ackRequired; private String gatewayId; @@ -58,7 +59,8 @@ private PubSubBasedCommand( final String correlationId, final String subject, final String contentType, - final boolean responseRequired) { + final boolean responseRequired, + final boolean ackRequired) { this.validationError = validationError; this.pubsubMessage = pubsubMessage; @@ -68,6 +70,7 @@ private PubSubBasedCommand( this.subject = subject; this.contentType = contentType; this.responseRequired = responseRequired; + this.ackRequired = ackRequired; this.requestId = Commands.encodeRequestIdParameters(correlationId, MessagingType.pubsub); } @@ -134,10 +137,11 @@ private static PubSubBasedCommand getCommand(final PubsubMessage pubsubMessage, final StringJoiner validationErrorJoiner = new StringJoiner(", "); final boolean responseRequired = PubSubMessageHelper.isResponseRequired(attributes); + final boolean ackRequired = PubSubMessageHelper.isAckRequired(attributes); final String correlationId = PubSubMessageHelper.getCorrelationId(attributes) .filter(id -> !id.isEmpty()) .orElseGet(() -> { - if (responseRequired) { + if (responseRequired || ackRequired) { validationErrorJoiner.add("correlation-id is not set"); } return null; @@ -157,7 +161,8 @@ private static PubSubBasedCommand getCommand(final PubsubMessage pubsubMessage, correlationId, subject, contentType, - responseRequired); + responseRequired, + ackRequired); } /** @@ -169,6 +174,11 @@ public PubsubMessage getPubsubMessage() { return pubsubMessage; } + @Override + public boolean isAckRequired() { + return !responseRequired && ackRequired; + } + @Override public boolean isOneWay() { return !responseRequired; diff --git a/clients/command/src/main/java/org/eclipse/hono/client/command/AbstractCommandContext.java b/clients/command/src/main/java/org/eclipse/hono/client/command/AbstractCommandContext.java index 69bf4754d2..898e1e2ca9 100644 --- a/clients/command/src/main/java/org/eclipse/hono/client/command/AbstractCommandContext.java +++ b/clients/command/src/main/java/org/eclipse/hono/client/command/AbstractCommandContext.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -14,12 +14,14 @@ import java.net.HttpURLConnection; import java.util.Collections; +import java.util.Map; import java.util.Objects; import java.util.Optional; import org.eclipse.hono.tracing.TracingHelper; import org.eclipse.hono.util.CommandConstants; import org.eclipse.hono.util.MapBasedExecutionContext; +import org.eclipse.hono.util.MessageHelper; import org.eclipse.hono.util.MessagingType; import org.eclipse.hono.util.RegistrationAssertion; import org.eclipse.hono.util.TenantObject; @@ -36,7 +38,8 @@ * in a Pub/Sub and Kafka based command context. * @param The type of Command. */ -public abstract class AbstractCommandContext extends MapBasedExecutionContext implements CommandContext { +public abstract class AbstractCommandContext extends MapBasedExecutionContext + implements CommandContext { protected static final Logger LOG = LoggerFactory.getLogger(AbstractCommandContext.class); @@ -157,24 +160,73 @@ protected Future sendDeliveryFailureCommandResponseMessage( commandResponse.setAdditionalProperties( Collections.unmodifiableMap(command.getDeliveryFailureNotificationProperties())); - return commandResponseSender.sendCommandResponse( - // try to retrieve tenant configuration from context - Optional.ofNullable(get(KEY_TENANT_CONFIG)) - .filter(TenantObject.class::isInstance) - .map(TenantObject.class::cast) - // and fall back to default configuration - .orElseGet(() -> TenantObject.from(command.getTenant())), - new RegistrationAssertion(command.getDeviceId()), - commandResponse, - span.context()) - .onFailure(thr -> { - LOG.debug("failed to publish command response [{}]", commandResponse, thr); - TracingHelper.logError(span, "failed to publish command response message", thr); - }) + return sendCommandResponse(commandResponse, span) .onSuccess(v -> { LOG.debug("published error command response [{}, cause: {}]", commandResponse, cause != null ? cause.getMessage() : error); span.log("published error command response"); }); } + + /** + * Sends a command response as a command acknowledgement. + * + * @param status The HTTP status code indicating the outcome of processing the command. + * @param successMessage The message for the response message body. + * @param span The active OpenTracing span to use for tracking this operation. + * @param correlationId The correlation ID of the command that this is the response for. + * @param messagingType The type of the messaging system via which the command message was received. + * @return A future indicating the outcome of the operation. + *

+ * The future will be succeeded if the command response has been sent. + *

+ * The future will be failed if the command response could not be sent. + */ + public Future sendDeliverySuccessCommandResponseMessage( + final int status, + final String successMessage, + final Span span, + final String correlationId, + final MessagingType messagingType) { + if (correlationId == null) { + TracingHelper.logError(span, "can't send command response message - no correlation id set"); + return Future.failedFuture("missing correlation id"); + } + final JsonObject payloadJson = new JsonObject(); + payloadJson.put("acknowledgement", successMessage != null ? successMessage : ""); + + final CommandResponse commandResponse = new CommandResponse( + command.getTenant(), + command.getDeviceId(), + payloadJson.toBuffer(), + CommandConstants.CONTENT_TYPE_DELIVERY_SUCCESS_NOTIFICATION, + status, + correlationId, + "", + messagingType); + commandResponse.setAdditionalProperties(Map.of(MessageHelper.SYS_PROPERTY_SUBJECT, command.getName())); + + return sendCommandResponse(commandResponse, span) + .onSuccess(v -> { + LOG.debug("published ack command response [{}]", commandResponse); + span.log("published ack command response"); + }); + } + + private Future sendCommandResponse(final CommandResponse commandResponse, final Span span) { + return commandResponseSender.sendCommandResponse( + // try to retrieve tenant configuration from context + Optional.ofNullable(get(KEY_TENANT_CONFIG)) + .filter(TenantObject.class::isInstance) + .map(TenantObject.class::cast) + // and fall back to default configuration + .orElseGet(() -> TenantObject.from(command.getTenant())), + new RegistrationAssertion(command.getDeviceId()), + commandResponse, + span.context()) + .onFailure(thr -> { + LOG.debug("failed to publish command response [{}]", commandResponse, thr); + TracingHelper.logError(span, "failed to publish command response message", thr); + }); + } } diff --git a/clients/command/src/main/java/org/eclipse/hono/client/command/Command.java b/clients/command/src/main/java/org/eclipse/hono/client/command/Command.java index b08fdb2bdb..aea1e9c461 100644 --- a/clients/command/src/main/java/org/eclipse/hono/client/command/Command.java +++ b/clients/command/src/main/java/org/eclipse/hono/client/command/Command.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2020, 2021 Contributors to the Eclipse Foundation + * Copyright (c) 2020, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -35,6 +35,13 @@ public interface Command { */ boolean isOneWay(); + /** + * Checks if an acknowledgement of this command should be sent to the messaging infrastructure. + * + * @return {@code true} if an acknowledgement is required. + */ + boolean isAckRequired(); + /** * Checks if this command contains all required information. * diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java index 4289d6f523..89a1ac9b3d 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -43,6 +43,10 @@ public final class PubSubMessageHelper { * The name of the Pub/Sub message property indicating whether a response to the message is expected/required. */ public static final String PUBSUB_PROPERTY_RESPONSE_REQUIRED = "response-required"; + /** + * The name of the Pub/Sub message property indicating whether an acknowledgement to the message is expected/required. + */ + public static final String PUBSUB_PROPERTY_ACK_REQUIRED = "ack-required"; /** * Prefix to use in the Pub/Sub message properties for marking properties of command messages that should be @@ -202,6 +206,17 @@ public static boolean isResponseRequired(final Map attributesMap .parseBoolean(getAttributesValue(attributesMap, PUBSUB_PROPERTY_RESPONSE_REQUIRED).orElse("false")); } + /** + * Gets the value of the {@value PUBSUB_PROPERTY_ACK_REQUIRED} attribute. + * + * @param attributesMap The attributes map to get the value from. + * @return The attributes value. + */ + public static boolean isAckRequired(final Map attributesMap) { + return Boolean + .parseBoolean(getAttributesValue(attributesMap, PUBSUB_PROPERTY_ACK_REQUIRED).orElse("false")); + } + /** * Gets the value of the {@value MessageHelper#SYS_PROPERTY_CONTENT_TYPE} attribute. * diff --git a/core/src/main/java/org/eclipse/hono/util/CommandConstants.java b/core/src/main/java/org/eclipse/hono/util/CommandConstants.java index 4dd05d9dd9..42f3ecc4a7 100644 --- a/core/src/main/java/org/eclipse/hono/util/CommandConstants.java +++ b/core/src/main/java/org/eclipse/hono/util/CommandConstants.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -76,6 +76,11 @@ public class CommandConstants { */ public static final String COMMAND_RESPONSE_RESPONSE_PART_SHORT = "s"; + /** + * The content type that is defined for acknowledgement command response messages. + */ + public static final String CONTENT_TYPE_DELIVERY_SUCCESS_NOTIFICATION = "application/vnd.eclipse-hono-delivery-success-notification+json"; + /** * The content type that is defined for error command response messages sent by a protocol adapter or Command Router. */ diff --git a/site/documentation/content/api/command-and-control-pubsub/index.md b/site/documentation/content/api/command-and-control-pubsub/index.md index 609c191b79..10d9b5bdaa 100644 --- a/site/documentation/content/api/command-and-control-pubsub/index.md +++ b/site/documentation/content/api/command-and-control-pubsub/index.md @@ -63,16 +63,23 @@ project and `${tenant_id}` is the ID of the tenant that the client wants to send Metadata MUST be set as Pub/Sub attributes on a message. The following table provides an overview of the attributes the *Business Application* needs to set on a one-way command message. -| Name | Mandatory | Type | Description | -|:---------------|:---------:|:---------|:--------------------------------------------------------------| -| *device_id* | yes | *string* | The identifier of the device that the command is targeted at. | -| *subject* | yes | *string* | The name of the command to be executed by the device. | -| *content-type* | no | *string* | If present, MUST contain a *Media Type* as defined by [RFC 2046](https://tools.ietf.org/html/rfc2046) which describes the semantics and format of the command's input data contained in the message payload. However, not all protocol adapters will support this property as not all transport protocols provide means to convey this information, e.g. MQTT 3.1.1 has no notion of message headers. | +| Name | Mandatory | Type | Description | +|:-----------------|:---------:|:----------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| *device_id* | yes | *string* | The identifier of the device that the command is targeted at. | +| *subject* | yes | *string* | The name of the command to be executed by the device. | +| *content-type* | no | *string* | If present, MUST contain a *Media Type* as defined by [RFC 2046](https://tools.ietf.org/html/rfc2046) which describes the semantics and format of the command's input data contained in the message payload. However, not all protocol adapters will support this property as not all transport protocols provide means to convey this information, e.g. MQTT 3.1.1 has no notion of message headers. | +| *ack-required* | no | *boolean* | If set to `true` a command acknowledgement message will be sent on the command-response topic once the device acknowledges the command. Currently this only works with MQTT devices which have a QoS 1 subscription on the command topic. | +| *correlation-id* | no | *string* | MUST be set if *ack-required* is set to `true`. The identifier used to correlate a response message to the original request. It is used as the *correlation-id* attribute in the response. | The command message MAY contain arbitrary payload, set as message value, to be sent to the device. The value of the message's *subject* attribute may provide a hint to the device regarding the format, encoding and semantics of the -payload -data. +payload data. + +{{% notice info %}} +Currently the acknowledgement mechanism only works with devices connected via the MQTT protocol which have a +subscription on the command topic with a QoS level of 1. Getting an acknowledgement indicates that the device has +successfully received the command. However, it does not confirm whether the device has successfully processed the command. +{{% /notice %}} ## Send a (Request/Response) Command From 0fa5e87ee47bbd48e30fe8e433709e31bb1bad27 Mon Sep 17 00:00:00 2001 From: Matthias Kaemmer Date: Fri, 8 Mar 2024 17:53:51 +0100 Subject: [PATCH 2/2] Add suggested changes and add ack mechanism for HTTP adapter Signed-off-by: Matthias Kaemmer --- ...AbstractVertxBasedHttpProtocolAdapter.java | 21 ++++- ...AbstractVertxBasedMqttProtocolAdapter.java | 2 +- .../command/amqp/ProtonBasedCommand.java | 2 +- .../command/kafka/KafkaBasedCommand.java | 2 +- .../command/pubsub/PubSubBasedCommand.java | 7 +- .../pubsub/PubSubBasedCommandContext.java | 6 +- .../command/AbstractCommandContext.java | 11 ++- .../eclipse/hono/client/command/Command.java | 2 +- .../client/pubsub/PubSubMessageHelper.java | 2 +- .../eclipse/hono/util/CommandConstants.java | 2 +- .../api/command-and-control-pubsub/index.md | 91 +++++++++++++------ 11 files changed, 102 insertions(+), 46 deletions(-) diff --git a/adapters/http-base/src/main/java/org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapter.java b/adapters/http-base/src/main/java/org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapter.java index 44de8dda88..910a40be63 100644 --- a/adapters/http-base/src/main/java/org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapter.java +++ b/adapters/http-base/src/main/java/org/eclipse/hono/adapter/http/AbstractVertxBasedHttpProtocolAdapter.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2016 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -28,6 +28,7 @@ import org.eclipse.hono.adapter.auth.device.DeviceCredentials; import org.eclipse.hono.client.ClientErrorException; import org.eclipse.hono.client.ServerErrorException; +import org.eclipse.hono.client.command.AbstractCommandContext; import org.eclipse.hono.client.command.Command; import org.eclipse.hono.client.command.CommandContext; import org.eclipse.hono.client.command.CommandResponse; @@ -706,13 +707,25 @@ private void doUploadMessage( endpoint, tenant, deviceId); if (commandContext != null) { commandContext.getTracingSpan().log("forwarded command to device in HTTP response body"); - commandContext.accept(); + final Command command = commandContext.getCommand(); + if (command.isAckRequired() && command.isValid() + && commandContext instanceof AbstractCommandContext abstractCommandContext) { + abstractCommandContext + .sendDeliverySuccessCommandResponseMessage(HttpURLConnection.HTTP_ACCEPTED, + "Command successfully forwarded to device", + currentSpan, + command.getCorrelationId(), + command.getMessagingType()) + .onComplete(v -> commandContext.accept()); + } else { + commandContext.accept(); + } metrics.reportCommand( - commandContext.getCommand().isOneWay() ? Direction.ONE_WAY : Direction.REQUEST, + command.isOneWay() ? Direction.ONE_WAY : Direction.REQUEST, tenant, tenantTracker.result(), ProcessingOutcome.FORWARDED, - commandContext.getCommand().getPayloadSize(), + command.getPayloadSize(), getMicrometerSample(commandContext)); } currentSpan.finish(); diff --git a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java index b02fb04c22..d2bda20e09 100644 --- a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java +++ b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation + * Copyright (c) 2016 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. diff --git a/clients/command-amqp/src/main/java/org/eclipse/hono/client/command/amqp/ProtonBasedCommand.java b/clients/command-amqp/src/main/java/org/eclipse/hono/client/command/amqp/ProtonBasedCommand.java index c31def5fda..dbca453740 100644 --- a/clients/command-amqp/src/main/java/org/eclipse/hono/client/command/amqp/ProtonBasedCommand.java +++ b/clients/command-amqp/src/main/java/org/eclipse/hono/client/command/amqp/ProtonBasedCommand.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation + * Copyright (c) 2016 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. diff --git a/clients/command-kafka/src/main/java/org/eclipse/hono/client/command/kafka/KafkaBasedCommand.java b/clients/command-kafka/src/main/java/org/eclipse/hono/client/command/kafka/KafkaBasedCommand.java index 0bfb3ea937..a896d4fb98 100644 --- a/clients/command-kafka/src/main/java/org/eclipse/hono/client/command/kafka/KafkaBasedCommand.java +++ b/clients/command-kafka/src/main/java/org/eclipse/hono/client/command/kafka/KafkaBasedCommand.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2021, 2024 Contributors to the Eclipse Foundation + * Copyright (c) 2021 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. diff --git a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommand.java b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommand.java index 8bb6d29d7c..49ac2036ce 100644 --- a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommand.java +++ b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommand.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation + * Copyright (c) 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -138,6 +138,9 @@ private static PubSubBasedCommand getCommand(final PubsubMessage pubsubMessage, final StringJoiner validationErrorJoiner = new StringJoiner(", "); final boolean responseRequired = PubSubMessageHelper.isResponseRequired(attributes); final boolean ackRequired = PubSubMessageHelper.isAckRequired(attributes); + if (responseRequired && ackRequired) { + validationErrorJoiner.add("response-required and ack-required must not both true"); + } final String correlationId = PubSubMessageHelper.getCorrelationId(attributes) .filter(id -> !id.isEmpty()) .orElseGet(() -> { @@ -176,7 +179,7 @@ public PubsubMessage getPubsubMessage() { @Override public boolean isAckRequired() { - return !responseRequired && ackRequired; + return ackRequired; } @Override diff --git a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommandContext.java b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommandContext.java index 690c54789d..2968d27203 100644 --- a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommandContext.java +++ b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommandContext.java @@ -62,7 +62,7 @@ public void release(final Throwable error) { final ServiceInvocationException mappedError = StatusCodeMapper.toServerError(error); final int status = mappedError.getErrorCode(); Tags.HTTP_STATUS.set(span, status); - if (isRequestResponseCommand() && !(error instanceof CommandAlreadyProcessedException) + if ((isRequestResponseCommand() || isAckRequiredCommand()) && !(error instanceof CommandAlreadyProcessedException) && !(error instanceof CommandToBeReprocessedException)) { final String errorMessage = Optional .ofNullable(ServiceInvocationException.getErrorMessageForExternalClient(mappedError)) @@ -90,7 +90,7 @@ public void modify(final boolean deliveryFailed, final boolean undeliverableHere TracingHelper.logError(span, String.format("command for device handled with outcome 'modified' %s %s", deliveryFailedReason, undeliverableHereReason)); Tags.HTTP_STATUS.set(span, status); - if (isRequestResponseCommand()) { + if (isRequestResponseCommand() || isAckRequiredCommand()) { final String error = String.format("command not processed %s %s", deliveryFailedReason, undeliverableHereReason); final String correlationId = getCorrelationId(); @@ -112,7 +112,7 @@ public void reject(final Throwable error) { TracingHelper.logError(getTracingSpan(), "client error trying to deliver or process command", error); final Span span = getTracingSpan(); Tags.HTTP_STATUS.set(span, status); - if (isRequestResponseCommand()) { + if (isRequestResponseCommand() || isAckRequiredCommand()) { final String nonNullCause = Optional.ofNullable(error.getMessage()).orElse("Command message rejected"); final String correlationId = getCorrelationId(); sendDeliveryFailureCommandResponseMessage(status, nonNullCause, span, null, correlationId, diff --git a/clients/command/src/main/java/org/eclipse/hono/client/command/AbstractCommandContext.java b/clients/command/src/main/java/org/eclipse/hono/client/command/AbstractCommandContext.java index 898e1e2ca9..ee83d6b4f8 100644 --- a/clients/command/src/main/java/org/eclipse/hono/client/command/AbstractCommandContext.java +++ b/clients/command/src/main/java/org/eclipse/hono/client/command/AbstractCommandContext.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation + * Copyright (c) 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -118,6 +118,15 @@ protected boolean isRequestResponseCommand() { return !command.isOneWay(); } + /** + * Checks if the command is an ack-required command. + * + * @return True if it is an ack-required command, false otherwise. + */ + protected boolean isAckRequiredCommand() { + return command.isAckRequired(); + } + /** * Sends a command response if the command response message represents an error message. * diff --git a/clients/command/src/main/java/org/eclipse/hono/client/command/Command.java b/clients/command/src/main/java/org/eclipse/hono/client/command/Command.java index aea1e9c461..0129ac49cb 100644 --- a/clients/command/src/main/java/org/eclipse/hono/client/command/Command.java +++ b/clients/command/src/main/java/org/eclipse/hono/client/command/Command.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2020, 2024 Contributors to the Eclipse Foundation + * Copyright (c) 2020 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java index 89a1ac9b3d..b6d73bb244 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation + * Copyright (c) 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. diff --git a/core/src/main/java/org/eclipse/hono/util/CommandConstants.java b/core/src/main/java/org/eclipse/hono/util/CommandConstants.java index 42f3ecc4a7..eb1ef88d13 100644 --- a/core/src/main/java/org/eclipse/hono/util/CommandConstants.java +++ b/core/src/main/java/org/eclipse/hono/util/CommandConstants.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation + * Copyright (c) 2016 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. diff --git a/site/documentation/content/api/command-and-control-pubsub/index.md b/site/documentation/content/api/command-and-control-pubsub/index.md index 10d9b5bdaa..d086010a48 100644 --- a/site/documentation/content/api/command-and-control-pubsub/index.md +++ b/site/documentation/content/api/command-and-control-pubsub/index.md @@ -63,24 +63,16 @@ project and `${tenant_id}` is the ID of the tenant that the client wants to send Metadata MUST be set as Pub/Sub attributes on a message. The following table provides an overview of the attributes the *Business Application* needs to set on a one-way command message. -| Name | Mandatory | Type | Description | -|:-----------------|:---------:|:----------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| *device_id* | yes | *string* | The identifier of the device that the command is targeted at. | -| *subject* | yes | *string* | The name of the command to be executed by the device. | -| *content-type* | no | *string* | If present, MUST contain a *Media Type* as defined by [RFC 2046](https://tools.ietf.org/html/rfc2046) which describes the semantics and format of the command's input data contained in the message payload. However, not all protocol adapters will support this property as not all transport protocols provide means to convey this information, e.g. MQTT 3.1.1 has no notion of message headers. | -| *ack-required* | no | *boolean* | If set to `true` a command acknowledgement message will be sent on the command-response topic once the device acknowledges the command. Currently this only works with MQTT devices which have a QoS 1 subscription on the command topic. | -| *correlation-id* | no | *string* | MUST be set if *ack-required* is set to `true`. The identifier used to correlate a response message to the original request. It is used as the *correlation-id* attribute in the response. | +| Name | Mandatory | Type | Description | +|:---------------|:---------:|:---------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| *device_id* | yes | *string* | The identifier of the device that the command is targeted at. | +| *subject* | yes | *string* | The name of the command to be executed by the device. | +| *content-type* | no | *string* | If present, MUST contain a *Media Type* as defined by [RFC 2046](https://tools.ietf.org/html/rfc2046) which describes the semantics and format of the command's input data contained in the message payload. However, not all protocol adapters will support this property as not all transport protocols provide means to convey this information, e.g. MQTT 3.1.1 has no notion of message headers. | The command message MAY contain arbitrary payload, set as message value, to be sent to the device. The value of the message's *subject* attribute may provide a hint to the device regarding the format, encoding and semantics of the payload data. -{{% notice info %}} -Currently the acknowledgement mechanism only works with devices connected via the MQTT protocol which have a -subscription on the command topic with a QoS level of 1. Getting an acknowledgement indicates that the device has -successfully received the command. However, it does not confirm whether the device has successfully processed the command. -{{% /notice %}} - ## Send a (Request/Response) Command *Business Applications* use this operation to send a command to a device for which they expect the device to send back @@ -92,9 +84,23 @@ project and `${tenant_id}` is the ID of the tenant that the client wants to send The Business Application can consume the corresponding command response by creating a subscription from the `projects/${google_project_id}/topics/${tenant_id}.command_response` topic. -In contrast to a one-way command, a request/response command contains a *response-required* attribute with value `true` -and a *correlation-id* attribute, providing the identifier that is used to correlate a response message to the original -request. +In contrast to a one-way command, a request/response command contains a *response-required* or an *ack-required* +attribute with value `true` and a *correlation-id* attribute, providing the identifier that is used to correlate a +response message to the original request. + +Devices can vary in their ability to respond to commands (e.g. firmware limitations). The Business Application can +tailor its expectations based on this: + +1. **Device Response Expected:** For devices that can send a response, set `response-required` to true. This is the +default behavior for sending request/response commands. The command is sent with the *correlation-id* to the device, +which is than expected to send a corresponding command response. + +2. **Acknowledgement Only:** For devices lacking response capability, set `ack-required` to true. The command is sent +without the *correlation-id* to the device, and the protocol adapter sends an acknowledgement as a command response on +behalf of the device upon receiving a transport layer confirmation (e.g., MQTT PUBACK) from the device. + +**Important Note:** Setting both `response-required` and `ack-required` to true is invalid and will result in command +rejection. **Preconditions** @@ -108,15 +114,26 @@ request. **Message Flow** -1. The *Business Application* writes a command message to the - `projects/${google_project_id}/topics/${tenant_id}.command` topic on *Pub/Sub*. -2. Hono consumes the message from *Pub/Sub* and forwards it to the device, provided that the target device is - connected and is accepting commands. -3. The device sends a command response message. Hono writes that message to - the `projects/${google_project_id}/topics/${tenant_id}.command_response` topic - on *Pub/Sub*. -4. The *Business Application* consumes the command response message from an independently created subscription to - the `projects/${google_project_id}/topics/${tenant_id}.command_response` topic. +Device sends response (*response-required* set to true): + 1. The *Business Application* writes a command message with *response-required* set to true to the + `projects/${google_project_id}/topics/${tenant_id}.command` topic on *Pub/Sub*. + 2. Hono consumes the message from *Pub/Sub* and forwards it to the device, provided that the target device is + connected and is accepting commands. + 3. The device sends a command response message. Hono writes that message to + the `projects/${google_project_id}/topics/${tenant_id}.command_response` topic on *Pub/Sub*. + 4. The *Business Application* consumes the command response message from an independently created subscription to + the `projects/${google_project_id}/topics/${tenant_id}.command_response` topic. + +Protocol adapter sends response on behalf of device (*ack-required* set to true): + 1. The *Business Application* writes a command message with *ack-required* set to true to the + `projects/${google_project_id}/topics/${tenant_id}.command` topic on *Pub/Sub*. + 2. Hono consumes the message from *Pub/Sub* and forwards it to the device as a one-way command, provided that the + target device is connected and is accepting commands. + 3. The device acknowledges the command on the transport layer. Hono writes an acknowledgement message to + the `projects/${google_project_id}/topics/${tenant_id}.command_response` topic on *Pub/Sub*. + 4. The *Business Application* consumes the command response message from an independently created subscription to + the `projects/${google_project_id}/topics/${tenant_id}.command_response` topic. + **Command Message Format** @@ -128,18 +145,29 @@ command message. |:---------------------------------------------|:---------:|:----------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | *correlation-id* | yes | *string* | The identifier used to correlate a response message to the original request. It is used as the *correlation-id* attribute in the response. | | *device_id* | yes | *string* | The identifier of the device that the command is targeted at. | -| *response-required* | yes | *boolean* | MUST be set with a value of `true`, meaning that the device is required to send a response for the command. | +| *response-required* | no | *boolean* | Either *response-required* or *ack-required* MUST exclusively be set with a value of `true` (XOR). *response-required* set to `true` means that the device is required to send a response for the command. | +| *ack-required* | no | *boolean* | Either *response-required* or *ack-required* MUST exclusively be set with a value of `true` (XOR). *ack-required* set to `true` means that the protocol adapter will try to send an acknowledgement response for the command on behalf of the device in case the command was successfully received by the device. This mechanism has some limitations which are described in the info field below. | | *subject* | yes | *string* | The name of the command to be executed by the device. | | *content-type* | no | *string* | If present, MUST contain a *Media Type* as defined by [RFC 2046](https://tools.ietf.org/html/rfc2046) which describes the semantics and format of the command's input data contained in the message payload. However, not all protocol adapters will support this property as not all transport protocols provide means to convey this information, e.g. MQTT 3.1.1 has no notion of message headers. | | *delivery-failure-notification-metadata[\*]* | no | *string* | Attributes with the *delivery-failure-notification-metadata* prefix are adopted for the error command response that is sent in case delivering the command to the device failed. In case of a successful command delivery, these attributes are ignored. | The command message MAY contain arbitrary payload, set as message value, to be sent to the device. The value of the -message's *subject* attribute may provide a hint to the device regarding the format, encoding and semantics of the payload -data. +message's *subject* attribute may provide a hint to the device regarding the format, encoding and semantics of the +payload data. + +{{% notice info %}} +The acknowledgement mechanism is based on the transport level acknowledgement of the different communication protocols +and indicates that a command was received from the device. For MQTT devices this means they must subscribe on the +command topic with a Quality of Service (QoS) level of 1. Since the HTTP protocol doesn't support a transport level +acknowledgement, acknowledgements are considered best-effort, indicating the message was sent, not necessarily received. + +Currently, the acknowledgement mechanism only works with devices connected via the MQTT and HTTP protocol. +{{% /notice %}} An application can determine the overall outcome of the operation by means of the response to the command. The response -is either sent back by the device, or in case the command could not be successfully forwarded to the device, an error -command response message is sent by the Hono protocol adapter or Command Router component. +is either sent back by the device (response-required) or the adapter (ack-required), or in case the command could not be +successfully forwarded to the device, an error command response message is sent by the Hono protocol adapter or Command +Router component. **Response Message Format** @@ -176,6 +204,9 @@ The semantics of the individual codes are specific to the device and command. Fo **Response Message sent from Hono Component** +If the command response is an acknowledgement of a command by the Hono protocol adapter, it has a status code +of *202* and the *content-type* attribute is set to *application/vnd.eclipse-hono-delivery-success-notification+json*. + If the command response message represents an error message sent by the Hono protocol adapter or Command Router component, with the *content-type* attribute set to *application/vnd.eclipse-hono-delivery-failure-notification+json*, the possible status codes are: