Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OTel support for Service Bus #34492

Merged
merged 15 commits into from
Feb 25, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using NUnit.Framework;

namespace Azure.Core.Tests
{
Expand All @@ -16,11 +17,11 @@ public class TestActivitySourceListener: IDisposable
public Queue<Activity> Activities { get; } =
new Queue<Activity>();

public TestActivitySourceListener(string name) : this(source => source.Name == name)
public TestActivitySourceListener(string name, Action<Activity> activityStartedCallback = default) : this(source => source.Name.StartsWith(name), activityStartedCallback)
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
}

public TestActivitySourceListener(Func<ActivitySource, bool> sourceSelector)
public TestActivitySourceListener(Func<ActivitySource, bool> sourceSelector, Action<Activity> activityStartedCallback = default)
{
_listener = new ActivityListener
{
Expand All @@ -31,6 +32,7 @@ public TestActivitySourceListener(Func<ActivitySource, bool> sourceSelector)
{
Activities.Enqueue(activity);
}
activityStartedCallback?.Invoke(activity);
},
Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
{
Expand All @@ -46,6 +48,13 @@ public TestActivitySourceListener(Func<ActivitySource, bool> sourceSelector)
ActivitySource.AddActivityListener(_listener);
}

public Activity AssertAndRemoveActivity(string name)
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
var activity = Activities.Dequeue();
Assert.AreEqual(name, activity.OperationName);
return activity;
}

public void Dispose()
{
_listener?.Dispose();
Expand Down
136 changes: 136 additions & 0 deletions sdk/core/Azure.Core/src/Shared/MessagingClientDiagnostics.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.Collections.Generic;
using System.Diagnostics;
using Azure.Core.Pipeline;

#nullable enable

namespace Azure.Core.Shared
{
/// <summary>
/// Client Diagnostics support for messaging clients. Currently, this is only used for AMQP clients.
/// HTTP libraries should use the ClientDiagnostics type instead.
/// </summary>
internal class MessagingClientDiagnostics
{
private readonly string _fullyQualifiedNamespace;
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
private readonly string _entityPath;
private readonly string _messagingSystem;
private readonly DiagnosticScopeFactory _scopeFactory;

#region OTel-specific messaging attributes
public const string MessagingSystem = "messaging.system";
public const string DestinationName = "messaging.destination.name";
public const string SourceName = "messaging.source.name";
public const string MessagingOperation = "messaging.operation";
public const string NetPeerName = "net.peer.name";
public const string BatchCount = "messaging.batch.message_count";
#endregion

#region legacy compat attributes
public const string MessageBusDestination = "message_bus.destination";
public const string PeerAddress = "peer.address";
public const string Component = "component";
#endregion

public const string DiagnosticIdAttribute = "Diagnostic-Id";

public MessagingClientDiagnostics(string clientNamespace, string? resourceProviderNamespace, string messagingSystem, string fullyQualifiedNamespace, string entityPath)
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
_messagingSystem = messagingSystem;
_fullyQualifiedNamespace = fullyQualifiedNamespace;
_entityPath = entityPath;
_scopeFactory = new DiagnosticScopeFactory(clientNamespace, resourceProviderNamespace, true, false);
}

public DiagnosticScope CreateScope(
string activityName,
DiagnosticScope.ActivityKind kind,
MessagingDiagnosticOperation diagnosticOperation)
{
DiagnosticScope scope = _scopeFactory.CreateScope(activityName, kind);
if (ActivityExtensions.SupportsActivitySource())
{
scope.AddAttribute(MessagingSystem, _messagingSystem);
scope.AddAttribute(MessagingOperation, diagnosticOperation.ToString());
scope.AddAttribute(NetPeerName, _fullyQualifiedNamespace);
scope.AddAttribute(diagnosticOperation == MessagingDiagnosticOperation.Publish ? DestinationName : SourceName, _entityPath);
}
else
{
scope.AddAttribute(Component, _messagingSystem);
scope.AddAttribute(PeerAddress, _fullyQualifiedNamespace);
scope.AddAttribute(MessageBusDestination, _entityPath);
}

return scope;
}

/// <summary>
/// Attempts to extract a diagnostic id from a message's properties.
/// </summary>
///
/// <param name="properties">The properties holding the diagnostic id.</param>
/// <param name="id">The value of the diagnostics identifier assigned to the event. </param>
///
/// <returns><c>true</c> if the message properties contained the diagnostic id; otherwise, <c>false</c>.</returns>
public static bool TryExtractDiagnosticId(IReadOnlyDictionary<string, object> properties, out string? id)
{
id = null;

if (properties.TryGetValue(DiagnosticIdAttribute, out var objectId) && objectId is string stringId)
{
id = stringId;
return true;
}

return false;
}

/// <summary>
/// Attempts to extract a diagnostic id from a message's properties.
/// </summary>
///
/// <param name="properties">The properties holding the diagnostic id.</param>
/// <param name="id">The value of the diagnostics identifier assigned to the event. </param>
///
/// <returns><c>true</c> if the message properties contained the diagnostic id; otherwise, <c>false</c>.</returns>
public static bool TryExtractDiagnosticId(IDictionary<string, object> properties, out string? id)
{
id = null;

if (properties.TryGetValue(DiagnosticIdAttribute, out var objectId) && objectId is string stringId)
{
id = stringId;
return true;
}

return false;
}

/// <summary>
/// Instrument the message properties for tracing.
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
/// <param name="properties">The dictionary of application message properties.</param>
/// <param name="activityName">The activity name to use for the diagnostic scope.</param>
public void InstrumentMessage(IDictionary<string, object> properties, string activityName)
{
if (!properties.ContainsKey(DiagnosticIdAttribute))
{
using DiagnosticScope messageScope = CreateScope(
activityName,
DiagnosticScope.ActivityKind.Producer,
MessagingDiagnosticOperation.Publish);
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
messageScope.Start();

Activity activity = Activity.Current;
if (activity != null)
{
properties[DiagnosticIdAttribute] = activity.Id;
}
}
}
}
}
46 changes: 46 additions & 0 deletions sdk/core/Azure.Core/src/Shared/MessagingDiagnosticOperation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

#nullable enable

using System;

namespace Azure.Core.Shared
{
/// <summary>
/// Represents the common set of operations for messaging diagnostics, as per the Open Telemetry semantic conventions.
/// This is defined as a partial struct so that it can be extended by other libraries.
/// <seealso href="https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#operation-names"/>
/// </summary>
internal readonly partial struct MessagingDiagnosticOperation : IEquatable<MessagingDiagnosticOperation>
{
private readonly string _operation;

private MessagingDiagnosticOperation(string operation)
{
Argument.AssertNotNull(operation, nameof(operation));
_operation = operation;
}

public static MessagingDiagnosticOperation Publish = new("publish");
public static MessagingDiagnosticOperation Receive = new("receive");
public static MessagingDiagnosticOperation Process = new("process");
public override string ToString() => _operation;

public static bool operator ==(MessagingDiagnosticOperation left, MessagingDiagnosticOperation right) => left.Equals(right);
public static bool operator !=(MessagingDiagnosticOperation left, MessagingDiagnosticOperation right) => !left.Equals(right);
public static implicit operator MessagingDiagnosticOperation(string value) => new MessagingDiagnosticOperation(value);

public bool Equals(MessagingDiagnosticOperation other)
{
return _operation == other._operation;
}

public override bool Equals(object obj)
{
return obj is MessagingDiagnosticOperation other && Equals(other);
}

public override int GetHashCode() => _operation.GetHashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
<Compile Include="$(AzureCoreSharedSources)PageResponseEnumerator.cs" LinkBase="SharedSource\Azure.Core" />
<Compile Include="$(AzureCoreSharedSources)AzureResourceProviderNamespaceAttribute.cs" LinkBase="SharedSource\Azure.Core" />
<Compile Include="$(AzureCoreSharedSources)GuidUtilities.cs" LinkBase="SharedSource\Azure.Core" />
<Compile Include="$(AzureCoreSharedSources)MessagingClientDiagnostics.cs" LinkBase="SharedSource\Azure.Core" />
<Compile Include="$(AzureCoreSharedSources)MessagingDiagnosticOperation.cs" LinkBase="SharedSource\Azure.Core" />
<Compile Include="$(AzureCoreAmqpSharedSources)AmqpAnnotatedMessageConverter.cs" LinkBase="SharedSource\Azure.Core.Amqp" />
<Compile Include="$(AzureCoreAmqpSharedSources)MessageBody.cs" LinkBase="SharedSource\Azure.Core.Amqp" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Text;
using Azure.Core.Pipeline;
using System.Collections.Generic;
using Azure.Core.Shared;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Framing;

Expand All @@ -24,7 +25,7 @@ public static void SetMessageData(this DiagnosticScope scope, ServiceBusReceived

public static void SetMessageAsParent(this DiagnosticScope scope, ServiceBusReceivedMessage message)
{
if (EntityScopeFactory.TryExtractDiagnosticId(
if (MessagingClientDiagnostics.TryExtractDiagnosticId(
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
message.ApplicationProperties,
out string diagnosticId))
{
Expand Down Expand Up @@ -68,6 +69,11 @@ private static void AddLinkedDiagnostics(this DiagnosticScope scope, IReadOnlyCo
{
AddLinkedDiagnostics(scope, message.ApplicationProperties);
}

if (messages.Count > 1)
{
scope.AddAttribute(MessagingClientDiagnostics.BatchCount, messages.Count);
}
}
}

Expand All @@ -79,6 +85,11 @@ private static void AddLinkedDiagnostics(this DiagnosticScope scope, IReadOnlyCo
{
AddLinkedDiagnostics(scope, message.ApplicationProperties.Map);
}

if (messages.Count > 1)
{
scope.AddAttribute(MessagingClientDiagnostics.BatchCount, messages.Count);
}
}
}

Expand All @@ -98,12 +109,17 @@ private static void AddLinkedDiagnostics(this DiagnosticScope scope, IReadOnlyCo
{
AddLinkedDiagnostics(scope, message.ApplicationProperties);
}

if (messages.Count > 1)
{
scope.AddAttribute(MessagingClientDiagnostics.BatchCount, messages.Count);
}
}
}

private static void AddLinkedDiagnostics(this DiagnosticScope scope, IReadOnlyDictionary<string, object> properties)
{
if (EntityScopeFactory.TryExtractDiagnosticId(
if (MessagingClientDiagnostics.TryExtractDiagnosticId(
properties,
out string diagnosticId))
{
Expand All @@ -113,7 +129,7 @@ private static void AddLinkedDiagnostics(this DiagnosticScope scope, IReadOnlyDi

private static void AddLinkedDiagnostics(this DiagnosticScope scope, PropertiesMap properties)
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
if (EntityScopeFactory.TryExtractDiagnosticId(
if (TryExtractDiagnosticId(
properties,
out string diagnosticId))
{
Expand All @@ -123,12 +139,34 @@ private static void AddLinkedDiagnostics(this DiagnosticScope scope, PropertiesM

private static void AddLinkedDiagnostics(this DiagnosticScope scope, IDictionary<string, object> properties)
{
if (EntityScopeFactory.TryExtractDiagnosticId(
if (MessagingClientDiagnostics.TryExtractDiagnosticId(
properties,
out string diagnosticId))
{
scope.AddLink(diagnosticId, null);
}
}

/// <summary>
/// Extracts a diagnostic id from a message's properties.
/// </summary>
///
/// <param name="properties">The properties holding the diagnostic id.</param>
/// <param name="id">The value of the diagnostics identifier assigned to the event. </param>
///
/// <returns><c>true</c> if the event was contained the diagnostic id; otherwise, <c>false</c>.</returns>
///
public static bool TryExtractDiagnosticId(PropertiesMap properties, out string id)
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
id = null;

if (properties.TryGetValue<string>(MessagingClientDiagnostics.DiagnosticIdAttribute, out string stringId))
{
id = stringId;
return true;
}

return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using Azure.Core.Pipeline;

namespace Azure.Messaging.ServiceBus.Diagnostics
{
/// <summary>
Expand All @@ -9,17 +11,11 @@ namespace Azure.Messaging.ServiceBus.Diagnostics
///
internal static class DiagnosticProperty
{
/// <summary>The attribute which represents a unique identifier for the diagnostics context.</summary>
public const string DiagnosticIdAttribute = "Diagnostic-Id";

/// <summary>The attribute which represents the Azure service to associate with diagnostics information.</summary>
public const string ServiceContextAttribute = "component";

/// <summary>The attribute which represents the Service Bus entity instance to associate with diagnostics information.</summary>
public const string EntityAttribute = "message_bus.destination";
/// <summary>The namespace used for the Service Bus diagnostic scope.</summary>
public const string DiagnosticNamespace = "Azure.Messaging.ServiceBus";

/// <summary>The attribute which represents the fully-qualified endpoint address of the Service Bus namespace to associate with diagnostics information.</summary>
public const string EndpointAttribute = "peer.address";
/// <summary>The namespace used for the Azure Resource Manager provider namespace.</summary>
public const string ResourceProviderNamespace = "Microsoft.ServiceBus";

/// <summary>The value which identifies the Service Bus diagnostics context.</summary>
public const string ServiceBusServiceContext = "servicebus";
Expand Down
Loading