Go 語言中的同步佇列
問題
假設我們在運營一家 IT 公司,公司裡面有程式員和測試員。為了給個機會他們互相認識對方,並且讓他們能夠在工作中放鬆一點,我們買了一個乒乓球檯,並且制定瞭如下規則:
- 每次只能兩個人(不能少於或多於兩人)玩。
- 只有上一對玩家結束了,下一對玩家才能玩,也就是說,不能只換下一個人。
- 只能是測試員和程式設計師組成一對來玩,(不能出現兩個測試員或者兩個程式設計師一起玩的情況)。如果員工想要玩的話,那麼他得等到有合適的對手了才能開始遊戲。
func main() { for i := 0; i < 10; i++ { go programmer() } for i := 0; i < 5; i++ { go tester() } select {} // 漫長的工作日... } func programmer() { for { code() fmt.Println("Programmer starts") pingPong() fmt.Println("Programmer ends") } } func tester() { for { test() fmt.Println("Tester starts") pingPong() fmt.Println("Tester ends") } }
我們用time.sleep
來模擬測試、開發、和玩乒乓球的行為。
func test() { work() } func code() { work() } func work() { // Sleep up to 10 seconds. time.Sleep(time.Duration(rand.Intn(10000)) * time.Millisecond) } func pingPong() { // Sleep up to 2 seconds. time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) }
這個程式的輸出類似這樣:
> go run pingpong.go Tester starts Programmer starts Programmer starts Tester ends Programmer ends Programmer starts Programmer ends Programmer ends
但是如果我們要按照我們制定的規矩去玩乒乓球的話,那輸出只能是下面四種情況:
Tester starts Programmer starts Tester ends Programmer ends Tester starts Programmer starts Programmer ends Tester ends Programmer starts Tester starts Tester ends Programmer ends Programmer starts Tester starts Programmer ends Tester ends
程式設計師或者測試員先走到乒乓球桌上,然後等待他的合法對手加入。當他們打完離開時,他們離開的順序是任意的。所以只有上述四種輸出序列是有效的。
下面有兩種解決方案,第一種是基於 mutex (互斥量)的,而第二種使用了不同的 worker ,它們協調整個處理的過程,確保所有事情都能按照規則來執行。
解決方案 #1
兩種解決方案都使用了同一種資料結構(queue.Queue
),來使得程式設計師和測試員在走上乒乓球桌之前先排好隊。當至少有一對玩家(一個程式設計師和一個測試員)準備好之後,這一對玩家才能開始玩乒乓球。
func tester(q *queue.Queue) { for { test() q.StartT() fmt.Println("Tester starts") pingPong() fmt.Println("Tester ends") q.EndT() } } func programmer(q *queue.Queue) { for { code() q.StartP() fmt.Println("Programmer starts") pingPong() fmt.Println("Programmer ends") q.EndP() } } func main() { q := queue.New() for i := 0; i < 10; i++ { go programmer(q) } for i := 0; i < 5; i++ { go tester(q) } select {} }
包queue
是這麼定義的:
package queue import "sync" type Queue struct { mutsync.Mutex numP, numTint queueP, queueT, doneP chan int } func New() *Queue { q := Queue{ queueP: make(chan int), queueT: make(chan int), doneP:make(chan int), } return &q } func (q *Queue) StartT() { q.mut.Lock() if q.numP > 0 { q.numP -= 1 q.queueP <- 1 } else { q.numT += 1 q.mut.Unlock() <-q.queueT } } func (q *Queue) EndT() { <-q.doneP q.mut.Unlock() } func (q *Queue) StartP() { q.mut.Lock() if q.numT > 0 { q.numT -= 1 q.queueT <- 1 } else { q.numP += 1 q.mut.Unlock() <-q.queueP } } func (q *Queue) EndP() { q.doneP <- 1 }
佇列裡面的 mutex 有兩個用途:
-
同步共享變數
numT
、numP
的訪問。 - 作為一個令牌,可以開始遊戲的一對玩家才能持有該令牌,其他玩家嘗試進入遊戲會被阻塞。
程式設計師和測試員通過非緩衝的 channel<-q.queueP
或者<-q.queueT
來等待對手。
從這些 channel 接收資料時,如果此時沒有可配對的對手,那麼當前的 goroutine 會被阻塞。
我們來分析一下給測試員呼叫的StartT
函式:
func (q* Queue) StartT() { q.mut.Lock() if q.numP > 0 { q.numP -= 1 q.queueP <- 1 } else { q.numT += 1 q.mut.Unlock() <-q.queueT } }
如果numP
大於 0(表示當前至少有一個程式設計師在等待加入遊戲),那麼正在等待中的程式設計師的數量就會減一,並且有一個正在等待中的程式設計師批准加入遊戲(q.queueP <- 1
)。有趣的是在這個過程中 mutex 不會被釋放掉,這時它的職能就是作為一個允許進入乒乓球桌的令牌。
如果當前沒有正在等待的程式設計師,那麼numT
(等待中的測試員的數量)將會加一,並且當前的 goroutine 會被阻塞在q.queueT
。
StartP
函式基本上是一樣的,只是它是給程式設計師呼叫的。
整個遊戲的過程中,mutex 會被鎖定,所以它需要被程式設計師或者測試員釋放。要釋放 mutex,只能是雙方都結束遊戲了才行,我們使用了doneP
作為一個屏障:
func (q *Queue) EndT() { <-q.doneP q.mut.Unlock() } func (q *Queue) EndP() { q.doneP <- 1 }
如果程式設計師還在遊戲,而測試員已經結束遊戲了,那麼測試員會被阻塞在<-q.doneP
。一旦程式設計師執行到q.doneP<-1
時。這個屏障就會開啟,而 mutex 就能得以釋放,從而使這些員工可以回去繼續工作。
如果測試員還在遊戲,而程式設計師已經結束遊戲了,那麼程式設計師會阻塞在q.done<-1
,直到測試員結束遊戲時,執行<-q.doneP
,從而恢復程式設計師的執行,並且釋放掉 mutex。
這個過程中有趣的是,無論當時是測試員還是程式設計師把 mutex 鎖定的,mutex 永遠都是測試員負責釋放。這也就是為什麼這個解決方案第一看上去沒有那麼直觀。
解決方案 #2
package queue const ( msgPStart = iota msgTStart msgPEnd msgTEnd ) type Queue struct { waitP, waitTint playP, playTbool queueP, queueT chan int msgchan int } func New() *Queue { q := Queue{ msg:make(chan int), queueP: make(chan int), queueT: make(chan int), } go func() { for { select { case n := <-q.msg: switch n { case msgPStart: q.waitP++ case msgPEnd: q.playP = false case msgTStart: q.waitT++ case msgTEnd: q.playT = false } if q.waitP > 0 && q.waitT > 0 && !q.playP && !q.playT { q.playP = true q.playT = true q.waitT-- q.waitP-- q.queueP <- 1 q.queueT <- 1 } } } }() return &q } func (q *Queue) StartT() { q.msg <- msgTStart <-q.queueT } func (q *Queue) EndT() { q.msg <- msgTEnd } func (q *Queue) StartP() { q.msg <- msgPStart <-q.queueP } func (q *Queue) EndP() { q.msg <- msgPEnd }
我們會有個專門的中央協調器在一個獨立的 goroutine 裡面執行,它負責協調整個過程。協調器通過msg
channel 獲取所有想要玩乒乓球的和剛玩完乒乓球的員工的資訊。收到訊息時,排程器的狀態將會更新:
- 等待中的程式設計師或者測試員的數量會增加。
- 正在遊戲的員工的資訊會被更新。
在收到符合定義的訊息時,排程器會檢查現在是否更夠讓一對新的選手開始遊戲:
if q.waitP > 0 && q.waitT > 0 && !q.playP && !q.playT {
如果相應的狀態都已經更新了的話,那麼一個代表程式設計師的 goroutine 和一個代表測試員的 goroutine 將會被喚醒。
我們在這個方案中沒有使用 mutex,而是使用了一個獨立的 goroutine,它通過 channel 與外部世界通訊,這讓我們的程式成為一個更”地道“(符合 Go 語言風格)的 Go 語言程式。
Don’t communicate by sharing memory, share memory by communicating.
不要通過共享記憶體來通訊,而要通過通訊來共享記憶體。
參考資料
- “The Little Book of Semaphores” by Allen B. Downey(譯註:PDF版地址 )
- https://medium.com/golangspec/reusable-barriers-in-golang-156db1f75d0b (譯文: https://studygolang.com/articles/12718)
- https://blog.golang.org/share-memory-by-communicating