golang原始碼閱讀---tunny協程池的基本實現原理
前段時間因為一個爬蟲專案,最開始做的時候是無腦的一個下載任務就開一個協程,但是後期出現了比較大的記憶體問題,並且下載的效果也不是那麼的好,後面發現是因為協程開的太多了,並且下行的頻寬就只有那麼的大,所以並不能和想象中的那樣。哎,還是因為too young,too simple,sometimes naive.
這篇主要是講解的tunny是如何是如何實現並保持一個goroutine pool的。
因為本人是小菜,加上時間倉促,所以要是有什麼問題的話希望大佬指正。
1.簡介
tunny地址:ofollow,noindex" target="_blank">https://github.com/Jeffail/tunny
這是一個goroutine pool包,可以設定或者動態改變goroutine pool中goroutine的數量,生成一個固定的數量的pool,實現goroutine的重複使用,並且能在一定程度上控制goroutine
2.原始碼
1.基本的資料型別
通過tunny的原始碼包檔案數量並不多,只有3個檔案,tonny.go和worker.go,沒有那麼多的檔案層次結構,所以閱讀起來特別的方便。這也是我比較喜歡閱讀go語言程式碼的原因。
tunny.go中
Pool結構
主要是用於對整個pool的管理,其中包括pool
type Pool struct { ctorfunc() Worker //goroutine中使用者的業務邏輯函式 workers []*workerWrapper //目前已經存在的goroutine資訊,workerWrapper結構定義在worker.go的中, reqChan chan workRequest //任務排程管道,主要是使用者管理當前goroutine是否執行任務,它和workerWrapper中的reqChan 其實是一個,但是workerWrapper的reqChan只是一個傳送管道,這個後面會繼續講解 workerMutsync.Mutex //鎖 queuedJobs int64 計數,表示當前已經在執行的任務 }
worker介面主要使用者包裝使用者的業務邏輯的func
type Worker interface { // Process will synchronously perform a job and return the result. // Process(interface{}) interface{} // BlockUntilReady is called before each job is processed and must block the // calling goroutine until the Worker is ready to process the next job. BlockUntilReady() // Interrupt is called when a job is cancelled. The worker is responsible // for unblocking the Process implementation. Interrupt() // Terminate is called when a Worker is removed from the processing pool // and is responsible for cleaning up any held resources. Terminate() }
closureWorker 顧明思議,主要是用於包裝使用者的業務邏輯,
並且是Worker的完全接收者
type closureWorker struct { processor func(interface{}) interface{} }
在worker.go中
type workerWrapper struct { workerWorker//使用者存放使用者定義的業務邏輯函式 interruptChan chan struct{} //用於外部干預,使當前goroutine提前終止 // reqChan is NOT owned by this type, it is used to send requests for work. reqChan chan<- workRequest //這個和pool.go中Pool型別中的reqChan是一個,只不過當前這個是一個傳送管道 // closeChan can be closed in order to cleanly shutdown this worker. closeChan chan struct{}//這個是用於傳遞關閉當前goroutine的訊息 // closedChan is closed by the run() goroutine when it exits. closedChan chan struct{} //這個我感覺並沒有太大的實際意義 }
這個主要是用於傳遞任務引數。以及返回任務執行結果的型別
type workRequest struct { // jobChan is used to send the payload to this worker. jobChan chan<- interface{} // retChan is used to read the result from this worker. retChan <-chan interface{} // interruptFunc can be called to cancel a running job. When called it is no // longer necessary to read from retChan. interruptFunc func() }
2.如何建立一個goroutine pool
根據程式碼的呼叫步驟,
首先是例項化一個Pool型別的資料,並將使用者使用者的業務func包裝成closureWorker型別並存儲在Pool型別例項中的ctor欄位中
使用外部呼叫建立一個Pool物件:
包中建立一個Pool的邏輯
邏輯很簡單,一眼就能看明白。
那麼在哪裡啟動一個goroutine,請看下面
注意這裡的引數傳遞,這裡傳遞了一個channel型別的引數,眾所周知,在go中,分為兩種型別,一種是值型別,一種是引用型別(map,slice,channel),說這麼多有什麼用呢,怎麼扯到引用型別上面去了呢,但這個很重要
我們接下我們看在newWorkerWrapper中的邏輯
上面說到,我們傳遞過去了兩個引數,其中一個是一個channel型別的,因為channel引用型別,所以他的傳遞是地址,所以在最後newWorkerWrapper中賦值的時候workerWrapper.reqChan和pool.reqChan實際指向的是同一個地址 ,區別就是workerWrapper.reqChan是一個傳送管道罷了
我們可以輸出看看
下面是run函式中的程式碼
run函式中的程式碼算是是整個包中最重要的程式碼了。
他的實現原理是比較簡單的,就是採用的是一個for+select+channel來實現的,並且select採用是巢狀的形式,但是其中還是有些比較難以理解的 (當然對我小白我來說哈,2333333)
我感覺主要是這兩段
這兩段的程式碼,需要結合到下一個小姐來說,請看下一個。
2.呼叫goroutine pool
這裡呼叫很簡單,只需要ret := pool.Process(引數)就ok了
我們來看看Process中是怎麼樣的
Process中邏輯很簡單,上一個小姐我們知道,pool的reqChan和 pool.workers.reqChan是指向的同一個地址,但是後者為一個傳送管道 所以,在這樣來使用時安全的,資料是不會錯誤的 。
在前面我的run函式中,有兩段程式碼還沒說明意思,現在我就說明一下, 第一個就是這段,
(1)在我們定義reqChan管道的時候,我們定義的是一個沒有緩衝區的管道 ,所以在沒有接受操作的情況下,我們向管道里面推送資料是會被阻塞 住的。
(2)在go中select是在有IO操作的情況下會被觸發,所以要是我們沒有在Process函式中呼叫reqChan接收資料, 當前goroutine是會被阻塞住的 這樣當前select內層的select也會被阻塞住。
然後我們在來看通過reqChan傳遞過來的值
上面講到,channel是引用型別,所以它在傳遞的時候是傳遞的地址,而不是值,所以,我們接收到的jobChan和retChan和傳遞過來指向的是同樣的地址 ,這樣我們就能實現共享通訊了。我們可以輸出裡面兩邊的地址看看,這裡我開了一個容量為2的pool,然後我呼叫pool裡面的其中一個goroutine,我們看列印的地址
看。。。。沒錯吧。。。。。
3.Extra
有一個問題,就是當我們的pool有2個goroutinr的時候,但是我們有200個任務需要完成,也就是需要呼叫200測goroutine,Tunny是怎麼樣實現排程的呢,這個後面的文章補充吧,下班。。。。。。。。
看,就是這樣的嘛。