Golang日志收集项目(采用taillog收集+etcd注册集群+kafka作为开源流处理平台+ES)
package taillog
import (
"context"
"fmt"
"github.com/hpcloud/tail"
"logagent/kafka"
)
// var (
// tailObj *tail.Tail
// LogChan chan string
// )
type TailTask struct {
path string
topic string
instance *tail.Tail
//为了实现退出t.run()
ctx context.Context
cancelFunc context.CancelFunc
}
func NewTailTask(path, topic string) (tailObj *TailTask, err error) {
ctx, cancel := context.WithCancel(context.Background())
tailObj = &TailTask{
path: path,
topic: topic,
ctx: ctx,
cancelFunc: cancel,
}
err = tailObj.init()
return
}
func (t *TailTask) init() (err error) {
config := tail.Config{
ReOpen: true, // 重新打开
Follow: true, // 是否跟随
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件的哪个地方开始读
MustExist: false, // 文件不存在不报错
Poll: true,
}
t.instance, err = tail.TailFile(t.path, config)
if err != nil {
fmt.Println("tail file failed, err:", err)
return
}
go t.Run()
return
}
func (t *TailTask) Run() {
for {
select {
case <-t.ctx.Done():
fmt.Printf("tail task:%s_%s结束了...\n", t.path, t.topic)
return
case line := <-t.instance.Lines:
kafka.SendTOChan(t.topic, line.Text)
}
}
}
// //从日志文件收集日志
// func Init(filepath string) (err error) {
// fileName := filepath
// config := tail.Config{
// ReOpen: true, // 重新打开
// Follow: true, // 是否跟随
// Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件的哪个地方开始读
// MustExist: false, // 文件不存在不报错
// Poll: true,
// }
// tailObj, err = tail.TailFile(fileName, config)
// if err != nil {
// fmt.Println("tail file failed, err:", err)
// return
// }
// return nil
// }
// func (t *TailTask) ReadChan() <-chan *tail.Line {
// return t.instance.Lines
// }