Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3585] Add ack-required command feature for Google Pub/Sub based commands #3612

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2016 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -28,6 +28,7 @@
import org.eclipse.hono.adapter.auth.device.DeviceCredentials;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.AbstractCommandContext;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandResponse;
Expand Down Expand Up @@ -706,13 +707,25 @@ private void doUploadMessage(
endpoint, tenant, deviceId);
if (commandContext != null) {
commandContext.getTracingSpan().log("forwarded command to device in HTTP response body");
commandContext.accept();
final Command command = commandContext.getCommand();
if (command.isAckRequired() && command.isValid()
&& commandContext instanceof AbstractCommandContext<?> abstractCommandContext) {
abstractCommandContext
.sendDeliverySuccessCommandResponseMessage(HttpURLConnection.HTTP_ACCEPTED,
"Command successfully forwarded to device",
currentSpan,
command.getCorrelationId(),
command.getMessagingType())
.onComplete(v -> commandContext.accept());
} else {
commandContext.accept();
}
metrics.reportCommand(
commandContext.getCommand().isOneWay() ? Direction.ONE_WAY : Direction.REQUEST,
command.isOneWay() ? Direction.ONE_WAY : Direction.REQUEST,
tenant,
tenantTracker.result(),
ProcessingOutcome.FORWARDED,
commandContext.getCommand().getPayloadSize(),
command.getPayloadSize(),
getMicrometerSample(commandContext));
}
currentSpan.finish();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2016 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -59,6 +59,7 @@
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.command.AbstractCommandContext;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandResponse;
Expand Down Expand Up @@ -1734,8 +1735,19 @@ private void afterCommandPublished(
reportPublishedCommand(tenantObject, subscription, commandContext, ProcessingOutcome.FORWARDED);
log.debug("received PUBACK [packet-id: {}] for command [tenant-id: {}, device-id: {}, MQTT client-id: {}]",
msgId, subscription.getTenant(), subscription.getDeviceId(), endpoint.clientIdentifier());
commandContext.getTracingSpan().log("received PUBACK from device");
commandContext.accept();
final Span span = commandContext.getTracingSpan();
span.log("received PUBACK from device");
final Command command = commandContext.getCommand();
if (command.isAckRequired() && command.isValid()
&& commandContext instanceof AbstractCommandContext<?> abstractCommandContext) {
abstractCommandContext
.sendDeliverySuccessCommandResponseMessage(HttpURLConnection.HTTP_ACCEPTED,
"Command successfully received", span, command.getCorrelationId(),
command.getMessagingType())
.onComplete(v -> commandContext.accept());
} else {
commandContext.accept();
}
};

final Handler<Void> onAckTimeoutHandler = v -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2016 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -193,6 +193,11 @@ public boolean isOneWay() {
return replyToId == null;
}

@Override
public boolean isAckRequired() {
return false;
}

@Override
public boolean isValid() {
return !validationError.isPresent();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -213,6 +213,11 @@ public boolean isOneWay() {
return !responseRequired;
}

@Override
public boolean isAckRequired() {
return false;
}

@Override
public boolean isValid() {
return !validationError.isPresent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public final class PubSubBasedCommand implements Command {
private final String contentType;
private final String requestId;
private final boolean responseRequired;
private final boolean ackRequired;

private String gatewayId;

Expand All @@ -58,7 +59,8 @@ private PubSubBasedCommand(
final String correlationId,
final String subject,
final String contentType,
final boolean responseRequired) {
final boolean responseRequired,
final boolean ackRequired) {

this.validationError = validationError;
this.pubsubMessage = pubsubMessage;
Expand All @@ -68,6 +70,7 @@ private PubSubBasedCommand(
this.subject = subject;
this.contentType = contentType;
this.responseRequired = responseRequired;
this.ackRequired = ackRequired;
this.requestId = Commands.encodeRequestIdParameters(correlationId, MessagingType.pubsub);
}

Expand Down Expand Up @@ -134,10 +137,14 @@ private static PubSubBasedCommand getCommand(final PubsubMessage pubsubMessage,

final StringJoiner validationErrorJoiner = new StringJoiner(", ");
final boolean responseRequired = PubSubMessageHelper.isResponseRequired(attributes);
final boolean ackRequired = PubSubMessageHelper.isAckRequired(attributes);
if (responseRequired && ackRequired) {
validationErrorJoiner.add("response-required and ack-required must not both true");
}
final String correlationId = PubSubMessageHelper.getCorrelationId(attributes)
.filter(id -> !id.isEmpty())
.orElseGet(() -> {
if (responseRequired) {
if (responseRequired || ackRequired) {
validationErrorJoiner.add("correlation-id is not set");
}
return null;
Expand All @@ -157,7 +164,8 @@ private static PubSubBasedCommand getCommand(final PubsubMessage pubsubMessage,
correlationId,
subject,
contentType,
responseRequired);
responseRequired,
ackRequired);
}

/**
Expand All @@ -169,6 +177,11 @@ public PubsubMessage getPubsubMessage() {
return pubsubMessage;
}

@Override
public boolean isAckRequired() {
return ackRequired;
}

@Override
public boolean isOneWay() {
return !responseRequired;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void release(final Throwable error) {
final ServiceInvocationException mappedError = StatusCodeMapper.toServerError(error);
final int status = mappedError.getErrorCode();
Tags.HTTP_STATUS.set(span, status);
if (isRequestResponseCommand() && !(error instanceof CommandAlreadyProcessedException)
if ((isRequestResponseCommand() || isAckRequiredCommand()) && !(error instanceof CommandAlreadyProcessedException)
&& !(error instanceof CommandToBeReprocessedException)) {
final String errorMessage = Optional
.ofNullable(ServiceInvocationException.getErrorMessageForExternalClient(mappedError))
Expand Down Expand Up @@ -90,7 +90,7 @@ public void modify(final boolean deliveryFailed, final boolean undeliverableHere
TracingHelper.logError(span, String.format("command for device handled with outcome 'modified' %s %s",
deliveryFailedReason, undeliverableHereReason));
Tags.HTTP_STATUS.set(span, status);
if (isRequestResponseCommand()) {
if (isRequestResponseCommand() || isAckRequiredCommand()) {
final String error = String.format("command not processed %s %s", deliveryFailedReason,
undeliverableHereReason);
final String correlationId = getCorrelationId();
Expand All @@ -112,7 +112,7 @@ public void reject(final Throwable error) {
TracingHelper.logError(getTracingSpan(), "client error trying to deliver or process command", error);
final Span span = getTracingSpan();
Tags.HTTP_STATUS.set(span, status);
if (isRequestResponseCommand()) {
if (isRequestResponseCommand() || isAckRequiredCommand()) {
final String nonNullCause = Optional.ofNullable(error.getMessage()).orElse("Command message rejected");
final String correlationId = getCorrelationId();
sendDeliveryFailureCommandResponseMessage(status, nonNullCause, span, null, correlationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

import java.net.HttpURLConnection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.MapBasedExecutionContext;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.TenantObject;
Expand All @@ -36,7 +38,8 @@
* in a Pub/Sub and Kafka based command context.
* @param <T> The type of Command.
*/
public abstract class AbstractCommandContext<T extends Command> extends MapBasedExecutionContext implements CommandContext {
public abstract class AbstractCommandContext<T extends Command> extends MapBasedExecutionContext
implements CommandContext {

protected static final Logger LOG = LoggerFactory.getLogger(AbstractCommandContext.class);

Expand Down Expand Up @@ -115,6 +118,15 @@ protected boolean isRequestResponseCommand() {
return !command.isOneWay();
}

/**
* Checks if the command is an ack-required command.
*
* @return True if it is an ack-required command, false otherwise.
*/
protected boolean isAckRequiredCommand() {
return command.isAckRequired();
}

/**
* Sends a command response if the command response message represents an error message.
*
Expand Down Expand Up @@ -157,24 +169,73 @@ protected Future<Void> sendDeliveryFailureCommandResponseMessage(
commandResponse.setAdditionalProperties(
Collections.unmodifiableMap(command.getDeliveryFailureNotificationProperties()));

return commandResponseSender.sendCommandResponse(
// try to retrieve tenant configuration from context
Optional.ofNullable(get(KEY_TENANT_CONFIG))
.filter(TenantObject.class::isInstance)
.map(TenantObject.class::cast)
// and fall back to default configuration
.orElseGet(() -> TenantObject.from(command.getTenant())),
new RegistrationAssertion(command.getDeviceId()),
commandResponse,
span.context())
.onFailure(thr -> {
LOG.debug("failed to publish command response [{}]", commandResponse, thr);
TracingHelper.logError(span, "failed to publish command response message", thr);
})
return sendCommandResponse(commandResponse, span)
.onSuccess(v -> {
LOG.debug("published error command response [{}, cause: {}]", commandResponse,
cause != null ? cause.getMessage() : error);
span.log("published error command response");
});
}

/**
* Sends a command response as a command acknowledgement.
*
* @param status The HTTP status code indicating the outcome of processing the command.
* @param successMessage The message for the response message body.
* @param span The active OpenTracing span to use for tracking this operation.
* @param correlationId The correlation ID of the command that this is the response for.
* @param messagingType The type of the messaging system via which the command message was received.
* @return A future indicating the outcome of the operation.
* <p>
* The future will be succeeded if the command response has been sent.
* <p>
* The future will be failed if the command response could not be sent.
*/
public Future<Void> sendDeliverySuccessCommandResponseMessage(
final int status,
final String successMessage,
final Span span,
final String correlationId,
final MessagingType messagingType) {
if (correlationId == null) {
TracingHelper.logError(span, "can't send command response message - no correlation id set");
return Future.failedFuture("missing correlation id");
}
final JsonObject payloadJson = new JsonObject();
payloadJson.put("acknowledgement", successMessage != null ? successMessage : "");

final CommandResponse commandResponse = new CommandResponse(
command.getTenant(),
command.getDeviceId(),
payloadJson.toBuffer(),
CommandConstants.CONTENT_TYPE_DELIVERY_SUCCESS_NOTIFICATION,
status,
correlationId,
"",
messagingType);
commandResponse.setAdditionalProperties(Map.of(MessageHelper.SYS_PROPERTY_SUBJECT, command.getName()));

return sendCommandResponse(commandResponse, span)
.onSuccess(v -> {
LOG.debug("published ack command response [{}]", commandResponse);
span.log("published ack command response");
});
}

private Future<Void> sendCommandResponse(final CommandResponse commandResponse, final Span span) {
return commandResponseSender.sendCommandResponse(
// try to retrieve tenant configuration from context
Optional.ofNullable(get(KEY_TENANT_CONFIG))
.filter(TenantObject.class::isInstance)
.map(TenantObject.class::cast)
// and fall back to default configuration
.orElseGet(() -> TenantObject.from(command.getTenant())),
new RegistrationAssertion(command.getDeviceId()),
commandResponse,
span.context())
.onFailure(thr -> {
LOG.debug("failed to publish command response [{}]", commandResponse, thr);
TracingHelper.logError(span, "failed to publish command response message", thr);
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2020, 2021 Contributors to the Eclipse Foundation
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -35,6 +35,13 @@ public interface Command {
*/
boolean isOneWay();

/**
* Checks if an acknowledgement of this command should be sent to the messaging infrastructure.
*
* @return {@code true} if an acknowledgement is required.
*/
boolean isAckRequired();

/**
* Checks if this command contains all required information.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public final class PubSubMessageHelper {
* The name of the Pub/Sub message property indicating whether a response to the message is expected/required.
*/
public static final String PUBSUB_PROPERTY_RESPONSE_REQUIRED = "response-required";
/**
* The name of the Pub/Sub message property indicating whether an acknowledgement to the message is expected/required.
*/
public static final String PUBSUB_PROPERTY_ACK_REQUIRED = "ack-required";

/**
* Prefix to use in the Pub/Sub message properties for marking properties of command messages that should be
Expand Down Expand Up @@ -202,6 +206,17 @@ public static boolean isResponseRequired(final Map<String, String> attributesMap
.parseBoolean(getAttributesValue(attributesMap, PUBSUB_PROPERTY_RESPONSE_REQUIRED).orElse("false"));
}

/**
* Gets the value of the {@value PUBSUB_PROPERTY_ACK_REQUIRED} attribute.
*
* @param attributesMap The attributes map to get the value from.
* @return The attributes value.
*/
public static boolean isAckRequired(final Map<String, String> attributesMap) {
return Boolean
.parseBoolean(getAttributesValue(attributesMap, PUBSUB_PROPERTY_ACK_REQUIRED).orElse("false"));
}

/**
* Gets the value of the {@value MessageHelper#SYS_PROPERTY_CONTENT_TYPE} attribute.
*
Expand Down
Loading