定时任务 robfig/cron

go get github.com/robfig/cron/v3@v3.0.0

https://github.com/robfig/cron
https://godoc.org/github.com/robfig/cron

注意:v3开始执行策略为5个参数,不是6个,实际上秒级的定时任务也没有存在的意义。

robfig/cron 是一个常驻内存的进程,通过多个协程内部的定时器来来实现定时任务,因此需要考虑一下几个问题。

1、如果一个任务的执行时间超过了频率时间会怎样

因为你无法预测一个任务要多久才能被执行完,比如有一个任务没十分钟执行一次,但是无法保证每一次都在十分钟之内就执行完毕。

默认情况下,新的任务会开始重新执行,原先正在执行的任务还在继续执行,相当于是并行了。

{"* * * * *", fun1}

func fun1() {

	fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 1111 start")

	for i := 0; i < 40; i++ {
		fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 1111")
		time.Sleep(time.Second * 2)
	}

	fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 1111 end")
}

func fun2() {
	fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 2222 start")
	time.Sleep(time.Second * 10)
	fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 2222 end")
}

可以通过设置 Job Wrappers 来控制

  • Recover any panics from jobs (activated by default)
  • Delay a job’s execution if the previous run hasn’t completed yet
  • Skip a job’s execution if the previous run hasn’t completed yet
  • Log each job’s invocations
c := cron.New(cron.WithChain(
	cron.SkipIfStillRunning(FileLogger{}),
))

测试 
{"* * * * *", fun1},
{"*/2 * * * *", fun2},

然而,SkipIfStillRunning 的控制并不是我理解的那样,我想说的是相同的任务如何避免并行的情况,而它控制的却是不同任务避免并行的情况。

SkipIfStillRunning 会控制你所有的任务池中,在同一时刻只能有一个在运行,这样会导致有些任务永远无法被运行,这显然不能满足需求,我想要的是同一个任务只有一个在运行,而不同的任务各自不受影响。

而如果使用 DelayIfStillRunning 的话,不同任务可以并行,相同任务串行,冲突的任务会延迟执行,这会导致任务可能不是设置的时刻执行的,且延迟超过1分钟会有日志。

貌似 DelayIfStillRunning 勉强能满足需求。

2、前面的任务执行时间较长会使后面的别的任务被延迟执行吗?

不会
每一个任务都是在协程里面运行的。

3、c.Stop 方法会打断正在执行的任务吗

暂停任务调度

4、如何知道有没有正在执行的任务,热更新时需要

没法知道,需要自己实现,在执行前后埋点,改变变量的值。

5、程序启动后将会等到设定的时刻才会被调度。

6、@every 0h10m 与 */10 * * * * 的区别

两者是不同的效果,前者是从启动是开始计时,后者是按照绝对时间来的。

7、源码说明

https://www.jianshu.com/p/fd3dda663953

8、具体实现

CronTab.go
/*
-- @Time : 2020/9/27 18:02
-- @Author : raoxiaoya
-- @Desc : CronTab

example:
./voteapi CronTab --action=run
./voteapi CronTab --action=stop

*/

package CronTab

import (
	"fmt"
	"github.com/robfig/cron/v3"
	"log"
	"time"

	"voteapi/commands/Interfaces"
	"voteapi/pkg/logging"
	"voteapi/pkg/util"
)

var startTime = time.Now().Unix()
var JobStatus = make(map[string]int8)
var isRunning bool

// 自定义logger
type FileLogger struct{}

func (fl FileLogger) Info(msg string, keysAndValues ...interface{}) {
	logging.Info(msg, keysAndValues)
}

func (fl FileLogger) Error(err error, msg string, keysAndValues ...interface{}) {
	logging.Error(err, msg, keysAndValues)
}

type CronTab struct {
	Interfaces.CommandInfo
}

func (c CronTab) GetCommand() (cmd Interfaces.CommandInfo) {
	return Interfaces.CommandInfo{
		Signature: "CronTab",
		Description: "CronTab Schedule",
	}
}

func (c CronTab) Handle() {
	param := util.GetTerminalInput()
	action, ok := param["action"]
	if !ok {
		log.Fatal(fmt.Sprintf("the command %s missing parameter action", c.GetCommand().Signature))
	}

	switch action {
	case "run":
		run(c)
	case "stop":
		c.SendStopSignal(c.GetCommand().Signature)
	}
}

func run(c CronTab) {
	defer func() {
		logging.Info("CronTab is stopped")

		if err := recover(); err != nil {
			logging.Info(err)
			// 自启动
			c.Reboot()
		}
	}()

	logging.Info("CronTab is starting")

	cr := cron.New(cron.WithChain(
		cron.DelayIfStillRunning(FileLogger{}),
	))

	jobLen := len(Schedule)
	if jobLen == 0 {
		return
	}

	for _, v := range Schedule {
		// 注意golang的闭包问题,不能直接使用v.T, v.Job
		t := v.T
		j := v.Job
		name := v.Name
		JobStatus[name] = 0

		_, err := cr.AddFunc(t, func() {
			defer func() {
				if err := recover(); err != nil {
					logging.Info("Job Error: ", err)
				}
				JobStatus[name] = 0
			}()

			JobStatus[name] = 1

			start := time.Now().Unix()
			logging.Info(name + " start")

			j() // 执行任务

			end := time.Now().Unix()
			logging.Info(name+" end, time = ", end-start)
		})
		if err != nil {
			logging.Info("AddFunc Error: ", err)
			return
		}
	}

	cr.Start()
	isRunning = true

	for {
		if ok, _ := c.CheckIfNeedExit(c.GetCommand().Signature, startTime); ok {
			if isRunning {
				cr.Stop()
				isRunning = false
			} else {
				stopedJobLen := 0
				for _, v := range JobStatus {
					if v == 0 {
						stopedJobLen++
					}
				}
				if stopedJobLen == jobLen {
					break
				}
			}
		}

		time.Sleep(time.Second * 1)
	}
}
CronJobs.go
package CronTab

import (
	"voteapi/service/Vote2Service"
)

type JobMap struct {
	T    string
	Job  func()
	Name string
}

var Schedule = []JobMap{
	JobMap{"*/10 * * * *", Vote2StoreVisiteLog, "Vote2StoreVisiteLog"},
}

// vote2: 回写总访问数,总投票数,作品访问量
func Vote2StoreVisiteLog() {
	err := Vote2Service.Vote2StoreVisiteLog()
	if err != nil {
		panic(err)
	}
}

其他

// 命令行模式下获取输入参数,flag 不适合
// ./voteapi Vote2MQConsumer --action=run
func GetTerminalInput() (param map[string]string) {
	param = make(map[string]string)
	list := os.Args
	if len(list) <= 2 {
		return param
	}

	for _, v := range list {
		if strings.Contains(v, "--") {
			arr := strings.Split(v, "=")
			key := string(([]byte(arr[0]))[2:])
			param[key] = arr[1]
		}
	}

	return param
}

func (c CommandInfo) Reboot() {
	// 延迟执行,防止无限循环
	time.Sleep(time.Minute * 1)

	path := os.Args[0]
	var args []string
	if len(os.Args) > 1 {
		args = os.Args[1:]
	}

	cmd := exec.Command(path, args...)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	cmd.Env = os.Environ()
	err := cmd.Start()
	if err != nil {
		logging.Fatal("Reboot: Failed to launch, error: %v", err)
	} else {
		logging.Info(fmt.Sprintf("Reboot %s success", c.Signature))
	}
}

robfig/cronSendStopSignalCheckIfNeedExit
nohup ./voteapi CronTab --action=run > /dev/null 2>&1 &./voteapi CronTab --action=stopps -ef |grep CronTab