Golang 常見的併發模式
常見的併發模式
Go語言最吸引人的地方是它內建的併發支援。Go語言併發體系的理論是C.A.R Hoare在1978年提出的CSP(Communicating Sequential Process,通訊順序程序)。CSP有著精確的數學模型,並實際應用在了Hoare參與設計的T9000通用計算機上。從NewSqueak、Alef、Limbo到現在的Go語言,對於對CSP有著20多年實戰經驗的Rob Pike來說,他更關注的是將CSP應用在通用程式語言上產生的潛力。作為Go併發程式設計核心的CSP理論的核心概念只有一個:同步通訊。關於同步通訊的話題我們在前面一節已經講過,本節我們將簡單介紹下Go語言中常見的併發模式。
首先要明確一個概念:併發不是並行。併發更關注的是程式的設計層面,併發的程式完全是可以順序執行的,只有在真正的多核CPU上才可能真正地同時執行。並行更關注的是程式的執行層面,並行一般是簡單的大量重複,例如GPU中對影象處理都會有大量的並行運算。為更好的編寫併發程式,從設計之初Go語言就注重如何在程式語言層級上設計一個簡潔安全高效的抽象模型,讓程式設計師專注於分解問題和組合方案,而且不用被執行緒管理和訊號互斥這些繁瑣的操作分散精力。
在併發程式設計中,對共享資源的正確訪問需要精確的控制,在目前的絕大多數語言中,都是通過加鎖等執行緒同步方案來解決這一困難問題,而Go語言卻另闢蹊徑,它將共享的值通過Channel傳遞(實際上多個獨立執行的執行緒很少主動共享資源)。在任意給定的時刻,最好只有一個Goroutine能夠擁有該資源。資料競爭從設計層面上就被杜絕了。為了提倡這種思考方式,Go語言將其併發程式設計哲學化為一句口號:
Do not communicate by sharing memory; instead, share memory by communicating.
不要通過共享記憶體來通訊,而應通過通訊來共享記憶體。
這是更高層次的併發程式設計哲學(通過管道來傳值是Go語言推薦的做法)。雖然像引用計數這類簡單的併發問題通過原子操作或互斥鎖就能很好地實現,但是通過Channel來控制訪問能夠讓你寫出更簡潔正確的程式。
併發版本的Hello world
我們先以在一個新的Goroutine中輸出“Hello world”,main
等待後臺執行緒輸出工作完成之後退出,這樣一個簡單的併發程式作為熱身。
併發程式設計的核心概念是同步通訊,但是同步的方式卻有多種。我們先以大家熟悉的互斥量sync.Mutex
來實現同步通訊。根據文件,我們不能直接對一個未加鎖狀態的sync.Mutex
進行解鎖,這會導致執行時異常。下面這種方式並不能保證正常工作:
func main() { var mu sync.Mutex go func(){ fmt.Println("你好, 世界") mu.Lock() }() mu.Unlock() }
因為mu.Lock()
和mu.Unlock()
並不在同一個Goroutine中,所以也就不滿足順序一致性記憶體模型。同時它們也沒有其它的同步事件可以參考,這兩個事件不可排序也就是可以併發的。因為可能是併發的事件,所以main
函式中的mu.Unlock()
很有可能先發生,而這個時刻mu
互斥物件還處於未加鎖的狀態,從而會導致執行時異常。
下面是修復後的程式碼:
func main() { var mu sync.Mutex mu.Lock() go func(){ fmt.Println("你好, 世界") mu.Unlock() }() mu.Lock() }
修復的方式是在main
函式所線上程中執行兩次mu.Lock()
,當第二次加鎖時會因為鎖已經被佔用(不是遞迴鎖)而阻塞,main
函式的阻塞狀態驅動後臺執行緒繼續向前執行。當後臺執行緒執行到mu.Unlock()
時解鎖,此時列印工作已經完成了,解鎖會導致main
函式中的第二個mu.Lock()
阻塞狀態取消,此時後臺執行緒和主執行緒再沒有其它的同步事件參考,它們退出的事件將是併發的:在main
函式退出導致程式退出時,後臺執行緒可能已經退出了,也可能沒有退出。雖然無法確定兩個執行緒退出的時間,但是列印工作是可以正確完成的。
使用sync.Mutex
互斥鎖同步是比較低階的做法。我們現在改用無快取的管道來實現同步:
func main() { done := make(chan int) go func(){ fmt.Println("你好, 世界") <-done }() done <- 1 }
根據Go語言記憶體模型規範,對於從無緩衝Channel進行的接收,發生在對該Channel進行的傳送完成之前。因此,後臺執行緒<-done
接收操作完成之後,main
執行緒的done <- 1
傳送操作才可能完成(從而退出main、退出程式),而此時列印工作已經完成了。
上面的程式碼雖然可以正確同步,但是對管道的快取大小太敏感:如果管道有快取的話,就無法保證main退出之前後臺執行緒能正常列印了。更好的做法是將管道的傳送和接收方向調換一下,這樣可以避免同步事件受管道快取大小的影響:
func main() { done := make(chan int, 1) // 帶快取的管道 go func(){ fmt.Println("你好, 世界") done <- 1 }() <-done }
對於帶緩衝的Channel,對於Channel的第K個接收完成操作發生在第K+C個傳送操作完成之前,其中C是Channel的快取大小。雖然管道是帶快取的,main
執行緒接收完成是在後臺執行緒傳送開始但還未完成的時刻,此時列印工作也是已經完成的。
基於帶快取的管道,我們可以很容易將列印執行緒擴充套件到N個。下面的例子是開啟10個後臺執行緒分別列印:
func main() { done := make(chan int, 10) // 帶 10 個快取 // 開N個後臺列印執行緒 for i := 0; i < cap(done); i++ { go func(){ fmt.Println("你好, 世界") done <- 1 }() } // 等待N個後臺執行緒完成 for i := 0; i < cap(done); i++ { <-done } }
對於這種要等待N個執行緒完成後再進行下一步的同步操作有一個簡單的做法,就是使用sync.WaitGroup
來等待一組事件:
func main() { var wg sync.WaitGroup // 開N個後臺列印執行緒 for i := 0; i < 10; i++ { wg.Add(1) go func() { fmt.Println("你好, 世界") wg.Done() }() } // 等待N個後臺執行緒完成 wg.Wait() }
其中wg.Add(1)
用於增加等待事件的個數,必須確保在後臺執行緒啟動之前執行(如果放到後臺執行緒之中執行則不能保證被正常執行到)。當後臺執行緒完成列印工作之後,呼叫wg.Done()
表示完成一個事件。main
函式的wg.Wait()
是等待全部的事件完成。
生產者消費者模型
併發程式設計中最常見的例子就是生產者消費者模式,該模式主要通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。簡單地說,就是生產者生產一些資料,然後放到成果佇列中,同時消費者從成果佇列中來取這些資料。這樣就讓生產消費變成了非同步的兩個過程。當成果佇列中沒有資料時,消費者就進入飢餓的等待中;而當成果佇列中資料已滿時,生產者則面臨因產品擠壓導致CPU被剝奪的下崗問題。
Go語言實現生產者消費者併發很簡單:
// 生產者: 生成 factor 整數倍的序列 func Producer(factor int, out chan<- int) { for i := 0; ; i++ { out <- i*factor } } // 消費者 func Consumer(in <-chan int) { for v := range in { fmt.Println(v) } } func main() { ch := make(chan int, 64) // 成果佇列 go Producer(3, ch) // 生成 3 的倍數的序列 go Producer(5, ch) // 生成 5 的倍數的序列 go Consumer(ch)// 消費 生成的佇列 // 執行一定時間後退出 time.Sleep(5 * time.Second) }
我們開啟了2個Producer
生產流水線,分別用於生成3和5的倍數的序列。然後開啟1個Consumer
消費者執行緒,列印獲取的結果。我們通過在main
函式休眠一定的時間來讓生產者和消費者工作一定時間。正如前面一節說的,這種靠休眠方式是無法保證穩定的輸出結果的。
我們可以讓main
函式儲存阻塞狀態不退出,只有當用戶輸入Ctrl-C
時才真正退出程式:
func main() { ch := make(chan int, 64) // 成果佇列 go Producer(3, ch) // 生成 3 的倍數的序列 go Producer(5, ch) // 生成 5 的倍數的序列 go Consumer(ch)// 消費 生成的佇列 // Ctrl+C 退出 sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) fmt.Printf("quit (%v)\n", <-sig) }
我們這個例子中有2個生產者,並且2個生產者之間並無同步事件可參考,它們是併發的。因此,消費者輸出的結果序列的順序是不確定的,這並沒有問題,生產者和消費者依然可以相互配合工作。
釋出訂閱模型
釋出訂閱(publish-and-subscribe)模型通常被簡寫為pub/sub模型。在這個模型中,訊息生產者成為釋出者(publisher),而訊息消費者則成為訂閱者(subscriber),生產者和消費者是M:N的關係。在傳統生產者和消費者模型中,是將訊息傳送到一個佇列中,而釋出訂閱模型則是將訊息釋出給一個主題。
為此,我們構建了一個名為pubsub
的釋出訂閱模型支援包:
// Package pubsub implements a simple multi-topic pub-sub library. package pubsub import ( "sync" "time" ) type ( subscriber chan interface{}// 訂閱者為一個管道 topicFuncfunc(v interface{}) bool // 主題為一個過濾器 ) // 釋出者物件 type Publisher struct { msync.RWMutex// 讀寫鎖 bufferint// 訂閱佇列的快取大小 timeouttime.Duration// 釋出超時時間 subscribers map[subscriber]topicFunc // 訂閱者資訊 } // 構建一個釋出者物件, 可以設定釋出超時時間和快取佇列的長度 func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { return &Publisher{ buffer:buffer, timeout:publishTimeout, subscribers: make(map[subscriber]topicFunc), } } // 新增一個新的訂閱者,訂閱全部主題 func (p *Publisher) Subscribe() chan interface{} { return p.SubscribeTopic(nil) } // 新增一個新的訂閱者,訂閱過濾器篩選後的主題 func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { ch := make(chan interface{}, p.buffer) p.m.Lock() p.subscribers[ch] = topic p.m.Unlock() return ch } // 退出訂閱 func (p *Publisher) Evict(sub chan interface{}) { p.m.Lock() defer p.m.Unlock() delete(p.subscribers, sub) close(sub) } // 釋出一個主題 func (p *Publisher) Publish(v interface{}) { p.m.RLock() defer p.m.RUnlock() var wg sync.WaitGroup for sub, topic := range p.subscribers { wg.Add(1) go p.sendTopic(sub, topic, v, &wg) } wg.Wait() } // 關閉釋出者物件,同時關閉所有的訂閱者管道。 func (p *Publisher) Close() { p.m.Lock() defer p.m.Unlock() for sub := range p.subscribers { delete(p.subscribers, sub) close(sub) } } // 傳送主題,可以容忍一定的超時 func (p *Publisher) sendTopic( sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup, ) { defer wg.Done() if topic != nil && !topic(v) { return } select { case sub <- v: case <-time.After(p.timeout): } }
下面的例子中,有兩個訂閱者分別訂閱了全部主題和含有"golang"的主題:
import "path/to/pubsub" func main() { p := pubsub.NewPublisher(100*time.Millisecond, 10) defer p.Close() all := p.Subscribe() golang := p.SubscribeTopic(func(v interface{}) bool { if s, ok := v.(string); ok { return strings.Contains(s, "golang") } return false }) p.Publish("hello,world!") p.Publish("hello, golang!") go func() { formsg := range all { fmt.Println("all:", msg) } } () go func() { formsg := range golang { fmt.Println("golang:", msg) } } () // 執行一定時間後退出 time.Sleep(3 * time.Second) }
在釋出訂閱模型中,每條訊息都會傳送給多個訂閱者。釋出者通常不會知道、也不關心哪一個訂閱者正在接收主題訊息。訂閱者和釋出者可以在執行時動態新增,是一種鬆散的耦合關係,這使得系統的複雜性可以隨時間的推移而增長。在現實生活中,像天氣預報之類的應用就可以應用這個併發模式。
控制併發數
很多使用者在適應了Go語言強大的併發特性之後,都傾向於編寫最大併發的程式,因為這樣似乎可以提供最大的效能。在現實中我們行色匆匆,但有時卻需要我們放慢腳步享受生活,併發的程式也是一樣:有時候我們需要適當地控制併發的程度,因為這樣不僅僅可給其它的應用/任務讓出/預留一定的CPU資源,也可以適當降低功耗緩解電池的壓力。
在Go語言自帶的godoc程式實現中有一個vfs
的包對應虛擬的檔案系統,在vfs
包下面有一個gatefs
的子包,gatefs
子包的目的就是為了控制訪問該虛擬檔案系統的最大併發數。gatefs
包的應用很簡單:
import ( "golang.org/x/tools/godoc/vfs" "golang.org/x/tools/godoc/vfs/gatefs" ) func main() { fs := gatefs.New(vfs.OS("/path"), make(chan bool, 8)) // ... }
其中vfs.OS("/path")
基於本地檔案系統構造一個虛擬的檔案系統,然後gatefs.New
基於現有的虛擬檔案系統構造一個併發受控的虛擬檔案系統。併發數控制的原理在前面一節已經講過,就是通過帶快取管道的傳送和接收規則來實現最大併發阻塞:
var limit = make(chan int, 3) func main() { for _, w := range work { go func() { limit <- 1 w() <-limit }() } select{} }
不過gatefs
對此做一個抽象型別gate
,增加了enter
和leave
方法分別對應併發程式碼的進入和離開。當超出併發數目限制的時候,enter
方法會阻塞直到併發數降下來為止。
type gate chan bool func (g gate) enter() { g <- true } func (g gate) leave() { <-g }
gatefs
包裝的新的虛擬檔案系統就是將需要控制併發的方法增加了enter
和leave
呼叫而已:
type gatefs struct { fs vfs.FileSystem gate } func (fs gatefs) Lstat(p string) (os.FileInfo, error) { fs.enter() defer fs.leave() return fs.fs.Lstat(p) }
我們不僅可以控制最大的併發數目,而且可以通過帶快取Channel的使用量和最大容量比例來判斷程式執行的併發率。當管道為空的時候可以認為是空閒狀態,當管道滿了時任務是繁忙狀態,這對於後臺一些低階任務的執行是有參考價值的。
贏者為王
採用併發程式設計的動機有很多:併發程式設計可以簡化問題,比如一類問題對應一個處理執行緒會更簡單;併發程式設計還可以提升效能,在一個多核CPU上開2個執行緒一般會比開1個執行緒快一些。其實對於提升效能而言,程式並不是簡單地執行速度快就表示使用者體驗好的;很多時候程式能快速響應使用者請求才是最重要的,當沒有使用者請求需要處理的時候才合適處理一些低優先順序的後臺任務。
假設我們想快速地搜尋“golang”相關的主題,我們可能會同時開啟Bing、Google或百度等多個檢索引擎。當某個搜尋最先返回結果後,就可以關閉其它搜尋頁面了。因為受網路環境和搜尋引擎演算法的影響,某些搜尋引擎可能很快返回搜尋結果,某些搜尋引擎也可能等到他們公司倒閉也沒有完成搜尋。我們可以採用類似的策略來編寫這個程式:
func main() { ch := make(chan string, 32) go func() { ch <- searchByBing("golang") }() go func() { ch <- searchByGoogle("golang") }() go func() { ch <- searchByBaidu("golang") }() fmt.Println(<-ch) }
首先,我們建立了一個帶快取的管道,管道的快取數目要足夠大,保證不會因為快取的容量引起不必要的阻塞。然後我們開啟了多個後臺執行緒,分別向不同的搜尋引擎提交搜尋請求。當任意一個搜尋引擎最先有結果之後,都會馬上將結果發到管道中(因為管道帶了足夠的快取,這個過程不會阻塞)。但是最終我們只從管道取第一個結果,也就是最先返回的結果。
通過適當開啟一些冗餘的執行緒,嘗試用不同途徑去解決同樣的問題,最終以贏者為王的方式提升了程式的相應效能。
素數篩
在“Hello world 的革命”一節中,我們為了演示Newsqueak的併發特性,文中給出了併發版本素數篩的實現。併發版本的素數篩是一個經典的併發例子,通過它我們可以更深刻地理解Go語言的併發特性。“素數篩”的原理如圖:
[圖片上傳失敗...(image-b304d-1554179460547)]
圖 1-13 素數篩
我們需要先生成最初的2, 3, 4, ...
自然數序列(不包含開頭的0、1):
// 返回生成自然數序列的管道: 2, 3, 4, ... func GenerateNatural() chan int { ch := make(chan int) go func() { for i := 2; ; i++ { ch <- i } }() return ch }
GenerateNatural
函式內部啟動一個Goroutine生產序列,返回對應的管道。
然後是為每個素數構造一個篩子:將輸入序列中是素數倍數的數提出,並返回新的序列,是一個新的管道。
// 管道過濾器: 刪除能被素數整除的數 func PrimeFilter(in <-chan int, prime int) chan int { out := make(chan int) go func() { for { if i := <-in; i%prime != 0 { out <- i } } }() return out }
PrimeFilter
函式也是內部啟動一個Goroutine生產序列,返回過濾後序列對應的管道。
現在我們可以在main
函式中驅動這個併發的素數篩了:
func main() { ch := GenerateNatural() // 自然數序列: 2, 3, 4, ... for i := 0; i < 100; i++ { prime := <-ch // 新出現的素數 fmt.Printf("%v: %v\n", i+1, prime) ch = PrimeFilter(ch, prime) // 基於新素數構造的過濾器 } }
我們先是呼叫GenerateNatural()
生成最原始的從2開始的自然數序列。然後開始一個100次迭代的迴圈,希望生成100個素數。在每次迴圈迭代開始的時候,管道中的第一個數必定是素數,我們先讀取並列印這個素數。然後基於管道中剩餘的數列,並以當前取出的素數為篩子過濾後面的素數。不同的素數篩子對應的管道是串聯在一起的。
素數篩展示了一種優雅的併發程式結構。但是因為每個併發體處理的任務粒度太細微,程式整體的效能並不理想。對於細粒度的併發程式,CSP模型中固有的訊息傳遞的代價太高了(多執行緒併發模型同樣要面臨執行緒啟動的代價)。
併發的安全退出
有時候我們需要通知goroutine停止它正在乾的事情,特別是當它工作在錯誤的方向上的時候。Go語言並沒有提供在一個直接終止Goroutine的方法,由於這樣會導致goroutine之間的共享變數處在未定義的狀態上。但是如果我們想要退出兩個或者任意多個Goroutine怎麼辦呢?
Go語言中不同Goroutine之間主要依靠管道進行通訊和同步。要同時處理多個管道的傳送或接收操作,我們需要使用select
關鍵字(這個關鍵字和網路程式設計中的select
函式的行為類似)。當select
有多個分支時,會隨機選擇一個可用的管道分支,如果沒有可用的管道分支則選擇default
分支,否則會一直儲存阻塞狀態。
基於select
實現的管道的超時判斷:
select { case v := <-in: fmt.Println(v) case <-time.After(time.Second): return // 超時 }
通過select
的default
分支實現非阻塞的管道傳送或接收操作:
select { case v := <-in: fmt.Println(v) default: // 沒有資料 }
通過select
來阻止main
函式退出:
func main() { // do some thins select{} }
當有多個管道均可操作時,select
會隨機選擇一個管道。基於該特性我們可以用select
實現一個生成隨機數序列的程式:
func main() { ch := make(chan int) go func() { for { select { case ch <- 0: case ch <- 1: } } }() for v := range ch { fmt.Println(v) } }
我們通過select
和default
分支可以很容易實現一個Goroutine的退出控制:
func worker(cannel chan bool) { for { select { default: fmt.Println("hello") // 正常工作 case <-cannel: // 退出 } } } func main() { cannel := make(chan bool) go worker(cannel) time.Sleep(time.Second) cannel <- true }
但是管道的傳送操作和接收操作是一一對應的,如果要停止多個Goroutine那麼可能需要建立同樣數量的管道,這個代價太大了。其實我們可以通過close
關閉一個管道來實現廣播的效果,所有從關閉管道接收的操作均會收到一個零值和一個可選的失敗標誌。
func worker(cannel chan bool) { for { select { default: fmt.Println("hello") // 正常工作 case <-cannel: // 退出 } } } func main() { cancel := make(chan bool) for i := 0; i < 10; i++ { go worker(cancel) } time.Sleep(time.Second) close(cancel) }
我們通過close
來關閉cancel
管道向多個Goroutine廣播退出的指令。不過這個程式依然不夠穩健:當每個Goroutine收到退出指令退出時一般會進行一定的清理工作,但是退出的清理工作並不能保證被完成,因為main
執行緒並沒有等待各個工作Goroutine退出工作完成的機制。我們可以結合sync.WaitGroup
來改進:
func worker(wg *sync.WaitGroup, cannel chan bool) { defer wg.Done() for { select { default: fmt.Println("hello") case <-cannel: return } } } func main() { cancel := make(chan bool) var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go worker(&wg, cancel) } time.Sleep(time.Second) close(cancel) wg.Wait() }
現在每個工作者併發體的建立、執行、暫停和退出都是在main
函式的安全控制之下了。
context包
在Go1.7釋出時,標準庫增加了一個context
包,用來簡化對於處理單個請求的多個Goroutine之間與請求域的資料、超時和退出等操作,官方有博文對此做了專門介紹。我們可以用context
包來重新實現前面的執行緒安全退出或超時的控制:
func worker(ctx context.Context, wg *sync.WaitGroup) error { defer wg.Done() for { select { default: fmt.Println("hello") case <-ctx.Done(): return ctx.Err() } } } func main() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go worker(ctx, &wg) } time.Sleep(time.Second) cancel() wg.Wait() }
當併發體超時或main
主動停止工作者Goroutine時,每個工作者都可以安全退出。
Go語言是帶記憶體自動回收特性的,因此記憶體一般不會洩漏。在前面素數篩的例子中,GenerateNatural
和PrimeFilter
函式內部都啟動了新的Goroutine,當main
函式不再使用管道時後臺Goroutine有洩漏的風險。我們可以通過context
包來避免這個問題,下面是改進的素數篩實現:
// 返回生成自然數序列的管道: 2, 3, 4, ... func GenerateNatural(ctx context.Context) chan int { ch := make(chan int) go func() { for i := 2; ; i++ { select { case <- ctx.Done(): return case ch <- i: } } }() return ch } // 管道過濾器: 刪除能被素數整除的數 func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int { out := make(chan int) go func() { for { if i := <-in; i%prime != 0 { select { case <- ctx.Done(): return case out <- i: } } } }() return out } func main() { // 通過 Context 控制後臺Goroutine狀態 ctx, cancel := context.WithCancel(context.Background()) ch := GenerateNatural(ctx) // 自然數序列: 2, 3, 4, ... for i := 0; i < 100; i++ { prime := <-ch // 新出現的素數 fmt.Printf("%v: %v\n", i+1, prime) ch = PrimeFilter(ctx, ch, prime) // 基於新素數構造的過濾器 } cancel() }
當main函式完成工作前,通過呼叫cancel()
來通知後臺Goroutine退出,這樣就避免了Goroutine的洩漏。
併發是一個非常大的主題,我們這裡只是展示幾個非常基礎的併發程式設計的例子。官方文件也有很多關於併發程式設計的討論,國內也有專門討論Go語言併發程式設計的書籍。讀者可以根據自己的需求查閱相關的文獻。