1.简介
- 1.NSQ是Go语言编写的,开源的内存分布式消息队列中间件
- 2.可以大规模的处理每天数十亿计级别的消息
- 3.分布式和去中心化拓扑结构,无单点故障
2.应用场景
1.异步处理,把非关键流程异步化,提高系统的响应时间和健壮性
2.应用解耦
3.流量削峰
3.NSQ组件介绍
1.nsqd
nsqdTopicTopicChannel
2.nsqlookupd
- 负责维护所有nsqd的状态,提供服务发现的进程(是管理拓扑信息并提供最终一致性发现服务的守护程序)
3.nsqadmin
- 是一个web管理平台,实时监控集群以及执行各种管理任务
4.NSQ架构介绍
1.Topic
order_queue
2.Channel
channel
5.NSQ特性
- 1.消息默认不持久化,可以通过配置修改为持久化
- 2.每条消息至少传递一次
- 3.消息不保证有序
6.NSQ接收和发送消息的流程
1.nsqd
input changoroutineoutput chanTopic/Channel
6.NSQ搭建
1.下载地址
https://github.com/nsqio/nsq/releases
2.以windows为例
nsqd.exe --lookupd-tcp-address=127.0.0.1:4160
nsqadmin.exe --lookupd-http-address=127.0.0.1:4161
3.代码示例
go get github.com/nsqio/go-nsq
package main
import (
"bufio"
"fmt"
"github.com/nsqio/go-nsq"
"os"
"strings"
)
var producer *nsq.Producer
func initProducer(str string) error {
var err error
config := nsq.NewConfig()
producer, err = nsq.NewProducer(str, config)
if err != nil {
return err
}
return nil
}
func main() {
nsqAddr := "127.0.0.1:4150"
err := initProducer(nsqAddr)
if err != nil {
fmt.Println("init producer failed:", err)
return
}
reader := bufio.NewReader(os.Stdin)
for {
// 接收命令行输入
fmt.Println("请输入消息内容:>>>")
data, err := reader.ReadString('\n')
if err != nil {
fmt.Println("read string failed:", err)
continue
}
data = strings.TrimSpace(data)
if data == "stop" {
break
}
if data == "" {
fmt.Println("请输入有效内容...")
continue
}
// 向 Topic 写入消息
err = producer.Publish("user", []byte(data))
if err != nil {
fmt.Println("publish message failed:", err)
continue
}
fmt.Printf("publish data: %s success\n", data)
}
}
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"os"
"os/signal"
"syscall"
"time"
)
// 消费者
type Consumer struct {
}
// 消息处理
func (*Consumer) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive >>>", msg.NSQDAddress, " message:", string(msg.Body))
return nil
}
// 初始化消费者
func initConsumer(topic, channel, address string) error {
config := nsq.NewConfig()
config.LookupdPollInterval = 15 * time.Second // 设置服务轮询时间
consume, err := nsq.NewConsumer(topic, channel, config) // 新建一个消费者
if err != nil {
return err
}
consumer := &Consumer{}
consume.AddHandler(consumer) // 添加消费者接口,该接口包含一个HandleMessage方法
// 建立NSQLookup连接
if err := consume.ConnectToNSQLookupd(address); err != nil {
return err
}
return nil
}
func main() {
// 连接的是NSQLookup HTTP端口
err := initConsumer("user", "logout", "127.0.0.1:4161")
if err != nil {
fmt.Println("init consumer failed:", err)
return
}
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT)
<-c
}