Streaming
Using IAsyncEnumerable for streaming responses with TurboMediator
TurboMediator provides first-class support for streaming responses using IAsyncEnumerable<T>. This is ideal for scenarios where you want to return data incrementally — large datasets, real-time feeds, or server-sent events.
Stream Message Types
| Interface | Handler Interface |
|---|---|
IStreamRequest<TResponse> | IStreamRequestHandler<TRequest, TResponse> |
IStreamCommand<TResponse> | IStreamCommandHandler<TCommand, TResponse> |
IStreamQuery<TResponse> | IStreamQueryHandler<TQuery, TResponse> |
Defining Stream Messages
// Stream request
public record GenerateNumbersRequest(int Count) : IStreamRequest<int>;
// Stream query
public record GetAllUsersQuery : IStreamQuery<User>;
// Stream command
public record ProcessItemsCommand(IReadOnlyList<Item> Items) : IStreamCommand<ProcessResult>;Stream Handlers
Stream handlers return IAsyncEnumerable<T>:
public class GenerateNumbersHandler
: IStreamRequestHandler<GenerateNumbersRequest, int>
{
public async IAsyncEnumerable<int> Handle(
GenerateNumbersRequest request,
[EnumeratorCancellation] CancellationToken ct)
{
for (int i = 1; i <= request.Count; i++)
{
ct.ThrowIfCancellationRequested();
await Task.Delay(100, ct); // Simulate work
yield return i;
}
}
}
public class GetAllUsersHandler
: IStreamQueryHandler<GetAllUsersQuery, User>
{
private readonly IUserRepository _repository;
public GetAllUsersHandler(IUserRepository repository)
{
_repository = repository;
}
public async IAsyncEnumerable<User> Handle(
GetAllUsersQuery query,
[EnumeratorCancellation] CancellationToken ct)
{
await foreach (var user in _repository.GetAllAsync(ct))
{
yield return user;
}
}
}Consuming Streams
Use CreateStream on the mediator to get the IAsyncEnumerable<T>:
// Consume a stream
await foreach (var number in mediator.CreateStream(new GenerateNumbersRequest(10)))
{
Console.WriteLine($"Received: {number}");
}
// Use with LINQ (System.Linq.Async)
var users = await mediator
.CreateStream(new GetAllUsersQuery())
.Where(u => u.IsActive)
.Take(50)
.ToListAsync();Stream Pipeline Behaviors
Streaming has dedicated pipeline interfaces:
IStreamPipelineBehavior
Wraps the entire stream execution:
public class StreamLoggingBehavior<TMessage, TResponse>
: IStreamPipelineBehavior<TMessage, TResponse>
where TMessage : IStreamMessage
{
private readonly ILogger _logger;
public StreamLoggingBehavior(ILogger<StreamLoggingBehavior<TMessage, TResponse>> logger)
{
_logger = logger;
}
public async IAsyncEnumerable<TResponse> Handle(
TMessage message,
StreamHandlerDelegate<TResponse> next,
[EnumeratorCancellation] CancellationToken ct)
{
_logger.LogInformation("Starting stream for {MessageType}", typeof(TMessage).Name);
var count = 0;
await foreach (var item in next())
{
count++;
yield return item;
}
_logger.LogInformation("Stream {MessageType} completed with {Count} items",
typeof(TMessage).Name, count);
}
}IStreamPreProcessor
Runs before the stream starts:
public class StreamValidationPreProcessor<TMessage>
: IStreamPreProcessor<TMessage>
where TMessage : IStreamMessage
{
public ValueTask Process(TMessage message, CancellationToken ct)
{
// Validate message before streaming starts
return default;
}
}IStreamPostProcessor
Wraps the stream after the handler — can transform, log, or add metrics to each item:
public class StreamMetricsPostProcessor<TMessage, TResponse>
: IStreamPostProcessor<TMessage, TResponse>
where TMessage : IStreamMessage
{
public async IAsyncEnumerable<TResponse> Process(
TMessage message,
IAsyncEnumerable<TResponse> stream,
[EnumeratorCancellation] CancellationToken ct)
{
var count = 0;
await foreach (var item in stream.WithCancellation(ct))
{
count++;
yield return item;
}
// Record metrics after stream completes
}
}Registering Stream Behaviors
builder.Services.AddTurboMediator(m =>
{
m.WithStreamPipelineBehavior<StreamLoggingBehavior<IStreamMessage, object>>();
m.WithStreamPreProcessor<StreamValidationPreProcessor<IStreamMessage>>();
m.WithStreamPostProcessor<StreamMetricsPostProcessor<IStreamMessage, object>>();
});SSE Example (Minimal API)
app.MapGet("/stream/numbers", async (HttpContext context, IMediator mediator) =>
{
context.Response.ContentType = "text/event-stream";
await foreach (var number in mediator.CreateStream(new GenerateNumbersRequest(100)))
{
await context.Response.WriteAsync($"data: {number}\n\n");
await context.Response.Body.FlushAsync();
}
});Stream pipeline behaviors use separate interfaces (IStreamPipelineBehavior, IStreamPreProcessor, IStreamPostProcessor) that are independent from the regular pipeline behaviors.