• Go定时任务源码 - robfig/cron


    介绍

    robfig/cron是Go语言实现的开源定时任务调度框架,核心代码是巧妙的使用chan + select + for实现了一个轻量级调度协程,不但语法简洁,而且具有很好的性能。

    设计

    任务抽象(业务隔离):任务抽象成一个Job接口,业务逻辑类只需实现该接口

    type Job interface {
      Run()
    }
    
    • 1
    • 2
    • 3

    计划接口:通过当前时间计算任务的下次执行执行时间,具体实现类可以根据实际需求实现

    type Schedule interface {
    	Next(time.Time) time.Time
    }
    
    • 1
    • 2
    • 3

    定时任务对象:保存执行的任务Job、计算执行时间

    type Entry struct {
    	ID       EntryID   // id
    	Schedule Schedule  // 计划
    	Next     time.Time // 下次执行时间
    	Job      Job       // 任务
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    任务调度管理:保存定时任务对象(Entry),调度任务执行,提供新增、删除接口(涉及关联资源竞争)

    // 任务管理类
    type Cron struct {
    	nextID  int64        // 生成entry自增ID
    	entries []*Entry     // 保存Entry
    	add     chan *Entry  // 添加
    	remove  chan EntryID // 删除
    }
    // 删除
    func (c *Cron) Remove(id EntryID) { 
    	c.remove <- id
    }
    // 新增
    func (c *Cron) Add(spec string, cmd Job) EntryID  { 
    	entry := &Entry{
    		ID:         EntryID(atomic.AddInt64(&c.nextID, 1)),
    		Schedule:   ParseStandard(spec),
    		Job:        cmd,
    	}
    	c.add <- entry
    	return entry.ID
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    核心调度:计算下次执行时间 -> 排序 -> 取最早执行数据 -> timer 等待,因为只有一个协程在执行这个run的调度,所以不存在资源竞争,不需要加锁,另外考虑到执行任务可能涉及阻塞,例如:IO操作,所以一般startJob方法会开启协程执行

    func (c *Cron) run() {
    	now := time.Now()
    	for _, entry := range c.entries {
    		entry.Next = entry.Schedule.Next(now) // 计算下次执行时间
    	}
    	for {
    		sort.Sort(byTime(c.entries)) // 时间排序
    		timer := time.NewTimer(c.entries[0].Next.Sub(now))
    		select {
    		case now = <-timer.C:
    			for _, e := range c.entries {
    				if e.Next.After(now) || e.Next.IsZero() {
    					break
    				}
    				c.startJob(e.Job) // 开协程执行
    				e.Next = e.Schedule.Next(now) // 计算下次执行时间
    			}
    		case newEntry := <-c.add: // 新增
    			timer.Stop()
    			newEntry.Next = newEntry.Schedule.Next(now)
    			c.entries = append(c.entries, newEntry)
    		}
        ...
    	}
    }
    // 执行任务
    func (c *Cron) startJob(j Job) {
    	go func() {
    		j.Run()
    	}()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    启动时会开启唯一协程执行run方法,计算任务执行时间,执行,任务管理等

    func New() *Cron {
    	c := &Cron{
    		entries: nil,
    		add:     make(chan *Entry),
    		remove:  make(chan EntryID),
    	}
    	return c
    }
    func (c *Cron) Start() {
    	go c.run()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    总结

    1. 共享资源(定时任务)的管理和调度由唯一协程管理
    2. 通过for + select + channel来循环计算执行时间,监听任务到期、增删事件
    3. 执行任务会新启协程执行,不阻塞调度
    4. 采用扇入/扇出原理,多协程添加、增删任务调度协程(Fan In),调度启动新协程执行任务(Fan Out)
    5. 调度协程使用的是CSP并发模型思想

    我的博客:https://itart.cn
    原文地址:https://itart.cn/blogs/2022/explore/cron-source-code.html

  • 相关阅读:
    2023年第四届MathorCup大数据挑战赛(B题)|电商零售商家需求预测及库存优化问题|数学建模完整代码+建模过程全解全析
    hyperf框架接入pgsql扩展包
    Python(win+r--mspaint——打开画图)
    初级算法_字符串 --- 最长公共前缀
    [附源码]计算机毕业设计springboot汽配管理系统
    【java期末复习题】第2章 Java语言的基本语法
    [PyTorch][chapter 57][WGAN-GP 代码实现]
    终于把下载安装更新的功能整出来了,记录关键点
    ASEMI-GBJ3510新能源专用整流桥
    2022北京国际养老产业展览会/北京养老展/养老用品展11月
  • 原文地址:https://blog.csdn.net/piaohai/article/details/127974560