分享
// -----------------------------------------------------------------------------
// Program 1 of 2: consumer.
// NOTE: this listing contains two independent `package main` programs. Each one
// must be saved to its own file in its own directory before it can be built.
// -----------------------------------------------------------------------------

// Command consumer subscribes to every partition of the Kafka topic "hello" on
// the broker at localhost:9092 and prints each received message to stdout until
// the process is interrupted.
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"

	"github.com/Shopify/sarama"
)

var (
	// wg tracks the per-partition reader goroutines started by main.
	wg sync.WaitGroup

	// logger writes to stderr and is also installed as sarama's logger.
	logger = log.New(os.Stderr, "[srama]", log.LstdFlags)
)

// main starts one PartitionConsumer per partition of "hello", prints incoming
// messages, and shuts everything down when an interrupt signal arrives.
func main() {
	sarama.Logger = logger

	consumer, err := sarama.NewConsumer(strings.Split("localhost:9092", ","), nil)
	if err != nil {
		// Nothing has been opened yet, so exiting directly skips no cleanup.
		logger.Printf("Failed to start consumer: %s", err)
		os.Exit(1)
	}
	// Runs after the partition consumers have been closed and drained below.
	defer func() {
		if err := consumer.Close(); err != nil {
			logger.Println("Failed to close consumer: ", err)
		}
	}()

	partitionList, err := consumer.Partitions("hello")
	if err != nil {
		logger.Println("Failed to get the list of partitions: ", err)
		return
	}

	var partitionConsumers []sarama.PartitionConsumer
	// Range over the values: the slice holds partition IDs, which are not
	// guaranteed to equal their slice index.
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition("hello", partition, sarama.OffsetNewest)
		if err != nil {
			// pc is unusable on error; skip this partition instead of
			// dereferencing it.
			logger.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
			continue
		}
		partitionConsumers = append(partitionConsumers, pc)

		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			// The loop ends once the Messages channel is closed.
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}

	// The reader goroutines only finish after their PartitionConsumer is
	// closed, so closing must be triggered before wg.Wait can return. Do it
	// when the user interrupts the process.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	go func() {
		<-interrupt
		for _, pc := range partitionConsumers {
			pc.AsyncClose()
		}
	}()

	wg.Wait()
	logger.Println("Done consuming topic hello")
}

// -----------------------------------------------------------------------------
// Program 2 of 2: producer (separate file, separate directory).
// -----------------------------------------------------------------------------

// Command producer sends one keyed message to the Kafka topic "hello" on the
// broker at localhost:9092 and logs the partition and offset it was stored at.
package main

import (
	"log"
	"os"
	"strings"

	"github.com/Shopify/sarama"
)

var (
	// logger writes to stderr and is also installed as sarama's logger.
	logger = log.New(os.Stderr, "[srama]", log.LstdFlags)
)

// main builds a synchronous producer, sends a single message and reports where
// the broker stored it.
func main() {
	sarama.Logger = logger

	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// sarama documents that a SyncProducer requires Return.Successes to be true.
	config.Producer.Return.Successes = true

	msg := &sarama.ProducerMessage{
		Topic: "hello",
		// NOTE(review): presumably ignored because a partitioner is
		// configured; confirm against the sarama documentation.
		Partition: int32(-1),
		Key:       sarama.StringEncoder("key"),
		Value:     sarama.ByteEncoder("你好, 世界!"),
	}

	producer, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)
	if err != nil {
		// Nothing has been opened yet, so exiting directly skips no cleanup.
		// Exit status 1 stays inside the portable 0-255 range.
		logger.Printf("Failed to create producer: %s", err)
		os.Exit(1)
	}
	defer func() {
		if err := producer.Close(); err != nil {
			logger.Println("Failed to close producer: ", err)
		}
	}()

	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		// partition and offset are meaningless on failure; do not log them.
		logger.Println("Failed to produce message: ", err)
		return
	}
	logger.Printf("partition=%d, offset=%d\n", partition, offset)
}
// -----------------------------------------------------------------------------
// Program 1 of 2: consumer.
// NOTE: this listing contains two independent `package main` programs. Each one
// must be saved to its own file in its own directory before it can be built.
// -----------------------------------------------------------------------------

// Command consumer subscribes to every partition of the Kafka topic "hello" on
// the broker at localhost:9092 and prints each received message to stdout until
// the process is interrupted.
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"

	"github.com/Shopify/sarama"
)

var (
	// wg tracks the per-partition reader goroutines started by main.
	wg sync.WaitGroup

	// logger writes to stderr and is also installed as sarama's logger.
	logger = log.New(os.Stderr, "[srama]", log.LstdFlags)
)

// main starts one PartitionConsumer per partition of "hello", prints incoming
// messages, and shuts everything down when an interrupt signal arrives.
func main() {
	sarama.Logger = logger

	consumer, err := sarama.NewConsumer(strings.Split("localhost:9092", ","), nil)
	if err != nil {
		// Nothing has been opened yet, so exiting directly skips no cleanup.
		logger.Printf("Failed to start consumer: %s", err)
		os.Exit(1)
	}
	// Runs after the partition consumers have been closed and drained below.
	defer func() {
		if err := consumer.Close(); err != nil {
			logger.Println("Failed to close consumer: ", err)
		}
	}()

	partitionList, err := consumer.Partitions("hello")
	if err != nil {
		logger.Println("Failed to get the list of partitions: ", err)
		return
	}

	var partitionConsumers []sarama.PartitionConsumer
	// Range over the values: the slice holds partition IDs, which are not
	// guaranteed to equal their slice index.
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition("hello", partition, sarama.OffsetNewest)
		if err != nil {
			// pc is unusable on error; skip this partition instead of
			// dereferencing it.
			logger.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
			continue
		}
		partitionConsumers = append(partitionConsumers, pc)

		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			// The loop ends once the Messages channel is closed.
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}

	// The reader goroutines only finish after their PartitionConsumer is
	// closed, so closing must be triggered before wg.Wait can return. Do it
	// when the user interrupts the process.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	go func() {
		<-interrupt
		for _, pc := range partitionConsumers {
			pc.AsyncClose()
		}
	}()

	wg.Wait()
	logger.Println("Done consuming topic hello")
}

// -----------------------------------------------------------------------------
// Program 2 of 2: producer (separate file, separate directory).
// -----------------------------------------------------------------------------

// Command producer sends one keyed message to the Kafka topic "hello" on the
// broker at localhost:9092 and logs the partition and offset it was stored at.
package main

import (
	"log"
	"os"
	"strings"

	"github.com/Shopify/sarama"
)

var (
	// logger writes to stderr and is also installed as sarama's logger.
	logger = log.New(os.Stderr, "[srama]", log.LstdFlags)
)

// main builds a synchronous producer, sends a single message and reports where
// the broker stored it.
func main() {
	sarama.Logger = logger

	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// sarama documents that a SyncProducer requires Return.Successes to be true.
	config.Producer.Return.Successes = true

	msg := &sarama.ProducerMessage{
		Topic: "hello",
		// NOTE(review): presumably ignored because a partitioner is
		// configured; confirm against the sarama documentation.
		Partition: int32(-1),
		Key:       sarama.StringEncoder("key"),
		Value:     sarama.ByteEncoder("你好, 世界!"),
	}

	producer, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)
	if err != nil {
		// Nothing has been opened yet, so exiting directly skips no cleanup.
		// Exit status 1 stays inside the portable 0-255 range.
		logger.Printf("Failed to create producer: %s", err)
		os.Exit(1)
	}
	defer func() {
		if err := producer.Close(); err != nil {
			logger.Println("Failed to close producer: ", err)
		}
	}()

	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		// partition and offset are meaningless on failure; do not log them.
		logger.Println("Failed to produce message: ", err)
		return
	}
	logger.Printf("partition=%d, offset=%d\n", partition, offset)
}