Introduction#
Go channels are the primary mechanism for goroutine communication. Beyond basic send/receive, channels enable sophisticated coordination patterns: fan-out, fan-in, pipelines, timeouts, and cancellation. This post covers the patterns used in production Go systems.
Channel Fundamentals#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| // Unbuffered: send blocks until receiver is ready (synchronization)
// NOTE: this declaration and the buffered one below are alternatives shown
// side by side. Within a single scope the second `:=` would not compile,
// because it declares no new variable on its left-hand side.
ch := make(chan int)
// Buffered: send blocks only when buffer is full (asynchronous up to capacity)
ch := make(chan int, 10)
// Directional: communicate intent in function signatures
// producer takes a send-only channel: it can send on out but cannot receive from it.
func producer(out chan<- int) { out <- 1 }
// consumer takes a receive-only channel: it can receive from in but cannot send on or close it.
func consumer(in <-chan int) { v := <-in; _ = v }
// Closing channels: close tells receivers that no further values will be sent.
close(ch)
// The two-value receive distinguishes a real value from the zero value
// produced by a closed, drained channel.
v, ok := <-ch // ok=false if channel is closed and empty
// range receives until the channel is closed and every buffered value has been read.
for v := range ch { /* iterates until closed */ }
|
Pipeline Pattern#
Connect stages with channels; each stage transforms data and passes it forward.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
| package main
import (
"context"
"fmt"
)
// generate is the source stage of the pipeline. It emits each value of nums,
// in order, on the returned channel from a dedicated goroutine.
//
// The returned channel is closed once every value has been sent or as soon as
// ctx is cancelled, whichever happens first, so a consumer ranging over it
// always terminates.
func generate(ctx context.Context, nums ...int) <-chan int {
	stream := make(chan int)

	emit := func() {
		defer close(stream)
		for i := 0; i < len(nums); i++ {
			// Block until either the consumer takes the value or the
			// pipeline is cancelled; never block on the send alone.
			select {
			case <-ctx.Done():
				return
			case stream <- nums[i]:
			}
		}
	}

	go emit()
	return stream
}
// square is a pipeline stage that reads integers from in and emits the square
// of each one on the returned channel, preserving order.
//
// The returned channel is closed when in is closed and drained, or when ctx is
// cancelled while a result is waiting to be sent.
func square(ctx context.Context, in <-chan int) <-chan int {
	squared := make(chan int)

	go func() {
		defer close(squared)
		for {
			n, open := <-in
			if !open {
				return // upstream finished
			}
			// Give up on the pending send if the pipeline is cancelled.
			select {
			case <-ctx.Done():
				return
			case squared <- n * n:
			}
		}
	}()

	return squared
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nums := generate(ctx, 1, 2, 3, 4, 5)
squares := square(ctx, nums)
for v := range squares {
fmt.Println(v) // 1, 4, 9, 16, 25
}
}
|
Fan-Out: One Channel, Multiple Workers#
Distribute work from one channel to multiple goroutines.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| func fanOut(in <-chan Job, workers int) []<-chan Result {
outputs := make([]<-chan Result, workers)
for i := 0; i < workers; i++ {
outputs[i] = worker(in) // all read from same input channel
}
return outputs
}
// worker starts a goroutine that processes every job received from in and
// sends each result on the returned channel.
//
// The returned channel is closed once in is closed and drained. The send is
// unbuffered, so the goroutine blocks until a consumer takes each result.
func worker(in <-chan Job) <-chan Result {
	results := make(chan Result)

	go func() {
		defer close(results)
		for {
			job, open := <-in
			if !open {
				return // no more work
			}
			results <- process(job)
		}
	}()

	return results
}
|
Fan-In: Merge Multiple Channels#
Combine results from multiple goroutines into one channel.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| func fanIn(channels ...<-chan Result) <-chan Result {
merged := make(chan Result)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan Result) {
defer wg.Done()
for v := range c {
merged <- v
}
}(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
|
Select: Non-Deterministic Multi-Channel Operations#
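select waits on multiple channel operations at once and proceeds with whichever becomes ready first. When several are ready at the same time, one is chosen at random, so the order of the cases never implies priority.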
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| func processWithTimeout(jobs <-chan Job, timeout time.Duration) {
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case job, ok := <-jobs:
if !ok {
return // channel closed
}
process(job)
case <-timer.C:
fmt.Println("timeout, stopping")
return
case <-time.After(5 * time.Second):
fmt.Println("idle for 5s, flushing buffer")
}
}
}
|
Done Channel Pattern (Context)#
Signal cancellation to goroutines.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| func runWorkers(ctx context.Context, jobs <-chan Job) {
for {
select {
case <-ctx.Done():
fmt.Println("cancelled:", ctx.Err())
return
case job, ok := <-jobs:
if !ok {
return
}
if err := process(job); err != nil {
// log error, continue
}
}
}
}
// Usage
// Bound the whole run to 30 seconds: once the deadline passes ctx is done,
// which runWorkers observes between jobs.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
// Always call cancel, even if the timeout fires first, to release the
// resources associated with the context.
defer cancel()
runWorkers(ctx, jobs)
|
Rate Limiter with time.Ticker#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| func rateLimitedProcessor(ctx context.Context, jobs <-chan Job, rps int) {
ticker := time.NewTicker(time.Second / time.Duration(rps))
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
select {
case job, ok := <-jobs:
if !ok {
return
}
go process(job) // process asynchronously
default:
// no job available, continue waiting
}
}
}
}
|
Worker Pool#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| type WorkerPool struct {
jobs chan Job
results chan Result
wg sync.WaitGroup
}
func NewWorkerPool(size int) *WorkerPool {
p := &WorkerPool{
jobs: make(chan Job, size*2),
results: make(chan Result, size*2),
}
for i := 0; i < size; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for job := range p.jobs {
p.results <- process(job)
}
}()
}
return p
}
func (p *WorkerPool) Submit(job Job) {
p.jobs <- job
}
func (p *WorkerPool) Close() <-chan Result {
close(p.jobs)
go func() {
p.wg.Wait()
close(p.results)
}()
return p.results
}
|
Channel Anti-Patterns#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| // WRONG: sending to a nil channel blocks forever
// The zero value of a channel type is nil; ch is never initialized with make.
var ch chan int
ch <- 1 // blocks
// WRONG: receiving from a nil channel blocks forever
v := <-ch // blocks
// WRONG: closing a channel twice panics
// (The close examples below assume ch was created with make; closing a nil
// channel, as ch is declared above, panics as well.)
close(ch)
close(ch) // panic: close of closed channel
// WRONG: sending to a closed channel panics
close(ch)
ch <- 1 // panic: send on closed channel
// CORRECT: only the sender should close the channel
// Use sync.Once if multiple goroutines might close
// once guarantees the wrapped close runs at most one time, no matter how many
// goroutines call closeChannel.
var once sync.Once
closeChannel := func() { once.Do(func() { close(ch) }) }
|
Conclusion#
Goroutines, channels, and select are Go’s primary concurrency primitives. Use pipelines to connect processing stages, fan-out to parallelize work, and fan-in to aggregate results. Always propagate cancellation via context.Context. The most common bugs are closed-channel panics (close only from the sender, and only once) and goroutine leaks (always give every goroutine a way to exit).