3578 words
18 minutes
Concurrency Models in Golang

GoRoutines#

在使用 Golang 時,你可以輕鬆地使用 go 關鍵字創建一箇 goroutine。

go FunctionCall(params)

這會在背後啓動一箇新的 goroutine,竝立卽返回,不阻塞當前的執行線程。如果想要求值表達式,需要使用匿名函數來包裝。

go func() {
ans := 1 + 1
fmt.Println(ans)
// do something
}()

需要注意的是,如果主線程結束了,程序就會結束,所有的 goroutine 都會被終止。

package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("Goroutine started")
go func() {
time.Sleep(2 * time.Second)
fmt.Println("Goroutine finished")
}()
}

運行這段代碼,你會發現 Goroutine finished 不會被打印出來,因爲主線程結束了,等不到 goroutine 完成。爲了讓 Goroutine 完成,我們需要進行通信。

Channels#

Golang 中提倡通過通信來共享內存,而不是通過共享內存來進行通信。通信的主要方式就是 channels。

通過 make 函數來創建一箇 channel:

ch := make(chan int)

這會創建一箇無緩衝的 channel,對這種 channel,發送和接收操作都會阻塞代碼,直到通信成功。

利用這一點,我們可以讓主線程等待 goroutine 完成:

package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("Goroutine started")
ch := make(chan bool) // create a channel
go func() {
time.Sleep(2 * time.Second)
fmt.Println("Goroutine finished")
ch <- true // send a signal to the channel
}
<-ch // try to receive signals from the channel
// this blocks the main thread until a signal is received
}

在這段代碼中,主線程會阻塞在 <-ch 這一行,直到 goroutine 發送信號到 channel 中。

有時候我們像使用 channel 來傳遞數據流,這時候可以使用帶緩衝的 channel。

func main() {
ch := make(chan int, 10) // create a buffered channel
go func() {
for i := 0; i < 10; i++ {
ch <- i // send data to the channel
}
}()
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-ch) // receive data from the channel
}
}()
time.Sleep(2 * time.Second) // wait for goroutines to finish
}

這段代碼中,我們創建了一箇緩衝區長度爲 10 的 channel,然後在兩個 goroutine 中進行數據的發送和接收。這樣的 channel 可以在不阻塞的情況下進行數據的發送和接收,直到緩衝區滿了。

這裏主線程通過 time.Sleep 等待 goroutine 完成,更好的方法是使用信號來通知主線程。

func main() {
ch := make(chan int, 10)
done := make(chan bool) // create a done channel
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
}()
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-ch)
}
done <- true // send a signal to the done channel
}()
time.Sleep(2 * time.Second)
<-done // wait for the done signal
}

對第二箇 goroutine,其實它竝不知道 channel 中有多少數據,所以更好的方法是使用 for-range。但 for-range 衹有在 channel 關閉、且消耗完內部緩衝區數據後纔會結束。

NOTE

向 channel 發送數據的 goroutine 有責任關閉 channel。

func main() {
ch := make(chan int, 10)
done := make(chan bool)
go func() {
defer close(ch) // close the channel when done
for i := 0; i < 10; i++ {
ch <- i
}
}()
go func() {
for i := range ch { // use for-range to receive data from the channel
fmt.Println(i)
}
done <- true
}()
time.Sleep(2 * time.Second)
<-done
}
WARNING

從關閉的 channel 接收數據是安全的,但往關閉的 channel 發送數據會引發 panic

所以一定確保關閉 channel 時所有生產者都已經完成了,如果衹有一箇生產者,則令其負責關閉 channel 是最佳實踐。

WARNING

不要試圖讓消費者關閉 channel,除非你眞的知道你在做什麼。

如果緩衝區滿了,發送操作會阻塞,直到有空間可用;如果緩衝區爲空,接收操作會阻塞,直到有數據可用。

這樣的 channel 可以用來實現生產者-消費者模式。注意,如果消费者無法及時消費數據,卽使有緩衝區,生產者也會被阻塞,因爲速度差異下緩衝區早晚會滿。

對於生產快、消費慢的情況,如果消費不必同步,那可以按速率比例安排多箇消費者來消費數據。

import (
"fmt"
"time"
)
func fastProducer(ch chan int, done chan bool) {
for i := 0; i < 10; i++ {
time.Sleep(500 * time.Millisecond) // simulate fast producer
ch <- i
}
close(ch) // close the channel when done
done <- true
}
func slowConsumer(ch chan int, done chan bool) {
for i := range ch {
fmt.Println(i)
time.Sleep(1 * time.Second) // simulate slow consumer
}
done <- true
}
func main() {
ch := make(chan int, 2)
done := make(chan bool)
go fastProducer(ch, done)
// Here we know that 2 consumers are enough to keep up with the producer
go slowConsumer(ch, done)
go slowConsumer(ch, done)
// Wait for them to finish
<-done
<-done
<-done
}

這裏,我們分析出兩箇滿費者可以跟上一箇生產者,所以我們創建了兩箇消費者來消費數據。我們使用了 done channel 來通知主線程,因爲知道有 3 箇 goroutine,所以接收了 3 次信號。

更好的方案是使用 sync.WaitGroup

WaitGroup#

通過 done channel 來通知主線程通常衹用在要等某一箇 goroutine 的情況,如果有多箇 goroutine 的話,使用 sync.WaitGroup 會更方便。

package main
import (
"fmt"
"sync"
"time"
)
func sleeper(id int, wg *sync.WaitGroup) {
defer wg.Done() // When this function returns, tell wg to decrement the counter
fmt.Printf("Goroutine %d sleeping\n", id)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d awake\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1) // Increment the WaitGroup counter
go sleeper(i, &wg)
}
wg.Wait() // Wait for all goroutines to finish
fmt.Println("All goroutines finished")
}

每次啓動需要等待的 goroutine 時,使用 wg.Add(1) 增加計數器,當 goroutine 完成時,在 goroutine 内部使用 wg.Done() 減少計數器。 wg.Wait() 會阻塞,直到計數器爲 0 爲止,這樣就確保了所有 goroutine 都完成了。

Context#

context.Context 爲 goroutine 提供了取消信號和截止時間的功能。

import (
"context"
"fmt"
"time"
)
func sleeper(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("Woke up and cancelled.")
return
case <-time.After(2 * time.Second):
fmt.Println("Zzzzz")
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go sleeper(ctx)
go sleeper(ctx)
go sleeper(ctx)
time.Sleep(10 * time.Second)
cancel() // Cancel the context
time.Sleep(2 * time.Second) // Wait for goroutines to finish
fmt.Println("Main function finished")
}

我們先創建了一箇上下文,附帶取消函數 cancel,然後啓動了3箇 goroutine。通過 select 語句,會自動地隨機命中一箇可以成功的子句。如果 cancel() 被調用,則 <-ctx.Done() 可以成功,進而結束 goroutine。

但注意,select 語句是隨機命中的,所以 <-ctx.Done() 未必能立刻命中,所以需要等待 goroutine 命中退出。最佳實踐是使用 sync.WaitGroup 來等待所有 goroutine 完成。

import (
"context"
"fmt"
"sync"
"time"
)
func sleeper(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done() // When this function returns, tell wg to decrement the counter
for {
select {
case <-ctx.Done():
fmt.Println("Woke up and cancelled.")
return
case <-time.After(2 * time.Second):
fmt.Println("Zzzzz")
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
wg.Add(3) // Increment the WaitGroup counter
go sleeper(ctx)
go sleeper(ctx)
go sleeper(ctx)
time.Sleep(10 * time.Second)
cancel() // Cancel the context
time.Sleep(2 * time.Second) // Wait for goroutines to finish
wg.Wait() // Wait for all goroutines to finish
fmt.Println("Main function finished")
}

現在,我們不需要手動地等 2 秒了,而是改用 wg.Wait() 來等待所有 goroutine 收到 ctx.Done() 信號竝退出。

Common Patterns#

One Producer, One Consumer#

這是最簡單的模式,因爲衹有一箇生產者,讓它關閉 channel 就可以了,完成的信號則通過消費者來傳遞。

package main
import (
"fmt"
"time"
)
func producer(ch chan int) {
defer close(ch) // close the channel when done
for i :=0; i < 10; i++ {
ch <- i
time.Sleep(1 * time.Second)
}
}
func consumer(ch chan int, done chan bool) {
for i := range ch {
fmt.Println(i)
}
done <- true // send a signal to the done channel
}
func main() {
ch := make(chan int)
done := make(chan bool)
go producer(ch)
go consumer(ch, done)
<-done // wait for the done signal
fmt.Println("Main function finished")
}

有時候消費者纔能決定何時处理結束,而非被動等待生產者關閉 channel,這種情況下也不應讓消費者關閉 channel,而是通過消費者來通知生產者關閉 channel。

package main
import (
"fmt"
"time"
)
func producer(ch chan int, done chan bool) {
defer close(ch)
for i :=0; ; i++ {
select {
case ch <- i:
time.Sleep(1 * time.Second)
case <-done:
return
}
}
}
func consumer(ch chan int, done chan bool) {
for i := range ch {
fmt.Println(i)
if i == 9 {
close(done)
return
}
}
}
func main() {
ch := make(chan int)
done := make(chan bool)
go producer(ch, done)
go consumer(ch, done)
<-done
fmt.Println("Main function finished")
}

可以看到,這次生產者不再單純地生產數據,而是通過 select 來在等待消費者的信號與生產數據間選擇。

NOTE

已經關閉的 channel 可以多次接收數據,如果 channel 中沒有數據,會得到相應類型的零值。

這裏我們利用了上述特性,同時做到了通知生產者關閉 channel、通知主線程結束的功能。注意,如果消費者不在關閉 done 後立卽退出,那麼 ch 就不會阻塞,生產者的 select 中的兩箇子句都是可命中的,不保證生產者能夠及時退出,有可能導致消費者打印出下一箇數字,直到生產者命中 <-done、關閉 ch,這時 consumer 纔能退出。

回憶 Channels 中最後一例,我們採用了每箇 goroutine 都向 done 發送 signal 的方案,主線程 <-done 三次來等待所有的 goroutine 完成,那能不能像上例一樣改爲直接關閉 done 呢?答案是不可以!

WARNING

關閉一箇已经關閉的 channel 會引發 panic

Multiple Producers, One Consumer#

這種情況下,前文已經提到過,使用 sync.WaitGroup 來等待所有的生產者完成,這時再由一箇線程來關閉 channel。

package main
import (
"fmt"
"sync"
"time"
)
func producer(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 10; i++ {
ch <- i
time.Sleep(1 * time.Second)
}
}
func consumer(ch chan int) {
for i := range ch {
fmt.Println(i)
}
}
func watchdog(ch chan int, wg *sync.WaitGroup, done chan bool) {
defer close(done)
wg.Wait() // Wait for all producers to finish
close(ch) // Close the channel when done
}
func main() {
ch := make(chan int)
done := make(chan bool)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1) // Increment the WaitGroup counter
go producer(ch, &wg)
}
go consumer(ch)
go watchdog(ch, &wg, done)
<-done
fmt.Println("Main function finished")
}

這裏我們使用了 watchdog 來監控所有的生產者,當所有的生產者完成後,關閉 channel,竝通知主線程結束。

同樣,有時候是由消費者來決定何時結束的,這時候就需要讓消費者來通知生產者關閉 channel。

package main
import (
"fmt"
"sync"
"time"
)
func producer(ch chan int, wg *sync.WaitGroup, done chan bool) {
defer wg.Done()
for i := 0; ; i++ {
select {
case ch <- i:
time.Sleep(1 * time.Second)
case <-done:
return
}
}
}
func consumer(ch chan int, done chan bool) {
for i := range ch {
fmt.Println(i)
if i == 5 {
close(done)
return
}
}
}
func watchdog(ch chan int, wg *sync.WaitGroup, allDone chan bool) {
defer close(allDone)
wg.Wait()
close(ch)
}
func main() {
ch := make(chan int)
allDone := make(chan bool)
done := make(chan bool)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go producer(ch, &wg, done)
}
go consumer(ch, done)
go watchdog(ch, &wg, allDone)
<-allDone
fmt.Println("Main function finished")
}

這箇例子組合了前面的兩箇例子,生產者通過 done channel 來通知消費者關閉 channel,而 watchdog 則用來監控所有的生產者,當所有的生產者完成後,通過 allDone channel 來通知主線程結束。

One Producer, Multiple Consumers#

這種情況衹有一箇生產者,所以由它來關閉 channel 卽可。

package main
import (
"fmt"
"sync"
"time"
)
func producer(ch chan int, done chan bool) {
defer close(ch)
for i := 0; i < 20; i++ {
ch <- i
}
}
func consumer(ch chan int, id int, wg *sync.WaitGroup) {
defer wg.Done()
for i := range ch {
fmt.Printf("Consumer %d: %d\n", id, i)
time.Sleep(1 * time.Second)
}
}
func watchdog(done chan bool, wg *sync.WaitGroup) {
defer close(done)
wg.Wait() // Wait for all consumers to finish
}
func main() {
ch := make(chan int)
done := make(chan bool)
var wg sync.WaitGroup
go producer(ch, done)
for i := 0; i < 3; i++ {
wg.Add(1)
go consumer(ch, i, &wg)
}
go watchdog(done, &wg)
<-done
fmt.Println("Main function finished")
}

與多個生產者的情況類似,這裏我們使用 watchdog 來監控所有的消費者,當所有的消費者完成後,通知主線程結束,但關閉 channel 的工作直接交給生產者自己就行了。

同理,如果消費者決定何時結束,則需要消費者來通知生產者關閉 channel。這就有了一箇問題,我們要怎麼通知呢?每箇消費者都有可能做出決定,如果使用無緩衝的 done 來通知會導致後做出決定的消費者阻塞,又不能多次關閉 done,如果用緩衝則不夠優雅。

我們有兩種方案應對,第一種由 watchdog 來监控所有消費者退出後,再由它通知生產者關閉 channel,然後再通知主線程。

package main
import (
"fmt"
"sync"
"time"
)
func producer(ch chan int, done chan bool) {
defer close(ch)
for i := 0; ; i++ {
select {
case ch <- i:
case <-done:
return
}
}
}
func consumer(ch chan int, id int, wg *sync.WaitGroup) {
defer wg.Done()
for i := range ch {
if i > 10 {
return
}
fmt.Printf("Consumer %d: %d\n", id, i)
time.Sleep(1 * time.Second)
}
}
func watchdog(done chan bool, wg *sync.WaitGroup) {
defer close(done)
wg.Wait() // Wait for all consumers to finish
}
func main() {
ch := make(chan int)
done := make(chan bool)
var wg sync.WaitGroup
go producer(ch, done)
for i := 0; i < 3; i++ {
wg.Add(1)
go consumer(ch, i, &wg)
}
go watchdog(done, &wg)
<-done
fmt.Println("Main function finished")
}

這裏 watchdog 在所有消費者完成後,關閉 done channel,從而起到了通知主線程和生產者的作用。

㬎然這種方案是不夠健壯的,如果我們退出的理由是 i == 10 那麼這種方法就失效了,因爲衹有一箇 goroutine 會拿到這箇數據,所以 watchdog 永遠都等不到所有消費者結束。

應對這種情況,我們確實衹能讓消費者通知生產者和其它消費者。可以用 done 嗎?不能,因爲如果有兩箇消費者決定了退出就會出現多次關閉的問題。這裏,我們要使用前面的 context.Context

package main
import (
"context"
"fmt"
"time"
)
func producer(ctx context.Context, ch chan int) {
defer close(ch)
for i := 0; ; i++ {
select {
case ch <- i:
case <-ctx.Done():
return
}
}
}
func consumer(ch chan int, id int, ctx context.Context, cancel context.CancelFunc) {
defer cancel() // Cancel the context when done
for {
select {
case i := <-ch:
if i == 10 {
return
}
fmt.Printf("Consumer %d: %d\n", id, i)
time.Sleep(1 * time.Second)
case <-ctx.Done():
return
}
}
}
func main() {
ch := make(chan int)
ctx, cancel := context.WithCancel(context.Background())
go producer(ctx, ch)
for i := 0; i < 3; i++ {
go consumer(ch, i, ctx, cancel)
}
<-ctx.Done() // Wait for the context to be cancelled
fmt.Println("Main function finished")
}
NOTE

一箇 context.Context 可以被多次取消,衹有第一次取消生效。

這一優點讓我們可以放心地讓消費者調用 cancel 來取消 Context,而所有消費者、生產者及主線程都衹等待 Context 結束卽可。一箇可能的隱憂是有時要確實地等待所有 goroutine 退出再結束,怎麼做?使用 WaitGroup 卽可!

Multiple Producers, Multiple Consumers#

有了上面的模式,想必已經有能力應對這種情況了,組合第二種與第三種模式卽可。

Concurrency Models in Golang
https://blog.orbitoo.top/posts/golang/goroutines/
Author
Orbitoo
Published at
2025-05-23
License
CC BY-NC-SA 4.0