Skip to content

Commit

Permalink
[eclipse-hono#3520] specific metrics for unknown messages
Browse files Browse the repository at this point in the history
Signed-off-by: Bob Claerhout <[email protected]>
  • Loading branch information
BobClaerhout committed Sep 12, 2023
1 parent 4a8911a commit e94a14c
Show file tree
Hide file tree
Showing 42 changed files with 846 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.eclipse.hono.client.telemetry.TelemetrySender;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.service.AbstractServiceBase;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.service.auth.ValidityBasedTrustOptions;
import org.eclipse.hono.service.metric.MetricsTags.ConnectionAttemptOutcome;
import org.eclipse.hono.service.util.ServiceBaseUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.adapter.AbstractProtocolAdapterBase;
import org.eclipse.hono.adapter.AdapterConnectionsExceededException;
import org.eclipse.hono.adapter.AdapterDisabledException;
import org.eclipse.hono.adapter.AuthorizationException;
import org.eclipse.hono.adapter.auth.device.CredentialsApiAuthProvider;
import org.eclipse.hono.adapter.auth.device.DeviceCredentials;
Expand All @@ -67,8 +66,10 @@
import org.eclipse.hono.notification.deviceregistry.DeviceChangeNotification;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.http.HttpUtils;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.service.metric.MetricsTags.ConnectionAttemptOutcome;
import org.eclipse.hono.service.metric.MetricsTags.Direction;
import org.eclipse.hono.service.metric.MetricsTags.EndpointType;
Expand Down Expand Up @@ -1325,7 +1326,8 @@ private Future<Void> doUploadMessage(
ProcessingOutcome.from(t),
context.isRemotelySettled() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE,
context.getPayloadSize(),
context.getTimer());
context.getTimer(),
MetricsTags.ProcessingOutcomeReason.from(t));
return Future.failedFuture(t);

}).map(ok -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ public void testUploadTelemetryMessageFailsForDisabledAdapter(final VertxTestCon
eq(ProcessingOutcome.UNPROCESSABLE),
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.TENANT_DISABLED_FOR_ADAPTER));
});
ctx.completeNow();
}));
Expand Down Expand Up @@ -1000,7 +1001,8 @@ public void testMessageLimitExceededForATelemetryMessage(final VertxTestContext
eq(ProcessingOutcome.UNPROCESSABLE),
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.MESSAGE_LIMIT_EXCEEDED));
});
}

Expand All @@ -1027,7 +1029,8 @@ public void testMessageLimitExceededForAnEventMessage(final VertxTestContext ctx
eq(ProcessingOutcome.UNPROCESSABLE),
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.MESSAGE_LIMIT_EXCEEDED));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ protected final Future<Void> doUploadMessage(
qos,
payload.length(),
getTtdStatus(context),
context.getTimer());
context.getTimer(),
MetricsTags.ProcessingOutcomeReason.from(t));
TracingHelper.logError(currentSpan, t);
commandConsumerClosedTracker.onComplete(res -> currentSpan.finish());
return Future.failedFuture(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ public void testUploadEventFailsForRejectedOutcome(final VertxTestContext ctx) {
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.UNKNOWN));
});
ctx.completeNow();
}));
Expand Down Expand Up @@ -187,7 +188,8 @@ public void testMessageLimitExceededForAnEventMessage(final VertxTestContext ctx
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.MESSAGE_LIMIT_EXCEEDED));
});
ctx.completeNow();
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.service.metric.MetricsTags.Direction;
Expand Down Expand Up @@ -92,7 +93,7 @@ public void testUploadTelemetryFailsForDisabledTenant(final VertxTestContext ctx
final var resource = givenAResource(adapter);
// which is disabled for tenant "my-tenant"
when(adapter.isAdapterEnabled(any(TenantObject.class)))
.thenReturn(Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_FORBIDDEN)));
.thenReturn(Future.failedFuture(new AdapterDisabledException("my-tenant")));

// WHEN a device that belongs to "my-tenant" publishes a telemetry message
final Buffer payload = Buffer.buffer("some payload");
Expand All @@ -118,7 +119,8 @@ public void testUploadTelemetryFailsForDisabledTenant(final VertxTestContext ctx
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.TENANT_DISABLED_FOR_ADAPTER));
});
ctx.completeNow();
}));
Expand Down Expand Up @@ -371,7 +373,8 @@ public void testMessageLimitExceededForATelemetryMessage(final VertxTestContext
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.MESSAGE_LIMIT_EXCEEDED));
});
ctx.completeNow();
}));
Expand Down Expand Up @@ -560,7 +563,8 @@ public void testUploadTelemetryReleasesCommandForFailedDownstreamSender(final Ve
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
eq(TtdStatus.COMMAND),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.UNKNOWN));
// and the command delivery is released
verify(commandContext).release(any(Throwable.class));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.Strings;
import org.eclipse.hono.util.TenantObject;
Expand Down Expand Up @@ -85,7 +86,7 @@ public abstract class AbstractVertxBasedHttpProtocolAdapter<T extends HttpProtoc

private static final String KEY_MATCH_ALL_ROUTE_APPLIED = "matchAllRouteApplied";

private HttpAdapterMetrics metrics = HttpAdapterMetrics.NOOP;
protected HttpAdapterMetrics metrics = HttpAdapterMetrics.NOOP;
private HttpServer server;
private HttpServer insecureServer;

Expand Down Expand Up @@ -203,7 +204,15 @@ public final void doStart(final Promise<Void> startPromise) {
.onComplete(startPromise);
}

private Sample getMicrometerSample(final RoutingContext ctx) {
/**
* Gets the timer used to track the processing of a telemetry message.
*
* @param ctx The routing context to extract the sample from.
* @return The sample or {@code null} if the context does not
* contain a sample.
* @throws NullPointerException if ctx is {@code null}.
*/
protected Sample getMicrometerSample(final RoutingContext ctx) {
return ctx.get(KEY_MICROMETER_SAMPLE);
}

Expand Down Expand Up @@ -778,7 +787,8 @@ private void doUploadMessage(
qos,
payloadSize,
ctx.getTtdStatus(),
getMicrometerSample(ctx.getRoutingContext()));
getMicrometerSample(ctx.getRoutingContext()),
MetricsTags.ProcessingOutcomeReason.from(t));
TracingHelper.logError(currentSpan, t);
currentSpan.finish();
return Future.failedFuture(t);
Expand Down Expand Up @@ -1295,9 +1305,16 @@ public final void uploadCommandResponseMessage(
});
}

private static MetricsTags.QoS getQoSLevel(
final EndpointType endpoint,
final org.eclipse.hono.util.QoS requestedQos) {
/**
* Get the QoS based on the endpoint and the requested QoS.
*
* @param endpoint The endpoint the message was sent to.
* @param requestedQos The QoS requested by the sender.
* @return The resulting QoS.
*/
protected static MetricsTags.QoS getQoSLevel(
final EndpointType endpoint,
final QoS requestedQos) {

if (endpoint == EndpointType.EVENT) {
return MetricsTags.QoS.AT_LEAST_ONCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,8 @@ public void testUploadTelemetryWithTtdClosesCommandConsumerIfSendingFails() {
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.UNKNOWN));
// and the command consumer is closed
verify(commandConsumer).close(eq(false), any());
}
Expand Down Expand Up @@ -786,7 +787,8 @@ public void testMessageLimitExceededForATelemetryMessage() {
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.MESSAGE_LIMIT_EXCEEDED));
}

/**
Expand Down Expand Up @@ -825,7 +827,8 @@ public void testMessageLimitExceededForAnEventMessage() {
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.MESSAGE_LIMIT_EXCEEDED));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,46 +236,83 @@ void handleProviderRoute(final HttpContext ctx, final LoraProvider provider) {

final var gatewayDevice = ctx.getAuthenticatedDevice();
TracingHelper.setDeviceTags(currentSpan, gatewayDevice.getTenantId(), gatewayDevice.getDeviceId());
try {
final LoraMessage loraMessage = provider.getMessage(ctx.getRoutingContext());
final LoraMessageType type = loraMessage.getType();
currentSpan.log(Map.of("message type", type));
final String deviceId = loraMessage.getDevEUIAsString();
currentSpan.setTag(TAG_LORA_DEVICE_ID, deviceId);

switch (type) {
case UPLINK:
final UplinkLoraMessage uplinkMessage = (UplinkLoraMessage) loraMessage;
final Buffer payload = uplinkMessage.getPayload();

Optional.ofNullable(uplinkMessage.getMetaData())
.ifPresent(metaData -> ctx.put(LoraConstants.APP_PROPERTY_META_DATA, metaData));

Optional.ofNullable(uplinkMessage.getAdditionalData())
.ifPresent(additionalData -> ctx.put(LoraConstants.APP_PROPERTY_ADDITIONAL_DATA, additionalData));

final String contentType = payload.length() > 0
? LoraConstants.CONTENT_TYPE_LORA_BASE + provider.getProviderName()
: EventConstants.CONTENT_TYPE_EMPTY_NOTIFICATION;

currentSpan.finish(); // uploadTelemetryMessage will finish the root span, therefore finish child span here already
uploadTelemetryMessage(ctx, gatewayDevice.getTenantId(), deviceId, payload, contentType);
registerCommandConsumerIfNeeded(provider, gatewayDevice, currentSpan.context());
break;
default:
LOG.debug("discarding message of unsupported type [tenant: {}, device-id: {}, type: {}]",
gatewayDevice.getTenantId(), deviceId, type);
currentSpan.log("discarding message of unsupported type");

final MetricsTags.EndpointType endpoint = MetricsTags.EndpointType.fromString(ctx.getRequestedResource().getEndpoint());
final MetricsTags.QoS qos = getQoSLevel(endpoint, ctx.getRequestedQos());

getTenantConfiguration(gatewayDevice.getTenantId(), currentSpan.context())
.map(tenantObject -> {
final LoraMessage loraMessage;
try {
loraMessage = provider.getMessage(ctx.getRoutingContext());

} catch (final LoraProviderMalformedPayloadException e) {
LOG.debug("error processing request from provider [name: {}]", provider.getProviderName(), e);
TracingHelper.logError(currentSpan, "error processing request", e);
currentSpan.finish();
handle400(ctx.getRoutingContext(), ERROR_MSG_INVALID_PAYLOAD);
metrics.reportTelemetry(
endpoint,
gatewayDevice.getTenantId(),
tenantObject,
MetricsTags.ProcessingOutcome.UNPROCESSABLE,
qos,
ctx.getRoutingContext().body().buffer().length(),
ctx.getTtdStatus(),
getMicrometerSample(ctx.getRoutingContext()),
MetricsTags.ProcessingOutcomeReason.BAD_SYNTAX);
return tenantObject;
}
final LoraMessageType type = loraMessage.getType();
currentSpan.log(Map.of("message type", type));
final String deviceId = loraMessage.getDevEUIAsString();
currentSpan.setTag(TAG_LORA_DEVICE_ID, deviceId);

switch (type) {
case UPLINK:
final UplinkLoraMessage uplinkMessage = (UplinkLoraMessage) loraMessage;
final Buffer payload = uplinkMessage.getPayload();

Optional.ofNullable(uplinkMessage.getMetaData())
.ifPresent(metaData -> ctx.put(LoraConstants.APP_PROPERTY_META_DATA, metaData));

Optional.ofNullable(uplinkMessage.getAdditionalData())
.ifPresent(additionalData -> ctx.put(LoraConstants.APP_PROPERTY_ADDITIONAL_DATA, additionalData));

final String contentType = payload.length() > 0
? LoraConstants.CONTENT_TYPE_LORA_BASE + provider.getProviderName()
: EventConstants.CONTENT_TYPE_EMPTY_NOTIFICATION;

currentSpan.finish(); // uploadTelemetryMessage will finish the root span, therefore finish child span here already
uploadTelemetryMessage(ctx, gatewayDevice.getTenantId(), deviceId, payload, contentType);
registerCommandConsumerIfNeeded(provider, gatewayDevice, currentSpan.context());
break;
default:

LOG.debug("discarding message of unsupported type [tenant: {}, device-id: {}, type: {}]",
gatewayDevice.getTenantId(), deviceId, type);
currentSpan.log("discarding message of unsupported type");
currentSpan.finish();
// discard the message but return 202 to not cause errors on the LoRa provider side
handle202(ctx.getRoutingContext());

final MetricsTags.ProcessingOutcomeReason reason = type == LoraMessageType.UNKNOWN ? MetricsTags.ProcessingOutcomeReason.UNKNOWN_TYPE : MetricsTags.ProcessingOutcomeReason.UNSUPPORTED_TYPE;
metrics.reportTelemetry(
endpoint,
gatewayDevice.getTenantId(),
tenantObject,
MetricsTags.ProcessingOutcome.UNPROCESSABLE,
qos,
ctx.getRoutingContext().body().buffer().length(),
ctx.getTtdStatus(),
getMicrometerSample(ctx.getRoutingContext()),
reason);
}
return tenantObject;
}).onFailure(e -> {
currentSpan.finish();
// discard the message but return 202 to not cause errors on the LoRa provider side
handle202(ctx.getRoutingContext());
}
} catch (final LoraProviderMalformedPayloadException e) {
LOG.debug("error processing request from provider [name: {}]", provider.getProviderName(), e);
TracingHelper.logError(currentSpan, "error processing request", e);
currentSpan.finish();
handle400(ctx.getRoutingContext(), ERROR_MSG_INVALID_PAYLOAD);
}
LOG.debug("error processing request from provider [name: {}]", provider.getProviderName(), e);
});
}

private void registerCommandConsumerIfNeeded(final LoraProvider provider, final Device gatewayDevice,
Expand Down
Loading

0 comments on commit e94a14c

Please sign in to comment.