前言:
Golang社区里有个曾经很火的消息队列nsq,现在貌似不热了。这边有很多的业务都依赖于nsq,python和golang项目还好说,直接TCP长连接挂上去。php是通过nsqd提供的http接口来投递数据。nsq有两种投递发布功能,一种是常规的消息推送(PublishMsg),另一个是延迟消息推送(DeferredPublish),比如我们可以指定一个任务在多久之后才可以被被消费。
说正题,在用nsq的中间遇到了一个问题,就是并发安全的问题。根据我的各方面测试,普通的PublishMsg并发推送是没有问题的,哪怕多协程粗暴的共用一个连接。但是延迟推送(DeferredPublish)是存在延迟效果失效的问题。不管是多协程共用一个连接,还是一个协程绑定一个连接,延迟消息都存在问题的。当然只是延迟时间失效,但是消息不会丢失。
2018-11-02 更新
问题已经修复了。
我分析的源代码是nsq的relase版本,但是线上延迟消息异常的nsq版本是rc版本的,不知道哪个神人配置的。。。 至此,nsq的延迟消息问题解决了。。。更新nsq服务端的版本就OK了。 下面的文章其实看不看都可以了….

测试
// xiaorui.cc
import (
"fmt"
"sync"
"time"
"github.com/bitly/go-nsq"
)
var lock sync.Mutex
var incrlock sync.Mutex
var counter int
func main() {
config := nsq.NewConfig()
q, _ := nsq.NewConsumer("aa", "ch", config)
q.AddHandler(nsq.HandlerFunc(handle))
q.ConnectToNSQD("xxx")
delayTS := time.Duration(20) * time.Second
for index := 0; index < 20; index++ {
val := fmt.Sprintf("hello id: %d", index)
go func(idx string) {
conf := nsq.NewConfig()
conn, _ := nsq.NewProducer("xxxx", conf)
conn.DeferredPublish("aa", delayTS, []byte(idx))
// conn.Close()
}(val)
// ok
// go tool.Nsq.PublishMsg("aa", val)
}
go func() {
for {
fmt.Println(counter)
time.Sleep(1 * time.Second)
}
}()
select {}
}
func handle(msg *nsq.Message) error {
incrlock.Lock()
defer incrlock.Unlock()
counter++
fmt.Println("recv new msg: ", string(msg.Body), time.Now().String())
return nil
}
下面是tcpdump的抓包,我们能看到确实多个客户端连接,包里面能看到 DPUB的命令及后面的时间戳参数。但问题来了,id:.17这个任务没有了延迟效果, 立马就可以收到。
// xiaorui.cc
14:00:32.618457 IP 192.168.116.205.53061 > xxxx: Flags [P.], seq 339:371, ack 281, win 8192, length 32
0x0000: 4500 0048 0000 4000 4006 9dc5 c0a8 74cd E..H..@.@.....t.
0x0010: 2f68 380d cf45 1036 f816 c627 7e33 bda7 /h8..E.6...'~3..
0x0020: 5018 2000 a164 0000 4450 5542 2061 6120 P....d..DPUB.aa.
0x0030: 3230 3030 300a 0000 000e 2268 656c 6c6f 20000....."hello
0x0040: 2069 643a 2031 3122 .id:.11"
14:00:32.618458 IP 192.168.116.205.53064 > xxxx: Flags [P.], seq 339:371, ack 281, win 8192, length 32
0x0000: 4500 0048 0000 4000 4006 9dc5 c0a8 74cd E..H..@.@.....t.
0x0010: 2f68 380d cf48 1036 afe5 ff88 6441 69ec /h8..H.6....dAi.
0x0020: 5018 2000 17df 0000 4450 5542 2061 6120 P.......DPUB.aa.
0x0030: 3230 3030 300a 0000 000e 2268 656c 6c6f 20000....."hello
0x0040: 2069 643a 2031 3722 .id:.17"
14:00:32.618496 IP 192.168.116.205.53059 > xxxx: Flags [P.], seq 339:370, ack 281, win 8192, length 31
0x0000: 4500 0047 0000 4000 4006 9dc6 c0a8 74cd E..G..@.@.....t.
0x0010: 2f68 380d cf43 1036 28a5 719a 065d 678b /h8..C.6(.q..]g.
0x0020: 5018 2000 a275 0000 4450 5542 2061 6120 P....u..DPUB.aa.
0x0030: 3230 3030 300a 0000 000d 2268 656c 6c6f 20000....."hello
0x0040: 2069 643a 2038 22 .id:.8"
...
...
...
但还是出现了问题…
分析go nsq源码
遇到这么诡异的问题,git issue里没人说明。只能看go nsq客户端的源码了, 我们发现其实他的逻辑很简单,每个nsq客户端连接都会go一个router()的协程,这个协程是可以保证消息推送的原子性。 Publish和DeferredPublish调用的都是sendCommandAsync函数, sendCommandAsync会把命令cmd结构发到一个channel里,然后由客户端自己的router协程去消费。另外可以注意到,WriteCommand往socket write数据的时候,也会尝试拿本客户端相关的锁。也就是说,go nsq客户端看起来从两个方面保证了协程 | 线程安全。
// xiaorui.cc
func (w *Producer) Publish(topic string, body []byte) error {
return w.sendCommand(Publish(topic, body))
}
func (w *Producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
return w.sendCommand(DeferredPublish(topic, delay, body))
}
func (w *Producer) sendCommand(cmd *Command) error {
doneChan := make(chan *ProducerTransaction)
err := w.sendCommandAsync(cmd, doneChan, nil)
if err != nil {
close(doneChan)
return err
}
t := <-doneChan
return t.Error
}
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
...
if atomic.LoadInt32(&w.state) != StateConnected {
err := w.connect()
if err != nil {
return err
}
}
t := &ProducerTransaction{
cmd: cmd,
doneChan: doneChan,
Args: args,
}
select {
case w.transactionChan <- t:
case <-w.exitChan:
return ErrStopped
}
return nil
}
func (w *Producer) connect() error {
w.guard.Lock()
defer w.guard.Unlock()
...
go w.router()
return nil
}
func (w *Producer) router() {
for {
select {
case t := <-w.transactionChan:
w.transactions = append(w.transactions, t)
err := w.conn.WriteCommand(t.cmd)
if err != nil {
w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
w.close()
}
...
}
func (c *Conn) WriteCommand(cmd *Command) error {
c.mtx.Lock()
fmt.Printf("%v cccc %v \n", cmd, string(cmd.Body))
_, err := cmd.WriteTo(c)
if err != nil {
goto exit
}
err = c.Flush()
exit:
c.mtx.Unlock()
if err != nil {
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, err)
}
return err
}
// xiaorui.cc
很奇怪在多协程多连接下存在异常,go nsq client源码看起来又是合理的,没有发现奇怪的逻辑。 我尝试在所有的DeferredPublish逻辑前后共用一把全局锁,居然可以了。那么说明,貌似存在并发投递下的消息被串改写问题。但接下来我对这两个情况进行tcpdump抓包对比,发现很有意思的事情。加锁,所有的延迟任务是ok的,不加锁,有一些任务没有延迟效果,但是消息不丢。但通过抓包看到他们的body内容是一样的,没有什么字段被丢失或者覆盖。 记得以前给go-nsq和nsq都发过issue,询问他们是否可以加dpub和DeferredPublish的multi批量方法,最后没回复我,不知道有没有一些关联。
nsqd server 延迟消息源码
我们再来看看nsqd延迟消息的实现原理。nsq对于延迟消息没有用高大上的手段,直接存到本地内存的优先级队列里,因为没有持久化,所以逻辑相对简单。 当nsqd节点发生异常crash,那么数据自然丢失。下面nsqd的源码描述的很清楚,对于普通消息来说,当memoryMsgChan满了后会落盘持久化。对于延迟消息来说,直接就是内存里的优先级队列,没有做什么持久化方案。疑惑的是,这个事情在nsq的文档里没有标注。
// xiaorui.cc
// PutMessage writes a Message to the queue
func (c *Channel) PutMessage(m *Message) error {
c.RLock()
defer c.RUnlock()
if c.Exiting() {
return errors.New("exiting")
}
err := c.put(m)
if err != nil {
return err
}
atomic.AddUint64(&c.messageCount, 1)
return nil
}
func (c *Channel) put(m *Message) error {
select {
case c.memoryMsgChan <- m:
default:
b := bufferPoolGet()
err := writeMessageToBackend(b, m, c.backend)
bufferPoolPut(b)
c.ctx.nsqd.SetHealth(err)
if err != nil {
c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
c.name, err)
return err
}
}
return nil
}
func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) {
atomic.AddUint64(&c.messageCount, 1)
c.StartDeferredTimeout(msg, timeout)
}
func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
absTs := time.Now().Add(timeout).UnixNano()
item := &pqueue.Item{Value: msg, Priority: absTs}
err := c.pushDeferredMessage(item)
if err != nil {
return err
}
c.addToDeferredPQ(item)
return nil
}
需要关注的是nsqd对于存放延迟消息的优先级队列操作都有加锁。看起来也没有问题。
// xiaorui.cc
func (c *Channel) addToDeferredPQ(item *pqueue.Item) {
c.deferredMutex.Lock()
heap.Push(&c.deferredPQ, item)
c.deferredMutex.Unlock()
}
func (c *Channel) pushDeferredMessage(item *pqueue.Item) error {
c.deferredMutex.Lock()
id := item.Value.(*Message).ID
_, ok := c.deferredMessages[id]
...
c.deferredMutex.Unlock()
return nil
}
上面有说过,只需要在go-nsq客户端发送延迟消息时,统一加一把锁,让操作串行化就可以解决延迟失效的问题。问题又来了,在集群环境中又出现该问题了, 简单说单机测试脚本是没有问题,但并发执行脚本就又有出现延迟失效的问题。我个人怀疑是nsq的问题,但是nsqd服务端没有任何错误日志。
// xiaorui.cc
go func(idx string) {
conf := nsq.NewConfig()
conn, _ := nsq.NewProducer("ip:port", conf)
lock.Lock() // 加锁
conn.DeferredPublish("aa", delayTS, []byte(idx))
lock.Unlock()
}(val)
最后的解决方法
个人能力有限,搞不定这nsq问题。索性直接抛弃nsq延迟消息的方案,使用redis zset自己实现一个延迟消息机制。 redis最少比nsq的持久化机制靠谱一些,另外nsq延迟消息不能主动去删除,只能等待消费ack删除。而redis是可以直接zrem删除的。
总结:
现在不确定是go nsq客户端还是nsqd server的问题,单纯看源代码是没什么问题。其实确定是不是客户端的问题,可以用其他原因的客户端,尴尬的是本想用python nsq client测试一波,但pynsq太难用了。好吧,知道怎么该解决问题的朋友可以联系我,也让我学习学习。