GoRoutines
在使用 Golang 時,你可以輕鬆地使用 go 關鍵字創建一箇 goroutine。
go FunctionCall(params)這會在背後啓動一箇新的 goroutine,竝立卽返回,不阻塞當前的執行線程。如果想要求值表達式,需要使用匿名函數來包裝。
go func() { ans := 1 + 1 fmt.Println(ans) // do something}()需要注意的是,如果主線程結束了,程序就會結束,所有的 goroutine 都會被終止。
package main
import ( "fmt" "time")
func main() { fmt.Println("Goroutine started") go func() { time.Sleep(2 * time.Second) fmt.Println("Goroutine finished") }()}運行這段代碼,你會發現 Goroutine finished 不會被打印出來,因爲主線程結束了,等不到 goroutine 完成。爲了讓 Goroutine 完成,我們需要進行通信。
Channels
Golang 中提倡通過通信來共享內存,而不是通過共享內存來進行通信。通信的主要方式就是 channels。
通過 make 函數來創建一箇 channel:
ch := make(chan int)這會創建一箇無緩衝的 channel,對這種 channel,發送和接收操作都會阻塞代碼,直到通信成功。
利用這一點,我們可以讓主線程等待 goroutine 完成:
package main
import ( "fmt" "time")
func main() { fmt.Println("Goroutine started") ch := make(chan bool) // create a channel go func() { time.Sleep(2 * time.Second) fmt.Println("Goroutine finished") ch <- true // send a signal to the channel } <-ch // try to receive signals from the channel // this blocks the main thread until a signal is received}在這段代碼中,主線程會阻塞在 <-ch 這一行,直到 goroutine 發送信號到 channel 中。
有時候我們像使用 channel 來傳遞數據流,這時候可以使用帶緩衝的 channel。
func main() { ch := make(chan int, 10) // create a buffered channel go func() { for i := 0; i < 10; i++ { ch <- i // send data to the channel } }() go func() { for i := 0; i < 10; i++ { fmt.Println(<-ch) // receive data from the channel } }() time.Sleep(2 * time.Second) // wait for goroutines to finish}這段代碼中,我們創建了一箇緩衝區長度爲 10 的 channel,然後在兩個 goroutine 中進行數據的發送和接收。這樣的 channel 可以在不阻塞的情況下進行數據的發送和接收,直到緩衝區滿了。
這裏主線程通過 time.Sleep 等待 goroutine 完成,更好的方法是使用信號來通知主線程。
func main() { ch := make(chan int, 10) done := make(chan bool) // create a done channel go func() { for i := 0; i < 10; i++ { ch <- i } }() go func() { for i := 0; i < 10; i++ { fmt.Println(<-ch) } done <- true // send a signal to the done channel }() time.Sleep(2 * time.Second) <-done // wait for the done signal}對第二箇 goroutine,其實它竝不知道 channel 中有多少數據,所以更好的方法是使用 for-range。但 for-range 衹有在 channel 關閉、且消耗完內部緩衝區數據後纔會結束。
NOTE向 channel 發送數據的 goroutine 有責任關閉 channel。
func main() { ch := make(chan int, 10) done := make(chan bool) go func() { defer close(ch) // close the channel when done for i := 0; i < 10; i++ { ch <- i } }() go func() { for i := range ch { // use for-range to receive data from the channel fmt.Println(i) } done <- true }() time.Sleep(2 * time.Second) <-done}WARNING從關閉的 channel 接收數據是安全的,但往關閉的 channel 發送數據會引發 panic!
所以一定確保關閉 channel 時所有生產者都已經完成了,如果衹有一箇生產者,則令其負責關閉 channel 是最佳實踐。
WARNING不要試圖讓消費者關閉 channel,除非你眞的知道你在做什麼。
如果緩衝區滿了,發送操作會阻塞,直到有空間可用;如果緩衝區爲空,接收操作會阻塞,直到有數據可用。
這樣的 channel 可以用來實現生產者-消費者模式。注意,如果消费者無法及時消費數據,卽使有緩衝區,生產者也會被阻塞,因爲速度差異下緩衝區早晚會滿。
對於生產快、消費慢的情況,如果消費不必同步,那可以按速率比例安排多箇消費者來消費數據。
import ( "fmt" "time")
func fastProducer(ch chan int, done chan bool) { for i := 0; i < 10; i++ { time.Sleep(500 * time.Millisecond) // simulate fast producer ch <- i } close(ch) // close the channel when done done <- true}
func slowConsumer(ch chan int, done chan bool) { for i := range ch { fmt.Println(i) time.Sleep(1 * time.Second) // simulate slow consumer } done <- true}
func main() { ch := make(chan int, 2) done := make(chan bool) go fastProducer(ch, done) // Here we know that 2 consumers are enough to keep up with the producer go slowConsumer(ch, done) go slowConsumer(ch, done) // Wait for them to finish <-done <-done <-done}這裏,我們分析出兩箇滿費者可以跟上一箇生產者,所以我們創建了兩箇消費者來消費數據。我們使用了 done channel 來通知主線程,因爲知道有 3 箇 goroutine,所以接收了 3 次信號。
更好的方案是使用 sync.WaitGroup。
WaitGroup
通過 done channel 來通知主線程通常衹用在要等某一箇 goroutine 的情況,如果有多箇 goroutine 的話,使用 sync.WaitGroup 會更方便。
package main
import ( "fmt" "sync" "time")
func sleeper(id int, wg *sync.WaitGroup) { defer wg.Done() // When this function returns, tell wg to decrement the counter fmt.Printf("Goroutine %d sleeping\n", id) time.Sleep(2 * time.Second) fmt.Printf("Goroutine %d awake\n", id)}
func main() { var wg sync.WaitGroup for i := 0; i < 5; i++ { wg.Add(1) // Increment the WaitGroup counter go sleeper(i, &wg) } wg.Wait() // Wait for all goroutines to finish fmt.Println("All goroutines finished")}每次啓動需要等待的 goroutine 時,使用 wg.Add(1) 增加計數器,當 goroutine 完成時,在 goroutine 内部使用 wg.Done() 減少計數器。
wg.Wait() 會阻塞,直到計數器爲 0 爲止,這樣就確保了所有 goroutine 都完成了。
Context
context.Context 爲 goroutine 提供了取消信號和截止時間的功能。
import ( "context" "fmt" "time")
func sleeper(ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println("Woke up and cancelled.") return case <-time.After(2 * time.Second): fmt.Println("Zzzzz") } }}
func main() { ctx, cancel := context.WithCancel(context.Background()) go sleeper(ctx) go sleeper(ctx) go sleeper(ctx) time.Sleep(10 * time.Second) cancel() // Cancel the context time.Sleep(2 * time.Second) // Wait for goroutines to finish fmt.Println("Main function finished")}我們先創建了一箇上下文,附帶取消函數 cancel,然後啓動了3箇 goroutine。通過 select 語句,會自動地隨機命中一箇可以成功的子句。如果 cancel() 被調用,則 <-ctx.Done() 可以成功,進而結束 goroutine。
但注意,select 語句是隨機命中的,所以 <-ctx.Done() 未必能立刻命中,所以需要等待 goroutine 命中退出。最佳實踐是使用 sync.WaitGroup 來等待所有 goroutine 完成。
import ( "context" "fmt" "sync" "time")
func sleeper(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() // When this function returns, tell wg to decrement the counter for { select { case <-ctx.Done(): fmt.Println("Woke up and cancelled.") return case <-time.After(2 * time.Second): fmt.Println("Zzzzz") } }}
func main() { ctx, cancel := context.WithCancel(context.Background()) wg.Add(3) // Increment the WaitGroup counter go sleeper(ctx) go sleeper(ctx) go sleeper(ctx) time.Sleep(10 * time.Second) cancel() // Cancel the context time.Sleep(2 * time.Second) // Wait for goroutines to finish wg.Wait() // Wait for all goroutines to finish fmt.Println("Main function finished")}現在,我們不需要手動地等 2 秒了,而是改用 wg.Wait() 來等待所有 goroutine 收到 ctx.Done() 信號竝退出。
Common Patterns
One Producer, One Consumer
這是最簡單的模式,因爲衹有一箇生產者,讓它關閉 channel 就可以了,完成的信號則通過消費者來傳遞。
package main
import ( "fmt" "time")
func producer(ch chan int) { defer close(ch) // close the channel when done for i :=0; i < 10; i++ { ch <- i time.Sleep(1 * time.Second) }}
func consumer(ch chan int, done chan bool) { for i := range ch { fmt.Println(i) } done <- true // send a signal to the done channel}
func main() { ch := make(chan int) done := make(chan bool) go producer(ch) go consumer(ch, done) <-done // wait for the done signal fmt.Println("Main function finished")}有時候消費者纔能決定何時处理結束,而非被動等待生產者關閉 channel,這種情況下也不應讓消費者關閉 channel,而是通過消費者來通知生產者關閉 channel。
package main
import ( "fmt" "time")
func producer(ch chan int, done chan bool) { defer close(ch) for i :=0; ; i++ { select { case ch <- i: time.Sleep(1 * time.Second) case <-done: return } }}
func consumer(ch chan int, done chan bool) { for i := range ch { fmt.Println(i) if i == 9 { close(done) return } }}
func main() { ch := make(chan int) done := make(chan bool) go producer(ch, done) go consumer(ch, done) <-done fmt.Println("Main function finished")}可以看到,這次生產者不再單純地生產數據,而是通過 select 來在等待消費者的信號與生產數據間選擇。
NOTE已經關閉的 channel 可以多次接收數據,如果 channel 中沒有數據,會得到相應類型的零值。
這裏我們利用了上述特性,同時做到了通知生產者關閉 channel、通知主線程結束的功能。注意,如果消費者不在關閉 done 後立卽退出,那麼 ch 就不會阻塞,生產者的 select 中的兩箇子句都是可命中的,不保證生產者能夠及時退出,有可能導致消費者打印出下一箇數字,直到生產者命中 <-done、關閉 ch,這時 consumer 纔能退出。
回憶 Channels 中最後一例,我們採用了每箇 goroutine 都向 done 發送 signal 的方案,主線程 <-done 三次來等待所有的 goroutine 完成,那能不能像上例一樣改爲直接關閉 done 呢?答案是不可以!
WARNING關閉一箇已经關閉的 channel 會引發 panic。
Multiple Producers, One Consumer
這種情況下,前文已經提到過,使用 sync.WaitGroup 來等待所有的生產者完成,這時再由一箇線程來關閉 channel。
package main
import ( "fmt" "sync" "time")
func producer(ch chan int, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < 10; i++ { ch <- i time.Sleep(1 * time.Second) }}
func consumer(ch chan int) { for i := range ch { fmt.Println(i) }}
func watchdog(ch chan int, wg *sync.WaitGroup, done chan bool) { defer close(done) wg.Wait() // Wait for all producers to finish close(ch) // Close the channel when done}
func main() { ch := make(chan int) done := make(chan bool) var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) // Increment the WaitGroup counter go producer(ch, &wg) } go consumer(ch) go watchdog(ch, &wg, done) <-done fmt.Println("Main function finished")}這裏我們使用了 watchdog 來監控所有的生產者,當所有的生產者完成後,關閉 channel,竝通知主線程結束。
同樣,有時候是由消費者來決定何時結束的,這時候就需要讓消費者來通知生產者關閉 channel。
package main
import ( "fmt" "sync" "time")
func producer(ch chan int, wg *sync.WaitGroup, done chan bool) { defer wg.Done() for i := 0; ; i++ { select { case ch <- i: time.Sleep(1 * time.Second) case <-done: return } }}
func consumer(ch chan int, done chan bool) { for i := range ch { fmt.Println(i) if i == 5 { close(done) return } }}
func watchdog(ch chan int, wg *sync.WaitGroup, allDone chan bool) { defer close(allDone) wg.Wait() close(ch)}
func main() { ch := make(chan int) allDone := make(chan bool) done := make(chan bool) var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go producer(ch, &wg, done) } go consumer(ch, done) go watchdog(ch, &wg, allDone) <-allDone fmt.Println("Main function finished")}這箇例子組合了前面的兩箇例子,生產者通過 done channel 來通知消費者關閉 channel,而 watchdog 則用來監控所有的生產者,當所有的生產者完成後,通過 allDone channel 來通知主線程結束。
One Producer, Multiple Consumers
這種情況衹有一箇生產者,所以由它來關閉 channel 卽可。
package main
import ( "fmt" "sync" "time")
func producer(ch chan int, done chan bool) { defer close(ch) for i := 0; i < 20; i++ { ch <- i }}
func consumer(ch chan int, id int, wg *sync.WaitGroup) { defer wg.Done() for i := range ch { fmt.Printf("Consumer %d: %d\n", id, i) time.Sleep(1 * time.Second) }}
func watchdog(done chan bool, wg *sync.WaitGroup) { defer close(done) wg.Wait() // Wait for all consumers to finish}
func main() { ch := make(chan int) done := make(chan bool) var wg sync.WaitGroup go producer(ch, done) for i := 0; i < 3; i++ { wg.Add(1) go consumer(ch, i, &wg) } go watchdog(done, &wg) <-done fmt.Println("Main function finished")}與多個生產者的情況類似,這裏我們使用 watchdog 來監控所有的消費者,當所有的消費者完成後,通知主線程結束,但關閉 channel 的工作直接交給生產者自己就行了。
同理,如果消費者決定何時結束,則需要消費者來通知生產者關閉 channel。這就有了一箇問題,我們要怎麼通知呢?每箇消費者都有可能做出決定,如果使用無緩衝的 done 來通知會導致後做出決定的消費者阻塞,又不能多次關閉 done,如果用緩衝則不夠優雅。
我們有兩種方案應對,第一種由 watchdog 來监控所有消費者退出後,再由它通知生產者關閉 channel,然後再通知主線程。
package main
import ( "fmt" "sync" "time")
func producer(ch chan int, done chan bool) { defer close(ch) for i := 0; ; i++ { select { case ch <- i: case <-done: return } }}
func consumer(ch chan int, id int, wg *sync.WaitGroup) { defer wg.Done() for i := range ch { if i > 10 { return } fmt.Printf("Consumer %d: %d\n", id, i) time.Sleep(1 * time.Second) }}
func watchdog(done chan bool, wg *sync.WaitGroup) { defer close(done) wg.Wait() // Wait for all consumers to finish}
func main() { ch := make(chan int) done := make(chan bool) var wg sync.WaitGroup go producer(ch, done) for i := 0; i < 3; i++ { wg.Add(1) go consumer(ch, i, &wg) } go watchdog(done, &wg) <-done fmt.Println("Main function finished")}這裏 watchdog 在所有消費者完成後,關閉 done channel,從而起到了通知主線程和生產者的作用。
㬎然這種方案是不夠健壯的,如果我們退出的理由是 i == 10 那麼這種方法就失效了,因爲衹有一箇 goroutine 會拿到這箇數據,所以 watchdog 永遠都等不到所有消費者結束。
應對這種情況,我們確實衹能讓消費者通知生產者和其它消費者。可以用 done 嗎?不能,因爲如果有兩箇消費者決定了退出就會出現多次關閉的問題。這裏,我們要使用前面的 context.Context。
package main
import ( "context" "fmt" "time")
func producer(ctx context.Context, ch chan int) { defer close(ch) for i := 0; ; i++ { select { case ch <- i: case <-ctx.Done(): return } }}
func consumer(ch chan int, id int, ctx context.Context, cancel context.CancelFunc) { defer cancel() // Cancel the context when done for { select { case i := <-ch: if i == 10 { return } fmt.Printf("Consumer %d: %d\n", id, i) time.Sleep(1 * time.Second) case <-ctx.Done(): return } }}
func main() { ch := make(chan int) ctx, cancel := context.WithCancel(context.Background()) go producer(ctx, ch) for i := 0; i < 3; i++ { go consumer(ch, i, ctx, cancel) } <-ctx.Done() // Wait for the context to be cancelled fmt.Println("Main function finished")}NOTE一箇
context.Context可以被多次取消,衹有第一次取消生效。
這一優點讓我們可以放心地讓消費者調用 cancel 來取消 Context,而所有消費者、生產者及主線程都衹等待 Context 結束卽可。一箇可能的隱憂是有時要確實地等待所有 goroutine 退出再結束,怎麼做?
Multiple Producers, Multiple Consumers
有了上面的模式,想必已經有能力應對這種情況了,組合第二種與第三種模式卽可。