在工作中发现,有些时候消息因为某些原因在消费一次后,如果消息失败,这时候不ack,消息就回一直重回队列首部,造成消息拥堵。


一:消息重试机制

如是有了如下思路:


消息进入队列前,header默认有参数 retry_num=0 表示尝试次数;

消费者在消费时候的,如果消息失败,就把消息插入另外一个队列(队列abc);该队列abc 绑定一个死信队列(原始消费的队列),这样形成一个回路;

当消息失败后,消息就进入队列abc,队列abc拥有ttl过期时间,ttl过期时间到了后,该消息进入死信队列(死信队列刚好是刚开始我们消费的队列);

这样消息就又回到原始消费队列尾部了;

最后可以通过队列消息头部的header参数retry_num 可以控制消息消费多少次后,直接插入db日志;


db日志可以记录交换机 路由,queuename,这样,可以做一个后台管理,可以手动一次把消息重新放入队列,进行消息(因为有时间消费队列里面可能在请求其它服务,其它服务也可能会挂掉)

这时候消息无论你消费多少次都没有用,但是入库db后,可以一键重回队列消息(当我们知道服务已经正常后)

图解:


golang实现rabbitmq消息队列消费失败尝试重试_golang


golang实现rabbitmq消息队列消费失败尝试重试_消息处理_02


附上代码

git clone https://github.com/sunlongv520/golang-rabbitmq


send.go 消费者

golang实现rabbitmq消息队列消费失败尝试重试_php_03golang实现rabbitmq消息队列消费失败尝试重试_rabbitmq消息尝试_04

View Code



recv.go消费者

golang实现rabbitmq消息队列消费失败尝试重试_php_03golang实现rabbitmq消息队列消费失败尝试重试_rabbitmq消息尝试_04

View Code



utils/rabbitmq包

golang实现rabbitmq消息队列消费失败尝试重试_php_03golang实现rabbitmq消息队列消费失败尝试重试_rabbitmq消息尝试_04

View Code


二,延时队列

场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。

场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。

最近的一个项目遇到了这种情况,如果运单30分钟还没有被接单,则状态自动变为已取消。实现延迟消息原理如下,借用一张图:


实现方案

  1. 定时任务轮询数据库,看是否有产生新任务,如果产生则消费任务
  2. pcntl_alarm为进程设置一个闹钟信号
  3. swoole的异步高精度定时器:swoole_time_tick(类似javascript的setInterval)和swoole_time_after(相当于javascript的setTimeout)
  4. rabbitmq延迟任务

以上四种方案,如果生产环境有使用到swoole建议使用第三种方案。此篇文章重点讲述第四种方案实现


golang实现rabbitmq消息队列消费失败尝试重试_golang使用rabbitmq_09


golang实现rabbitmq消息队列消费失败尝试重试_消息处理_10



生产者:

golang实现rabbitmq消息队列消费失败尝试重试_php_03golang实现rabbitmq消息队列消费失败尝试重试_rabbitmq消息尝试_04

View Code

消费者:

golang实现rabbitmq消息队列消费失败尝试重试_php_03golang实现rabbitmq消息队列消费失败尝试重试_rabbitmq消息尝试_04

View Code

延时队列实现和上面所讲的消息重试有异曲同工之处,都是利用了延时时间和死信队列这一特性实现


其它:该rabbitmq包实现中包含了,rabbitmq断线重连,有兴趣的同学可以看看


  (重试和重连接是两个概念)

  重连接 :rabbitmq链接失败导致任务失败,此时要等待rabbitmq服务器恢复正常后才能再次启动协程处理任务

  重试:rabbitmq服务正常,消息消费进程也正常,但是消息处理失败。尝试多次消费消息后还是失败就ack消息,在整个重试过程中不会阻塞消费



golang监听rabbitmq消息队列任务断线自动重连接: