xxl-job-executor-go
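Features
1. Executor registration
2. Cancellation of long-running tasks
3. Task registration, as convenient as writing an http.Handler
4. Task panic handling
5. Blocking strategy handling
6. Tasks can return an execution remark on completion
7. Task timeout cancellation (in seconds; 0 means no limit)
8. Failure retry count (passed in the param argument; currently handled by the task itself)
9. Customizable logging
10. Custom log-view handler
11. External routing support (can be integrated with gin)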
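
To make items 3, 4 and 7 more concrete, here is a minimal, self-contained sketch of a name-to-handler registry with panic recovery and an optional timeout. It is not the library's implementation: `TaskFunc`, `registry`, `demoRegistry` and the `task.slow` handler are hypothetical names used only for illustration, and the real handler signature in xxl-job-executor-go may differ.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// TaskFunc is a hypothetical handler signature used only in this sketch.
type TaskFunc func(ctx context.Context, param string) string

// registry maps a handler name to its function, much like an HTTP mux.
type registry struct {
	tasks map[string]TaskFunc
}

// RegTask registers fn under name.
func (r *registry) RegTask(name string, fn TaskFunc) {
	r.tasks[name] = fn
}

// Run executes the named task. A timeout of 0 means no limit. A panic inside
// the task is recovered and returned as an error instead of crashing.
func (r *registry) Run(name, param string, timeout time.Duration) (msg string, err error) {
	fn, ok := r.tasks[name]
	if !ok {
		return "", fmt.Errorf("task %q is not registered", name)
	}
	ctx := context.Background()
	if timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	defer func() {
		if rec := recover(); rec != nil {
			err = fmt.Errorf("task %q panicked: %v", name, rec)
		}
	}()
	return fn(ctx, param), nil
}

// demoRegistry builds a registry with three sample tasks.
func demoRegistry() *registry {
	r := &registry{tasks: make(map[string]TaskFunc)}
	r.RegTask("task.test", func(ctx context.Context, param string) string {
		return "done: " + param
	})
	r.RegTask("task.panic", func(ctx context.Context, param string) string {
		panic("boom")
	})
	r.RegTask("task.slow", func(ctx context.Context, param string) string {
		// A cooperative task watches ctx so that it can stop early.
		select {
		case <-ctx.Done():
			return "cancelled"
		case <-time.After(time.Second):
			return "finished"
		}
	})
	return r
}

func main() {
	r := demoRegistry()
	fmt.Println(r.Run("task.test", "hello", 0))
	fmt.Println(r.Run("task.panic", "", 0))
	fmt.Println(r.Run("task.slow", "", 10*time.Millisecond))
}
```

The timeout only takes effect if the task itself checks the context, which is why the slow task selects on `ctx.Done()`.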
Project URL
Example
package main

import (
	"fmt"
	"log"

	xxl "github.com/xxl-job/xxl-job-executor-go"
	"github.com/xxl-job/xxl-job-executor-go/example/task"
)

func main() {
	exec := xxl.NewExecutor(
		xxl.ServerAddr("http://127.0.0.1/xxl-job-admin"),
		xxl.AccessToken(""),            // request token (empty by default)
		xxl.ExecutorIp("127.0.0.1"),    // can be detected automatically
		xxl.ExecutorPort("9999"),       // defaults to 9999 (optional)
		xxl.RegistryKey("golang-jobs"), // executor name
		xxl.SetLogger(&logger{}),       // custom logger
	)
	exec.Init()
	// Set the log-view handler.
	exec.LogHandler(func(req *xxl.LogReq) *xxl.LogRes {
		return &xxl.LogRes{Code: 200, Msg: "", Content: xxl.LogResContent{
			FromLineNum: req.FromLineNum,
			ToLineNum:   2,
			LogContent:  "this is the custom log handler",
			IsEnd:       true,
		}}
	})
	// Register the task handlers.
	exec.RegTask("task.test", task.Test)
	exec.RegTask("task.test2", task.Test2)
	exec.RegTask("task.panic", task.Panic)
	log.Fatal(exec.Run())
}

// logger implements the xxl.Logger interface.
type logger struct{}

func (l *logger) Info(format string, a ...interface{}) {
	fmt.Println(fmt.Sprintf("custom log - "+format, a...))
}

func (l *logger) Error(format string, a ...interface{}) {
	log.Println(fmt.Sprintf("custom log - "+format, a...))
}
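Example project
Integration with the gin framework
xxl-job-admin configuration
Add an executor
AppName        Name               Registration mode         OnLine machine addresses    Actions
golang-jobs    golang executor    Automatic registration    None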
Add tasks
1    Test panic                BEAN:task.panic    * 0 * * * ?    admin    STOP
2    Test long-running task    BEAN:task.test2    * * * * * ?    admin    STOP
3    Test golang               BEAN:task.test     * * * * * ?    admin    STOP
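Author: 如水网
Link: https://juejin.cn/post/6920454772319322126
Source: Juejin (掘金)
Copyright belongs to the author. For commercial reproduction, contact the author for authorization; for non-commercial reproduction, credit the source.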

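Many companies run Java and Go development side by side. On the Java side, xxl-job serves as the task scheduling engine, and a Go executor (client) has appeared for it as well. It is fairly simple to use: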

github.com/xxl-job/xxl…

github.com/xxl-job/xxl-job-executor-go/example/

github.com/gin-middlew…

Executor Management -> Add Executor. The executor list then looks like this:

Task Management -> Add (note: use BEAN mode, and the JobHandler must match the name passed to RegTask)

This article takes a look at promtail's positions.

Positions

loki/pkg/promtail/positions/positions.go

type Positions interface {
	// GetString returns how far we've read through a file as a string.
	// JournalTarget writes a journal cursor to the positions file, while
	// FileTarget writes an integer offset. Use Get to read the integer
	// offset.
	GetString(path string) string
	// Get returns how far we've read through a file. Returns an error
	// if the value stored for the file is not an integer.
	Get(path string) (int64, error)
	// PutString records (asynchronously) how far we've read through a file.
	// Unlike Put, it records a string offset and is only useful for
	// JournalTargets which doesn't have integer offsets.
	PutString(path string, pos string)
	// Put records (asynchronously) how far we've read through a file.
	Put(path string, pos int64)
	// Remove removes the position tracking for a filepath
	Remove(path string)
	// SyncPeriod returns how often the positions file gets resynced
	SyncPeriod() time.Duration
	// Stop the Position tracker.
	Stop()
}

The Positions interface defines the GetString, Get, PutString, Put, Remove, SyncPeriod, and Stop methods.

positions

loki/pkg/promtail/positions/positions.go

// Positions tracks how far through each file we've read.
type positions struct {
	logger    log.Logger
	cfg       Config
	mtx       sync.Mutex
	positions map[string]string
	quit      chan struct{}
	done      chan struct{}
}

func (p *positions) Stop() {
	close(p.quit)
	<-p.done
}

func (p *positions) PutString(path string, pos string) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.positions[path] = pos
}

func (p *positions) Put(path string, pos int64) {
	p.PutString(path, strconv.FormatInt(pos, 10))
}

func (p *positions) GetString(path string) string {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	return p.positions[path]
}

func (p *positions) Get(path string) (int64, error) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	pos, ok := p.positions[path]
	if !ok {
		return 0, nil
	}
	return strconv.ParseInt(pos, 10, 64)
}

func (p *positions) Remove(path string) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.remove(path)
}

func (p *positions) remove(path string) {
	delete(p.positions, path)
}

func (p *positions) SyncPeriod() time.Duration {
	return p.cfg.SyncPeriod
}

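The positions struct has the fields logger, cfg, mtx, positions, quit, and done, and it implements the Positions interface. Every access to the p.positions map is guarded by mtx. Get reads the entry for a path from p.positions and parses it as an int64, returning 0 with no error when the path has no entry. Put formats the integer offset as a decimal string and stores it through PutString. SyncPeriod returns p.cfg.SyncPeriod, and Remove deletes the path from p.positions.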
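
The Get and Put behaviour described above can be illustrated with a small stand-alone sketch. The `offsets` type and the sample paths are hypothetical; the sketch only mirrors the locking and string-encoding pattern of the code quoted above and is not a copy of promtail.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// offsets is a hypothetical stand-in for the positions struct.
type offsets struct {
	mtx  sync.Mutex
	data map[string]string
}

func newOffsets() *offsets {
	return &offsets{data: make(map[string]string)}
}

// PutString stores a raw string position, such as a journal cursor.
func (o *offsets) PutString(path, pos string) {
	o.mtx.Lock()
	defer o.mtx.Unlock()
	o.data[path] = pos
}

// Put stores an integer offset as a decimal string.
func (o *offsets) Put(path string, pos int64) {
	o.PutString(path, strconv.FormatInt(pos, 10))
}

// Get returns 0 for an unknown path and an error for a non-integer value.
func (o *offsets) Get(path string) (int64, error) {
	o.mtx.Lock()
	defer o.mtx.Unlock()
	pos, ok := o.data[path]
	if !ok {
		return 0, nil
	}
	return strconv.ParseInt(pos, 10, 64)
}

func main() {
	o := newOffsets()
	o.Put("/var/log/app.log", 1024)
	o.PutString("journal-demo", "s=abc;i=1f")

	fmt.Println(o.Get("/var/log/app.log"))     // stored integer offset
	fmt.Println(o.Get("/var/log/unknown.log")) // unknown path: zero offset, no error
	_, err := o.Get("journal-demo")
	fmt.Println(err != nil) // a cursor string cannot be parsed as an integer
}
```

Because an unknown path yields offset 0 without an error, a caller cannot distinguish "never read" from "read zero bytes", which is fine when both mean "start from the beginning".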

New

loki/pkg/promtail/positions/positions.go

// New makes a new Positions.
func New(logger log.Logger, cfg Config) (Positions, error) {
	positionData, err := readPositionsFile(cfg, logger)
	if err != nil {
		return nil, err
	}

	p := &positions{
		logger:    logger,
		cfg:       cfg,
		positions: positionData,
		quit:      make(chan struct{}),
		done:      make(chan struct{}),
	}

	go p.run()
	return p, nil
}

New loads positionData through readPositionsFile, uses it to build the positions value, and then starts p.run() in a separate goroutine.

run

loki/pkg/promtail/positions/positions.go

func (p *positions) run() {
	defer func() {
		p.save()
		level.Debug(p.logger).Log("msg", "positions saved")
		close(p.done)
	}()

	ticker := time.NewTicker(p.cfg.SyncPeriod)
	for {
		select {
		case <-p.quit:
			return
		case <-ticker.C:
			p.save()
			p.cleanup()
		}
	}
}

func (p *positions) save() {
	if p.cfg.ReadOnly {
		return
	}
	p.mtx.Lock()
	positions := make(map[string]string, len(p.positions))
	for k, v := range p.positions {
		positions[k] = v
	}
	p.mtx.Unlock()

	if err := writePositionFile(p.cfg.PositionsFile, positions); err != nil {
		level.Error(p.logger).Log("msg", "error writing positions file", "error", err)
	}
}

func (p *positions) cleanup() {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	toRemove := []string{}
	for k := range p.positions {
		// If the position file is prefixed with journal, it's a
		// JournalTarget cursor and not a file on disk.
		if strings.HasPrefix(k, "journal-") {
			continue
		}

		if _, err := os.Stat(k); err != nil {
			if os.IsNotExist(err) {
				// File no longer exists.
				toRemove = append(toRemove, k)
			} else {
				// Can't determine if file exists or not, some other error.
				level.Warn(p.logger).Log("msg", "could not determine if log file "+
					"still exists while cleaning positions file", "error", err)
			}
		}
	}
	for _, tr := range toRemove {
		p.remove(tr)
	}
}

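run uses time.NewTicker(p.cfg.SyncPeriod) to call p.save() and p.cleanup() once per sync period. When quit is closed, the deferred function saves one last time and then closes done, which is what Stop waits on. save copies the map while holding the lock and persists the copy to the positions file; it does nothing when cfg.ReadOnly is set. cleanup walks p.positions and removes from memory every entry whose file no longer exists, skipping keys with the "journal-" prefix because those are journal cursors and not files on disk.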
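
The ticker plus quit/done shutdown pattern used by run can be sketched in isolation as follows. The `syncer` type and `finalFlushCount` helper are hypothetical names, and unlike the code quoted above this sketch also stops the ticker on exit.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// syncer calls save periodically and once more on shutdown.
type syncer struct {
	period time.Duration
	save   func()
	quit   chan struct{}
	done   chan struct{}
}

func startSyncer(period time.Duration, save func()) *syncer {
	s := &syncer{
		period: period,
		save:   save,
		quit:   make(chan struct{}),
		done:   make(chan struct{}),
	}
	go s.run()
	return s
}

func (s *syncer) run() {
	defer func() {
		s.save() // final flush before signalling completion
		close(s.done)
	}()
	ticker := time.NewTicker(s.period)
	defer ticker.Stop()
	for {
		select {
		case <-s.quit:
			return
		case <-ticker.C:
			s.save()
		}
	}
}

// Stop asks the loop to exit and blocks until the final flush is done.
func (s *syncer) Stop() {
	close(s.quit)
	<-s.done
}

// finalFlushCount starts a syncer whose period is too long to ever tick
// during the demo, stops it right away, and reports how often save ran.
func finalFlushCount() int64 {
	var saves int64
	s := startSyncer(time.Hour, func() { atomic.AddInt64(&saves, 1) })
	s.Stop()
	return atomic.LoadInt64(&saves)
}

func main() {
	fmt.Println("saves after immediate stop:", finalFlushCount())
}
```

Waiting on done inside Stop guarantees that the last save has finished before the caller continues, so a clean shutdown does not lose positions recorded since the previous tick.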

readPositionsFile

loki/pkg/promtail/positions/positions.go

func readPositionsFile(cfg Config, logger log.Logger) (map[string]string, error) {

	cleanfn := filepath.Clean(cfg.PositionsFile)
	buf, err := ioutil.ReadFile(cleanfn)
	if err != nil {
		if os.IsNotExist(err) {
			return map[string]string{}, nil
		}
		return nil, err
	}

	var p File
	err = yaml.UnmarshalStrict(buf, &p)
	if err != nil {
		// return empty if cfg option enabled
		if cfg.IgnoreInvalidYaml {
			level.Debug(logger).Log("msg", "ignoring invalid positions file", "file", cleanfn, "error", err)
			return map[string]string{}, nil
		}

		return nil, fmt.Errorf("invalid yaml positions file [%s]: %v", cleanfn, err)
	}

	// p.Positions will be nil if the file exists but is empty
	if p.Positions == nil {
		p.Positions = map[string]string{}
	}

	return p.Positions, nil
}

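readPositionsFile reads the positions file and returns its contents as p.Positions. A missing file yields an empty map and no error. Invalid YAML yields an error unless cfg.IgnoreInvalidYaml is set, in which case an empty map is returned as well.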
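
The error handling described above can be tried out with the following sketch. To stay within the standard library it swaps YAML for JSON via encoding/json, so it is not the promtail file format; `readPositions`, `demoRead` and the file names are hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// file mirrors the on-disk layout: a single map of path to position.
type file struct {
	Positions map[string]string `json:"positions"`
}

// readPositions returns an empty map when the file is missing, and also when
// it is invalid and ignoreInvalid is true.
func readPositions(path string, ignoreInvalid bool) (map[string]string, error) {
	clean := filepath.Clean(path)
	buf, err := os.ReadFile(clean)
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			return map[string]string{}, nil
		}
		return nil, err
	}
	var f file
	if err := json.Unmarshal(buf, &f); err != nil {
		if ignoreInvalid {
			return map[string]string{}, nil
		}
		return nil, fmt.Errorf("invalid positions file [%s]: %w", clean, err)
	}
	// Positions is nil when the document has no "positions" key.
	if f.Positions == nil {
		f.Positions = map[string]string{}
	}
	return f.Positions, nil
}

// demoRead exercises the missing, valid and invalid cases in a temp directory.
func demoRead() (missing, valid map[string]string, invalidErr, err error) {
	dir, err := os.MkdirTemp("", "positions")
	if err != nil {
		return nil, nil, nil, err
	}
	defer os.RemoveAll(dir)

	if missing, err = readPositions(filepath.Join(dir, "absent.json"), false); err != nil {
		return
	}
	good := filepath.Join(dir, "good.json")
	if err = os.WriteFile(good, []byte(`{"positions":{"/var/log/a.log":"42"}}`), 0o600); err != nil {
		return
	}
	if valid, err = readPositions(good, false); err != nil {
		return
	}
	bad := filepath.Join(dir, "bad.json")
	if err = os.WriteFile(bad, []byte("{not json"), 0o600); err != nil {
		return
	}
	_, invalidErr = readPositions(bad, false)
	return
}

func main() {
	missing, valid, invalidErr, err := demoRead()
	if err != nil {
		fmt.Println("demo failed:", err)
		return
	}
	fmt.Println(len(missing), valid["/var/log/a.log"], invalidErr != nil)
}
```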

writePositionFile

loki/pkg/promtail/positions/positions.go

func writePositionFile(filename string, positions map[string]string) error {
	buf, err := yaml.Marshal(File{
		Positions: positions,
	})
	if err != nil {
		return err
	}

	target := filepath.Clean(filename)
	temp := target + "-new"

	err = ioutil.WriteFile(temp, buf, os.FileMode(positionFileMode))
	if err != nil {
		return err
	}

	return os.Rename(temp, target)
}

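writePositionFile marshals positions to YAML, writes the result to a temporary file named after the target with a "-new" suffix, and then renames the temporary file over the target, so a reader never sees a half-written positions file.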
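
The write-then-rename technique can be sketched on its own as follows. `writeAtomic`, `demoWrite` and the file contents are hypothetical, and the atomicity of the rename is an assumption that holds on POSIX systems when both paths are on the same filesystem.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// writeAtomic writes data to "<filename>-new" and renames it over filename.
func writeAtomic(filename string, data []byte, mode os.FileMode) error {
	target := filepath.Clean(filename)
	temp := target + "-new"
	if err := os.WriteFile(temp, data, mode); err != nil {
		return err
	}
	return os.Rename(temp, target)
}

// demoWrite overwrites a file twice and reports the final content and
// whether the temporary file is still present afterwards.
func demoWrite() (content string, tempLeft bool, err error) {
	dir, err := os.MkdirTemp("", "positions")
	if err != nil {
		return "", false, err
	}
	defer os.RemoveAll(dir)

	target := filepath.Join(dir, "positions.yml")
	if err = writeAtomic(target, []byte("first\n"), 0o600); err != nil {
		return
	}
	if err = writeAtomic(target, []byte("second\n"), 0o600); err != nil {
		return
	}
	buf, err := os.ReadFile(target)
	if err != nil {
		return "", false, err
	}
	_, statErr := os.Stat(target + "-new")
	tempLeft = !errors.Is(statErr, fs.ErrNotExist)
	return string(buf), tempLeft, nil
}

func main() {
	content, tempLeft, err := demoWrite()
	if err != nil {
		fmt.Println("demo failed:", err)
		return
	}
	fmt.Printf("content=%q tempLeft=%v\n", content, tempLeft)
}
```

If the process dies between the write and the rename, the old target is left untouched and only the "-new" file is incomplete.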

小结

promtail's Positions interface defines the GetString, Get, PutString, Put, Remove, SyncPeriod, and Stop methods, and the positions struct implements it. Get reads from p.positions, Put writes to p.positions, SyncPeriod returns p.cfg.SyncPeriod, and Remove deletes the path from p.positions.
