本文案例基于 kafka 2.8

消费组

消费组是kafka的一种可扩展的消息机制, 可以消费负载均衡, 横向扩展消费者的消费能力. 同一个消费组的消费者共享group id.

  • 注意
  1. 订阅主题的消费组中一个或多个消费者根据不同的分区规则消费同一个主题下 所有分区 消息.
  2. 一个主题分区只能被订阅主题的同一个消费组中的一个消费者消费, 而一个消费组中的消费者可以消费多个分区的消息.
  3. 同一个主题下的分区大于等于消费者, 因为每一个分区只能被一个消费者消费.
  4. 消费者变动, 增加或者减少, kafka会rebalance, 主题分区会动态重新分配, 在主题分区变更的时候, kafka也会执行rebalance, 为消费者重新分配主题分区.

为什么要有消费组

通过添加多个消费者一样可以增加消费能力, 为什么要有消费组这种机制呢? 比无群组通过直接添加消费者扩展消费能力好在哪里?

  1. 客户端单个消费者只能消费单个分区的消息, 由于主题分区可以配置很多, 可以跨多个broker服务, 多到一定程度, 为每个分区添加一个消费者不现实, 而且不好管理.
  2. 消费组扩展很方便, 可以随意添加消费者, 主题分区会根据分区策略,将所有分区分配给群组里面的消费者, 可以在服务端查看群组, 可以看到群组所有消费者信息, 以及当前消费offset.

消费组添加消费者

消费组添加消费者是动态添加的, 在客户端创建消费组, 起一个消费者客户端, 需要上传订阅的主题, 当前消费者订阅当前主题. 当需要添加多个消费者的时候, 同样的创建消费组, 起一个消费者客户端, 主题和群组名相同, 就能添加第二个消费者. 以此类推添加多个消费者.

  • 添加消费组消费者golang客户端代码
    主题为:topic-redux, 群组为ReduxConsumerGroup
# consumergroup.go
package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"

    "github.com/Shopify/sarama"
)

type Kafka struct {
    brokers           []string
    topics            []string
    startOffset       int64
    version           string
    ready             chan bool
    group             string
    channelBufferSize int
    assignor          string
}

var brokers = []string{"118.24.243.178:9092"}
var topics = []string{"topic-redux", "topic-test"}
var group = "ReduxConsumerGroup"
var assignor = "range"

func NewKafka() *Kafka {
    return &Kafka{
        brokers:           brokers,
        topics:            topics,
        group:             group,
        channelBufferSize: 1000,
        ready:             make(chan bool),
        version:           "2.8.0",
        assignor:          assignor,
    }
}

func (k *Kafka) Connect() func() {
    fmt.Println("kafka init...")
    version, err := sarama.ParseKafkaVersion(k.version)
    if err != nil {
        fmt.Printf("Error parsing Kafka version: %v", err)
    }
    config := sarama.NewConfig()
    config.Version = version
    //  Partition allocation strategy
    switch assignor {
    case "sticky":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
    case "roundrobin":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
    case "range":
        config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
    default:
        fmt.Printf("Unrecognized consumer group partition assignor: %s", assignor)
    }
    config.Consumer.Offsets.Initial = sarama.OffsetNewest
    config.ChannelBufferSize = k.channelBufferSize // channel length
    //  establish client
    newClient, err := sarama.NewClient(brokers, config)
    if err != nil {
        fmt.Println(err)
    }
    //  Get all topic
    topics, err := newClient.Topics()
    if err != nil {
        fmt.Println(err)
    }
    fmt.Println("topics: ", topics)
    //  according to client establish consumerGroup
    client, err := sarama.NewConsumerGroupFromClient(k.group, newClient)
    if err != nil {
        fmt.Printf("Error creating consumer group client: %v", err)
    }
    ctx, cancel := context.WithCancel(context.Background())
    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            if err := client.Consume(ctx, k.topics, k); err != nil {
                //  When setup When you fail ,error Will return here
                fmt.Println("Error from consumer: %v", err)
                return
            }
            // check if context was cancelled, signaling that the consumer should stop
            if ctx.Err() != nil {
                fmt.Println(ctx.Err())
                return
            }
            k.ready = make(chan bool)
        }
    }()
    <-k.ready
    fmt.Println("Sarama consumer up and running!...")
    //  Ensure that when the system exits , Messages in the channel are consumed
    return func() {
        fmt.Println("kafka close")
        cancel()
        wg.Wait()
        if err = client.Close(); err != nil {
            fmt.Println("Error closing client: %v", err)
        }
    }
}

// Setup is run at the beginning of a new session, before ConsumeClaim

func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error {
    fmt.Println("setup")
    //session.ResetOffset("t2p4", 0, 13, "")
    fmt.Println(session.Claims())
    // Mark the consumer as ready
    close(k.ready)
    return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (k *Kafka) Cleanup(sarama.ConsumerGroupSession) error {
    fmt.Println("cleanup")
    return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (k *Kafka) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    // NOTE:
    // Do not move the code below to a goroutine.
    // The `ConsumeClaim` itself is called within a goroutine, see:
    // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
    //  Specific consumption news
    for message := range claim.Messages() {
        fmt.Printf("[topic:%s] [partiton:%d] [offset:%d] [value:%s] [time:%v]",
            message.Topic, message.Partition, message.Offset, string(message.Value), message.Timestamp)
        //  Update displacement
        session.MarkMessage(message, "")
    }
    return nil
}
func main() {
    k := NewKafka()
    c := k.Connect()
    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    select {
    case <-sigterm:
        fmt.Println("terminating: via signal")
    }
    c()
}


  1. 创建消费组, 添加第一个消费者
    执行
go run  consumergroup.go

服务端查看群组以及消费者信息

redux@redux-2:/opt/kafka_2.12-2.8.0$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group ReduxConsumerGroup --describe 
GROUP              TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST             CLIENT-ID
ReduxConsumerGroup topic-redux     0          -               76              -               sarama-5e439a22-7fed-4de3-a94e-bf4bb41c50da /120.204.160.142 sarama
ReduxConsumerGroup topic-redux     1          -               27              -               sarama-5e439a22-7fed-4de3-a94e-bf4bb41c50da /120.204.160.142 sarama

由上可见, 创建了一个ReduxConsumerGroup的消费组, 主题分区[0, 1]两个分区被分配给同一个消费者.

  1. 添加第二个消费者
    执行
go run  consumergroup.go

服务端查看群组以及消费者信息

redux@redux-2:/opt/kafka_2.12-2.8.0$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group ReduxConsumerGroup --describe 

GROUP              TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST             CLIENT-ID
ReduxConsumerGroup topic-redux     1          -               27              -               sarama-39504479-7403-436d-a7ed-897207e9c24f /120.204.160.142 sarama
ReduxConsumerGroup topic-redux     0          -               76              -               sarama-5e439a22-7fed-4de3-a94e-bf4bb41c50da /120.204.160.142 sarama

同一个主题, 两个分区被分配给两个不同的消费者.

消费组去除消费者

只需要将消费者客户端退出即可.

  1. 去除第一个消费者
ctrl c

服务端查看群组以及消费者信息

redux@redux-2:/opt/kafka_2.12-2.8.0$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group ReduxConsumerGroup --describe 

GROUP              TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST             CLIENT-ID
ReduxConsumerGroup topic-redux     0          -               76              -               sarama-39504479-7403-436d-a7ed-897207e9c24f /120.204.160.142 sarama
ReduxConsumerGroup topic-redux     1          -               27              -               sarama-39504479-7403-436d-a7ed-897207e9c24f /120.204.160.142 sarama

可见, 第一个消费者消失, 主题分区被分配给第二个消费者

  1. 去除第二个消费者
    对第二个消费者客户端执行退出

ctrl c

服务端查看群组以及消费者信息

edux@redux-2:/opt/kafka_2.12-2.8.0$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group ReduxConsumerGroup --describe 

Consumer group 'ReduxConsumerGroup' has no active members.

消费者全部去除.