TurboMediator
Persistence

Outbox Pattern

Reliable message delivery with the transactional outbox pattern

The outbox pattern ensures reliable notification delivery by storing notifications in the database within the same transaction as your business data, then processing them asynchronously.

How It Works

  1. When a notification is published, it's serialized and stored in the outbox table (within the current transaction)
  2. The transaction commits atomically — both business data and outbox messages
  3. A background processor picks up pending outbox messages and publishes them to an external message broker (via IOutboxMessageBrokerPublisher)

This ensures at-least-once delivery — notifications are never lost even if the application crashes after the transaction commits.

IOutboxStore

public interface IOutboxStore
{
    ValueTask SaveAsync(OutboxMessage message, CancellationToken ct);
    IAsyncEnumerable<OutboxMessage> GetPendingAsync(int batchSize, CancellationToken ct);
    ValueTask MarkAsProcessingAsync(Guid messageId, CancellationToken ct);
    ValueTask<bool> TryClaimAsync(Guid messageId, string workerId, CancellationToken ct);
    ValueTask MarkAsProcessedAsync(Guid messageId, CancellationToken ct);
    ValueTask IncrementRetryAsync(Guid messageId, string error, CancellationToken ct);
    ValueTask MoveToDeadLetterAsync(Guid messageId, string reason, CancellationToken ct);
    ValueTask<int> CleanupAsync(TimeSpan olderThan, CancellationToken ct);
}

OutboxMessage

public class OutboxMessage
{
    public Guid Id { get; set; }
    public string MessageType { get; set; }     // Full type name
    public string Payload { get; set; }          // JSON serialized
    public string? CorrelationId { get; set; }
    public DateTime CreatedAt { get; set; }
    public DateTime? ProcessedAt { get; set; }
    public int RetryCount { get; set; }
    public int MaxRetries { get; set; }          // Per-message retry limit
    public string? Error { get; set; }
    public OutboxMessageStatus Status { get; set; }
    public string? ClaimedBy { get; set; }       // Worker that claimed this message
    public Dictionary<string, string>? Headers { get; set; }
}

public enum OutboxMessageStatus
{
    Pending = 0,       // Awaiting processing (includes messages pending retry)
    Processing = 1,
    Processed = 2,
    DeadLettered = 3   // Exceeded max retries, moved to dead letter
}

Configuration

builder.Services.AddTurboMediator(m =>
{
    m.WithOutbox(options =>
    {
        options.PublishImmediately = false;
        options.MaxRetries = 5;
        options.CorrelationIdGenerator = () => Guid.NewGuid().ToString();
    });
});

Using the Attribute

Mark notifications for outbox delivery:

[WithOutbox]
public record OrderCreatedNotification(Guid OrderId) : INotification;

// With a specific destination (queue/topic)
[WithOutbox("orders-queue")]
public record OrderShippedNotification(Guid OrderId) : INotification;

// With destination, partition key, and custom retry settings
[WithOutbox("payments", PartitionKey = nameof(PaymentProcessedNotification.OrderId), MaxRetries = 10)]
public record PaymentProcessedNotification(Guid OrderId, decimal Amount) : INotification;

// Without destination but with custom settings
[WithOutbox(MaxRetries = 10, PublishImmediately = true)]
public record CriticalNotification(string Data) : INotification;

When PublishImmediately = true, the notification is saved to the outbox and published to the broker inline (if one is registered). On failure, the retry count is incremented and the message stays Pending so the background processor can pick it up.

Practical Example

[Transactional]
public record PlaceOrderCommand(Guid CustomerId, List<OrderItem> Items)
    : ICommand<Order>;

public class PlaceOrderHandler : ICommandHandler<PlaceOrderCommand, Order>
{
    private readonly AppDbContext _db;
    private readonly IMediator _mediator;

    public PlaceOrderHandler(AppDbContext db, IMediator mediator)
    {
        _db = db;
        _mediator = mediator;
    }

    public async ValueTask<Order> Handle(PlaceOrderCommand command, CancellationToken ct)
    {
        var order = new Order { CustomerId = command.CustomerId, /* ... */ };
        _db.Orders.Add(order);

        // This notification is stored in the outbox, not published immediately
        // It will be published after the transaction commits
        await _mediator.Publish(new OrderCreatedNotification(order.Id), ct);

        return order;
    }
}

The outbox pattern provides at-least-once delivery. Your notification handlers must be idempotent — designed to handle the same notification being delivered more than once. See the Inbox Pattern for built-in deduplication on the consumer side.

Message Broker Integration

By default the outbox processor marks messages as processed without forwarding them anywhere. To actually deliver messages to an external broker (RabbitMQ, Azure Service Bus, AWS SNS/SQS, Kafka, etc.) you need to:

  1. Implement IOutboxMessageBrokerPublisher
  2. Register your implementation
  3. Enable broker publishing on the processor

IOutboxMessageBrokerPublisher

public interface IOutboxMessageBrokerPublisher
{
    /// <summary>Publishes to the default destination.</summary>
    ValueTask PublishAsync(OutboxMessage message, CancellationToken cancellationToken = default);

    /// <summary>Publishes to a specific destination (queue / topic).</summary>
    ValueTask PublishAsync(OutboxMessage message, string destination, CancellationToken cancellationToken = default);
}

The processor calls the two-argument overload when a destination is resolved by the router (see Message Routing), and the single-argument overload otherwise.

Example: RabbitMQ

public class RabbitMqBrokerPublisher : IOutboxMessageBrokerPublisher
{
    private readonly IConnection _connection;
    private readonly ILogger<RabbitMqBrokerPublisher> _logger;

    public RabbitMqBrokerPublisher(IConnection connection, ILogger<RabbitMqBrokerPublisher> logger)
    {
        _connection = connection;
        _logger = logger;
    }

    public ValueTask PublishAsync(OutboxMessage message, CancellationToken cancellationToken = default)
        => PublishAsync(message, "outbox-messages", cancellationToken);

    public ValueTask PublishAsync(OutboxMessage message, string destination, CancellationToken cancellationToken = default)
    {
        using var channel = _connection.CreateModel();
        channel.QueueDeclare(destination, durable: true, exclusive: false, autoDelete: false);

        var body = Encoding.UTF8.GetBytes(message.Payload);

        var props = channel.CreateBasicProperties();
        props.Persistent = true;
        props.MessageId = message.Id.ToString();
        props.Type = message.MessageType;
        props.CorrelationId = message.CorrelationId;

        // Forward any headers stored on the message
        if (message.Headers?.Count > 0)
        {
            props.Headers = new Dictionary<string, object>(
                message.Headers.ToDictionary(kv => kv.Key, kv => (object)kv.Value));
        }

        channel.BasicPublish(exchange: "", routingKey: destination, basicProperties: props, body: body);

        _logger.LogDebug("Published outbox message {Id} to queue '{Queue}'", message.Id, destination);
        return ValueTask.CompletedTask;
    }
}

Example: Azure Service Bus

public class ServiceBusBrokerPublisher : IOutboxMessageBrokerPublisher
{
    private readonly ServiceBusClient _client;

    public ServiceBusBrokerPublisher(ServiceBusClient client) => _client = client;

    public ValueTask PublishAsync(OutboxMessage message, CancellationToken cancellationToken = default)
        => PublishAsync(message, "outbox-messages", cancellationToken);

    public async ValueTask PublishAsync(OutboxMessage message, string destination, CancellationToken cancellationToken = default)
    {
        var sender = _client.CreateSender(destination);

        var sbMessage = new ServiceBusMessage(message.Payload)
        {
            MessageId      = message.Id.ToString(),
            CorrelationId  = message.CorrelationId,
            ContentType    = "application/json",
            Subject        = message.MessageType,
        };

        // Partition key for ordered delivery (set by [WithOutbox("dest", PartitionKey = ...)])
        if (message.Headers?.TryGetValue("partition-key", out var partitionKey) == true)
            sbMessage.PartitionKey = partitionKey;

        if (message.Headers is not null)
            foreach (var (key, value) in message.Headers)
                sbMessage.ApplicationProperties[key] = value;

        await sender.SendMessageAsync(sbMessage, cancellationToken);
    }
}

Registration

builder.Services.AddTurboMediator(m =>
{
    m.WithOutbox(outbox =>
    {
        outbox.AddProcessor()
              .PublishToMessageBroker()   // enable broker mode
              .WithMaxRetries(5)
              .WithBatchSize(50);
    });
});

// Register your publisher implementation
builder.Services.AddScoped<IOutboxMessageBrokerPublisher, RabbitMqBrokerPublisher>();
// or
builder.Services.AddScoped<IOutboxMessageBrokerPublisher, ServiceBusBrokerPublisher>();

If PublishToMessageBroker() is called but no IOutboxMessageBrokerPublisher is registered, the processor logs a warning and stops processing — messages remain Pending. Always register a publisher when broker mode is enabled.

When PublishToMessageBroker() is not called (the default), the processor simply marks messages as Processed after they are picked up. This is useful for local development, testing, or scenarios where you consume outbox messages via direct polling instead of a broker.

Message Routing

The router decides which queue or topic each message type is published to. You can specify the destination directly on the [WithOutbox] attribute or use the fluent builder for global configuration.

Destination via [WithOutbox]

Pass the destination as the first argument and an optional PartitionKey:

[WithOutbox("orders")]
public record OrderCreatedNotification(Guid OrderId) : INotification;

// With partition key for ordered delivery (e.g. Azure Service Bus sessions)
[WithOutbox("payments", PartitionKey = nameof(PaymentProcessedNotification.OrderId))]
public record PaymentProcessedNotification(Guid OrderId, decimal Amount) : INotification;

The PartitionKey property names the property on the message whose value is read at publish time and forwarded to the broker (stored in OutboxMessage.Headers["partition-key"]).

When no destination is specified ([WithOutbox]), the router falls back to naming conventions or the default destination.

Fluent Routing Configuration

Use the builder to configure global routing behavior:

builder.Services.AddTurboMediator(m =>
{
    m.WithOutbox(outbox =>
    {
        outbox
            .AddProcessor()
            .PublishToMessageBroker()
            // Auto-generate destination names from the type name
            .UseKebabCaseNaming()               // OrderCreatedNotification → "order-created-notification"
            // or: .UseSnakeCaseNaming()          // → "order_created_notification"
            // or: .UseTypeNameRouting()           // → "OrderCreatedNotification"
            .WithDestinationPrefix("myapp.")    // "myapp.order-created-notification"
            .WithDefaultDestination("fallback") // fallback when no rule matches
            // Explicit per-type overrides
            .MapType<OrderCreatedNotification>("orders")
            .MapType<PaymentProcessedNotification>("payments");
    });
});

IOutboxMessageRouter

For advanced scenarios you can replace the default router entirely:

public interface IOutboxMessageRouter
{
    string  GetDestination(string messageType);
    string  GetDestination<T>();
    string  GetDestination(Type type);
    string? GetPartitionKey(string messageType);
    string? GetPartitionKey<T>();
    string? GetPartitionKey(Type type);
}

Register a custom implementation via DI:

builder.Services.AddSingleton<IOutboxMessageRouter, MyCustomRouter>();

Routing Priority

The default OutboxMessageRouter resolves destinations in this order:

  1. Explicit TypeMappings configured via .MapType<T>() or OutboxRoutingOptions.MapType()
  2. [WithOutbox("destination")] — destination specified on the attribute
  3. Naming convention (KebabCase / SnakeCase / TypeName)
  4. DefaultDestination (fallback, default value: "outbox-messages")

Dead Letter Queue (DLQ)

Messages that exceed their maximum retry attempts are moved to a dead letter status instead of being retried indefinitely. This prevents poison messages from blocking the outbox.

How It Works

  1. OutboxProcessor picks up a pending message with high retry count
  2. If RetryCount >= MaxRetries, the message is dead-lettered
  3. If an IOutboxDeadLetterHandler is registered, it's called first (for alerts, DLQ publishing, etc.)
  4. The message status changes to DeadLettered — even if the handler throws an exception

IOutboxDeadLetterHandler

public interface IOutboxDeadLetterHandler
{
    ValueTask HandleAsync(OutboxMessage message, string reason, CancellationToken ct);
}

Example: Alerting on Dead Letters

public class AlertingDeadLetterHandler : IOutboxDeadLetterHandler
{
    private readonly ILogger<AlertingDeadLetterHandler> _logger;
    private readonly IAlertService _alertService;

    public AlertingDeadLetterHandler(
        ILogger<AlertingDeadLetterHandler> logger,
        IAlertService alertService)
    {
        _logger = logger;
        _alertService = alertService;
    }

    public async ValueTask HandleAsync(
        OutboxMessage message, string reason, CancellationToken ct)
    {
        _logger.LogCritical(
            "Message {MessageId} of type {Type} dead-lettered: {Reason}",
            message.Id, message.MessageType, reason);

        await _alertService.SendAsync(
            $"Outbox DLQ: {message.MessageType}",
            $"Message {message.Id} failed after {message.RetryCount} retries. Error: {message.Error}",
            ct);
    }
}

Registration

builder.Services.AddTurboMediator(m =>
{
    m.WithOutbox(outbox =>
    {
        outbox.AddProcessor()
              .WithMaxRetries(5)
              .WithDeadLetterHandler<AlertingDeadLetterHandler>();
    });
});

// Or standalone
builder.Services.AddTurboMediator(m =>
{
    m.WithDeadLetterHandler<AlertingDeadLetterHandler>();
});

The dead letter handler is optional. Even without one, messages will still be moved to DeadLettered status and stop being retried. The handler allows you to take additional action (alerting, external DLQ, compensation). If the handler throws, the message is still moved to DeadLettered — the error is logged and appended to the reason.

Multi-Worker Concurrency

The outbox processor supports running multiple worker instances safely — whether in the same process, multiple pods, or separate services.

How It Works

The processor uses optimistic concurrency via TryClaimAsync. When a worker picks up a batch of candidates:

  1. GetPendingAsync returns candidate messages (a simple SELECT)
  2. For each candidate, the worker calls TryClaimAsync(messageId, workerId)
  3. TryClaimAsync performs an atomic UPDATE ... WHERE Status = 'Pending' AND Id = @id
  4. If rowsAffected == 1, this worker won the claim and processes the message
  5. If rowsAffected == 0, another worker already claimed it — skip

This is safe for any database (PostgreSQL, SQL Server, MySQL, SQLite) because a single-row UPDATE is atomic at the database level.

Configuration

Each worker gets an auto-generated ID by default, or you can assign one explicitly:

builder.Services.AddTurboMediator(m =>
{
    m.WithOutbox(outbox =>
    {
        outbox.AddProcessor()
              .WithProcessingInterval(TimeSpan.FromSeconds(1))
              .WithBatchSize(50)
              .WithMaxRetries(5)
              .WithAutoCleanup(cleanupAge: TimeSpan.FromDays(30));
    });
});

// Or configure the processor options directly
builder.Services.AddOutboxProcessor(options =>
{
    options.WorkerId = "worker-pod-1";       // Explicit worker ID
    options.ProcessingInterval = TimeSpan.FromSeconds(1);
    options.BatchSize = 100;
});

Custom Store Implementation

If you implement IOutboxStore for a custom database, the TryClaimAsync implementation must be atomic:

public async ValueTask<bool> TryClaimAsync(
    Guid messageId, string workerId, CancellationToken ct)
{
    // The key: UPDATE with WHERE ensures only one worker wins
    var sql = """
        UPDATE outbox_messages
        SET status = 'Processing', claimed_by = @workerId
        WHERE id = @messageId
          AND status = 'Pending'
        """;

    var rowsAffected = await db.ExecuteAsync(sql, new { messageId, workerId }, ct);
    return rowsAffected > 0;
}

The ClaimedBy column is useful for debugging: you can see which worker processed each message. It's optional for the concurrency mechanism itself — the Status column is what prevents duplicates.

On this page