All Posts All Posts

Common Go Concurrency Patterns

September 22, 2019·
Software Engineering
·8 min read
Tecker Yu
Tecker Yu
AI Native Cloud Engineer × Part-time Investor

Go’s concurrency patterns are among its most distinctive programming paradigms. This article summarizes the most common concurrency patterns in Go development, including Request-Response pattern, Single State Manager pattern, Producer-Consumer pattern, Pipeline pattern, and more. These patterns help us better organize and manage goroutines, making concurrent programs more robust and elegant. Through learning these patterns, you will be able to better master Go’s concurrency features and write higher quality concurrent code.

Minimal Request-Response Pattern

// Glob starts a background goroutine that streams every entry of items whose
// name matches pattern, and immediately returns the receiving end of that
// stream. The channel is closed once all entries have been examined.
func Glob(pattern string) <-chan Item {
	out := make(chan Item) // unbuffered: each send waits for the consumer

	go func() {
		defer close(out)
		for name, item := range items {
			// A malformed pattern reports matched == false, so the entry is skipped.
			matched, _ := filepath.Match(pattern, name)
			if matched {
				out <- item
			}
		}
	}()

	// Return right away; the caller ranges over the channel while the
	// goroutine keeps producing.
	return out
}

// main prints every item that Glob streams for the pattern "[ab]*".
func main() {
	matches := Glob("[ab]*")
	for item := range matches {
		fmt.Println(item)
	}
}

Single State Manager

One goroutine owns the shared state; all other goroutines read and modify that state only by sending requests to it over channels.

reads := make(chan *readOp)
writes := make(chan *writeOp)

// State holder
go func() {
    var state = make(map[int]int)
    for {
        select {
        case read := <-reads:
			// Return result
            read.resp <- state[read.key]
        case write := <-writes:
            state[write.key] = write.val
			// Return result
            write.resp <- true
        }
    }
}()

// Requester (reads): repeatedly asks the state holder for a random key and
// waits for the answer before issuing the next request.
go func() {
	for {
		req := &readOp{key: rand.Intn(5), resp: make(chan int)}
		reads <- req
		<-req.resp
	}
}()

// Requester (writes): repeatedly stores a random value under a random key and
// waits for the state holder's acknowledgement before continuing.
go func() {
	for {
		req := &writeOp{
			key:  rand.Intn(5),
			val:  rand.Intn(100),
			resp: make(chan bool),
		}
		writes <- req
		<-req.resp
	}
}()

Producer First, Consumer Later

// Produce first, known number of productions
chanOwner := func() <-chan int {
    results := make(chan int, 5)
    go func() {
        defer close(results)
        for i := 0; i <= 5; i++ {
            results <- i
        }
    }()
    return results
}

// consumer drains results until the producer closes it, printing each value,
// and then reports that it is finished.
consumer := func(results <-chan int) {
	for {
		v, open := <-results
		if !open {
			break
		}
		fmt.Printf("Received: %d\n", v)
	}
	fmt.Println("Done receiving!")
}

results := chanOwner()
// Consume
consumer(results)

Consumer Notifies Producer

// newRandStream produces an endless stream of random ints on the returned
// channel. The producing goroutine keeps running until done is closed (or
// delivers a value); on exit it closes the stream and logs that it stopped.
newRandStream := func(done <-chan interface{}) <-chan int {
	stream := make(chan int)

	go func() {
		// Close the stream before logging the exit.
		defer func() {
			close(stream)
			fmt.Println("newRandStream closure exited.")
		}()

		for {
			select {
			case <-done:
				// The consumer asked us to stop producing.
				return
			case stream <- rand.Int():
			}
		}
	}()

	return stream
}

done := make(chan interface{})
numbers := newRandStream(done)

fmt.Println("3 random ints:")
for n := 0; n < 3; n++ {
	fmt.Printf("%d: %d\n", n+1, <-numbers)
}

// The consumer has what it needs: closing done tells the producer to stop.
close(done)

Multi-channel Merge for Safe Exit (OR channel)

// or merges any number of "done"-style channels into a single channel that is
// closed as soon as any one of the inputs is closed or delivers a value.
//
// It is declared before it is assigned so that the function literal can call
// itself recursively.
var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} {
	switch len(channels) {
	case 0:
		// Nothing to wait on. Note that receiving from the returned nil
		// channel blocks forever.
		return nil
	case 1:
		// A single channel is already its own "or".
		return channels[0]
	}

	orDone := make(chan interface{})
	go func() {
		defer close(orDone)

		switch len(channels) {
		case 2:
			// Exactly two inputs: wait for either one directly.
			select {
			case <-channels[0]:
			case <-channels[1]:
			}
		default:
			// Three or more inputs: wait on the first three and recurse on the
			// rest. orDone is passed along so that the nested goroutines also
			// unblock once this level has fired.
			select {
			case <-channels[0]:
			case <-channels[1]:
			case <-channels[2]:
			case <-or(append(channels[3:], orDone)...):
			}
		}
	}()
	return orDone
}
// sig returns a channel that is closed once the given duration has elapsed.
sig := func(after time.Duration) <-chan interface{} {
	fired := make(chan interface{})
	go func() {
		time.Sleep(after)
		close(fired)
	}()
	return fired
}

start := time.Now()

// Merge five signals with very different delays; the merged channel closes
// when the first of them fires (here the one-second signal).
firstDone := or(
	sig(2*time.Hour),
	sig(5*time.Minute),
	sig(1*time.Second),
	sig(1*time.Hour),
	sig(1*time.Minute),
)
<-firstDone
fmt.Printf("done after %v", time.Since(start))

As soon as any one of the input channels is closed, the merged channel is closed too, so every goroutine waiting on it can exit.

When a function needs to be executed concurrently, pack return values into structs and pass them via channels

For example: concurrent http.Get

// Result bundles both outcomes of a single request so that one channel can
// carry either the response or the error back to the caller.
type Result struct {
	Error    error          // non-nil when the request failed
	Response *http.Response // response of the request when Error is nil
}

checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
    results := make(chan Result)
    go func() {
        defer close(results)

        for _, url := range urls {
            var result Result
            resp, err := http.Get(url)
            result = Result{Error: err, Response: resp}
            select {
            case <-done:
                return
            case results <- result:
            }
        }
    }()
    return results
} 
done := make(chan interface{})
defer close(done)

urls := []string{"https://www.google.com", "https://badhost"}
for result := range checkStatus(done, urls...) {
	if result.Error != nil {
		// The producer only reports the error; deciding what to do with it
		// happens here, in the consumer.
		fmt.Printf("error: %v\n", result.Error)
		continue
	}
	fmt.Printf("Response: %v\n", result.Response.Status)
	// The receiver owns the body; close it so the connection is released.
	result.Response.Body.Close()
}

This pattern also suggests introducing an observer of program state for fault tolerance: a component that sees every result and error and can therefore decide how the program should proceed. In the example above, that observer is simply main.

Error Control

done := make(chan interface{})
defer close(done)

errCount := 0
urls := []string{"a", "https://www.google.com", "b", "c", "d"}
for result := range checkStatus(done, urls...) {
	if result.Error != nil {
		fmt.Printf("error: %v\n", result.Error)
		errCount++
		if errCount >= 3 {
			// Too many errors: stop consuming. Leaving the loop lets the
			// deferred close(done) run when the enclosing function returns,
			// which terminates the request goroutine.
			fmt.Println("Too many errors, breaking!")
			break
		}
		continue
	}
	fmt.Printf("Response: %v\n", result.Response.Status)
	// The receiver owns the body; close it so the connection is released.
	result.Response.Body.Close()
}

Pipeline Pattern

Break the production process into stages and decouple them: multiple goroutines collaborate while remaining independent of each other. Prerequisite for decomposition: the different stages process data of the same type.

// generator is the first pipeline stage: it turns a fixed list of integers
// into a stream. The stream is closed after the last integer has been sent,
// or earlier if done is closed.
generator := func(done <-chan interface{}, integers ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for idx := 0; idx < len(integers); idx++ {
			select {
			case out <- integers[idx]:
			case <-done:
				return
			}
		}
	}()
	return out
}

// multiply is a pipeline stage that forwards every value of intStream
// multiplied by multiplier. Its output is closed when intStream is exhausted
// or when done is closed.
multiply := func(
	done <-chan interface{},
	intStream <-chan int,
	multiplier int,
) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			v, open := <-intStream
			if !open {
				return
			}
			select {
			case out <- v * multiplier:
			case <-done:
				return
			}
		}
	}()
	return out
}

// add is a pipeline stage that forwards every value of intStream increased by
// additive. Its output is closed when intStream is exhausted or when done is
// closed.
add := func(
	done <-chan interface{},
	intStream <-chan int,
	additive int,
) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range intStream {
			sum := v + additive
			select {
			case out <- sum:
			case <-done:
				return
			}
		}
	}()
	return out
}

// A single done channel is shared by every stage, so closing it lets each
// stage's goroutine exit safely.
done := make(chan interface{})
defer close(done)

// Build the pipeline one stage at a time; every stage consumes and returns a
// channel of int. For an input x the output is ((x * 2) + 1) * 2.
source := generator(done, 1, 2, 3, 4)
doubled := multiply(done, source, 2)
incremented := add(done, doubled, 1)
pipeline := multiply(done, incremented, 2)

for v := range pipeline {
	fmt.Println(v)
}

In short: multiple goroutines are connected through channels of the same type, and all of them share a single done channel.

Generator Pattern

Infinite repetitive generation

// repeat returns a stream that cycles through values, in order, forever. The
// stream is closed once done is closed.
//
// If no values are given there is nothing to send: the stream stays empty and
// is closed when done is closed.
repeat := func(
	done <-chan interface{},
	values ...interface{},
) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)

		// With an empty slice the loop below would spin forever without ever
		// reaching the select that checks done, so just wait for the stop
		// signal instead.
		if len(values) == 0 {
			<-done
			return
		}

		// Repeatedly send the slice content until the stop signal is received.
		for {
			for _, v := range values {
				select {
				case <-done:
					return
				case valueStream <- v:
				}
			}
		}
	}()
	return valueStream
}

The above pattern can generate a stream. Let’s now read from this stream.

Read on-demand from stream

take := func(
    done <-chan interface{},
    valueStream <-chan interface{},
    num int,
) <-chan interface{} {
    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ {
            select {
            case <-done:
                return
            case takeStream <- <- valueStream:
            }
        }
    }()
    return takeStream
}

done := make(chan interface{})
defer close(done)

// ones repeats the value 1 forever; take cuts the stream off after the first
// 10 elements, which ends the loop.
ones := repeat(done, 1)
for num := range take(done, ones, 10) {
	fmt.Printf("%v ", num)
}

Worker Pool Pattern

// Number of goroutines that process rows in parallel.
worker := 8
// NOTE(review): l is not defined in this snippet; presumably it is the number
// of rows (len(m.m)) so that every row index can be queued without blocking --
// confirm against the surrounding code.
c := make(chan int, l)

var wg sync.WaitGroup
wg.Add(worker)

for i := 0; i < worker; i++ {
	go func() {
		// Balances the wg.Add(worker) above once this worker has drained c.
		defer wg.Done()
		for row := range c {
			for col := range m.m[row] {
				fillPixel(m, row, col)
			}
		}
	}()
}

// Hand out the work: one row index per task.
for row := range m.m {
	c <- row
}
// Closing c ends each worker's range loop once the queue is empty.
close(c)

// Block until every worker has finished its rows.
wg.Wait()

Pub-Sub Pattern

Each subscriber holds a channel. Each subscriber also has a filter for content filtering; filters are typically functions that return a boolean. The publisher uses a map to store the mapping from subscriber channels to filter functions. On each publish, it iterates through the map and delivers the content to every subscriber whose filter accepts it.

Event-Driven Pattern

Often used to trigger other operations, such as notifications, during the execution of certain operations. These event handlers need to be registered in advance and are then triggered at the appropriate time.

package events

import (
    "time"
)

// UserCreated is the package-level "user created" event. Handlers are attached
// with Register and invoked with Trigger.
var UserCreated userCreated

// UserCreatedPayload is the context passed to every handler when the event
// is triggered.
type UserCreatedPayload struct {
	Email string
	Time  time.Time
}

// userCreated keeps the list of handlers registered for the event.
type userCreated struct {
	handlers []interface{ Handle(UserCreatedPayload) }
}

// Register appends handler to the list of handlers run on every Trigger.
// A nil handler is ignored, because calling it later would panic.
//
// Register does no locking; it is meant to be called before the event is
// triggered from other goroutines (for example from init functions).
func (u *userCreated) Register(handler interface{ Handle(UserCreatedPayload) }) {
	if handler == nil {
		return
	}
	u.handlers = append(u.handlers, handler)
}

// Trigger starts one goroutine per registered handler, in registration order,
// and passes payload to each. It does not wait for the handlers to finish.
func (u *userCreated) Trigger(payload UserCreatedPayload) {
	for _, handler := range u.handlers {
		go handler.Handle(payload)
	}
}
package main

import (
    "time"

    "github.com/stephenafamo/demo/events"
)

func init() {
    createNotifier := userCreatedNotifier{
        adminEmail: "the.boss@example.com",
        slackHook: "https://hooks.slack.com/services/...",
    }

	// Register handler callbacks
    events.UserCreated.Register(createNotifier)
}

// userCreatedNotifier is a UserCreated handler that carries the destinations
// its notifications are sent to.
type userCreatedNotifier struct {
	adminEmail string
	slackHook  string
}

// notifyAdmin is one of the two notification steps run by Handle. Its body is
// elided in the article; presumably it notifies adminEmail -- confirm.
// The timestamp parameter is named at so that it does not shadow the imported
// time package.
func (u userCreatedNotifier) notifyAdmin(email string, at time.Time) {
	// ...
}

// sendToSlack is the other notification step run by Handle. Its body is
// elided in the article; presumably it posts to slackHook -- confirm.
func (u userCreatedNotifier) sendToSlack(email string, at time.Time) {
	// ...
}

// Handle makes userCreatedNotifier usable as an event handler. As long as the
// method matches the handler signature, its internal implementation can be
// customized freely; here it forwards the payload to both notification steps.
func (u userCreatedNotifier) Handle(payload events.UserCreatedPayload) {
	// Do something with our payload
	u.notifyAdmin(payload.Email, payload.Time)
	u.sendToSlack(payload.Email, payload.Time)
}

Triggering event handler functions

package auth

import (
    "time"

    "github.com/stephenafamo/demo/events"
    // Other imported packages
)

// CreateUser creates a user (details elided) and fires the UserCreated event
// with the new user's e-mail address and the current time.
func CreateUser() {
	// ...

	// After decoupling, only the required payload is passed along; the code
	// here does not need to know how the event is processed.
	payload := events.UserCreatedPayload{
		Email: "new.user@example.com",
		Time:  time.Now(),
	}
	events.UserCreated.Trigger(payload)

	// ...
}

Views