[譯] 使用 Goroutines 池來處理基於任務的操作
首發於:https://studygolang.com/articles/14481
我在多個場合都被問到為什麼使用工作池模式,為什麼不在需要的時候啟動所需要的 Goroutines
?我的答案一直是:受限於工作的型別、你所擁有的計算資源和所處平臺的限制,盲目地使用 Goroutines
將會導致程式執行緩慢,進 而傷害整個系統的響應和效能。
每個程式、系統和平臺都有短板。不管是記憶體、 CPU
或者頻寬資源也都不是無限的。因此對於我們的程式來說,減少資源消耗、重用有限資源是非常重要的。工作池恰好提供了這樣一種模式,可以幫助程式管理資源,提供調節資源的選項。
下圖展示了工作池的原理:
如上圖所示,主業務例程提交了100個任務到工作池中。工作池將它們都排入佇列,當一個 Goroutine
空閒,工作池從任務佇列中取出一個任務分配到此 Goroutine
上,此任務將會得到執行。執行完畢後此 Goroutine
將會再次空閒並等待處理其他任務。 Goroutines
的數量和佇列的容量是可配置的,這意味著工作池可以用於程式的效能調節。
Go
語言使用 Goroutine
替代了執行緒。 Go
執行環境管理了一個內部的執行緒池並且在這個池內排程 Goroutines
。執行緒池是最小化 Go
執行環境的負載和最大化程式效能的關鍵手段。當我們建立了一個新的 Goroutine
時, Go
執行環境將在內部執行緒池中管理和排程這個 Goroutine
。這個原理就和作業系統在空閒的 CPU
核心上排程執行緒一樣。通過 Goroutine
我們可以獲得同調度執行緒池一樣的效果,甚至可能更好。
對於處理基於任務的操作我有一個簡單的原則:少即是多。我總是想要知道對於特定操作,最好的結果需要的 Goroutines
的最小值是多少。最好的結果不僅僅是全部的任務需要花費多長時間來完成,同樣還包括處理這些任務對程式、系統和平臺所產生的影響。你必須同時考慮到短期影響和長期影響。
在系統或程式負載較輕的情況下,我們很容易就能獲取到非常快的處理速度。但是某天系統負荷的輕微增加就會導致之前的配置不起作用,而我們並沒有意識到正是我們在嚴重傷害和我們互動的系統。我們可能把資料庫或者網路伺服器用的太狠了,最終造成了系統的宕機。突發的100個併發任務可以執行正常,但是持續一個小時的併發可能就是致命的。
工作池並不是可以解決全世界運算問題的魔力仙女,它卻可以用在你的程式中處理基於任務的操作。它可以根據你的系統表現提供配置選項和控制功能。隨著系統變化,你也有足夠的靈活度來改變。
現在讓我們舉個例子來證明在處理基於任務的操作方面工作池要比盲目的產生 Goroutines
更有效率。我們的測試程式執行某一個任務,它會獲取一個 MongoDB
的連線,在資料庫上執行查詢命令並返回資料。一般的業務中都會有類似的功能。這個測試程式將會提交100個任務到工作池中,執行5次後統計平均執行時間。
開啟終端,執行如下的命令來下載程式碼:
1 export GOPATH=$HOME/example 2 go get github.com/goinggo/workpooltest 3 cd $HOME/example/bin
我們建立一個包含 100 個 Goroutines
的工作池,用它來模擬盲目的根據任務數產生相同數量的 Goroutine
的模型。
1 ./workpooltest 100 off
第一個引數告訴程式建立100個 Goroutines
的工作池,第二個引數告訴程式關閉詳細的日誌輸出。
在我的 Macbook
上,執行上面這個命令的結果是:
1 CPU[8] Routines[100] AmountOfWork[100] Duration[4.599752] MaxRoutines[100] MaxQueued[3] 2 CPU[8] Routines[100] AmountOfWork[100] Duration[5.799874] MaxRoutines[100] MaxQueued[3] 3 CPU[8] Routines[100] AmountOfWork[100] Duration[5.325222] MaxRoutines[100] MaxQueued[3] 4 CPU[8] Routines[100] AmountOfWork[100] Duration[4.652793] MaxRoutines[100] MaxQueued[3] 5 CPU[8] Routines[100] AmountOfWork[100] Duration[4.552223] MaxRoutines[100] MaxQueued[3] 6 Average[4.985973]
輸出結果中的引數含義:
CPU[8]: The number of cores on my machine Routines[100]: The number of routines in the work pool AmountOfWork[100]: The number of tasks to run Duration[4.599752] : The amount of time in seconds the run took MaxRoutines[100]: The max number of routines that were active during the run MaxQueued[3]: The max number of tasks waiting in queued during the run
現在讓我們執行 64 個 Goroutines
的工作池:
CPU[8] Routines[64] AmountOfWork[100] Duration[4.574367] MaxRoutines[64] MaxQueued[35] CPU[8] Routines[64] AmountOfWork[100] Duration[4.549339] MaxRoutines[64] MaxQueued[35] CPU[8] Routines[64] AmountOfWork[100] Duration[4.483110] MaxRoutines[64] MaxQueued[35] CPU[8] Routines[64] AmountOfWork[100] Duration[4.595183] MaxRoutines[64] MaxQueued[35] CPU[8] Routines[64] AmountOfWork[100] Duration[4.579676] MaxRoutines[64] MaxQueued[35] Average[4.556335]
接著是 24 個 Goroutines
的結果:
CPU[8] Routines[24] AmountOfWork[100] Duration[4.595832] MaxRoutines[24] MaxQueued[75] CPU[8] Routines[24] AmountOfWork[100] Duration[4.430000] MaxRoutines[24] MaxQueued[75] CPU[8] Routines[24] AmountOfWork[100] Duration[4.477544] MaxRoutines[24] MaxQueued[75] CPU[8] Routines[24] AmountOfWork[100] Duration[4.550768] MaxRoutines[24] MaxQueued[75] CPU[8] Routines[24] AmountOfWork[100] Duration[4.629989] MaxRoutines[24] MaxQueued[75] Average[4.536827]
最後是 8 個 Goroutines
:
CPU[8] Routines[8] AmountOfWork[100] Duration[4.616843] MaxRoutines[8] MaxQueued[91] CPU[8] Routines[8] AmountOfWork[100] Duration[4.477796] MaxRoutines[8] MaxQueued[91] CPU[8] Routines[8] AmountOfWork[100] Duration[4.841476] MaxRoutines[8] MaxQueued[91] CPU[8] Routines[8] AmountOfWork[100] Duration[4.906065] MaxRoutines[8] MaxQueued[91] CPU[8] Routines[8] AmountOfWork[100] Duration[5.035139] MaxRoutines[8] MaxQueued[91] Average[4.775464]
讓我們收集一下這幾個執行結果:
100 Go Routines : 4.985973 : 64Go Routines : 4.556335 : ~430 Milliseconds Faster 24Go Routines : 4.536827 : ~450 Milliseconds Faster 8Go Routines : 4.775464 : ~210 Milliseconds Faster
上述測試結果告訴我們如果單核執行 3 個 Goroutines
將獲得最好的結果。3 似乎是個神奇的數字,這個配置在我寫的每個 Go 程式中都會產生很好的結果。如果我們執行的程式擁有更多的核心,我們可以簡單地增加 Goroutines
的數量來充分利用更多的資源和能耗。這就意味著如果 MongoDB
可以處理多出來的連線,那麼我們總歸可以調整工作池的尺寸和容量來獲取最優結果。
我們已經證明了對於特定的操作,每個任務都盲目的產生 Goroutines
並不是最好的解決方案。我們來看看工作池的程式碼是怎麼工作的:
工作池的程式碼可以在你下載的程式碼路徑中找到:
cd $HOME/example/src/github.com/goinggo/workpool
workpool.go
這個檔案中包含了所有的程式碼。我移除了全部的註釋和部分程式碼行使我們聚焦在重要的部分。
我們首先看看構建工作池的型別:
1 type WorkPool struct { 2shutdownQueueChannel chan string 3shutdownWorkChannelchan struct{} 4shutdownWaitGroupsync.WaitGroup 5queueChannelchan poolWork 6workChannelchan PoolWorker 7queuedWorkint32 8activeRoutinesint32 9queueCapacityint32 10 } 11 12 type poolWork struct { 13WorkPoolWorker 14ResultChannel chan error 15 } 16 17 type PoolWorker interface { 18DoWork(workRoutine int) 19 }
WorkPool
是代表工作池的公共型別。它實現了兩個 channel
。
WorkChannel
處於工作池的核心位置,它管理著需要處理的工作佇列。所有 vGoroutines
都會等待這個 channel
的訊號。
QueueChannel
用於管理提交工作到 WorkChannel
。 QueueChannel
將工作是否進入佇列的確認提供給呼叫方,它同時負責維護 QueuedWork
和 QueuedCapacity
這兩個計數器。
PoolWork
結構體定義了傳送給 QueueChannel
用於處理進入佇列請求的資料。它包含了涉及到使用者 PoolWorker
物件的介面和一個接收任務已經進入佇列的確認的 channel
。
PoolWorker
的介面定義了 DoWork
函式,其中的一個引數代表了執行此任務的 Goroutines
的內部 id
。此 id
對於記錄日誌和其他針對 Goroutines
級別的事務都很有幫助。
PoolWorker
介面是工作池中用於接收和執行任務的核心。讓我們看一個簡單的客戶端實現:
1 type MyTask struct { 2Name string 3WP *workpool.WorkPool 4 } 5 6 func (mt *MyTask) DoWork(workRoutine int) { 7fmt.Println(mt.Name) 8 9fmt.Printf("*******> WR: %d QW: %d AR: %d\n", 10workRoutine, 11mt.WP.QueuedWork(), 12mt.WP.ActiveRoutines()) 13 14time.Sleep(100 * time.Millisecond) 15 } 16 17 func main() { 18runtime.GOMAXPROCS(runtime.NumCPU()) 19 20workPool := workpool.New(runtime.NumCPU() * 3, 100) 21 22task := MyTask{ 23Name: "A" + strconv.Itoa(i), 24WP: workPool, 25} 26 27err := workPool.PostWork("main", &task) 28 29… 30 }
我建立了一個 MyTask
的型別,它定義了工作執行的狀態。接著我實現一個 MyTask
的函式成員 DoWork
,它同時符合 PoolWorker
介面的簽名。由於 MyTask
實現了 PoolWorker
的介面, MyTask
型別的物件也被認為是 PoolWorker
型別的物件。現在我們把 MyTask
型別的物件傳入 PostWork
方法中。
要學習更多的 Go
語言中介面和基於物件程式設計,可以參考如下連結:
https://www.ardanlabs.com/blog/2013/07/object-oriented-programming-in-go.html
我設定 Go
執行環境使用我本機上的全部 CPU
和核心,我建立了一個 24 個 Goroutines
的工作池。我本機有 8 個核心,就像上面我們得到的結論,每個核心分配 3 個 Goroutines
是比較好的配置。最後一個引數是告訴工作池建立一個容量為 100 個任務的佇列。
接著我建立了一個 MyTask
的物件並且提交到佇列中。為了記錄日誌, PostWork
方法的第一個引數可以設定成呼叫方的名稱。如果呼叫返回的 err
引數是空,表明此任務已經得到提交;如果非空,大概率意味著已經超過了佇列的容量,你的任務未能得到提交。
我們到程式碼內部看看 WorkPool
物件是如何被建立和啟動的:
1 func New(numberOfRoutines int, queueCapacity int32) *WorkPool { 2workPool = WorkPool{ 3shutdownQueueChannel: make(chan string), 4shutdownWorkChannel:make(chan struct{}), 5queueChannel:make(chan poolWork), 6workChannel:make(chan PoolWorker, queueCapacity), 7queuedWork:0, 8activeRoutines:0, 9queueCapacity:queueCapacity, 10} 11 12for workRoutine := 0; workRoutine < numberOfRoutines; workRoutine++ { 13workPool.shutdownWaitGroup.Add(1) 14go workPool.workRoutine(workRoutine) 15} 16 17go workPool.queueRoutine() 18return &workPool 19 }
我們看到在上面的客戶端示例程式碼中 Goroutines
的數量和佇列長度被傳入 New
函式。 WorkChannel
是一個緩衝 channel
,是用於儲存需要處理的工作的佇列。 QueueChannel
是一個非緩衝 channel
,用於同步對 WorkChannel
緩衝區的訪問並維護計數器。
要學習更多關於緩衝和非緩衝 channel
的知識,請訪問此連結:
http://golang.org/doc/effective_go.html#channels
當 channel
初始化完畢後,我們就可以去建立 Goroutines
了。首先我們對每個 Goroutine
的 WaitGroup
加1來關閉它們。接著建立 Goroutines
。最後開啟 QueueRoutine
來接收工作。
要學習關閉 Goroutines
的程式碼和 WaitGroup
是如何工作的,請閱讀此連結:
http://dave.cheney.net/2013/04/30/curious-channels
關閉工作池的實現如下所示:
1 func (wp *WorkPool) Shutdown(goRoutine string) { 2wp.shutdownQueueChannel <- "Down" 3<-wp.sutdownQueueChannel 4 5close(wp.queueChannel) 6close(wp.shutdownQueueChannel) 7 8close(wp.shutdownWorkChannel) 9wp.shutdownWaitGroup.Wait() 10 11close(wp.workChannel) 12 }
Shutdown
函式首先關閉 QueueRoutine
,這樣就不會接收更多的請求。接著關閉 ShutdownWorkChannel
,並等待每個 Goroutine
去對 WaitGroup
計數器做減操作。一旦最後一個 Goroutine
呼叫了 Done
函式,等待函式 Wait
將會返回,工作池將會被關閉。
現在讓我們看看 PostWork
和 QueueRoutine
函式:
1 func (wp *WorkPool) PostWork(goRoutine string, work PoolWorker) (err error) { 2poolWork := poolWork{work, make(chan error)} 3 4defer close(poolWork.ResultChannel) 5 6wp.queueChannel <- poolWork 7return <-poolWork.ResultChannel 8 } 9 func (wp *WorkPool) queueRoutine() { 10for { 11select { 12case <-wp.shutdownQueueChannel: 13wp.shutdownQueueChannel <- "Down" 14return 15 16case queueItem := <-wp.queuechannel: 17if atomic.AddInt32(℘.queuedWork, 0) == wp.queueCapacity { 18queueItem.ResultChannel <- fmt.Errorf("Thread Pool At Capacity") 19continue 20} 21 22atomic.AddInt32(℘.queuedWork, 1) 23 24wp.workChannel <- queueItem.Work 25 26queueItem.ResultChannel <- nil 27break 28} 29} 30 }
PostWork
和 QueueRoutine
函式背後的思想是把對 WorkChannel
緩衝區的訪問序列化,保證佇列順序和維護計數器。當工作被提交到 channel
的時候, Go
執行環境保證它總是會被置於 WorkChannel
的末尾。
當 QueueChannel
收到訊號, QueueRoutine
將會接收到一項工作。程式碼先檢查佇列是否還有空位,如果有 PoolWorker
的物件將排入 WorkChannel
的緩衝區。當所有的事務都排入佇列後,呼叫方將獲得返回結果。
我們來看一下 WorkRoutine
的函式:
1 func (wp *WorkPool) workRoutine(workRoutine int) { 2for { 3select { 4case <-wp.shutdownworkchannel: 5wp.shutdownWaitGroup.Done() 6return 7 8case poolWorker := <-wp.workChannel: 9wp.safelyDoWork(workRoutine, poolWorker) 10break 11} 12} 13 } 14 func (wp *WorkPool) safelyDoWork(workRoutine int, poolWorker PoolWorker) { 15defer catchPanic(nil, "workRoutine", "workpool.WorkPool", "SafelyDoWork") 16defer atomic.AddInt32(℘.activeRoutines, -1) 17 18atomic.AddInt32(℘.queuedWork, -1) 19atomic.AddInt32(℘.activeRoutines, 1) 20 21poolWorker.DoWork(workRoutine) 22 }
Go
執行環境通過向空閒中的 Goroutine
對應的 WorkChannel
傳送訊號的方式給 Goroutine
分配工作。當 channel
接收到訊號, Go
執行環境將會把 channel
緩衝區的第一個任務傳給 Goroutine
來處理。這個 channel
的緩衝區就像是一個先入先出的佇列。
如果全部的 Goroutines
都處於忙碌狀態,那所有的剩下的工作都要等待。只要一個 routine
完成它被分配的工作,它就會返回並繼續等待 WorkChannel
的通知。如果 channel
的緩衝區有工作,那 Go
執行環境將會喚醒這個 Goroutine
。
程式碼使用了 SafelyDo
模式,因為程式碼會呼叫處於使用者模式下的程式碼,存在崩潰的可能,而你肯定不希望 Goroutine
跟著一起停止工作。注意第一個 defer
的宣告,它將會捕獲任何的崩潰,保持程式碼的持續執行。
其他部分的程式碼會安全的增加或減少計數器,通過介面呼叫使用者模式下的部分。
要學習更多捕獲崩潰的知識請閱讀如下的文章:
https://www.ardanlabs.com/blog/2013/06/understanding-defer-panic-and-recover.html
這就是程式碼的核心以及它如何實現了這樣的模式。 WorkPool
優雅的展示了 channel
的使用。我可以使用很少量的程式碼來處理工作。增加佇列的保證機制和計數器的維護都只是小菜一碟。
請從 GoingGo
的程式碼倉庫下載程式碼並且親自試試吧。
via: https://www.ardanlabs.com/blog/2013/09/pool-go-routines-to-process-task.html
作者:William Kennedy
譯者:lebai03
校對:polaris1119
本文由 GCTT 原創編譯,Go語言中文網 榮譽推出