• robfig/cron-go cron定时任务库架构剖析


    Cron深度解析


    思想

    对于cron 这个三方库来说,他可以说是做两件事,其一是:解析cron string,生成一个定时器,达到循环时间发送信号。其二是核心(引擎):用以执行,判断,接收任务,停止cron等事情。


    robfig/cron at v3.0.1架构图

    robfig/cron 架构图


    关键位置分析

    Cron 实现 Executable 接口

    Executable 接口 有三个方法,Cron结构体继承。

    • 通过Start 会 启动一个并行队列
    • 通过Run 会启动一个串行队列
    // Start the cron scheduler in its own go-routine, or no-op if already started.
    func (c *Cron) Start() {
       if c.running {
       	return
       }
       c.running = true
       go c.run()
    }
    
    // Run the cron scheduler, or no-op if already running.
    func (c *Cron) Run() {
       if c.running {
       	return
       }
       c.running = true
       c.run()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    Cron 结构体分析

    // Cron keeps track of any number of entries, invoking the associated func as
    // specified by the schedule. It may be started, stopped, and the entries may
    // be inspected while running.
    type Cron struct {
    	entries  []*Entry
    	stop     chan struct{}
    	add      chan *Entry
    	snapshot chan []*Entry
    	running  bool
    	ErrorLog *log.Logger
    	location *time.Location
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • entries:指针切片,用以执行当中的entry(即每一个任务)以及接收传进来的任务
    • stop:停止信号
    • add:接收传输管道
    • snapshot:entries 副本,打印需要

    cron string 解析

    在 AddJob 方法中,通过Parse 函数获取实现了Schedule 接口的结构体:SpecSchedule

    func parseDescriptor(descriptor string) (Schedule, error) {
    	switch descriptor {
    	case "@yearly", "@annually":
    		return &SpecSchedule{
    			Second: 1 << seconds.min,
    			Minute: 1 << minutes.min,
    			Hour:   1 << hours.min,
    			Dom:    1 << dom.min,
    			Month:  1 << months.min,
    			Dow:    all(dow),
    		}, nil
    
    	case "@monthly":
    		return &SpecSchedule{
    			Second: 1 << seconds.min,
    			Minute: 1 << minutes.min,
    			Hour:   1 << hours.min,
    			Dom:    1 << dom.min,
    			Month:  all(months),
    			Dow:    all(dow),
    		}, nil
    ......
    }
    
    // 该结构体实现了 Schedule 接口
    type SpecSchedule struct {
    	Second, Minute, Hour, Dom, Month, Dow uint64
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    引擎解析

    // Run the scheduler. this is private just due to the need to synchronize
    // access to the 'running' state variable.
    func (c *Cron) run() {
    	// Figure out the next activation times for each entry.
    	now := c.now()
    	for _, entry := range c.entries {
    		entry.Next = entry.Schedule.Next(now)
    	}
    
    	for {
    		// Determine the next entry to run.
    		sort.Sort(byTime(c.entries))
    
    		var timer *time.Timer
    		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
    			// If there are no entries yet, just sleep - it still handles new entries
    			// and stop requests.
    			timer = time.NewTimer(100000 * time.Hour)
    		} else {
    			timer = time.NewTimer(c.entries[0].Next.Sub(now))
    		}
    
    		for {
    			select {
    			case now = <-timer.C:
    				now = now.In(c.location)
    				// Run every entry whose next time was less than now
    				for _, e := range c.entries {
    					if e.Next.After(now) || e.Next.IsZero() {
    						break
    					}
    					go c.runWithRecovery(e.Job)
    					e.Prev = e.Next
    					e.Next = e.Schedule.Next(now)
    				}
    
    			case newEntry := <-c.add:
    				timer.Stop()
    				now = c.now()
    				newEntry.Next = newEntry.Schedule.Next(now)
    				c.entries = append(c.entries, newEntry)
    
    			case <-c.snapshot:
    				c.snapshot <- c.entrySnapshot()
    				continue
    
    			case <-c.stop:
    				timer.Stop()
    				return
    			}
    
    			break
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    就是一个循环,循环等,循环加。

    改进点

    可以发现在这其中并没有写注销某一个定时任务的方法

    但是想实现也是比较简单的,加一个case,监听channel,遍历 entries,找到目标,删除掉就ok了

    但是既然它不加,应该是有原因的,不加的原因应该是处于经验考虑的吧?

  • 相关阅读:
    WSL2最佳实践,淘汰难用的Xshell和Finalshell
    小程序首页如何进行装修设置
    网络攻防实验 (by quqi99)
    网络安全之应急流程
    P1074 [NOIP2009 提高组] 靶形数独
    JS测试出最小支持字体,以及修复PDFJS的文本错误偏移
    C语言迭代法求一个数的平方根。迭代公式:Xn+1=(Xn+a/Xn)/2,其中a是输入的数字
    XML文件的解析操作
    AFPN:用于目标检测的渐近特征金字塔网络
    CentOS 7迁移Anolis OS 7
  • 原文地址:https://blog.csdn.net/wys74230859/article/details/128135071