这篇文章主要讲解了“消息队列原理之如何掌握rabbitmq”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“消息队列原理之如何掌握rabbitmq”吧!
介绍
RabbitMQ 是一个由 Erlang 开发的 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的开源实现,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。支持多种客户端语言。
架构
整体架构对照下面的图说明
先看看图片上各个名次的解释:
Rabbitmq BrokerTCPQueueExchangeBrokerBrokerRoutingKeyExchangeBindingQueueExchangeExchangeQueueExchangeBindingRoutingKeyExchangedirect,fanout,topic,headersQueueExchangekeyBindkeyProducerExchangekeykeyRoutekeyRoutekeyBindkeyRoutekeyBindkeyBingkey*#benz.carcar*.carbenz.car*.carbenz.carQueueRoutekeyBindkeyheaders对照上面图和名次解释应该比较清晰明了了,下面我们通过几个例子说明如何使用。
用法(golang)
direct
QueueRoutekey==QueueNamepackage main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func handlerError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
var url = "amqp://username:password@ip:port"
func main() {
conn, err := amqp.Dial(url)
handlerError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
channel, err := conn.Channel()
handlerError(err, "Failed to open a Channel")
defer channel.Close()
queueNameCar := "car"
if _, err := channel.QueueDeclare(queueNameCar, false, false, false, false, nil); err != nil {
handlerError(err, "Failed to decare Queue")
}
if err := channel.Publish("", queueNameCar, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
handlerError(err, "Failed to publish message")
}
}main() 函数cardefalut exchangecarcarrejectexchangeproducerexchangerejectRoutekeyproducerroutekeyroutekey10我们自己创建一个 direct 类型的 exchange 并绑定一些队列看看是什么效果。
func main() {
conn, err := amqp.Dial(url)
handlerError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
channel, err := conn.Channel()
handlerError(err, "Failed to open a Channel")
defer channel.Close()
directExchangeNameCar := "direct.car"
if err := channel.ExchangeDeclare(directExchangeNameCar, "direct", true, false, false, false, nil); err != nil {
handlerError(err, "Failed to decalare exchange")
}
queueNameCar := "car"
queueNameBigCar := "big-car"
queueNameMiddleCar := "middle-car"
queueNameSmallCar := "small-car"
channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)
if err := channel.QueueBind(queueNameCar, "car", directExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameBigCar, "car", directExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameBigCar, "big.car", directExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameMiddleCar, "car", directExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameMiddleCar, "middler.car", directExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameSmallCar, "car", directExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameSmallCar, "small.car", directExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.Publish(directExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
handlerError(err, "Failed to publish message")
}
}ExchangeQueueBindingBindingcarQueueExchangeExchangeunbindqueueexchangeexchangeexchangefanout
fanout 工作方式类似于广播,看看下面的代码
func main() {
conn, err := amqp.Dial(url)
handlerError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
channel, err := conn.Channel()
handlerError(err, "Failed to open a Channel")
defer channel.Close()
fanoutExchangeNameCar := "fanout.car"
if err := channel.ExchangeDeclare(fanoutExchangeNameCar, "fanout", true, false, false, false, nil); err != nil {
handlerError(err, "Failed to decalare exchange")
}
queueNameCar := "car"
queueNameBigCar := "big-car"
queueNameMiddleCar := "middle-car"
queueNameSmallCar := "small-car"
channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)
if err := channel.QueueBind(queueNameCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameBigCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameBigCar, "big.car", fanoutExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameMiddleCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameMiddleCar, "middler.car", fanoutExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameSmallCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameSmallCar, "small.car", fanoutExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.Publish(fanoutExchangeNameCar, "middle.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
handlerError(err, "Failed to publish message")
}
}fanoutfanout.carmiddle.cartopic
topicexchangequeuefunc main() {
conn, err := amqp.Dial(url)
handlerError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
channel, err := conn.Channel()
handlerError(err, "Failed to open a Channel")
defer channel.Close()
topicExchangeNameCar := "topic.car"
if err := channel.ExchangeDeclare(topicExchangeNameCar, "topic", true, false, false, false, nil); err != nil {
handlerError(err, "Failed to decalare exchange")
}
queueNameCar := "car"
queueNameBigCar := "big-car"
queueNameMiddleCar := "middle-car"
queueNameSmallCar := "small-car"
channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)
if err := channel.QueueBind(queueNameCar, "car", topicExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameBigCar, "car", topicExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameBigCar, "big.car", topicExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameMiddleCar, "car", topicExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameMiddleCar, "middler.car", topicExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameSmallCar, "car", topicExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameSmallCar, "small.car", topicExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameSmallCar, "*.small.car", topicExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
if err := channel.QueueBind(queueNameSmallCar, "#.small.car", topicExchangeNameCar, false, nil); err != nil {
handlerError(err, "Failed to bind queue to exchange")
}
}producerqueue if err := channel.Publish(topicExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
handlerError(err, "Failed to publish message")
}每个 queue 都会收到消息
if err := channel.Publish(topicExchangeNameCar, "small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
handlerError(err, "Failed to publish message")
}small-carsmall.car*.small.car#.small.car if err := channel.Publish(topicExchangeNameCar, "benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
handlerError(err, "Failed to publish message")
}small-car*.small.car#.small.car if err := channel.Publish(topicExchangeNameCar, "auto.blue.benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
handlerError(err, "Failed to publish message")
}small-car#.small.car if err := channel.Publish(topicExchangeNameCar, "bike", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
handlerError(err, "Failed to publish message")
}都不会收到消息,没有符合的 routekey 。
headers
这种类型很少有实际的应用场景。
感谢各位的阅读,以上就是“消息队列原理之如何掌握rabbitmq”的内容了,经过本文的学习后,相信大家对消息队列原理之如何掌握rabbitmq这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!