作为一个常识,当我们在处理一些长连接的业务时,客户端往往需要负责断线重连。比如,在我们的一个系统中,是这么处理 RabbitMQ 的断线重连的:

  // Start makes the initial connection via Run and returns its error if that
  // fails. ReConnect is launched in a background goroutine only after the
  // first Run succeeds, so an initial failure is reported to the caller
  // instead of being retried.
  1. func (c *Consumer) Start() error {

  2. if err := c.Run(); err != nil {

  3. return err

  4. }

  5. go c.ReConnect()


  6. return nil

  7. }


  // Run opens a connection and a channel, declares c.queue, and starts
  // consuming it with manual acknowledgement (auto-ack is false). It hands
  // the deliveries to Handle in a new goroutine and then registers
  // c.connNotify to hear about the connection closing. Each failure path
  // closes whatever was already opened before returning the error.
  8. func (c *Consumer) Run() error {

  9. var err error

  10. if c.conn, err = amqp.Dial(c.addr); err != nil {

  11. return err

  12. }


  13. if c.channel, err = c.conn.Channel(); err != nil {

  14. c.conn.Close()

  15. return err

  16. }


  // Declare the queue: non-durable, auto-delete taken from c.autoDelete.
  17. if _, err = c.channel.QueueDeclare(

  18. c.queue, // name

  19. false, // durable

  20. c.autoDelete, // delete when unused

  21. false, // exclusive

  22. false, // no-wait

  23. nil, // arguments

  24. ); err != nil {

  25. c.channel.Close()

  26. c.conn.Close()

  27. return err

  28. }


  29. var delivery <-chan amqp.Delivery

  // auto-ack is false, so deliveries must be acknowledged explicitly
  // (Handle calls Ack/Reject).
  30. if delivery, err = c.channel.Consume(

  31. c.queue, // queue

  32. c.consumerTag, // consumer

  33. false, // auto-ack

  34. false, // exclusive

  35. false, // no-local

  36. false, // no-wait

  37. nil, // args

  38. ); err != nil {

  39. c.channel.Close()

  40. c.conn.Close()

  41. return err

  42. }


  // Deliveries are processed in a separate goroutine so Run does not block.
  43. go c.Handle(delivery)


  // Only the connection's close is watched here (not the channel's), through
  // an unbuffered notify channel. It is registered after Handle has started.
  44. c.connNotify = c.conn.NotifyClose(make(chan *amqp.Error))


  // err is always nil here: every failure path above has already returned.
  45. return err

  46. }


  // ReConnect runs until c.quit is signalled. Each time a close notification
  // arrives on c.connNotify, it closes whatever is still open and then calls
  // Run, retrying every 5 seconds until a new connection succeeds.
  // NOTE(review): only connection closes are observed; a channel closed by
  // the server while the connection stays up does not wake this loop.
  47. func (c *Consumer) ReConnect() {

  48. for {

  49. select {

  50. case err := <-c.connNotify:

  51. if err != nil {

  52. log.Error("rabbitmq consumer - connection NotifyClose: ", err)

  53. }

  54. case <-c.quit:

  55. return

  56. }


  // Defensive cleanup in case the connection is still open at this point.
  57. // backstop

  58. if !c.conn.IsClosed() {

  59. // close message delivery

  60. if err := c.channel.Cancel(c.consumerTag, true); err != nil {

  61. log.Error("rabbitmq consumer - channel cancel failed: ", err)

  62. }


  63. if err := c.conn.Close(); err != nil {

  64. log.Error("rabbitmq consumer - connection close failed: ", err)

  65. }

  66. }


  // Retry until Run succeeds or quit is signalled. quit is only checked
  // between attempts, so a stop request can wait out the 5s sleep.
  67. quit:

  68. for {

  69. select {

  70. case <-c.quit:

  71. return

  72. default:

  73. log.Error("rabbitmq consumer - reconnect")


  74. if err := c.Run(); err != nil {

  75. log.Error("rabbitmq consumer - failCheck: ", err)


  76. // sleep 5s reconnect

  77. time.Sleep(time.Second * 5)

  78. continue

  79. }


  // Reconnected: leave the retry loop and wait for the next close event.
  80. break quit

  81. }

  82. }

  83. }

  84. }

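上面的代码通过 `conn.NotifyClose` 注册一个(无缓冲的)channel 来监听连接的关闭:收到 `error` 后先做兜底清理,再循环调用 `Run()`,直到重连成功。但它只监听了连接(connection)本身的关闭——如果 RabbitMQ Server 只关闭了 channel 而连接仍然存活,消费者会停止收到消息,却不会触发重连。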

那么问题来了,RabbitMQ Server 为什么会关闭 channel 呢?官方列出了几个原因:

  • 406 PRECONDITION_FAILED

  • 403 ACCESS_REFUSED

  • 404 NOT_FOUND

  • 405 RESOURCE_LOCKED

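其中最容易遇到的是 `406 PRECONDITION_FAILED`,比如重复确认(ack)同一条消息。这与 `delivery.Ack(multiple bool)` 的参数有关:`multiple=false` 时只确认当前这一条消息;`multiple=true` 时会把当前消息以及之前所有尚未确认的消息(delivery tag 小于等于当前值)一并确认。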

举个例子:

  // Handle processes deliveries one at a time, in arrival order, until the
  // delivery channel is closed. A message is acked when handler succeeds and
  // rejected with requeue when it fails.
  1. func (c *Consumer) Handle(delivery <-chan amqp.Delivery) {

  2. for d := range delivery {

  // Pass the body of the current message d; delivery is the channel itself.
  3. if err := c.handler(d.Body); err == nil {

  // multiple=true also acks any lower unacked tags. That is harmless here
  // only because messages are handled sequentially, so every earlier tag
  // has already been acked or rejected.
  4. d.Ack(true)

  5. } else {

  6. // requeue; otherwise unacknowledged messages keep occupying memory

  7. d.Reject(true)

  8. }

  9. }

  10. }

上面的 `handler` 是同步执行的,消息按 delivery tag 的顺序逐条确认,所以用 `d.Ack(false)` 还是 `d.Ack(true)` 效果相同。但如果为了提高吞吐,把 `handler` 改成并发执行:
  // Handle starts one goroutine per delivery, so handler calls for different
  // messages run concurrently and may finish in any order. No concurrency
  // limit is applied in this function.
  1. func (c *Consumer) Handle(delivery <-chan amqp.Delivery) {

  2. for d := range delivery {

  // d is passed by value, so each goroutine owns its own copy. The parameter
  // name shadows the outer delivery channel.
  3. go func(delivery amqp.Delivery) {

  4. if err := c.handler(delivery.Body); err == nil {

  // BUG: with concurrent handlers, Ack(true) (multiple) from a message that
  // finishes early also acks lower tags that are still in flight. When those
  // goroutines ack later, the same tag is acknowledged twice, which the
  // AMQP spec requires the peer to treat as a channel exception.
  // Ack(false) acknowledges only this message.
  5. delivery.Ack(true)

  6. } else {

  7. // requeue; otherwise unacknowledged messages keep occupying memory

  8. delivery.Reject(true)

  9. }

  10. }(d)

  11. }

  12. }

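问题就出在并发调用的 `delivery.Ack(true)` 上:goroutine 的完成顺序是不确定的。如果 tag 较大的消息先处理完,它的 `delivery.Ack(true)` 会把 tag 较小、但仍在处理中的消息一并确认;等这些 goroutine 处理完再调用 `delivery.Ack(true)` 时,就构成了对同一条消息的重复确认。AMQP 0-9-1 规范对此的要求是: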

A message MUST not be acknowledged more than once. The receiving peer MUST validate that a non-zero delivery-tag refers to a delivered message, and raise a channel exception if this is not the case. ...

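也就是说,Server 收到对已确认消息的重复确认后,会返回 `406 PRECONDITION_FAILED` 的 channel 异常(error)并关闭该 channel;而连接本身依然正常,所以只监听 `conn.NotifyClose` 的重连逻辑根本不会被触发。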
修复分两步:一是在并发处理时把 `delivery.Ack(true)` 改为 `delivery.Ack(false)`,只确认当前这条消息;二是在 `Run()` 中同时监听 channel 的 `NotifyClose`,channel 被关闭时同样触发重连:
  1. go c.Handle(delivery)


  2. c.connNotify = c.conn.NotifyClose(make(chan *amqp.Error))

  3. c.channelNotify = c.channel.NotifyClose(make(chan *amqp.Error))


  4. return err

  5. }


  6. func (c *Consumer) ReConnect() {

  7. for {

  8. select {

  9. case err := <-c.connNotify:

  10. if err != nil {

  11. log.Error("rabbitmq consumer - connection NotifyClose: ", err)

  12. }

  13. case err := <-c.channelNotify:

  14. if err != nil {

  15. log.Error("rabbitmq consumer - channel NotifyClose: ", err)

  16. }

  17. case <-c.quit:

  18. return

  19. }

这下似乎可以高枕无忧了吧,呵呵。。。

在测试环境跑了一阵子后发现,服务虽然可以正常触发重连,但是老的连接却释放得很慢(10 分钟以上):

用 `ss` 查看 TCP 连接,可以看到旧连接长时间没有被释放。进一步 dump 出 goroutine 栈后发现,关闭连接的流程卡在了 `shutdown` 里,`close()` 一直没能执行完:
  1. goroutine 45 [chan send]:

  2. foo/vendor/github.com/streadway/amqp.(*Channel).shutdown.func1()

  3. /root/gofourge/src/foo/vendor/github.com/streadway/amqp/channel.go:104 +0x10a

  4. sync.(*Once).Do(0xc00025a000, 0xc000057d78)

  5. /usr/local/go/src/sync/once.go:44 +0xb3

  6. foo/vendor/github.com/streadway/amqp.(*Channel).shutdown(0xc00025a000, 0xc0000a3060)

  7. /root/gofourge/src/foo/vendor/github.com/streadway/amqp/channel.go:93 +0x63

  8. foo/vendor/github.com/streadway/amqp.(*Connection).shutdown.func1()

  9. /root/gofourge/src/foo/vendor/github.com/streadway/amqp/connection.go:419 +0x1f8

  10. sync.(*Once).Do(0xc00016b180, 0xc000057eb8)

  11. /usr/local/go/src/sync/once.go:44 +0xb3

  12. foo/vendor/github.com/streadway/amqp.(*Connection).shutdown(0xc00016b180, 0xc0000a3060)

  13. /root/gofourge/src/foo/vendor/github.com/streadway/amqp/connection.go:389 +0x6c

  14. foo/vendor/github.com/streadway/amqp.(*Connection).reader(0xc00016b180, 0xaea4e0, 0xc00000e160)

  15. /root/gofourge/src/foo/vendor/github.com/streadway/amqp/connection.go:524 +0x196

  16. created by foo/vendor/github.com/streadway/amqp.Open

  17. /root/gofourge/src/foo/vendor/github.com/streadway/amqp/connection.go:233 +0x25f

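查看 amqp 库中 `Connection.shutdown` 的源码可以发现,底层 TCP 连接的 `close()`(即 `c.conn.Close()`)排在最后,必须等所有 channel 的 `shutdown` 执行完才会被调用: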
  1. for _, ch := range c.channels {

  2. ch.shutdown(err)

  3. }


  4. c.conn.Close()

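而 `Channel.shutdown` 会先把错误发送到每一个通过 `NotifyClose` 注册的 channel(上面的栈正是阻塞在这次 `chan send` 上)。我们的 `ReConnect` 每次只从 `<-c.connNotify` 或 `<-c.channelNotify` 中读取一个,随后 `Run()` 又用新的 channel 覆盖了这两个字段,旧的那个再也没有人读取。由于它是无缓冲的,`shutdown()` 会一直阻塞,后面的 `c.conn.Close()` 也就迟迟得不到执行。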

你可能会有疑问:既然连接已经挂掉了,心跳会收到 RST 的啊。是的,没错。这就要介绍下我的网络环境了:

  1. server --> firewall --> rabbitmq-server

原因就在于中间的防火墙:连接断开后,防火墙往往直接丢弃数据包,而不会回 RST。客户端只能等到读写超时(`i/o timeout`)才能感知到连接已断开,而从连接实际中断到出现 `i/o timeout`,可能需要相当长的时间。

也许有小伙伴会说,我们没有 firewall,所以不会存在这个问题。too young, too naive.

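即使没有显式部署防火墙,网络路径上也可能存在类似的中间设备,不妨用 `ss` 检查一下自己环境中的连接状态。

解决方法是:在重连之前,把所有通过 `NotifyClose` 注册的 channel 都读空。库在完成 `shutdown` 后会关闭这些 channel,因此用 `for range` 一直读到它们被关闭,`shutdown()` 就不会再阻塞: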
  1. select {

  2. case err := <-c.connNotify:

  3. if err != nil {

  4. log.Error("rabbitmq consumer - connection NotifyClose: ", err)

  5. }

  6. case err := <-c.channelNotify:

  7. if err != nil {

  8. log.Error("rabbitmq consumer - channel NotifyClose: ", err)

  9. }

  10. case <-c.quit:

  11. return

  12. }


  13. // backstop

  14. if !c.conn.IsClosed() {

  15. // 关闭 SubMsg message delivery

  16. if err := c.channel.Cancel(c.consumerTag, true); err != nil {

  17. log.Error("rabbitmq consumer - channel cancel failed: ", err)

  18. }


  19. if err := c.conn.Close(); err != nil {

  20. log.Error("rabbitmq consumer - channel cancel failed: ", err)

  21. }

  22. }


  23. // IMPORTANT: 必须清空 Notify,否则死连接不会释放

  24. for err := range c.channelNotify {

  25. println(err)

  26. }

  27. for err := range c.connNotify {

  28. println(err)

  29. }

其实对于 AMQP 的重连,社区已经讨论了很多。`streadway/amqp` 的主要维护者认为重连属于业务逻辑,因此不愿意在库的底层加入更多自动重连的逻辑。但是从系统设计的角度来看,重连到底应该交给业务处理,还是由底层库统一提供,似乎一直都是个问题。