• robfig/cron-go cron定时任务库架构剖析


    Cron深度解析


    思想

    对于cron 这个三方库来说,他可以说是做两件事,其一是:解析cron string,生成一个定时器,达到循环时间发送信号。其二是核心(引擎):用以执行,判断,接收任务,停止cron等事情。


    robfig/cron at v3.0.1架构图

    robfig/cron 架构图


    关键位置分析

    Cron 实现 Executable 接口

    Executable 接口 有三个方法,Cron结构体继承。

    • 通过Start 会 启动一个并行队列
    • 通过Run 会启动一个串行队列
    // Start the cron scheduler in its own go-routine, or no-op if already started.
    func (c *Cron) Start() {
       if c.running {
       	return
       }
       c.running = true
       go c.run()
    }
    
    // Run the cron scheduler, or no-op if already running.
    func (c *Cron) Run() {
       if c.running {
       	return
       }
       c.running = true
       c.run()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    Cron 结构体分析

    // Cron keeps track of any number of entries, invoking the associated func as
    // specified by the schedule. It may be started, stopped, and the entries may
    // be inspected while running.
    type Cron struct {
    	entries  []*Entry
    	stop     chan struct{}
    	add      chan *Entry
    	snapshot chan []*Entry
    	running  bool
    	ErrorLog *log.Logger
    	location *time.Location
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • entries:指针切片,用以执行当中的entry(即每一个任务)以及接收传进来的任务
    • stop:停止信号
    • add:接收传输管道
    • snapshot:entries 副本,打印需要

    cron string 解析

    在 AddJob 方法中,通过Parse 函数获取实现了Schedule 接口的结构体:SpecSchedule

    func parseDescriptor(descriptor string) (Schedule, error) {
    	switch descriptor {
    	case "@yearly", "@annually":
    		return &SpecSchedule{
    			Second: 1 << seconds.min,
    			Minute: 1 << minutes.min,
    			Hour:   1 << hours.min,
    			Dom:    1 << dom.min,
    			Month:  1 << months.min,
    			Dow:    all(dow),
    		}, nil
    
    	case "@monthly":
    		return &SpecSchedule{
    			Second: 1 << seconds.min,
    			Minute: 1 << minutes.min,
    			Hour:   1 << hours.min,
    			Dom:    1 << dom.min,
    			Month:  all(months),
    			Dow:    all(dow),
    		}, nil
    ......
    }
    
    // 该结构体实现了 Schedule 接口
    type SpecSchedule struct {
    	Second, Minute, Hour, Dom, Month, Dow uint64
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    引擎解析

    // Run the scheduler. this is private just due to the need to synchronize
    // access to the 'running' state variable.
    func (c *Cron) run() {
    	// Figure out the next activation times for each entry.
    	now := c.now()
    	for _, entry := range c.entries {
    		entry.Next = entry.Schedule.Next(now)
    	}
    
    	for {
    		// Determine the next entry to run.
    		sort.Sort(byTime(c.entries))
    
    		var timer *time.Timer
    		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
    			// If there are no entries yet, just sleep - it still handles new entries
    			// and stop requests.
    			timer = time.NewTimer(100000 * time.Hour)
    		} else {
    			timer = time.NewTimer(c.entries[0].Next.Sub(now))
    		}
    
    		for {
    			select {
    			case now = <-timer.C:
    				now = now.In(c.location)
    				// Run every entry whose next time was less than now
    				for _, e := range c.entries {
    					if e.Next.After(now) || e.Next.IsZero() {
    						break
    					}
    					go c.runWithRecovery(e.Job)
    					e.Prev = e.Next
    					e.Next = e.Schedule.Next(now)
    				}
    
    			case newEntry := <-c.add:
    				timer.Stop()
    				now = c.now()
    				newEntry.Next = newEntry.Schedule.Next(now)
    				c.entries = append(c.entries, newEntry)
    
    			case <-c.snapshot:
    				c.snapshot <- c.entrySnapshot()
    				continue
    
    			case <-c.stop:
    				timer.Stop()
    				return
    			}
    
    			break
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    就是一个循环,循环等,循环加。

    改进点

    可以发现在这其中并没有写注销某一个定时任务的方法

    但是想实现也是比较简单的,加一个case,监听channel,遍历 entries,找到目标,删除掉就ok了

    但是既然它不加,应该是有原因的,不加的原因应该是处于经验考虑的吧?

  • 相关阅读:
    [附源码]计算机毕业设计JAVA公益劳动招募管理系统
    ResNet网络详解及其PyTorch实现
    C#中反射的使用总结
    安卓手机APP开发__媒体开发部分__播放器的接口
    Django-(8)
    Hive集群高可用配置与impala集群高可用配置
    这是什么代码帮我看看
    北大肖臻老师《区块链技术与应用》系列课程学习笔记[15]以太坊-交易树和收据树
    MARS: An Instance-aware, Modular and Realistic Simulator for Autonomous Driving
    Multitask Vision-Language Prompt Tuning
  • 原文地址:https://blog.csdn.net/wys74230859/article/details/128135071