Golang 分散式非同步任務佇列 Machinery 教程
Golang的分散式任務佇列還不算多,目前比較成熟的應該就只有ofollow,noindex" target="_blank">Machinery 了。
這篇文章裡我們簡略的看一下Machinery怎麼用。但是我們首先簡單介紹一下非同步任務這個概念。
如果你熟悉Python中的非同步任務框架的話,想必一定聽過Celery。非同步任務框架是什麼呢?非同步任務的主要作用是將需要長時間執行 的程式碼放到一個單獨的程式中,例如呼叫第三方郵件介面,但是這個介面可能非常慢才響應,而你又想確保自己的API及時響應。這個 時候就可以採用非同步任務來進行解耦。
一般來說,非同步任務都由這麼幾部分組成:
- broker:broker是用來傳遞資訊的,我們可以想象成“信使”,“外賣配送員”,它的作用是暫時儲存產生的任務以便於消費 - 生產者:它負責產生任務 - 消費者:它負責消費任務 - result backend:這個不是必需,但是如果有儲存結果的需要,那麼就需要它。
而流程則是:
生產者釋出任務 -> broker -> 消費者競爭一個任務,然後進行消費 -> (可選:消費後向broker確認已經消費,然後broker刪除此任務, 否則將超時重發任務) -> result backend儲存結果
Machinery
首先我們來把Machinery
程式碼拉下來:
$ go get -u github.com/RichardKnop/machinery/v1
Machinery 對訊息的定義是:
// Signature represents a single task invocation type Signature struct { UUIDstring Namestring RoutingKeystring ETA*time.Time GroupUUIDstring GroupTaskCount int Args[]Arg HeadersHeaders Immutablebool RetryCountint RetryTimeoutint OnSuccess[]*Signature OnError[]*Signature ChordCallback*Signature }
就如同自己寫任務佇列可能用json一樣。
一般生產者先呼叫signature := tasks.NewSignature
定義好任務,然後machineryServer.SendTask
就完成了任務的產生。
Machinery 的非同步任務長這樣:
func Add(args ...int64) (int64, error) { sum := int64(0) for _, arg := range args { sum += arg } return sum, nil }
要注意一點,函式的最後一個引數必需是 error。然後這樣註冊任務。
server.RegisterTasks(map[string]interface{}{ "add":Add, })
消費者先呼叫worker := machineryServer.NewWorker("send_sms", 10)
然後worker.Launch()
開始監聽broker並且消費任務。
當你產生一個任務,名字是add
時,這個函式就會被呼叫。
一般你可以把生產者和消費者放到兩個檔案裡,分別定義main函式,然後自己寫Makefile,這樣就可以直接make然後產生兩個可執行
檔案,不過我個人更喜歡用flag
來標識到底是什麼身份:
func main() { // parse cmd args flag.Parse() // init config initConfig() // init machinery worker initMachinery() // register tasks machineryServer.RegisterTask("sendSMS", sendSMS) if *worker { startWorker() } else { startWebServer() } }