高效能訊息中介軟體——NATS
前 言
這段時間我的主要工作內容是將公司系統中使用的RabbitMQ替換成NATS,而此之前我對Nats一無所知。經過一段時間緊張的學習和開發之後我順利的完成了任務,並對訊息中介軟體有了更深的瞭解。在此感謝同事鍾亮在此過程中對我的幫助。NATS屬於比較小眾的一款中介軟體產品,中文資料基本上是沒有的,故寫以記之,為想學習Nats的同學提供一點幫助。
原創作者:萬里
在介紹NATS之前先了解下什麼是分散式系統和訊息中介軟體
對於分散式系統的定義,一直以來我都沒有找到或者想到特別簡練而又合適的定義,這裡引用一下Distributed System Concepts and Design (Thrid Edition)中的一句話A distributed system is one in which components located at networked computers communicate and coordinate their actions only by passing messages,從這句話我們可以看到幾個重點,一是元件分佈在網路計算機上,二是元件之間僅僅通過訊息傳遞來通訊並協調行動。訊息中介軟體維基百科給出的定義為Message-oriented middleware(MOM) is software infrastructure focused on sending and receiving messages between distrubuted systems,意思就是面向訊息的系統(訊息中介軟體)是在分散式系統中完成訊息的傳送和接收的基礎軟體
訊息中介軟體常被提及的好處即非同步和解耦,市面上常常被使用到的中介軟體有RabbitMQ, ActiveMQ, Kafka等,他們的關注度和使用率都非常的高,並且使用起來也非常的方便。公司的WiseCloud產品就集成了RabbitMQ。而在下一個版本的更新中將會使用NATS來替換RabbitMQ。使用NATS的好處比較多首先就是其效能非常好,下面引用官網的效能對比圖:
NATS介紹
NATS是一個開源、輕量級、高效能的分散式訊息中介軟體,實現了高可伸縮性和優雅的Publish/Subscribe模型,使用Golang語言開發。NATS的開發哲學認為高質量的QoS應該在客戶端構建,故只建立了Request-Reply,不提供 1.持久化 2.事務處理 3.增強的交付模式 4.企業級佇列。
NATS訊息傳遞模型
NATS支援各種訊息傳遞模型,包括:
釋出訂閱(Publish Subscribe)
請求回覆(Request Reply)
佇列訂閱(Queue Subscribers )
提供的功能:
純粹的釋出訂閱模型(Pure pub-sub)
伺服器叢集(Cluster mode server)
自動精簡訂閱者(Auto-pruning of subscribers)
基於文字協議(Text-based protocol)
多服務質量保證(Multiple qualities of service - QoS)
釋出訂閱(Publish Subscribe)
NATS將publish/subscribe訊息分發模型實現為一對多通訊,釋出者在Subject上傳送訊息,並且監聽該Subject在任何活動的訂閱者都會收到該訊息
java:
//publish Connection nc = Nats.connect("nats://127.0.0.1:4222"); nc.publish("subject", "hello world".getBytes(StandardCharsets.UTF_8));
//subscribe Subscription sub = nc.subscribe("subject"); Message msg = sub.nextMessage(Duration.ofMillis(500)); String response = new String(msg.getData(), StandardCharsets. UTF_8);
或者是基於回撥的subscribe
//subscribe Dispatcher d = nc.createDispatcher(msg - >{ String response = new String(msg.getData(), StandardCharsets.UTF_8) //do something }) d.subscribe("subject");
請求響應(Request Reply)
NATS支援兩種請求響應訊息:點對點或多對多。點對點涉及最快或首先響應。在一對多的訊息交換中,需要限制請求響應的限制
在Request Reply過程中,釋出請求釋出帶有響應主題的訊息,期望對該subject做出響應操作
java:
// publish Connection connection = Nats.connect("nats://127.0.0.1:4222"); String reply = "replyMsg"; //請求迴應方法回撥 Dispatcher d = connection.createDispatcher(msg -> System.out.println("reply: " + JSON.toJSONString(msg)); }) d.unsubscribe(repl , 1); //訂閱請求 d.subscribe(reply); //釋出請求 connection.publish("requestSub", reply, "request".getBytes(StandardCharsets. UTF_8));
//subscribe Connection nc = Nats.connect("nats://127.0.0.1:4222"); //註冊訂閱 Dispatcher dispatcher = nc.createDispatcher(msg -> { System.out.println(JSON.toJSONString(msg)); nc.publish(msg.getReplyTo(), "this is reply".getBytes(StandardCharsets.UTF_8)); }); dispatcher.subscribe("requestSub");
佇列訂閱&分享工作(Queue Subscribers & Sharing Work)
NATS提供稱為佇列訂閱的負載均衡功能,雖然名字為queue(佇列)但是並不是我們所認為的那樣。他的主要功能是將具有相同queue名字的subject進行負載均衡。使用佇列訂閱功能訊息釋出者不需要做任何改動,訊息接受者需要具有相同的對列名
// Subscribe Connection nc = Nats.connect(); Dispatcher d = nc.createDispatcher(msg -> { //do something System.out.println("msg: " + new String(msg.getData(),StandardCharsets.UTF_8)); }); d.subscribe("queSub", "queName");
Nats-Spring整合
NATS雖說是一個性能非常好的訊息中間鍵,但是和Spring的整合不是很好。這裡提供兩個整合的思路
- CloudFoundry-Community/java-nats
- Wanlinus/nats-spring
java-nats
這是一個由CloudFoundry主導的一個NATS java客戶端。提供了區別於官方的nats客戶端,支援註解配置,對Spring有比較好的支援,但是此專案已經有1年多沒有更新且不支援NATS Streaming。相應用法參考Github,這裡不做詳細講解.
nats-spring
由於開源社群只提供一個簡單的NATS Client,缺少對註解和Spring的支援,所以我基於官方jnats客戶端寫了一個SpringBoot的相容外掛.主要是為了相容spring boot amqp開發模式,儘量使用註解解決問題開發出來的,所以使用方法類似於在程式碼中使用@RabbitListener.具體使用方法如下
{{git clone ofollow,noindex" target="_blank">https://github.com/wanlinus/nats-spring.git
cd nats-spring
mvn clean install}}}
<dependency> <groupId>cn.wanlinus</groupId> <artifactId>nats-spring</artifactId> <version>1.0.0.RELEASE</version> </dependency>
application.yml
spring: nats: urls: - nats://127.0.0.1:4222
@EnableNats @SpringBootApplication public class NatsDemo2Application { public static void main(String[] args) { SpringApplication.run(NatsDemo2Application.class, args); } } @Component public class Foo{ @NatsSubscribe("haha") public void message(Message message) { System.out.println(message.getSubject() + " : " + new String(message.getData())); }
}
NATS Streaming介紹
NATS由於不能保證訊息的投遞正確性和存在其他的缺點,NATS Streaming就孕育而生.他是一個由NATS提供支援的資料流系統,採用Go語言編寫,NATS Streaming與核心NATS平臺無縫嵌入,擴充套件和互操作.除了核心NATS平臺的功能外,他還提供了以下功能:
NATS Streaming特徵
增強訊息協議(Enhanced message protocol)
訊息/事件持久化(Message/event persistence)
至少一次資料傳輸(At-least-once-delivery)
Publisher限速(Publisher rate limiting)
Subscriber速率匹配(Rate matching/limiting per subscriber)
按主題重發訊息(Historical message replay by subject)
持續訂閱(Durable subscriptions)
基本用法
在使用NATS Streaming之前首先要啟動伺服器,在這裡我選擇使用docker容器
# 4222 client預設連線埠
8222 Web埠
6222 叢集通訊埠
$ docker run -d -p 4222:4222 -p 8222:8222 -p 6222:6222 nats-streaming
STREAM: Starting nats-streaming-server[test-cluster] version 0.11.0
STREAM: ServerID: bzkKJL3jI4KW9Hqb0bC1Ae
STREAM: Go version: go1.11
Starting nats-server version 1.3.0
Git commit [not set]
Starting http monitor on 0.0.0.0:8222
Listening for client connections on 0.0.0.0:4222
Server is ready
STREAM: Recovering the state...
STREAM: No recovered state
STREAM: Message store is MEMORY
STREAM: ---------- Store Limits ----------
STREAM: Channels: 100 *
STREAM: --------- Channels Limits --------
STREAM: Subscriptions: 1000 *
STREAM: Messages : 1000000 *
STREAM: Bytes : 976.56 MB *
STREAM: Age : unlimited *
STREAM: Inactivity : unlimited *
STREAM: ----------------------------------
java:
// 第一個引數表示clusterId,在啟動NATS Streaming容器的時候確定 // 第二個引數表示clientID,連線客戶端的唯一識別符號 StreamingConnectionFactory cf = new StreamingConnectionFactory ("test-cluster", "bar"); //設定Nats伺服器地址和埠,預設是nats://127.0.0.1:4222 cf.setNatsConnection(Nats.connect("nats://127.0.0.1:4222")); StreamingConnection sc = cf.createConnection();
Publish: sc.publish("foo", "Hello World".getBytes());
Subscribe:
sc.subscribe("foo", msg -> { System.out.println(new String(msg.getData(), StandardCharsets.UTF_8)); }, new SubscriptionOptions.Builder() .durableName("aa") .deliverAllAvailable().build());
在使用NATS Streaming的時候需要注意訂閱主題不支援萬用字元,在訂閱訊息時傳入MessageHandler函式是介面實現和SubscriptionOptions物件.MessageHandler提供訊息回撥處理, SubscriptionOptions用於設定訂閱選項,比如設定Queue, durableName, ack等。
Streaming-Spring整合
作為一款優秀的訊息中介軟體,卻沒有對Spring做整合,這是非常的可惜的事情.所以為了在工作中方便的使用他,我開發了一個很小的外掛.雖然還有很大的改進空間,不過在公司的專案中卻能夠很好的執行.他開發思路和nats-spring差不多,所以使用方式也是大同小異,具體如下:
{{git clone https://github.com/wanlinus/na ... g.git
cd nats-streaming-spring
mvn clean install}}}
<dependency> <groupId>cn.wanlinus</groupId> <artifactId>nats-streaming-spring</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency>
application.yml
spring: nats: streaming: nats-url: nats://127.0.0.1:4222 cluster-id: test-cluster
@EnableNatsStreaming @SpringBootApplication public class StreamingDemoApplication { public static void main(String[] args) { SpringApplication.run(StreamingDemoApplication.class, args); } //釋出訊息只需要注入StreamingConnection @Autowired private StreamingConnection sc; public void sendMsg(){ sc.publish("foo", "publish message".getBytes()) } } @Service public class A { @Subscribe(value = "foo", durableName = "dname", queue = "queue") public void asd(Message message) throws IOException { System.out.println(new String(message.getData(), StandardCharsets.UTF_8)); }
}
兩個外掛由於是為了結合專案所寫的,所以裡面有些部分並不通用。後續的開發中我將會繼續進行抽象和改進。
原文連結: https://mp.weixin.qq.com/s/lOKjyYQgcFT9xKR4Zulb6A
關於睿雲智合
深圳睿雲智合科技有限公司成立於2012年,總部位於深圳,並分別在成都、深圳設立了研發中心,北京、上海設立了分支機構,核心骨幹人員全部為來自金融、科技行業知名企業資深業務專家、技術專家。早期專注於為中國金融保險等大型企業提供創新技術、電子商務、CRM等領域專業諮詢服務。
自2016年始,在率先將容器技術引進到中國保險行業客戶後,公司組建了專業的容器技術產品研發和實施服務團隊,旨在幫助中國金融行業客戶將容器創新技術應用於企業資訊科技支援業務發展的基礎能力改善與提升,成為中國金融保險行業容器技術服務領導品牌。
此外,憑藉多年來在呼叫中心領域的業務經驗與技術積累,睿雲智合率先在業界推出基於開源軟交換平臺FreeSwitch的微服務架構多媒體數字化業務平臺,將語音、視訊、webchat、微信、微博等多種客戶接觸渠道整合,實現客戶統一接入、精準識別、智慧路由的CRM策略,並以容器化治理來支援平臺的全應用生命週期管理,顯著提升了數字化業務處理的靈活、高效、彈性、穩定等特性,為幫助傳統企業向“以客戶為中心”的數字化業務轉型提供完美的一站式整體解決方案。