目录

1.为什么写这个库

2.应用场景有哪些

3.如何使用

4.总结

 为什么要写这个库?

go-queue

beanstalkd

beanstalkdbeanstalkd
jobtubejobproducerjobconsumerjob

很幸运的是官方提供了 go client:https://github.com/beanstalkd/go-beanstalk。

但是这对不熟悉 beanstalkd 操作的 go 开发者而言,需要学习成本。

kafka

kafkatopictopicdq_1616324404788, dq_1616324417622

而且在 go 生态中,

同时考虑以下因素:

  • 支持延时任务

  • 高可用,保证数据不丢失

  • 可扩展资源和性能

go-queue
beanstalkddqrediskafkakq

整体设计如下:

应用场景

首先在消费场景来说,一个是针对任务队列,一个是消息队列。而两者最大的区别:

  • 任务是没有顺序约束;消息需要;

  • 任务在加入中,或者是等待中,可能存在状态更新(或是取消);消息则是单一的存储即可;

所以在背后的基础设施选型上,也是基于这种消费场景。

dqbeanstalkdkqkafka
dq
// 延迟任务执行
- dq.Delay(msg, delayTime);

// 定时任务执行
- dq.At(msg, atTime);

而在我们内部:

kqkq.Push(msg)dq

如何使用

dqkq

dq

// [Producer]
producer := dq.NewProducer([]dq.Beanstalk{
    {
        Endpoint: "localhost:11300",
        Tube:     "tube",
    },
    {
        Endpoint: "localhost:11301",
        Tube:     "tube",
    },
})  

for i := 1000; i < 1005; i++ {
    _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
    if err != nil {
        fmt.Println(err)
    }
}
// [Consumer]
consumer := dq.NewConsumer(dq.DqConf{
  Beanstalks: []dq.Beanstalk{
    {
      Endpoint: "localhost:11300",
      Tube:     "tube",
    },
    {
      Endpoint: "localhost:11301",
      Tube:     "tube",
    },
  },
  Redis: redis.RedisConf{
    Host: "localhost:6379",
    Type: redis.NodeType,
  },
})
consumer.Consume(func(body []byte) {
  // your consume logic
  fmt.Println(string(body))
})

和普通的 生产者 - 消费者 模型类似,开发者也只需要关注以下:

  1. 开发者只需要关注自己的任务类型「延时/定时」

  2. 消费端的消费逻辑

kq

producer.go
// message structure
type message struct {
    Key     string `json:"key"`
    Value   string `json:"value"`
    Payload string `json:"message"`
}

pusher := kq.NewPusher([]string{
    "127.0.0.1:19092",
    "127.0.0.1:19093",
    "127.0.0.1:19094",
}, "kq")

ticker := time.NewTicker(time.Millisecond)
for round := 0; round < 3; round++ {
    select {
    case <-ticker.C:
        count := rand.Intn(100)
    // 准备消息
        m := message{
            Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
            Value:   fmt.Sprintf("%d,%d", round, count),
            Payload: fmt.Sprintf("%d,%d", round, count),
        }
        body, err := json.Marshal(m)
        if err != nil {
            log.Fatal(err)
        }

        fmt.Println(string(body))
    // push to kafka broker
        if err := pusher.Push(string(body)); err != nil {
            log.Fatal(err)
        }
    }
}

config.yaml
Name: kq
Brokers:
  - 127.0.0.1:19092
  - 127.0.0.1:19092
  - 127.0.0.1:19092
Group: adhoc
Topic: kq
Offset: first
Consumers: 1
consumer.go
var c kq.KqConf
conf.MustLoad("config.yaml", &c)

// WithHandle: 具体的处理msg的logic
// 这也是开发者需要根据自己的业务定制化
q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
  fmt.Printf("=> %s\n", v)
  return nil
}))
defer q.Stop()
q.Start()
dqmessage data
dq

总结 

kqdqgo-zero
go-queue

https://github.com/tal-tech/go-queue

https://github.com/tal-tech/go-zero

欢迎使用 go-zero 并 star 支持我们!

更多请查看:https://github.com/tidwall/gjson

欢迎加入我们GOLANG中国社区:https://gocn.vip/

《酷Go推荐》招募:

各位Gopher同学,最近我们社区打算推出一个类似GoCN每日新闻的新栏目《酷Go推荐》,主要是每周推荐一个库或者好的项目,然后写一点这个库使用方法或者优点之类的,这样可以真正的帮助到大家能够学习到新的库,并且知道怎么用。

大概规则和每日新闻类似,如果报名人多的话每个人一个月轮到一次,欢迎大家报名!

点击 阅读原文 即刻报名