作为一个常识,当我们在处理一些长连接的业务时,客户端往往需要负责断线重连。比如,在我们的一个系统中,是这么处理 RabbitMQ 的断线重连的:
// Start establishes the initial connection via Run and, only when that
// succeeds, launches ReConnect in its own goroutine.
//
// The error returned by Run is passed back unchanged; ReConnect is not
// started when Run fails.
func (c *Consumer) Start() error {
	err := c.Run()
	if err == nil {
		go c.ReConnect()
	}
	return err
}
// Run dials the broker at c.addr, opens a channel, declares c.queue, starts
// consuming from it and hands the delivery stream to Handle in a new
// goroutine. Finally it registers a close notification on the connection and
// stores it in c.connNotify.
//
// On any failure the resources opened so far are closed and the error is
// returned. c.conn and c.channel are assigned with whatever the library
// returned, even on failure.
func (c *Consumer) Run() error {
	conn, err := amqp.Dial(c.addr)
	c.conn = conn
	if err != nil {
		return err
	}

	ch, err := c.conn.Channel()
	c.channel = ch
	if err != nil {
		c.conn.Close()
		return err
	}

	// abort tears down the channel and the connection (in that order) and
	// passes the triggering error through so it can be returned directly.
	abort := func(cause error) error {
		c.channel.Close()
		c.conn.Close()
		return cause
	}

	// Named flags keep the positional boolean arguments below readable.
	const (
		durable   = false
		exclusive = false
		noWait    = false
		autoAck   = false
		noLocal   = false
	)

	// c.autoDelete controls whether the queue is deleted when unused.
	_, err = c.channel.QueueDeclare(c.queue, durable, c.autoDelete, exclusive, noWait, nil)
	if err != nil {
		return abort(err)
	}

	deliveries, err := c.channel.Consume(c.queue, c.consumerTag, autoAck, exclusive, noLocal, noWait, nil)
	if err != nil {
		return abort(err)
	}

	go c.Handle(deliveries)
	// The notification channel is unbuffered.
	c.connNotify = c.conn.NotifyClose(make(chan *amqp.Error))
	return nil
}
// ReConnect supervises the consumer's connection until c.quit fires.
//
// Each round it blocks until either the connection close notification
// (c.connNotify) delivers or c.quit is signalled. After a close notification
// it makes sure the old connection is really closed and then calls Run
// repeatedly, waiting 5 seconds between failed attempts, until a new
// connection is established. A signal on c.quit ends the method at any point,
// including during the wait between attempts.
func (c *Consumer) ReConnect() {
	for {
		select {
		case err := <-c.connNotify:
			if err != nil {
				log.Error("rabbitmq consumer - connection NotifyClose: ", err)
			}
		case <-c.quit:
			return
		}

		// Backstop: if the connection still reports itself as open, stop
		// message delivery and close it explicitly before reconnecting.
		if !c.conn.IsClosed() {
			if err := c.channel.Cancel(c.consumerTag, true); err != nil {
				log.Error("rabbitmq consumer - channel cancel failed: ", err)
			}
			if err := c.conn.Close(); err != nil {
				// Previously logged with the copy-pasted "channel cancel failed" text.
				log.Error("rabbitmq consumer - connection close failed: ", err)
			}
		}

		// Retry until Run succeeds or we are told to quit.
		for {
			select {
			case <-c.quit:
				return
			default:
			}

			log.Error("rabbitmq consumer - reconnect")
			err := c.Run()
			if err == nil {
				break
			}
			log.Error("rabbitmq consumer - failCheck: ", err)

			// Wait 5s before the next attempt, but stay responsive to quit
			// instead of sleeping unconditionally.
			select {
			case <-c.quit:
				return
			case <-time.After(5 * time.Second):
			}
		}
	}
}
上面的代码只通过 `conn.NotifyClose` 监听了连接级别的关闭事件;如果 RabbitMQ Server 只是以 `error` 关闭了 channel,重连并不会被触发。那么问题来了,RabbitMQ Server 为什么会关闭 channel 呢?官方列出了几个原因:
406 PRECONDITION_FAILED
403 ACCESS_REFUSED
404 NOT_FOUND
405 RESOURCE_LOCKED
其中最容易踩到的是 `406 PRECONDITION_FAILED`,它和 `delivery.Ack(multiple bool)` 的用法有关:`multiple=false` 只确认当前这一条消息,`multiple=true` 则会把当前消息以及此前所有未确认的消息一并确认。举个例子:
// Handle processes deliveries one at a time until the delivery channel is
// closed.
//
// Each message body is passed to c.handler. When the handler succeeds the
// message is acknowledged with Ack(true); when it fails the message is
// rejected with requeue=true.
func (c *Consumer) Handle(delivery <-chan amqp.Delivery) {
	for d := range delivery {
		// The body lives on the received message d; the delivery channel
		// itself has no Body field.
		if err := c.handler(d.Body); err != nil {
			// Requeue the message; otherwise unacknowledged messages keep
			// occupying memory.
			d.Reject(true)
			continue
		}
		d.Ack(true)
	}
}
由于这里的 `handler` 是串行执行的,`d.Ack(false)` 和 `d.Ack(true)` 的效果没有区别。但如果把消息处理改成并发的:
// Handle spawns one goroutine per delivery until the delivery channel is
// closed. Each goroutine passes the message body to c.handler, then either
// acknowledges the message with Ack(true) on success or rejects it with
// requeue=true on failure.
//
// NOTE(review): Ack(true) is issued from concurrently running goroutines, so
// the order of acknowledgements is not the order of delivery. Confirm against
// the amqp Delivery.Ack documentation whether multiple=true is safe here.
func (c *Consumer) Handle(delivery <-chan amqp.Delivery) {
	for d := range delivery {
		go func(msg amqp.Delivery) {
			if err := c.handler(msg.Body); err != nil {
				// Requeue the message; otherwise unacknowledged messages
				// keep occupying memory.
				msg.Reject(true)
				return
			}
			msg.Ack(true)
		}(d)
	}
}
并发处理时,后到的消息可能先执行 `delivery.Ack(true)`,把此前尚未确认的消息一并确认;等先到的消息处理完、再执行自己的 `delivery.Ack(true)` 时,就构成了重复确认。而 AMQP 规范规定:A message MUST not be acknowledged more than once. The receiving peer MUST validate that a non-zero delivery-tag refers to a delivered message, and raise a channel exception if this is not the case. ...
于是 Server 会以 `error` 关闭 channel。解决办法是把 `delivery.Ack(true)` 改为 `delivery.Ack(false)`,同时也监听 channel 的 `NotifyClose`:
// Hand the delivery stream to Handle in its own goroutine.
go c.Handle(delivery)
// Register close notifications for the connection and, in addition, for the
// channel; both use unbuffered notification channels.
c.connNotify = c.conn.NotifyClose(make(chan *amqp.Error))
c.channelNotify = c.channel.NotifyClose(make(chan *amqp.Error))
return err
}
// ReConnect (excerpt) waits in a loop for a connection close notification, a
// channel close notification, or a quit signal.
func (c *Consumer) ReConnect() {
for {
select {
// The connection reported a close; log the error if one was delivered.
case err := <-c.connNotify:
if err != nil {
log.Error("rabbitmq consumer - connection NotifyClose: ", err)
}
// The channel reported a close; log the error if one was delivered.
case err := <-c.channelNotify:
if err != nil {
log.Error("rabbitmq consumer - channel NotifyClose: ", err)
}
// Quit was signalled: stop supervising.
case <-c.quit:
return
}
这下似乎可以高枕无忧了吧,呵呵。。。
在测试环境跑了一阵子发现,服务虽然可以正常触发重连,但是老的连接却释放的很慢(10min+):
用 `ss` 反复查看,旧连接一直都在,说明 `close()` 迟迟没有被执行;再看 goroutine 堆栈,发现卡在了 `shutdown` 里:
goroutine 45 [chan send]:
foo/vendor/github.com/streadway/amqp.(*Channel).shutdown.func1()
/root/gofourge/src/foo/vendor/github.com/streadway/amqp/channel.go:104 +0x10a
sync.(*Once).Do(0xc00025a000, 0xc000057d78)
/usr/local/go/src/sync/once.go:44 +0xb3
foo/vendor/github.com/streadway/amqp.(*Channel).shutdown(0xc00025a000, 0xc0000a3060)
/root/gofourge/src/foo/vendor/github.com/streadway/amqp/channel.go:93 +0x63
foo/vendor/github.com/streadway/amqp.(*Connection).shutdown.func1()
/root/gofourge/src/foo/vendor/github.com/streadway/amqp/connection.go:419 +0x1f8
sync.(*Once).Do(0xc00016b180, 0xc000057eb8)
/usr/local/go/src/sync/once.go:44 +0xb3
foo/vendor/github.com/streadway/amqp.(*Connection).shutdown(0xc00016b180, 0xc0000a3060)
/root/gofourge/src/foo/vendor/github.com/streadway/amqp/connection.go:389 +0x6c
foo/vendor/github.com/streadway/amqp.(*Connection).reader(0xc00016b180, 0xaea4e0, 0xc00000e160)
/root/gofourge/src/foo/vendor/github.com/streadway/amqp/connection.go:524 +0x196
created by foo/vendor/github.com/streadway/amqp.Open
/root/gofourge/src/foo/vendor/github.com/streadway/amqp/connection.go:233 +0x25f
可以看到,`shutdown` 阻塞在了 chan send 上,所以它后面的 `close()` 一直得不到执行:
// Shut down every channel held in c.channels, passing each the same err.
for _, ch := range c.channels {
ch.shutdown(err)
}
// The underlying connection is closed only after the loop above has finished.
c.conn.Close()
也就是说,只要没有人去读 `<-c.connNotify`,`shutdown()` 就会一直阻塞,连接也就无法关闭。你可能会有疑问:既然连接已经挂掉了,心跳会收到 RST 的啊。是的,没错。这就要介绍下我的网络环境了:
server --> firewall --> rabbitmq-server
i/o timeouti/o timeout也许有小伙伴会说,我们没有 firewall,所以不会存在这个问题。too young, too naive.
结合 `ss` 和堆栈的结果,问题出在 `NotifyClose` 上:只要 `NotifyClose` 返回的 channel 没有被读空,旧连接就无法释放。所以重连之前必须把 `NotifyClose` 的 channel 清空:
// Wait for a connection close, a channel close, or a quit signal.
select {
case err := <-c.connNotify:
	if err != nil {
		log.Error("rabbitmq consumer - connection NotifyClose: ", err)
	}
case err := <-c.channelNotify:
	if err != nil {
		log.Error("rabbitmq consumer - channel NotifyClose: ", err)
	}
case <-c.quit:
	return
}

// Backstop: if the connection still reports itself as open, tear it down
// explicitly before reconnecting.
if !c.conn.IsClosed() {
	// Stop the SubMsg message delivery for this consumer.
	if err := c.channel.Cancel(c.consumerTag, true); err != nil {
		log.Error("rabbitmq consumer - channel cancel failed: ", err)
	}
	if err := c.conn.Close(); err != nil {
		// Previously logged with the copy-pasted "channel cancel failed" text.
		log.Error("rabbitmq consumer - connection close failed: ", err)
	}
}

// IMPORTANT: both Notify channels must be drained, otherwise the dead
// connection is never released. Each loop ends once its channel is closed.
for err := range c.channelNotify {
	// Log the error itself; println would only print a pointer address.
	log.Error("rabbitmq consumer - drained channel NotifyClose: ", err)
}
for err := range c.connNotify {
	log.Error("rabbitmq consumer - drained connection NotifyClose: ", err)
}
其实对于 AMQP 的重连,社区已经讨论了很多。AMQP 的主要开发者认为重连是业务的事情,所以不愿意在 AMQP 底层中加入更多自动重连的逻辑。但是从系统设计的角度来看,到底是要业务优先,还是功能优先,似乎一直都是个问题。