• 最小堆提升每次排序的效率


    之前写过一个分布是任务调度系统,每次执行完任务都要对任务进行排序,使用最小堆确实优化了效率及cpu

    项目中需要使用一个简单的定时任务调度的框架,最初直接从GitHub上搜了一个star比较多的,就是https://github.com/robfig/cron,目前有8000+ star。刚开始使用的时候发现问题不大,但是随着单机需要定时调度的任务越来越多,高峰期差不多接近500QPS,随着业务的推广使用,可以预期增长还会比较快,但是已经遇到CPU使用率偏高的问题,通过pprof分析,很多都是在做排序,看了下这个项目的代码,整体执行的过程大概如下:

    对所有任务进行排序,按照下次执行时间进行排序

    选择数组中第一个任务,计算下次执行时间减去当前时间得到时间t,然后sleep t

    然后从数组第一个元素开始遍历任务,如果此任务需要调度的时间 < now,那么就执行此任务,执行之后重新计算这个任务的next执行时间

    每次待执行的任务执行完毕之后,都会重新对这个数组进行排序

    然后再循环从排好序的数组中找到第一个需要执行的任务去执行。

    代码如下:

    for {
            // Determine the next entry to run.
            sort.Sort(byTime(c.entries))

            var timer *time.Timer
            if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
                // If there are no entries yet, just sleep - it still handles new entries
                // and stop requests.
                timer = time.NewTimer(100000 * time.Hour)
            } else {
                timer = time.NewTimer(c.entries[0].Next.Sub(now))
            }

            for {
                select {
                case now = <-timer.C:
                    now = now.In(c.location)
                    c.logger.Info("wake""now", now)

                    // Run every entry whose next time was less than now
                    for _, e := range c.entries {
                        if e.Next.After(now) || e.Next.IsZero() {
                            break
                        }
                        c.startJob(e.WrappedJob)
                        e.Prev = e.Next
                        e.Next = e.Schedule.Next(now)
                        c.logger.Info("run""now", now, "entry", e.ID, "next", e.Next)
                    }

                case newEntry := <-c.add:
                    timer.Stop()
                    now = c.now()
                    newEntry.Next = newEntry.Schedule.Next(now)
                    c.entries = append(c.entries, newEntry)
                    c.logger.Info("added""now", now, "entry", newEntry.ID, "next", newEntry.Next)

                case replyChan := <-c.snapshot:
                    replyChan <- c.entrySnapshot()
                    continue

                case <-c.stop:
                    timer.Stop()
                    c.logger.Info("stop")
                    return

                case id := <-c.remove:
                    timer.Stop()
                    now = c.now()
                    c.removeEntry(id)
                    c.logger.Info("removed""entry", id)
                }

                break
            }
        }
    • 1

    问题就显而易见了,执行一个任务(或几个任务)都重新计算next执行时间,重新排序,最坏情况就是每次执行1个任务,排序一遍,那么执行k个任务需要的时间的时间复杂度就是O(k*nlogn),这无疑是非常低效的。

    于是想着怎么优化一下这个框架,不难想到每次找最先需要执行的任务就是从一堆任务中找schedule_time最小的那一个(设schedule_time是任务的执行时间),那么比较容易想到的思路就是使用最小堆:

    在初始化任务列表的时候就直接构建一个最小堆

    每次执行查看peek元素是否需要执行

    需要执行就pop堆顶元素,计算next执行时间,重新push入堆

    不需要执行就break到外层循环取堆顶元素,计算next_time-now() = need_sleep_time,然后select 睡眠、add、remove等操作。

    我修改为min-heap的方式之后,每次添加任务的时候通过堆的属性进行up和down调整,每次添加任务时间复杂度O(logn),执行k个任务时间复杂度是O(klogn)。经过验证线上CPU使用降低4~5倍。CPU从50%左右降低至10%左右。

    优化后的代码如下,只是其中一部分。

    全部的代码也已经在github上已经创建了一个Fork的仓库并推送上去了,全部单测Case也都PASS。感兴趣可以点过去看。https://github.com/tovenja/cron

        for {
            // Determine the next entry to run.
            // Use min-heap no need sort anymore


         // 这里不再需要排序了,因为add的时候直接进行堆调整
         //sort.Sort(byTime(c.entries))

            var timer *time.Timer
            if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
                // If there are no entries yet, just sleep - it still handles new entries
                // and stop requests.
                timer = time.NewTimer(100000 * time.Hour)
            } else {
                timer = time.NewTimer(c.entries[0].Next.Sub(now))
                //fmt.Printf(" %v, %+v\n", c.entries[0].Next.Sub(now), c.entries[0].ID)
            }

            for {
                select {
                case now = <-timer.C:
                    now = now.In(c.location)
                    c.logger.Info("wake""now", now)
                    // Run every entry whose next time was less than now
                    for {
                        e := c.entries.Peek()
                        if e.Next.After(now) || e.Next.IsZero() {
                            break
                        }
                        e = heap.Pop(&c.entries).(*Entry)
                        c.startJob(e.WrappedJob)
                        e.Prev = e.Next
                        e.Next = e.Schedule.Next(now)
                        heap.Push(&c.entries, e)
                        c.logger.Info("run""now", now, "entry", e.ID, "next", e.Next)
                    }

                case newEntry := <-c.add:
                    timer.Stop()
                    now = c.now()
                    newEntry.Next = newEntry.Schedule.Next(now)
                    heap.Push(&c.entries, newEntry)
                    c.logger.Info("added""now", now, "entry", newEntry.ID, "next", newEntry.Next)

                case replyChan := <-c.snapshot:
                    replyChan <- c.entrySnapshot()
                    continue

                case <-c.stop:
                    timer.Stop()
                    c.logger.Info("stop")
                    return

                case id := <-c.remove:
                    timer.Stop()
                    now = c.now()
                    c.removeEntry(id)
                    c.logger.Info("removed""entry", id)
                }

                break
            }
        }

    • 1

    转自:

    cnblogs.com/aboutblank/p/14860571.html

    更多好文 关注

    alt

    本文由 mdnice 多平台发布

  • 相关阅读:
    设备完全有效生产率TEEP对生产制造企业有什么作用?
    正则验证用户名和跨域postmessage
    Pytorch 学习路程 - 1:入门
    Solaris 9 Sparc下安装整合Apache2和Tomcat5
    OceanBase社区版单节点安装搭建(Docker)
    Spring Cloud Alibaba【授权规则、系统自适应限流、SentinelResource注解配置详解之只配 置fallback】(八)
    Linux常用命令
    如何用Postman做接口自动化测试
    基于slate构建文档编辑器
    leetcode - 1838. Frequency of the Most Frequent Element
  • 原文地址:https://blog.csdn.net/qq_39787367/article/details/126031758