At present commonly used Golang Kafka There are several types of clients , Each has its own advantages and disadvantages

Client name Advantages and disadvantages
sarama There are many users , But it's relatively difficult to use , Better performance
confluent-kafka-go Yes C Language version Kafka The packaging that's going on , Strong performance , But depends on librdkafka
kafka-go Easy to use , But the performance is poor , Common dual core in production environment CPU machine , Only deal with... In a second 300 Bar or so
healer The operation is extremely simple , The performance and sarama Similar , This product is the work of a big bull in Ctrip , At present, the number of community users is small , Lack of corresponding support maintenance

The above several clients have been tested in the retest environment , After comparison , In the end sarama As a production environment Kfaka Client .

At present, I see some online consumption Kafka topic Appoint group The examples are all using sarama-cluster To complete the , at present sarama It also supports the designation of consumer group Of .

Consumer Group It's a logical concept , yes Kafka The means to realize two message models of unicast and broadcast . The same topic Data , Will broadcast to different group; The same group Medium worker, Only one worker Can get this data . Let me put it another way , For the same topic, each group You can get all the same data , But the data goes into group Only one of them worker consumption .group Internal worker You can use multithreading or multiprocessing to achieve , You can also spread processes across multiple machines ,worker The number of is usually no more than partition Quantity , And it's better to keep the integral multiple relationship between them , because Kafka The design assumes a partition Only one worker consumption ( The same group within )

 package main
 import (
"fmt"
"github.com/Shopify/sarama"
)
/*
Initialization NewConfig To configure sarama.NewConfig
Create producer sarama.NewSyncProducer
Create a message sarama.ProducerMessage
send message client.SendMessage
*/
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
 msg := &sarama.ProducerMessage{}
msg.Topic = "TestTopic"
msg.Value = sarama.StringEncoder("this is a test")
 client, err := sarama.NewSyncProducer([]string{"172.0.0.1:9092"}, config)
if err != nil {
fmt.Println("producer close, err:", err)
return
}
 defer client.Close()
 pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
}
 fmt.Printf("pid:%v offset:%vn", pid, offset)
}

Operation result :

 pid:0 offset:0

Run again :

 pid:0 offset:1

Run again :

 pid:0 offset:
 package main
 import (
"fmt"
"strings"
"sync"
"github.com/Shopify/sarama"
)
 var (
wg sync.WaitGroup
)
 func main() {
// Create consumer
consumer, err := sarama.NewConsumer(strings.Split("192.168.1.125:9092", ","), nil)
if err != nil {
fmt.Println("Failed to start consumer: %s", err)
return
}
// Set up zones
partitionList, err := consumer.Partitions("nginx_log")
if err != nil {
fmt.Println("Failed to get the list of partitions: ", err)
return
}
fmt.Println(partitionList)
// Cycle partition
for partition := range partitionList {
pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("Failed to start consumer for partition %d: %sn", partition, err)
return
}
defer pc.AsyncClose()
wg.Add(1)
go func(pc sarama.PartitionConsumer) {
defer wg.Done()
for msg := range pc.Messages() {
fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
fmt.Println()
}
 }(pc)
}
//time.Sleep(time.Hour)
wg.Wait()
consumer.Close()
}
 package main
 import (
"context"
"flag"
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
 "github.com/Shopify/sarama"
)
 // Sarama configuration options
var (
brokers = ""
version = ""
group = ""
topics = ""
assignor = ""
oldest = true
verbose = false
)
 func init() {
flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
flag.StringVar(&group, "group", "", "Kafka consumer group definition")
flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma seperated list")
flag.StringVar(&assignor, "assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)")
flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest")
flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
flag.Parse()
 if len(brokers) == 0 {
panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
}
 if len(topics) == 0 {
panic("no topics given to be consumed, please set the -topics flag")
}
 if len(group) == 0 {
panic("no Kafka consumer group defined, please set the -group flag")
}
}
 func main() {
log.Println("Starting a new Sarama consumer")
 if verbose {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
 version, err := sarama.ParseKafkaVersion(version)
if err != nil {
log.Panicf("Error parsing Kafka version: %v", err)
}
 /**
* Construct a new Sarama configuration.
* The Kafka cluster version has to be defined before the consumer/producer is initialized.
*/
config := sarama.NewConfig()
config.Version = version
 switch assignor {
case "sticky":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
case "roundrobin":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
case "range":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
default:
log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
}
 if oldest {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
 /**
* Setup a new Sarama consumer group
*/
consumer := Consumer{
ready: make(chan bool),
}
 ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
if err != nil {
log.Panicf("Error creating consumer group client: %v", err)
}
 wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
log.Panicf("Error from consumer: %v", err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()
 <-consumer.ready // Await till the consumer has been set up
log.Println("Sarama consumer up and running!...")
 sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
case <-sigterm:
log.Println("terminating: via signal")
}
cancel()
wg.Wait()
if err = client.Close(); err != nil {
log.Panicf("Error closing client: %v", err)
}
}
 // Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
}
 // Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(consumer.ready)
return nil
}
 // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
 // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 // NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
}
 return nil
}

`$ go run main.go -brokers="127.0.0.1:9092" -topics="sarama" -group="example"