Kafka是一个高吞吐量的分布式发布订阅消息系统,由于其高效的消息传递机制,被广泛应用于大规模数据处理、日志收集和实时数据流处理等场景中。但是,由于网络、硬件等原因,Kafka的消息有时可能会出现丢失的情况。下面是一些Go语言中解决Kafka消息丢失问题的方法:
1.设置合适的Kafka配置
在创建Kafka生产者时,可以设置一些参数来控制消息传输的可靠性,例如确认消息是否已经被服务器接收、等待服务器返回确认消息的时间等等。通过合适地设置这些参数,可以减少消息丢失的概率。
2.使用Kafka事务
Kafka事务是一组原子性操作的集合,要么全部成功,要么全部失败。使用Kafka事务可以确保消息的可靠传递,即使发生了部分失败,也可以回滚所有的操作,防止消息丢失。
3.使用同步发送消息
在Go语言中,Kafka的生产者提供了异步和同步两种发送消息的方式。异步发送消息时,程序会立即返回,而不等待消息是否已经被服务器接收。如果要确保消息已经被成功接收,可以使用同步发送消息的方式,等待服务器返回确认消息后再继续执行后续操作。
4.处理发送失败的消息
如果Kafka生产者在发送消息时出现异常,例如网络中断、服务器宕机等等,可以在代码中处理发送失败的消息,例如记录日志、重新发送消息等等,确保消息最终被成功接收。
5.使用Kafka副本机制
Kafka提供了副本机制,可以将同一个分区的消息复制到多个服务器上,以实现数据冗余和高可用性。使用副本机制可以在某个服务器出现故障时,自动切换到其他正常的服务器,确保消息不会丢失。
一、Kafka集群
broker.idserver.properties
/brokers/ids临时节点broker.id/controller
二、副本机制
controller broker
2.1 分区和副本
replication-factor
2.2 ISR机制
每个分区都有一个 ISR(in-sync Replica) 列表,用于维护所有同步的、可用的副本。首领副本必然是同步副本,而对于跟随者副本来说,它需要满足以下条件才能被认为是同步副本:
- 与 Zookeeper 之间有一个活跃的会话,即必须定时向 Zookeeper 发送心跳;
- 在规定的时间内从首领副本那里低延迟地获取过消息。
如果副本不满足上面条件的话,就会被从 ISR 列表中移除,直到满足条件才会被再次加入。
--replication-factor--describe
2.3 不完全的首领选举
unclean.leader.election.enable
2.4 最少同步副本
min.insync.replicasorg.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。
2.5 发送确认
Kafka 在生产者上有一个可选的参数 ack,该参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入成功:
- acks=0 :消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
- acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
- acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
三、数据请求
3.1 元数据请求机制
Not a Leader for Partition
metadata.max.age.ms
Not a Leader for Partition
3.2 数据可见性
需要注意的是,并不是所有保存在分区首领上的数据都可以被客户端读取到,为了保证数据一致性,只有被所有同步副本 (ISR 中所有副本) 都保存了的数据才能被客户端读取到。
3.3 零拷贝
Kafka 所有数据的写入和读取都是通过零拷贝来实现的。传统拷贝与零拷贝的区别如下:
传统模式下的四次拷贝与四次上下文切换
以将磁盘文件通过网络发送为例。传统模式下,一般使用如下伪代码所示的方法先将文件数据读入内存,然后通过 Socket 将内存中的数据发送出去。
这一过程实际上发生了四次数据拷贝。首先通过系统调用将文件数据读入到内核态 Buffer(DMA 拷贝),然后应用程序将内存态 Buffer 数据读入到用户态 Buffer(CPU 拷贝),接着用户程序通过 Socket 发送数据时将用户态 Buffer 数据拷贝到内核态 Buffer(CPU 拷贝),最后通过 DMA 拷贝将数据拷贝到 NIC Buffer。同时,还伴随着四次上下文切换,如下图所示:
sendfile和transferTo实现零拷贝
sendfilesendfile
PlaintextTransportLayertransferFromtransferTo
transferTotransferFromsendfile
四、物理存储
4.1 分区分配
在创建主题时,Kafka 会首先决定如何在 broker 间分配分区副本,它遵循以下原则:
broker.rack
基于以上原因,如果你在一个单节点上创建一个 3 副本的主题,通常会抛出下面的异常:
4.2 分区数据保留规则
保留数据是 Kafka 的一个基本特性, 但是 Kafka 不会一直保留数据,也不会等到所有消费者都读取了消息之后才删除消息。相反, Kafka 为每个主题配置了数据保留期限,规定数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。分别对应以下四个参数:
log.retention.byteslog.retention.mslog.retention.minuteslog.retention.minuteslog.retention.hourslog.retention.hours
因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以 Kafka 把分区分成若干个片段,当前正在写入数据的片段叫作活跃片段。活动片段永远不会被删除。如果按照默认值保留数据一周,而且每天使用一个新片段,那么你就会看到,在每天使用一个新片段的同时会删除一个最老的片段,所以大部分时间该分区会有 7 个片段存在。
4.3 文件格式
通常保存在磁盘上的数据格式与生产者发送过来消息格式是一样的。 如果生产者发送的是压缩过的消息,那么同一个批次的消息会被压缩在一起,被当作“包装消息”进行发送 (格式如下所示) ,然后保存到磁盘上。之后消费者读取后再自己解压这个包装消息,获取每条消息的具体信息。
总之,解决Kafka消息丢失问题需要综合考虑各种因素,并采取多种方法来提高消息传输的可靠性和稳定性。