分享
package mainimport ( "fmt" "github.com/Shopify/sarama" "log" "os" "strings" "sync")var ( wg sync.WaitGroup logger = log.New(os.Stderr, "[srama]", log.LstdFlags))func main() { sarama.Logger = logger consumer, err := sarama.NewConsumer(strings.Split("localhost:9092", ","), nil) iferr != nil { logger.Println("Failed to start consumer: %s", err) } partitionList, err := consumer.Partitions("hello") iferr != nil { logger.Println("Failed to get the list of partitions: ", err) } forpartition := range partitionList { pc, err := consumer.ConsumePartition("hello", int32(partition), sarama.OffsetNewest) iferr != nil { logger.Printf("Failed to start consumer for partition %d: %s\n", partition, err) } defer pc.AsyncClose() wg.Add(1) go func(sarama.PartitionConsumer) { defer wg.Done() formsg := range pc.Messages() { fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) fmt.Println() } }(pc) } wg.Wait() logger.Println("Done consuming topic hello") consumer.Close()}package mainimport ( "github.com/Shopify/sarama" "log" "os" "strings")var ( logger = log.New(os.Stderr, "[srama]", log.LstdFlags))func main() { sarama.Logger = logger config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner msg := &sarama.ProducerMessage{} msg.Topic = "hello" msg.Partition = int32(-1) msg.Key = sarama.StringEncoder("key") msg.Value = sarama.ByteEncoder("你好, 世界!") producer, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config) iferr != nil { logger.Println("Failed to produce message: %s", err) os.Exit(500) } defer producer.Close() partition, offset, err := producer.SendMessage(msg) iferr != nil { logger.Println("Failed to produce message: ", err) } logger.Printf("partition=%d, offset=%d\n", partition, offset)}`单行代码`
package mainimport ( "fmt" "github.com/Shopify/sarama" "log" "os" "strings" "sync")var ( wg sync.WaitGroup logger = log.New(os.Stderr, "[srama]", log.LstdFlags))func main() { sarama.Logger = logger consumer, err := sarama.NewConsumer(strings.Split("localhost:9092", ","), nil) iferr != nil { logger.Println("Failed to start consumer: %s", err) } partitionList, err := consumer.Partitions("hello") iferr != nil { logger.Println("Failed to get the list of partitions: ", err) } forpartition := range partitionList { pc, err := consumer.ConsumePartition("hello", int32(partition), sarama.OffsetNewest) iferr != nil { logger.Printf("Failed to start consumer for partition %d: %s\n", partition, err) } defer pc.AsyncClose() wg.Add(1) go func(sarama.PartitionConsumer) { defer wg.Done() formsg := range pc.Messages() { fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) fmt.Println() } }(pc) } wg.Wait() logger.Println("Done consuming topic hello") consumer.Close()}package mainimport ( "github.com/Shopify/sarama" "log" "os" "strings")var ( logger = log.New(os.Stderr, "[srama]", log.LstdFlags))func main() { sarama.Logger = logger config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner msg := &sarama.ProducerMessage{} msg.Topic = "hello" msg.Partition = int32(-1) msg.Key = sarama.StringEncoder("key") msg.Value = sarama.ByteEncoder("你好, 世界!") producer, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config) iferr != nil { logger.Println("Failed to produce message: %s", err) os.Exit(500) } defer producer.Close() partition, offset, err := producer.SendMessage(msg) iferr != nil { logger.Println("Failed to produce message: ", err) } logger.Printf("partition=%d, offset=%d\n", partition, offset)}