Skip to content

Commit

Permalink
Retry dispatching on errors
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexeyRaga committed Aug 31, 2024
1 parent b4af29d commit 66aef8a
Showing 1 changed file with 45 additions and 13 deletions.
58 changes: 45 additions & 13 deletions src/Contrib.KafkaFlow.Outbox/OutboxDispatcherService.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Transactions;
using KafkaFlow.Producers;

namespace KafkaFlow.Outbox;

Expand All @@ -18,36 +19,67 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
_logger.LogInformation("Outbox dispatcher service has started");
while (!stoppingToken.IsCancellationRequested)
{
var hadBatch = await DispatchNextBatchAsync(stoppingToken);
if (!hadBatch)
var dispatchResult = await DispatchNextBatchAsync(stoppingToken).ConfigureAwait(false);
switch (dispatchResult)
{
_logger.LogDebug("The dispatcher queue is empty, will sleep before the next poll");
// if there was nothing to dispatch, sleep for 1 second before the next check
await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
case DispatchBatchResult.BatchDispatched:
break;
case DispatchBatchResult.NoBatchToDispatch _:
_logger.LogDebug("The dispatcher queue is empty, will sleep before the next poll");
await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
break;
case DispatchBatchResult.DispatchError(var error):
_logger.LogError(error, "Error while dispatching messages");
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
break;
}
}
_logger.LogInformation("Outbox dispatcher service has stopped");
}

private async Task<bool> DispatchNextBatchAsync(CancellationToken stoppingToken)
private async Task<DispatchBatchResult> DispatchNextBatchAsync(CancellationToken stoppingToken)
{
using var scope = BeginTransaction;
var batch = await _outboxBackend.Read(10, stoppingToken).ConfigureAwait(false);
try
{
var batch = await _outboxBackend.Read(10, stoppingToken).ConfigureAwait(false);
if (batch.Length == 0) return new DispatchBatchResult.NoBatchToDispatch();

var messagesToProduce = batch
.Select(x => new BatchProduceItem(
topic: x.TopicPartition.Topic,
messageKey: x.Message.Key,
messageValue: x.Message.Value,
headers: BuildHeaders(x)))
.ToList();


foreach (var record in batch)
await _producer.BatchProduceAsync(messagesToProduce).ConfigureAwait(false);

scope.Complete();
return new DispatchBatchResult.BatchDispatched(batch.Length);
}
catch (Exception ex)
{
var headers = record.Message.Headers == null ? null : new MessageHeaders(record.Message.Headers);
await _producer.ProduceAsync(record.TopicPartition.Topic, record.Message.Key, record.Message.Value, headers);
return new DispatchBatchResult.DispatchError(ex);
}

scope.Complete();
return batch.Length != 0;
}

private MessageHeaders? BuildHeaders(OutboxRecord record) =>
record.Message.Headers == null ? null : new MessageHeaders(record.Message.Headers);

private static TransactionScope BeginTransaction =>
new(
scopeOption: TransactionScopeOption.RequiresNew,
transactionOptions: new TransactionOptions
{ IsolationLevel = IsolationLevel.ReadCommitted, Timeout = TimeSpan.FromSeconds(30) },
asyncFlowOption: TransactionScopeAsyncFlowOption.Enabled);

private abstract record DispatchBatchResult
{
public sealed record BatchDispatched(int BatchSize) : DispatchBatchResult;
public sealed record NoBatchToDispatch : DispatchBatchResult;
public sealed record DispatchError(Exception Exception) : DispatchBatchResult;
}

}

0 comments on commit 66aef8a

Please sign in to comment.