NSQ是一个内存+磁盘型的消息中间件,它使用push流的方式源源不断把消息推送给客户端,并且为了使服务端更加简单、高效,NSQ并不提供有序的消息队列。因此,如果对消息有顺序要求,只有两种解决办法:
- 改用类似kafka之类的有序消息队列;
- 生产者和消费者达成一个协议,比如增加一个序列号或者时间戳来表示顺序。
本文要介绍的是第二种方法,下面就来简单用golang实现一个有序的NSQ顺序消息队列。
首先,使用go-nsq客户端写一个main函数,并且连接上nsqlookupd:
func main() {
cfg := nsq.NewConfig()
cfg.LookupdPollInterval = time.Second
customer, err := nsq.NewConsumer("test2", "t1", cfg)
if err != nil {
log.Panic(err)
}
customer.AddHandler(&Customter{})
customer.SetLogger(nil, 0)
if err := customer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
log.Panic(err)
}
select {}
}
test2t1
sort.Interface
type MyMessage struct {
Name string `json:"name"`
CreateAt int64 `json:"create_at"`
}
type MyMessageList []MyMessage
func (list MyMessageList) Len() int {
return len(list)
}
func (list MyMessageList) Less(i, j int) bool {
return list[i].CreateAt < list[j].CreateAt
}
func (list MyMessageList) Swap(i, j int) {
list[i], list[j] = list[j], list[i]
}
sort.InterfaceLenLessSwapMyMessageListMyMessagecreateAt
CustomerHandleMessage
type Customter struct{}
// 存放消息结构体
var messageBuffer []MyMessage
func (c *Customter) HandleMessage(nsqMsg *nsq.Message) error {
var msg MyMessage
err := json.Unmarshal(nsqMsg.Body, &msg)
if err != nil {
log.Panicln(err)
}
messageBuffer = append(messageBuffer, msg)
if len(messageBuffer) == 3 {
sort.Sort(MyMessageList(messageBuffer))
// do something for the ordered message buffer
fmt.Println(messageBuffer)
// reset buffer
messageBuffer = messageBuffer[:0]
}
return nil
}
sort.Sort
我们来简单实验一下,首先启动客户端(nsqd和nsqlookupd的启动参考官网文档,这里不介绍):
$ ~/code/golang/src/nsq-test > go run main.go
curl
curl -d '{"name":"aaa","create_at":1560262051}' http://127.0.0.1:4151/pub\?topic\=test2
curl -d '{"name":"bbb","create_at":1560262060}' http://127.0.0.1:4151/pub\?topic\=test2
curl -d '{"name":"ccc","create_at":1560262053}' http://127.0.0.1:4151/pub\?topic\=test2
aaacccbbb
最后,我们回到启动客户端的终端查看,会发现打印出正确的结果:
# aaa ccc bbb 顺序是正确的
[{aaa 1560262051} {ccc 1560262053} {bbb 1560262060}]
到此就大功告成了~
完整代码在此:
package main
import (
"encoding/json"
"fmt"
"github.com/nsqio/go-nsq"
"log"
"sort"
"time"
)
type MyMessage struct {
Name string `json:"name"`
CreateAt int64 `json:"create_at"`
}
type MyMessageList []MyMessage
func (list MyMessageList) Len() int {
return len(list)
}
func (list MyMessageList) Less(i, j int) bool {
return list[i].CreateAt < list[j].CreateAt
}
func (list MyMessageList) Swap(i, j int) {
list[i], list[j] = list[j], list[i]
}
type Customter struct{}
var messageBuffer []MyMessage
func (c *Customter) HandleMessage(nsqMsg *nsq.Message) error {
var msg MyMessage
err := json.Unmarshal(nsqMsg.Body, &msg)
if err != nil {
log.Panicln(err)
}
messageBuffer = append(messageBuffer, msg)
if len(messageBuffer) == 3 {
sort.Sort(MyMessageList(messageBuffer))
// do something for the ordered message buffer
fmt.Println(messageBuffer)
// reset buffer
messageBuffer = messageBuffer[:0]
}
return nil
}
func main() {
cfg := nsq.NewConfig()
cfg.LookupdPollInterval = time.Second
customer, err := nsq.NewConsumer("test2", "t1", cfg)
if err != nil {
log.Panic(err)
}
customer.AddHandler(&Customter{})
customer.SetLogger(nil, 0)
if err := customer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
log.Panic(err)
}
select {}
}