前言
消息队列是一种很常见的工具,常见的应用场景有:系统解耦、异步消费、削峰填谷。
KafkaRocketMQRedis
不需要再单独部署Kafka、RocketMQ服务
不支持大量消息堆积
这篇文章主要是使用Go+Redis实现几种消息队列方案,并列举存在的问题。
下面的代码使用到了go-redis客户端。
List
ListPushPoplpush+brpop
发送消息
LPush
type Msg struct {
Topic string // 消息的主题
Body []byte // 消息的Body
}
复制代码
func (q *ListMQ) SendMsg(ctx context.Context, msg *Msg) error {
return q.client.LPush(ctx, msg.Topic, msg.Body).Err()
}
复制代码
消费消息
HandlerBRPop
// Handler 返回值代表消息是否消费成功
type Handler func(msg *Msg) error
复制代码
// Consume 返回值代表消费过程中遇到的无法处理的错误
func (q *ListMQ) Consume(ctx context.Context, topic string, h Handler) error {
for {
// 获取消息
result, err := q.client.BRPop(ctx, 0, topic).Result()
if err != nil {
return err
}
// 处理消息
err = h(&Msg{
Topic: result[0],
Body: []byte(result[1]),
})
if err != nil {
return err
}
}
}
复制代码
这里会存在一个问题,如果消费者无法正常消费消息,消息会丢失,因为消息一旦被取出就不再存在Redis。
实现ACK机制
其实我们可以每次先取出消息,但不删除消息,直到消费成功后再把消息删除。
lindexrpopblmove
// Consume 返回值代表消费过程中遇到的无法处理的错误
func (q *ACKListMQ) Consume(ctx context.Context, topic string, h Handler) error {
for {
// 获取消息
body, err := q.client.LIndex(ctx, topic, -1).Bytes()
if err != nil && !errors.Is(err, redis.Nil) {
return err
}
// 没有消息了,休眠一会
if errors.Is(err, redis.Nil) {
time.Sleep(time.Second)
continue
}
// 处理消息
err = h(&Msg{
Topic: topic,
Body: body,
})
if err != nil {
continue
}
// 如果处理成功,删除消息
if err := q.client.RPop(ctx, topic).Err(); err != nil {
return err
}
}
}
复制代码
没办法同时多个消费者进行消费
实现多分区
上面我们虽然实现了ACK,但是却造成了一个队列只能被一个消费者消费,其实我们只要多搞几个队列,那么就可以同时被几个消费者进行消费,类似于Kafka的分区。
发送消息
分区字段topic:0topic:1
type Msg struct {
Topic string // 消息的主题
Body []byte // 消息的Body
Partition int // 分区号
}
复制代码
func (q *PartitionACKListMQ) SendMsg(ctx context.Context, msg *Msg) error {
return q.client.LPush(ctx, q.partitionTopic(msg.Topic, msg.Partition), msg.Body).Err()
}
func (q *PartitionACKListMQ) partitionTopic(topic string, partition int) string {
return fmt.Sprintf("%s:%d", topic, partition)
}
复制代码
消费消息
和上面代码相同,只是主题多加了一个分区号。
// Consume 返回值代表消费过程中遇到的无法处理的错误
func (q *PartitionACKListMQ) Consume(ctx context.Context, topic string, partition int, h Handler) error {
for {
// 获取消息
body, err := q.client.LIndex(ctx, q.partitionTopic(topic, partition), -1).Bytes()
if err != nil && !errors.Is(err, redis.Nil) {
return err
}
// 没有消息了,休眠一会
if errors.Is(err, redis.Nil) {
time.Sleep(time.Second)
continue
}
// 处理消息
err = h(&Msg{
Topic: topic,
Body: body,
Partition: partition,
})
if err != nil {
continue
}
// 如果处理成功,删除消息
if err := q.client.RPop(ctx, q.partitionTopic(topic, partition)).Err(); err != nil {
return err
}
}
}
复制代码
进一步改进
消费者自动分配分区
存在的问题
- 不支持多个消费者组
适合不需要多个消费者组的场景。
Pub/Sub
publishsubscribepsubscribe
subscribepsubscribe
publish
发送消息
publish
func (q *PubSubMQ) SendMsg(ctx context.Context, msg *Msg) error {
return q.client.Publish(ctx, q.partitionTopic(msg.Topic, msg.Partition), msg.Body).Err()
}
复制代码
消费消息
subscribe
// Consume 返回值代表消费过程中遇到的无法处理的错误
func (q *PubSubMQ) Consume(ctx context.Context, topic string, partition int, h Handler) error {
// 订阅频道
channel := q.client.Subscribe(ctx, q.partitionTopic(topic, partition)).Channel()
for msg := range channel {
// 处理消息
h(&Msg{
Topic: topic,
Body: []byte(msg.Payload),
Partition: partition,
})
}
return errors.New("channel closed")
}
复制代码
改进点
subscribepsubscribe
比如下面代码每个消费者可以一次消费多个分区的消息:
// ConsumeMultiPartitions 返回值代表消费过程中遇到的无法处理的错误
func (q *PubSubMQ) ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error {
// 订阅频道
channels := make([]string, len(partitions))
for i, partition := range partitions {
channels[i] = q.partitionTopic(topic, partition)
}
channel := q.client.Subscribe(ctx, channels...).Channel()
for msg := range channel {
// 处理消息
_, partitionString, _ := strings.Cut(msg.Channel, ":")
partition, _ := strconv.Atoi(partitionString)
h(&Msg{
Topic: topic,
Body: []byte(msg.Payload),
Partition: partition,
})
}
return errors.New("channels closed")
}
复制代码
存在的问题
看起来
- 消费者只能接收到在它订阅之后的消息,在订阅成功之前的消息都会丢失;也就是说如果生产者先发送消息,消费者再订阅是接收不到的。
- Pub/Sub不会持久化消息,Redis下线消息将丢失。
- 消息缓冲区达到无法暂存太多消息,只要满足下面其中一个条件Redis就会直接把消费者踢下线:
- 缓冲区达到32MB
- 缓冲区达到8MB并且持续60秒
所以Pub/Sub只适合对消息可靠性没有要求的场景。
Stream
xaddxreadgroupxack
发送消息
xadd
XADD msg.Topic MAXLEN q.approx q.maxLen * body msg.Body
其中:
-1526919030474-551526919030474-**消息内容键值对让Redis在消息数量大于此值时删除旧消息,避免内存溢出
func (q *StreamMQ) SendMsg(ctx context.Context, msg *Msg) error {
return q.client.XAdd(ctx, &redis.XAddArgs{
Stream: msg.Topic,
MaxLen: q.maxLen,
Approx: q.approx,
ID: "*",
Values: []interface{}{"body", msg.Body},
}).Err()
}
复制代码
消费消息
xgroup createstartID或$$
xreadgroup
">">0
xack
// Consume 返回值代表消费过程中遇到的无法处理的错误
// group 消费者组
// consumer 消费者组里的消费者
// batchSize 每次批量获取一批的大小
// start 用于创建消费者组的时候指定起始消费ID,0表示从头开始消费,$表示从最后一条消息开始消费
func (q *StreamMQ) Consume(ctx context.Context, topic, group, consumer, start string, batchSize int, h Handler) error {
err := q.client.XGroupCreateMkStream(ctx, topic, group, start).Err()
if err != nil && err.Error() != errBusyGroup {
return err
}
for {
// 拉取新消息
if err := q.consume(ctx, topic, group, consumer, ">", batchSize, h); err != nil {
return err
}
// 拉取已经投递却未被ACK的消息,保证消息至少被成功消费1次
if err := q.consume(ctx, topic, group, consumer, "0", batchSize, h); err != nil {
return err
}
}
}
func (q *StreamMQ) consume(ctx context.Context, topic, group, consumer, id string, batchSize int, h Handler) error {
// 阻塞的获取消息
result, err := q.client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: group,
Consumer: consumer,
Streams: []string{topic, id},
Count: int64(batchSize),
}).Result()
if err != nil {
return err
}
// 处理消息
for _, msg := range result[0].Messages {
err := h(&Msg{
ID: msg.ID,
Topic: topic,
Body: []byte(msg.Values["body"].(string)),
Group: group,
Consumer: consumer,
})
if err == nil {
err := q.client.XAck(ctx, topic, group, msg.ID).Err()
if err != nil {
return err
}
}
}
return nil
}
复制代码
存在的问题
不支持大量消息堆积可能的消息丢失
特别是消息都存储在内存中,如果不小心可能会溢出导致Redis宕机。
总结
- 如果能够使用Stream(Redis5.0以上),可以优先考虑它,因为它的功能是最完整的;
- 否则也可以使用list,不过它不支持多个消费者组;
- 如果对消息可靠性没有要求,也可以使用Pub/Sub模式。
完整代码和测试代码:github.com/jiaxwu/rmq