nsq 使用手冊
nsqlookupd(什麼是nsqlookupd?)
nsqd --lookupd-tcp-address=127.0.0.1:4160
nsqadmin --lookupd-http-address=127.0.0.1:4161
建立topic http協議建立
curl -X POST http://127.0.0.1:4151/topic/create?topic=name 複製程式碼
針對topic 建立相應的channel
curl -X POST http://127.0.0.1:4151/channel/create?topic=name&channel=channelname 複製程式碼
針對nsqd_to_file模式(將資訊推送到各自的channel檔案中)
#這裡的topic=name,channel=1,channel檔案=$dir/t1 nsq_to_file --topic=name --channel=1 --output-dir=/tmp/t1 --lookupd-http-address=127.0.0.1:4161 複製程式碼
最後,傳送message到topic中,檢視channel檔案是否有該message
curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=name' 複製程式碼
客戶端如何使用
#生產者 // Producer 生產者 func Producer() { producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig()) if err != nil { fmt.Println("NewProducer", err) panic(err) } for i := 0; i < 100; i++ { if err := producer.Publish("pdmqd", []byte(fmt.Sprintf("Hello World %d", i))); err != nil { fmt.Println("Publish", err) panic(err) } time.Sleep(time.Second * 5) i++ } } 複製程式碼
#消費者 // ConsumerHandler 消費者處理者 type ConsumerHandler struct{} // HandleMessage 處理訊息 func (*ConsumerHandler) HandleMessage(msg *nsq.Message) error { fmt.Println(string(msg.Body)) return nil } // ConsumerA 消費者 func ConsumerA() { consumer, err := nsq.NewConsumer("pdmqd", "1", nsq.NewConfig()) if err != nil { fmt.Println("NewConsumer", err) panic(err) } consumer.AddHandler(&ConsumerHandler{}) if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil { fmt.Println("ConnectToNSQLookupd", err) panic(err) } } //一個每秒跑一次消費者的主程序 func main() { for i := 0; i < 100; i++ { ConsumerA() ConsumerB() time.Sleep(time.Second * 1) } } 複製程式碼
注意點:多個消費者使用同一個channel時,只有一個消費者 能夠接收到訊息,這是種類似負載均衡的效果,減輕單臺服務的壓力