Golang高併發工作池
go程式開發過程中,通過簡單的呼叫go func 函式來開啟協程,容易導致程式死鎖,並且會無限制的開啟groutine,groutine數量激增的情況下併發效能會明顯下降,所以需要考慮使用工作池來控制協程數量,以達到高併發的效果.
直接上程式碼(JobPool.go)
package utils import ( "fmt" ) // 定義任務介面,所有實現該介面的均實現工作池 type Task interface { DoTask() error } // 定義工作結構體 type Job struct { Task Task } // 定義全部的工作佇列 var JobQueue chan Job // 定義工作者 type Worker struct { WorkerPool chan chan Job// 工人物件池 JobChannelchan Job// 管道里面拿Job quit chan bool } // 新建一個工作者 func NewWorker(workerPool chan chan Job) Worker { return Worker{ WorkerPool: workerPool, // 工人物件池 JobChannel: make(chan Job), //工人的任務 quit:make(chan bool), } } // 工作池啟動主函式 func(w *Worker)Start(){ // 開一個新的協程 go func() { for{ // 註冊任務到工作池 w.WorkerPool <- w.JobChannel select { // 接收到任務 case job := <- w.JobChannel: // 執行任務 err := job.Task.DoTask() if err != nil { fmt.Println("任務執行失敗") } // 接收退出的任務, 停止任務 case <- w.quit: return } } }() } // 退出執行工作 func (w *Worker) Stop(){ go func(){ w.quit <- true }() } // 定義任務傳送者 type Sender struct { maxWorkers int// 最大工人數 WorkerPool chan chan Job// 註冊工作通道 quit chan bool// 退出訊號 } // 註冊新發送者 func NewSender(maxWorkers int) *Sender{ Pool := make(chan chan Job, maxWorkers) return &Sender{ WorkerPool: Pool,// 將工作者放到一個工作池中 maxWorkers: maxWorkers, // 最大工作者數量 quit: make(chan bool), } } // 工作分發器 func(s *Sender)Run(){ for i:=0; i<s.maxWorkers; i++{ worker := NewWorker(s.WorkerPool) // 執行任務 worker.Start() } // 監控任務傳送 go s.Send() } // 退出發放工作 func (s *Sender) Quit(){ go func(){ s.quit <- true }() } func(s *Sender)Send(){ for { select { // 接收到任務 case job :=<- JobQueue: go func(job Job) { jobChan := <- s.WorkerPool jobChan <- job }(job) // 退出任務分發 case <- s.quit: return } } } // 初始化物件池 func InitPool(){ maxWorkers := 4 maxQueue := 20 // 初始化一個任務傳送者,指定工作者數量 send := NewSender(maxWorkers) // 指定任務的佇列長度 JobQueue = make(chan Job,maxQueue) // 一直執行任務傳送 send.Run() }
使用方法
package main import ( "fmt" "os" "test/utils"//引用: JobPool是放在test專案的utils包下 "time" ) type Test struct { num int } //任務,實現JobPool的Task介面 func(t *Test)DoTask() error{ f, err := os.OpenFile("log.txt", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0777) if err != nil { return err } defer f.Close() f.WriteString(fmt.Sprintf("這是任務:%d號,執行時間為:%s \n", t.num, fmt.Sprintf("%s", time.Now()))) return nil } func main(){ // 初始化物件池 utils.InitPool() for i:=1;i<40 ;i++{ // 註冊任務到Job佇列中 task := &Test{i} utils.JobQueue <- utils.Job{ Task:task, } } // time.Sleep(180 * time.Second) // 執行結束,關閉管道 close(utils.JobQueue) }
參考文章:http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/