Introduction#
System.Threading.Channels (available since .NET Core 3.0) provides a high-performance, thread-safe producer-consumer pipeline primitive. It replaces the older BlockingCollection type with a fully async-compatible API. This post covers channels and how to compose them into multi-stage processing pipelines.
Channel Basics#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
using System.Threading.Channels;

// Unbounded channel: unlimited capacity (writes always succeed immediately).
var channel = Channel.CreateUnbounded<string>();

// Bounded channel: applies backpressure (or drops items) when full.
var bounded = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait, // WriteAsync awaits until space frees up
    // FullMode = BoundedChannelFullMode.DropOldest,
    // FullMode = BoundedChannelFullMode.DropNewest,
    SingleReader = false, // set true when exactly one reader — enables faster internal paths
    SingleWriter = false, // likewise for a single writer
});

// Write (producer)
await channel.Writer.WriteAsync("message");

// Signal completion BEFORE draining in sequential code: ReadAllAsync only ends
// once the writer is completed, so a Complete() placed after the foreach in a
// straight-line program is unreachable — the loop would wait forever.
channel.Writer.Complete();

// Read (consumer): drains the buffered items, then the foreach exits.
await foreach (var item in channel.Reader.ReadAllAsync())
{
    Console.WriteLine(item);
}
Producer-Consumer Pipeline#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
using System.Threading.Channels;

/// <summary>
/// Three-stage processing pipeline: produce raw data, transform it, persist the
/// results in batches. Stages are connected by bounded channels, so a slow stage
/// automatically throttles the one feeding it (backpressure).
/// </summary>
public class DataPipeline
{
    /// <summary>
    /// Runs all three stages concurrently until the external source is exhausted
    /// or <paramref name="cancellationToken"/> is signalled. Faults in any stage
    /// propagate out of the awaited <see cref="Task.WhenAll(Task[])"/>.
    /// </summary>
    public static async Task RunAsync(CancellationToken cancellationToken)
    {
        var sourceChannel = Channel.CreateBounded<RawData>(100);
        var processedChannel = Channel.CreateBounded<ProcessedData>(100);

        // Stage 1: Producer — reads from external source
        var producerTask = ProduceAsync(sourceChannel.Writer, cancellationToken);

        // Stage 2: Transformer — processes raw data
        var transformerTask = TransformAsync(
            sourceChannel.Reader,
            processedChannel.Writer,
            cancellationToken
        );

        // Stage 3: Consumer — writes to database
        var consumerTask = ConsumeAsync(processedChannel.Reader, cancellationToken);

        await Task.WhenAll(producerTask, transformerTask, consumerTask);
    }

    /// <summary>Pumps items from the external source into the pipeline.</summary>
    static async Task ProduceAsync(
        ChannelWriter<RawData> writer,
        CancellationToken ct)
    {
        try
        {
            await foreach (var data in FetchFromExternalSource(ct))
            {
                await writer.WriteAsync(data, ct);
            }
            writer.Complete(); // normal end-of-stream: downstream drains and exits
        }
        catch (Exception ex)
        {
            // Complete the channel WITH the exception so downstream stages
            // observe the failure (their ReadAllAsync rethrows it) instead of
            // finishing "successfully" on partial data.
            writer.Complete(ex);
            throw;
        }
    }

    /// <summary>Transforms each raw item and forwards it to the next stage.</summary>
    static async Task TransformAsync(
        ChannelReader<RawData> reader,
        ChannelWriter<ProcessedData> writer,
        CancellationToken ct)
    {
        try
        {
            await foreach (var raw in reader.ReadAllAsync(ct))
            {
                var processed = Transform(raw);
                await writer.WriteAsync(processed, ct);
            }
            writer.Complete(); // upstream finished cleanly — signal the same downstream
        }
        catch (Exception ex)
        {
            // Same failure-propagation pattern as the producer stage.
            writer.Complete(ex);
            throw;
        }
    }

    /// <summary>Persists processed items in batches of up to 50.</summary>
    static async Task ConsumeAsync(
        ChannelReader<ProcessedData> reader,
        CancellationToken ct)
    {
        var batch = new List<ProcessedData>(50);
        await foreach (var item in reader.ReadAllAsync(ct))
        {
            batch.Add(item);
            if (batch.Count >= 50)
            {
                await SaveBatch(batch);
                batch.Clear();
            }
        }
        // Flush the final partial batch once the channel completes.
        if (batch.Count > 0)
            await SaveBatch(batch);
    }
}
Fan-Out: One Producer, Multiple Consumers#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Multiple consumers reading from the same channel (fan-out). Note: a Channel
// DISTRIBUTES items — each WorkItem is received by exactly one worker, it is
// not broadcast to all of them.
public static async Task FanOutAsync(int workerCount, CancellationToken ct)
{
    var channel = Channel.CreateBounded<WorkItem>(200);

    // Single producer (expected to Complete() the writer when done so the
    // workers' ReadAllAsync loops terminate).
    var producerTask = ProduceWorkItems(channel.Writer, ct);

    // Multiple consumers (fan-out)
    var workerTasks = Enumerable.Range(0, workerCount)
        .Select(id => ProcessWorkItems(id, channel.Reader, ct))
        .ToArray();

    // Await every stage in one WhenAll: awaiting the producer first would
    // surface a worker's exception only after the producer finished, and
    // WhenAll aggregates failures from all stages.
    await Task.WhenAll(workerTasks.Prepend(producerTask));
}
// One fan-out worker: competes with its siblings for items; any given
// WorkItem is handed to exactly one worker by the channel.
static async Task ProcessWorkItems(
    int workerId,
    ChannelReader<WorkItem> reader,
    CancellationToken ct)
{
    // Loop ends when the writer completes the channel (or ct is cancelled).
    await foreach (var workItem in reader.ReadAllAsync(ct))
        await ProcessItem(workItem);
}
Backpressure with Bounded Channels#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Bounded channel provides natural backpressure: when the channel is full,
// WriteAsync awaits — slowing the producer down to consumer speed.
var channel = Channel.CreateBounded<Event>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait, // producer waits when full
});

// No manual throttling needed — the producer is paced by how fast
// consumers drain the channel.
await foreach (var batch in eventSource.ReadBatchesAsync())
{
    foreach (var singleEvent in batch)
    {
        // If the channel is at capacity (consumers lagging), this await
        // parks the producer until a consumer frees a slot.
        await channel.Writer.WriteAsync(singleEvent, cancellationToken);
    }
}
Channel as a Bounded Buffer in ASP.NET Core#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Background service: drain a channel and write to database in batches.
public class EventIngestionService : BackgroundService
{
    private readonly Channel<DomainEvent> _channel;
    private readonly IDbContextFactory<AppDbContext> _dbFactory;

    public EventIngestionService(
        Channel<DomainEvent> channel,
        IDbContextFactory<AppDbContext> dbFactory)
    {
        _channel = channel;
        _dbFactory = dbFactory;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var batch = new List<DomainEvent>(100);
        try
        {
            await foreach (var evt in _channel.Reader.ReadAllAsync(stoppingToken))
            {
                batch.Add(evt);
                // Flush on a full batch, or as soon as the channel is momentarily
                // empty — keeps latency low when the event stream is quiet.
                if (batch.Count >= 100 || _channel.Reader.Count == 0)
                {
                    await SaveBatchAsync(batch, stoppingToken);
                    batch.Clear();
                }
            }
        }
        finally
        {
            // On shutdown, ReadAllAsync(stoppingToken) throws and any buffered
            // but unsaved events would otherwise be silently lost. Flush them
            // with CancellationToken.None — stoppingToken is already cancelled.
            if (batch.Count > 0)
                await SaveBatchAsync(batch, CancellationToken.None);
        }
    }

    // Persists one batch using a short-lived DbContext instance.
    private async Task SaveBatchAsync(List<DomainEvent> batch, CancellationToken ct)
    {
        await using var db = await _dbFactory.CreateDbContextAsync(ct);
        db.Events.AddRange(batch);
        await db.SaveChangesAsync(ct);
    }
}
// Register in Program.cs: one shared bounded channel, resolved by both the
// producers (via DI) and the hosted ingestion service.
var eventChannel = Channel.CreateBounded<DomainEvent>(10_000);
builder.Services.AddSingleton(eventChannel);
builder.Services.AddHostedService<EventIngestionService>();
|
Channels vs BlockingCollection#
| Feature | Channel | BlockingCollection |
| --- | --- | --- |
| Async support | Full async/await | Blocking only |
| ReadAllAsync | Yes | No |
| Backpressure | Built-in (bounded) | Built-in |
| Cancellation | CancellationToken | CancellationToken |
| Performance | Higher | Lower |
| .NET version | .NET Core 3.0+ | .NET Framework 4.0+ |
Conclusion#
System.Threading.Channels is the modern way to implement producer-consumer patterns in .NET. Bounded channels provide automatic backpressure without custom throttling code. ReadAllAsync with await foreach integrates cleanly with cancellation tokens. Use channels for batch processing, event ingestion, and any multi-stage pipeline where work is produced faster than it is consumed.