共计 3306 个字符,预计需要花费 9 分钟才能阅读完成。
导读 | 本文介绍了作者优化定时任务调度框架的实现。一起来来看看吧。 |
项目中需要使用一个简单的定时任务调度的框架,最初直接从 GitHub 上搜了一个 star 比较多的,就是 https://github.com/robfig/cron,目前有 8000+ star。刚开始使用的时候发现问题不大,但是随着单机需要定时调度的任务越来越多,高峰期差不多接近 500QPS,随着业务的推广使用,可以预期增长还会比较快,但是已经遇到 CPU 使用率偏高的问题,通过 pprof 分析,很多都是在做排序,看了下这个项目的代码,整体执行的过程大概如下:
对所有任务进行排序,按照下次执行时间进行排序
选择数组中第一个任务,计算下次执行时间减去当前时间得到时间 t,然后 sleep t
然后从数组第一个元素开始遍历任务,如果此任务需要调度的时间 < now,那么就执行此任务,执行之后重新计算这个任务的 next 执行时间
每次待执行的任务执行完毕之后,都会重新对这个数组进行排序
然后再循环从排好序的数组中找到第一个需要执行的任务去执行。
代码如下:
for {
// Determine the next entry to run.
sort.Sort(byTime(c.entries))
var timer *time.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
timer = time.NewTimer(100000 * time.Hour)
} else {timer = time.NewTimer(c.entries[0].Next.Sub(now))
}
for {
select {
case now = <-timer.C:
now = now.In(c.location)
c.logger.Info("wake", "now", now)
// Run every entry whose next time was less than now
for _, e := range c.entries {if e.Next.After(now) || e.Next.IsZero() {break}
c.startJob(e.WrappedJob)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
}
case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue
case <-c.stop:
timer.Stop()
c.logger.Info("stop")
return
case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
}
break
}
}
问题就显而易见了,执行一个任务(或几个任务)都重新计算 next 执行时间,重新排序,最坏情况就是每次执行 1 个任务,排序一遍,那么执行 k 个任务需要的时间的时间复杂度就是 O(k*nlogn),这无疑是非常低效的。
于是想着怎么优化一下这个框架,不难想到每次找最先需要执行的任务就是从一堆任务中找 schedule_time 最小的那一个(设 schedule_time 是任务的执行时间),那么比较容易想到的思路就是使用最小堆:
在初始化任务列表的时候就直接构建一个最小堆
每次执行查看 peek 元素是否需要执行
需要执行就 pop 堆顶元素,计算 next 执行时间,重新 push 入堆
不需要执行就 break 到外层循环取堆顶元素,计算 next_time-now() = need_sleep_time,然后 select 睡眠、add、remove 等操作。
我修改为 min-heap 的方式之后,每次添加任务的时候通过堆的属性进行 up 和 down 调整,每次添加任务时间复杂度 O(logn),执行 k 个任务时间复杂度是 O(klogn)。经过验证线上 CPU 使用降低 4~5 倍。CPU 从 50% 左右降低至 10% 左右。
优化后的代码如下,只是其中一部分。
全部的代码也已经在 github 上已经创建了一个 Fork 的仓库并推送上去了,全部单测 Case 也都 PASS。感兴趣可以点过去看。https://github.com/tovenja/cro
for {
// Determine the next entry to run.
// Use min-heap no need sort anymore
// 这里不再需要排序了,因为 add 的时候直接进行堆调整
//sort.Sort(byTime(c.entries))
var timer *time.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
timer = time.NewTimer(100000 * time.Hour)
} else {timer = time.NewTimer(c.entries[0].Next.Sub(now))
//fmt.Printf("%v, %+v\n", c.entries[0].Next.Sub(now), c.entries[0].ID)
}
for {
select {
case now = <-timer.C:
now = now.In(c.location)
c.logger.Info("wake", "now", now)
// Run every entry whose next time was less than now
for {e := c.entries.Peek()
if e.Next.After(now) || e.Next.IsZero() {break}
e = heap.Pop(&c.entries).(*Entry)
c.startJob(e.WrappedJob)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
heap.Push(&c.entries, e)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
}
case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
heap.Push(&c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue
case <-c.stop:
timer.Stop()
c.logger.Info("stop")
return
case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
}
break
}
}