Go Channels and Select: Advanced Patterns

Go channels are the primary mechanism for goroutine communication. Beyond basic send/receive, channels enable sophisticated coordination patterns: fan-out, fan-in, pipelines, timeouts, and cancellatio

Introduction#

Go channels are the primary mechanism for goroutine communication. Beyond basic send/receive, channels enable sophisticated coordination patterns: fan-out, fan-in, pipelines, timeouts, and cancellation. This post covers the patterns used in production Go systems.

Channel Fundamentals#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Unbuffered: send blocks until receiver is ready (synchronization)
ch := make(chan int)

// Buffered: send blocks only when buffer is full (asynchronous up to capacity)
ch := make(chan int, 10)

// Directional: communicate intent in function signatures
func producer(out chan<- int) { out <- 1 }
func consumer(in <-chan int)  { v := <-in; _ = v }

// Closing channels
close(ch)
v, ok := <-ch  // ok=false if channel is closed and empty
for v := range ch { /* iterates until closed */ }

Pipeline Pattern#

Connect stages with channels; each stage transforms data and passes it forward.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
    "context"
    "fmt"
)

func generate(ctx context.Context, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

func square(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    nums := generate(ctx, 1, 2, 3, 4, 5)
    squares := square(ctx, nums)

    for v := range squares {
        fmt.Println(v)  // 1, 4, 9, 16, 25
    }
}

Fan-Out: One Channel, Multiple Workers#

Distribute work from one channel to multiple goroutines.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func fanOut(in <-chan Job, workers int) []<-chan Result {
    outputs := make([]<-chan Result, workers)
    for i := 0; i < workers; i++ {
        outputs[i] = worker(in)  // all read from same input channel
    }
    return outputs
}

func worker(in <-chan Job) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for job := range in {
            out <- process(job)
        }
    }()
    return out
}

Fan-In: Merge Multiple Channels#

Combine results from multiple goroutines into one channel.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func fanIn(channels ...<-chan Result) <-chan Result {
    merged := make(chan Result)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan Result) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

Select: Non-Deterministic Multi-Channel Operations#

select waits on multiple channels simultaneously. When multiple are ready, one is chosen at random.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func processWithTimeout(jobs <-chan Job, timeout time.Duration) {
    timer := time.NewTimer(timeout)
    defer timer.Stop()

    for {
        select {
        case job, ok := <-jobs:
            if !ok {
                return  // channel closed
            }
            process(job)

        case <-timer.C:
            fmt.Println("timeout, stopping")
            return

        case <-time.After(5 * time.Second):
            fmt.Println("idle for 5s, flushing buffer")
        }
    }
}

Done Channel Pattern (Context)#

Signal cancellation to goroutines.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func runWorkers(ctx context.Context, jobs <-chan Job) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("cancelled:", ctx.Err())
            return
        case job, ok := <-jobs:
            if !ok {
                return
            }
            if err := process(job); err != nil {
                // log error, continue
            }
        }
    }
}

// Usage
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
runWorkers(ctx, jobs)

Rate Limiter with time.Ticker#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func rateLimitedProcessor(ctx context.Context, jobs <-chan Job, rps int) {
    ticker := time.NewTicker(time.Second / time.Duration(rps))
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            select {
            case job, ok := <-jobs:
                if !ok {
                    return
                }
                go process(job)  // process asynchronously
            default:
                // no job available, continue waiting
            }
        }
    }
}

Worker Pool#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type WorkerPool struct {
    jobs    chan Job
    results chan Result
    wg      sync.WaitGroup
}

func NewWorkerPool(size int) *WorkerPool {
    p := &WorkerPool{
        jobs:    make(chan Job, size*2),
        results: make(chan Result, size*2),
    }
    for i := 0; i < size; i++ {
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for job := range p.jobs {
                p.results <- process(job)
            }
        }()
    }
    return p
}

func (p *WorkerPool) Submit(job Job) {
    p.jobs <- job
}

func (p *WorkerPool) Close() <-chan Result {
    close(p.jobs)
    go func() {
        p.wg.Wait()
        close(p.results)
    }()
    return p.results
}

Channel Anti-Patterns#

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// WRONG: sending to a nil channel blocks forever
var ch chan int
ch <- 1  // blocks

// WRONG: receiving from a nil channel blocks forever
v := <-ch  // blocks

// WRONG: closing a channel twice panics
close(ch)
close(ch)  // panic: close of closed channel

// WRONG: sending to a closed channel panics
close(ch)
ch <- 1  // panic: send on closed channel

// CORRECT: only the sender should close the channel
// Use sync.Once if multiple goroutines might close
var once sync.Once
closeChannel := func() { once.Do(func() { close(ch) }) }

Conclusion#

Channels and select are Go’s primary concurrency primitives. Use pipelines to connect processing stages, fan-out to parallelize work, fan-in to aggregate results. Always propagate cancellation via context.Context. The most common bugs are closed-channel panics (close only from the sender) and goroutine leaks (always provide a way for goroutines to exit).

Contents