Go’s goroutines and channels make concurrency accessible, but accessible doesn’t mean simple: race conditions, deadlocks, and goroutine leaks are common pitfalls. The patterns below help avoid them and have proven reliable in production systems.
Here are Go concurrency patterns that actually work.
Foundation Patterns
Worker Pool
// WorkerPool limits concurrent operations: it starts `workers` goroutines
// that all receive from jobs, run process on each job, and send the outcome
// on the returned channel.
//
// The returned channel is buffered with len(jobs), i.e. the number of jobs
// already queued at the moment of the call. It is closed once jobs has been
// closed and drained and every worker has returned.
func WorkerPool(jobs <-chan Job, workers int) <-chan Result {
	var pending sync.WaitGroup
	out := make(chan Result, len(jobs))

	// consume is the loop shared by every worker goroutine.
	consume := func() {
		defer pending.Done()
		for {
			job, open := <-jobs
			if !open {
				return
			}
			out <- process(job)
		}
	}

	for remaining := workers; remaining > 0; remaining-- {
		pending.Add(1)
		go consume()
	}

	// Close out only after the last worker has finished sending.
	go func() {
		defer close(out)
		pending.Wait()
	}()

	return out
}
// Usage: queue every job, close the channel so the workers' range loops can
// finish, then drain the results produced by ten workers.
func main() {
	// Size the buffer from jobList: every send below happens before any
	// worker exists, so a fixed capacity smaller than len(jobList) would
	// block this loop forever.
	jobs := make(chan Job, len(jobList))
	for _, j := range jobList {
		jobs <- j
	}
	close(jobs)

	results := WorkerPool(jobs, 10)
	for result := range results {
		handleResult(result)
	}
}
Fan-Out, Fan-In
// FanOut distributes work to multiple goroutines: it calls worker(input)
// `workers` times, so every worker receives from the same input channel, and
// returns the workers' output channels in the order they were started.
func FanOut(input <-chan int, workers int) []<-chan int {
	channels := make([]<-chan int, workers)
	for slot := range channels {
		channels[slot] = worker(input)
	}
	return channels
}
// worker starts a goroutine that applies process to every value received from
// input and sends each outcome on the returned unbuffered channel. The
// returned channel is closed once input has been closed and drained.
func worker(input <-chan int) <-chan int {
	results := make(chan int)
	go func() {
		defer close(results)
		for {
			value, open := <-input
			if !open {
				return
			}
			results <- process(value)
		}
	}()
	return results
}
// FanIn merges multiple channels into one. One goroutine per input forwards
// that input's values to the returned unbuffered channel; the returned
// channel is closed after every input has been closed and drained. With no
// inputs the returned channel is closed right away.
func FanIn(inputs ...<-chan int) <-chan int {
	var active sync.WaitGroup
	merged := make(chan int)

	// forward copies one source into merged until that source is closed.
	forward := func(src <-chan int) {
		defer active.Done()
		for {
			value, open := <-src
			if !open {
				return
			}
			merged <- value
		}
	}

	for i := range inputs {
		active.Add(1)
		go forward(inputs[i])
	}

	// Close merged only after every forwarder has returned.
	go func() {
		defer close(merged)
		active.Wait()
	}()

	return merged
}
Pipeline
// GenerateNumbers is the source stage of the pipeline (a chain of stages
// connected by channels). It returns an unbuffered channel on which a
// goroutine sends 1, 2, ..., max in ascending order and then closes it.
// When max is less than 1 the channel is closed without any value.
func GenerateNumbers(max int) <-chan int {
	numbers := make(chan int)

	emit := func() {
		defer close(numbers)
		next := 1
		for next <= max {
			numbers <- next
			next++
		}
	}
	go emit()

	return numbers
}
// Square is a pipeline stage: for every value received from input it sends
// the value multiplied by itself on the returned unbuffered channel, keeping
// the input order. The returned channel is closed once input has been closed
// and drained.
func Square(input <-chan int) <-chan int {
	squares := make(chan int)
	go func() {
		defer close(squares)
		for {
			value, open := <-input
			if !open {
				return
			}
			squares <- value * value
		}
	}()
	return squares
}
// Filter is a pipeline stage: it forwards, in order, only those values from
// input for which predicate returns true. The returned unbuffered channel is
// closed once input has been closed and drained.
func Filter(input <-chan int, predicate func(int) bool) <-chan int {
	kept := make(chan int)
	go func() {
		defer close(kept)
		for value := range input {
			if !predicate(value) {
				continue
			}
			kept <- value
		}
	}()
	return kept
}
// Usage
func main() {
numbers := GenerateNumbers(100)
squared := Square(numbers)
evens := Filter(squared, func(n int) bool { return n%2 == 0 })
for n := range evens {
fmt.Println(n)
}
}
Cancellation and Timeouts
Context-Based Cancellation
// ProcessWithContext runs processItem on each item in order. Before every
// item it does a non-blocking check of ctx.Done(); if the context is already
// done it stops and returns ctx.Err(). The first error from processItem is
// returned as-is. It returns nil when every item was processed.
func ProcessWithContext(ctx context.Context, items []Item) error {
	for i := range items {
		// Non-blocking cancellation check: fall through when ctx is still live.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if err := processItem(items[i]); err != nil {
			return err
		}
	}
	return nil
}
// Usage: give the whole batch a 30-second budget and log the error if
// processing stops early.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel() // release the timeout's resources when main returns

	err := ProcessWithContext(ctx, items)
	if err == nil {
		return
	}
	log.Printf("Processing failed: %v", err)
}
Cancelable Worker
// CancelableWorker starts one goroutine that receives jobs, runs process on
// each, and sends the outcome on the returned unbuffered channel. The
// goroutine exits, closing the returned channel, when ctx is done or when
// jobs is closed. Cancellation is honoured both while waiting for a job and
// while waiting for a receiver to take a result.
func CancelableWorker(ctx context.Context, jobs <-chan Job) <-chan Result {
	out := make(chan Result)

	go func() {
		defer close(out)
		for {
			// Phase 1: wait for the next job or for cancellation.
			var next Job
			select {
			case <-ctx.Done():
				return
			case job, open := <-jobs:
				if !open {
					return
				}
				next = job
			}

			// Phase 2: hand the result over, unless cancelled first.
			res := process(next)
			select {
			case out <- res:
			case <-ctx.Done():
				return
			}
		}
	}()

	return out
}
Error Handling
Error Group
import "golang.org/x/sync/errgroup"
// ProcessAll runs processItem for every item, each in its own errgroup
// goroutine, passing the group's derived context. It waits for all of them
// and returns the group's result: the first error, with the others cancelled
// through the derived context.
func ProcessAll(ctx context.Context, items []Item) error {
	group, groupCtx := errgroup.WithContext(ctx)

	for i := range items {
		current := items[i] // per-iteration copy captured by the closure
		group.Go(func() error {
			return processItem(groupCtx, current)
		})
	}

	return group.Wait()
}
// ProcessAllLimited is ProcessAll with limited concurrency: it calls
// SetLimit(limit) on the errgroup before submitting any work, then runs
// processItem for every item with the group's derived context and returns
// the result of Wait.
func ProcessAllLimited(ctx context.Context, items []Item, limit int) error {
	group, groupCtx := errgroup.WithContext(ctx)
	group.SetLimit(limit)

	for i := range items {
		current := items[i] // per-iteration copy captured by the closure
		group.Go(func() error {
			return processItem(groupCtx, current)
		})
	}

	return group.Wait()
}
Collecting All Errors
// ProcessCollectErrors runs processItem on every item concurrently, one
// goroutine per item, waits for all of them, and returns every non-nil error
// that was reported. The result is nil when nothing failed; the order of the
// errors follows goroutine completion, not the order of items.
func ProcessCollectErrors(items []Item) []error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex // guards errs
		errs []error
	)

	// record appends one failure under the mutex.
	record := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		errs = append(errs, err)
	}

	for i := range items {
		wg.Add(1)
		go func(it Item) {
			defer wg.Done()
			err := processItem(it)
			if err == nil {
				return
			}
			record(err)
		}(items[i])
	}

	wg.Wait()
	return errs
}
Synchronization Patterns
Once
// Config lazily holds an *AppConfig. It embeds a sync.Once by value, so it
// must be used through a pointer and not copied after first use.
type Config struct {
	once   sync.Once  // ensures load runs at most once
	config *AppConfig // set by load; nil until the first Get
}

// Get returns the application config. The first call runs loadConfigFromFile
// through sync.Once; later calls return the stored value without loading
// again.
func (c *Config) Get() *AppConfig {
	c.once.Do(c.load)
	return c.config
}

// load stores the result of loadConfigFromFile; it is only called via once.
func (c *Config) load() {
	c.config = loadConfigFromFile()
}
Semaphore
// Semaphore is a counting semaphore backed by a buffered channel: each value
// sitting in the channel's buffer is one held slot.
type Semaphore struct {
	sem chan struct{}
}

// NewSemaphore returns a Semaphore whose channel has capacity max, so at most
// max slots can be held at once.
func NewSemaphore(max int) *Semaphore {
	slots := make(chan struct{}, max)
	return &Semaphore{sem: slots}
}

// Acquire takes a slot by sending into the channel; it blocks while the
// buffer is full.
func (s *Semaphore) Acquire() {
	var token struct{}
	s.sem <- token
}

// Release frees a slot by receiving from the channel; it blocks if no slot is
// currently held.
func (s *Semaphore) Release() {
	<-s.sem
}
// Usage: ProcessWithLimit runs processItem on every item in its own
// goroutine but acquires a semaphore slot before starting each one, so at
// most maxConcurrent run at a time. It returns after all goroutines finish.
func ProcessWithLimit(items []Item, maxConcurrent int) {
	var wg sync.WaitGroup
	limiter := NewSemaphore(maxConcurrent)

	for i := range items {
		// Acquire in the loop, not in the goroutine, so the loop itself
		// stalls while all slots are taken.
		limiter.Acquire()
		wg.Add(1)
		go func(it Item) {
			defer wg.Done()
			defer limiter.Release()
			processItem(it)
		}(items[i])
	}

	wg.Wait()
}
Avoiding Common Pitfalls
Goroutine Leaks
// LeakyFunction is the anti-pattern. LEAK: the goroutine it starts sends
// 0, 1, 2, ... on an unbuffered channel forever and has no exit path, so it
// blocks for good as soon as the caller stops receiving.
func LeakyFunction() <-chan int {
	stream := make(chan int)
	go func() {
		next := 0
		for {
			stream <- next // Blocks if no receiver
			next++
		}
	}()
	return stream
}
// NoLeakFunction is the fixed version, using context-based cancellation. Its
// goroutine sends 0, 1, 2, ... on an unbuffered channel, but every send is
// paired with a check of ctx.Done(), so the goroutine returns and closes the
// channel once ctx is done.
func NoLeakFunction(ctx context.Context) <-chan int {
	stream := make(chan int)
	go func() {
		defer close(stream)
		next := 0
		for {
			select {
			case <-ctx.Done():
				return
			case stream <- next:
				next++
			}
		}
	}()
	return stream
}
Race Conditions
// RACE: Concurrent map access
// RacyCounter is a deliberately broken example: it starts 100 goroutines that
// each increment counts["key"] on one shared map with no synchronization, and
// waits for all of them with a WaitGroup. Do not copy this into real code.
func RacyCounter() {
counts := make(map[string]int)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counts["key"]++ // RACE! unsynchronized read-modify-write on the shared map
}()
}
wg.Wait()
}
// SafeCounter is the fixed version (the options are sync.Map or a mutex; this
// one uses a mutex). It starts 100 goroutines that each increment
// counts["key"] while holding mu, and returns after all of them finish.
func SafeCounter() {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex // guards counts
		counts = make(map[string]int)
	)

	// bump performs one locked increment and then signals completion.
	bump := func() {
		defer wg.Done()
		mu.Lock()
		defer mu.Unlock()
		counts["key"]++
	}

	for remaining := 100; remaining > 0; remaining-- {
		wg.Add(1)
		go bump()
	}

	wg.Wait()
}
Key Takeaways
- Worker pools limit concurrent operations and resource usage
- Fan-out/fan-in distributes work and collects results
- Pipelines chain processing stages cleanly
- Always use context for cancellation and timeouts
- errgroup simplifies concurrent error handling
- sync.Once for one-time initialization
- Semaphores limit concurrency when worker pools are overkill
- Always handle the case where no one reads from your channel
- Run with -race flag to detect race conditions
- Goroutine leaks are real—always have an exit path
Concurrency in Go is powerful when used correctly. These patterns help.