Nsq:如何從一條佇列開始,設計出一個靠譜的訊息中介軟體
假設我們在淘寶下了一筆訂單後,淘寶後臺需要做這些事情:
- 訊息通知系統:通知商家,你有一筆新的訂單,請及時發貨
- 推薦系統:更新使用者畫像,重新給使用者推薦他可能感興趣的商品
- 會員系統:更新使用者的積分和等級資訊
於是一個建立訂單的函式,至少需要取呼叫三個其他服務的介面,就像這樣: img
寫成偽碼:
createOrder(...) { doCreateOrder(...); // 呼叫其他服務介面 sendMsg(...); updateUserInterestedGoods(...); updateMemberCreditInfo(...); }
這樣的做法,顯然很挫,至少有兩個問題:
- 過度耦合:如果後面建立訂單時,需要觸發新的動作,那就得去改程式碼,在原有的建立訂單函式末尾,追加一行程式碼
- 缺少緩衝:如果建立訂單時,會員系統恰好處於非常忙碌或者宕機的狀態,那這是更新會員資訊就會失敗
因此,我們急需引入一個訊息中介軟體,來實現解耦和緩衝的功能。
img
訊息中介軟體的實現很多,比較常見的有kafka、rocketmq以及我們今天要講的nsq。
相比於前面兩個mq,nsq可以說是非常輕量級的,理解了它,也有助於學習kafka和rocketmq。
首先,讓我們從訊息佇列最原始的形態開始。
Nsq 1.0 —— 我是一條佇列
我們給訂單系統和其他系統的中間,引入了一個訊息中介軟體,或者說,引入了一條佇列。
當訂單系統建立完訂單時,它只需要往佇列裡,塞入(push)一條topic為“order_created”的訊息。
接著,我們的nsq1.0,會把這條訊息,再推送給所有訂閱了這個topic的訊息的機器,告訴他們,“有新的訂單,你們該幹嘛幹嘛”。
這樣一個簡單的佇列,就解決了上面的兩個問題:
- 解耦:如果後面有新的動作,需要在建立訂單後執行,那麼只需要讓新同學自己去訂閱topic為“order_created”的訊息即可
- 緩衝:如果會員系統現在很忙,沒空處理訊息,那麼只需跟nsq說,“我很忙,不要再發訊息過來了”,那麼nsq就不會給它推送訊息,或者會員系統出了故障,訊息雖然推送過去了,但是它給處理失敗了,那麼也只需給nsq回覆一個“requeue”的命令,nsq就會把訊息重新放入佇列,進行重試。具體實現細節,後面再聊。
Nsq 2.0 —— channel
作為一個靠譜的中介軟體,你必須做到:高效、可靠、方便。
上面這個使用一條簡單的佇列來實現的訊息中介軟體,肯定是不滿足這三點的。
首先,假設我的會員系統,部署了三臺例項,他們都訂閱了topic為“order_created”的訊息,那麼一旦有訂單建立,這三臺例項就都會收到訊息,並且去更新會員積分資訊,而其實我只需要更新一次就ok了。
這就涉及到一個消費者組(Comsumer Group)的概念。消費者組是Kafka裡提到的,在Nsq,對應的術語是channel。
會員系統的三個例項,你們收到訊息時,要做的事情是一樣的,並且只需要有有一個例項執行,那麼你們就是一個消費者組裡面的,要標識為同一個channel,比如說叫“update_memeber_credit”的channel,而簡訊系統和推薦系統,也要有自己的channel,用來和會員系統作區分,比如說叫“send_msg”和“update_user_interesting_goods”
當nsq收到訊息時,會給每個channel複製一份訊息,然後channel再給對應的消費者組,推送一條訊息。消費者組裡有多個例項,那麼要推給誰呢?這就涉及到負載均衡,比如有一個消費者組裡有ABC三個例項,這次推給了A,那麼下次有可能是推送給B,再下次,也許就是C …
img
Nsq 3.0 —— nsqlookup
上面講過,nsq收到生產者生產的訊息後,需要將訊息複製多份,然後推送給對應topic和channel的消費者。
那麼,nsq怎麼知道哪些消費者訂閱了topic為“order_created”的訊息呢?
總不能在配置檔案裡寫死吧?ip為10.12.65.123的,埠8878,這個消費者的topic是xxx,channel是xxx,…
因此,我們需要一個類似於微服務裡頭的註冊中心的模組,來實現服務發現的功能,這就是nsqlookup.
nsqlookup提供了類似於etcd、zookeeper一樣的kv儲存服務,裡面記錄了topic下面都有哪些nsq。
nsqlookup提供了一個/lookup
介面,比如你想知道哪些nsq上面,有topic為test的訊息,那麼只需要調一下:
curl 'http://127.0.0.1:4161/lookup?topic=test'
nsqlookup就會給你返回對應topic的nsq列表:
{ "channels": [ "xxx" ], "producers": [ { "remote_address": "127.0.0.1:52796", "hostname": "hongzeyangdeMacBook-Pro", "broadcast_address": "127.0.0.1", "tcp_port": 4150, "http_port": 4151, "version": "1.0.0-compat" } ] }
接著消費者只需要遍歷返回的json串裡的producers列表,把broadcast_address和tcp_port或者http_port拼起來,就可以得到有對應topic訊息的nsq列表。
然後消費者會和這些nsq,逐個建立連線,nsq收到對應topic的訊息後,就會給和他們建立連線的消費者,推送訊息。
這個過程,可以從nsq的消費者客戶端實現的程式碼中,很清楚的看出來。
我這裡用nsq的Java 客戶端實現ofollow,noindex" target="_blank">brainlag/JavaNSQClient 作為例子:
和擁有對應topic的nsq建立連線:
呼叫lookup介面,獲取擁有對應topic的nsq列表。注意看程式碼,裡面是遍歷了nsqlookup的列表,然後把所有lookup的返回結構,進行合併。
com.github.brainlag.nsq.lookup.DefaultNSQLookup#lookup:
img
畫紅框的地方,正是之前講的拼湊邏輯。
接著和舊的nsq列表比較,進行刪除和新增,保證本地的nsq列表資料是最新的。
com.github.brainlag.nsq.NSQConsumer#connect:
img
當然,這個過程不會只在消費者啟動時才執行,而是定期去執行,不斷去獲取最新的nsq列表。
img
Nsq 4.0 —— nsqd叢集
作為一個靠譜的中介軟體,你必須支援叢集部署,這樣才能實現可靠、高效。
nsq的叢集部署非常簡單,官方推薦一個生產者對應的部署一個nsqd:
What is the recommended topology for nsqd? We strongly recommend running an nsqd alongside any service(s) that produce messages.
img
這也能解釋,為什麼上面的/lookup
介面,返回的屬性是叫producers
,而不是叫nsqs
,因為nsq認為一個producer,就對應一個nsq。
當然這樣的做法有不少壞處,如果生產者對應的nsq掛掉了,那它就生產不了訊息了。而且每個生產者都要部署一個nsq,未免有些奢侈。
不過對於大多數業務來說,這樣的nsq已經夠用。如果你像有贊一樣,擁有一群Go語言大神,那也不放對nsq做一下改造。
Get it all together
串起來
總結
參考
-
Redis-Dsitributed-Lock-2/" rel="nofollow,noindex" target="_blank">
Previous
如何用Redis實現分散式鎖(2)—— 叢集版