From 7124125173236e2688b1b768c4be90e7ac1a41a0 Mon Sep 17 00:00:00 2001 From: Divyesh Bhandari <79130336+divyeshio@users.noreply.github.com> Date: Fri, 13 Sep 2024 21:39:52 +0530 Subject: [PATCH 1/2] Replace NatsStats with System.Diagnostics.Metrics implementation --- .../Commands/CommandWriter.cs | 8 +- .../Commands/PriorityCommandWriter.cs | 4 +- src/NATS.Client.Core/Internal/NatsMetrics.cs | 74 +++++++++++++++++++ .../Internal/NatsReadProtocolProcessor.cs | 7 +- src/NATS.Client.Core/Internal/SocketReader.cs | 10 +-- src/NATS.Client.Core/NatsConnection.cs | 13 ++-- src/NATS.Client.Core/NatsStats.cs | 28 ------- .../NatsBuilder.cs | 3 + 8 files changed, 98 insertions(+), 49 deletions(-) create mode 100644 src/NATS.Client.Core/Internal/NatsMetrics.cs delete mode 100644 src/NATS.Client.Core/NatsStats.cs diff --git a/src/NATS.Client.Core/Commands/CommandWriter.cs b/src/NATS.Client.Core/Commands/CommandWriter.cs index ff77e0e89..8085ec0e3 100644 --- a/src/NATS.Client.Core/Commands/CommandWriter.cs +++ b/src/NATS.Client.Core/Commands/CommandWriter.cs @@ -37,7 +37,7 @@ internal sealed class CommandWriter : IAsyncDisposable private readonly int _arrayPoolInitialSize; private readonly object _lock = new(); private readonly CancellationTokenSource _cts; - private readonly ConnectionStatsCounter _counter; + private readonly NatsMetrics _metrics; private readonly Memory _consolidateMem = new byte[SendMemSize].AsMemory(); private readonly TimeSpan _defaultCommandTimeout; private readonly Action _enqueuePing; @@ -55,7 +55,7 @@ internal sealed class CommandWriter : IAsyncDisposable private CancellationTokenSource? _ctsReader; private volatile bool _disposed; - public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, ConnectionStatsCounter counter, Action enqueuePing, TimeSpan? overrideCommandTimeout = default) + public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, NatsMetrics metrics, Action enqueuePing, TimeSpan? overrideCommandTimeout = default) { _logger = opts.LoggerFactory.CreateLogger(); _trace = _logger.IsEnabled(LogLevel.Trace); @@ -67,7 +67,7 @@ public CommandWriter(string name, NatsConnection connection, ObjectPool pool, Na // avoid defining another option. _arrayPoolInitialSize = opts.WriterBufferSize / 256; - _counter = counter; + _metrics = metrics; _defaultCommandTimeout = overrideCommandTimeout ?? opts.CommandTimeout; _enqueuePing = enqueuePing; _protocolWriter = new ProtocolWriter(opts.SubjectEncoding); @@ -693,7 +693,7 @@ private void EnqueueCommand() return; } - Interlocked.Add(ref _counter.PendingMessages, 1); + _metrics.AddPendingMessages(1); _channelSize.Writer.TryWrite(size); var flush = _pipeWriter.FlushAsync(); diff --git a/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs b/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs index 627c9afc9..fb3f20801 100644 --- a/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs +++ b/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs @@ -6,9 +6,9 @@ internal sealed class PriorityCommandWriter : IAsyncDisposable { private int _disposed; - public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, ISocketConnection socketConnection, NatsOpts opts, ConnectionStatsCounter counter, Action enqueuePing) + public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, ISocketConnection socketConnection, NatsOpts opts, NatsMetrics metrics, Action enqueuePing) { - CommandWriter = new CommandWriter("init", connection, pool, opts, counter, enqueuePing); + CommandWriter = new CommandWriter("init", connection, pool, opts, metrics, enqueuePing); CommandWriter.Reset(socketConnection); } diff --git a/src/NATS.Client.Core/Internal/NatsMetrics.cs b/src/NATS.Client.Core/Internal/NatsMetrics.cs new file mode 100644 index 000000000..479100b31 --- /dev/null +++ b/src/NATS.Client.Core/Internal/NatsMetrics.cs @@ -0,0 +1,74 @@ +using System.Diagnostics.Metrics; + +namespace NATS.Client.Core.Internal; + +public sealed class NatsMetrics +{ + public const string MeterName = "NATS.Client"; + + private readonly Meter _meter; + + private readonly Counter _subscriptionCounter; + private readonly Counter _sentBytesCounter; + private readonly Counter _receivedBytesCounter; + private readonly Counter _pendingMessagesCounter; + private readonly Counter _sentMessagesCounter; + private readonly Counter _receivedMessagesCounter; + + public NatsMetrics(IMeterFactory meterFactory) + { + _meter = meterFactory.Create(MeterName); + + _subscriptionCounter = _meter.CreateCounter( + "nats.client.subscription.count", + unit: "{subscriptions}", + description: "Number of subscriptions"); + + _sentBytesCounter = _meter.CreateCounter( + "nats.client.sent.bytes", + unit: "bytes", + description: "Number of bytes sent"); + + _receivedBytesCounter = _meter.CreateCounter( + "nats.client.received.bytes", + unit: "bytes", + description: "Number of bytes received"); + + _pendingMessagesCounter = _meter.CreateCounter( + "nats.client.pending.messages", + unit: "messages", + description: "Number of pending messages"); + + _sentMessagesCounter = _meter.CreateCounter( + "nats.client.sent.messages", + unit: "messages", + description: "Number of messages sent"); + + _receivedMessagesCounter = _meter.CreateCounter( + "nats.client.received.messages", + unit: "messages", + description: "Number of messages received"); + } + + public void IncrementSubscriptionCount() => _subscriptionCounter.Add(1); + + public void DecrementSubscriptionCount() => _subscriptionCounter.Add(-1); + + public void AddSentBytes(long bytes) => _sentBytesCounter.Add(bytes); + + public void AddReceivedBytes(long bytes) => _receivedBytesCounter.Add(bytes); + + public void AddPendingMessages(long messages) => _pendingMessagesCounter.Add(messages); + + public void AddSentMessages(long messages) => _sentMessagesCounter.Add(messages); + + public void AddReceivedMessages(long messages) => _receivedMessagesCounter.Add(messages); + + // This factory used when type is created without DI. + internal sealed class DummyMeterFactory : IMeterFactory + { + public Meter Create(MeterOptions options) => new(options); + + public void Dispose() { } + } +} diff --git a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs index c0d426d15..79bf492b5 100644 --- a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs @@ -21,6 +21,7 @@ internal sealed class NatsReadProtocolProcessor : IAsyncDisposable private readonly Task _infoParsed; // wait for an upgrade private readonly ConcurrentQueue _pingCommands; // wait for pong private readonly ILogger _logger; + private readonly NatsMetrics _metrics; private readonly bool _trace; private int _disposed; @@ -33,7 +34,7 @@ public NatsReadProtocolProcessor(ISocketConnection socketConnection, NatsConnect _waitForPongOrErrorSignal = waitForPongOrErrorSignal; _infoParsed = infoParsed; _pingCommands = new ConcurrentQueue(); - _socketReader = new SocketReader(socketConnection, connection.Opts.ReaderBufferSize, connection.Counter, connection.Opts.LoggerFactory); + _socketReader = new SocketReader(socketConnection, connection.Opts.ReaderBufferSize, _connection.Metrics, connection.Opts.LoggerFactory); _readLoop = Task.Run(ReadLoopAsync); } @@ -156,7 +157,7 @@ private async Task ReadLoopAsync() code = GetCode(buffer); } - Interlocked.Increment(ref _connection.Counter.ReceivedMessages); + _metrics.AddReceivedMessages(1); // Optimize for Msg parsing, Inline async code if (code == ServerOpCodes.Msg) @@ -443,7 +444,7 @@ private async ValueTask> DispatchCommandAsync(int code, R { // reaches invalid line, log warn and try to get newline and go to nextloop. _logger.LogWarning(NatsLogEvents.Protocol, "Reached invalid line"); - Interlocked.Decrement(ref _connection.Counter.ReceivedMessages); + _metrics.AddReceivedMessages(-1); var position = buffer.PositionOf((byte)'\n'); if (position == null) diff --git a/src/NATS.Client.Core/Internal/SocketReader.cs b/src/NATS.Client.Core/Internal/SocketReader.cs index 2aa1feb76..c398b59db 100644 --- a/src/NATS.Client.Core/Internal/SocketReader.cs +++ b/src/NATS.Client.Core/Internal/SocketReader.cs @@ -9,7 +9,7 @@ namespace NATS.Client.Core.Internal; internal sealed class SocketReader { private readonly int _minimumBufferSize; - private readonly ConnectionStatsCounter _counter; + private readonly NatsMetrics _metrics; private readonly SeqeunceBuilder _seqeunceBuilder = new SeqeunceBuilder(); private readonly Stopwatch _stopwatch = new Stopwatch(); private readonly ILogger _logger; @@ -18,11 +18,11 @@ internal sealed class SocketReader private Memory _availableMemory; - public SocketReader(ISocketConnection socketConnection, int minimumBufferSize, ConnectionStatsCounter counter, ILoggerFactory loggerFactory) + public SocketReader(ISocketConnection socketConnection, int minimumBufferSize, NatsMetrics metrics, ILoggerFactory loggerFactory) { _socketConnection = socketConnection; _minimumBufferSize = minimumBufferSize; - _counter = counter; + _metrics = metrics; _logger = loggerFactory.CreateLogger(); _isTraceLogging = _logger.IsEnabled(LogLevel.Trace); } @@ -66,7 +66,7 @@ public async ValueTask> ReadAtLeastAsync(int minimumSize) } totalRead += read; - Interlocked.Add(ref _counter.ReceivedBytes, read); + _metrics.AddReceivedBytes(read); _seqeunceBuilder.Append(_availableMemory.Slice(0, read)); _availableMemory = _availableMemory.Slice(read); } @@ -112,7 +112,7 @@ public async ValueTask> ReadUntilReceiveNewLineAsync() throw ex; } - Interlocked.Add(ref _counter.ReceivedBytes, read); + _metrics.AddReceivedBytes(read); var appendMemory = _availableMemory.Slice(0, read); _seqeunceBuilder.Append(appendMemory); _availableMemory = _availableMemory.Slice(read); diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index f08315b8f..b79a2a6e4 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -1,10 +1,11 @@ using System.Buffers; using System.Diagnostics; -using System.Runtime.CompilerServices; using System.Threading.Channels; using Microsoft.Extensions.Logging; using NATS.Client.Core.Commands; using NATS.Client.Core.Internal; +using static NATS.Client.Core.Internal.NatsMetrics; + #if NETSTANDARD using Random = NATS.Client.Core.Internal.NetStandardExtensions.Random; #endif @@ -35,12 +36,12 @@ public partial class NatsConnection : INatsConnection /// public Func<(string Host, int Port), ValueTask<(string Host, int Port)>>? OnConnectingAsync; - internal readonly ConnectionStatsCounter Counter; // allow to call from external sources internal volatile ServerInfo? WritableServerInfo; #pragma warning restore SA1401 private readonly object _gate = new object(); private readonly ILogger _logger; + internal readonly NatsMetrics Metrics; private readonly ObjectPool _pool; private readonly CancellationTokenSource _disposedCancellationTokenSource; private readonly string _name; @@ -81,8 +82,8 @@ public NatsConnection(NatsOpts opts) _disposedCancellationTokenSource = new CancellationTokenSource(); _pool = new ObjectPool(opts.ObjectPoolSize); _name = opts.Name; - Counter = new ConnectionStatsCounter(); - CommandWriter = new CommandWriter("main", this, _pool, Opts, Counter, EnqueuePing); + Metrics = new NatsMetrics(new DummyMeterFactory()); + CommandWriter = new CommandWriter("main", this, _pool, Opts, Metrics, EnqueuePing); InboxPrefix = NewInbox(opts.InboxPrefix); SubscriptionManager = new SubscriptionManager(this, InboxPrefix); _clientOpts = ClientOpts.Create(Opts); @@ -220,8 +221,6 @@ internal string SpanDestinationName(string subject) return tokens.Length < 2 ? subject : $"{tokens[0]}.{tokens[1]}"; } - internal NatsStats GetStats() => Counter.ToStats(); - internal ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, int sid, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer) { return SubscriptionManager.PublishToClientHandlersAsync(subject, replyTo, sid, headersBuffer, payloadBuffer); @@ -455,7 +454,7 @@ private async ValueTask SetupReaderWriterAsync(bool reconnect) // Authentication _userCredentials?.Authenticate(_clientOpts, WritableServerInfo); - await using (var priorityCommandWriter = new PriorityCommandWriter(this, _pool, _socket!, Opts, Counter, EnqueuePing)) + await using (var priorityCommandWriter = new PriorityCommandWriter(this, _pool, _socket!, Opts, Metrics, EnqueuePing)) { // add CONNECT and PING command to priority lane await priorityCommandWriter.CommandWriter.ConnectAsync(_clientOpts, CancellationToken.None).ConfigureAwait(false); diff --git a/src/NATS.Client.Core/NatsStats.cs b/src/NATS.Client.Core/NatsStats.cs deleted file mode 100644 index 403a75a08..000000000 --- a/src/NATS.Client.Core/NatsStats.cs +++ /dev/null @@ -1,28 +0,0 @@ -namespace NATS.Client.Core; - -public readonly record struct NatsStats -( - long SentBytes, - long ReceivedBytes, - long PendingMessages, - long SentMessages, - long ReceivedMessages, - long SubscriptionCount); - -internal sealed class ConnectionStatsCounter -{ - // for operate Interlocked.Increment/Decrement/Add, expose field as public -#pragma warning disable SA1401 - public long SentBytes; - public long SentMessages; - public long PendingMessages; - public long ReceivedBytes; - public long ReceivedMessages; - public long SubscriptionCount; -#pragma warning restore SA1401 - - public NatsStats ToStats() - { - return new NatsStats(SentBytes, ReceivedBytes, PendingMessages, SentMessages, ReceivedMessages, SubscriptionCount); - } -} diff --git a/src/NATS.Extensions.Microsoft.DependencyInjection/NatsBuilder.cs b/src/NATS.Extensions.Microsoft.DependencyInjection/NatsBuilder.cs index cc9bc7877..b104366e0 100644 --- a/src/NATS.Extensions.Microsoft.DependencyInjection/NatsBuilder.cs +++ b/src/NATS.Extensions.Microsoft.DependencyInjection/NatsBuilder.cs @@ -3,6 +3,7 @@ using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Logging; using NATS.Client.Core; +using NATS.Client.Core.Internal; namespace NATS.Extensions.Microsoft.DependencyInjection; @@ -98,6 +99,8 @@ internal IServiceCollection Build() } } + _services.AddSingleton(); + return _services; } From cdf2a6f774f4a2476a1490e9ded8719f8a6595c4 Mon Sep 17 00:00:00 2001 From: Divyesh Bhandari <79130336+divyeshio@users.noreply.github.com> Date: Mon, 16 Sep 2024 01:40:02 +0530 Subject: [PATCH 2/2] Update impl --- .../Commands/CommandWriter.cs | 17 ++- .../Commands/PriorityCommandWriter.cs | 4 +- src/NATS.Client.Core/Internal/NatsMetrics.cs | 100 +++++++++++------- .../Internal/NatsReadProtocolProcessor.cs | 7 +- src/NATS.Client.Core/Internal/SocketReader.cs | 8 +- src/NATS.Client.Core/NatsConnection.cs | 7 +- .../NatsBuilder.cs | 3 - 7 files changed, 82 insertions(+), 64 deletions(-) diff --git a/src/NATS.Client.Core/Commands/CommandWriter.cs b/src/NATS.Client.Core/Commands/CommandWriter.cs index 8085ec0e3..1c6794bfb 100644 --- a/src/NATS.Client.Core/Commands/CommandWriter.cs +++ b/src/NATS.Client.Core/Commands/CommandWriter.cs @@ -37,7 +37,6 @@ internal sealed class CommandWriter : IAsyncDisposable private readonly int _arrayPoolInitialSize; private readonly object _lock = new(); private readonly CancellationTokenSource _cts; - private readonly NatsMetrics _metrics; private readonly Memory _consolidateMem = new byte[SendMemSize].AsMemory(); private readonly TimeSpan _defaultCommandTimeout; private readonly Action _enqueuePing; @@ -55,7 +54,7 @@ internal sealed class CommandWriter : IAsyncDisposable private CancellationTokenSource? _ctsReader; private volatile bool _disposed; - public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, NatsMetrics metrics, Action enqueuePing, TimeSpan? overrideCommandTimeout = default) + public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, Action enqueuePing, TimeSpan? overrideCommandTimeout = default) { _logger = opts.LoggerFactory.CreateLogger(); _trace = _logger.IsEnabled(LogLevel.Trace); @@ -67,7 +66,6 @@ public CommandWriter(string name, NatsConnection connection, ObjectPool pool, Na // avoid defining another option. _arrayPoolInitialSize = opts.WriterBufferSize / 256; - _metrics = metrics; _defaultCommandTimeout = overrideCommandTimeout ?? opts.CommandTimeout; _enqueuePing = enqueuePing; _protocolWriter = new ProtocolWriter(opts.SubjectEncoding); @@ -693,11 +691,20 @@ private void EnqueueCommand() return; } - _metrics.AddPendingMessages(1); + NatsMetrics.AddPendingMessages(1); _channelSize.Writer.TryWrite(size); var flush = _pipeWriter.FlushAsync(); - _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); + + if (flush.IsCompletedSuccessfully) + { + _flushTask = null; + NatsMetrics.AddPendingMessages(-1); + } + else + { + _flushTask = flush.AsTask(); + } } private async ValueTask ConnectStateMachineAsync(bool lockHeld, ClientOpts connectOpts, CancellationToken cancellationToken) diff --git a/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs b/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs index fb3f20801..a1c5393f9 100644 --- a/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs +++ b/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs @@ -6,9 +6,9 @@ internal sealed class PriorityCommandWriter : IAsyncDisposable { private int _disposed; - public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, ISocketConnection socketConnection, NatsOpts opts, NatsMetrics metrics, Action enqueuePing) + public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, ISocketConnection socketConnection, NatsOpts opts, Action enqueuePing) { - CommandWriter = new CommandWriter("init", connection, pool, opts, metrics, enqueuePing); + CommandWriter = new CommandWriter("init", connection, pool, opts, enqueuePing); CommandWriter.Reset(socketConnection); } diff --git a/src/NATS.Client.Core/Internal/NatsMetrics.cs b/src/NATS.Client.Core/Internal/NatsMetrics.cs index 479100b31..a502033b1 100644 --- a/src/NATS.Client.Core/Internal/NatsMetrics.cs +++ b/src/NATS.Client.Core/Internal/NatsMetrics.cs @@ -2,73 +2,93 @@ namespace NATS.Client.Core.Internal; -public sealed class NatsMetrics +public class NatsMetrics { public const string MeterName = "NATS.Client"; - - private readonly Meter _meter; - - private readonly Counter _subscriptionCounter; - private readonly Counter _sentBytesCounter; - private readonly Counter _receivedBytesCounter; - private readonly Counter _pendingMessagesCounter; - private readonly Counter _sentMessagesCounter; - private readonly Counter _receivedMessagesCounter; - - public NatsMetrics(IMeterFactory meterFactory) + public const string PendingMessagesInstrumentName = $"{InstrumentPrefix}.pending.messages"; + public const string SentBytesInstrumentName = $"{InstrumentPrefix}.sent.bytes"; + public const string ReceivedBytesInstrumentName = $"{InstrumentPrefix}.received.bytes"; + public const string SentMessagesInstrumentName = $"{InstrumentPrefix}.sent.messages"; + public const string ReceivedMessagesInstrumentName = $"{InstrumentPrefix}.received.messages"; + public const string SubscriptionInstrumentName = $"{InstrumentPrefix}.subscription.count"; + + private const string InstrumentPrefix = "nats.client"; + + private static readonly Meter _meter; + private static readonly Counter _subscriptionCounter; + private static readonly Counter _pendingMessagesCounter; + private static readonly Counter _sentBytesCounter; + private static readonly Counter _receivedBytesCounter; + private static readonly Counter _sentMessagesCounter; + private static readonly Counter _receivedMessagesCounter; + + static NatsMetrics() { - _meter = meterFactory.Create(MeterName); + _meter = new Meter(MeterName); _subscriptionCounter = _meter.CreateCounter( - "nats.client.subscription.count", + SubscriptionInstrumentName, unit: "{subscriptions}", description: "Number of subscriptions"); + _pendingMessagesCounter = _meter.CreateCounter( + PendingMessagesInstrumentName, + unit: "{messages}", + description: "Number of pending messages"); + _sentBytesCounter = _meter.CreateCounter( - "nats.client.sent.bytes", - unit: "bytes", + SentBytesInstrumentName, + unit: "{bytes}", description: "Number of bytes sent"); _receivedBytesCounter = _meter.CreateCounter( - "nats.client.received.bytes", - unit: "bytes", + ReceivedBytesInstrumentName, + unit: "{bytes}", description: "Number of bytes received"); - _pendingMessagesCounter = _meter.CreateCounter( - "nats.client.pending.messages", - unit: "messages", - description: "Number of pending messages"); - _sentMessagesCounter = _meter.CreateCounter( - "nats.client.sent.messages", - unit: "messages", + SentMessagesInstrumentName, + unit: "{messages}", description: "Number of messages sent"); _receivedMessagesCounter = _meter.CreateCounter( - "nats.client.received.messages", - unit: "messages", + ReceivedMessagesInstrumentName, + unit: "{messages}", description: "Number of messages received"); } - public void IncrementSubscriptionCount() => _subscriptionCounter.Add(1); - - public void DecrementSubscriptionCount() => _subscriptionCounter.Add(-1); - - public void AddSentBytes(long bytes) => _sentBytesCounter.Add(bytes); + public static void IncrementSubscriptionCount() + { + _subscriptionCounter.Add(1); + } - public void AddReceivedBytes(long bytes) => _receivedBytesCounter.Add(bytes); + public static void DecrementSubscriptionCount() + { + _subscriptionCounter.Add(-1); + } - public void AddPendingMessages(long messages) => _pendingMessagesCounter.Add(messages); + public static void AddPendingMessages(long messages) + { + _pendingMessagesCounter.Add(messages); + } - public void AddSentMessages(long messages) => _sentMessagesCounter.Add(messages); + public static void AddSentBytes(long bytes) + { + _sentBytesCounter.Add(bytes); + } - public void AddReceivedMessages(long messages) => _receivedMessagesCounter.Add(messages); + public static void AddReceivedBytes(long bytes) + { + _receivedBytesCounter.Add(bytes); + } - // This factory used when type is created without DI. - internal sealed class DummyMeterFactory : IMeterFactory + public static void AddSentMessages(long messages) { - public Meter Create(MeterOptions options) => new(options); + _sentMessagesCounter.Add(messages); + } - public void Dispose() { } + public static void AddReceivedMessages(long messages) + { + _receivedMessagesCounter.Add(messages); } } diff --git a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs index 79bf492b5..f278a2f03 100644 --- a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs @@ -21,7 +21,6 @@ internal sealed class NatsReadProtocolProcessor : IAsyncDisposable private readonly Task _infoParsed; // wait for an upgrade private readonly ConcurrentQueue _pingCommands; // wait for pong private readonly ILogger _logger; - private readonly NatsMetrics _metrics; private readonly bool _trace; private int _disposed; @@ -34,7 +33,7 @@ public NatsReadProtocolProcessor(ISocketConnection socketConnection, NatsConnect _waitForPongOrErrorSignal = waitForPongOrErrorSignal; _infoParsed = infoParsed; _pingCommands = new ConcurrentQueue(); - _socketReader = new SocketReader(socketConnection, connection.Opts.ReaderBufferSize, _connection.Metrics, connection.Opts.LoggerFactory); + _socketReader = new SocketReader(socketConnection, connection.Opts.ReaderBufferSize, connection.Opts.LoggerFactory); _readLoop = Task.Run(ReadLoopAsync); } @@ -157,7 +156,7 @@ private async Task ReadLoopAsync() code = GetCode(buffer); } - _metrics.AddReceivedMessages(1); + NatsMetrics.AddReceivedMessages(1); // Optimize for Msg parsing, Inline async code if (code == ServerOpCodes.Msg) @@ -444,7 +443,7 @@ private async ValueTask> DispatchCommandAsync(int code, R { // reaches invalid line, log warn and try to get newline and go to nextloop. _logger.LogWarning(NatsLogEvents.Protocol, "Reached invalid line"); - _metrics.AddReceivedMessages(-1); + NatsMetrics.AddReceivedMessages(-1); var position = buffer.PositionOf((byte)'\n'); if (position == null) diff --git a/src/NATS.Client.Core/Internal/SocketReader.cs b/src/NATS.Client.Core/Internal/SocketReader.cs index c398b59db..bd48da542 100644 --- a/src/NATS.Client.Core/Internal/SocketReader.cs +++ b/src/NATS.Client.Core/Internal/SocketReader.cs @@ -9,7 +9,6 @@ namespace NATS.Client.Core.Internal; internal sealed class SocketReader { private readonly int _minimumBufferSize; - private readonly NatsMetrics _metrics; private readonly SeqeunceBuilder _seqeunceBuilder = new SeqeunceBuilder(); private readonly Stopwatch _stopwatch = new Stopwatch(); private readonly ILogger _logger; @@ -18,11 +17,10 @@ internal sealed class SocketReader private Memory _availableMemory; - public SocketReader(ISocketConnection socketConnection, int minimumBufferSize, NatsMetrics metrics, ILoggerFactory loggerFactory) + public SocketReader(ISocketConnection socketConnection, int minimumBufferSize, ILoggerFactory loggerFactory) { _socketConnection = socketConnection; _minimumBufferSize = minimumBufferSize; - _metrics = metrics; _logger = loggerFactory.CreateLogger(); _isTraceLogging = _logger.IsEnabled(LogLevel.Trace); } @@ -66,7 +64,7 @@ public async ValueTask> ReadAtLeastAsync(int minimumSize) } totalRead += read; - _metrics.AddReceivedBytes(read); + NatsMetrics.AddReceivedBytes(read); _seqeunceBuilder.Append(_availableMemory.Slice(0, read)); _availableMemory = _availableMemory.Slice(read); } @@ -112,7 +110,7 @@ public async ValueTask> ReadUntilReceiveNewLineAsync() throw ex; } - _metrics.AddReceivedBytes(read); + NatsMetrics.AddReceivedBytes(read); var appendMemory = _availableMemory.Slice(0, read); _seqeunceBuilder.Append(appendMemory); _availableMemory = _availableMemory.Slice(read); diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index b79a2a6e4..cf54f24e3 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -4,7 +4,6 @@ using Microsoft.Extensions.Logging; using NATS.Client.Core.Commands; using NATS.Client.Core.Internal; -using static NATS.Client.Core.Internal.NatsMetrics; #if NETSTANDARD using Random = NATS.Client.Core.Internal.NetStandardExtensions.Random; @@ -41,7 +40,6 @@ public partial class NatsConnection : INatsConnection #pragma warning restore SA1401 private readonly object _gate = new object(); private readonly ILogger _logger; - internal readonly NatsMetrics Metrics; private readonly ObjectPool _pool; private readonly CancellationTokenSource _disposedCancellationTokenSource; private readonly string _name; @@ -82,8 +80,7 @@ public NatsConnection(NatsOpts opts) _disposedCancellationTokenSource = new CancellationTokenSource(); _pool = new ObjectPool(opts.ObjectPoolSize); _name = opts.Name; - Metrics = new NatsMetrics(new DummyMeterFactory()); - CommandWriter = new CommandWriter("main", this, _pool, Opts, Metrics, EnqueuePing); + CommandWriter = new CommandWriter("main", this, _pool, Opts, EnqueuePing); InboxPrefix = NewInbox(opts.InboxPrefix); SubscriptionManager = new SubscriptionManager(this, InboxPrefix); _clientOpts = ClientOpts.Create(Opts); @@ -454,7 +451,7 @@ private async ValueTask SetupReaderWriterAsync(bool reconnect) // Authentication _userCredentials?.Authenticate(_clientOpts, WritableServerInfo); - await using (var priorityCommandWriter = new PriorityCommandWriter(this, _pool, _socket!, Opts, Metrics, EnqueuePing)) + await using (var priorityCommandWriter = new PriorityCommandWriter(this, _pool, _socket!, Opts, EnqueuePing)) { // add CONNECT and PING command to priority lane await priorityCommandWriter.CommandWriter.ConnectAsync(_clientOpts, CancellationToken.None).ConfigureAwait(false); diff --git a/src/NATS.Extensions.Microsoft.DependencyInjection/NatsBuilder.cs b/src/NATS.Extensions.Microsoft.DependencyInjection/NatsBuilder.cs index b104366e0..cc9bc7877 100644 --- a/src/NATS.Extensions.Microsoft.DependencyInjection/NatsBuilder.cs +++ b/src/NATS.Extensions.Microsoft.DependencyInjection/NatsBuilder.cs @@ -3,7 +3,6 @@ using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Logging; using NATS.Client.Core; -using NATS.Client.Core.Internal; namespace NATS.Extensions.Microsoft.DependencyInjection; @@ -99,8 +98,6 @@ internal IServiceCollection Build() } } - _services.AddSingleton(); - return _services; }