Designing a Message Bus Using IAsyncEnumerable<T> and Channels

We’re all familiar with message buses as networked systems: RabbitMQ, Azure Service Bus, Kafka, NATS. But inside many applications, a quieter kind of messaging happens constantly: background services hand off work to each other, pipelines stream data between layers, and modules coordinate without ever crossing a network boundary. For this kind of intra-process communication, introducing a full message broker is overkill. The .NET runtime already gives us two powerful tools for building lightweight, resilient pipelines inside a single process: System.Threading.Channels and IAsyncEnumerable<T>.
Together, they form the foundation of a high-performance in-process message bus: one that supports multiple publishers and consumers, provides backpressure, and integrates cleanly with async/await.
Why Bother with an In-Process Bus?
In-process buses occupy a useful middle ground between events and queues. They are perfect when:
Components need to communicate asynchronously but still within the same host process.
You want to decouple producers from consumers without introducing infrastructure.
You need streaming behaviour: continuous consumption of messages as they arrive.
You care about flow control and backpressure: preventing runaway memory growth.
Examples include telemetry pipelines, background email dispatchers, integration event routers, or even a miniature “domain event” system within a modular monolith.
Channels and Async Streams
Before diving in, it helps to recall what these primitives do.
Channels provide asynchronous producer-consumer queues. Writers call WriteAsync, readers call ReadAsync, and both sides are decoupled yet synchronised through backpressure. IAsyncEnumerable<T> represents an asynchronous stream of data, consumable with await foreach. When used with channels, it creates a natural push-pull flow that fits perfectly with .NET’s async model.
A Channel<T> essentially becomes a bounded, thread-safe buffer that can be streamed through an async iterator.
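As a minimal sketch of that push-pull flow, a producer task can write into a channel while a consumer streams the items out with await foreach via ReadAllAsync:

```csharp
using System.Threading.Channels;

// One producer task writes values into a channel while a consumer
// streams them out with await foreach via ReadAllAsync.
var channel = Channel.CreateUnbounded<int>();

var producer = Task.Run(async () =>
{
    for (var i = 1; i <= 3; i++)
        await channel.Writer.WriteAsync(i);
    channel.Writer.Complete(); // no more items; the consumer loop will end
});

var received = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
    received.Add(item);

await producer;
Console.WriteLine(string.Join(", ", received)); // 1, 2, 3
```

Completing the writer is what ends the consumer's loop; without it, the await foreach would wait forever for more items.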
Core Abstraction: IMessageBus
We’ll start by defining a minimal interface for our in process message bus.
namespace InProcessBus;
public interface IMessageBus
{
    ValueTask PublishAsync<T>(T message, CancellationToken token = default);
    IAsyncEnumerable<T> SubscribeAsync<T>(CancellationToken token = default);
}
Producers PublishAsync messages of any type, while consumers SubscribeAsync to an asynchronous stream of messages. Each message type effectively acts as its own topic.
Implementing the Channel Registry
We’ll store one Channel per message type. Because different message types may be published concurrently, we’ll use a thread-safe dictionary keyed by Type.
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading.Channels;

namespace InProcessBus;

public sealed class MessageBus(ChannelOptions? options = null) : IMessageBus
{
    private readonly ConcurrentDictionary<Type, Channel<object>> _channels = new();

    private readonly ChannelOptions _opts = options ?? new UnboundedChannelOptions
    {
        SingleReader = false,
        SingleWriter = false,
        AllowSynchronousContinuations = false
    };

    // Honour whichever options the caller supplied: bounded options create
    // a bounded channel, anything else falls back to unbounded.
    private Channel<object> GetOrCreateChannel(Type type)
        => _channels.GetOrAdd(type, _ => _opts switch
        {
            BoundedChannelOptions bounded => Channel.CreateBounded<object>(bounded),
            UnboundedChannelOptions unbounded => Channel.CreateUnbounded<object>(unbounded),
            _ => Channel.CreateUnbounded<object>()
        });

    public ValueTask PublishAsync<T>(T message, CancellationToken token = default)
    {
        var channel = GetOrCreateChannel(typeof(T));
        return channel.Writer.WriteAsync(message!, token);
    }

    public async IAsyncEnumerable<T> SubscribeAsync<T>(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        var channel = GetOrCreateChannel(typeof(T));
        while (await channel.Reader.WaitToReadAsync(token))
        {
            while (channel.Reader.TryRead(out var item))
                yield return (T)item!;
        }
    }
}
With this, any component can publish messages of type T, and any number of subscribers can listen to that stream asynchronously. The simplicity is deceptive: this single class can orchestrate hundreds of thousands of in-memory messages per second.
Publishing and Consuming Messages
Here’s how a background service might consume domain events.
public sealed record UserRegistered(string Email);

public sealed class EmailHandler(MessageBus bus, ILogger<EmailHandler> log) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var evt in bus.SubscribeAsync<UserRegistered>(stoppingToken))
        {
            log.LogInformation("Sending welcome email to {Email}", evt.Email);
            await SendEmailAsync(evt.Email);
        }
    }

    private static Task SendEmailAsync(string to)
    {
        // Simulate I/O latency
        return Task.Delay(100);
    }
}
And somewhere else in the system:
await bus.PublishAsync(new UserRegistered("alice@example.com"));
That single call enqueues the message. The background consumer processes it in its own time, fully decoupled from the publisher.
Using Bounded Channels for Backpressure
The example so far uses UnboundedChannelOptions, which risks growing indefinitely under heavy load. For production scenarios, bounded channels are safer.
var bounded = new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = false,
    SingleWriter = false
};

var bus = new MessageBus(bounded);
This ensures producers naturally apply backpressure: if consumers fall behind, the PublishAsync call will await until there’s capacity.
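The Wait mode is easy to observe in isolation. In this sketch a channel with capacity 1 parks the second write until the consumer frees a slot:

```csharp
using System.Threading.Channels;

// With capacity 1 and FullMode.Wait, a second write cannot
// complete until the consumer frees a slot.
var ch = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.Wait
});

await ch.Writer.WriteAsync(1);          // fills the single slot
var pending = ch.Writer.WriteAsync(2);  // full: this write parks
Console.WriteLine(pending.IsCompleted); // False while the channel is full

ch.Reader.TryRead(out _);               // consumer frees the slot
await pending;                          // the parked write now completes
```

Other FullMode values trade this waiting for dropping: DropOldest, DropNewest, and DropWrite complete immediately by discarding a message instead.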
Adding Filtering and Topic Isolation
Many systems need to filter messages or route them by topic. Since our bus already differentiates by message type, you can extend it easily with message filters or named topics.
public record LogEvent(string Category, string Message);

await bus.PublishAsync(new LogEvent("Audit", "User login"));
await bus.PublishAsync(new LogEvent("Debug", "Cache miss"));

await foreach (var evt in bus.SubscribeAsync<LogEvent>())
{
    if (evt.Category == "Audit")
        Console.WriteLine($"AUDIT: {evt.Message}");
}
The example above filters with a plain if inside the await foreach loop; LINQ-style operators such as Where are also available for IAsyncEnumerable<T>.
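As a sketch, assuming the System.Linq.Async NuGet package (which supplies LINQ operators over IAsyncEnumerable<T>), the same filter can be written declaratively:

```csharp
using System.Linq; // Where over IAsyncEnumerable<T> comes from System.Linq.Async

await foreach (var evt in bus.SubscribeAsync<LogEvent>()
                             .Where(e => e.Category == "Audit"))
{
    Console.WriteLine($"AUDIT: {evt.Message}");
}
```

Because the operators compose lazily, the filter adds no buffering: non-matching messages are simply skipped as they stream past.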
Ensuring Safe Completion
When shutting down, we want to complete all channels so that consumers can finish gracefully.
public void Complete()
{
    foreach (var kvp in _channels)
        kvp.Value.Writer.TryComplete();
}
In a hosted application, you’d typically call this during graceful shutdown in IHostApplicationLifetime.ApplicationStopping.
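As a sketch of that wiring in a generic host (assuming the Complete method above has been added to MessageBus, and reusing the EmailHandler from earlier):

```csharp
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddSingleton<MessageBus>();
builder.Services.AddHostedService<EmailHandler>();

var host = builder.Build();

// Complete the bus when the host begins stopping, so subscribers'
// await foreach loops drain any remaining messages and exit cleanly.
var lifetime = host.Services.GetRequiredService<IHostApplicationLifetime>();
lifetime.ApplicationStopping.Register(
    host.Services.GetRequiredService<MessageBus>().Complete);

await host.RunAsync();
```

Completing the writers is what turns shutdown from an abrupt cancellation into a graceful drain: WaitToReadAsync returns false once the channel is both complete and empty.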
Observability and Diagnostics
Because everything happens in process, adding observability is straightforward.
You can instrument the bus using ActivitySource or simple metrics counters:
private static readonly ActivitySource Tracer = new("InProcessBus");

public async ValueTask PublishAsync<T>(T message, CancellationToken token = default)
{
    using var act = Tracer.StartActivity($"publish:{typeof(T).Name}");
    var channel = GetOrCreateChannel(typeof(T));
    await channel.Writer.WriteAsync(message!, token);
}
You could then export these spans via OpenTelemetry to see how fast messages flow through your system, how many awaiters are pending, and how long subscribers take to process them.
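As a sketch, assuming the OpenTelemetry and OpenTelemetry.Exporter.Console NuGet packages, subscribing to that ActivitySource takes only a few lines:

```csharp
using OpenTelemetry;
using OpenTelemetry.Trace;

// Listen to the "InProcessBus" ActivitySource and write its spans
// to the console; swap the exporter for OTLP in production.
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddSource("InProcessBus")
    .AddConsoleExporter()
    .Build();
```

Until a listener like this is registered, StartActivity returns null and the instrumentation costs almost nothing, which makes it safe to leave in place permanently.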
Comparing to External Brokers
This design doesn’t replace distributed message queues; it complements them. An in-process bus is memory-local and doesn’t persist messages across restarts. It’s perfect for high-speed coordination between components within a single service, not for cross-service delivery guarantees. Think of it as the internal plumbing of a service. When you later introduce Azure Service Bus or Kafka, your in-process bus becomes the layer that connects local operations to external integration points.
Example in a Modular Monolith
Imagine a modular application with separate assemblies for Users, Orders, and Notifications.
Rather than reference each module directly, they communicate via the bus:
The User module publishes a UserRegistered event.
The Notification module subscribes to that event and sends an email.
The Analytics module subscribes too and updates metrics.
All modules live in the same process, yet remain loosely coupled, testable in isolation, and replaceable without touching each other’s code.
A Diagnostic Example
The bus can also power internal diagnostics. Suppose you want to stream structured logs to a live dashboard:
await foreach (var evt in bus.SubscribeAsync<LogEvent>())
{
    Console.WriteLine($"{DateTime.UtcNow:O} [{evt.Category}] {evt.Message}");
}
Because SubscribeAsync returns IAsyncEnumerable<T>, this stream can feed directly into SignalR, WebSockets, or even Blazor components. You’ve effectively built a lightweight reactive pipeline without any external dependency.
Performance
On a standard workstation, an unbounded Channel<T> can easily push over a million messages per second across tasks when consumers keep up. Bounded channels reduce that slightly due to waiting, but maintain predictable latency. The major performance advantage is memory locality: there’s no serialisation, no network I/O, and almost zero GC pressure because messages move as raw object references.
Threading Considerations
Channels are already thread-safe: writers and readers can operate concurrently without locks. You should, however, avoid blocking inside consumers; always use async APIs. If multiple consumers subscribe to the same message type, they share the same channel and thus compete for messages (each message is delivered exactly once). If you need fan-out, where each consumer receives all messages, create a small multiplexer that writes to multiple channels per type.
These design decisions are explicit trade-offs: do you want queue semantics or broadcast semantics?
Because you control the implementation, you can choose either.
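The multiplexer mentioned above might be sketched as follows (BroadcastBus<T> is a hypothetical name, separate from the earlier MessageBus):

```csharp
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading.Channels;

public sealed class BroadcastBus<T>
{
    private readonly ConcurrentBag<Channel<T>> _subscribers = new();

    // Fan-out: every registered subscriber channel receives its own copy.
    public async ValueTask PublishAsync(T message, CancellationToken token = default)
    {
        foreach (var ch in _subscribers)
            await ch.Writer.WriteAsync(message, token);
    }

    // Each subscription gets a private channel, registered when enumeration
    // begins, so subscribers no longer compete for messages.
    public async IAsyncEnumerable<T> Subscribe(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        var channel = Channel.CreateUnbounded<T>();
        _subscribers.Add(channel);
        await foreach (var item in channel.Reader.ReadAllAsync(token))
            yield return item;
    }
}
```

Note the trade-off this sketch accepts: a slow subscriber's unbounded channel can grow without limit, and subscribers only see messages published after their enumeration starts.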

An in-process message bus built on Channels and IAsyncEnumerable<T> demonstrates how far modern .NET concurrency has evolved. What once required third-party frameworks or complex TPL Dataflow setups now fits into a few concise, testable classes.






