Inbox Pattern

At-most-once message processing with built-in idempotency and deduplication

The inbox pattern complements the outbox pattern by providing at-most-once processing on the consumer side. While the outbox guarantees messages are sent, the inbox prevents duplicate processing when the same message arrives more than once.

Why You Need It

The outbox pattern provides at-least-once delivery, meaning a message may be delivered more than once (retries, network issues, redeliveries). The inbox pattern deduplicates these deliveries so that each handler processes a given message only once.

Producer                          Consumer
┌──────────┐   at-least-once     ┌─────────┐
│ Outbox   │ ─────────────────>  │ Inbox   │   at-most-once
│ Pattern  │   (may duplicate)   │ Pattern │  (deduplicates)
└──────────┘                     └─────────┘
           Together = exactly-once semantics

Quick Start

1. Mark messages as idempotent

The simplest way is to implement IIdempotentMessage:

public record ProcessPaymentCommand(
    Guid PaymentId,
    decimal Amount
) : ICommand<PaymentResult>, IIdempotentMessage
{
    public string IdempotencyKey => $"payment:{PaymentId}";
}

2. Register the inbox behavior

builder.Services.AddTurboMediator(m =>
{
    m.UseEntityFramework();
    m.WithInbox();  // Registers InboxBehavior pipeline
});

That's it. Duplicate messages with the same IdempotencyKey are skipped silently: the handler does not run and the caller receives the default value of the response type.

Three Ways to Define Idempotency Keys

Option 1: IIdempotentMessage interface

public record ChargeCustomerCommand(
    Guid PaymentIntentId,
    decimal Amount,
    string CustomerId
) : ICommand<ChargeResult>, IIdempotentMessage
{
    public string IdempotencyKey => $"charge:{PaymentIntentId}";
}

Option 2: [Idempotent] attribute with key property

[Idempotent(KeyProperty = "OrderId")]
public record ProcessOrderCommand(string OrderId, string Data) : ICommand<OrderResult>;

Option 3: [Idempotent] attribute with content hash

When no KeyProperty is specified, the inbox uses a SHA-256 hash of the serialized message content:

[Idempotent]
public record ImportDataCommand(string Source, int BatchNumber) : ICommand<ImportResult>;

Content hashing is deterministic but less explicit. Prefer IIdempotentMessage or KeyProperty for production code — they make the deduplication key obvious and debuggable.
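
To make the three options concrete, here is a minimal sketch of how a key could be resolved in that order. It is not the library's implementation: IdempotencyKeyResolver, SampleOrder and SampleImport are hypothetical names, the interface and attribute are local stand-ins declared only so the sketch compiles on its own, and hashing the System.Text.Json output as UTF-8 is an assumption, since the serializer behind the content hash is not specified here.

```csharp
#nullable enable
using System;
using System.Reflection;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;

// Local stand-ins for the library types (assumed shapes, for illustration only).
public interface IIdempotentMessage { string IdempotencyKey { get; } }

[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct)]
public sealed class IdempotentAttribute : Attribute
{
    public string? KeyProperty { get; set; }
}

// Hypothetical sample messages mirroring Options 2 and 3.
[Idempotent(KeyProperty = "OrderId")]
public record SampleOrder(string OrderId, string Data);

[Idempotent]
public record SampleImport(string Source, int BatchNumber);

public static class IdempotencyKeyResolver
{
    // Returns null when the message is not marked as idempotent at all.
    public static string? Resolve(object message)
    {
        // Option 1: the message supplies its own key.
        if (message is IIdempotentMessage explicitKey)
            return explicitKey.IdempotencyKey;

        var type = message.GetType();
        var attribute = type.GetCustomAttribute<IdempotentAttribute>();
        if (attribute is null)
            return null;

        // Option 2: read the named property via reflection.
        if (!string.IsNullOrEmpty(attribute.KeyProperty))
        {
            var property = type.GetProperty(attribute.KeyProperty)
                ?? throw new InvalidOperationException(
                    $"{type.Name} has no property '{attribute.KeyProperty}'.");
            return property.GetValue(message)?.ToString();
        }

        // Option 3: SHA-256 over the serialized content.
        // Assumption: JSON via System.Text.Json, encoded as UTF-8.
        var json = JsonSerializer.Serialize(message, type);
        var hash = SHA256.HashData(Encoding.UTF8.GetBytes(json));
        return Convert.ToHexString(hash);
    }
}
```

With a content hash, any change to a property value or to the serialized shape of the message produces a different key, which is one reason an explicit key is easier to reason about in production.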

IInboxStore

public interface IInboxStore
{
    ValueTask<bool> HasBeenProcessedAsync(
        string messageId, string handlerType, CancellationToken ct);
    ValueTask RecordAsync(InboxMessage message, CancellationToken ct);
    ValueTask<int> CleanupAsync(TimeSpan olderThan, CancellationToken ct);
}

The store uses a composite key of (MessageId, HandlerType), allowing the same message to be independently tracked across different handlers.

InboxMessage

public class InboxMessage
{
    public string MessageId { get; set; }     // Idempotency key
    public string HandlerType { get; set; }   // Handler that processed it
    public string MessageType { get; set; }   // Full type name
    public DateTime ReceivedAt { get; set; }
    public DateTime? ProcessedAt { get; set; }
}

Configuration

Basic setup

builder.Services.AddTurboMediator(m =>
{
    m.UseEntityFramework();
    m.WithInbox(options =>
    {
        options.RetentionPeriod = TimeSpan.FromDays(30);
        options.EnableAutoCleanup = true;
        options.CleanupInterval = TimeSpan.FromHours(6);
    });
});

Via the outbox builder

builder.Services.AddTurboMediator(m =>
{
    m.WithOutbox(outbox =>
    {
        outbox.UseEfCoreStore()
              .AddProcessor()
              .WithInbox()                               // Enable inbox
              .WithInboxRetention(TimeSpan.FromDays(14)) // Cleanup after 14 days
              .UseEfCoreInboxStore();                    // EF Core for inbox
    });
});

With custom store

builder.Services.AddTurboMediator(m =>
{
    m.WithInbox<CustomInboxStore>();
});
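
As an illustration of what a custom store might look like, the sketch below implements IInboxStore in memory, which can be handy in unit tests. The InboxMessage and IInboxStore declarations are copies of the shapes shown above, included only so the example compiles on its own. Treating olderThan as an age measured against ProcessedAt is an assumption, as is keeping the first record when the same pair is recorded twice.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Copies of the documented shapes; in an application these come from the package.
public class InboxMessage
{
    public string MessageId { get; set; } = "";
    public string HandlerType { get; set; } = "";
    public string MessageType { get; set; } = "";
    public DateTime ReceivedAt { get; set; }
    public DateTime? ProcessedAt { get; set; }
}

public interface IInboxStore
{
    ValueTask<bool> HasBeenProcessedAsync(
        string messageId, string handlerType, CancellationToken ct);
    ValueTask RecordAsync(InboxMessage message, CancellationToken ct);
    ValueTask<int> CleanupAsync(TimeSpan olderThan, CancellationToken ct);
}

// In-memory sketch: records are lost on restart, so this is not a durable inbox.
public sealed class CustomInboxStore : IInboxStore
{
    // Keyed by the same composite (MessageId, HandlerType) the docs describe.
    private readonly ConcurrentDictionary<(string MessageId, string HandlerType), InboxMessage>
        _records = new();

    public ValueTask<bool> HasBeenProcessedAsync(
        string messageId, string handlerType, CancellationToken ct)
        => ValueTask.FromResult(_records.ContainsKey((messageId, handlerType)));

    public ValueTask RecordAsync(InboxMessage message, CancellationToken ct)
    {
        // TryAdd keeps the first record if the same pair is recorded twice.
        _records.TryAdd((message.MessageId, message.HandlerType), message);
        return ValueTask.CompletedTask;
    }

    public ValueTask<int> CleanupAsync(TimeSpan olderThan, CancellationToken ct)
    {
        // Assumption: remove records whose ProcessedAt is older than the cutoff.
        var cutoff = DateTime.UtcNow - olderThan;
        var removed = 0;
        foreach (var entry in _records)
        {
            if (entry.Value.ProcessedAt is { } processedAt
                && processedAt < cutoff
                && _records.TryRemove(entry.Key, out _))
            {
                removed++;
            }
        }
        return ValueTask.FromResult(removed);
    }
}
```

Because the key is the (MessageId, HandlerType) pair, recording "m1" for one handler does not mark it as processed for another handler.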

InboxOptions

Option             Default   Description
RetentionPeriod    7 days    How long inbox records are kept
EnableAutoCleanup  true      Automatically clean up old records
CleanupInterval    1 hour    Interval between cleanup runs

How It Works

1. Message arrives
2. InboxBehavior extracts idempotency key
     ├── IIdempotentMessage.IdempotencyKey
     ├── [Idempotent(KeyProperty = "...")].PropertyValue
     └── [Idempotent] → SHA-256 content hash
3. Check IInboxStore.HasBeenProcessedAsync(key, handlerType)
     ├── Already processed → return default (skip handler)
     └── Not processed → execute handler
4. On success → IInboxStore.RecordAsync(inboxMessage)
5. If recording fails → log warning (message was already processed successfully)

If inbox recording fails after the handler succeeds, the call is not failed and the handler is not re-run, because the handler has already executed. A warning is logged so you can detect this edge case. Since no record was stored, the next delivery of the same message will not be recognized as a duplicate, which allows one additional execution. For strict exactly-once guarantees, use a database transaction that wraps both the handler and the inbox record.
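
The numbered flow above can be sketched as a single method. This is a simplified illustration, not the actual InboxBehavior: InboxFlowSketch and RunAsync are hypothetical names, the two delegates stand in for IInboxStore.HasBeenProcessedAsync and IInboxStore.RecordAsync, and letting handler exceptions propagate without recording anything is an assumption inferred from step 4.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

public static class InboxFlowSketch
{
    public static async ValueTask<TResponse?> RunAsync<TResponse>(
        string key,                                             // idempotency key (step 2)
        string handlerType,
        Func<string, string, ValueTask<bool>> hasBeenProcessed, // stands in for the store check
        Func<string, string, ValueTask> record,                 // stands in for the store write
        Func<ValueTask<TResponse>> handler,
        Action<Exception> logWarning)
    {
        // Step 3: duplicate check on the (key, handlerType) pair.
        if (await hasBeenProcessed(key, handlerType))
            return default; // already processed: skip the handler

        // Assumption: a handler exception propagates and nothing is recorded,
        // so a later redelivery can run the handler again.
        var response = await handler();

        try
        {
            // Step 4: record only after the handler succeeded.
            await record(key, handlerType);
        }
        catch (Exception ex)
        {
            // Step 5: the work is already done, so warn instead of failing the call.
            logWarning(ex);
        }

        return response;
    }
}
```

In this sketch the check and the record are separate steps outside any transaction, so a failed record leaves no trace and the next delivery with the same key runs the handler again. That is the gap the transactional approach closes.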

Practical Example

// Webhook handler that must never process the same webhook twice
public record ProcessStripeWebhookCommand(
    string WebhookId,
    string EventType,
    string Payload
) : ICommand<WebhookResult>, IIdempotentMessage
{
    public string IdempotencyKey => $"stripe:{WebhookId}";
}

public class ProcessStripeWebhookHandler
    : ICommandHandler<ProcessStripeWebhookCommand, WebhookResult>
{
    private readonly IPaymentService _payments;

    public ProcessStripeWebhookHandler(IPaymentService payments)
        => _payments = payments;

    public async ValueTask<WebhookResult> Handle(
        ProcessStripeWebhookCommand command, CancellationToken ct)
    {
        // Safe: inbox guarantees this runs at most once per WebhookId
        return command.EventType switch
        {
            "payment_intent.succeeded" =>
                await _payments.ConfirmPaymentAsync(command.Payload, ct),
            "charge.refunded" =>
                await _payments.ProcessRefundAsync(command.Payload, ct),
            _ => new WebhookResult(Handled: false)
        };
    }
}

Inbox vs Enterprise Deduplication

TurboMediator provides two deduplication mechanisms:

Feature           Inbox Pattern                            Enterprise Deduplication
Package           TurboMediator.Persistence                TurboMediator.Enterprise
Store             IInboxStore (DB-backed)                  IIdempotencyStore (in-memory/distributed)
Use case          Persistent, transactional deduplication  Fast, cache-based deduplication
Concurrency       Checks DB on each call                   Distributed lock with wait/retry
Response caching  No                                       Yes (returns cached response)
Best for          Outbox consumers, webhooks               API endpoints, concurrent request dedup

Use the inbox pattern when you need durable, database-backed deduplication that survives restarts — ideal for outbox consumers and webhook handlers. Use enterprise deduplication when you need fast, cache-based dedup with response caching — ideal for API endpoints.

DbContext Setup

Add the InboxMessage entity to your DbContext:

using Microsoft.EntityFrameworkCore;

public class AppDbContext : DbContext
{
    public DbSet<InboxMessage> InboxMessages => Set<InboxMessage>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<InboxMessage>(entity =>
        {
            entity.HasKey(e => new { e.MessageId, e.HandlerType }); // Composite key
            entity.Property(e => e.MessageId).HasMaxLength(500);
            entity.Property(e => e.HandlerType).HasMaxLength(500);
            entity.Property(e => e.MessageType).HasMaxLength(500);
            entity.HasIndex(e => e.ProcessedAt); // For cleanup queries
        });
    }
}
