Enterprise
Deduplication
Prevent duplicate message processing with idempotency support
The deduplication behavior prevents the same message from being processed more than once by using idempotency keys and a pluggable store.
IIdempotentMessage
Mark messages that support deduplication:
public interface IIdempotentMessage
{
string IdempotencyKey { get; }
}
// Example
public record ProcessPaymentCommand(
Guid PaymentId,
decimal Amount,
string Currency
) : ICommand<PaymentResult>, IIdempotentMessage
{
public string IdempotencyKey => $"payment:{PaymentId}";
}IIdempotencyStore
public interface IIdempotencyStore
{
ValueTask<bool> TryAcquireAsync(string key, TimeSpan ttl, CancellationToken ct);
ValueTask<IdempotencyEntry<T>?> GetAsync<T>(string key, CancellationToken ct);
ValueTask SetAsync<T>(string key, T response, TimeSpan ttl, CancellationToken ct);
ValueTask ReleaseAsync(string key, CancellationToken ct);
}Configuration
builder.Services.AddTurboMediator(m =>
{
m.WithDeduplication<IMessage, object>(options =>
{
options.TimeToLive = TimeSpan.FromHours(24);
options.ThrowOnDuplicate = false;
options.MaxWaitRetries = 5;
options.WaitRetryInterval = TimeSpan.FromMilliseconds(200);
options.ReleaseOnError = true;
});
m.WithInMemoryIdempotencyStore();
});DeduplicationOptions
| Option | Default | Description |
|---|---|---|
TimeToLive | 24 hours | How long idempotency records are kept |
ThrowOnDuplicate | false | Throw exception on duplicate vs return cached response |
MaxWaitRetries | 5 | Retries when a concurrent duplicate is being processed |
WaitRetryInterval | 200ms | Delay between retries |
ReleaseOnError | true | Release lock if handler throws |
How It Works
- Checks if
IdempotencyKeyexists in the store - If found → return cached response (or throw if
ThrowOnDuplicate) - If not found → acquire lock and execute handler
- Store the response with the key
- If handler throws and
ReleaseOnErroris true → release the lock
For concurrent duplicate requests:
- First request acquires the lock and processes
- Second request waits (up to
MaxWaitRetries) for the first to complete - Returns the cached response once available
Practical Example
public record ChargeCustomerCommand(
string PaymentIntentId,
decimal Amount,
string CustomerId
) : ICommand<ChargeResult>, IIdempotentMessage
{
public string IdempotencyKey => $"charge:{PaymentIntentId}";
}
public class ChargeCustomerHandler
: ICommandHandler<ChargeCustomerCommand, ChargeResult>
{
private readonly IPaymentGateway _gateway;
public ChargeCustomerHandler(IPaymentGateway gateway) => _gateway = gateway;
public async ValueTask<ChargeResult> Handle(
ChargeCustomerCommand command, CancellationToken ct)
{
// Safe to execute — deduplication ensures this runs only once
// per PaymentIntentId, even with network retries
return await _gateway.ChargeAsync(
command.CustomerId, command.Amount, ct);
}
}Deduplication is essential for webhook handlers, payment processing, and any operation where retries from external systems could cause duplicates.