前言

tips:如果本文对你有用,请爱心点个赞,提高排名,让这篇文章帮助更多的人。谢谢大家!比心❤~
如果解决不了,可以在文末加我微信,进群交流。

NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。

NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。

NSQ 非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。另外,官方还提供了拆箱即用 Go 和 Python 库。如果读者有兴趣构建自己的客户端的话,还可以参考官方提供的协议规范。

安装和部署

官网文档:https://nsq.io/overview/quick_start.html
中文文档:http://wiki.jikexueyuan.com/project/nsq-guide/

我是在ubuntu系统中按照官方操作进行部署测试。

$ tar -zxvf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
$ cd nsq-1.2.0.linux-amd64.go1.12.9/bin	
$ sudo cp ~/Downloads/nsq-1.2.0.linux-amd64.go1.12.9/bin/ -r /usr/local/nsq/bin
$ sudo vim /etc/profile
$ source 
$ ./nsqlookupd > /dev/null 2>&1 &
[1] 20076
$ ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
[2] 20420
$ ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &
[3] 20620

特性

默认一开始消息不是持久化的
nsq采用的方式时内存+硬盘的模式,当内存到达一定程度时就会将数据持久化到硬盘

--mem-queue-size0

nsq优点&缺点

优点:

  1. 部署极其方便,没有任何环境依赖,直接启动就行
  2. 轻量没有过多的配置参数,只需要简单的配置就可以直接使用
  3. 性能高
  4. 消息不存在丢失的情况

缺点:

  1. 消息无顺序
  2. 节点之间没有消息复制
  3. 没有鉴权

客户端

curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'

Golang的客户端

deb安装

$ curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh

依赖包下载:

$ go get github.com/nsqio/go-nsq

生产者:

package main

// 生产者
import (
	"fmt"

	"github.com/nsqio/go-nsq"
)

var tcpNsqdAddr = "127.0.0.1:4150"

func main() {
	// 初始化配置
	config := nsq.NewConfig()
	for i := 0; i < 100; i++ {
		// 创建100个生产者
		tPro, err := nsq.NewProducer(tcpNsqdAddr, config)
		if err != nil {
			fmt.Printf("tPro new failed:%s", err)
		}

		// 主题
		topic := "Insert"
		// 主题内容
		tCommand := "New data!"
		// 发布消息
		err = tPro.Publish(topic, []byte(tCommand))
		if err != nil {
			fmt.Printf("Publish failed:%s", err)
		}
	}
}

127.0.0.1:4150

消费者:

package main
// 消费者
import (
	"fmt"
	"sync"
	"time"

	"github.com/nsqio/go-nsq"
)

var tcpNsqdAddr = "127.0.0.1:4150"

type NsqHandler struct {
	// 消息数
	msqCount int
	// 标识id
	nsqHandlerID string
}

func main() {
	// 初始化配置
	config := nsq.NewConfig()
	// 创造消费者,参数一是订阅的主题,参数二是使用的通道
	com, err := nsq.NewConsumer("Insert", "channel1", config)
	if err != nil {
		fmt.Println(err)
	}

	// 添加处理回调
	com.AddHandler(&NsqHandler{nsqHandlerID: "One"})

	// 连接对应的nsqd
	err = com.ConnectToNSQD(tcpNsqdAddr)
	if err != nil {
		fmt.Println(err)
	}

	// 只是为了不结束进程,这里没有意义
	var wg = &sync.WaitGroup{}
	wg.Add(1)
	wg.Wait()

}

// HandleMessage 实现HandleMessage方法
// message是接收到的消息
func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
	// 每接收到一条消息]+1
	s.msqCount ++ 
	// 打印输出信息和ID
	fmt.Println(s.msqCount,s.nsqHandlerID)
	// 打印消息的一些基本信息
	fmt.Printf("msg.Timestamp=]%v,msg.nsqaddress=%s,msg.body=]%s", time.Unix(0,message.Timestamp).Format("2006-01-02 03:04:05"),message.NSQDAddress,string(message.Body))
	return nil
}