golang实现rabbitmq的几种模式
这篇文章会简单的讲一下,rabbitmq的几种模式,并用golang语言实现一下。
第一种模式:simple模式。
这是最简单的模式了。也就是由生产者将消息送到队列里,然后由消费者到消息队列里来取。在这之前,我们先定义一个RabbitMQ的结构体和我们定义的函数。
1 package RabbitMq
2
3 import (
4 "fmt"
5 "github.com/streadway/amqp"
6 )
7
8 //这里主要是RabbitMQ的一些信息。包括其结构体和函数。
9
10 //连接信息
11 const MQURL = "amqp://du:du@129.211.78.6:5672/dudevirtualhost"
12
13 //RabbitMQ结构体
14 type RabbitMQ struct {
15 //连接
16 conn *amqp.Connection
17 channel *amqp.Channel
18 //队列
19 QueueName string
20 //交换机名称
21 ExChange string
22 //绑定的key名称
23 Key string
24 //连接的信息,上面已经定义好了
25 MqUrl string
26 }
27
28 //创建结构体实例,参数队列名称、交换机名称和bind的key(也就是几个大写的,除去定义好的常量信息)
29 func NewRabbitMQ(queueName string, exChange string, key string) *RabbitMQ {
30 return &RabbitMQ{QueueName: queueName, ExChange: exChange, Key: key, MqUrl: MQURL}
31 }
32
33 //关闭conn和chanel的方法
34 func (r *RabbitMQ) Destory() {
35 r.channel.Close()
36 r.conn.Close()
37 }
38
39 //错误的函数处理
40 func (r *RabbitMQ) failOnErr(err error, message string) {
41 if err != nil {
42 fmt.Printf("err是:%s,小杜同学手写的信息是:%s", err, message)
43 }
44 }
接着就是simple模式代码的书写了:
1 package RabbitMq
2
3 import (
4 "fmt"
5 "github.com/streadway/amqp"
6 "log"
7 )
8
9 //01,这里是rabbitmq最简单的模式:simple模式。
10 //也就是由生产者将消息送到队列里,然后由消费者到队列里取出来消费。
11
12 //创建简单模式下的实例,只需要queueName这个参数,其中exchange是默认的,key则不需要。
13 func NewRabbitMQSimple(queueName string) *RabbitMQ {
14 rabbitmq := NewRabbitMQ(queueName, "", "")
15 var err error
16 //获取参数connection
17 rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
18 rabbitmq.failOnErr(err, "连接connection失败")
19 //获取channel参数
20 rabbitmq.channel, err = rabbitmq.conn.Channel()
21 rabbitmq.failOnErr(err, "获取channel参数失败")
22 return rabbitmq
23 }
24
25 //直接模式,生产者.
26 func (r *RabbitMQ) PublishSimple(message string) {
27 //第一步,申请队列,如不存在,则自动创建之,存在,则路过。
28 _, err := r.channel.QueueDeclare(
29 r.QueueName,
30 false,
31 false,
32 false,
33 false,
34 nil,
35 )
36 if err != nil {
37 fmt.Printf("创建连接队列失败:%s", err)
38 }
39
40 //第二步,发送消息到队列中
41 r.channel.Publish(
42 r.ExChange,
43 r.QueueName,
44 false,
45 false,
46 amqp.Publishing{
47 ContentType: "text/plain",
48 Body: []byte(message),
49 })
50 }
51
52 //直接模式,消费者
53 func (r *RabbitMQ) ConsumeSimple() {
54 //第一步,申请队列,如果队列不存在则自动创建,存在则跳过
55 q, err := r.channel.QueueDeclare(
56 r.QueueName,
57 //是否持久化
58 false,
59 //是否自动删除
60 false,
61 //是否具有排他性
62 false,
63 //是否阻塞处理
64 false,
65 //额外的属性
66 nil,
67 )
68 if err != nil {
69 fmt.Println(err)
70 }
71 //第二步,接收消息
72 msgs, err := r.channel.Consume(
73 q.Name,
74 "", //用来区分多个消费者
75 true, //是否自动应答,告诉我已经消费完了
76 false,
77 false, //若设置为true,则表示为不能将同一个connection中发送的消息传递给这个connection中的消费者.
78 false, //消费队列是否设计阻塞
79 nil,
80 )
81 if err != nil {
82 fmt.Printf("消费者接收消息出现问题:%s", err)
83 }
84
85 forever := make(chan bool)
86 //启用协程处理消息
87 go func() {
88 for d := range msgs {
89 log.Printf("小杜同学写的Simple模式接收到了消息:%s\n", d.Body)
90 }
91 }()
92 log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
93 <-forever
94 }
simple模式发布者:
1 package main
2
3 import (
4 "fmt"
5 "rabbitmq20181121/RabbitMq"
6 )
7
8 func main() {
9 rabbitmq := RabbitMq.NewRabbitMQSimple("duQueueName1912161843")
10 rabbitmq.PublishSimple("他是客,你是心上人。 ---来自simple模式")
11 fmt.Println("发送成功!")
12 }
simple模式消费者:
1 package main
2
3 import (
4 "fmt"
5 "rabbitmq20181121/RabbitMq"
6 )
7
8 func main() {
9 rabbitmq := RabbitMq.NewRabbitMQSimple("duQueueName1912161843")
10 rabbitmq.ConsumeSimple()
11 fmt.Println("接收成功!")
12 }
run这个模式的发布者的消费者会发现会成功输入:
小杜同学写的Simple(或者Work)模式接收到了消息:他是客,你是心上人。---来自simple模式
表明rabbitmq的simple模式是成功了。
也就是这样:
当你弄懂了这个最简单的simple模式后也便可以往后看了。后面的四 种模式全部都是基于这个模式上做的优化或者说是修改。
第二种模式:work模式。
那么到此,可能就会有同学有疑问了,为什么我的输出结果里要加一个“(或者Work)”呢?因为其实simple模式和work模式其实用的是一套逻辑代码,只是work模式是可以有多个消费者的,work模式起到一个负载均衡的作用。
那么,我们来看一下work模式的发布者:
1 package main
2
3 import (
4 "fmt"
5 "rabbitmq20181121/RabbitMq"
6 "strconv"
7 "time"
8 )
9
10 func main() {
11 rabbitmq := RabbitMq.NewRabbitMQSimple("duQueueName191224")
12 for i := 0; i < 100; i++ {
13 rabbitmq.PublishSimple("hello du message" + strconv.Itoa(i) + "---来自work模式")
14 time.Sleep(1 * time.Second)
15 fmt.Printf("work模式,共产生了%d条消息\n", i)
16 }
17 }
work模式的消费者,建立2个,代码都是一样的:
1 package main
2
3 import "rabbitmq20181121/RabbitMq"
4
5 func main() {
6 rabbitmq := RabbitMq.NewRabbitMQSimple("duQueueName191224")
7 rabbitmq.ConsumeSimple()
8 }
将work的发布者和两个消费者run起来,会发现:
1 小杜同学写的Simple(或者Work)模式接收到了消息:hello du message1---来自work模式 2 小杜同学写的Simple(或者Work)模式接收到了消息:hello du message3---来自work模式 3 小杜同学写的Simple(或者Work)模式接收到了消息:hello du message5---来自work模式 4 小杜同学写的Simple(或者Work)模式接收到了消息:hello du message7---来自work模式 5 小杜同学写的Simple(或者Work)模式接收到了消息:hello du message9---来自work模式 6 小杜同学写的Simple(或者Work)模式接收到了消息:hello du message11---来自work模式
也就是一个消费者只消费奇数,而另外一个则只消费偶数。那么work模式就也成功了。
也就是这样:
第三种模式:订阅模式。
上面简单介绍了两种模式,一个是simple模式,另外一个是work模式,他们有一个共同的特点就是一个消息只能被一个消费者消费,那么我们的消息能不能被多个消费者消费呢,这个自然是可以的,也就是我要说的订阅模式(Publish/Subscribe)。订阅模式的特别是:一个消息被投递到多个队列,一个消息能被多个消费者获取。过程是由生产者将消息发送到exchange(交换机)里,然后exchange通过一系列的规则发送到队列上,然后由绑定对应的消费者进行消息。
//订阅模式需要用到exchange。
//因为其过程就是:由生产者将消息发送到exchange(交换机)里,然后exchange通过一系列的规则发送到队列上,然后由绑定对应的消费者进行消息。
//另外定义exchange时,其kind类型一定要是"fanout",这样才是广播类型。
发布订阅模式的代码:
1 package RabbitMq
2
3 import (
4 "fmt"
5 "github.com/streadway/amqp"
6 )
7
8 //这里是订阅模式的相关代码。
9 //订阅模式需要用到exchange。
10
11 //获取订阅模式下的rabbitmq的实例
12 func NewRabbitMqSubscription(exchangeName string) *RabbitMQ {
13 //创建rabbitmq实例
14 rabbitmq := NewRabbitMQ("", exchangeName, "")
15 var err error
16 //获取connection
17 rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
18 rabbitmq.failOnErr(err, "订阅模式连接rabbitmq失败。")
19 //获取channel
20 rabbitmq.channel, err = rabbitmq.conn.Channel()
21 rabbitmq.failOnErr(err, "订阅模式获取channel失败")
22 return rabbitmq
23 }
24
25 //订阅模式发布消息
26 func (r *RabbitMQ) PublishSubscription(message string) {
27 //第一步,尝试连接交换机
28 err := r.channel.ExchangeDeclare(
29 r.ExChange,
30 "fanout", //这里一定要设计为"fanout"也就是广播类型。
31 true,
32 false,
33 false,
34 false,
35 nil,
36 )
37 r.failOnErr(err, "订阅模式发布方法中尝试连接交换机失败。")
38
39 //第二步,发送消息
40 err = r.channel.Publish(
41 r.ExChange,
42 "",
43 false,
44 false,
45 amqp.Publishing{
46 ContentType: "text/plain",
47 Body: []byte(message),
48 })
49 }
50
51 //订阅模式消费者
52 func (r *RabbitMQ) ConsumeSbuscription() {
53 //第一步,试探性创建交换机exchange
54 err := r.channel.ExchangeDeclare(
55 r.ExChange,
56 "fanout",
57 true,
58 false,
59 false,
60 false,
61 nil,
62 )
63 r.failOnErr(err, "订阅模式消费方法中创建交换机失败。")
64
65 //第二步,试探性创建队列queue
66 q, err := r.channel.QueueDeclare(
67 "", //随机生产队列名称
68 false,
69 false,
70 true,
71 false,
72 nil,
73 )
74 r.failOnErr(err, "订阅模式消费方法中创建创建队列失败。")
75
76 //第三步,绑定队列到交换机中
77 err = r.channel.QueueBind(
78 q.Name,
79 "", //在pub/sub模式下key要为空
80 r.ExChange,
81 false,
82 nil,
83 )
84
85 //第四步,消费消息
86 messages, err := r.channel.Consume(
87 q.Name,
88 "",
89 true,
90 false,
91 false,
92 false,
93 nil,
94 )
95
96 forever := make(chan bool)
97 go func() {
98 for d := range messages {
99 fmt.Printf("小杜同学写的订阅模式收到的消息:%s\n", d.Body)
100 }
101 }()
102
103 fmt.Println("订阅模式消费者已开启,退出请按 CTRL+C\n")
104 <-forever
105
106 }
发布者的代码:
1 package main
2
3 import (
4 "fmt"
5 "rabbitmq20181121/RabbitMq"
6 "strconv"
7 "time"
8 )
9
10 func main() {
11 rabbitmq := RabbitMq.NewRabbitMqSubscription("duexchangeName")
12 for i := 0; i < 100; i++ {
13 rabbitmq.PublishSubscription("订阅模式生产第" + strconv.Itoa(i) + "条数据")
14 fmt.Printf("订阅模式生产第" + strconv.Itoa(i) + "条数据\n")
15 time.Sleep(1 * time.Second)
16 }
17 }
建立两个一样的消费者的代码:
1 package main
2
3 import "rabbitmq20181121/RabbitMq"
4
5 func main() {
6 rabbitmq := RabbitMq.NewRabbitMqSubscription("duexchangeName")
7 rabbitmq.ConsumeSbuscription()
8 }
接着,依旧是把发布者和两个消费者run起来,会发现两个消费者都同时消费了发布者发布的消息了。也就是发布订阅模式也成功了。
也就是这样:
第四种模式:routing模式。
那么上面看到订阅模式,是可以做到一个消息由多个消费者消费的,那么可不可以在一个消息由多个消费者消费的基础上还指定由哪些消息者来消费呢?
这个自然也是可以的,而这个就是笔者现在要讲的路由模式(routing模式)。这也就是路由模式的主要特点了。
//这里相对比订阅模式就多了一个routingkey的设计,也是通过这个来指定消费者的。
//创建exchange的kind需要是"direct",不然就不是roting模式了
routing模式的代码:
1 package RabbitMq
2
3 import (
4 "github.com/streadway/amqp"
5 "log"
6 )
7
8 //rabbitmq的路由模式。
9 //主要特点不仅一个消息可以被多个消费者消费还可以由生产端指定消费者。
10 //这里相对比订阅模式就多了一个routingkey的设计,也是通过这个来指定消费者的。
11 //创建exchange的kind需要是"direct",不然就不是roting模式了。
12
13 //创建rabbitmq实例,这里有了routingkey为参数了。
14 func NewRabbitMqRouting(exchangeName string, routingKey string) *RabbitMQ {
15 rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
16 var err error
17 //获取connection
18 rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
19 rabbitmq.failOnErr(err, "创建rabbit的路由实例的时候连接出现问题")
20 //获取channel
21 rabbitmq.channel, err = rabbitmq.conn.Channel()
22 rabbitmq.failOnErr(err, "创建rabbitmq的路由实例时获取channel出错")
23 return rabbitmq
24 }
25
26 //路由模式,产生消息。
27 func (r *RabbitMQ) PublishRouting(message string) {
28 //第一步,尝试创建交换机,与pub/sub模式不同的是这里的kind需要是direct
29 err := r.channel.ExchangeDeclare(r.ExChange, "direct", true, false, false, false, nil)
30 r.failOnErr(err, "路由模式,尝试创建交换机失败")
31 //第二步,发送消息
32 err = r.channel.Publish(
33 r.ExChange,
34 r.Key,
35 false,
36 false,
37 amqp.Publishing{
38 ContentType: "text/plain",
39 Body: []byte(message),
40 })
41 }
42
43 //路由模式,消费消息。
44 func (r *RabbitMQ) ConsumerRouting() {
45 //第一步,尝试创建交换机,注意这里的交换机类型与发布订阅模式不同,这里的是direct
46 err := r.channel.ExchangeDeclare(
47 r.ExChange,
48 "direct",
49 true,
50 false,
51 false,
52 false,
53 nil,
54 )
55 r.failOnErr(err, "路由模式,创建交换机失败。")
56
57 //第二步,尝试创建队列,注意这里队列名称不用写,这样就会随机产生队列名称
58 q, err := r.channel.QueueDeclare(
59 "",
60 false,
61 false,
62 true,
63 false,
64 nil,
65 )
66 r.failOnErr(err, "路由模式,创建队列失败。")
67
68 //第三步,绑定队列到exchange中
69 err = r.channel.QueueBind(q.Name, r.Key, r.ExChange, false, nil)
70
71 //第四步,消费消息。
72 messages, err := r.channel.Consume(q.Name, "", true, false, false, false, nil)
73 forever := make(chan bool)
74 go func() {
75 for d := range messages {
76 log.Printf("小杜同学写的路由模式(routing模式)收到消息为:%s。\n", d.Body)
77 }
78 }()
79 <-forever
80 }
发布消息到routingKey为one,two,three的路由上:
1 package main
2
3 import (
4 "fmt"
5 "rabbitmq20181121/RabbitMq"
6 "strconv"
7 "time"
8 )
9
10 func main() {
11 rabbitmq1 := RabbitMq.NewRabbitMqRouting("duExchangeName", "one")
12 rabbitmq2 := RabbitMq.NewRabbitMqRouting("duExchangeName", "two")
13 rabbitmq3 := RabbitMq.NewRabbitMqRouting("duExchangeName", "three")
14 for i := 0; i < 100; i++ {
15 rabbitmq1.PublishRouting("路由模式one" + strconv.Itoa(i))
16 rabbitmq2.PublishRouting("路由模式two" + strconv.Itoa(i))
17 rabbitmq3.PublishRouting("路由模式three" + strconv.Itoa(i))
18 time.Sleep(1 * time.Second)
19 fmt.Printf("在路由模式下,routingKey为one,为two,为three的都分别生产了%d条消息\n", i)
20 }
21 }
来消费routingKey为one的消息:
1 package main
2
3 import "rabbitmq20181121/RabbitMq"
4
5 func main() {
6 one := RabbitMq.NewRabbitMqRouting("duExchangeName", "one")
7 one.ConsumerRouting()
8 }
来消费routingKey为two的消息:
1 package main
2
3 import "rabbitmq20181121/RabbitMq"
4
5 func main() {
6 two := RabbitMq.NewRabbitMqRouting("duExchangeName", "two")
7 two.ConsumerRouting()
8 }
接着run起来,会发现RoutingConsumer1只会消费routingKey为one的消息,RoutingConsumer2则只会消费routingKey为two的消息,而routingKey为three是没有被消费到的。
也就是这样:
第五种模式:topic模式。
接着就是要说的最后一个模式,topic模式了。这个模式也是在routing模式上进一步升华而来,通过上面的介绍我们知道routing模式最大的特点是可以从生产端来指定消费端来消费消息,是通过routingKey来指定的。那么我们可不可以通过一定的规则来指定呢?比如用通配符来的指定,当然这个也是可以的。这也就是topic模式最大的特点了。
topic模式也是在routing的模式上演化而来。不同的是我们以通配符的方式来指定我们的消费者。
来看一下topic模式的代码,注意这里创建exchange的kind则是"topic"了:
1 package RabbitMq
2
3 import (
4 "github.com/streadway/amqp"
5 "log"
6 )
7
8 //topic模式
9 //与routing模式不同的是这个exchange的kind是"topic"类型的。
10 //topic模式的特别是可以以通配符的形式来指定与之匹配的消费者。
11 //"*"表示匹配一个单词。“#”表示匹配多个单词,亦可以是0个。
12
13 //创建rabbitmq实例
14 func NewRabbitMqTopic(exchangeName string, routingKey string) *RabbitMQ {
15 rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
16 var err error
17 //获取connection
18 rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
19 rabbitmq.failOnErr(err, "创建rabbit的topic模式时候连接出现问题")
20 //获取channel
21 rabbitmq.channel, err = rabbitmq.conn.Channel()
22 rabbitmq.failOnErr(err, "创建rabbitmq的topic实例时获取channel出错")
23 return rabbitmq
24 }
25
26 //topic模式。生产者。
27 func (r *RabbitMQ) PublishTopic(message string) {
28 //第一步,尝试创建交换机,这里的kind的类型要改为topic
29 err := r.channel.ExchangeDeclare(
30 r.ExChange,
31 "topic",
32 true,
33 false,
34 false,
35 false,
36 nil,
37 )
38 r.failOnErr(err, "topic模式尝试创建exchange失败。")
39
40 //第二步,发送消息。
41 err = r.channel.Publish(
42 r.ExChange,
43 r.Key,
44 false,
45 false,
46 amqp.Publishing{
47 ContentType: "text/plain",
48 Body: []byte(message),
49 })
50 }
51
52 //topic模式。消费者。"*"表示匹配一个单词。“#”表示匹配多个单词,亦可以是0个。
53 func (r *RabbitMQ) ConsumerTopic() {
54 //第一步,创建交换机。这里的kind需要是“topic”类型。
55 err := r.channel.ExchangeDeclare(
56 r.ExChange,
57 "topic",
58 true, //这里需要是true
59 false,
60 false,
61 false,
62 nil,
63 )
64 r.failOnErr(err, "topic模式,消费者创建exchange失败。")
65
66 //第二步,创建队列。这里不用写队列名称。
67 q, err := r.channel.QueueDeclare(
68 "",
69 false,
70 false,
71 true,
72 false,
73 nil,
74 )
75 r.failOnErr(err, "topic模式,消费者创建queue失败。")
76
77 //第三步,将队列绑定到交换机里。
78 err = r.channel.QueueBind(
79 q.Name,
80 r.Key,
81 r.ExChange,
82 false,
83 nil,
84 )
85
86 //第四步,消费消息。
87 messages, err := r.channel.Consume(
88 q.Name,
89 "",
90 true,
91 false,
92 false,
93 false,
94 nil,
95 )
96
97 forever := make(chan bool)
98 go func() {
99 for d := range messages {
100 log.Printf("小杜同学写的topic模式收到了消息:%s。\n", d.Body)
101 }
102 }()
103 <-forever
104
105 }
生产端的代码:
1 package main
2
3 import (
4 "fmt"
5 "rabbitmq20181121/RabbitMq"
6 "strconv"
7 "time"
8 )
9
10 func main() {
11 one := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "Singer.Jay")
12 two := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "Persident.XIDADA")
13 for i := 0; i < 100; i++ {
14 one.PublishTopic("小杜同学,topic模式,Jay," + strconv.Itoa(i))
15 two.PublishTopic("小杜同学,topic模式,All," + strconv.Itoa(i))
16 time.Sleep(1 * time.Second)
17 fmt.Printf("topic模式。这是小杜同学发布的消息%v \n", i)
18 }
19 }
消费端1的代码:
1 package main
2
3 import "rabbitmq20181121/RabbitMq"
4
5 func main() {
6 jay := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "Singer.*")
7 jay.ConsumerTopic()
8 }
消费端2的代码:
1 package main
2
3 import "rabbitmq20181121/RabbitMq"
4
5 func main() {
6 all := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "#")
7 all.ConsumerTopic()
8 }
结果:发现Jay只会配置到Singer来的消息,也就是topic模式也是成功了的了。
也就是这样: