前言:
又是golang的定时器时间轮库,这是我实现的第二版时间轮了 😅 ,以前的版本不是不能用,不好用而已。主要问题是实现偏复杂,就懒得维护,索性直接重新造轮子了。
以前是如何实现的 ?以前时间轮每个格子里是一个最小堆,由于堆的时间复杂度问题,导致密集场景下cpu还是不低。当然对比golang timer要好的多。 当然也有优点,为了支持并发操作时间轮,把锁的粒度细到每个槽位,减少了锁的竞争。
第一版的代码,有兴趣可以看看,https://github.com/rfyiamcool/go-ringtimer
话说,当初为什么要在时间轮里加入最小堆,而不是双端列表或者map这类的结构。主要是考虑定时任务超过了时间轮的大小,而现在改用map来存,对于超过时间轮周期的任务设定一个round环数,每轮一圈就减 1 。
老生常谈
golang在1.9.x版本后优化了定时器,以前为一个四叉堆,现在是多个四叉堆,具体你能用到多少个堆,要看你的gomaxprocs配置大小了。go timer init里初始化了64个timerBucket,那么time会如何选择timerBucket? 是通过g.m.p.id % 64取摸拿到的timerBucket。
// xiaorui.cc
func (t *timer) assignBucket() *timersBucket {
id := uint8(getg().m.p.ptr().id) % timersLen
t.tb = &timers[id].timersBucket
return t.tb
}
那么golang分了这么多时间堆是干嘛的? 主要是为了解决锁的竞争。既然做了优化,那么时间轮的意义又是什么?
golang标准库里实现的定时器都是高精度的,高精度带来了频发的读写操作heap和锁的竞争压力。
时间轮的意义在于把精度放低,比如同一秒内的定时任务放凑在一起,数据结构改用map, 那么时间轮只需要o(1)的时间复杂度就可以把任务放进去,只需要一次加锁放锁就可以把同一个时间精度的任务都捞出来。
go timewheel
timewheel的代码实现原理我这里就不细说了,源码已经推到github里了,有兴趣的朋友可以看下。https://github.com/rfyiamcool/go-timewheel
下面列下go-timewheel间轮的基本用法:
// xiaorui.cc
// https://github.com/rfyiamcool/go-timewheel/blob/master/README.md
// 初始化时间轮, 精度为1s,槽位360个
tw, err := NewTimeWheel(1 * time.Second, 360)
if err != nil {
panic(err)
}
// 启动时间轮
tw.Start()
// 关闭时间轮
tw.Stop()
// 添加任务
task := tw.Add(5 * time.Second, func(){})
// 删除任务
tw.Remove(task)
// 添加周期性任务
task := tw.AddCron(5 * time.Second, func(){
...
})
// 实现time.Sleep
tw.Sleep(5 * time.Second)
similar to time.After()
// 实现After
<- tw.After(5 * time.Second)
similar to time.NewTimer
// 实现NewTimer定时器
timer :=tw.NewTimer(5 * time.Second)
<- timer.C
timer.Reset(1 * time.Second)
timer.Stop()
// 实现NewTicker定时器
timer :=tw.NewTicker(5 * time.Second)
<- timer.C
timer.Stop()
// 实现go time的AfterFunc
runner :=tw.AfterFunc(5 * time.Second, func(){})
<- runner.C
runner.Stop()
功能点实现思路
如何解决提交的定时任务小于时间轮的精度?
如果小于时间轮精度,那么就强制该任务为一个时间轮精度。
如何保证map结构的读写安全?
增删改查都在一个协程里,添加删除任务也只是添加到chan里。
如何保证调度器和任务添加在一个bucket的冲突问题?
比如调度器正在执行bucket 1,但是添加任务的槽位也是bucket 1,由于写安全,添加是在调度器执行之后,那么可以想象该任务算是被遗漏了,只有在下次轮询才能被执行。当然,还是有办法解决的。
我采用的方法跟上面一样,所有时间轮任务的管理都在一个协程里。
调度器每次执行任务是按照time.Ticker走的,如果调度器执行流发生超时?
一般是不会出现超时。但如果发生超时,会导致时间轮变慢。我们可以独立time.Ticker定时器,go默认给time.Ticker缓冲为1,我们可以多缓冲几个事件。
// xiaorui.cc
func (tw *TimeWheel) tickGenerator() {
if tw.tickQueue != nil {
return
}
for !tw.exited {
select {
case <-tw.ticker.C:
select {
case tw.tickQueue <- time.Now():
default:
panic("raise long time blocking")
}
}
}
}
func (tw *TimeWheel) schduler() {
queue := tw.ticker.C
if tw.tickQueue == nil {
queue = tw.tickQueue
}
for {
select {
case <-queue:
tw.handleTick()
...
如何兼容了golang time定时器的实现?
通过在定时任务里加入各种功能的回调方法来实现的,这里在时间轮里实现了类似标准库的 time.After, time.Sleep, time.NewTimer, time.AfterFunc, time.Tikcer的方法。
如何解决go标准库里timer, ticker stop后,select无法接收到已经stop事件的问题
我在timer ticker里封装了context cancel,当用户触发stop的时候,可以使用ctx来得知该定时器已经被关闭。
// xiaorui.cc
// similar to golang std timer
type Timer struct {
task *Task
tw *TimeWheel
fn func() // external custom func
C chan bool
cancel context.CancelFunc
Ctx context.Context
}
func (t *Timer) Reset(delay time.Duration) {
var task *Task
if t.fn != nil { // use AfterFunc
task = t.tw.addAny(delay,
func() {
t.fn()
notfiyChannel(t.C)
},
modeNotCircle, modeIsAsync, // must async mode
)
} else {
task = t.tw.addAny(delay,
func() {
notfiyChannel(t.C)
},
modeNotCircle, modeNotAsync)
}
t.task = task
}
func (t *Timer) Stop() {
t.task.stop = true
t.cancel()
t.tw.Remove(t.task)
}
在时间轮里,调度器处理事件时,区分了同步和异步调用
参考了go time的实现,当定时任务类型是通知类型的,比如timer.C,那么就可以用同步调用,因为在select里实现了default且chan buf为1,所谓不会阻塞。 当进行AfterFunc时,就使用go func调用。
简单说,直接函数调用肯定要比go func快的多的多。虽然golang实现了gfree来缓存了goroutine对象,但毕竟要经过runtime的pmg调度。
// xiaorui.cc
func AfterFunc(d Duration, f func()) *Timer {
t := &Timer{
r: runtimeTimer{
when: when(d),
f: goFunc, // go func 异步调用
arg: f,
},
}
startTimer(&t.r)
return t
}
func goFunc(arg interface{}, seq uintptr) {
go arg.(func())()
}
func NewTimer(d Duration) *Timer {
c := make(chan Time, 1) // 缓冲为1,提高了效率
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
func sendTime(c interface{}, seq uintptr) {
select {
case c.(chan Time) <- Now():
default: // 不阻塞
}
}
性能
没有做太深度的压力测试。添加50w个定时任务都在合理的时间内都执行完毕。
// xiaorui.cc add timer cost: 330.263465ms recv sig cost: 1.196655379s
如果性能还是达不到要求,可以封装多个时间轮到一个池子,类似go time标准库里timerprc的实现。
可能不会存在的问题?
如果添加的定时器超过时间轮的大小,那么我这里会给他加入一个round字段,标明他的轮数。那么问题来了,如果大量的任务超过了时间轮大小,那么一个槽位的map里除了存在正常的任务外,且会存在一些下一轮才能执行的任务。
这样会造成遍历的开销? 是有点,但时间轮的场景一般是用在时间短且任务多的超时场景里。如果任务的周期有些长,可以多实例化几组时间轮,比如精度为秒的和分钟的各一个时间轮。
结尾:
时间轮的效率确实很高,在一个高频的推送服务里会有各种定时器的使用,时间轮可以减少cpu消耗。