前言

消息队列是一种很常见的工具,常见的应用场景有:系统解耦、异步消费、削峰填谷。

KafkaRocketMQRedis
不需要再单独部署Kafka、RocketMQ服务
不支持大量消息堆积

这篇文章主要是使用Go+Redis实现几种消息队列方案,并列举存在的问题。

下面的代码使用到了go-redis客户端。

List

ListPushPoplpush+brpop

发送消息

LPush
type Msg struct {
   Topic string // 消息的主题
   Body  []byte // 消息的Body
}
复制代码
func (q *ListMQ) SendMsg(ctx context.Context, msg *Msg) error {
   return q.client.LPush(ctx, msg.Topic, msg.Body).Err()
}
复制代码

消费消息

HandlerBRPop
// Handler 返回值代表消息是否消费成功
type Handler func(msg *Msg) error
复制代码
// Consume 返回值代表消费过程中遇到的无法处理的错误
func (q *ListMQ) Consume(ctx context.Context, topic string, h Handler) error {
   for {
      // 获取消息
      result, err := q.client.BRPop(ctx, 0, topic).Result()
      if err != nil {
         return err
      }
      // 处理消息
      err = h(&Msg{
         Topic: result[0],
         Body:  []byte(result[1]),
      })
      if err != nil {
         return err
      }
   }
}
复制代码

这里会存在一个问题,如果消费者无法正常消费消息,消息会丢失,因为消息一旦被取出就不再存在Redis。

实现ACK机制

其实我们可以每次先取出消息,但不删除消息,直到消费成功后再把消息删除。

lindexrpopblmove
// Consume 返回值代表消费过程中遇到的无法处理的错误
func (q *ACKListMQ) Consume(ctx context.Context, topic string, h Handler) error {
   for {
      // 获取消息
      body, err := q.client.LIndex(ctx, topic, -1).Bytes()
      if err != nil && !errors.Is(err, redis.Nil) {
         return err
      }
      // 没有消息了,休眠一会
      if errors.Is(err, redis.Nil) {
         time.Sleep(time.Second)
         continue
      }
      // 处理消息
      err = h(&Msg{
         Topic: topic,
         Body:  body,
      })
      if err != nil {
         continue
      }
      // 如果处理成功,删除消息
      if err := q.client.RPop(ctx, topic).Err(); err != nil {
         return err
      }
   }
}
复制代码
没办法同时多个消费者进行消费

实现多分区

上面我们虽然实现了ACK,但是却造成了一个队列只能被一个消费者消费,其实我们只要多搞几个队列,那么就可以同时被几个消费者进行消费,类似于Kafka的分区。

发送消息

分区字段topic:0topic:1
type Msg struct {
   Topic     string // 消息的主题
   Body      []byte // 消息的Body
   Partition int    // 分区号
}
复制代码
func (q *PartitionACKListMQ) SendMsg(ctx context.Context, msg *Msg) error {
   return q.client.LPush(ctx, q.partitionTopic(msg.Topic, msg.Partition), msg.Body).Err()
}

func (q *PartitionACKListMQ) partitionTopic(topic string, partition int) string {
   return fmt.Sprintf("%s:%d", topic, partition)
}
复制代码

消费消息

和上面代码相同,只是主题多加了一个分区号。

// Consume 返回值代表消费过程中遇到的无法处理的错误
func (q *PartitionACKListMQ) Consume(ctx context.Context, topic string, partition int, h Handler) error {
   for {
      // 获取消息
      body, err := q.client.LIndex(ctx, q.partitionTopic(topic, partition), -1).Bytes()
      if err != nil && !errors.Is(err, redis.Nil) {
         return err
      }
      // 没有消息了,休眠一会
      if errors.Is(err, redis.Nil) {
         time.Sleep(time.Second)
         continue
      }
      // 处理消息
      err = h(&Msg{
         Topic:     topic,
         Body:      body,
         Partition: partition,
      })
      if err != nil {
         continue
      }
      // 如果处理成功,删除消息
      if err := q.client.RPop(ctx, q.partitionTopic(topic, partition)).Err(); err != nil {
         return err
      }
   }
}
复制代码

进一步改进

消费者自动分配分区

存在的问题

  • 不支持多个消费者组

适合不需要多个消费者组的场景。

Pub/Sub

publishsubscribepsubscribe
subscribepsubscribe
publish

发送消息

publish
func (q *PubSubMQ) SendMsg(ctx context.Context, msg *Msg) error {
   return q.client.Publish(ctx, q.partitionTopic(msg.Topic, msg.Partition), msg.Body).Err()
}
复制代码

消费消息

subscribe
// Consume 返回值代表消费过程中遇到的无法处理的错误
func (q *PubSubMQ) Consume(ctx context.Context, topic string, partition int, h Handler) error {
   // 订阅频道
   channel := q.client.Subscribe(ctx, q.partitionTopic(topic, partition)).Channel()
   for msg := range channel {
      // 处理消息
      h(&Msg{
         Topic:     topic,
         Body:      []byte(msg.Payload),
         Partition: partition,
      })
   }
   return errors.New("channel closed")
}
复制代码

改进点

subscribepsubscribe

比如下面代码每个消费者可以一次消费多个分区的消息:

// ConsumeMultiPartitions 返回值代表消费过程中遇到的无法处理的错误
func (q *PubSubMQ) ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error {
   // 订阅频道
   channels := make([]string, len(partitions))
   for i, partition := range partitions {
      channels[i] = q.partitionTopic(topic, partition)
   }
   channel := q.client.Subscribe(ctx, channels...).Channel()
   for msg := range channel {
      // 处理消息
      _, partitionString, _ := strings.Cut(msg.Channel, ":")
      partition, _ := strconv.Atoi(partitionString)
      h(&Msg{
         Topic:     topic,
         Body:      []byte(msg.Payload),
         Partition: partition,
      })
   }
   return errors.New("channels closed")
}
复制代码

存在的问题

看起来

  • 消费者只能接收到在它订阅之后的消息,在订阅成功之前的消息都会丢失;也就是说如果生产者先发送消息,消费者再订阅是接收不到的。
  • Pub/Sub不会持久化消息,Redis下线消息将丢失。
  • 消息缓冲区达到无法暂存太多消息,只要满足下面其中一个条件Redis就会直接把消费者踢下线:
    • 缓冲区达到32MB
    • 缓冲区达到8MB并且持续60秒

所以Pub/Sub只适合对消息可靠性没有要求的场景。

Stream

xaddxreadgroupxack

发送消息

xadd
XADD msg.Topic MAXLEN q.approx q.maxLen * body msg.Body

其中:

-1526919030474-551526919030474-**消息内容键值对让Redis在消息数量大于此值时删除旧消息,避免内存溢出
func (q *StreamMQ) SendMsg(ctx context.Context, msg *Msg) error {
   return q.client.XAdd(ctx, &redis.XAddArgs{
      Stream: msg.Topic,
      MaxLen: q.maxLen,
      Approx: q.approx,
      ID:     "*",
      Values: []interface{}{"body", msg.Body},
   }).Err()
}
复制代码

消费消息

xgroup createstartID或$$
xreadgroup
">">0
xack
// Consume 返回值代表消费过程中遇到的无法处理的错误
// group 消费者组
// consumer 消费者组里的消费者
// batchSize 每次批量获取一批的大小
// start 用于创建消费者组的时候指定起始消费ID,0表示从头开始消费,$表示从最后一条消息开始消费
func (q *StreamMQ) Consume(ctx context.Context, topic, group, consumer, start string, batchSize int, h Handler) error {
   err := q.client.XGroupCreateMkStream(ctx, topic, group, start).Err()
   if err != nil && err.Error() != errBusyGroup {
      return err
   }
   for {
      // 拉取新消息
      if err := q.consume(ctx, topic, group, consumer, ">", batchSize, h); err != nil {
         return err
      }
      // 拉取已经投递却未被ACK的消息,保证消息至少被成功消费1次
      if err := q.consume(ctx, topic, group, consumer, "0", batchSize, h); err != nil {
         return err
      }
   }
}

func (q *StreamMQ) consume(ctx context.Context, topic, group, consumer, id string, batchSize int, h Handler) error {
   // 阻塞的获取消息
   result, err := q.client.XReadGroup(ctx, &redis.XReadGroupArgs{
      Group:    group,
      Consumer: consumer,
      Streams:  []string{topic, id},
      Count:    int64(batchSize),
   }).Result()
   if err != nil {
      return err
   }
   // 处理消息
   for _, msg := range result[0].Messages {
      err := h(&Msg{
         ID:       msg.ID,
         Topic:    topic,
         Body:     []byte(msg.Values["body"].(string)),
         Group:    group,
         Consumer: consumer,
      })
      if err == nil {
         err := q.client.XAck(ctx, topic, group, msg.ID).Err()
         if err != nil {
            return err
         }
      }
   }
   return nil
}
复制代码

存在的问题

不支持大量消息堆积可能的消息丢失

特别是消息都存储在内存中,如果不小心可能会溢出导致Redis宕机。

总结

  • 如果能够使用Stream(Redis5.0以上),可以优先考虑它,因为它的功能是最完整的;
  • 否则也可以使用list,不过它不支持多个消费者组;
  • 如果对消息可靠性没有要求,也可以使用Pub/Sub模式。

完整代码和测试代码:github.com/jiaxwu/rmq

参考