Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OTEL Metrics #629

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions src/NATS.Client.Core/Commands/CommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ internal sealed class CommandWriter : IAsyncDisposable
private readonly int _arrayPoolInitialSize;
private readonly object _lock = new();
private readonly CancellationTokenSource _cts;
private readonly ConnectionStatsCounter _counter;
private readonly Memory<byte> _consolidateMem = new byte[SendMemSize].AsMemory();
private readonly TimeSpan _defaultCommandTimeout;
private readonly Action<PingCommand> _enqueuePing;
Expand All @@ -55,7 +54,7 @@ internal sealed class CommandWriter : IAsyncDisposable
private CancellationTokenSource? _ctsReader;
private volatile bool _disposed;

public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, ConnectionStatsCounter counter, Action<PingCommand> enqueuePing, TimeSpan? overrideCommandTimeout = default)
public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, Action<PingCommand> enqueuePing, TimeSpan? overrideCommandTimeout = default)
{
_logger = opts.LoggerFactory.CreateLogger<CommandWriter>();
_trace = _logger.IsEnabled(LogLevel.Trace);
Expand All @@ -67,7 +66,6 @@ public CommandWriter(string name, NatsConnection connection, ObjectPool pool, Na
// avoid defining another option.
_arrayPoolInitialSize = opts.WriterBufferSize / 256;

_counter = counter;
_defaultCommandTimeout = overrideCommandTimeout ?? opts.CommandTimeout;
_enqueuePing = enqueuePing;
_protocolWriter = new ProtocolWriter(opts.SubjectEncoding);
Expand Down Expand Up @@ -693,11 +691,20 @@ private void EnqueueCommand()
return;
}

Interlocked.Add(ref _counter.PendingMessages, 1);
NatsMetrics.AddPendingMessages(1);

_channelSize.Writer.TryWrite(size);
var flush = _pipeWriter.FlushAsync();
_flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask();

if (flush.IsCompletedSuccessfully)
{
_flushTask = null;
NatsMetrics.AddPendingMessages(-1);
}
else
{
_flushTask = flush.AsTask();
}
}

private async ValueTask ConnectStateMachineAsync(bool lockHeld, ClientOpts connectOpts, CancellationToken cancellationToken)
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.Core/Commands/PriorityCommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ internal sealed class PriorityCommandWriter : IAsyncDisposable
{
private int _disposed;

public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, ISocketConnection socketConnection, NatsOpts opts, ConnectionStatsCounter counter, Action<PingCommand> enqueuePing)
public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, ISocketConnection socketConnection, NatsOpts opts, Action<PingCommand> enqueuePing)
{
CommandWriter = new CommandWriter("init", connection, pool, opts, counter, enqueuePing);
CommandWriter = new CommandWriter("init", connection, pool, opts, enqueuePing);
CommandWriter.Reset(socketConnection);
}

Expand Down
94 changes: 94 additions & 0 deletions src/NATS.Client.Core/Internal/NatsMetrics.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
using System.Diagnostics.Metrics;

namespace NATS.Client.Core.Internal;

public class NatsMetrics
{
public const string MeterName = "NATS.Client";
public const string PendingMessagesInstrumentName = $"{InstrumentPrefix}.pending.messages";
public const string SentBytesInstrumentName = $"{InstrumentPrefix}.sent.bytes";
public const string ReceivedBytesInstrumentName = $"{InstrumentPrefix}.received.bytes";
public const string SentMessagesInstrumentName = $"{InstrumentPrefix}.sent.messages";
public const string ReceivedMessagesInstrumentName = $"{InstrumentPrefix}.received.messages";
public const string SubscriptionInstrumentName = $"{InstrumentPrefix}.subscription.count";

private const string InstrumentPrefix = "nats.client";

private static readonly Meter _meter;
private static readonly Counter<long> _subscriptionCounter;
private static readonly Counter<long> _pendingMessagesCounter;
private static readonly Counter<long> _sentBytesCounter;
private static readonly Counter<long> _receivedBytesCounter;
private static readonly Counter<long> _sentMessagesCounter;
private static readonly Counter<long> _receivedMessagesCounter;

static NatsMetrics()
{
_meter = new Meter(MeterName);

_subscriptionCounter = _meter.CreateCounter<long>(
SubscriptionInstrumentName,
unit: "{subscriptions}",
description: "Number of subscriptions");

_pendingMessagesCounter = _meter.CreateCounter<long>(
PendingMessagesInstrumentName,
unit: "{messages}",
description: "Number of pending messages");

_sentBytesCounter = _meter.CreateCounter<long>(
SentBytesInstrumentName,
unit: "{bytes}",
description: "Number of bytes sent");

_receivedBytesCounter = _meter.CreateCounter<long>(
ReceivedBytesInstrumentName,
unit: "{bytes}",
description: "Number of bytes received");

_sentMessagesCounter = _meter.CreateCounter<long>(
SentMessagesInstrumentName,
unit: "{messages}",
description: "Number of messages sent");

_receivedMessagesCounter = _meter.CreateCounter<long>(
ReceivedMessagesInstrumentName,
unit: "{messages}",
description: "Number of messages received");
}

public static void IncrementSubscriptionCount()
{
_subscriptionCounter.Add(1);
}

public static void DecrementSubscriptionCount()
{
_subscriptionCounter.Add(-1);
}

public static void AddPendingMessages(long messages)
{
_pendingMessagesCounter.Add(messages);
}

public static void AddSentBytes(long bytes)
{
_sentBytesCounter.Add(bytes);
}

public static void AddReceivedBytes(long bytes)
{
_receivedBytesCounter.Add(bytes);
}

public static void AddSentMessages(long messages)
{
_sentMessagesCounter.Add(messages);
}

public static void AddReceivedMessages(long messages)
{
_receivedMessagesCounter.Add(messages);
}
}
6 changes: 3 additions & 3 deletions src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public NatsReadProtocolProcessor(ISocketConnection socketConnection, NatsConnect
_waitForPongOrErrorSignal = waitForPongOrErrorSignal;
_infoParsed = infoParsed;
_pingCommands = new ConcurrentQueue<PingCommand>();
_socketReader = new SocketReader(socketConnection, connection.Opts.ReaderBufferSize, connection.Counter, connection.Opts.LoggerFactory);
_socketReader = new SocketReader(socketConnection, connection.Opts.ReaderBufferSize, connection.Opts.LoggerFactory);
_readLoop = Task.Run(ReadLoopAsync);
}

Expand Down Expand Up @@ -156,7 +156,7 @@ private async Task ReadLoopAsync()
code = GetCode(buffer);
}

Interlocked.Increment(ref _connection.Counter.ReceivedMessages);
NatsMetrics.AddReceivedMessages(1);

// Optimize for Msg parsing, Inline async code
if (code == ServerOpCodes.Msg)
Expand Down Expand Up @@ -443,7 +443,7 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
{
// reaches invalid line, log warn and try to get newline and go to nextloop.
_logger.LogWarning(NatsLogEvents.Protocol, "Reached invalid line");
Interlocked.Decrement(ref _connection.Counter.ReceivedMessages);
NatsMetrics.AddReceivedMessages(-1);

var position = buffer.PositionOf((byte)'\n');
if (position == null)
Expand Down
8 changes: 3 additions & 5 deletions src/NATS.Client.Core/Internal/SocketReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ namespace NATS.Client.Core.Internal;
internal sealed class SocketReader
{
private readonly int _minimumBufferSize;
private readonly ConnectionStatsCounter _counter;
private readonly SeqeunceBuilder _seqeunceBuilder = new SeqeunceBuilder();
private readonly Stopwatch _stopwatch = new Stopwatch();
private readonly ILogger<SocketReader> _logger;
Expand All @@ -18,11 +17,10 @@ internal sealed class SocketReader

private Memory<byte> _availableMemory;

public SocketReader(ISocketConnection socketConnection, int minimumBufferSize, ConnectionStatsCounter counter, ILoggerFactory loggerFactory)
public SocketReader(ISocketConnection socketConnection, int minimumBufferSize, ILoggerFactory loggerFactory)
{
_socketConnection = socketConnection;
_minimumBufferSize = minimumBufferSize;
_counter = counter;
_logger = loggerFactory.CreateLogger<SocketReader>();
_isTraceLogging = _logger.IsEnabled(LogLevel.Trace);
}
Expand Down Expand Up @@ -66,7 +64,7 @@ public async ValueTask<ReadOnlySequence<byte>> ReadAtLeastAsync(int minimumSize)
}

totalRead += read;
Interlocked.Add(ref _counter.ReceivedBytes, read);
NatsMetrics.AddReceivedBytes(read);
_seqeunceBuilder.Append(_availableMemory.Slice(0, read));
_availableMemory = _availableMemory.Slice(read);
}
Expand Down Expand Up @@ -112,7 +110,7 @@ public async ValueTask<ReadOnlySequence<byte>> ReadUntilReceiveNewLineAsync()
throw ex;
}

Interlocked.Add(ref _counter.ReceivedBytes, read);
NatsMetrics.AddReceivedBytes(read);
var appendMemory = _availableMemory.Slice(0, read);
_seqeunceBuilder.Append(appendMemory);
_availableMemory = _availableMemory.Slice(read);
Expand Down
10 changes: 3 additions & 7 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using NATS.Client.Core.Commands;
using NATS.Client.Core.Internal;

#if NETSTANDARD
using Random = NATS.Client.Core.Internal.NetStandardExtensions.Random;
#endif
Expand Down Expand Up @@ -35,7 +35,6 @@ public partial class NatsConnection : INatsConnection
/// </summary>
public Func<(string Host, int Port), ValueTask<(string Host, int Port)>>? OnConnectingAsync;

internal readonly ConnectionStatsCounter Counter; // allow to call from external sources
internal volatile ServerInfo? WritableServerInfo;

#pragma warning restore SA1401
Expand Down Expand Up @@ -81,8 +80,7 @@ public NatsConnection(NatsOpts opts)
_disposedCancellationTokenSource = new CancellationTokenSource();
_pool = new ObjectPool(opts.ObjectPoolSize);
_name = opts.Name;
Counter = new ConnectionStatsCounter();
CommandWriter = new CommandWriter("main", this, _pool, Opts, Counter, EnqueuePing);
CommandWriter = new CommandWriter("main", this, _pool, Opts, EnqueuePing);
InboxPrefix = NewInbox(opts.InboxPrefix);
SubscriptionManager = new SubscriptionManager(this, InboxPrefix);
_clientOpts = ClientOpts.Create(Opts);
Expand Down Expand Up @@ -220,8 +218,6 @@ internal string SpanDestinationName(string subject)
return tokens.Length < 2 ? subject : $"{tokens[0]}.{tokens[1]}";
}

internal NatsStats GetStats() => Counter.ToStats();

internal ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, int sid, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer)
{
return SubscriptionManager.PublishToClientHandlersAsync(subject, replyTo, sid, headersBuffer, payloadBuffer);
Expand Down Expand Up @@ -455,7 +451,7 @@ private async ValueTask SetupReaderWriterAsync(bool reconnect)
// Authentication
_userCredentials?.Authenticate(_clientOpts, WritableServerInfo);

await using (var priorityCommandWriter = new PriorityCommandWriter(this, _pool, _socket!, Opts, Counter, EnqueuePing))
await using (var priorityCommandWriter = new PriorityCommandWriter(this, _pool, _socket!, Opts, EnqueuePing))
{
// add CONNECT and PING command to priority lane
await priorityCommandWriter.CommandWriter.ConnectAsync(_clientOpts, CancellationToken.None).ConfigureAwait(false);
Expand Down
28 changes: 0 additions & 28 deletions src/NATS.Client.Core/NatsStats.cs

This file was deleted.

Loading