RabbitMQ的五种消息模型#

RabbitMQ支持以下五种消息模型,第六种RPC本质上是服务调用,所以不算做服务通信消息模型。


image


image


Hello World#


image


P(producer/ publisher):生产者,发送消息的服务

C(consumer):消费者,接收消息的服务

红色区域就是MQ中的Queue,可以把它理解成一个邮箱

  • 首先信件来了不强求必须马上马去拿
  • 其次,它是有最大容量的(受主机和磁盘的限制,是一个缓存区)
  • 允许多个消费者监听同一个队列,争抢消息


Worker模型#


image


Worker模型中也只有一个工作队列。但它是一种竞争消费模式。可以看到同一个队列我们绑定上了多个消费者,消费者争抢着消费消息,这可以有效的避免消息堆积

比如对于短信微服务集群来说就可以使用这种消息模型,来了请求大家抢着消费掉。

如何实现这种架构:对于上面的HelloWorld这其实就是相同的服务我们启动了多次罢了,自然就是这种架构。


订阅模型#


订阅模型借助一个新的概念:Exchange(交换机)实现,不同的订阅模型本质上是根据交换机(Exchange)的类型划分的。

订阅模型有三种

  1. Fanout(广播模型): 将消息发送给绑定给交换机的所有队列(因为他们使用的是同一个RoutingKey)。
  2. Direct(定向): 把消息发送给拥有指定Routing Key (路由键)的队列。
  3. Topic(通配符): 把消息传递给拥有 符合Routing Patten(路由模式)的队列。


订阅之Fanout模型


image


这个模型的特点就是它在发送消息的时候,并没有指明Rounting Key , 或者说他指定了Routing Key,但是所有的消费者都知道,大家都能接收到消息,就像听广播。


订阅之Direct模型


image


P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key 为 error 的消息

C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

拥有不同的RoutingKey的消费者,会收到来自交换机的不同信息,而不是大家都使用同一个Routing Key 和广播模型区分开来。


订阅之Topic模型


image


类似于Direct模型。区别是Topic的Routing Key支持通配符。


### JAVA客户端


后台回复:rbmq 即可获取如下资料:

本文中涉及到的:Golang Case、Java Case以及erlang虚拟机rpm包、rabbitmq-server的rpm包等软件,直接通过yum安装即可。


Hello World#


在本小节中你可以重点看一下当你通过代码建立连接、创建channel、发送消息、接受消

息的同时,在web view中,都有何变化。

Send.java:


image


查看新创建的连接:


image


查看新创建的通道:


image


查看RabbitMQ中消息的传送状态:


image


Recv.java:

执行如下的消息接受者,可以收到发送过来的消息。


image


再去web view中观察RabbitMQ中消息的消费状态:


image


查看系统中连接的状态,由于我没有显示的关闭连接和channl,所以你能看到系统中有两个连接:


image


channel也还存在:


image


Worker模型#


image


本质上是相同的服务我们启动了多次罢了,自然就是这种架构。

补充点1:可以给队列添加一条属性,不再是队列把任务平均分配开给消费者。而是让消费者消费完了后,问队列要新的任务,这样能者多劳。



补充点2:接受者接受消息时,可以像下图这样配置手动ACK


image


订阅模型#


订阅模型借助一个新的概念:Exchange(交换机)实现,不同的订阅模型本质上是根据交换机(Exchange)的类型划分的。


订阅模型有三种

  1. Fanout(广播模型): 将消息发送给绑定给交换机的所有队列(因为他们使用的是同一个RoutingKey)。
  2. Direct(定向): 把消息发送给拥有指定Routing Key (路由键)的队列。
  3. Topic(通配符): 把消息传递给拥有 符合Routing Patten(路由模式)的队列。


订阅之Fanout模型


image


这个模型的特点就是它在发送消息的时候,并没有指明Rounting Key ,或者说他指定了Routing Key,但是所有的消费者都知道,大家都能接收到消息,就像听广播。

发送者:


image


去web view中查看状态:


image

运行接受者消费消息


image


订阅之Direct模型


image


和Fanout模型相似,发送方发送时:指定了routingkey如下


image


接收方接受时,也指定了routingkey如下:


image


订阅之Topic模型


image


topic模型和direc模型相似。


区别:交换机的类型:topic、routingkey:支持正则表达式


发送者:


image


接收者:


image


消息确认机制#


ACK机制


image


所谓的ACK确认机制:

自动ACK:消费者接收到消息后自动发送ACK给RabbitMQ。

手动ACK:我们手动控制消费者接收到并成功消息后发送ACK给RabbitMQ。

你可以看上图:如果使用自动ACK,当消息者将消息从channel中取出后,RabbitMQ随即将消息给删除。接着不幸的是,消费者没来得及处理消息就挂了。那也就意味着消息其实丢失了。

你可能会说:会不会存在重复消费的情况呢?这其实就不是MQ的问题了。你完全可以在你代码的逻辑层面上进行诸如去重、插入前先检查是否已存在等逻辑规避重复消费问题。

具体的实现方式可以参考上面的:JAVA客户端/Worker模型


持久化交换机


image


持久化队列


image


持久化消息


image


SpringAMQP#


SpringAMQP帮我们实现了--生产者确认机制,对于不可路由的消息交换机会告诉生产者,使其重新发送


环境搭建


image


配置文件:生产者


image


生产者使用AmqpTemplate模板发送消息


image


消费端不需要AmqpTemplate模板发送消息,因此不配置


image


virtual-host,和当前用户绑定的虚拟主机名, 这就Oralce里面,不同限权的用户可以看到的界面,拥有的能力是不用的,在RabbitMQ中,用户只能看到和它相关的虚拟主机下面的信息。


image


Golang客户端#

关注白日梦,后台回复:rbmq 即可获取如下资料:

本文中涉及到的:Golang Case、Java Case以及erlang虚拟机rpm包、rabbitmq-server的rpm包等软件,直接通过yum安装即可。


下载依赖包:



Hello World#


image


发送端:

//test/test"no access to this vhost"


image


Step2: 创建channel


image


Step3: 声明queue,后续往这个队列中发送消息


image


Step5: 发送消息


image


接受端:


消费者同样需要建立连接和channel、然后声明我们想消费的channel,和上面的生产者代码相同,就不粘出来了。

消费者从channel中接受消息:


image


处理消息:


image


Worker 模型#


image


同样的Worker模型和Simple模型也是相似的。无外乎是simple模型的消费者启动了多个实例。


消息分发策略:默认情况下RabbitMQ后将P生产的消息以round-robin的策略分发给C1、C2。

你也可以像下图这样设置一个相对公平的分发策略: 当消费者把消息处理完后MQ才会给他新的消息,这样可以实现能者多劳。


image


消息确认机制


什么是ACK机制,你可以往下翻看 Golang客户端/消息确认机制/ACK机制部分的描述。

如果手动ACK如下:


image


当我们像上面这样设置手动ACK之后,可以确保如果消费者没处理完消息就挂了,MQ中的消息不会丢失。


但是如果这时MQ挂了,消息同样会丢失。

为了避免这种情况,可以将设置将MQ中的消息也持久化


image


订阅模型#


订阅模型借助一个新的概念:Exchange(交换机)实现,不同的订阅模型本质上是根据交换机(Exchange)的类型划分的。


订阅模型有三种

  1. Fanout(广播模型): 将消息发送给绑定给交换机的所有队列(因为他们使用的是同一个RoutingKey)。
  2. Direct(定向): 把消息发送给拥有指定Routing Key (路由键)的队列。
  3. Topic(通配符): 把消息传递给拥有 符合Routing Patten(路由模式)的队列。


订阅模型之Fanout模型


image


这个模型的特点就是它在发送消息的时候,并没有指明Rounting Key , 或者说他指定了Routing Key,但是所有的消费者都知道,大家都能接收到消息,就像听广播。

生产者:在获取channel之后紧接着创建一个交换机,交换机的类型为 fanout 扇出。

注意,fanout对应的routingkey(路由key为空)


image


消费者:需要消费者获取到channel后也要声明交换机。消费者的queue无名称,queue没有routingkey。注意交换机的名字别写错。


image


订阅模型之Direct模型


image


生产者:和Fanout类似,注意交换机的名称为direct 以及添加 特定的routingkey


image


消费者:


image


订阅模型之Topic模型


image


和Direct模型相似,不同点:type为topic、并别routingkey支持正则表达式。

详细代码不再重复贴了。可以自行领取源码学习。


消息确认机制#


ACK机制


image


所谓的ACK确认机制:

自动ACK:消费者接收到消息后自动发送ACK给RabbitMQ。

手动ACK:我们手动控制消费者接收到并成功消息后发送ACK给RabbitMQ。

你可以看上图:如果使用自动ACK,当消息者将消息从channel中取出后,RabbitMQ随即将消息给删除。接着不幸的是,消费者没来得及处理消息就挂了。那也就意味着消息其实丢失了。

你可能会说:会不会存在重复消费的情况呢?这其实就不是MQ的问题了。你完全可以在你代码的逻辑层面上进行诸如去重、插入前先检查是否已存在等逻辑规避重复消费问题。

具体的实现方式可以参考上面的Golang或JAVA客户端的Worker模型部分。


持久化交换机


image


持久化队列


image


持久化消息


image


参考: