robfig cron是go开发者最常用的基于cron解析的定时任务管理器   cron介绍

一、基本介绍:

1. 从Cron结构体查看整体设计:

type Cron struct {

entries []*Entry // 一个Entry即一个定时任务

chain Chain // 装饰原始Job的装饰器数组

stop chan struct{} // 用于已启动Cron的关闭

add chan *Entry // 用于已启动Cron 新增Entry

remove chan EntryID // 用于已启动Cron 删除Entry

snapshot chan chan []Entry // 用于获取已启动Cron 的entries

running bool // 控制只启动一次, 启动与关闭状态下, 很多函数的行为将有差异

runningMu sync.Mutex // 控制running、entries、nextID等并发安全修改或访问

location *time.Location // 定时任务运行的时区

parser ScheduleParser // cron解析器

nextID EntryID // 存放当前最大EntryID

jobWaiter sync.WaitGroup // 用于已启动Cron的优雅关闭(等待所有任务执行完毕)

}

type Entry struct {

ID EntryID // EntryID (int)

Schedule Schedule // 用于获取任务的下一次执行时间

Next time.Time // 任务下一次将要执行的时间

Prev time.Time // 任务上一次执行的时间

WrappedJob Job // 为Job字段经过Cron.chain包装后的Job,定时任务运行的即Job.Run函数

Job Job // 原始Job

}

type ScheduleParser interface {

// 解析spec(即我们传入的cron字符串)为Schedule

Parse(spec string) (Schedule, error)

}

type Schedule interface {

// 获取任务下一次执行的时间(晚于传入的time)

Next(time.Time) time.Time

}

// 将我们需要定时执行的逻辑存放在Job实现的Run函数中

type Job interface {

Run()

}

2. 从官方ScheduleParser实现来学习 robig/cron 的cron书写规则:

type Parser struct {

// ParseOption通过位运算设计简化了相关解析逻辑

options ParseOption

}

由本文开头的链接可以知道,cron表达式一般最多可以支持7个子表达式,但是robfig/cron/v3最多支持前6个,即不支持Year的配置,且默认配置为最小单位分钟即crontab式的配置(crontab cron)对于6个子表达式的配置,robig/cron 采取了位运算的方式:

type ParseOption int

const (

Second ParseOption = 1 << iota // Seconds field, default 0

SecondOptional // Optional seconds field, default 0

Minute // Minutes field, default 0

Hour // Hours field, default 0

Dom // Day of month field, default *

Month // Month field, default *

Dow // Day of week field, default *

DowOptional // Optional day of week field, default *

Descriptor // Allow descriptors such as @monthly, @weekly, etc.

)

对于Parser.options,即是我们选取若干需要的配置按位或的结果, 比如默认配置:

var standardParser = NewParser(

Minute | Hour | Dom | Month | Dow | Descriptor,

)

需要注意的是, 秒可选配置(SecondOptional,书写cron表达式的时候,可以加秒表达式也可以不加的意思,不加将添加默认配置)与星期可选配置(DowOptional)最多只能选取一个

func NewParser(options ParseOption) Parser {

optionals := 0

if options&DowOptional > 0 {

optionals++

}

if options&SecondOptional > 0 {

optionals++

}

if optionals > 1 {

panic("multiple optionals may not be configured")

}

return Parser{options}

}

Parser核心函数Parse:由于函数过长,将省略部分分支逻辑和错误处理逻辑

func (p Parser) Parse(spec string) (Schedule, error) {

if len(spec) == 0 {return nil, fmt.Errorf("empty spec string")}

// 1. 处理时区, 注意这个时区不是Cron的时区, 设置的是Schedule的时区

var loc = time.Local

if strings.HasPrefix(spec, "TZ=") || strings.HasPrefix(spec, "CRON_TZ=") {

// 省略

}

// 2. 处理@monthly等简略写法的特殊情况并返回, 需要配置中有Descriptor

if strings.HasPrefix(spec, "@") {// 省略}

// 3. 分割为子表达式

fields := strings.Fields(spec)

// 4. 检查子表达式长度并补充可选项的默认值

var err error

fields, err = normalizeFields(fields, p.options)

if err != nil {return nil, err}

// 5. 分别处理各子表达式, 每个子表达式将处理为uint64, 其中每一位的0/1都代表当前单位

// 在此取值时执不执行任务。如second处理结果的后8位为00000010,

// 则代表在第一秒(秒的取值为0~59)将要执行函数

// 所有子表达式的uint64结合起来就可以完整表示在何时将要执行任务

field := func(field string, r bounds) uint64 {

if err != nil {

return 0

}

var bits uint64

// getField

bits, err = getField(field, r)

return bits

}

var (

// 第二个参数为各个子表达式为:

// type bounds struct {

// min, max uint // 最小最大取值

// names map[string]uint // 支持星期和月份等可以用英文表达的对应的值

// }

second = field(fields[0], seconds)

minute = field(fields[1], minutes)

hour = field(fields[2], hours)

dayofmonth = field(fields[3], dom)

month = field(fields[4], months)

dayofweek = field(fields[5], dow)

)

if err != nil {return nil, err}

return &SpecSchedule{Second: second, Minute: minute,Hour: hour, Dom: dayofmonth, Month: month, Dow: dayofweek, Location: loc,}, nil

}

var (

seconds = bounds{0, 59, nil}

minutes = bounds{0, 59, nil}

hours = bounds{0, 23, nil}

dom = bounds{1, 31, nil}

months = bounds{1, 12, map[string]uint{

"jan": 1,

"feb": 2,

"mar": 3,

"apr": 4,

"may": 5,

"jun": 6,

"jul": 7,

"aug": 8,

"sep": 9,

"oct": 10,

"nov": 11,

"dec": 12,

}}

dow = bounds{0, 6, map[string]uint{

"sun": 0,

"mon": 1,

"tue": 2,

"wed": 3,

"thu": 4,

"fri": 5,

"sat": 6,

}}

)

getField函数:

每个子表达式通过 ”,“ 分割为若干range,如秒的表示可以为 0,1,20。代表第0 1 20秒每个range中可以包含"-",其前后数字代表最大最小值,也可以包含”/“,其之后的数字代表在最大最小值范围中取值的steprange中还可以包含"*", "?"代表 min-max

func getField(field string, r bounds) (uint64, error) {

var bits uint64

// 每一个子表达式由 "," 分割的若干range组成, range之间为叠加关系

ranges := strings.FieldsFunc(field, func(r rune) bool { return r == ',' })

for _, expr := range ranges {

bit, err := getRange(expr, r)

if err != nil {

return bits, err

}

bits |= bit

}

return bits, nil

}

func getRange(expr string, r bounds) (uint64, error) {

var (

// 所有用,分割的range都将解析为开始-结尾-每次增长量

start, end, step uint

// ”/“ 符号后面的即为step, 前面为start与end(可选)

rangeAndStep = strings.Split(expr, "/")

// start与end用 ”-“ 分割

lowAndHigh = strings.Split(rangeAndStep[0], "-")

singleDigit = len(lowAndHigh) == 1

err error

)

var extra uint64

// 1. 处理start [-end]

if lowAndHigh[0] == "*" || lowAndHigh[0] == "?" { // 1.1 * 或者 ?

start = r.min

end = r.max

extra = 1 << 63 // 注意用第64位为特殊位来代表表达式中有 * 或 ? (step需为1, 不为1的后面重置extra了)

} else { // 1.2 start 或者 start-end

start, err = parseIntOrName(lowAndHigh[0], r.names)

if err != nil {return 0, err}

switch len(lowAndHigh) {

case 1:

end = start

case 2:

end, err = parseIntOrName(lowAndHigh[1], r.names)

if err != nil {return 0, err}

default:

return 0, fmt.Errorf("too many hyphens: %s", expr)

}

}

// 2. 处理step

switch len(rangeAndStep) {

case 1:

step = 1

case 2:

step, err = mustParseInt(rangeAndStep[1])

if err != nil {

return 0, err

}

// Special handling: "N/step" means "N-max/step".

// 注意有step, 没end才修复end为max

// eg: 1 为start=1, end=1, step=1

// eg: 1/1 为start=1, end=max, step=1

if singleDigit {

end = r.max

}

if step > 1 {

extra = 0

}

// 省略min, max, step取值规范错误处理

return getBits(start, end, step) | extra, nil

}

func getBits(min, max, step uint) uint64 {

var bits uint64

// 特殊情况处理降低时间复杂度

// eg: 秒的取值为4-7

// ^(11111111 11111111 11111111 11111111 11111111 11111111 11111111 00000000)

// & 11111111 11111111 11111111 11111111 11111111 11111111 11111111 11110000

if step == 1 {

return ^(math.MaxUint64 << (max + 1)) & (math.MaxUint64 << min)

}

for i := min; i <= max; i += step {

bits |= 1 << i

}

return bits

}

我们可以发现getRange函数的逻辑让我们cron的书写可以有一个不算bug的bug 拿秒的配置举例,我们可以写成 *-5 这种表达而可以正常运行

3. 官方Schedule实现,SpecSchedule:

结构体如下

type SpecSchedule struct {

Second, Minute, Hour, Dom, Month, Dow uint64

Location *time.Location

}

Next函数:从年开始位运算匹配,一直匹配到最小单位后即所有单位都匹配即下一次执行时间(大于传入时间)

func (s *SpecSchedule) Next(t time.Time) time.Time {

// 时区逻辑: 如果cron string没有指定时区, 则所有时区都按Cron的时区来(传入的t即为Cron时区时间)

// 如果cron string指定了非本地时区,则先把时间转换为SpecSchedule时区用于计算下一次执行时间,返回时再转换为Cron时区

origLocation := t.Location()

loc := s.Location

if loc == time.Local {

loc = t.Location()

}

if s.Location != time.Local {

t = t.In(s.Location)

}

// 向上取整秒

t = t.Add(1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond)

// 用于第一次调整时将小单位全部取 ”零值“

// 很好理解,比如现在我们有个数字是0123 我们要 找到 1001,我们发现最高位不匹配

// 我们不能直接变为1123,我们需要变为1000

added := false

// 找到5年后还未找到下一次执行时间就退出

yearLimit := t.Year() + 5

WRAP:

if t.Year() > yearLimit {

return time.Time{}

}

for 1<

if !added {

added = true

t = time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, loc)

}

t = t.AddDate(0, 1, 0)

// 溢出后高位会变动可能已经不满足了,要从循环开头重新匹配

if t.Month() == time.January {

goto WRAP

}

}

// 日、时、秒等处理也相似,省略

return t.In(origLocation)

感兴趣也可以看看对于day的处理,其中有夏令时处理逻辑

二、Cron方法简析:

1. 启动及运行时分析:

func (c *Cron) Start() {

c.runningMu.Lock()

defer c.runningMu.Unlock()

// running参数保证只启动一次

if c.running {

return

}

c.running = true

// 非阻塞启动, Run方法为阻塞启动

go c.run()

}

Cron核心函数为run:

func (c *Cron) run() {

// 获取当前时间,初始化各个任务的Next即下一次执行时间

now := c.now()

for _, entry := range c.entries {

entry.Next = entry.Schedule.Next(now)

}

for {

// 按Next由低到高排序, 其中找不到Next(零值)为最大值

sort.Sort(byTime(c.entries))

var timer *time.Timer

if len(c.entries) == 0 || c.entries[0].Next.IsZero() {

// 无任务或者任务均找不到执行时间就睡眠, 当有任务加入时会走加入分支唤醒

timer = time.NewTimer(100000 * time.Hour)

} else {

// 设置定时器为距离当前最近的任务

timer = time.NewTimer(c.entries[0].Next.Sub(now))

}

for {

select {

case now = <-timer.C:

now = now.In(c.location)

// 执行Next <= now的任务

for _, e := range c.entries {

if e.Next.After(now) || e.Next.IsZero() {

break

}

// 执行chain包装后的任务,其中有sync.WaitGroup逻辑,为了Stop时优雅停止

c.startJob(e.WrappedJob)

e.Prev = e.Next

e.Next = e.Schedule.Next(now)

}

case newEntry := <-c.add: // 新增任务, 加入entries后重新执行排序逻辑

timer.Stop()

now = c.now()

newEntry.Next = newEntry.Schedule.Next(now)

c.entries = append(c.entries, newEntry)

case replyChan := <-c.snapshot:

replyChan <- c.entrySnapshot()

continue

case <-c.stop: // 接收关闭任务, 返回

timer.Stop()

return

case id := <-c.remove:

timer.Stop()

now = c.now()

c.removeEntry(id)

}

break

}

}

}

2. 停止:

func (c *Cron) Stop() context.Context {

// 保证运行中Cron只关闭一次, chan都是无缓存, 关闭多次会有影响

c.runningMu.Lock()

defer c.runningMu.Unlock()

if c.running {

c.stop <- struct{}{} // 给运行中Cron发信号关闭

c.running = false

}

ctx, cancel := context.WithCancel(context.Background())

// 优雅关闭, 调用后可根据ctx是否取消来判断是否完全关闭

go func() {

c.jobWaiter.Wait()

cancel()

}()

return ctx

}

3. 增加任务:

// AddJob(spec string, cmd Job) (EntryID, error) 调用 Schedule

func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {

// 保证并发安全

c.runningMu.Lock()

defer c.runningMu.Unlock()

c.nextID++

entry := &Entry{

ID: c.nextID,

Schedule: schedule,

WrappedJob: c.chain.Then(cmd),

Job: cmd,

}

if !c.running { // 没有运行直接加入

c.entries = append(c.entries, entry)

} else { // 运行中需要发送add信号

c.add <- entry

}

return entry.ID

}

删除、获取entries也类似,不再粘贴描述