1.简介

  • 1.NSQ是Go语言编写的,开源的内存分布式消息队列中间件
  • 2.可以大规模的处理每天数十亿计级别的消息
  • 3.分布式和去中心化拓扑结构,无单点故障

2.应用场景

1.异步处理,把非关键流程异步化,提高系统的响应时间和健壮性

2.应用解耦

3.流量削峰

3.NSQ组件介绍

1.nsqd

nsqdTopicTopicChannel

2.nsqlookupd

  • 负责维护所有nsqd的状态,提供服务发现的进程(是管理拓扑信息并提供最终一致性发现服务的守护程序)

3.nsqadmin

  • 是一个web管理平台,实时监控集群以及执行各种管理任务

4.NSQ架构介绍

1.Topic

order_queue

2.Channel

channel

5.NSQ特性

  • 1.消息默认不持久化,可以通过配置修改为持久化
  • 2.每条消息至少传递一次
  • 3.消息不保证有序

6.NSQ接收和发送消息的流程

1.nsqd

input changoroutineoutput chanTopic/Channel

6.NSQ搭建

1.下载地址

https://github.com/nsqio/nsq/releases

2.以windows为例

nsqd.exe --lookupd-tcp-address=127.0.0.1:4160
nsqadmin.exe --lookupd-http-address=127.0.0.1:4161

3.代码示例

  • 1.依赖包安装
go get github.com/nsqio/go-nsq
  • 2.生产者示例
package main

import (
    "bufio"
    "fmt"
    "github.com/nsqio/go-nsq"
    "os"
    "strings"
)

var producer *nsq.Producer

func initProducer(str string) error {
    var err error
    config := nsq.NewConfig()
    producer, err = nsq.NewProducer(str, config)
    if err != nil {
	return err
    }
    return nil
}

func main()  {
    nsqAddr := "127.0.0.1:4150"
    err := initProducer(nsqAddr)
    if err != nil {
    	fmt.Println("init producer failed:", err)
	return
    }
    reader := bufio.NewReader(os.Stdin)
    for {
	// 接收命令行输入
	fmt.Println("请输入消息内容:>>>")
	data, err := reader.ReadString('\n')
	if err != nil {
	    fmt.Println("read string failed:", err)
	    continue
	}
	data = strings.TrimSpace(data)
	if data == "stop" {
	    break
	}

	if data == "" {
	    fmt.Println("请输入有效内容...")
	    continue
	}

	// 向 Topic 写入消息
	err = producer.Publish("user", []byte(data))
	if err != nil {
	    fmt.Println("publish message failed:", err)
	    continue
	}
	fmt.Printf("publish data: %s success\n", data)
    }
}
  • 2.消费者示例
package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "os"
    "os/signal"
    "syscall"
    "time"
)

// 消费者
type Consumer struct {
}

// 消息处理
func (*Consumer) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive >>>", msg.NSQDAddress, " message:", string(msg.Body))
    return nil
}

// 初始化消费者
func initConsumer(topic, channel, address string) error {
    config := nsq.NewConfig()
    config.LookupdPollInterval = 15 * time.Second  // 设置服务轮询时间
    consume, err := nsq.NewConsumer(topic, channel, config)  // 新建一个消费者
    if err != nil {
	return err
    }
    consumer := &Consumer{}
    consume.AddHandler(consumer)  // 添加消费者接口,该接口包含一个HandleMessage方法

    // 建立NSQLookup连接
    if err := consume.ConnectToNSQLookupd(address); err != nil {
	return err
    }
    return nil
}

func main()  {
    // 连接的是NSQLookup HTTP端口
    err := initConsumer("user", "logout", "127.0.0.1:4161")
    if err != nil {
	fmt.Println("init consumer failed:", err)
	return
    }
    c := make(chan os.Signal)
    signal.Notify(c, syscall.SIGINT)
    <-c
}