定时任务 robfig/cron
go get github.com/robfig/cron/v3@v3.0.0
https://github.com/robfig/cron
https://godoc.org/github.com/robfig/cron
注意:v3开始执行策略为5个参数,不是6个,实际上秒级的定时任务也没有存在的意义。
robfig/cron 是一个常驻内存的进程,通过多个协程内部的定时器来来实现定时任务,因此需要考虑一下几个问题。
1、如果一个任务的执行时间超过了频率时间会怎样
因为你无法预测一个任务要多久才能被执行完,比如有一个任务没十分钟执行一次,但是无法保证每一次都在十分钟之内就执行完毕。
默认情况下,新的任务会开始重新执行,原先正在执行的任务还在继续执行,相当于是并行了。
{"* * * * *", fun1}
func fun1() {
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 1111 start")
for i := 0; i < 40; i++ {
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 1111")
time.Sleep(time.Second * 2)
}
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 1111 end")
}
func fun2() {
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 2222 start")
time.Sleep(time.Second * 10)
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 2222 end")
}
可以通过设置 Job Wrappers 来控制
- Recover any panics from jobs (activated by default)
- Delay a job’s execution if the previous run hasn’t completed yet
- Skip a job’s execution if the previous run hasn’t completed yet
- Log each job’s invocations
c := cron.New(cron.WithChain(
cron.SkipIfStillRunning(FileLogger{}),
))
测试
{"* * * * *", fun1},
{"*/2 * * * *", fun2},
然而,SkipIfStillRunning 的控制并不是我理解的那样,我想说的是相同的任务如何避免并行的情况,而它控制的却是不同任务避免并行的情况。
SkipIfStillRunning 会控制你所有的任务池中,在同一时刻只能有一个在运行,这样会导致有些任务永远无法被运行,这显然不能满足需求,我想要的是同一个任务只有一个在运行,而不同的任务各自不受影响。
而如果使用 DelayIfStillRunning 的话,不同任务可以并行,相同任务串行,冲突的任务会延迟执行,这会导致任务可能不是设置的时刻执行的,且延迟超过1分钟会有日志。
貌似 DelayIfStillRunning 勉强能满足需求。
2、前面的任务执行时间较长会使后面的别的任务被延迟执行吗?
不会
每一个任务都是在协程里面运行的。
3、c.Stop 方法会打断正在执行的任务吗
暂停任务调度
4、如何知道有没有正在执行的任务,热更新时需要
没法知道,需要自己实现,在执行前后埋点,改变变量的值。
5、程序启动后将会等到设定的时刻才会被调度。
6、@every 0h10m 与 */10 * * * * 的区别
两者是不同的效果,前者是从启动是开始计时,后者是按照绝对时间来的。
7、源码说明
https://www.jianshu.com/p/fd3dda663953
8、具体实现
CronTab.go
/*
-- @Time : 2020/9/27 18:02
-- @Author : raoxiaoya
-- @Desc : CronTab
example:
./voteapi CronTab --action=run
./voteapi CronTab --action=stop
*/
package CronTab
import (
"fmt"
"github.com/robfig/cron/v3"
"log"
"time"
"voteapi/commands/Interfaces"
"voteapi/pkg/logging"
"voteapi/pkg/util"
)
var startTime = time.Now().Unix()
var JobStatus = make(map[string]int8)
var isRunning bool
// 自定义logger
type FileLogger struct{}
func (fl FileLogger) Info(msg string, keysAndValues ...interface{}) {
logging.Info(msg, keysAndValues)
}
func (fl FileLogger) Error(err error, msg string, keysAndValues ...interface{}) {
logging.Error(err, msg, keysAndValues)
}
type CronTab struct {
Interfaces.CommandInfo
}
func (c CronTab) GetCommand() (cmd Interfaces.CommandInfo) {
return Interfaces.CommandInfo{
Signature: "CronTab",
Description: "CronTab Schedule",
}
}
func (c CronTab) Handle() {
param := util.GetTerminalInput()
action, ok := param["action"]
if !ok {
log.Fatal(fmt.Sprintf("the command %s missing parameter action", c.GetCommand().Signature))
}
switch action {
case "run":
run(c)
case "stop":
c.SendStopSignal(c.GetCommand().Signature)
}
}
func run(c CronTab) {
defer func() {
logging.Info("CronTab is stopped")
if err := recover(); err != nil {
logging.Info(err)
// 自启动
c.Reboot()
}
}()
logging.Info("CronTab is starting")
cr := cron.New(cron.WithChain(
cron.DelayIfStillRunning(FileLogger{}),
))
jobLen := len(Schedule)
if jobLen == 0 {
return
}
for _, v := range Schedule {
// 注意golang的闭包问题,不能直接使用v.T, v.Job
t := v.T
j := v.Job
name := v.Name
JobStatus[name] = 0
_, err := cr.AddFunc(t, func() {
defer func() {
if err := recover(); err != nil {
logging.Info("Job Error: ", err)
}
JobStatus[name] = 0
}()
JobStatus[name] = 1
start := time.Now().Unix()
logging.Info(name + " start")
j() // 执行任务
end := time.Now().Unix()
logging.Info(name+" end, time = ", end-start)
})
if err != nil {
logging.Info("AddFunc Error: ", err)
return
}
}
cr.Start()
isRunning = true
for {
if ok, _ := c.CheckIfNeedExit(c.GetCommand().Signature, startTime); ok {
if isRunning {
cr.Stop()
isRunning = false
} else {
stopedJobLen := 0
for _, v := range JobStatus {
if v == 0 {
stopedJobLen++
}
}
if stopedJobLen == jobLen {
break
}
}
}
time.Sleep(time.Second * 1)
}
}
CronJobs.go
package CronTab
import (
"voteapi/service/Vote2Service"
)
type JobMap struct {
T string
Job func()
Name string
}
var Schedule = []JobMap{
JobMap{"*/10 * * * *", Vote2StoreVisiteLog, "Vote2StoreVisiteLog"},
}
// vote2: 回写总访问数,总投票数,作品访问量
func Vote2StoreVisiteLog() {
err := Vote2Service.Vote2StoreVisiteLog()
if err != nil {
panic(err)
}
}
其他
// 命令行模式下获取输入参数,flag 不适合
// ./voteapi Vote2MQConsumer --action=run
func GetTerminalInput() (param map[string]string) {
param = make(map[string]string)
list := os.Args
if len(list) <= 2 {
return param
}
for _, v := range list {
if strings.Contains(v, "--") {
arr := strings.Split(v, "=")
key := string(([]byte(arr[0]))[2:])
param[key] = arr[1]
}
}
return param
}
func (c CommandInfo) Reboot() {
// 延迟执行,防止无限循环
time.Sleep(time.Minute * 1)
path := os.Args[0]
var args []string
if len(os.Args) > 1 {
args = os.Args[1:]
}
cmd := exec.Command(path, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Env = os.Environ()
err := cmd.Start()
if err != nil {
logging.Fatal("Reboot: Failed to launch, error: %v", err)
} else {
logging.Info(fmt.Sprintf("Reboot %s success", c.Signature))
}
}
robfig/cronSendStopSignalCheckIfNeedExit
nohup ./voteapi CronTab --action=run > /dev/null 2>&1 &./voteapi CronTab --action=stopps -ef |grep CronTab