Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3520] specific metrics for unknown messages #3519

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.eclipse.hono.client.telemetry.TelemetrySender;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.service.AbstractServiceBase;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.service.auth.ValidityBasedTrustOptions;
import org.eclipse.hono.service.metric.MetricsTags.ConnectionAttemptOutcome;
import org.eclipse.hono.service.util.ServiceBaseUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.adapter.AbstractProtocolAdapterBase;
import org.eclipse.hono.adapter.AdapterConnectionsExceededException;
import org.eclipse.hono.adapter.AdapterDisabledException;
import org.eclipse.hono.adapter.AuthorizationException;
import org.eclipse.hono.adapter.auth.device.CredentialsApiAuthProvider;
import org.eclipse.hono.adapter.auth.device.DeviceCredentials;
Expand All @@ -67,8 +66,10 @@
import org.eclipse.hono.notification.deviceregistry.DeviceChangeNotification;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.http.HttpUtils;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.service.metric.MetricsTags.ConnectionAttemptOutcome;
import org.eclipse.hono.service.metric.MetricsTags.Direction;
import org.eclipse.hono.service.metric.MetricsTags.EndpointType;
Expand Down Expand Up @@ -1325,7 +1326,8 @@ private Future<Void> doUploadMessage(
ProcessingOutcome.from(t),
context.isRemotelySettled() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE,
context.getPayloadSize(),
context.getTimer());
context.getTimer(),
MetricsTags.ProcessingOutcomeReason.from(t));
return Future.failedFuture(t);

}).map(ok -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ public void testUploadTelemetryMessageFailsForDisabledAdapter(final VertxTestCon
eq(ProcessingOutcome.UNPROCESSABLE),
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.TENANT_DISABLED_FOR_ADAPTER));
});
ctx.completeNow();
}));
Expand Down Expand Up @@ -1000,7 +1001,8 @@ public void testMessageLimitExceededForATelemetryMessage(final VertxTestContext
eq(ProcessingOutcome.UNPROCESSABLE),
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.MESSAGE_LIMIT_EXCEEDED));
});
}

Expand All @@ -1027,7 +1029,8 @@ public void testMessageLimitExceededForAnEventMessage(final VertxTestContext ctx
eq(ProcessingOutcome.UNPROCESSABLE),
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.MESSAGE_LIMIT_EXCEEDED));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ protected final Future<Void> doUploadMessage(
qos,
payload.length(),
getTtdStatus(context),
context.getTimer());
context.getTimer(),
MetricsTags.ProcessingOutcomeReason.from(t));
TracingHelper.logError(currentSpan, t);
commandConsumerClosedTracker.onComplete(res -> currentSpan.finish());
return Future.failedFuture(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ public void testUploadEventFailsForRejectedOutcome(final VertxTestContext ctx) {
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.UNKNOWN));
});
ctx.completeNow();
}));
Expand Down Expand Up @@ -187,7 +188,8 @@ public void testMessageLimitExceededForAnEventMessage(final VertxTestContext ctx
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.MESSAGE_LIMIT_EXCEEDED));
});
ctx.completeNow();
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.service.metric.MetricsTags.Direction;
Expand Down Expand Up @@ -92,7 +93,7 @@ public void testUploadTelemetryFailsForDisabledTenant(final VertxTestContext ctx
final var resource = givenAResource(adapter);
// which is disabled for tenant "my-tenant"
when(adapter.isAdapterEnabled(any(TenantObject.class)))
.thenReturn(Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_FORBIDDEN)));
.thenReturn(Future.failedFuture(new AdapterDisabledException("my-tenant")));

// WHEN a device that belongs to "my-tenant" publishes a telemetry message
final Buffer payload = Buffer.buffer("some payload");
Expand All @@ -118,7 +119,8 @@ public void testUploadTelemetryFailsForDisabledTenant(final VertxTestContext ctx
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.TENANT_DISABLED_FOR_ADAPTER));
});
ctx.completeNow();
}));
Expand Down Expand Up @@ -371,7 +373,8 @@ public void testMessageLimitExceededForATelemetryMessage(final VertxTestContext
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.MESSAGE_LIMIT_EXCEEDED));
});
ctx.completeNow();
}));
Expand Down Expand Up @@ -560,7 +563,8 @@ public void testUploadTelemetryReleasesCommandForFailedDownstreamSender(final Ve
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
eq(TtdStatus.COMMAND),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.UNKNOWN));
// and the command delivery is released
verify(commandContext).release(any(Throwable.class));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.Strings;
import org.eclipse.hono.util.TenantObject;
Expand Down Expand Up @@ -85,7 +86,7 @@ public abstract class AbstractVertxBasedHttpProtocolAdapter<T extends HttpProtoc

private static final String KEY_MATCH_ALL_ROUTE_APPLIED = "matchAllRouteApplied";

private HttpAdapterMetrics metrics = HttpAdapterMetrics.NOOP;
protected HttpAdapterMetrics metrics = HttpAdapterMetrics.NOOP;
private HttpServer server;
private HttpServer insecureServer;

Expand Down Expand Up @@ -203,7 +204,15 @@ public final void doStart(final Promise<Void> startPromise) {
.onComplete(startPromise);
}

private Sample getMicrometerSample(final RoutingContext ctx) {
/**
* Gets the timer used to track the processing of a telemetry message.
*
* @param ctx The routing context to extract the sample from.
* @return The sample or {@code null} if the context does not
* contain a sample.
* @throws NullPointerException if ctx is {@code null}.
*/
protected Sample getMicrometerSample(final RoutingContext ctx) {
return ctx.get(KEY_MICROMETER_SAMPLE);
}

Expand Down Expand Up @@ -778,7 +787,8 @@ private void doUploadMessage(
qos,
payloadSize,
ctx.getTtdStatus(),
getMicrometerSample(ctx.getRoutingContext()));
getMicrometerSample(ctx.getRoutingContext()),
MetricsTags.ProcessingOutcomeReason.from(t));
TracingHelper.logError(currentSpan, t);
currentSpan.finish();
return Future.failedFuture(t);
Expand Down Expand Up @@ -1295,9 +1305,16 @@ public final void uploadCommandResponseMessage(
});
}

private static MetricsTags.QoS getQoSLevel(
final EndpointType endpoint,
final org.eclipse.hono.util.QoS requestedQos) {
/**
* Get the QoS based on the endpoint and the requested QoS.
*
* @param endpoint The endpoint the message was sent to.
* @param requestedQos The QoS requested by the sender.
* @return The resulting QoS.
*/
protected static MetricsTags.QoS getQoSLevel(
final EndpointType endpoint,
final QoS requestedQos) {

if (endpoint == EndpointType.EVENT) {
return MetricsTags.QoS.AT_LEAST_ONCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,8 @@ public void testUploadTelemetryWithTtdClosesCommandConsumerIfSendingFails() {
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.UNKNOWN));
// and the command consumer is closed
verify(commandConsumer).close(eq(false), any());
}
Expand Down Expand Up @@ -786,7 +787,8 @@ public void testMessageLimitExceededForATelemetryMessage() {
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.MESSAGE_LIMIT_EXCEEDED));
}

/**
Expand Down Expand Up @@ -825,7 +827,8 @@ public void testMessageLimitExceededForAnEventMessage() {
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.ProcessingOutcomeReason.MESSAGE_LIMIT_EXCEEDED));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,46 +236,83 @@ void handleProviderRoute(final HttpContext ctx, final LoraProvider provider) {

final var gatewayDevice = ctx.getAuthenticatedDevice();
TracingHelper.setDeviceTags(currentSpan, gatewayDevice.getTenantId(), gatewayDevice.getDeviceId());
try {
final LoraMessage loraMessage = provider.getMessage(ctx.getRoutingContext());
final LoraMessageType type = loraMessage.getType();
currentSpan.log(Map.of("message type", type));
final String deviceId = loraMessage.getDevEUIAsString();
currentSpan.setTag(TAG_LORA_DEVICE_ID, deviceId);

switch (type) {
case UPLINK:
final UplinkLoraMessage uplinkMessage = (UplinkLoraMessage) loraMessage;
final Buffer payload = uplinkMessage.getPayload();

Optional.ofNullable(uplinkMessage.getMetaData())
.ifPresent(metaData -> ctx.put(LoraConstants.APP_PROPERTY_META_DATA, metaData));

Optional.ofNullable(uplinkMessage.getAdditionalData())
.ifPresent(additionalData -> ctx.put(LoraConstants.APP_PROPERTY_ADDITIONAL_DATA, additionalData));

final String contentType = payload.length() > 0
? LoraConstants.CONTENT_TYPE_LORA_BASE + provider.getProviderName()
: EventConstants.CONTENT_TYPE_EMPTY_NOTIFICATION;

currentSpan.finish(); // uploadTelemetryMessage will finish the root span, therefore finish child span here already
uploadTelemetryMessage(ctx, gatewayDevice.getTenantId(), deviceId, payload, contentType);
registerCommandConsumerIfNeeded(provider, gatewayDevice, currentSpan.context());
break;
default:
LOG.debug("discarding message of unsupported type [tenant: {}, device-id: {}, type: {}]",
gatewayDevice.getTenantId(), deviceId, type);
currentSpan.log("discarding message of unsupported type");

final MetricsTags.EndpointType endpoint = MetricsTags.EndpointType.fromString(ctx.getRequestedResource().getEndpoint());
final MetricsTags.QoS qos = getQoSLevel(endpoint, ctx.getRequestedQos());

getTenantConfiguration(gatewayDevice.getTenantId(), currentSpan.context())
.map(tenantObject -> {
final LoraMessage loraMessage;
try {
loraMessage = provider.getMessage(ctx.getRoutingContext());

} catch (final LoraProviderMalformedPayloadException e) {
LOG.debug("error processing request from provider [name: {}]", provider.getProviderName(), e);
TracingHelper.logError(currentSpan, "error processing request", e);
currentSpan.finish();
handle400(ctx.getRoutingContext(), ERROR_MSG_INVALID_PAYLOAD);
metrics.reportTelemetry(
endpoint,
gatewayDevice.getTenantId(),
tenantObject,
MetricsTags.ProcessingOutcome.UNPROCESSABLE,
qos,
ctx.getRoutingContext().body().buffer().length(),
ctx.getTtdStatus(),
getMicrometerSample(ctx.getRoutingContext()),
MetricsTags.ProcessingOutcomeReason.BAD_SYNTAX);
return tenantObject;
}
final LoraMessageType type = loraMessage.getType();
currentSpan.log(Map.of("message type", type));
final String deviceId = loraMessage.getDevEUIAsString();
currentSpan.setTag(TAG_LORA_DEVICE_ID, deviceId);

switch (type) {
case UPLINK:
final UplinkLoraMessage uplinkMessage = (UplinkLoraMessage) loraMessage;
final Buffer payload = uplinkMessage.getPayload();

Optional.ofNullable(uplinkMessage.getMetaData())
.ifPresent(metaData -> ctx.put(LoraConstants.APP_PROPERTY_META_DATA, metaData));

Optional.ofNullable(uplinkMessage.getAdditionalData())
.ifPresent(additionalData -> ctx.put(LoraConstants.APP_PROPERTY_ADDITIONAL_DATA, additionalData));

final String contentType = payload.length() > 0
? LoraConstants.CONTENT_TYPE_LORA_BASE + provider.getProviderName()
: EventConstants.CONTENT_TYPE_EMPTY_NOTIFICATION;

currentSpan.finish(); // uploadTelemetryMessage will finish the root span, therefore finish child span here already
uploadTelemetryMessage(ctx, gatewayDevice.getTenantId(), deviceId, payload, contentType);
registerCommandConsumerIfNeeded(provider, gatewayDevice, currentSpan.context());
break;
default:

LOG.debug("discarding message of unsupported type [tenant: {}, device-id: {}, type: {}]",
gatewayDevice.getTenantId(), deviceId, type);
currentSpan.log("discarding message of unsupported type");
currentSpan.finish();
// discard the message but return 202 to not cause errors on the LoRa provider side
handle202(ctx.getRoutingContext());

final MetricsTags.ProcessingOutcomeReason reason = type == LoraMessageType.UNKNOWN ? MetricsTags.ProcessingOutcomeReason.UNKNOWN_TYPE : MetricsTags.ProcessingOutcomeReason.UNSUPPORTED_TYPE;
metrics.reportTelemetry(
endpoint,
gatewayDevice.getTenantId(),
tenantObject,
MetricsTags.ProcessingOutcome.UNPROCESSABLE,
qos,
ctx.getRoutingContext().body().buffer().length(),
ctx.getTtdStatus(),
getMicrometerSample(ctx.getRoutingContext()),
reason);
}
return tenantObject;
}).onFailure(e -> {
currentSpan.finish();
// discard the message but return 202 to not cause errors on the LoRa provider side
handle202(ctx.getRoutingContext());
}
} catch (final LoraProviderMalformedPayloadException e) {
LOG.debug("error processing request from provider [name: {}]", provider.getProviderName(), e);
TracingHelper.logError(currentSpan, "error processing request", e);
currentSpan.finish();
handle400(ctx.getRoutingContext(), ERROR_MSG_INVALID_PAYLOAD);
}
LOG.debug("error processing request from provider [name: {}]", provider.getProviderName(), e);
});
}

private void registerCommandConsumerIfNeeded(final LoraProvider provider, final Device gatewayDevice,
Expand Down
Loading