对于任何一个正在运行的应用,如何获取准确的绝对时间都非常重要,但是在一个分布式系统中我们很难保证各个节点上绝对时间的一致性,哪怕通过 NTP 这种标准的对时协议也只能把时间的误差控制在毫秒级,所以相对时间在一个分布式系统中显得更为重要,我们在这一节中就会介绍 Go 语言中的定时器以及它在并发编程中起到什么样的作用。

绝对时间一定不会是完全准确的,它对于一个运行中的分布式系统其实没有太多指导意义,但是由于相对时间的计算不依赖于外部的系统,所以它的计算可以做的比较准确,我们在这一节中就会介绍 Go 语言中用于计算相对时间的定时器的实现原理。

结构

timertimertbi
type timer struct {
   tb *timersBucket
   i  int

   when   int64
   period int64
   f      func(interface{}, uintptr)
   arg    interface{}
   seq    uintptr
}
whenperiodf(args, now)argstimertime
type Timer struct {
   C <-chan Time
   r runtimeTimer
}
TimerNewTimerAfterFuncruntimeTimertimerC
timetimerTimertimersBucket
type timersBucket struct {
   lock         mutex
   gp           *g
   created      bool
   sleeping     bool
   rescheduling bool
   sleepUntil   int64
   waitnote     note
   t            []*timer
}
timersBuckett

timertimertimer

工作原理

time.Sleep

创建

timeNewTimerstartTimerTimer
func NewTimer(d Duration) *Timer {
   c := make(chan Time, 1)
   t := &Timer{
       C: c,
       r: runtimeTimer{
           when: when(d),
           f:    sendTime,
           arg:  c,
       },
   }
   startTimer(&t.r)
   return t
}
TimerAfterFuncNewTimer
func AfterFunc(d Duration, f func()) *Timer {
   t := &Timer{
       r: runtimeTimer{
           when: when(d),
           f:    goFunc,
           arg:  f,
       },
   }
   startTimer(&t.r)
   return t
}
startTimer
func startTimer(t *timer) {
   addtimer(t)
}

func addtimer(t *timer) {
   tb := t.assignBucket()
   tb.addtimerLocked(t)
}
addTimerassignBuckettimersBucketaddTimerLocked
func (tb *timersBucket) addtimerLocked(t *timer) bool {
   t.i = len(tb.t)
   tb.t = append(tb.t, t)
   if !siftupTimer(tb.t, t.i) {
       return false
   }
   if t.i == 0 {
       if tb.sleeping && tb.sleepUntil > t.when {
           tb.sleeping = false
           notewakeup(&tb.waitnote)
       }
       if tb.rescheduling {
           tb.rescheduling = false
           goready(tb.gp, 0)
       }
       if !tb.created {
           tb.created = true
           go timerproc(tb)
       }
   }
   return true
}
addtimerLockedsiftupTimer

go timerproc(tb)

触发

timerprocforfor
func timerproc(tb *timersBucket) {
   tb.gp = getg()
   for {
       tb.sleeping = false
       now := nanotime()
       delta := int64(-1)

       // inner loop
       
       if delta < 0 {
           tb.rescheduling = true
           goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)
           continue
       }
       tb.sleeping = true
       tb.sleepUntil = now + delta
       noteclear(&tb.waitnote)
       notetsleepg(&tb.waitnote, delta)
   }
}
timerprocreschedulingtruetimerproctimerBuckettimerproc
now + deltatimerprocnotesleepg
func notetsleepg(n *note, ns int64) bool {
   gp := getg()
   if gp == gp.m.g0 {
       throw("notetsleepg on g0")
   }
   semacreate(gp.m)
   entersyscallblock()
   ok := notetsleep_internal(n, ns, nil, 0)
   exitsyscall()
   return ok
}
entersyscallblockexitsyscallns

内部循环的主要作用就是触发已经到期的定时器,在这个内部循环中,我们会按照以下的流程对当前桶中的定时器进行处理:

notetsleepg
period > 0period <= 0

在每次循环的最后都会从定时器中取出定时器中的函数、参数和序列号并调用函数触发该计数器;

        for {
           if len(tb.t) == 0 {
               delta = -1
               break
           }
           t := tb.t[0]
           delta = t.when - now
           if delta > 0 {
               break
           }
           ok := true
           if t.period > 0 {
               t.when += t.period * (1 + -delta/t.period)
               if !siftdownTimer(tb.t, 0) {
                   ok = false
               }
           } else {
               last := len(tb.t) - 1
               if last > 0 {
                   tb.t[0] = tb.t[last]
                   tb.t[0].i = 0
               }
               tb.t[last] = nil
               tb.t = tb.t[:last]
               if last > 0 {
                   if !siftdownTimer(tb.t, 0) {
                       ok = false
                   }
               }
               t.i = -1 // mark as removed
           }
           f := t.f
           arg := t.arg
           seq := t.seq
           f(arg, seq)
       }
NewTimersendTimeAfterFunc

休眠

timeSleeptime.SleeptimeSleep
func timeSleep(ns int64) {
   if ns <= 0 {
       return
   }

   gp := getg()
   t := gp.timer
   if t == nil {
       t = new(timer)
       gp.timer = t
   }
   *t = timer{}
   t.when = nanotime() + ns
   t.f = goroutineReady
   t.arg = gp
   tb := t.assignBucket()
   lock(&tb.lock)
   if !tb.addtimerLocked(t) {
       unlock(&tb.lock)
       badTimer()
   }
   goparkunlock(&tb.lock, waitReasonSleep, traceEvGoSleep, 2)
}
timeSleeptimergoroutineReadygoparkunlockgoroutineReady
func goroutineReady(arg interface{}, seq uintptr) {
   goready(arg.(*g), 0)
}
time.Sleepgoparkunlock

Ticker

time
type Ticker struct {
   C <-chan Time // The channel on which the ticks are delivered.
   r runtimeTimer
}
NewTickerTickerTick
func NewTicker(d Duration) *Ticker {
   if d <= 0 {
       panic(errors.New("non-positive interval for NewTicker"))
   }
   c := make(chan Time, 1)
   t := &Ticker{
       C: c,
       r: runtimeTimer{
           when:   when(d),
           period: int64(d),
           f:      sendTime,
           arg:    c,
       },
   }
   startTimer(&t.r)
   return t
}

func Tick(d Duration) <-chan Time {
   if d <= 0 {
       return nil
   }
   return NewTicker(d).C
}
TickNewTickerNewTicker
NewTickerStopStopTick

性能分析

定时器在内部使用四叉树的方式进行实现和存储,当我们在生产环境中使用定时器进行毫秒级别的计时时,在高并发的场景下会有比较明显的性能问题,我们可以通过实验测试一下定时器在高并发时的性能,假设我们有以下的代码:

func runTimers(count int) {
   durationCh := make(chan time.Duration, count)

   wg := sync.WaitGroup{}
   wg.Add(count)
   for i := 0; i < count; i++ {
       go func() {
           startedAt := time.Now()
           time.AfterFunc(10*time.Millisecond, func() {
               defer wg.Done()
               durationCh <- time.Since(startedAt)
           })
       }()

   }
   wg.Wait()

   close(durationCh)

   durations := []time.Duration{}
   totalDuration := 0 * time.Millisecond
   for duration := range durationCh {
       durations = append(durations, duration)
       totalDuration += duration
   }
   averageDuration := totalDuration / time.Duration(count)
   sort.Slice(durations, func(i, j int) bool {
       return durations[i] < durations[j]
   })

   fmt.Printf("run %v timers with average=%v, pct50=%v, pct99=%v\n", count, averageDuration, durations[count/2], durations[int(float64(count)*0.99)])
}

完整的性能测试代码可以在 benchmark_timers.go 中找到,需要注意的是:由于机器和性能的不同,多次运行测试可能会有不一样的结果。

这段代码开了 N 个 Goroutine 并在每一个 Goroutine 中运行一个定时器,我们会在定时器到期时将开始计时到定时器到期所用的时间加入 Channel 并用于之后的统计,在函数的最后我们会计算出 N 个 Goroutine 中定时器到期时间的平均数、50 分位数和 99 分位数:

$ go test ./... -v
=== RUN   TestTimers
run 1000 timers with average=10.367111ms, pct50=10.234219ms, pct99=10.913219ms
run 2000 timers with average=10.431598ms, pct50=10.37367ms, pct99=11.025823ms
run 5000 timers with average=11.873773ms, pct50=11.986249ms, pct99=12.673725ms
run 10000 timers with average=11.954716ms, pct50=12.313613ms, pct99=13.507858ms
run 20000 timers with average=11.456237ms, pct50=10.625529ms, pct99=25.246254ms
run 50000 timers with average=21.223818ms, pct50=14.792982ms, pct99=34.250143ms
run 100000 timers with average=36.010924ms, pct50=31.794761ms, pct99=128.089527ms
run 500000 timers with average=176.676498ms, pct50=138.238588ms, pct99=676.967558ms
--- PASS: TestTimers (1.21s)

我们将上述代码输出的结果绘制成如下图所示的折线图,其中横轴是并行定时器的个数,纵轴表示定时器从开始到触发时间的差值,三个不同的线分别表示时间的平均值、50 分位数和 99 分位数:

虽然测试的数据可能有一些误差,但是从图中我们也能得出一些跟定时器性能和现象有关的结论:

  • 定时器触发的时间一定会晚于创建时传入的时间,假设定时器需要等待 10ms 触发,那它触发的时间一定是晚于 10ms 的;

  • 当并发的定时器数量达到 5000 时,定时器的平均误差达到了 ~18%,99 分位数上的误差达到了 ~26%;

  • 并发定时器的数量超过 5000 之后,定时器的误差就变得非常明显,不能有效、准确地完成计时任务;

这其实也是因为定时器从开始到触发的时间间隔非常短,当我们将计时的时间改到 100ms 时就会发现性能问题有比较明显的改善:

哪怕并行运行了 10w 个定时器,99 分位数的误差也只有 ~12%,我们其实能够发现 Go 语言标准库中的定时器在计时时间较短并且并发较高时有着非常明显的问题,所以在一些性能非常敏感的基础服务中使用定时器一定要非常注意 —— 它可能达不到我们预期的效果。

context.WithDeadline

总结

Go 语言的定时器在并发编程起到了非常重要的作用,它能够为我们提供比较准确的相对时间,基于它的功能,标准库中还提供了计时器、休眠等接口能够帮助我们在 Go 语言程序中更好地处理过期和超时等问题。

标准库中的定时器在大多数情况下是能够正常工作并且高效完成任务的,但是在遇到极端情况或者性能敏感场景时,它可能没有办法胜任,而在 10ms 的这个粒度下,作者在社区中也没有找到能够使用的定时器实现,一些使用时间轮算法的开源库也不能很好地完成这个任务。