Go Goroutine

Go 非同步運算 (Goroutine)

Goroutine

Goroutine 是由 Go 語言本身管理的輕量級執行緒(User-space Thread),而不是由作業系統管理的執行緒(OS Thread),切換與創建成本極低。

背後使用 M:N 排程模型。這意味著,M 個 Goroutine 會被分配到 N 個作業系統執行緒上執行,當 Goroutine 阻塞時會調度繼續執行其他事情。

Kernel Space

Go Runtime

Goroutine 1

Goroutine 2

Goroutine 3

Goroutine 4

Goroutine 5

OS Thread 1

OS Thread 2

go 關鍵字用於創建 goroutine:

func someFunc(num string) {
fmt.Println(num)
}
func main() {
go someFunc("1")
go someFunc("2")
go someFunc("3")
// 避免 main goroutine 執行完畢關閉,故意等待 2 秒等待其他 goroutine 執行完畢
time.Sleep(time.Second * 2)
fmt.Println("Hi")
}

可以發現每次打印數字的順序都可能不同,這就是 goroutine 背後併發執行代碼的證據。

Channel

不要透過共享記憶體通訊,而是透過通訊來共享記憶體

相較於其他語言共享記憶體並透過「替程式上鎖」來避免 Race Condition,Go 透過 Channel 本身避免了資料爭奪,因為值在 Goroutine 之間以複製方式傳遞。

想像一個傳送門,從一端的 Goroutine 輸入,從另外一端的 Goroutine 輸出。

func main() {
myChannel := make(chan string)
go func() {
myChannel <- "data"
}()
// Blocking,只有關閉或接受訊息才會繼續
msg := <-myChannel
fmt.Println(msg)
}

Unbuffered vs Buffered Channel

Channel 之間有輸入與輸出端,而 Unbuffered 意味著,只要送出 goroutine 就會一直等待直到接收,它沒有任何的緩衝空間:

ch := make(chan int)
go func() {
fmt.Println("sending...")
ch <- 1 // 阻塞到主 goroutine 接收
fmt.Println("sent")
}()
fmt.Println("receiving...")
value := <-ch // 進入此行,送端才會繼續
fmt.Println("received:", value)

而 Buffered 意味著送出並不一定馬上應對接收,而是送出訊息時 Buffer 滿了才堵塞或是接收訊息空了才堵塞。

Terminal window
# 代表目前程式中 goroutine 都在等待某件永遠不會發生的事情(例如 channel 永遠不會再有人送資料或接資料),導致整個執行緒停住。
fatal error: all goroutines are asleep - deadlock!
ch := make(chan int, 2)
ch <- 1 // 不阻塞
ch <- 2 // 不阻塞
ch <- 3 // 會阻塞,因為 buffer 已滿
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2

Buffered channel 常用於「速率不平衡」的生產者-消費者模型,但不適合當作併發控制工具(限制併發數、互斥訪問⋯⋯)。

Close

Channel 可以被關閉。舉例輸入與輸出兩端的 Goroutine 各自透過 Channel 交流,但當輸入端停止寫入資料後輸出端仍持續接收資料就會導致永遠卡住,解決方法是關閉使用完的 Channel。

func main() {
dataChan := make(chan int)
go func() {
for i := 0; i < 1000; i++ {
dataChan <- i
}
close(dataChan)
}()
for n := range dataChan {
fmt.Printf("n = %d\n", n)
}
}

defer

把某段程式排到當前函式結束時才執行,不論正常回傳、panic、還是被 return 提早結束都會執行,不會造成 Wait() 永遠卡住。

func main() {
defer fmt.Println("world")
fmt.Println("hello")
}

WaitGroup

Fork-join🔗 並行計算模型實踐把大任務切成獨立的小任務同時執行並整合起來。

  • Fork: 分叉出多個並行任務
  • Join: 等待所有任務完成後合併結果

WaitGroup 就是實現 fork-join 模式的工具,用來等待一組 Goroutine 全部結束。:

並行執行階段

分派任務

分派任務

分派任務

執行中

執行中

執行中

合併結果

主執行緒 Main Thread

Fork
分岔/拆分

子任務 1

子任務 2

子任務 3

Join
匯合/等待

主執行緒繼續/結束

  • wg.Add(n):宣告要等幾件工作(通常是幾個 goroutine)
  • wg.Done():做完了的訊號,等同 wg.Add(-1)
  • wg.Wait(): Goroutine 在這裡等待全部工作完成後再往下執行。
func main() {
var wg sync.WaitGroup
wg.Add(3) // 等三個 goroutine
for i := 1; i <= 3; i++ {
go func(id int) {
defer wg.Done()
fmt.Println("Worker", id, "done")
}(i)
}
wg.Wait() // 等全部 done
fmt.Println("All workers finished")
}

Select

處理多個 Channel 操作的控制結構,哪個 Channel 先準備好就執行哪個,類似 switch 但專門用於 Channel 通訊。如果多個 case 都可以執行,系統會進行統一偽隨機選擇來決定執行哪一個。

func main() {
myChannel := make(chan string)
myAnotherChannel := make(chan string)
go func() {
myChannel <- "data"
}()
go func() {
myAnotherChannel <- "data2"
}()
// Blocking,只有任一 case 關閉或接受訊息才會繼續
// 如果多個 case 都可以執行,系統會進行統一偽隨機選擇來決定執行哪一個。
select {
case msg := <-myChannel:
fmt.Println("a", msg)
case msg2 := <-myAnotherChannel:
fmt.Println("b", msg2)
}
}
select {
case result := <-ch:
fmt.Println("成功:", result)
case <-time.After(3 * time.Second):
fmt.Println("超時了!")
}

延伸閱讀