From 66aef8a466ec29c0b572886bf9b6f3f2e38a1eed Mon Sep 17 00:00:00 2001 From: Alexey Raga Date: Sat, 31 Aug 2024 12:42:56 +1000 Subject: [PATCH] Retry dispatching on errors --- .../OutboxDispatcherService.cs | 58 ++++++++++++++----- 1 file changed, 45 insertions(+), 13 deletions(-) diff --git a/src/Contrib.KafkaFlow.Outbox/OutboxDispatcherService.cs b/src/Contrib.KafkaFlow.Outbox/OutboxDispatcherService.cs index 0507532..2367ea6 100644 --- a/src/Contrib.KafkaFlow.Outbox/OutboxDispatcherService.cs +++ b/src/Contrib.KafkaFlow.Outbox/OutboxDispatcherService.cs @@ -1,6 +1,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Transactions; +using KafkaFlow.Producers; namespace KafkaFlow.Outbox; @@ -18,36 +19,67 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) _logger.LogInformation("Outbox dispatcher service has started"); while (!stoppingToken.IsCancellationRequested) { - var hadBatch = await DispatchNextBatchAsync(stoppingToken); - if (!hadBatch) + var dispatchResult = await DispatchNextBatchAsync(stoppingToken).ConfigureAwait(false); + switch (dispatchResult) { - _logger.LogDebug("The dispatcher queue is empty, will sleep before the next poll"); - // if there was nothing to dispatch, sleep for 1 second before the next check - await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken); + case DispatchBatchResult.BatchDispatched: + break; + case DispatchBatchResult.NoBatchToDispatch _: + _logger.LogDebug("The dispatcher queue is empty, will sleep before the next poll"); + await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken); + break; + case DispatchBatchResult.DispatchError(var error): + _logger.LogError(error, "Error while dispatching messages"); + await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken); + break; } } _logger.LogInformation("Outbox dispatcher service has stopped"); } - private async Task DispatchNextBatchAsync(CancellationToken stoppingToken) + private async Task DispatchNextBatchAsync(CancellationToken stoppingToken) { using var scope = BeginTransaction; - var batch = await _outboxBackend.Read(10, stoppingToken).ConfigureAwait(false); + try + { + var batch = await _outboxBackend.Read(10, stoppingToken).ConfigureAwait(false); + if (batch.Length == 0) return new DispatchBatchResult.NoBatchToDispatch(); + + var messagesToProduce = batch + .Select(x => new BatchProduceItem( + topic: x.TopicPartition.Topic, + messageKey: x.Message.Key, + messageValue: x.Message.Value, + headers: BuildHeaders(x))) + .ToList(); + - foreach (var record in batch) + await _producer.BatchProduceAsync(messagesToProduce).ConfigureAwait(false); + + scope.Complete(); + return new DispatchBatchResult.BatchDispatched(batch.Length); + } + catch (Exception ex) { - var headers = record.Message.Headers == null ? null : new MessageHeaders(record.Message.Headers); - await _producer.ProduceAsync(record.TopicPartition.Topic, record.Message.Key, record.Message.Value, headers); + return new DispatchBatchResult.DispatchError(ex); } - - scope.Complete(); - return batch.Length != 0; } + private MessageHeaders? BuildHeaders(OutboxRecord record) => + record.Message.Headers == null ? null : new MessageHeaders(record.Message.Headers); + private static TransactionScope BeginTransaction => new( scopeOption: TransactionScopeOption.RequiresNew, transactionOptions: new TransactionOptions { IsolationLevel = IsolationLevel.ReadCommitted, Timeout = TimeSpan.FromSeconds(30) }, asyncFlowOption: TransactionScopeAsyncFlowOption.Enabled); + + private abstract record DispatchBatchResult + { + public sealed record BatchDispatched(int BatchSize) : DispatchBatchResult; + public sealed record NoBatchToDispatch : DispatchBatchResult; + public sealed record DispatchError(Exception Exception) : DispatchBatchResult; + } + }