本文转自 跟我学IM后盾开发作者 杰克.许 经OpenIM技术人员整顿订正后公布。
写在后面
Open-IM是由前微信技术专家打造的开源的即时通讯组件。Open-IM包含IM服务端和客户端SDK,实现了高性能、轻量级、易扩大等重要个性。开发者通过集成Open-IM组件,并私有化部署服务端,能够将即时通讯、实时网络能力疾速集成到本身利用中,并确保业务数据的安全性和私密性。
Kafka在OpenIM我的项目中承当重要的角色,感激作者在应用OpenIM中发现的bug(应用Kafka不当的bug)
理解更多原创文章:
【OpenIM原创】开源OpenIM:轻量、高效、实时、牢靠、低成本的音讯模型
【OpenIM原创】C/C++调用golang函数,golang回调C/C++函数
【OpenIM原创】简略轻松入门 一文解说WebRTC实现1对1音视频通信原理
【OpenIM扩大】OpenIM服务发现和负载平衡golang插件:gRPC接入etcdv3
【开源OpenIM】高性能、可伸缩、易扩大的即时通讯架构
如果您有趣味能够在文章结尾理解到更多对于咱们的信息,期待着与您的交换单干。
01 背景
在一些业务零碎中,模块之间通过引入Kafka解耦,拿IM举例(图起源):
用户A给B发送音讯,msg_gateway收到音讯后,投递音讯到Kafka后就给A返回发送胜利。这个时候,其实还没有长久化到mysql中,尽管最终会放弃一致性。所以,试想如果Kafka丢音讯了,是不是就出大问题了?A认为给B发送音讯胜利了,然而在服务器内部消息失落了B并没有收到。
所以,在应用Kafka的时候,有一些业务对音讯失落问题十分的关注。
同样,常见的问题还有:
- 反复生产的问题。
- 乱序的问题。
上面咱们来一起看一下如何应用sarama包来解决这些问题。
02 Kafka音讯失落问题形容
以下内容起源:
kafka什么时候会丢音讯:https://blog.csdn.net/qrne06/…
下面咱们放心的点须要进一步明确一下丢音讯的定义:kafka集群中的局部或全副broker挂了,导致consumer没有及时收到音讯,这不属于丢音讯。broker挂了,只有音讯全副长久化到了硬盘上,重启broker集群之后,使消费者持续拉取音讯,音讯就没有失落,依然全量生产了。所以我的了解,所谓丢音讯,意味着:开发人员未感知到哪些音讯没有被生产。
作者把音讯的失落演绎了以下几种状况:
1) producer把音讯发送给broker,因为网络抖动,音讯没有达到broker,且开发人员无感知。
解决方案:producer设置acks参数,音讯同步到master之后返回ack信号,否则抛异样使应用程序感知到并在业务中进行重试发送。这种形式肯定水平保障了音讯的可靠性,producer期待broker确认信号的时延也不高。
2)producer把音讯发送给broker-master,master接管到音讯,在未将音讯同步给follower之前,挂掉了,且开发人员无感知。
解决方案:producer设置acks参数,音讯同步到master且同步到所有follower之后返回ack信号,否则抛异样使应用程序感知到并在业务中进行重试发送。这样设置,在更大程度上保障了音讯的可靠性,毛病是producer期待broker确认信号的时延比拟高。
3)producer把音讯发送给broker-master,master接管到音讯,master未胜利将音讯同步给每个follower,有音讯失落危险。
解决方案:同上。
4)某个broker音讯尚未从内存缓冲区长久化到磁盘,就挂掉了,这种状况无奈通过ack机制感知。
解决方案:设置参数,放慢音讯长久化的频率,能在肯定水平上缩小这种状况产生的概率。但进步频率天然也会影响性能。
5)consumer胜利拉取到了音讯,consumer挂了。
解决方案:设置手动sync,生产胜利才提交。
综上所述,集群/我的项目运行失常的状况下,kafka不会丢音讯。一旦集群呈现问题,音讯的可靠性无奈齐全保障。要想尽可能保障音讯牢靠,根本只能在发现音讯有可能没有被生产时,重发消息来解决。所以在业务逻辑中,要思考音讯的反复生产问题,对于关键环节,要有幂等机制。
作者的几条倡议:
1)如果一个业务很要害,应用kafka的时候要思考丢音讯的老本和解决方案。
2)producer端确认音讯是否达到集群,若有异样,进行重发。
3)consumer端保障生产幂等性。
4)运维保障集群运行失常且高可用,保障网络状况良好。
03 生产端丢音讯问题解决
下面说了,只须要把producer设置acks参数,期待Kafka所有follower都胜利后再返回。咱们只须要进行如下设置:
- \1. config := sarama.NewConfig() 2. config.Producer.RequiredAcks = sarama.WaitForAll // -1
ack参数有如下取值:
1. const (
2. // NoResponse doesn't send any response, the TCP ACK is all you get. 3. NoResponse RequiredAcks = 0
4. // WaitForLocal waits for only the local commit to succeed before responding.
5. WaitForLocal RequiredAcks = 1
6. // WaitForAll waits for all in-sync replicas to commit before responding.
7. // The minimum number of in-sync replicas is configured on the broker via
8. // the `min.insync.replicas` configuration key.
9. WaitForAll RequiredAcks = -1
10. )
04 生产端丢音讯问题
通常生产端丢音讯都是因为Offset主动提交了,然而数据并没有插入到mysql(比方呈现BUG或者过程Crash),导致下一次消费者重启后,音讯漏掉了,天然数据库中也查不到。这个时候,咱们能够通过手动提交解决,甚至在一些简单场景下,还要应用二阶段提交。
主动提交模式下的丢音讯问题
默认状况下,sarama是主动提交的形式,距离为1秒钟
1. // NewConfig returns a new configuration instance with sane defaults.
2. func NewConfig() *Config {
3. // …
4. c.Consumer.Offsets.AutoCommit.Enable = true. // 主动提交
5. c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // 距离
6. c.Consumer.Offsets.Initial = OffsetNewest
7. c.Consumer.Offsets.Retry.Max = 3
8. // ...
9. }
这里的主动提交,是基于被标记过的音讯(sess.MarkMessage(msg, “”))
1. type exampleConsumerGroupHandler struct{}
2. func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil }
3. func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
4. func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
5. for msg := range claim.Messages() {
6. fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
7. // 标记音讯已解决,sarama会主动提交
8. sess.MarkMessage(msg, "")
9. }
10. return nil
11. }
如果不调用sess.MarkMessage(msg, “”),即便启用了主动提交也没有成果,下次启动消费者会从上一次的Offset从新生产,咱们无妨正文掉sess.MarkMessage(msg, “”),而后关上Offset Explorer查看:
那么这样,咱们就大略了解了sarama主动提交的原理:先标记再提交。咱们只须要放弃标记逻辑在插入mysql代码之后即可确保不会呈现丢音讯的问题:
正确的调用程序:
1. func (h msgConsumerGroup) ConsumeClaim(sesssarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
2. for msg := range claim.Messages() {
3. // 插入mysql
4. insertToMysql(msg)
5. // 正确:插入mysql胜利后程序解体,下一次顶多反复生产一次,而不是因为Offset超 前,导致应用层音讯失落了
6. sess.MarkMessage(msg, “")
7. }
8. return nil
9. }
谬误的程序:
1. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { 2. for msg := range claim.Messages() {
3. // 谬误1:不能先标记,再插入mysql,可能标记的时候刚好主动提交Offset,但mysql插入失败了,导致下一次这个音讯不会被生产,造成失落
4. // 谬误2:罗唆遗记调用sess.MarkMessage(msg, “"),导致反复生产
5. sess.MarkMessage(msg, “")
6. // 插入mysql
7. insertToMysql(msg)
8. }
9. return nil
10. }
sarama手动提交模式
当然,另外也能够通过手动提交来解决丢音讯的问题,然而集体不举荐,因为主动提交模式下曾经能解决丢音讯问题。
1. consumerConfig := sarama.NewConfig()
2. consumerConfig.Version = sarama.V2_8_0_0consumerConfig.
3. Consumer.Return.Errors = falseconsumerConfig.
4. Consumer.Offsets.AutoCommit.Enable = false // 禁用主动提交,改为手动
5. consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
6. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { 7. for msg := range claim.Messages() {
8. fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value)) 9. // 插入mysql
10. insertToMysql(msg)
11. // 手动提交模式下,也须要先进行标记
12. sess.MarkMessage(msg, "")
13. consumerCount++
14. if consumerCount%3 == 0 {
15. // 手动提交,不能频繁调用,耗时9ms左右,macOS i7 16GB
16. t1 := time.Now().Nanosecond()
17. sess.Commit()
18. t2 := time.Now().Nanosecond()
19.fmt.Println("commit cost:", (t2-t1)/(1000*1000), "ms")
20. }
21. }
22. return nil
23. }
05 Kafka音讯程序问题
投递Kafka之前,咱们通过一次gRPC调用解决了音讯序号的生成问题,然而这里其实还波及一个音讯程序问题:订阅Kafka的消费者如何依照音讯程序写入mysql,而不是随机写入呢?
咱们晓得,Kafka的音讯在一个partition中是有序的,所以只有确保发给某个人的音讯都在同一个partition中即可。
1. 全局一个partition
这个最简略,然而在kafka中一个partition对应一个线程,所以这种模型下Kafka的吞吐是个问题。
2. 多个partition,手动指定
1. msg := &sarama.ProducerMessage{
2. Topic: “msgc2s",
3. Value: sarama.StringEncoder(“hello”),
4. Partition: toUserId % 10,
5. }
6. partition, offset, err := producer.SendMessage(msg)
生产音讯的时候,除了Topic和Value,咱们能够通过手动指定partition,比方总共有10个分区,咱们依据用户ID取余,这样发给同一个用户的音讯,每次都到1个partition外面去了,消费者写入mysql中的时候,天然也是有序的。
然而,因为分区总数是写死的,万一Kafka的分区数要调整呢?那不得从新编译代码?所以这个形式不够柔美。
3. 多个partition,主动计算
kafka客户端为咱们提供了这种反对。首先,在初始化的时候,设置抉择分区的策略为Hash:
p.config.Producer.Partitioner = sarama.NewHashPartitioner
而后,在生成音讯之前,设置音讯的Key值:
1. msg := &sarama.ProducerMessage{
2. Topic: "testAutoSyncOffset",
3. Value: sarama.StringEncoder("hello"),
4. Key: sarama.StringEncoder(strconv.Itoa(RecvID)),
5. }
Kafka客户端会依据Key进行Hash,咱们通过把接管用户ID作为Key,这样就能让所有发给某个人的音讯落到同一个分区了,也就有序了。
4.扩大常识:多线程状况下一个partition的乱序解决
咱们下面说了,Kafka客户端针对一个partition开一个线程进行生产,如果解决比拟耗时的话,比方解决一条音讯耗时几十 ms,那么 1 秒钟就只能解决几十条音讯,这吞吐量太低了。这个时候,咱们可能就把逻辑挪动到其余线程外面去解决,这样的话,程序就可能会乱。
咱们能够通过写 N 个内存 queue,具备雷同 key 的数据都到同一个内存 queue;而后对于 N 个线程,每个线程别离生产一个内存 queue 即可,这样就能保障程序性。PS:就像4 % 10 = 4,14 % 10 = 4,他们取余都是等于4,所以落到了一个partition,然而key值不一样啊,咱们能够本人再取余,放到不同的queue外面。
06 反复生产和音讯幂等
这篇文章中:
kafka什么时候会丢音讯:https://blog.csdn.net/qrne06/…
具体了形容了各种丢音讯的状况,咱们通过设置 RequiredAcks = sarama.WaitForAll(-1),能够解决生产端丢音讯的问题。第六节中也对生产端丢音讯进行了阐明,只须要确保在插入数据库之后,调用 sess.MarkMessage(msg, “”) 即可。
如果呈现了插入Mysql胜利,然而因为主动提交有1秒的距离,如果此时解体,下次启动消费者势必会对这1秒的数据进行反复生产,咱们在应用层须要解决这个问题。
常见的有2种思路:
- 如果是存在redis中不须要长久化的数据,比方string类型,set具备人造的幂等性,无需解决。
- 插入mysql之前,进行一次query操作,针对每个客户端发的音讯,咱们为它生成一个惟一的ID(比方GUID),或者间接把音讯的ID设置为惟一索引。
第2个计划的难点在于,全局惟一ID的生成,实践上GUID也是存在反复的可能性的,如果是客户端生成,那么插入失败,怎么让客户端感知呢?
所以,这里我认为还是须要自定义ID生产,比方通过组合法:用户ID + 以后工夫 + 32位GUID,是不是简直不会反复了呢(试想,1集体发1亿条文本须要多少年。。。)?
07 残缺代码实例
consumer.go
1. type msgConsumerGroup struct{}
2.
3. func (msgConsumerGroup) Setup(_ sarama.ConsumerGroupSession) error { return nil }
4. func (msgConsumerGroup) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
5. func (h msgConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { 6. for msg := range claim.Messages() {
7. fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
8.
9. // 查mysql去重
10. if check(msg) {
11. // 插入mysql
12. insertToMysql()
13. }
14.
15. // 标记,sarama会主动进行提交,默认距离1秒
16. sess.MarkMessage(msg, "")
17. }
18. return nil
19. }
20.
21. func main(){
22. consumerConfig := sarama.NewConfig()
23. consumerConfig.Version = sarama.V2_8_0_0 // specify appropriate version
24. consumerConfig.Consumer.Return.Errors = false
25. //consumerConfig.Consumer.Offsets.AutoCommit.Enable = true
26. // 禁用主动提交,改为手动 //
27. consumerConfig.Consumer.Offsets.AutoCommit.Interval = time.Second * 1 // 测试3秒主动提交 consumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
28.
29. cGroup, err := sarama.NewConsumerGroup([]string{"10.0.56.153:9092", "10.0.56.153:9093", "10.0.56.153:9094"},"testgroup", consumerConfig) 30. if err != nil {
31. panic(err)
32. }
33.
34. for {
35. err := cGroup.Consume(context.Background(), []string{"testAutoSyncOffset"}, consumerGroup)
36. if err != nil {
37. fmt.Println(err.Error())
38. break
39. }
40. }
41.
42. _ = cGroup.Close()
43. }
producer.go
1. func main(){
2. config := sarama.NewConfig()
3. config.Producer.RequiredAcks = sarama.WaitForAll // 期待所有follower都回复ack,确保Kafka不会丢音讯
4. config.Producer.Return.Successes = true
5. config.Producer.Partitioner = sarama.NewHashPartitioner
6.
7. // 对Key进行Hash,同样的Key每次都落到一个分区,这样音讯是有序的
// 应用同步producer,异步模式下有更高的性能,然而解决更简单,这里倡议先从简略的动手
8. producer, err := sarama.NewSyncProducer([]string{"10.0.56.153:9092"}, config)
9. defer func() {
10. _ = producer.Close()
11. }()
12. if err != nil {
13. panic(err.Error())
14. }
15.
16. msgCount := 4
17. // 模仿4个音讯
18. for i := 0; i < msgCount; i++ {
19. rand.Seed(int64(time.Now().Nanosecond()))
20. msg := &sarama.ProducerMessage{
21. Topic: "testAutoSyncOffset",
22. Value: sarama.StringEncoder("hello+" + strconv.Itoa(rand.Int())),
23. Key: sarama.StringEncoder("BBB”),
24. }
25.
26. t1 := time.Now().Nanosecond()
27. partition, offset, err := producer.SendMessage(msg)
28. t2 := time.Now().Nanosecond()
29.
30. if err == nil {
31. fmt.Println("produce success, partition:", partition, ",offset:", offset, ",cost:", (t2-t1)/(1000*1000), " ms")
32. } else {
33. fmt.Println(err.Error())
34. }
35. }
36.}
完结
OpenIM github开源地址:
https://github.com/OpenIMSDK/…
OpenIM官网 :https://www.rentsoft.cn
OpenIM官方论坛:https://forum.rentsoft.cn/
咱们致力于通过开源模式,为寰球企业/开发者提供简略、易用、高效的IM服务和实时音视频通信能力,帮忙开发者升高我的项目的开发成本,并让开发者掌控业务的外围数据。
IM作为外围业务数据,平安的重要性毋庸置疑,OpenIM开源以及私有化部署让企业能更放心使用。
现在IM云服务商免费高企,如何让企业低成本、平安、牢靠接入IM服务,是OpenIM的历史使命,也是咱们后退的方向。
如您有技术下面的浅见请到咱们的论坛分割沟通,用户也可与咱们的技术人员谈讨应用方面的难题以及见解