前两天优化一个从三方查询数据很耗时的接口,改造的方案是每半小时同步一次数据缓存到内存,Golang 下最常用的是 robfig/cron 包,使用简便,功能强大,本文对其使用做了整理记录。

安装

go get github.com/robfig/cron/v3@v3.0.0

引入

import "github.com/robfig/cron/v3"

示例

package main

import (
    "fmt"
    "github.com/robfig/cron/v3"
)

func main() {
    // 创建一个默认的cron对象
    c := cron.New()

    // 添加任务
    c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") })
    c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println(".. in the range 3-6am, 8-11pm") })
    c.AddFunc("@hourly",      func() { fmt.Println("Every hour, starting an hour from now") })
    c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty, starting an hour thirty from now") })
    c.Start()

    //开始执行任务
    c.Start()

    //阻塞
    select {}
}

预定义的 Schedules

条目 描述 等效于
@yearly (or @annually) Run once a year, midnight, Jan. 1st 0 0 1 1 *
@monthly Run once a month, midnight, first of month 0 0 1 * *
@weekly Run once a week, midnight between Sat/Sun 0 0 * * 0
@daily (or @midnight) Run once a day, midnight 0 0 * * *
@hourly Run once an hour, beginning of hour 0 * * * *

间隔执行

固定间隔执行

@every <duration>

举例

@every 1h30m10s 表示一个小时30分钟10秒后执行,并且之后的每个时间间隔都执行。需要注意的是间隔是以运行时间点递增的,例如一个任务执行需要耗时 3 分钟,任务每五分钟执行一次,则任务运行完后,距离下次执行为 2 分钟。

time.ParseDuration()

精确到秒的 Cron 表达式

cron.WithSeconds()
c := cron.New(cron.WithSeconds())
c.AddFunc("*/1 * * * * *", func() {
    fmt.Println("Every 1 Second")
})
c.Start()
@every
c := cron.New()
c.AddFunc("@every 10s", func() {
    fmt.Println("Every 10 Seconds")
})
c.Start()

常用 Cron 标准表达式

描述 表达式
每1分钟执行一次 * * * * *
每15分钟执行一次 */15 * * * *
每一小时执行 * */1 * * *
每两个小时执行 0 */2 * * *
每小时的第3和第15分钟执行 3,15 * * * *
在上午8点到11点的第3和第15分钟执行 3,15 8-11 * * *
每晚的21:30执行 30 21 * * *
每天18:00至23:00之间每隔30分钟执行 0,30 18-23 * * *
每星期六的晚上11:00pm执行 0 23 * * 6
晚上11点到早上7点之间,每隔一小时执行 * 23-7/1 * * *
指定每天的5:30执行 30 5 * * *
每小时[第一分钟]执行 01 * * * *
每天[凌晨4:02]执行 02 4 * * *

时区设置

// 本地时区早六点
cron.New().AddFunc("0 6 * * ?", ...)

// 上海时区早六点
nyc, _ := time.LoadLocation("Asia/Shanghai")
c := cron.New(cron.WithLocation(nyc))
c.AddFunc("0 6 * * ?", ...)

// 上海时区早六点
cron.New().AddFunc("CRON_TZ=Asia/Shanghai 0 6 * * ?", ...)

// 重庆时区早六点
c := cron.New(cron.WithLocation(nyc))
c.SetLocation("Asia/Tokyo")
c.AddFunc("CRON_TZ=Asia/Chongqing 0 6 * * ?", ...)

Cron 方法

AddFunc()@everyfunc()AddJob()AddJob()Schedule()Schedule()AddJob()Entries()Entry()Remove()Location()Start()Run()Stop()

Entry 结构体

type Entry struct {
    // ID is the cron-assigned ID of this entry, which may be used to look up a
    // snapshot or remove it.
    ID EntryID

    // Schedule on which this job should be run.
    Schedule Schedule

    // Next time the job will run, or the zero time if Cron has not been
    // started or this entry's schedule is unsatisfiable
    Next time.Time

    // Prev is the last time this job was run, or the zero time if never.
    Prev time.Time

    // WrappedJob is the thing to run when the Schedule is activated.
    WrappedJob Job

    // Job is the thing that was submitted to cron.
    // It is kept around so that user code that needs to get at the job later,
    // e.g. via Entries() can do so.
    Job Job
}

可以获取任务上次、下次运行时间。

c = cron.New()
entryId, _ := c.AddFunc("@every 15m", func() {
    fmt.Println("Every 15 Minutes")
})
entry := c.Entry(entryId)
fmt.Println(entry.Next)

Job Wrappers 包装器

JobWrapper 可以对 Job 进行修饰,添加一些行为。

func SkipIfStillRunning(logger Logger) JobWrapperfunc DelayIfStillRunning(logger Logger) JobWrapperfunc Recover(logger Logger) JobWrapper

以上函数中 Skip 及 Recover 会记录 Info 级别的日志,而 Delay 会在延迟超过一分钟后记录 Info 日志。

使用方法

// 作用于调度器
cron.New(cron.WithChain(
    cron.SkipIfStillRunning(logger),
))

// 作用于单个 Job
job = cron.NewChain(
    cron.SkipIfStillRunning(logger),
).Then(job)

Logger 日志记录

type Logger interface {
    // Info logs routine messages about cron's operation.
    Info(msg string, keysAndValues ...interface{})
    // Error logs an error condition.
    Error(err error, msg string, keysAndValues ...interface{})
}

Logger 是一个接口,实现了这些接口的日志包都可被用于 Job 日志的记录工具。

var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)) var DiscardLogger Logger = PrintfLogger(log.New(ioutil.Discard, "", 0))

如果未指定,DefaultLogger 是 Cron 的默认输出 Logger,也可以使用 DiscardLogger,不输出日志。

不输出 Job 日志

c := cron.New(cron.WithLogger(cron.DiscardLogger))

Cron 使用 logrus 记录日志示例

import (
    "github.com/robfig/cron/v3"
    log "github.com/sirupsen/logrus"
)

type LogrusLog struct {
    logger log.Logger
}

func (l *LogrusLog) Info(msg string, keysAndValues ...interface{}) {
    l.logger.WithFields(log.Fields{
        "data": keysAndValues,
    }).Info(msg)

}

func (l *LogrusLog) Error(err error, msg string, keysAndValues ...interface{}) {
    l.logger.WithFields(log.Fields{
        "msg":  msg,
        "data": keysAndValues,
    }).Error(msg)
}

func main() {
    logrusLog := &LogrusLog{}
    c := cron.New(cron.WithLogger(logrusLog))
}

Option 可选项

cron.New()
cron.WithSeconds()cron.WithChain()cron.WithParser()
func WithChain(wrappers ...JobWrapper) Optionfunc WithLocation(loc *time.Location) Optionfunc WithLogger(logger Logger) Optionfunc WithParser(p ScheduleParser) Optionfunc WithSeconds() Option

退出 Stop() 方法等待任务执行完成示例

package main

import (
    "fmt"
    "github.com/robfig/cron/v3"
    "time"
)

func main() {
    c := cron.New(cron.WithSeconds())
    c.AddFunc("@every 10s", func() {
        fmt.Println("goroutine task run")

        time.Sleep(2 * time.Second)
        fmt.Println("-> task out")

        time.Sleep(2 * time.Second)
        fmt.Println("-> task out2")
    })
    c.Start()

    fmt.Println("main sleep 12 seconds")
    fmt.Println("after 10 seconds, goroutine task will run")
    time.Sleep(12 * time.Second)

    for {
        ctx := c.Stop()
        select {
        case <-ctx.Done():
            fmt.Println("all task done, stopped")
            return
        default:
            time.Sleep(time.Second)
            fmt.Println("default, wait")
        }
    }
}

输出

➜ go run main.go
main sleep 12 seconds
after 10 seconds, goroutine task will run
goroutine task run
-> task out
default, wait
-> task out2
default, wait
default, wait
all task done, stopped

简单解释下这个示例,它模拟了一个调度器退出的场景,当退出调度器的时候有任务正在执行,那么会等到任务执行完成后再退出。

参考