Go RabbitMQ (一)
RabbitMQ
簡介
RabbitMQ是一個訊息代理,用來負責接收和轉發訊息。
術語
- 生產者:生產者是負責傳送訊息的
- 佇列:佇列是RabbitMQ用來儲存訊息的,受主機記憶體和磁碟大小的限制,本質上是一個訊息的緩衝區。生產者可以將訊息傳送至佇列中,消費者可以從佇列中接收到訊息
- 消費者:消費者是用來等待接收訊息
生產者,消費者,代理可以駐留在不同主機或同一主機,一個應用可以是生產者也可以是消費者
Hello World
接下來我們來實現RabbitMQ的“Hello World”,生產者將“Hello World”傳送進佇列中,消費者將其接收並列印
-
RabbitMQ客戶端的安裝
**go get github.com/streadway/amqp
傳送
-
連線RabbitMQ
conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatal(err) } defer conn.Close()
RabbitMQ的連線已經為我們抽象了socket的連線,同時為我們處理了協議版本號和身份認證等等
-
建立通道
ch,err := conn.Channel() if err != nil { log.Fatal(err) } defer ch.Close()
在使用其他API完成任務的時候我們首先通過以上方式建立通道
-
在開始傳送訊息之前我們首先應該宣告一個佇列。宣告佇列之後我們就可以將訊息傳送至隊列當中
q, err := ch.QueueDeclare( "hello", // name false,// durable false,// delete when unused false,// exclusive false,// no-wait nil,// arguments ) if err != nil { log.Fatal(err) } body := "Hello World!" err = ch.Publish( "",// exchange q.Name, // routing key false,// mandatory false,// immediate amqp.Publishing { ContentType: "text/plain", Body:[]byte(body), }) if err != nil { log.Fatal(err) }
佇列的宣告是一個冪等性操作,如果不存在該佇列的話則會建立。此處注意,如果佇列存在,修改了佇列引數並不會影響已經存在的佇列,並且會返回錯誤。訊息內容是一個位元組陣列,所以我們必須進行編碼
接收
-
連線,建立通道,佇列
在接收端我們同樣需要像傳送端一樣連線RabbitMQ,建立通道後再建立佇列,注意此處佇列的建立是跟傳送端的佇列完全匹配的。佇列在接收端也建立是因為我們接收端有可能比傳送端先啟動,所以為了保證我們要消費的佇列存在我們在此處也進行建立
-
消費訊息
msgs, err := ch.Consume( q.Name, // queue "",// consumer true,// auto-ack false,// exclusive false,// no-local false,// no-wait nil,// args ) if err != nil { log.Fatal(err) } forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever
使用通道消費佇列中的訊息,當佇列有訊息的時候將會非同步的推送給我們