本文转自 跟我学IM后盾开发作者 杰克.许 经OpenIM技术人员整顿订正后公布。
写在后面

Open-IM是由前微信技术专家打造的开源的即时通讯组件。Open-IM包含IM服务端和客户端SDK,实现了高性能、轻量级、易扩大等重要个性。开发者通过集成Open-IM组件,并私有化部署服务端,能够将即时通讯、实时网络能力疾速集成到本身利用中,并确保业务数据的安全性和私密性。

Kafka在OpenIM我的项目中承当重要的角色,感激作者在应用OpenIM中发现的bug(应用Kafka不当的bug)

理解更多原创文章:

【OpenIM原创】开源OpenIM:轻量、高效、实时、牢靠、低成本的音讯模型

【OpenIM原创】C/C++调用golang函数,golang回调C/C++函数

【OpenIM原创】简略轻松入门 一文解说WebRTC实现1对1音视频通信原理

【OpenIM扩大】OpenIM服务发现和负载平衡golang插件:gRPC接入etcdv3

【开源OpenIM】高性能、可伸缩、易扩大的即时通讯架构

如果您有趣味能够在文章结尾理解到更多对于咱们的信息,期待着与您的交换单干。

01 背景

在一些业务零碎中,模块之间通过引入Kafka解耦,拿IM举例(图起源):

用户A给B发送音讯,msg_gateway收到音讯后,投递音讯到Kafka后就给A返回发送胜利。这个时候,其实还没有长久化到mysql中,尽管最终会放弃一致性。所以,试想如果Kafka丢音讯了,是不是就出大问题了?A认为给B发送音讯胜利了,然而在服务器内部消息失落了B并没有收到。

所以,在应用Kafka的时候,有一些业务对音讯失落问题十分的关注。

同样,常见的问题还有:

  • 反复生产的问题。
  • 乱序的问题。

上面咱们来一起看一下如何应用sarama包来解决这些问题。

02 Kafka音讯失落问题形容

以下内容起源:

kafka什么时候会丢音讯:https://blog.csdn.net/qrne06/…

下面咱们放心的点须要进一步明确一下丢音讯的定义:kafka集群中的局部或全副broker挂了,导致consumer没有及时收到音讯,这不属于丢音讯。broker挂了,只有音讯全副长久化到了硬盘上,重启broker集群之后,使消费者持续拉取音讯,音讯就没有失落,依然全量生产了。所以我的了解,所谓丢音讯,意味着:开发人员未感知到哪些音讯没有被生产。

作者把音讯的失落演绎了以下几种状况:

1) producer把音讯发送给broker,因为网络抖动,音讯没有达到broker,且开发人员无感知。

解决方案:producer设置acks参数,音讯同步到master之后返回ack信号,否则抛异样使应用程序感知到并在业务中进行重试发送。这种形式肯定水平保障了音讯的可靠性,producer期待broker确认信号的时延也不高。

2)producer把音讯发送给broker-master,master接管到音讯,在未将音讯同步给follower之前,挂掉了,且开发人员无感知。

解决方案:producer设置acks参数,音讯同步到master且同步到所有follower之后返回ack信号,否则抛异样使应用程序感知到并在业务中进行重试发送。这样设置,在更大程度上保障了音讯的可靠性,毛病是producer期待broker确认信号的时延比拟高。

3)producer把音讯发送给broker-master,master接管到音讯,master未胜利将音讯同步给每个follower,有音讯失落危险。

解决方案:同上。

4)某个broker音讯尚未从内存缓冲区长久化到磁盘,就挂掉了,这种状况无奈通过ack机制感知。

解决方案:设置参数,放慢音讯长久化的频率,能在肯定水平上缩小这种状况产生的概率。但进步频率天然也会影响性能。

5)consumer胜利拉取到了音讯,consumer挂了。

解决方案:设置手动sync,生产胜利才提交

综上所述,集群/我的项目运行失常的状况下,kafka不会丢音讯。一旦集群呈现问题,音讯的可靠性无奈齐全保障。要想尽可能保障音讯牢靠,根本只能在发现音讯有可能没有被生产时,重发消息来解决。所以在业务逻辑中,要思考音讯的反复生产问题,对于关键环节,要有幂等机制。

作者的几条倡议:

1)如果一个业务很要害,应用kafka的时候要思考丢音讯的老本和解决方案。

2)producer端确认音讯是否达到集群,若有异样,进行重发。

3)consumer端保障生产幂等性。

4)运维保障集群运行失常且高可用,保障网络状况良好。

03 生产端丢音讯问题解决

下面说了,只须要把producer设置acks参数,期待Kafka所有follower都胜利后再返回。咱们只须要进行如下设置:

  • \1. config := sarama.NewConfig() 2. config.Producer.RequiredAcks = sarama.WaitForAll // -1

ack参数有如下取值:

1. const (
2. // NoResponse doesn't send any response, the TCP ACK is all you get. 3.   NoResponse RequiredAcks = 0
4. // WaitForLocal waits for only the local commit to succeed before         responding.    
5. WaitForLocal RequiredAcks = 1   
6. // WaitForAll waits for all in-sync replicas to commit before          responding.    
7. // The minimum number of in-sync replicas is configured on the             broker    via   
8. // the `min.insync.replicas` configuration key.    
9. WaitForAll RequiredAcks = -1
10.  )

04 生产端丢音讯问题

通常生产端丢音讯都是因为Offset主动提交了,然而数据并没有插入到mysql(比方呈现BUG或者过程Crash),导致下一次消费者重启后,音讯漏掉了,天然数据库中也查不到。这个时候,咱们能够通过手动提交解决,甚至在一些简单场景下,还要应用二阶段提交。

主动提交模式下的丢音讯问题

默认状况下,sarama是主动提交的形式,距离为1秒钟

1.  // NewConfig returns a new configuration instance with sane                defaults.
2. func NewConfig() *Config {  
3. // …  
4. c.Consumer.Offsets.AutoCommit.Enable = true. // 主动提交 
5. c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // 距离 
6. c.Consumer.Offsets.Initial = OffsetNewest 
7. c.Consumer.Offsets.Retry.Max = 3 
8.  // ...
9.  }

这里的主动提交,是基于被标记过的音讯(sess.MarkMessage(msg, “”))

1. type exampleConsumerGroupHandler struct{}
2. func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession)        error   { return nil }
3. func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession)      error { return nil }
4. func (h exampleConsumerGroupHandler) ConsumeClaim(sess                  ConsumerGroupSession, claim ConsumerGroupClaim) error {  
5. for msg := range claim.Messages() {      
6. fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic,      msg.Partition, msg.Offset)      
7. // 标记音讯已解决,sarama会主动提交     
8. sess.MarkMessage(msg, "") 
9. }   
10. return nil
11. }

如果不调用sess.MarkMessage(msg, “”),即便启用了主动提交也没有成果,下次启动消费者会从上一次的Offset从新生产,咱们无妨正文掉sess.MarkMessage(msg, “”),而后关上Offset Explorer查看:

那么这样,咱们就大略了解了sarama主动提交的原理:先标记再提交。咱们只须要放弃标记逻辑在插入mysql代码之后即可确保不会呈现丢音讯的问题:

正确的调用程序:

1. func (h msgConsumerGroup) ConsumeClaim(sesssarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {   
2. for msg := range claim.Messages() {
3. // 插入mysql
4. insertToMysql(msg)      
5. // 正确:插入mysql胜利后程序解体,下一次顶多反复生产一次,而不是因为Offset超         前,导致应用层音讯失落了     
6.  sess.MarkMessage(msg, “") 
7.  }  
8.  return nil
9.  }

谬误的程序:

1. func (h msgConsumerGroup) ConsumeClaim(sess                           sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { 2. for msg := range claim.Messages() {     
3. // 谬误1:不能先标记,再插入mysql,可能标记的时候刚好主动提交Offset,但mysql插入失败了,导致下一次这个音讯不会被生产,造成失落      
4. // 谬误2:罗唆遗记调用sess.MarkMessage(msg, “"),导致反复生产   
5. sess.MarkMessage(msg, “")      
6. // 插入mysql      
7. insertToMysql(msg)  
8.  }  
9.  return nil
10. }

sarama手动提交模式

当然,另外也能够通过手动提交来解决丢音讯的问题,然而集体不举荐,因为主动提交模式下曾经能解决丢音讯问题。

1. consumerConfig := sarama.NewConfig()
2. consumerConfig.Version = sarama.V2_8_0_0consumerConfig.
3. Consumer.Return.Errors = falseconsumerConfig.
4. Consumer.Offsets.AutoCommit.Enable = false  // 禁用主动提交,改为手动
5. consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
6. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {   7. for msg := range claim.Messages() {      
8. fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))      9. // 插入mysql     
10. insertToMysql(msg)      
11. // 手动提交模式下,也须要先进行标记     
12. sess.MarkMessage(msg, "")      
13. consumerCount++      
14. if consumerCount%3 == 0 {         
15. // 手动提交,不能频繁调用,耗时9ms左右,macOS i7 16GB         
16. t1 := time.Now().Nanosecond()         
17. sess.Commit()         
18. t2 := time.Now().Nanosecond()         
19.fmt.Println("commit cost:", (t2-t1)/(1000*1000), "ms")      
20. }   
21. }   
22. return nil
23. }

05 Kafka音讯程序问题

投递Kafka之前,咱们通过一次gRPC调用解决了音讯序号的生成问题,然而这里其实还波及一个音讯程序问题:订阅Kafka的消费者如何依照音讯程序写入mysql,而不是随机写入呢?

咱们晓得,Kafka的音讯在一个partition中是有序的,所以只有确保发给某个人的音讯都在同一个partition中即可。

1. 全局一个partition

这个最简略,然而在kafka中一个partition对应一个线程,所以这种模型下Kafka的吞吐是个问题。

2. 多个partition,手动指定

1. msg := &sarama.ProducerMessage{   
2. Topic: “msgc2s",   
3. Value: sarama.StringEncoder(“hello”),   
4. Partition: toUserId % 10,
5. }
6. partition, offset, err := producer.SendMessage(msg)

生产音讯的时候,除了Topic和Value,咱们能够通过手动指定partition,比方总共有10个分区,咱们依据用户ID取余,这样发给同一个用户的音讯,每次都到1个partition外面去了,消费者写入mysql中的时候,天然也是有序的。

然而,因为分区总数是写死的,万一Kafka的分区数要调整呢?那不得从新编译代码?所以这个形式不够柔美。

3. 多个partition,主动计算

kafka客户端为咱们提供了这种反对。首先,在初始化的时候,设置抉择分区的策略为Hash:

p.config.Producer.Partitioner = sarama.NewHashPartitioner

而后,在生成音讯之前,设置音讯的Key值:

1. msg := &sarama.ProducerMessage{   
2. Topic: "testAutoSyncOffset",   
3. Value: sarama.StringEncoder("hello"),   
4. Key: sarama.StringEncoder(strconv.Itoa(RecvID)),
5. }

Kafka客户端会依据Key进行Hash,咱们通过把接管用户ID作为Key,这样就能让所有发给某个人的音讯落到同一个分区了,也就有序了。

4.扩大常识:多线程状况下一个partition的乱序解决

咱们下面说了,Kafka客户端针对一个partition开一个线程进行生产,如果解决比拟耗时的话,比方解决一条音讯耗时几十 ms,那么 1 秒钟就只能解决几十条音讯,这吞吐量太低了。这个时候,咱们可能就把逻辑挪动到其余线程外面去解决,这样的话,程序就可能会乱。

咱们能够通过写 N 个内存 queue,具备雷同 key 的数据都到同一个内存 queue;而后对于 N 个线程,每个线程别离生产一个内存 queue 即可,这样就能保障程序性。PS:就像4 % 10 = 4,14 % 10 = 4,他们取余都是等于4,所以落到了一个partition,然而key值不一样啊,咱们能够本人再取余,放到不同的queue外面。

06 反复生产和音讯幂等

这篇文章中:

kafka什么时候会丢音讯:https://blog.csdn.net/qrne06/…

具体了形容了各种丢音讯的状况,咱们通过设置 RequiredAcks = sarama.WaitForAll(-1),能够解决生产端丢音讯的问题。第六节中也对生产端丢音讯进行了阐明,只须要确保在插入数据库之后,调用 sess.MarkMessage(msg, “”) 即可。

如果呈现了插入Mysql胜利,然而因为主动提交有1秒的距离,如果此时解体,下次启动消费者势必会对这1秒的数据进行反复生产,咱们在应用层须要解决这个问题。

常见的有2种思路:

  1. 如果是存在redis中不须要长久化的数据,比方string类型,set具备人造的幂等性,无需解决。
  2. 插入mysql之前,进行一次query操作,针对每个客户端发的音讯,咱们为它生成一个惟一的ID(比方GUID),或者间接把音讯的ID设置为惟一索引。

第2个计划的难点在于,全局惟一ID的生成,实践上GUID也是存在反复的可能性的,如果是客户端生成,那么插入失败,怎么让客户端感知呢?

所以,这里我认为还是须要自定义ID生产,比方通过组合法:用户ID + 以后工夫 + 32位GUID,是不是简直不会反复了呢(试想,1集体发1亿条文本须要多少年。。。)?

07 残缺代码实例

consumer.go

1. type msgConsumerGroup struct{}
2. 
3. func (msgConsumerGroup) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
4. func (msgConsumerGroup) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
5. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {   6. for msg := range claim.Messages() {      
7. fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
8. 
9. // 查mysql去重      
10. if check(msg) {          
11. // 插入mysql          
12. insertToMysql()      
13. }
14.
15. // 标记,sarama会主动进行提交,默认距离1秒      
16. sess.MarkMessage(msg, "")  
17. }   
18. return nil
19. }
20.
21. func main(){    
22. consumerConfig := sarama.NewConfig()    
23. consumerConfig.Version = sarama.V2_8_0_0 // specify appropriate version    
24. consumerConfig.Consumer.Return.Errors = false    
25. //consumerConfig.Consumer.Offsets.AutoCommit.Enable = true      
26. // 禁用主动提交,改为手动  //
27. consumerConfig.Consumer.Offsets.AutoCommit.Interval = time.Second * 1 // 测试3秒主动提交    consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
28.
29. cGroup, err := sarama.NewConsumerGroup([]string{"10.0.56.153:9092",    "10.0.56.153:9093", "10.0.56.153:9094"},"testgroup", consumerConfig)  30. if err != nil {       
31. panic(err)   
32. }
33. 
34. for {      
35. err := cGroup.Consume(context.Background(), []string{"testAutoSyncOffset"}, consumerGroup)       
36. if err != nil {         
37. fmt.Println(err.Error())         
38. break     
39. }   
40. }
41. 
42.  _ = cGroup.Close()
43. }

producer.go

1. func main(){    
2. config := sarama.NewConfig()    
3. config.Producer.RequiredAcks = sarama.WaitForAll // 期待所有follower都回复ack,确保Kafka不会丢音讯    
4. config.Producer.Return.Successes = true    
5. config.Producer.Partitioner = sarama.NewHashPartitioner
6.
7.  // 对Key进行Hash,同样的Key每次都落到一个分区,这样音讯是有序的
    // 应用同步producer,异步模式下有更高的性能,然而解决更简单,这里倡议先从简略的动手    
8. producer, err := sarama.NewSyncProducer([]string{"10.0.56.153:9092"}, config)    
9. defer func() {       
10. _ = producer.Close()    
11. }()    
12. if err != nil {       
13. panic(err.Error())   
14. }
15.
16. msgCount := 4   
17. // 模仿4个音讯    
18. for i := 0; i < msgCount; i++ {        
19. rand.Seed(int64(time.Now().Nanosecond()))        
20. msg := &sarama.ProducerMessage{          
21. Topic: "testAutoSyncOffset",          
22. Value: sarama.StringEncoder("hello+" + strconv.Itoa(rand.Int())),   
23. Key:   sarama.StringEncoder("BBB”),        
24. }
25.
26.  t1 := time.Now().Nanosecond()        
27. partition, offset, err := producer.SendMessage(msg)        
28. t2 := time.Now().Nanosecond()
29.
30. if err == nil {            
31. fmt.Println("produce success, partition:", partition, ",offset:", offset, ",cost:", (t2-t1)/(1000*1000), " ms")        
32. } else {           
33. fmt.Println(err.Error())      
34.      }   
35.   }
36.}

完结

OpenIM github开源地址:

https://github.com/OpenIMSDK/…

OpenIM官网 :https://www.rentsoft.cn

OpenIM官方论坛:https://forum.rentsoft.cn/

咱们致力于通过开源模式,为寰球企业/开发者提供简略、易用、高效的IM服务和实时音视频通信能力,帮忙开发者升高我的项目的开发成本,并让开发者掌控业务的外围数据。

IM作为外围业务数据,平安的重要性毋庸置疑,OpenIM开源以及私有化部署让企业能更放心使用。

现在IM云服务商免费高企,如何让企业低成本、平安、牢靠接入IM服务,是OpenIM的历史使命,也是咱们后退的方向。

如您有技术下面的浅见请到咱们的论坛分割沟通,用户也可与咱们的技术人员谈讨应用方面的难题以及见解