Outbox Pattern
Reliable message delivery with the transactional outbox pattern
The outbox pattern ensures reliable notification delivery by storing notifications in the database within the same transaction as your business data, then processing them asynchronously.
How It Works
- When a notification is published, it's serialized and stored in the outbox table (within the current transaction)
- The transaction commits atomically — both business data and outbox messages
- A background processor picks up pending outbox messages and publishes them to an external message broker (via IOutboxMessageBrokerPublisher)
This ensures at-least-once delivery — notifications are never lost even if the application crashes after the transaction commits.
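To make the "at-least-once" part concrete, here is a minimal in-memory sketch of a processor pass. It is not TurboMediator code: `PendingMessage`, `AtLeastOnceDemo`, and the `crashBeforeMark` flag are hypothetical names used only to show why a crash between publishing and marking a row as processed leads to a second delivery rather than a lost message.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an outbox row; not a TurboMediator type.
public sealed class PendingMessage
{
    public Guid Id { get; } = Guid.NewGuid();
    public bool Processed { get; set; }
}

public static class AtLeastOnceDemo
{
    // Runs one processor pass: publish each unprocessed message, then mark it.
    // crashBeforeMark simulates the process dying between the two steps.
    public static int RunPass(List<PendingMessage> outbox, List<Guid> broker, bool crashBeforeMark)
    {
        var published = 0;
        foreach (var message in outbox)
        {
            if (message.Processed) continue;

            broker.Add(message.Id);      // step 1: hand the message to the broker
            published++;

            if (crashBeforeMark) break;  // simulated crash: the row is still unprocessed

            message.Processed = true;    // step 2: record the successful delivery
        }
        return published;
    }
}
```

Running one pass with `crashBeforeMark: true` followed by a normal pass publishes the same message twice, which is the behavior consumers need to tolerate.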
IOutboxStore
public interface IOutboxStore
{
ValueTask SaveAsync(OutboxMessage message, CancellationToken ct);
IAsyncEnumerable<OutboxMessage> GetPendingAsync(int batchSize, CancellationToken ct);
ValueTask MarkAsProcessingAsync(Guid messageId, CancellationToken ct);
ValueTask<bool> TryClaimAsync(Guid messageId, string workerId, CancellationToken ct);
ValueTask MarkAsProcessedAsync(Guid messageId, CancellationToken ct);
ValueTask IncrementRetryAsync(Guid messageId, string error, CancellationToken ct);
ValueTask MoveToDeadLetterAsync(Guid messageId, string reason, CancellationToken ct);
ValueTask<int> CleanupAsync(TimeSpan olderThan, CancellationToken ct);
}
OutboxMessage
public class OutboxMessage
{
public Guid Id { get; set; }
public string MessageType { get; set; } // Full type name
public string Payload { get; set; } // JSON serialized
public string? CorrelationId { get; set; }
public DateTime CreatedAt { get; set; }
public DateTime? ProcessedAt { get; set; }
public int RetryCount { get; set; }
public int MaxRetries { get; set; } // Per-message retry limit
public string? Error { get; set; }
public OutboxMessageStatus Status { get; set; }
public string? ClaimedBy { get; set; } // Worker that claimed this message
public Dictionary<string, string>? Headers { get; set; }
}
public enum OutboxMessageStatus
{
Pending = 0, // Awaiting processing (includes messages pending retry)
Processing = 1,
Processed = 2,
DeadLettered = 3 // Exceeded max retries, moved to dead letter
}
Configuration
builder.Services.AddTurboMediator(m =>
{
m.WithOutbox(options =>
{
options.PublishImmediately = false;
options.MaxRetries = 5;
options.CorrelationIdGenerator = () => Guid.NewGuid().ToString();
});
});
Using the Attribute
Mark notifications for outbox delivery:
[WithOutbox]
public record OrderCreatedNotification(Guid OrderId) : INotification;
// With a specific destination (queue/topic)
[WithOutbox("orders-queue")]
public record OrderShippedNotification(Guid OrderId) : INotification;
// With destination, partition key, and custom retry settings
[WithOutbox("payments", PartitionKey = nameof(PaymentProcessedNotification.OrderId), MaxRetries = 10)]
public record PaymentProcessedNotification(Guid OrderId, decimal Amount) : INotification;
// Without destination but with custom settings
[WithOutbox(MaxRetries = 10, PublishImmediately = true)]
public record CriticalNotification(string Data) : INotification;
When PublishImmediately = true, the notification is saved to the outbox and published to the broker inline (if one is registered). On failure, the retry count is incremented and the message stays Pending so the background processor can pick it up.
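The inline-publish fallback described above can be sketched roughly as follows. This is an illustration, not the library's implementation: `SketchMessage`, `SketchStatus`, and `ImmediatePublishSketch` are hypothetical types, and marking the message as processed after a successful inline publish is an assumption that the text above does not state.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

// Hypothetical stand-ins; the real types are OutboxMessage / OutboxMessageStatus.
public enum SketchStatus { Pending, Processed }

public sealed class SketchMessage
{
    public int RetryCount { get; set; }
    public string? Error { get; set; }
    public SketchStatus Status { get; set; } = SketchStatus.Pending;
}

public static class ImmediatePublishSketch
{
    // Tries to publish inline. On failure the message is left Pending so a
    // background processor can retry it later.
    public static async Task PublishInlineAsync(
        SketchMessage message, Func<SketchMessage, Task> publish)
    {
        try
        {
            await publish(message);
            message.Status = SketchStatus.Processed; // assumption: success marks it processed
        }
        catch (Exception ex)
        {
            message.RetryCount++;        // count the failed attempt
            message.Error = ex.Message;  // keep the last error for diagnostics
            // Status is intentionally left as Pending.
        }
    }
}
```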
Practical Example
[Transactional]
public record PlaceOrderCommand(Guid CustomerId, List<OrderItem> Items)
: ICommand<Order>;
public class PlaceOrderHandler : ICommandHandler<PlaceOrderCommand, Order>
{
private readonly AppDbContext _db;
private readonly IMediator _mediator;
public PlaceOrderHandler(AppDbContext db, IMediator mediator)
{
_db = db;
_mediator = mediator;
}
public async ValueTask<Order> Handle(PlaceOrderCommand command, CancellationToken ct)
{
var order = new Order { CustomerId = command.CustomerId, /* ... */ };
_db.Orders.Add(order);
// This notification is stored in the outbox, not published immediately
// It will be published after the transaction commits
await _mediator.Publish(new OrderCreatedNotification(order.Id), ct);
return order;
}
}
The outbox pattern provides at-least-once delivery. Your notification handlers must be idempotent, that is, designed to handle the same notification being delivered more than once. See the Inbox Pattern for built-in deduplication on the consumer side.
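One common way to make a handler idempotent is to remember which message IDs have already been applied. The sketch below assumes the consumer can see a stable message ID (for example the outbox message's Id); `IdempotentOrderProjection` is a hypothetical class, and a real implementation would persist the seen IDs rather than hold them in memory.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical consumer-side handler that ignores duplicate deliveries.
public sealed class IdempotentOrderProjection
{
    // In production this set would live in durable storage, not in memory.
    private readonly HashSet<Guid> _seenMessageIds = new HashSet<Guid>();

    public int OrdersCounted { get; private set; }

    // Returns true if the message was applied, false if it was a duplicate.
    public bool Handle(Guid messageId)
    {
        // HashSet.Add returns false when the ID is already present.
        if (!_seenMessageIds.Add(messageId))
            return false;

        OrdersCounted++; // the side effect happens once per distinct message ID
        return true;
    }
}
```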
Message Broker Integration
By default the outbox processor marks messages as processed without forwarding them anywhere. To actually deliver messages to an external broker (RabbitMQ, Azure Service Bus, AWS SNS/SQS, Kafka, etc.) you need to:
- Implement IOutboxMessageBrokerPublisher
- Register your implementation
- Enable broker publishing on the processor
IOutboxMessageBrokerPublisher
public interface IOutboxMessageBrokerPublisher
{
/// <summary>Publishes to the default destination.</summary>
ValueTask PublishAsync(OutboxMessage message, CancellationToken cancellationToken = default);
/// <summary>Publishes to a specific destination (queue / topic).</summary>
ValueTask PublishAsync(OutboxMessage message, string destination, CancellationToken cancellationToken = default);
}
The processor calls the two-argument overload when a destination is resolved by the router (see Message Routing), and the single-argument overload otherwise.
Example: RabbitMQ
public class RabbitMqBrokerPublisher : IOutboxMessageBrokerPublisher
{
private readonly IConnection _connection;
private readonly ILogger<RabbitMqBrokerPublisher> _logger;
public RabbitMqBrokerPublisher(IConnection connection, ILogger<RabbitMqBrokerPublisher> logger)
{
_connection = connection;
_logger = logger;
}
public ValueTask PublishAsync(OutboxMessage message, CancellationToken cancellationToken = default)
=> PublishAsync(message, "outbox-messages", cancellationToken);
public ValueTask PublishAsync(OutboxMessage message, string destination, CancellationToken cancellationToken = default)
{
using var channel = _connection.CreateModel();
channel.QueueDeclare(destination, durable: true, exclusive: false, autoDelete: false);
var body = Encoding.UTF8.GetBytes(message.Payload);
var props = channel.CreateBasicProperties();
props.Persistent = true;
props.MessageId = message.Id.ToString();
props.Type = message.MessageType;
props.CorrelationId = message.CorrelationId;
// Forward any headers stored on the message
if (message.Headers?.Count > 0)
{
props.Headers = new Dictionary<string, object>(
message.Headers.ToDictionary(kv => kv.Key, kv => (object)kv.Value));
}
channel.BasicPublish(exchange: "", routingKey: destination, basicProperties: props, body: body);
_logger.LogDebug("Published outbox message {Id} to queue '{Queue}'", message.Id, destination);
return ValueTask.CompletedTask;
}
}
Example: Azure Service Bus
public class ServiceBusBrokerPublisher : IOutboxMessageBrokerPublisher
{
private readonly ServiceBusClient _client;
public ServiceBusBrokerPublisher(ServiceBusClient client) => _client = client;
public ValueTask PublishAsync(OutboxMessage message, CancellationToken cancellationToken = default)
=> PublishAsync(message, "outbox-messages", cancellationToken);
public async ValueTask PublishAsync(OutboxMessage message, string destination, CancellationToken cancellationToken = default)
{
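// Dispose the sender after the send so its link is released; for high throughput, cache one sender per destination instead
await using var sender = _client.CreateSender(destination);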
var sbMessage = new ServiceBusMessage(message.Payload)
{
MessageId = message.Id.ToString(),
CorrelationId = message.CorrelationId,
ContentType = "application/json",
Subject = message.MessageType,
};
// Partition key for ordered delivery (set by [WithOutbox("dest", PartitionKey = ...)])
if (message.Headers?.TryGetValue("partition-key", out var partitionKey) == true)
sbMessage.PartitionKey = partitionKey;
if (message.Headers is not null)
foreach (var (key, value) in message.Headers)
sbMessage.ApplicationProperties[key] = value;
await sender.SendMessageAsync(sbMessage, cancellationToken);
}
}
Registration
builder.Services.AddTurboMediator(m =>
{
m.WithOutbox(outbox =>
{
outbox.AddProcessor()
.PublishToMessageBroker() // enable broker mode
.WithMaxRetries(5)
.WithBatchSize(50);
});
});
// Register your publisher implementation
builder.Services.AddScoped<IOutboxMessageBrokerPublisher, RabbitMqBrokerPublisher>();
// or
builder.Services.AddScoped<IOutboxMessageBrokerPublisher, ServiceBusBrokerPublisher>();
If PublishToMessageBroker() is called but no IOutboxMessageBrokerPublisher is registered, the processor logs a warning and stops processing; messages remain Pending. Always register a publisher when broker mode is enabled.
When PublishToMessageBroker() is not called (the default), the processor simply marks messages as Processed after they are picked up. This is useful for local development, testing, or scenarios where you consume outbox messages via direct polling instead of a broker.
Message Routing
The router decides which queue or topic each message type is published to. You can specify the destination directly on the [WithOutbox] attribute or use the fluent builder for global configuration.
Destination via [WithOutbox]
Pass the destination as the first argument and an optional PartitionKey:
[WithOutbox("orders")]
public record OrderCreatedNotification(Guid OrderId) : INotification;
// With partition key for ordered delivery (e.g. Azure Service Bus sessions)
[WithOutbox("payments", PartitionKey = nameof(PaymentProcessedNotification.OrderId))]
public record PaymentProcessedNotification(Guid OrderId, decimal Amount) : INotification;The PartitionKey property names the property on the message whose value is read at publish time and forwarded to the broker (stored in OutboxMessage.Headers["partition-key"]).
When no destination is specified ([WithOutbox]), the router falls back to naming conventions or the default destination.
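The PartitionKey lookup described above amounts to reading a named property from the message at publish time. The sketch below shows one way to do that with reflection. `PaymentSketch` and `PartitionKeySketch` are hypothetical names, and whether the library itself uses reflection or generated code is not stated in this document.

```csharp
#nullable enable
using System;
using System.Reflection;

// Hypothetical message type mirroring PaymentProcessedNotification.
public record PaymentSketch(Guid OrderId, decimal Amount);

public static class PartitionKeySketch
{
    // Reads the value of the named public instance property and returns it as
    // a string, or null when the property does not exist or its value is null.
    public static string? ReadPartitionKey(object message, string propertyName)
    {
        PropertyInfo? property = message.GetType().GetProperty(
            propertyName, BindingFlags.Public | BindingFlags.Instance);

        return property?.GetValue(message)?.ToString();
    }
}
```

The returned string is what would end up under the "partition-key" header, where a publisher such as the Service Bus example can pick it up.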
Fluent Routing Configuration
Use the builder to configure global routing behavior:
builder.Services.AddTurboMediator(m =>
{
m.WithOutbox(outbox =>
{
outbox
.AddProcessor()
.PublishToMessageBroker()
// Auto-generate destination names from the type name
.UseKebabCaseNaming() // OrderCreatedNotification → "order-created-notification"
// or: .UseSnakeCaseNaming() // → "order_created_notification"
// or: .UseTypeNameRouting() // → "OrderCreatedNotification"
.WithDestinationPrefix("myapp.") // "myapp.order-created-notification"
.WithDefaultDestination("fallback") // fallback when no rule matches
// Explicit per-type overrides
.MapType<OrderCreatedNotification>("orders")
.MapType<PaymentProcessedNotification>("payments");
});
});
IOutboxMessageRouter
For advanced scenarios you can replace the default router entirely:
public interface IOutboxMessageRouter
{
string GetDestination(string messageType);
string GetDestination<T>();
string GetDestination(Type type);
string? GetPartitionKey(string messageType);
string? GetPartitionKey<T>();
string? GetPartitionKey(Type type);
}
Register a custom implementation via DI:
builder.Services.AddSingleton<IOutboxMessageRouter, MyCustomRouter>();
Routing Priority
The default OutboxMessageRouter resolves destinations in this order:
- Explicit TypeMappings configured via .MapType<T>() or OutboxRoutingOptions.MapType()
- [WithOutbox("destination")]: destination specified on the attribute
- Naming convention (KebabCase / SnakeCase / TypeName)
- DefaultDestination (fallback, default value: "outbox-messages")
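The resolution order above can be sketched as a chain of lookups. `RoutingSketch` is a hypothetical class, not the library's OutboxMessageRouter. It keys everything by type name for brevity and assumes the prefix is applied only to convention-generated names, which this document does not confirm. Its kebab-case conversion is deliberately naive and would split acronyms letter by letter.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical router illustrating the documented priority order.
public sealed class RoutingSketch
{
    private readonly Dictionary<string, string> _typeMappings = new Dictionary<string, string>();
    private readonly Dictionary<string, string> _attributeDestinations = new Dictionary<string, string>();

    public bool UseKebabCase { get; set; }
    public string Prefix { get; set; } = "";
    public string DefaultDestination { get; set; } = "outbox-messages";

    public void MapType(string typeName, string destination) =>
        _typeMappings[typeName] = destination;

    public void SetAttributeDestination(string typeName, string destination) =>
        _attributeDestinations[typeName] = destination;

    public string GetDestination(string typeName)
    {
        // 1. Explicit type mapping wins.
        if (_typeMappings.TryGetValue(typeName, out var mapped)) return mapped;
        // 2. Destination declared on the attribute.
        if (_attributeDestinations.TryGetValue(typeName, out var fromAttribute)) return fromAttribute;
        // 3. Naming convention (only kebab-case is sketched here).
        if (UseKebabCase) return Prefix + ToKebabCase(typeName);
        // 4. Fallback.
        return DefaultDestination;
    }

    // Inserts '-' before every uppercase letter except the first, then lowercases.
    public static string ToKebabCase(string name)
    {
        var builder = new StringBuilder();
        for (var i = 0; i < name.Length; i++)
        {
            var c = name[i];
            if (char.IsUpper(c) && i > 0) builder.Append('-');
            builder.Append(char.ToLowerInvariant(c));
        }
        return builder.ToString();
    }
}
```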
Dead Letter Queue (DLQ)
Messages that exceed their maximum retry attempts are moved to a dead letter status instead of being retried indefinitely. This prevents poison messages from blocking the outbox.
How It Works
- OutboxProcessor picks up a pending message with a high retry count
- If RetryCount >= MaxRetries, the message is dead-lettered
- If an IOutboxDeadLetterHandler is registered, it's called first (for alerts, DLQ publishing, etc.)
- The message status changes to DeadLettered, even if the handler throws an exception
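The decision flow above can be sketched as a single function. `DlqMessage`, `DlqStatus`, and `DeadLetterSketch` are hypothetical stand-ins for the library's types. Storing the reason in `Error` is an assumption made for this example; the real processor passes the reason to MoveToDeadLetterAsync.

```csharp
#nullable enable
using System;

// Hypothetical stand-ins for OutboxMessageStatus / OutboxMessage.
public enum DlqStatus { Pending, DeadLettered }

public sealed class DlqMessage
{
    public int RetryCount { get; set; }
    public int MaxRetries { get; set; }
    public DlqStatus Status { get; set; } = DlqStatus.Pending;
    public string? Error { get; set; }
}

public static class DeadLetterSketch
{
    // Returns true when the message was dead-lettered, false when it may still be retried.
    public static bool TryDeadLetter(DlqMessage message, Action<DlqMessage, string>? handler)
    {
        if (message.RetryCount < message.MaxRetries)
            return false;

        var reason = $"Exceeded max retries ({message.MaxRetries})";
        try
        {
            // The optional handler runs first (alerting, external DLQ, and so on).
            handler?.Invoke(message, reason);
        }
        catch (Exception ex)
        {
            // A failing handler must not keep the message alive; note the failure instead.
            reason += $"; handler failed: {ex.Message}";
        }

        message.Status = DlqStatus.DeadLettered;
        message.Error = reason; // assumption: the reason is kept on the message
        return true;
    }
}
```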
IOutboxDeadLetterHandler
public interface IOutboxDeadLetterHandler
{
ValueTask HandleAsync(OutboxMessage message, string reason, CancellationToken ct);
}
Example: Alerting on Dead Letters
public class AlertingDeadLetterHandler : IOutboxDeadLetterHandler
{
private readonly ILogger<AlertingDeadLetterHandler> _logger;
private readonly IAlertService _alertService;
public AlertingDeadLetterHandler(
ILogger<AlertingDeadLetterHandler> logger,
IAlertService alertService)
{
_logger = logger;
_alertService = alertService;
}
public async ValueTask HandleAsync(
OutboxMessage message, string reason, CancellationToken ct)
{
_logger.LogCritical(
"Message {MessageId} of type {Type} dead-lettered: {Reason}",
message.Id, message.MessageType, reason);
await _alertService.SendAsync(
$"Outbox DLQ: {message.MessageType}",
$"Message {message.Id} failed after {message.RetryCount} retries. Error: {message.Error}",
ct);
}
}
Registration
builder.Services.AddTurboMediator(m =>
{
m.WithOutbox(outbox =>
{
outbox.AddProcessor()
.WithMaxRetries(5)
.WithDeadLetterHandler<AlertingDeadLetterHandler>();
});
});
// Or standalone
builder.Services.AddTurboMediator(m =>
{
m.WithDeadLetterHandler<AlertingDeadLetterHandler>();
});
The dead letter handler is optional. Even without one, messages will still be moved to DeadLettered status and stop being retried. The handler allows you to take additional action (alerting, external DLQ, compensation). If the handler throws, the message is still moved to DeadLettered; the error is logged and appended to the reason.
Multi-Worker Concurrency
The outbox processor supports running multiple worker instances safely — whether in the same process, multiple pods, or separate services.
How It Works
The processor uses optimistic concurrency via TryClaimAsync. When a worker picks up a batch of candidates:
- GetPendingAsync returns candidate messages (a simple SELECT)
- For each candidate, the worker calls TryClaimAsync(messageId, workerId)
- TryClaimAsync performs an atomic UPDATE ... WHERE Status = 'Pending' AND Id = @id
- If rowsAffected == 1, this worker won the claim and processes the message
- If rowsAffected == 0, another worker already claimed it, so the message is skipped
This is safe for any database (PostgreSQL, SQL Server, MySQL, SQLite) because a single-row UPDATE is atomic at the database level.
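The claim step can be simulated in memory to show the "exactly one winner" property. This sketch swaps the database UPDATE for `Interlocked.CompareExchange`, which gives the same compare-and-set semantics within a single process. `ClaimableMessage` and `ClaimRace` are hypothetical names, not part of the library.

```csharp
#nullable enable
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory message whose status can be claimed atomically.
public sealed class ClaimableMessage
{
    private const int Pending = 0;
    private const int Processing = 1;
    private int _status = Pending;

    public string? ClaimedBy { get; private set; }

    public bool TryClaim(string workerId)
    {
        // Equivalent of UPDATE ... SET status = Processing WHERE status = Pending:
        // the swap happens only if the current value is still Pending.
        if (Interlocked.CompareExchange(ref _status, Processing, Pending) != Pending)
            return false;

        ClaimedBy = workerId; // only the single winner reaches this line
        return true;
    }
}

public static class ClaimRace
{
    // Lets workerCount tasks race for one message and counts how many won.
    public static int CountWinners(int workerCount)
    {
        var message = new ClaimableMessage();
        var tasks = Enumerable.Range(0, workerCount)
            .Select(i => Task.Run(() => message.TryClaim($"worker-{i}")))
            .ToArray();

        Task.WaitAll(tasks);
        return tasks.Count(t => t.Result);
    }
}
```

However many workers race, `ClaimRace.CountWinners` reports a single successful claim, which mirrors the `rowsAffected == 1` check in the database version.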
Configuration
Each worker gets an auto-generated ID by default, or you can assign one explicitly:
builder.Services.AddTurboMediator(m =>
{
m.WithOutbox(outbox =>
{
outbox.AddProcessor()
.WithProcessingInterval(TimeSpan.FromSeconds(1))
.WithBatchSize(50)
.WithMaxRetries(5)
.WithAutoCleanup(cleanupAge: TimeSpan.FromDays(30));
});
});
// Or configure the processor options directly
builder.Services.AddOutboxProcessor(options =>
{
options.WorkerId = "worker-pod-1"; // Explicit worker ID
options.ProcessingInterval = TimeSpan.FromSeconds(1);
options.BatchSize = 100;
});
Custom Store Implementation
If you implement IOutboxStore for a custom database, the TryClaimAsync implementation must be atomic:
public async ValueTask<bool> TryClaimAsync(
Guid messageId, string workerId, CancellationToken ct)
{
// The key: UPDATE with WHERE ensures only one worker wins
var sql = """
UPDATE outbox_messages
SET status = 'Processing', claimed_by = @workerId
WHERE id = @messageId
AND status = 'Pending'
""";
var rowsAffected = await db.ExecuteAsync(sql, new { messageId, workerId }, ct);
return rowsAffected > 0;
}
The ClaimedBy column is useful for debugging: you can see which worker processed each message. It's optional for the concurrency mechanism itself; the Status column is what prevents duplicates.