package taillog import ( "context" "fmt" "github.com/hpcloud/tail" "logagent/kafka" ) // var ( // tailObj *tail.Tail // LogChan chan string // ) type TailTask struct { path string topic string instance *tail.Tail //为了实现退出t.run() ctx context.Context cancelFunc context.CancelFunc } func NewTailTask(path, topic string) (tailObj *TailTask, err error) { ctx, cancel := context.WithCancel(context.Background()) tailObj = &TailTask{ path: path, topic: topic, ctx: ctx, cancelFunc: cancel, } err = tailObj.init() return } func (t *TailTask) init() (err error) { config := tail.Config{ ReOpen: true, // 重新打开 Follow: true, // 是否跟随 Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件的哪个地方开始读 MustExist: false, // 文件不存在不报错 Poll: true, } t.instance, err = tail.TailFile(t.path, config) if err != nil { fmt.Println("tail file failed, err:", err) return } go t.Run() return } func (t *TailTask) Run() { for { select { case <-t.ctx.Done(): fmt.Printf("tail task:%s_%s结束了...\n", t.path, t.topic) return case line := <-t.instance.Lines: kafka.SendTOChan(t.topic, line.Text) } } } // //从日志文件收集日志 // func Init(filepath string) (err error) { // fileName := filepath // config := tail.Config{ // ReOpen: true, // 重新打开 // Follow: true, // 是否跟随 // Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件的哪个地方开始读 // MustExist: false, // 文件不存在不报错 // Poll: true, // } // tailObj, err = tail.TailFile(fileName, config) // if err != nil { // fmt.Println("tail file failed, err:", err) // return // } // return nil // } // func (t *TailTask) ReadChan() <-chan *tail.Line { // return t.instance.Lines // }