Common Concurrency Patterns in Go
Minimal Request-Response Pattern
// Glob returns a receive-only channel that yields every value in items whose
// name matches pattern (filepath.Match syntax). The channel is closed once all
// entries have been examined. The error returned by filepath.Match is
// discarded; only names reported as matching are forwarded.
func Glob(pattern string) <-chan Item {
	out := make(chan Item)
	// Produce in the background so the caller can start receiving right away.
	go func() {
		defer close(out)
		for name, item := range items {
			if matched, _ := filepath.Match(pattern, name); matched {
				out <- item
			}
		}
	}()
	return out
}
func main() {
for item := range Glob("[ab]*") {
fmt.Println(item)
}
}Single State Holder
A single goroutine owns the state. Other goroutines never touch it directly; they read and write it by sending requests over channels.
reads := make(chan *readOp)
writes := make(chan *writeOp)
// State holder
go func() {
var state = make(map[int]int)
for {
select {
case read := <-reads:
// Return result
read.resp <- state[read.key]
case write := <-writes:
state[write.key] = write.val
// Return result
write.resp <- true
}
}
}()
// Reader: repeatedly asks the state holder for a random key and waits for
// the reply.
go func() {
	for {
		req := &readOp{key: rand.Intn(5), resp: make(chan int)}
		reads <- req
		<-req.resp
	}
}()

// Writer: repeatedly stores a random value under a random key and waits for
// the acknowledgement.
go func() {
	for {
		req := &writeOp{key: rand.Intn(5), val: rand.Intn(100), resp: make(chan bool)}
		writes <- req
		<-req.resp
	}
}()
Produce First, Consume Later
// Produce first, known number of items to produce
chanOwner := func() <-chan int {
results := make(chan int, 5)
go func() {
defer close(results)
for i := 0; i <= 5; i++ {
results <- i
}
}()
return results
}
// consumer prints every value received from results until the channel is
// closed, then reports completion.
consumer := func(results <-chan int) {
	for {
		result, open := <-results
		if !open {
			break
		}
		fmt.Printf("Received: %d\n", result)
	}
	fmt.Println("Done receiving!")
}
results := chanOwner()
// Consume
consumer(results)
Consumer Tells Producer to Stop
// newRandStream returns a channel of random ints that keeps producing until
// done is closed (or receives a value). On exit the producer closes the
// stream and then prints a notice.
newRandStream := func(done <-chan interface{}) <-chan int {
	stream := make(chan int)
	produce := func() {
		// Deferred calls run last-in first-out: close first, then the message.
		defer fmt.Println("newRandStream closure exited.")
		defer close(stream)
		for {
			select {
			case <-done:
				// Stop producing, exit
				return
			case stream <- rand.Int():
			}
		}
	}
	go produce()
	return stream
}
done := make(chan interface{})
randStream := newRandStream(done)
fmt.Println("3 random ints:")
for n := 0; n < 3; n++ {
	fmt.Printf("%d: %d\n", n+1, <-randStream)
}
// The consumer ends production by closing done.
close(done)
Multiple Channel Merge for Safe Exit (OR Channel)
// or merges any number of signal channels into one: the returned channel is
// closed as soon as any input channel is closed or delivers a value.
// With no inputs it returns nil; with one input it returns that input.
var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} {
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}
	orDone := make(chan interface{})
	go func() {
		defer close(orDone)
		switch len(channels) {
		case 2:
			select {
			case <-channels[0]:
			case <-channels[1]:
			}
		default:
			// Build the recursive argument list in a fresh slice. Appending
			// directly to channels[3:] could write orDone into the caller's
			// backing array when the caller passed a slice with spare capacity.
			rest := make([]<-chan interface{}, 0, len(channels)-2)
			rest = append(rest, channels[3:]...)
			// Passing orDone down lets the nested goroutines exit once this
			// level has fired.
			rest = append(rest, orDone)
			select {
			case <-channels[0]:
			case <-channels[1]:
			case <-channels[2]:
			case <-or(rest...):
			}
		}
	}()
	return orDone
}

// sig returns a channel that is closed after the given duration.
sig := func(after time.Duration) <-chan interface{} {
	c := make(chan interface{})
	go func() {
		defer close(c)
		time.Sleep(after)
	}()
	return c
}
start := time.Now()
<-or(
sig(2*time.Hour),
sig(5*time.Minute),
sig(1*time.Second),
sig(1*time.Hour),
sig(1*time.Minute),
)
fmt.Printf("done after %v", time.Since(start))When any channel exits, all other channels exit too.
Pack Return Values into Struct When Running Functions Concurrently
Example: Concurrent http.Get
// Result bundles both outcomes of one HTTP request so they can travel
// together over a single channel.
type Result struct {
	// Error is the error reported for the request, if any.
	Error error
	// Response is the HTTP response that accompanied Error.
	Response *http.Response
}
checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
results := make(chan Result)
go func() {
defer close(results)
for _, url := range urls {
var result Result
resp, err := http.Get(url)
result = Result{Error: err, Response: resp}
select {
case <-done:
return
case results <- result:
}
}
}()
return results
} done := make(chan interface{})
defer close(done)
urls := []string{"https://www.google.com", "https://badhost"}
for result := range checkStatus(done, urls...) {
	if result.Error != nil {
		// Newline added so consecutive messages do not run together.
		fmt.Printf("error: %v\n", result.Error)
		continue
	}
	fmt.Printf("Response: %v\n", result.Response.Status)
	// Release the body so the underlying connection is not leaked.
	result.Response.Body.Close()
}
This pattern also shows we can introduce an observer that controls program flow. In the example above, the observer defaults to main.
Error control:
done := make(chan interface{})
defer close(done)
errCount := 0
urls := []string{"a", "https://www.google.com", "b", "c", "d"}
for result := range checkStatus(done, urls...) {
	if result.Error != nil {
		fmt.Printf("error: %v\n", result.Error)
		errCount++
		if errCount >= 3 {
			// Break and terminate requesting goroutine when too many errors
			// (the deferred close(done) signals it).
			fmt.Println("Too many errors, breaking!")
			break
		}
		continue
	}
	fmt.Printf("Response: %v\n", result.Response.Status)
	// Release the body so the underlying connection is not leaked.
	result.Response.Body.Close()
}
Pipeline Pattern
Split the production process into stages. Multiple goroutines coordinate while staying independent. Each stage handles the same data type.
// generator emits the given integers, in order, on the returned channel and
// closes it afterwards. It stops early if done fires.
generator := func(done <-chan interface{}, integers ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for idx := range integers {
			select {
			case out <- integers[idx]:
			case <-done:
				return
			}
		}
	}()
	return out
}
// multiply is a pipeline stage: each value read from intStream is multiplied
// by multiplier and sent on the returned channel. The output is closed when
// the input is exhausted or done fires.
multiply := func(
	done <-chan interface{},
	intStream <-chan int,
	multiplier int,
) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			v, open := <-intStream
			if !open {
				return
			}
			select {
			case <-done:
				return
			case out <- v * multiplier:
			}
		}
	}()
	return out
}
// add is a pipeline stage: each value read from intStream has additive added
// to it and is sent on the returned channel. The output is closed when the
// input is exhausted or done fires.
add := func(
	done <-chan interface{},
	intStream <-chan int,
	additive int,
) <-chan int {
	out := make(chan int)
	forward := func() {
		defer close(out)
		for v := range intStream {
			sum := v + additive
			select {
			case out <- sum:
			case <-done:
				return
			}
		}
	}
	go forward()
	return out
}
// A single done channel is shared by every stage so all goroutines can exit.
done := make(chan interface{})
defer close(done)
// Wire the stages one at a time; each returns a channel of ints.
source := generator(done, 1, 2, 3, 4)
doubled := multiply(done, source, 2)
incremented := add(done, doubled, 1)
pipeline := multiply(done, incremented, 2)
for v := range pipeline {
	fmt.Println(v)
}
Multiple goroutines connect through multiple channels of the same type and share one done channel.
Generator Pattern
Infinite repetition:
// repeat returns a channel that cycles through values endlessly until done
// fires, at which point the channel is closed. When called with no values
// there is nothing to emit, so the channel is closed immediately; the
// original looped forever in that case without ever checking done.
repeat := func(
	done <-chan interface{},
	values ...interface{},
) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		if len(values) == 0 {
			return
		}
		// Loop forever repeating slice contents until stop signal
		for {
			for _, v := range values {
				select {
				case <-done:
					return
				case valueStream <- v:
				}
			}
		}
	}()
	return valueStream
}
The pattern creates a stream. Now read from it:
Read on demand:
take := func(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <- valueStream:
}
}
}()
return takeStream
}
done := make(chan interface{})
defer close(done)
// Take first 10 elements from stream then exit
ones := repeat(done, 1)
firstTen := take(done, ones, 10)
for num := range firstTen {
	fmt.Printf("%v ", num)
}
Worker Pool Pattern
// Fixed-size worker pool: row indices are queued on c and each worker fills
// every pixel of the rows it receives.
worker := 8
// NOTE(review): l is not defined in this snippet — presumably the number of
// rows (len(m.m)) so that queuing never blocks; confirm.
c := make(chan int, l)
var wg sync.WaitGroup
wg.Add(worker)
for i := 0; i < worker; i++ {
	go func() {
		// Pair the earlier wg.Add with Done; without it the WaitGroup
		// counter never drops.
		defer wg.Done()
		for row := range c {
			for col := range m.m[row] {
				fillPixel(m, row, col)
			}
		}
	}()
}
for row := range m.m {
	c <- row
}
// Closing c ends each worker's range loop once the queue is drained.
close(c)
// Block until every worker has finished its rows.
wg.Wait()
Pub-Sub Pattern
Each subscriber holds a channel. Each subscriber has a filter for content filtering. Filters are typically boolean functions. Publishers use maps to store subscriber channels mapped to filter functions. On each publish, iterate the map and deliver filtered content.
Event-Driven Pattern
Often used to trigger other operations during execution, like notifications. Event handlers register beforehand, then trigger at the right time.
package events
import (
"time"
)
// UserCreated is the package-level event on which handlers are registered
// and through which the event is triggered.
var UserCreated userCreated

// UserCreatedPayload is the data passed to every handler of the event.
type UserCreatedPayload struct {
	Email string
	Time  time.Time
}

// userCreated holds the handlers registered for the event.
type userCreated struct {
	handlers []interface{ Handle(UserCreatedPayload) }
}

// Register appends handler to the list of handlers for the event.
// A nil handler is ignored: storing it would cause a nil-interface method
// call, and therefore a panic, when the handlers are later invoked.
func (u *userCreated) Register(handler interface{ Handle(UserCreatedPayload) }) {
	if handler == nil {
		return
	}
	u.handlers = append(u.handlers, handler)
}
// Trigger starts goroutines to execute context-aware handlers
func (u userCreated) Trigger(payload UserCreatedPayload) {
for _, handler := range u.handlers {
go handler.Handle(payload)
}
}package main
import (
"time"
"github.com/stephenafamo/demo/events"
)
func init() {
createNotifier := userCreatedNotifier{
adminEmail: "the.boss@example.com",
slackHook: "https://hooks.slack.com/services/...",
}
// Register callback handlers
events.UserCreated.Register(createNotifier)
}
// userCreatedNotifier carries the destinations used when reacting to a
// user-created event.
type userCreatedNotifier struct {
	adminEmail string
	slackHook  string
}

// notifyAdmin is a placeholder for notifying the admin about the new user.
// The timestamp parameter is named at so it does not shadow the time package.
func (u userCreatedNotifier) notifyAdmin(email string, at time.Time) {
	// ...
}

// sendToSlack is a placeholder for posting the new user to Slack.
// The timestamp parameter is named at so it does not shadow the time package.
func (u userCreatedNotifier) sendToSlack(email string, at time.Time) {
	// ...
}
// Custom handler implementation as long as signature matches
func (u userCreatedNotifier) Handle(payload events.UserCreatedPayload) {
// Do something with our payload
u.notifyAdmin(payload.Email, payload.Time)
u.sendToSlack(payload.Email, payload.Time)
}Trigger event handlers:
package auth
import (
"time"
"github.com/stephenafamo/demo/events"
// Other imported packages
)
// CreateUser fires the UserCreated event with the new user's email and the
// current time. It only builds the payload; what the handlers do with it is
// not its concern.
func CreateUser() {
	// ...
	payload := events.UserCreatedPayload{
		Email: "new.user@example.com",
		Time:  time.Now(),
	}
	events.UserCreated.Trigger(payload)
	// ...
}