Go Concurrency Patterns That Scale

August 22, 2022

Go’s goroutines and channels make concurrency accessible, but accessibility doesn’t mean simplicity. Race conditions, deadlocks, and goroutine leaks are common pitfalls. These patterns have proven reliable in production systems.

Here are Go concurrency patterns that actually work.

Foundation Patterns

Worker Pool

// WorkerPool fans jobs out to a fixed number of worker goroutines and
// returns a channel carrying every result. The results channel is
// closed once all workers have drained jobs, so callers can simply
// range over it. The caller must close jobs when done sending, or the
// workers never exit.
func WorkerPool(jobs <-chan Job, workers int) <-chan Result {
    // Unbuffered by design: the original make(chan Result, len(jobs))
    // sized the buffer by the *current* queue depth of the jobs
    // channel, which is 0 for an unbuffered or not-yet-filled channel.
    // An unbuffered channel gives correct backpressure in every case.
    results := make(chan Result)

    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobs {
                results <- process(job)
            }
        }()
    }

    // Close results only after every worker has finished, so the
    // receiver's range loop terminates.
    go func() {
        wg.Wait()
        close(results)
    }()

    return results
}

// Usage
func main() {
    jobs := make(chan Job, 100)

    // Feed jobs from a separate goroutine. Sending everything up front
    // (as a plain loop before WorkerPool starts) deadlocks as soon as
    // jobList outgrows the channel buffer, because nothing is
    // receiving yet.
    go func() {
        defer close(jobs)
        for _, j := range jobList {
            jobs <- j
        }
    }()

    results := WorkerPool(jobs, 10)
    for result := range results {
        handleResult(result)
    }
}

Fan-Out, Fan-In

// FanOut starts the requested number of workers, all reading from the
// shared input channel, and returns their individual output channels.
func FanOut(input <-chan int, workers int) []<-chan int {
    channels := make([]<-chan int, workers)
    for i := range channels {
        channels[i] = worker(input)
    }
    return channels
}

// worker consumes values from input, applies process to each one, and
// emits the results on its own channel, closing it when input is
// exhausted.
func worker(input <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range input {
            out <- process(v)
        }
    }()
    return out
}

// FanIn merges every input channel into a single output channel. The
// output is closed once all inputs are drained; ordering across
// inputs is not guaranteed.
func FanIn(inputs ...<-chan int) <-chan int {
    merged := make(chan int)
    var wg sync.WaitGroup

    wg.Add(len(inputs))
    for _, in := range inputs {
        go func(src <-chan int) {
            defer wg.Done()
            for v := range src {
                merged <- v
            }
        }(in)
    }

    // Close merged only after every forwarding goroutine has finished.
    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

Pipeline

// GenerateNumbers is the pipeline source stage: it emits the integers
// 1..max on the returned channel, then closes it.
func GenerateNumbers(max int) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for n := 1; n <= max; n++ {
            ch <- n
        }
    }()
    return ch
}

// Square is a pipeline stage that emits the square of each value read
// from input, closing its output when input is exhausted.
func Square(input <-chan int) <-chan int {
    squared := make(chan int)
    go func() {
        defer close(squared)
        for v := range input {
            squared <- v * v
        }
    }()
    return squared
}

// Filter is a pipeline stage that forwards only the values from input
// for which predicate returns true, closing its output when input is
// exhausted.
func Filter(input <-chan int, predicate func(int) bool) <-chan int {
    kept := make(chan int)
    go func() {
        defer close(kept)
        for v := range input {
            if !predicate(v) {
                continue
            }
            kept <- v
        }
    }()
    return kept
}

// Usage
func main() {
    numbers := GenerateNumbers(100)
    squared := Square(numbers)
    evens := Filter(squared, func(n int) bool { return n%2 == 0 })

    for n := range evens {
        fmt.Println(n)
    }
}

Cancellation and Timeouts

Context-Based Cancellation

// ProcessWithContext handles items sequentially, bailing out with the
// context's error as soon as ctx is cancelled or its deadline passes.
func ProcessWithContext(ctx context.Context, items []Item) error {
    for _, item := range items {
        // Non-blocking cancellation check before each item; ctx.Err()
        // is non-nil exactly when ctx.Done() is closed.
        if err := ctx.Err(); err != nil {
            return err
        }
        if err := processItem(item); err != nil {
            return err
        }
    }
    return nil
}

// Usage: bound the whole run to 30 seconds via the context deadline.
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel() // release timer resources even on the success path

    err := ProcessWithContext(ctx, items)
    if err != nil {
        log.Printf("Processing failed: %v", err)
    }
}

Cancelable Worker

// CancelableWorker processes jobs until either the jobs channel is
// closed or ctx is cancelled, whichever comes first. The results
// channel is closed on exit so receivers always unblock.
func CancelableWorker(ctx context.Context, jobs <-chan Job) <-chan Result {
    out := make(chan Result)

    go func() {
        defer close(out)
        for {
            var job Job
            var ok bool
            select {
            case <-ctx.Done():
                return
            case job, ok = <-jobs:
                if !ok {
                    return
                }
            }
            // Guard the send too: a blocked send must still observe
            // cancellation, or this goroutine leaks.
            select {
            case out <- process(job):
            case <-ctx.Done():
                return
            }
        }
    }()

    return out
}

Error Handling

Error Group

import "golang.org/x/sync/errgroup"

// ProcessAll runs processItem for every item concurrently. The first
// failure cancels the shared context (stopping the siblings early)
// and is the error returned from Wait.
func ProcessAll(ctx context.Context, items []Item) error {
    group, ctx := errgroup.WithContext(ctx)

    for _, item := range items {
        current := item // shadow for pre-Go-1.22 closure capture
        group.Go(func() error {
            return processItem(ctx, current)
        })
    }

    // Wait blocks for all goroutines and returns the first non-nil error.
    return group.Wait()
}

// ProcessAllLimited behaves like ProcessAll but caps the number of
// goroutines running simultaneously at limit.
func ProcessAllLimited(ctx context.Context, items []Item, limit int) error {
    group, ctx := errgroup.WithContext(ctx)
    group.SetLimit(limit) // Go blocks until a slot is free

    for _, item := range items {
        current := item // shadow for pre-Go-1.22 closure capture
        group.Go(func() error {
            return processItem(ctx, current)
        })
    }

    return group.Wait()
}

Collecting All Errors

// ProcessCollectErrors processes every item concurrently and returns
// all errors encountered, instead of stopping at the first failure.
// The returned slice is nil when everything succeeded.
func ProcessCollectErrors(items []Item) []error {
    var (
        mu   sync.Mutex
        errs []error // named errs so the stdlib errors package is not shadowed
        wg   sync.WaitGroup
    )

    for _, item := range items {
        wg.Add(1)
        go func(item Item) {
            defer wg.Done()
            if err := processItem(item); err != nil {
                // Serialize appends: slices are not safe for
                // concurrent mutation.
                mu.Lock()
                errs = append(errs, err)
                mu.Unlock()
            }
        }(item)
    }

    wg.Wait()
    return errs
}

Synchronization Patterns

Once

// Config lazily loads the application configuration exactly once, no
// matter how many goroutines call Get concurrently.
type Config struct {
    once   sync.Once
    config *AppConfig
}

// Get returns the loaded configuration, reading it from file on the
// first call only; later calls return the cached value.
func (c *Config) Get() *AppConfig {
    c.once.Do(func() { c.config = loadConfigFromFile() })
    return c.config
}

Semaphore

// Semaphore bounds concurrency with a buffered channel: each Acquire
// occupies a slot, each Release frees one.
type Semaphore struct {
    slots chan struct{}
}

// NewSemaphore returns a semaphore allowing at most max concurrent holders.
func NewSemaphore(max int) *Semaphore {
    return &Semaphore{slots: make(chan struct{}, max)}
}

// Acquire blocks until a slot is free, then claims it.
func (s *Semaphore) Acquire() {
    s.slots <- struct{}{}
}

// Release frees a previously acquired slot.
func (s *Semaphore) Release() {
    <-s.slots
}

// Usage: ProcessWithLimit processes all items concurrently, but never
// with more than maxConcurrent goroutines in flight at once.
func ProcessWithLimit(items []Item, maxConcurrent int) {
    limiter := NewSemaphore(maxConcurrent)
    var wg sync.WaitGroup

    for _, item := range items {
        wg.Add(1)
        // Acquire before spawning, so spawning itself is throttled.
        limiter.Acquire()

        go func(it Item) {
            defer wg.Done()
            defer limiter.Release()
            processItem(it)
        }(item)
    }

    wg.Wait()
}

Avoiding Common Pitfalls

Goroutine Leaks

// LEAK: the producer goroutine has no stop signal — once the caller
// stops receiving, it blocks forever on the unbuffered send and can
// never be reclaimed.
func LeakyFunction() <-chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            out <- i // blocks forever if the receiver goes away
            i++
        }
    }()
    return out
}

// FIXED: Context-based cancellation
func NoLeakFunction(ctx context.Context) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; ; i++ {
            select {
            case ch <- i:
            case <-ctx.Done():
                return
            }
        }
    }()
    return ch
}

Race Conditions

// RACE: Concurrent map access
// RacyCounter is a deliberately broken example: 100 goroutines
// read-modify-write the same map entry with no synchronization. Go
// maps are not safe for concurrent writes, so this can lose updates,
// corrupt the map, or crash with "concurrent map writes", and
// `go test -race` will flag it. Do not copy this pattern — it exists
// only to contrast with the mutex-guarded version below.
func RacyCounter() {
    counts := make(map[string]int)
    var wg sync.WaitGroup

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counts["key"]++  // RACE!
        }()
    }
    wg.Wait()
}

// FIXED: a mutex serializes every increment, so no two goroutines
// ever mutate the map at the same time. (sync.Map is an alternative,
// but a plain map plus mutex is usually clearer.)
func SafeCounter() {
    counts := make(map[string]int)

    var mu sync.Mutex
    var wg sync.WaitGroup

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            defer mu.Unlock()
            counts["key"]++
        }()
    }
    wg.Wait()
}

Key Takeaways

Concurrency in Go is powerful when used correctly. The patterns above — worker pools, pipelines, context-based cancellation, error groups, and disciplined synchronization — share one principle: every goroutine has a known lifetime, a way to be stopped, and a way to be waited on. Apply that principle consistently and most leaks, races, and deadlocks never appear.