Skip to content

Commit

Permalink
[#3520] specific metrics for unknown messages
Browse files Browse the repository at this point in the history
Signed-off-by: Bob Claerhout <[email protected]>
  • Loading branch information
BobClaerhout committed Jul 25, 2023
1 parent 4a8911a commit 84c0094
Show file tree
Hide file tree
Showing 42 changed files with 46,025 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.eclipse.hono.client.telemetry.TelemetrySender;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.service.AbstractServiceBase;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.service.auth.ValidityBasedTrustOptions;
import org.eclipse.hono.service.metric.MetricsTags.ConnectionAttemptOutcome;
import org.eclipse.hono.service.util.ServiceBaseUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.adapter.AbstractProtocolAdapterBase;
import org.eclipse.hono.adapter.AdapterConnectionsExceededException;
import org.eclipse.hono.adapter.AdapterDisabledException;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.adapter.AuthorizationException;
import org.eclipse.hono.adapter.auth.device.CredentialsApiAuthProvider;
import org.eclipse.hono.adapter.auth.device.DeviceCredentials;
Expand Down Expand Up @@ -69,6 +69,7 @@
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.http.HttpUtils;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.service.metric.MetricsTags.ConnectionAttemptOutcome;
import org.eclipse.hono.service.metric.MetricsTags.Direction;
import org.eclipse.hono.service.metric.MetricsTags.EndpointType;
Expand Down Expand Up @@ -1325,7 +1326,8 @@ private Future<Void> doUploadMessage(
ProcessingOutcome.from(t),
context.isRemotelySettled() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE,
context.getPayloadSize(),
context.getTimer());
context.getTimer(),
MetricsTags.Reason.from(t));
return Future.failedFuture(t);

}).map(ok -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ public void testUploadTelemetryMessageFailsForDisabledAdapter(final VertxTestCon
eq(ProcessingOutcome.UNPROCESSABLE),
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
any());
any(),
eq(MetricsTags.Reason.TENANT_DISABLED_FOR_ADAPTER));
});
ctx.completeNow();
}));
Expand Down Expand Up @@ -1000,7 +1001,8 @@ public void testMessageLimitExceededForATelemetryMessage(final VertxTestContext
eq(ProcessingOutcome.UNPROCESSABLE),
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
any());
any(),
eq(MetricsTags.Reason.MESSAGE_LIMIT_EXCEEDED));
});
}

Expand All @@ -1027,7 +1029,8 @@ public void testMessageLimitExceededForAnEventMessage(final VertxTestContext ctx
eq(ProcessingOutcome.UNPROCESSABLE),
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
any());
any(),
eq(MetricsTags.Reason.MESSAGE_LIMIT_EXCEEDED));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.hono.adapter.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.ProtocolAdapterCommandConsumer;
import org.eclipse.hono.client.registry.TenantDisabledOrNotRegisteredException;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.service.metric.MetricsTags.Direction;
Expand Down Expand Up @@ -413,7 +415,8 @@ protected final Future<Void> doUploadMessage(
qos,
payload.length(),
getTtdStatus(context),
context.getTimer());
context.getTimer(),
MetricsTags.Reason.from(t));
TracingHelper.logError(currentSpan, t);
commandConsumerClosedTracker.onComplete(res -> currentSpan.finish());
return Future.failedFuture(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ public void testUploadEventFailsForRejectedOutcome(final VertxTestContext ctx) {
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.Reason.UNKNOWN));
});
ctx.completeNow();
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.eclipse.californium.core.coap.OptionSet;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.CommandContext;
Expand Down Expand Up @@ -92,7 +93,7 @@ public void testUploadTelemetryFailsForDisabledTenant(final VertxTestContext ctx
final var resource = givenAResource(adapter);
// which is disabled for tenant "my-tenant"
when(adapter.isAdapterEnabled(any(TenantObject.class)))
.thenReturn(Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_FORBIDDEN)));
.thenReturn(Future.failedFuture(new AdapterDisabledException("my-tenant")));

// WHEN a device that belongs to "my-tenant" publishes a telemetry message
final Buffer payload = Buffer.buffer("some payload");
Expand All @@ -118,7 +119,8 @@ public void testUploadTelemetryFailsForDisabledTenant(final VertxTestContext ctx
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.Reason.TENANT_DISABLED_FOR_ADAPTER));
});
ctx.completeNow();
}));
Expand Down Expand Up @@ -371,7 +373,8 @@ public void testMessageLimitExceededForATelemetryMessage(final VertxTestContext
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.Reason.MESSAGE_LIMIT_EXCEEDED));
});
ctx.completeNow();
}));
Expand Down Expand Up @@ -560,7 +563,8 @@ public void testUploadTelemetryReleasesCommandForFailedDownstreamSender(final Ve
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
eq(TtdStatus.COMMAND),
any());
any(),
eq(MetricsTags.Reason.UNKNOWN));
// and the command delivery is released
verify(commandContext).release(any(Throwable.class));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.Strings;
import org.eclipse.hono.util.TenantObject;
Expand Down Expand Up @@ -85,7 +86,7 @@ public abstract class AbstractVertxBasedHttpProtocolAdapter<T extends HttpProtoc

private static final String KEY_MATCH_ALL_ROUTE_APPLIED = "matchAllRouteApplied";

private HttpAdapterMetrics metrics = HttpAdapterMetrics.NOOP;
protected HttpAdapterMetrics metrics = HttpAdapterMetrics.NOOP;
private HttpServer server;
private HttpServer insecureServer;

Expand Down Expand Up @@ -203,7 +204,15 @@ public final void doStart(final Promise<Void> startPromise) {
.onComplete(startPromise);
}

private Sample getMicrometerSample(final RoutingContext ctx) {
/**
* Gets the timer used to track the processing of a telemetry message.
*
* @param ctx The routing context to extract the sample from.
* @return The sample or {@code null} if the context does not
* contain a sample.
* @throws NullPointerException if ctx is {@code null}.
*/
protected Sample getMicrometerSample(final RoutingContext ctx) {
return ctx.get(KEY_MICROMETER_SAMPLE);
}

Expand Down Expand Up @@ -778,7 +787,8 @@ private void doUploadMessage(
qos,
payloadSize,
ctx.getTtdStatus(),
getMicrometerSample(ctx.getRoutingContext()));
getMicrometerSample(ctx.getRoutingContext()),
MetricsTags.Reason.from(t));
TracingHelper.logError(currentSpan, t);
currentSpan.finish();
return Future.failedFuture(t);
Expand Down Expand Up @@ -1295,9 +1305,16 @@ public final void uploadCommandResponseMessage(
});
}

private static MetricsTags.QoS getQoSLevel(
final EndpointType endpoint,
final org.eclipse.hono.util.QoS requestedQos) {
/**
* Get the QoS based on the endpoint and the requested QoS.
*
* @param endpoint The endpoint the message was sent to.
* @param requestedQos The QoS requested by the sender.
* @return The resulting QoS.
*/
protected static MetricsTags.QoS getQoSLevel(
final EndpointType endpoint,
final QoS requestedQos) {

if (endpoint == EndpointType.EVENT) {
return MetricsTags.QoS.AT_LEAST_ONCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,8 @@ public void testUploadTelemetryWithTtdClosesCommandConsumerIfSendingFails() {
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.Reason.UNKNOWN));
// and the command consumer is closed
verify(commandConsumer).close(eq(false), any());
}
Expand Down Expand Up @@ -786,7 +787,8 @@ public void testMessageLimitExceededForATelemetryMessage() {
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.Reason.MESSAGE_LIMIT_EXCEEDED));
}

/**
Expand Down Expand Up @@ -825,7 +827,8 @@ public void testMessageLimitExceededForAnEventMessage() {
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.Reason.MESSAGE_LIMIT_EXCEEDED));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.util.StatusCodeMapper;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.http.HttpServerSpanHelper;
import org.eclipse.hono.service.http.HttpUtils;
import org.eclipse.hono.service.metric.MetricsTags;
Expand Down Expand Up @@ -262,13 +263,11 @@ void handleProviderRoute(final HttpContext ctx, final LoraProvider provider) {
uploadTelemetryMessage(ctx, gatewayDevice.getTenantId(), deviceId, payload, contentType);
registerCommandConsumerIfNeeded(provider, gatewayDevice, currentSpan.context());
break;
case UNKNOWN:
discardMessage(ctx, currentSpan, gatewayDevice, type, deviceId, MetricsTags.Reason.UNKNOWN);
break;
default:
LOG.debug("discarding message of unsupported type [tenant: {}, device-id: {}, type: {}]",
gatewayDevice.getTenantId(), deviceId, type);
currentSpan.log("discarding message of unsupported type");
currentSpan.finish();
// discard the message but return 202 to not cause errors on the LoRa provider side
handle202(ctx.getRoutingContext());
discardMessage(ctx, currentSpan, gatewayDevice, type, deviceId, MetricsTags.Reason.UNSUPPORTED_TYPE);
}
} catch (final LoraProviderMalformedPayloadException e) {
LOG.debug("error processing request from provider [name: {}]", provider.getProviderName(), e);
Expand All @@ -278,6 +277,29 @@ void handleProviderRoute(final HttpContext ctx, final LoraProvider provider) {
}
}

private void discardMessage(final HttpContext ctx, final Span currentSpan, final DeviceUser gatewayDevice, final LoraMessageType type, final String deviceId, final MetricsTags.Reason reason) {
LOG.debug("discarding message of unsupported type [tenant: {}, device-id: {}, type: {}]",
gatewayDevice.getTenantId(), deviceId, type);
currentSpan.log("discarding message of unsupported type");
currentSpan.finish();
// discard the message but return 202 to not cause errors on the LoRa provider side
handle202(ctx.getRoutingContext());

final Future<TenantObject> tenantTracker = getTenantConfiguration(gatewayDevice.getTenantId(), currentSpan.context());
final MetricsTags.EndpointType endpoint = MetricsTags.EndpointType.fromString(ctx.getRequestedResource().getEndpoint());
final MetricsTags.QoS qos = getQoSLevel(endpoint, ctx.getRequestedQos());
metrics.reportTelemetry(
endpoint,
gatewayDevice.getTenantId(),
tenantTracker.result(),
MetricsTags.ProcessingOutcome.UNPROCESSABLE,
qos,
ctx.getRoutingContext().body().buffer().length(),
ctx.getTtdStatus(),
getMicrometerSample(ctx.getRoutingContext()),
reason);
}

private void registerCommandConsumerIfNeeded(final LoraProvider provider, final Device gatewayDevice,
final SpanContext context) {
final String tenantId = gatewayDevice.getTenantId();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/


package org.eclipse.hono.adapter.lora;

import io.vertx.core.buffer.Buffer;


/**
* A Lora message that contains unknown data sent from an end-device to a Network Server.
*
*/
public class UnknownLoraMessage implements LoraMessage {

/**
* {@inheritDoc}
*/
@Override
public final byte[] getDevEUI() {
return new byte[0];
}

/**
* {@inheritDoc}
*/
@Override
public final String getDevEUIAsString() {
return "";
}

/**
* {@inheritDoc}
*/
@Override
public final LoraMessageType getType() {
return LoraMessageType.UNKNOWN;
}

/**
* {@inheritDoc}
*/
@Override
public final Buffer getPayload() {
return Buffer.buffer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.hono.adapter.lora.LoraMessage;
import org.eclipse.hono.adapter.lora.LoraMessageType;
import org.eclipse.hono.adapter.lora.LoraMetaData;
import org.eclipse.hono.adapter.lora.UnknownLoraMessage;
import org.eclipse.hono.adapter.lora.UplinkLoraMessage;
import org.eclipse.hono.util.CommandEndpoint;
import org.eclipse.hono.util.Strings;
Expand Down Expand Up @@ -51,7 +52,7 @@ public LoraMessage getMessage(final RoutingContext ctx) {
case UPLINK:
return createUplinkMessage(ctx.request(), message);
default:
throw new LoraProviderMalformedPayloadException(String.format("unsupported message type [%s]", type));
return createUnknownMessage(ctx.request(), message);
}
} catch (final RuntimeException e) {
// catch generic exception in order to also cover any (runtime) exceptions
Expand Down Expand Up @@ -184,4 +185,26 @@ protected UplinkLoraMessage createUplinkMessage(final HttpServerRequest request,
message.setAdditionalData(getAdditionalData(requestBody));
return message;
}

/**
* Creates an object representation of a Lora unknown message.
* <p>
* This method uses the {@link #getDevEui(JsonObject)}
* method to extract relevant information from the request body to add
* to the returned message.
*
* @param request The request sent by the provider's Network Server.
* @param requestBody The JSON object contained in the request's body.
* @return The message.
* @throws RuntimeException if the message cannot be parsed.
*/
protected UnknownLoraMessage createUnknownMessage(final HttpServerRequest request, final JsonObject requestBody) {

Objects.requireNonNull(requestBody);

final UnknownLoraMessage message = new UnknownLoraMessage();
return message;
}


}
Loading

0 comments on commit 84c0094

Please sign in to comment.