In this lecture we continue with the consumer implementation of the sarama Kafka client. Let's start with an example:
package main

import (
	"fmt"
	"log"
	"sync"

	"github.com/Shopify/sarama"
)

// consumer exercise
func main() {
	// create a consumer instance
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Print(err)
		return
	}
	// get all partitions of the topic
	partitionList, err := consumer.Partitions("test")
	if err != nil {
		log.Println(err)
		return
	}
	var wg sync.WaitGroup
	// iterate over all partitions
	for _, partition := range partitionList {
		// consume one specific partition of the topic: given topic, partition
		// and offset, this returns a consumer object for that partition
		pc, err := consumer.ConsumePartition("test", partition, sarama.OffsetNewest)
		if err != nil {
			log.Println(err)
			return
		}
		// remember to close it when done
		defer pc.AsyncClose()
		wg.Add(1)
		// receive the messages asynchronously
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}
	wg.Wait()
}
There are three parts:
1. sarama.NewConsumer creates a consumer.
2. consumer.ConsumePartition consumes messages from the given topic and partition.
3. msg := range pc.Messages() receives the messages.
If you don't want to enumerate all the partitions yourself, you can also just use a consumer group:
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"

	"github.com/Shopify/sarama"
)

type consumerGroupHandler struct {
	name string
}

func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n",
			h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		// mark the message as consumed manually
		sess.MarkMessage(msg, "")
	}
	return nil
}

// only useful when config.Consumer.Return.Errors is true (unused in this example)
func handleErrors(group sarama.ConsumerGroup, wg *sync.WaitGroup) {
	wg.Done()
	for err := range group.Errors() {
		fmt.Println("ERROR", err)
	}
}

func consume(group sarama.ConsumerGroup, wg *sync.WaitGroup, name string) {
	fmt.Println(name + " start")
	wg.Done() // signal that this consumer has started
	ctx := context.Background()
	for {
		topics := []string{"test"}
		handler := consumerGroupHandler{name: name}
		// Consume blocks for the lifetime of a session and returns on rebalance,
		// so it must be called in a loop
		err := group.Consume(ctx, topics, handler)
		fmt.Println("consume group end")
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	var wg sync.WaitGroup
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = false
	config.Version = sarama.V0_10_2_0
	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer client.Close()
	group1, err := sarama.NewConsumerGroupFromClient("c1", client)
	if err != nil {
		panic(err)
	}
	group2, err := sarama.NewConsumerGroupFromClient("c2", client)
	if err != nil {
		panic(err)
	}
	group3, err := sarama.NewConsumerGroupFromClient("c3", client)
	if err != nil {
		panic(err)
	}
	defer group1.Close()
	defer group2.Close()
	defer group3.Close()
	wg.Add(3)
	go consume(group1, &wg, "c1")
	go consume(group2, &wg, "c2")
	go consume(group3, &wg, "c3")
	wg.Wait() // wait until all three consumers have started
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals
}
Let's start the source-code analysis from NewConsumerGroup, the entry point:
consumer_group.go
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	c, err := newConsumerGroup(groupID, client)
}
It first creates a client, then builds a consumerGroup object:
type ConsumerGroup interface {
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error
}
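As the doc comment on Errors() notes, errors are only delivered over this channel when Consumer.Return.Errors is set. A minimal sketch of that pattern (broker address and group ID are placeholders):

config := sarama.NewConfig()
config.Consumer.Return.Errors = true // deliver errors on Errors() instead of only logging them

group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
if err != nil {
	panic(err)
}
// drain the error channel in its own goroutine so it cannot back up
go func() {
	for err := range group.Errors() {
		fmt.Println("consumer group error:", err)
	}
}()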
type consumerGroup struct {
	client Client

	config   *Config
	consumer Consumer
	groupID  string
	memberID string
	errors   chan error

	lock      sync.Mutex
	closed    chan none
	closeOnce sync.Once

	userData []byte
}
func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
	consumer, err := NewConsumerFromClient(client)
}
Creating the consumerGroup also creates a consumer object:
consumer.go
func NewConsumerFromClient(client Client) (Consumer, error) {
	cli := &nopCloserClient{client}
	return newConsumer(cli)
}
func newConsumer(client Client) (Consumer, error) {}
type consumer struct {
	conf            *Config
	children        map[string]map[int32]*partitionConsumer
	brokerConsumers map[*Broker]*brokerConsumer
	client          Client
	lock            sync.Mutex
}
With the ConsumerGroup created we can start consuming; the corresponding entry point is Consume:
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	c.client.RefreshMetadata(topics...) // load metadata
	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	go c.loopCheckPartitionNumbers(topics, sess)
}
RefreshMetadata fetches the corresponding metadata; the code is in client.go:
func (client *client) RefreshMetadata(topics ...string) error {
	return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
	broker = client.any()
	req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
	response, err := broker.GetMetadata(req)
	shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
}
Each partition-to-consumer assignment is called a "claim", and one generation of ConsumerGroupClaims is called a session. A session ends when the ctx is cancelled or when a partition rebalance happens. A session also requires the client to keep heartbeating to the coordinator: the original Kafka client exposes a session.timeout.ms setting for this, and a client that fails to heartbeat to the coordinator within that window is considered dead, which triggers a rebalance.
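In sarama the equivalent knobs live under Consumer.Group in the config; a minimal sketch (the concrete durations are only illustrative, and match sarama's defaults):

import (
	"time"

	"github.com/Shopify/sarama"
)

func newGroupConfig() *sarama.Config {
	config := sarama.NewConfig()
	// counterpart of Kafka's session.timeout.ms: how long the coordinator
	// waits for a heartbeat before evicting this member (default 10s)
	config.Consumer.Group.Session.Timeout = 10 * time.Second
	// how often the client sends heartbeats; should be well below the
	// session timeout (counterpart of heartbeat.interval.ms, default 3s)
	config.Consumer.Group.Heartbeat.Interval = 3 * time.Second
	return config
}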
func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	coordinator, err := c.client.Coordinator(c.groupID)
	join, err := c.joinGroupRequest(coordinator, topics)
	groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
	return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
	go sess.heartbeatLoop()

	// start consuming
	for topic, partitions := range claims {
		for _, partition := range partitions {
			sess.waitGroup.Add(1)
			go func(topic string, partition int32) {
				sess.consume(topic, partition)
			}(topic, partition)
		}
	}
}
type consumerGroupSession struct {
	parent       *consumerGroup
	memberID     string
	generationID int32
	handler      ConsumerGroupHandler

	claims  map[string][]int32
	offsets *offsetManager
	ctx     context.Context
	cancel  func()

	waitGroup       sync.WaitGroup
	releaseOnce     sync.Once
	hbDying, hbDead chan none
}
Each of these goroutines calls sess.consume(topic, partition):
func (s *consumerGroupSession) consume(topic string, partition int32) {
	// create new claim
	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
	s.handler.ConsumeClaim(s, claim)
}
func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
}
type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64
	PartitionConsumer
}
This calls ConsumePartition to consume the given partition:
consumer.go
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
	child := &partitionConsumer{...}
	if err := child.chooseStartingOffset(offset); err != nil {...}
	if leader, err = c.client.Leader(child.topic, child.partition); err != nil {...}
	if err := c.addChild(child); err != nil {...}
	// inside addChild:
	//   c.children[child.topic] = topicChildren
	//   topicChildren[child.partition] = child

	go withRecover(child.dispatcher)
	go withRecover(child.responseFeeder)

	child.broker = c.refBrokerConsumer(leader)
	// inside refBrokerConsumer:
	//   bc := c.brokerConsumers[broker]
	//   bc.refs++
	child.broker.input <- child
}
This creates a partitionConsumer object:
type partitionConsumer struct {
	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG

	consumer *consumer
	conf     *Config
	broker   *brokerConsumer
	messages chan *ConsumerMessage
	errors   chan *ConsumerError
	feeder   chan *FetchResponse

	preferredReadReplica int32

	trigger, dying chan none
	closeOnce      sync.Once
	topic          string
	partition      int32
	responseResult error
	fetchSize      int32
	offset         int64
	retries        int32
}
It also starts two goroutines, and these two are the core of the consumer:
1. First, dispatcher, which mainly maintains subscription state:
func (child *partitionConsumer) dispatcher() {
	for range child.trigger {
		if err := child.dispatch(); err != nil {...}
	}

	child.consumer.unrefBrokerConsumer(child.broker)
	child.consumer.removeChild(child)
	close(child.feeder)
}
Now the dispatch method called from the dispatcher goroutine:
func (child *partitionConsumer) dispatch() error {
	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {...}
	broker, err := child.preferredBroker()
	child.broker = child.consumer.refBrokerConsumer(broker)
	child.broker.input <- child
}
It first obtains a brokerConsumer object:
type brokerConsumer struct {
	consumer         *consumer
	broker           *Broker
	input            chan *partitionConsumer
	newSubscriptions chan []*partitionConsumer
	subscriptions    map[*partitionConsumer]none
	wait             chan none
	acks             sync.WaitGroup
	refs             int
}
func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
	bc = c.newBrokerConsumer(broker)
}
func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
	go withRecover(bc.subscriptionManager)
	go withRecover(bc.subscriptionConsumer)
}
This starts two more goroutines:
func (bc *brokerConsumer) subscriptionManager() {
	for {
		select {
		case event, ok := <-bc.input:
			buffer = append(buffer, event)
		case bc.newSubscriptions <- buffer:
			buffer = nil
		}
	}
}
When a new subscription request arrives on input, it is appended to a buffer that then gets sent on newSubscriptions. Note that newSubscriptions is not a buffered channel; it is an unbuffered channel whose element type is a slice.
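This "accumulate, then hand off a whole slice" trick is a general Go pattern. Here is a standalone sketch of it with made-up names, not sarama's actual code:

package main

import "fmt"

// collect accumulates items from input and hands the whole batch over
// whenever the receiver is ready; sending a slice over an unbuffered
// channel transfers many events in a single synchronization.
func collect(input <-chan int, batches chan<- []int) {
	var buffer []int
	for {
		if buffer == nil {
			// nothing buffered yet: only wait for input
			buffer = append(buffer, <-input)
			continue
		}
		select {
		case v := <-input:
			buffer = append(buffer, v) // keep accumulating
		case batches <- buffer:
			buffer = nil // batch handed off, start a new one
		}
	}
}

func main() {
	input := make(chan int)
	batches := make(chan []int)
	go collect(input, batches)
	go func() {
		for i := 0; i < 5; i++ {
			input <- i
		}
	}()
	fmt.Println(<-batches) // e.g. [0] or [0 1 2], depending on scheduling
}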
func (bc *brokerConsumer) subscriptionConsumer() {
	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)
		response, err := bc.fetchNewMessages()
		bc.acks.Add(len(bc.subscriptions))
		for child := range bc.subscriptions {
			child.feeder <- response
		}
		bc.acks.Wait()
		bc.handleResponses()
	}
}
On each round it applies any subscription changes, calls fetchNewMessages, and then pushes the response into each subscriber's feeder channel:
func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
	for child := range bc.subscriptions {
		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
	}
	return bc.broker.Fetch(request)
}
Fetch is the actual request to the broker that retrieves messages:
func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
	response := new(FetchResponse)
	err := b.sendAndReceive(request, response)
}
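If I read the code right, the fetchSize passed to AddBlock above starts from the consumer's fetch settings, which can be tuned in the config; a minimal sketch (the values shown are sarama's defaults, only illustrative):

config := sarama.NewConfig()
config.Consumer.Fetch.Min = 1               // broker waits until at least this many bytes are available
config.Consumer.Fetch.Default = 1024 * 1024 // bytes requested per partition per fetch
config.Consumer.Fetch.Max = 0               // 0 means the client imposes no upper bound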
2. Next, the responseFeeder goroutine:
func (child *partitionConsumer) responseFeeder() {
feederLoop: // the big loop receiving messages fetched from the broker
	for response := range child.feeder {
		for i, msg := range msgs {
			select {
			case child.messages <- msg:
				...
			}
		}
		child.broker.input <- child
		continue feederLoop
	}
}
This is the consumer's main message loop: it keeps draining responses from feeder, pushes the parsed messages into messages, and once done puts itself back into the broker's input channel.
subscriptionManager then picks it up from input again and goes back to Kafka to pull more messages, which closes the full message loop.
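To make the shape of this loop concrete, here is a toy model of the feeder/input ping-pong, with made-up types rather than sarama's real ones:

package main

import "fmt"

type consumerT struct {
	feeder   chan string // broker -> consumer: fetched batches
	messages chan string // consumer -> application
}

func main() {
	input := make(chan *consumerT) // consumer -> broker: "fetch for me again"
	c := &consumerT{feeder: make(chan string), messages: make(chan string, 10)}

	// broker side: every consumer that shows up on input gets one fetched batch
	go func() {
		for i := 1; ; i++ {
			child := <-input
			child.feeder <- fmt.Sprintf("batch-%d", i) // stands in for fetchNewMessages
		}
	}()

	// consumer side (responseFeeder): drain feeder, publish, re-subscribe
	go func() {
		for response := range c.feeder {
			c.messages <- response // hand off to the application
			input <- c             // put ourselves back on the broker's input
		}
	}()

	input <- c // initial subscription, like child.broker.input <- child
	for i := 0; i < 3; i++ {
		fmt.Println(<-c.messages) // batch-1, batch-2, batch-3
	}
}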
Finally, the Messages interface:
func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
	return child.messages
}
Very simple: it just hands out the processed messages through the messages channel.
To summarize:
partitionConsumer starts two goroutines, dispatcher and responseFeeder:
1. The dispatcher goroutine tracks broker changes; it is the control side, dealing mostly in metadata. It listens for notifications on the trigger channel to discover changes such as a partition's leader moving; updates to trigger come from the brokerConsumer object.
The final child.broker.input <- child line effectively subscribes the partitionConsumer to the brokerConsumer.
2. responseFeeder tracks the arrival of messages; it is the data side.
The child.feeder channel also comes from the brokerConsumer: responseFeeder roughly takes the responses the brokerConsumer fetched and forwards the messages to the messages chan.
One configuration item worth noting is child.conf.Consumer.MaxProcessingTime, with a default of 100ms. Per its comment, if a write to the messages chan has not succeeded within 100ms, the consumer stops sending fetch requests to that broker.
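A minimal sketch of tuning it (the 500ms value is only an example):

config := sarama.NewConfig()
// if the application cannot drain Messages() within this window, sarama
// pauses fetching from the broker until the consumer catches up
config.Consumer.MaxProcessingTime = 500 * time.Millisecond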