TurboMediator
Core

Streaming

Using IAsyncEnumerable for streaming responses with TurboMediator

TurboMediator provides first-class support for streaming responses using IAsyncEnumerable<T>. This is ideal for scenarios where you want to return data incrementally — large datasets, real-time feeds, or server-sent events.

Stream Message Types

InterfaceHandler Interface
IStreamRequest<TResponse>IStreamRequestHandler<TRequest, TResponse>
IStreamCommand<TResponse>IStreamCommandHandler<TCommand, TResponse>
IStreamQuery<TResponse>IStreamQueryHandler<TQuery, TResponse>

Defining Stream Messages

// Stream request
public record GenerateNumbersRequest(int Count) : IStreamRequest<int>;

// Stream query
public record GetAllUsersQuery : IStreamQuery<User>;

// Stream command
public record ProcessItemsCommand(IReadOnlyList<Item> Items) : IStreamCommand<ProcessResult>;

Stream Handlers

Stream handlers return IAsyncEnumerable<T>:

public class GenerateNumbersHandler
    : IStreamRequestHandler<GenerateNumbersRequest, int>
{
    public async IAsyncEnumerable<int> Handle(
        GenerateNumbersRequest request,
        [EnumeratorCancellation] CancellationToken ct)
    {
        for (int i = 1; i <= request.Count; i++)
        {
            ct.ThrowIfCancellationRequested();
            await Task.Delay(100, ct); // Simulate work
            yield return i;
        }
    }
}

public class GetAllUsersHandler
    : IStreamQueryHandler<GetAllUsersQuery, User>
{
    private readonly IUserRepository _repository;

    public GetAllUsersHandler(IUserRepository repository)
    {
        _repository = repository;
    }

    public async IAsyncEnumerable<User> Handle(
        GetAllUsersQuery query,
        [EnumeratorCancellation] CancellationToken ct)
    {
        await foreach (var user in _repository.GetAllAsync(ct))
        {
            yield return user;
        }
    }
}

Consuming Streams

Use CreateStream on the mediator to get the IAsyncEnumerable<T>:

// Consume a stream
await foreach (var number in mediator.CreateStream(new GenerateNumbersRequest(10)))
{
    Console.WriteLine($"Received: {number}");
}

// Use with LINQ (System.Linq.Async)
var users = await mediator
    .CreateStream(new GetAllUsersQuery())
    .Where(u => u.IsActive)
    .Take(50)
    .ToListAsync();

Stream Pipeline Behaviors

Streaming has dedicated pipeline interfaces:

IStreamPipelineBehavior

Wraps the entire stream execution:

public class StreamLoggingBehavior<TMessage, TResponse>
    : IStreamPipelineBehavior<TMessage, TResponse>
    where TMessage : IStreamMessage
{
    private readonly ILogger _logger;

    public StreamLoggingBehavior(ILogger<StreamLoggingBehavior<TMessage, TResponse>> logger)
    {
        _logger = logger;
    }

    public async IAsyncEnumerable<TResponse> Handle(
        TMessage message,
        StreamHandlerDelegate<TResponse> next,
        [EnumeratorCancellation] CancellationToken ct)
    {
        _logger.LogInformation("Starting stream for {MessageType}", typeof(TMessage).Name);
        var count = 0;

        await foreach (var item in next())
        {
            count++;
            yield return item;
        }

        _logger.LogInformation("Stream {MessageType} completed with {Count} items",
            typeof(TMessage).Name, count);
    }
}

IStreamPreProcessor

Runs before the stream starts:

public class StreamValidationPreProcessor<TMessage>
    : IStreamPreProcessor<TMessage>
    where TMessage : IStreamMessage
{
    public ValueTask Process(TMessage message, CancellationToken ct)
    {
        // Validate message before streaming starts
        return default;
    }
}

IStreamPostProcessor

Wraps the stream after the handler — can transform, log, or add metrics to each item:

public class StreamMetricsPostProcessor<TMessage, TResponse>
    : IStreamPostProcessor<TMessage, TResponse>
    where TMessage : IStreamMessage
{
    public async IAsyncEnumerable<TResponse> Process(
        TMessage message,
        IAsyncEnumerable<TResponse> stream,
        [EnumeratorCancellation] CancellationToken ct)
    {
        var count = 0;
        await foreach (var item in stream.WithCancellation(ct))
        {
            count++;
            yield return item;
        }
        // Record metrics after stream completes
    }
}

Registering Stream Behaviors

builder.Services.AddTurboMediator(m =>
{
    m.WithStreamPipelineBehavior<StreamLoggingBehavior<IStreamMessage, object>>();
    m.WithStreamPreProcessor<StreamValidationPreProcessor<IStreamMessage>>();
    m.WithStreamPostProcessor<StreamMetricsPostProcessor<IStreamMessage, object>>();
});

SSE Example (Minimal API)

app.MapGet("/stream/numbers", async (HttpContext context, IMediator mediator) =>
{
    context.Response.ContentType = "text/event-stream";

    await foreach (var number in mediator.CreateStream(new GenerateNumbersRequest(100)))
    {
        await context.Response.WriteAsync($"data: {number}\n\n");
        await context.Response.Body.FlushAsync();
    }
});

Stream pipeline behaviors use separate interfaces (IStreamPipelineBehavior, IStreamPreProcessor, IStreamPostProcessor) that are independent from the regular pipeline behaviors.

On this page