最近因为云原生日志收集的需要,我们打算使用 Filebeat 作为容器日志收集工具,并对其进行二次开发,因此笔者将谈谈 Filebeat 收集日志的那些事儿。本文不涉及过具体的源码分析,希望通过阅读您可以了解 filebeat 的基本使用方法和原理,姑且算是 filebeat 的入门吧。

1、前言

开源日志收集组件众多,之所以选择 Filebeat,主要基于以下几点:
  • 功能上能满足我们的需求:收集磁盘日志文件,发送到 Kafka 集群;支持多行收集和自定义字段等;
  • 性能上相比运行于 jvm 上的 logstash 和 flume 优势明显;
  • Filebeat 基于 golang 技术栈,二次开发对于我们来说有一定的技术积累;
  • 部署方便,没有第三方依赖;

2、Filebeat 能做什么

简单来说 Filebeat 就是数据的搬运工,只不过除了搬运还可以对数据作一些深加工,为业务增加一些附加值。
  • Filebeat 可以从多种不同的上游 input 中接受需要收集的数据,其中我们最常用的就是 log input,即从日志中收集数据;

  • Filebeat 对收集来的数据进行加工,比如:多行合并,增加业务自定义字段,json 等格式的 encode;

  • Filebeat 将加工好的数据发送到被称为 output 的下游,其中我们最常用的就是 Elasticsearch 和 Kafka;

  • Filebeat 具有 ACK 反馈确认机制,即成功发送到 output 后,会将当前进度反馈给 input, 这样在进程重启后可以断点续传;

  • Filebeat 在发送 output 失败后,会启动 retry 机制,和上一次 ACK 反馈确认机制一起,保证了每次消息至少发送一次的语义

  • Filebeat 在发送 output 时,由于网络等原因发生阻塞,则在 input 上游端会减慢收集,自适应匹配下游output 的状态。

一图以蔽之。

3、Filebeat 背后的“老大”

说到 Filebeat,它其实只是 beats 家族众多成员中的一个。除了 Filebeat, 还有很多其他的 beat 小伙伴:
如果你愿意的话,你也可以按照 beat 的规范来写自己的 beat。
能实现以上这些 beat,都离不开 beats 家族真正的“老大”— libbeat, 它是 beat 体系的核心库。我们接下来看一下 libbeat 到底都做了些什么:
  • libbeat 提供了 publisher 组件,用于对接 input;

  • 收集到的数据在进入到 libbeat 后,首先会经过各种 processor 的加工处理,比如过滤添加字段,多行合并等等;

  • input 组件通过 publisher 组件将收集到的数据推送到 publisher 内部的队列;

  • libbeat 本身实现了前面介绍过的多种 output, 因此它负责将处理好的数据通过 output 组件发送出去;

  • libbeat 本身封装了 retry 的逻辑;

  • libbeat 负责将 ACK 反馈通过到 input 组件 ;

由此可见,大部分活儿都是 libbeat 来做,当“老大”不容易啊~。
input 仅需要做两件事:
  • 从不同的介质中收集数据后投递给 libbeat;

  • 接收 libbeat 反馈回来的 ACK, 作相应的持久化;

4、Filebeat 的简单使用示例

Filebeat 本身的使用很简单,我们只需要按需写好相应的 input 和 output 配置就好了。下面我们以一个收集磁盘日志文件到 Kafka 集群的例子来讲一下。
1.配置 inputs.d 目录
在 filebeat.yml 添加如下配置,这样我们可以将每一种等收集的路径写在单独的配置文件里,然后将这些配置文件统一放到 inputs.d 目录,方便管理
filebeat.config.inputs:enabled: truepath: inputs.d/*.yml
2.在 inputs.d 目录下创建 test1.yml,内容如下
 - type: log                       # Change to true to enable t    enabled: true                   # Paths that should be crawl    paths:                            - /home/lw/test/filebeat/*.log    fields:                       log_topic: lw_filebeat_t_2
/home/lw/test/filebeat/*.logfiled:log_topic: lw_filebeat_t_2

3.在filebeat.yml中配置kafka output:

output.kafka:                                                                hosts: ["xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092"] version: 0.9.0.1                                                           topic: '%{[fields.log_topic]}'                                             partition.round_robin:                                                     reachable_only: true                                                     compression: none                                                          required_acks: 1                                                           max_message_bytes: 1000000                                                 codec.format:                                                                    string: '%{[host.name]}-%{[message]}'
其中:
  • hosts 是 kafka 集群的 broker list;

  • topic: ‘%{[fields.log_topic]}’  : 这项指定了我们要写入 kafka 集群哪个 topic, 可以看到它实现上是引用了上面 test.yml 配置中我们自定义的 filed 字段,通过这种方式我们就可以将收集的不同路径的数据写入到不同的 topic 中,但是这个有个限制就是只能写到一个 kafka 集群,因为当前版本的 filebeat 不允许同时配置多个output。

  • codec.format: 指定了写入 kafka 集群的消息格式,我们在从日志文件中读取的每行内容前面加上了当前机器的 hostname。

启动就很简单了,filebeat 和 filebeat.yml, inputs.d 都在同一目录下,然后 ./filebeat run 就好了。

filebeat 本身有很多全局的配置,每种input和output又有很多各自的配置,关乎日志收集的内存使用,是不是会丢失日志等方方面面,大家在使用时还需要仔细阅读,这里不赘述。

5、Log input 是如何从日志文件中收集日志的

input 的创建:
Pipeline.queue.Producerproducer
收集文件内容:
  • input 会根据配置文件中的收集路径(正则匹配)来轮询是否有新文件产生,文件是否已经过期,文件是否被删除或移动;
  • 针对每一个文件创建一个 Harvester 来逐行读取文件内容;
将文件内容封装后通过 producer 发送到 libbeat 的内部队列;
处理文件重命名,删除,截断:
  • 获取文件信息时会获取文件的 device id + indoe 作为文件的唯一标识;
  • 前面我们提过文件收集进度会被持久化,这样当创建 Harvester 时,首先会对文件作 openFile, 以 device id + inode 为 key 在持久化文件中查看当前文件是否被收集过,收集到了什么位置,然后断点续传;
  • 在读取过程中,如果文件被截断,认为文件已经被同名覆盖,将从头开始读取文件;
  • 如果文件被删除,因为原文件已被打开,不影响继续收集,但如果设置了 CloseRemoved, 则不会再继续收集;
  • 如果文件被重命名,因为原文件已被打开,不影响继续收集,但如果设置了 CloseRenamed, 则不会再继续收集;

6、日志如何被发送

发送流程简述:

  • input 将日志内容写入 libbeat 的内部队列后,剩下的事件就都交由 libbeat 来做了;
  • libbeat 会创建 consumer, 复现作 libbeat 的队列里消费日志 event, 封装成 Batch 对象;
  • 针对每个Batch 对象,还会创建 ack Channel, 用来将 ACK 反馈信息写入这个 channel;
  • Batch 对象会被源源不断地写入一个叫 workQueue 的 channel 中;
  • 以 kafka output 为例,在创 kafka output 时首先会创建一个 outputs.Group,它内部封装了一组 kafka client, 同时启动一组 goroutine;
  • 上面创建的每个 goroutine 都从 workQueue 队列里读取 Batch 对象,然后通过 kafka client 发送出去,这里相当于多线程并发读队列后发送;
  • 若 kafka client 发送成功,写入信息到 ack channel, 最终会通过到 input 中;

重试机制:

ch <-chan *sarama.ProducerError

7、后记

在本文里,我们没有深入到源码层次,为了讲清 filebeat 运作的原理,我们也忽略了一些实现细节,后续将会从源码层面作进一步剖析。

GOPS 2020 · 全球运维大会,9月25-26日,精彩呈现。

云上服务器无人值守怎么做?GOPS 2020 · 深圳站,阿里高级技术专家滕圣波与您聊聊~

近期好文:

“高效运维”公众号诚邀广大技术人员投稿,

点个“在看”,一年不宕机