最近因为云原生日志收集的需要,我们打算使用 Filebeat 作为容器日志收集工具,并对其进行二次开发,因此笔者将谈谈 Filebeat 收集日志的那些事儿。本文不涉及过具体的源码分析,希望通过阅读您可以了解 filebeat 的基本使用方法和原理,姑且算是 filebeat 的入门吧。
1、前言
功能上能满足我们的需求:收集磁盘日志文件,发送到 Kafka 集群;支持多行收集和自定义字段等; 性能上相比运行于 jvm 上的 logstash 和 flume 优势明显; Filebeat 基于 golang 技术栈,二次开发对于我们来说有一定的技术积累; 部署方便,没有第三方依赖;
2、Filebeat 能做什么
Filebeat 可以从多种不同的上游 input 中接受需要收集的数据,其中我们最常用的就是 log input,即从日志中收集数据;
Filebeat 对收集来的数据进行加工,比如:多行合并,增加业务自定义字段,json 等格式的 encode;
Filebeat 将加工好的数据发送到被称为 output 的下游,其中我们最常用的就是 Elasticsearch 和 Kafka;
Filebeat 具有 ACK 反馈确认机制,即成功发送到 output 后,会将当前进度反馈给 input, 这样在进程重启后可以断点续传;
Filebeat 在发送 output 失败后,会启动 retry 机制,和上一次 ACK 反馈确认机制一起,保证了每次消息至少发送一次的语义;
Filebeat 在发送 output 时,由于网络等原因发生阻塞,则在 input 上游端会减慢收集,自适应匹配下游output 的状态。
一图以蔽之。
3、Filebeat 背后的“老大”
libbeat 提供了 publisher 组件,用于对接 input;
收集到的数据在进入到 libbeat 后,首先会经过各种 processor 的加工处理,比如过滤添加字段,多行合并等等;
input 组件通过 publisher 组件将收集到的数据推送到 publisher 内部的队列;
libbeat 本身实现了前面介绍过的多种 output, 因此它负责将处理好的数据通过 output 组件发送出去;
libbeat 本身封装了 retry 的逻辑;
libbeat 负责将 ACK 反馈通过到 input 组件 ;
从不同的介质中收集数据后投递给 libbeat;
接收 libbeat 反馈回来的 ACK, 作相应的持久化;
4、Filebeat 的简单使用示例
在 filebeat.yml 添加如下配置,这样我们可以将每一种等收集的路径写在单独的配置文件里,然后将这些配置文件统一放到 inputs.d 目录,方便管理
filebeat.config.inputs:enabled: truepath: inputs.d/*.yml
- type: log# Change to true to enable tenabled: true# Paths that should be crawlpaths:- /home/lw/test/filebeat/*.logfields:log_topic: lw_filebeat_t_2
/home/lw/test/filebeat/*.logfiled:log_topic: lw_filebeat_t_23.在filebeat.yml中配置kafka output:
output.kafka:hosts: ["xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092"]version: 0.9.0.1topic: '%{[fields.log_topic]}'partition.round_robin:reachable_only: truecompression: nonerequired_acks: 1max_message_bytes: 1000000codec.format:string: '%{[host.name]}-%{[message]}'
hosts 是 kafka 集群的 broker list;
topic: ‘%{[fields.log_topic]}’ : 这项指定了我们要写入 kafka 集群哪个 topic, 可以看到它实现上是引用了上面 test.yml 配置中我们自定义的 filed 字段,通过这种方式我们就可以将收集的不同路径的数据写入到不同的 topic 中,但是这个有个限制就是只能写到一个 kafka 集群,因为当前版本的 filebeat 不允许同时配置多个output。
codec.format: 指定了写入 kafka 集群的消息格式,我们在从日志文件中读取的每行内容前面加上了当前机器的 hostname。
启动就很简单了,filebeat 和 filebeat.yml, inputs.d 都在同一目录下,然后 ./filebeat run 就好了。
5、Log input 是如何从日志文件中收集日志的
Pipeline.queue.Producerproducerinput 会根据配置文件中的收集路径(正则匹配)来轮询是否有新文件产生,文件是否已经过期,文件是否被删除或移动; 针对每一个文件创建一个 Harvester 来逐行读取文件内容;
获取文件信息时会获取文件的 device id + indoe 作为文件的唯一标识; 前面我们提过文件收集进度会被持久化,这样当创建 Harvester 时,首先会对文件作 openFile, 以 device id + inode 为 key 在持久化文件中查看当前文件是否被收集过,收集到了什么位置,然后断点续传; 在读取过程中,如果文件被截断,认为文件已经被同名覆盖,将从头开始读取文件; 如果文件被删除,因为原文件已被打开,不影响继续收集,但如果设置了 CloseRemoved, 则不会再继续收集; 如果文件被重命名,因为原文件已被打开,不影响继续收集,但如果设置了 CloseRenamed, 则不会再继续收集;
6、日志如何被发送
发送流程简述:
input 将日志内容写入 libbeat 的内部队列后,剩下的事件就都交由 libbeat 来做了; libbeat 会创建 consumer, 复现作 libbeat 的队列里消费日志 event, 封装成 Batch 对象; 针对每个Batch 对象,还会创建 ack Channel, 用来将 ACK 反馈信息写入这个 channel; Batch 对象会被源源不断地写入一个叫 workQueue 的 channel 中; 以 kafka output 为例,在创 kafka output 时首先会创建一个 outputs.Group,它内部封装了一组 kafka client, 同时启动一组 goroutine; 上面创建的每个 goroutine 都从 workQueue 队列里读取 Batch 对象,然后通过 kafka client 发送出去,这里相当于多线程并发读队列后发送; 若 kafka client 发送成功,写入信息到 ack channel, 最终会通过到 input 中;
重试机制:
ch <-chan *sarama.ProducerError7、后记
云上服务器无人值守怎么做?GOPS 2020 · 深圳站,阿里高级技术专家滕圣波与您聊聊~