• 四叉堆在GO中的应用-定时任务timer


    堆作为必须掌握的数据结构之一,在众多场景中也得到了广泛的应用。
    比较典型的,如java中的优先队列PriorityQueue、算法中的TOP-K问题、最短路径Dijkstra算法等,在这些经典应用中堆都担任着灵魂般的角色。

    理论基础

    binary heap

    再一起回忆一下堆的一个性质:堆总是一棵完全二叉树。有些文章中也将堆称为二叉堆(binary heap)。
    在堆中,再根据堆顶点为最大值与最小值,分为大顶堆与小顶堆。
    大顶堆
    小顶堆

    新增一个元素,需要进行sift-up操作,其时间复杂度为O(logN)

    构造二叉堆,有两种方式:

    • 一种是比较简单的方式:遍历每个元素进行sift-up,其时间复杂度为O(N*logN)
    • 另一种是将元素以完全二叉树进行存储,遍历每个非叶子节点自下而上构建子堆,其时间负载度为O(N)

    删除堆顶元素,需要对堆末尾元素进行sift-down,其时间复杂度也为O(logN)。

    堆排序的过程是在构建好堆后再逐个删除堆顶元素,其时间复杂度O(N+(N-1)*logN),约为O(NlogN)

    堆排序整体运行过程动画如下:
    堆排序过程

    d-ary deap

    除了二叉堆外,还有三叉堆、四叉堆、五叉堆这些N叉堆,即维基百科中的d-ary heap

    The d-ary heap or d-heap is a priority queue data structure, a generalization of the binary heap in which the nodes have d children instead of 2.

    在这里插入图片描述

    N叉堆与二叉堆进行对比,由于N叉堆树的高度更低,上推(sift-up)过程的时间复杂度是二叉堆的O(logN2)倍,即新增元素时则会更快。

    删除堆顶元素时进行sift-down操作,时间复杂度为O(N * log s / log N)。(N为维度,s为堆中节点个数)

    在N叉堆中,四叉堆由于综合性能相对稳定在N叉堆中脱颖而出。

    测试数据可参考:https://vearne.cc/archives/39627

    GO中的应用(time.ticker源码分析)

    在有了理论基础后,再看下四叉堆在GO中的应用-timer(定时任务)。

    ticker用法

    在go项目中,可以使用go自带的time.ticker进行简单的定时任务。示例代码如下:

    // 新建一个ticker,定设置周期为1秒
    ticker := time.NewTicker(time.Second * 3)
    // 在一个协程接收ticker的channel回调
    go func() {
    	for {
    		<-ticker.C
    		// 周期到达后,输出当前时间
    		fmt.Println("tick-->", time.Now().String())
    	}
    }()
    time.Sleep(time.Hour)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    输出示例为:

    ……
    tick--> 2023-10-08 21:01:30.1830277 +0800 CST m=+3.009288301
    tick--> 2023-10-08 21:01:33.1811243 +0800 CST m=+6.007384901
    tick--> 2023-10-08 21:01:36.179331 +0800 CST m=+9.005591601
    ……
    
    • 1
    • 2
    • 3
    • 4
    • 5

    以上一个定时任务代码就完成了

    ticker结构

    以上简短的代码便完成一个定时任务的功能,再来探究一下它的原理。

    一个Ticker由两部分组成:

    • 一个接收消息的channel
    • 一个runtimeTimer结构体
    type Ticker struct {
    	C <-chan Time // The channel on which the ticks are delivered.
    	r runtimeTimer
    }
    
    type runtimeTimer struct {
    	pp       uintptr
    	when     int64
    	period   int64
    	f        func(any, uintptr) // NOTE: must not be closure
    	arg      any
    	seq      uintptr
    	nextwhen int64
    	status   uint32
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    从NewTicker方法入手:

    func NewTicker(d Duration) *Ticker {
    	if d <= 0 {
    		panic(errors.New("non-positive interval for NewTicker"))
    	}
    	c := make(chan Time, 1)
    	t := &Ticker{
    		C: c,
    		r: runtimeTimer{
    			when:   when(d), //下次触发时间
    			period: int64(d),//运行周期
    			f:      sendTime,//触发时执行的动作
    			arg:    c,
    		},
    	}
    	startTimer(&t.r)//启动Timer
    	return t
    }
    
    // sendTime does a non-blocking send of the current time on c.
    func sendTime(c any, seq uintptr) {
    	select {
    	case c.(chan Time) <- Now()://将当前时间发送给等待的channel
    	default://channel缓存区满了,不执行任何操作
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    以上代码在NewTicker方法中创建了一个Ticker,并调用了startTimer方法。
    且runtimeTimer与一个ticker是一对一的关系,用一个堆来存储所有的定时任务,则一个ticker是一个节点。

    startTimer方法

    startTimer在time包下无法找到实现代码,需要在go源码的runtime下查看。

    time.go

    如上图所示,源码在src/runtime/time.go文件中。

    // startTimer adds t to the timer heap.
    //
    //go:linkname startTimer time.startTimer
    func startTimer(t *timer) {
    	if raceenabled {
    		racerelease(unsafe.Pointer(t))
    	}
    	addtimer(t)
    }
    
    // Note: this changes some unsynchronized operations to synchronized operations
    // addtimer adds a timer to the current P.
    // This should only be called with a newly created timer.
    // That avoids the risk of changing the when field of a timer in some P's heap,
    // which could cause the heap to become unsorted.
    func addtimer(t *timer) {
    	// when must be positive. A negative value will cause runtimer to
    	// overflow during its delta calculation and never expire other runtime
    	// timers. Zero will cause checkTimers to fail to notice the timer.
    	if t.when <= 0 {
    		throw("timer when must be positive")
    	}
    	if t.period < 0 {
    		throw("timer period must be non-negative")
    	}
    	if t.status.Load() != timerNoStatus {
    		throw("addtimer called with initialized timer")
    	}
    	t.status.Store(timerWaiting)
    
    	when := t.when
    
    	// Disable preemption while using pp to avoid changing another P's heap.
    	mp := acquirem()
    
    	pp := getg().m.p.ptr()
    	lock(&pp.timersLock)
    	cleantimers(pp)
    	doaddtimer(pp, t)
    	unlock(&pp.timersLock)
    
    	wakeNetPoller(when)
    
    	releasem(mp)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    addtimer方法为关键方法。看懂addtimer的整体方法需要对go中的GMP模型有一定的了解。
    G(gorountine协程),M(thread线程),P(processor处理器)

    gmp

    咱们这里仅看主流程,直接看doaddtimer方法。

    doaddtimer方法(新增节点)

    // doaddtimer adds t to the current P's heap.
    // The caller must have locked the timers for pp.
    func doaddtimer(pp *p, t *timer) {
    	// Timers rely on the network poller, so make sure the poller
    	// has started.
    	if netpollInited.Load() == 0 {
    	    // netpool如未初始化则进行初始化
    		netpollGenericInit()
    	}
    
    	if t.pp != 0 {
    		throw("doaddtimer: P already set in timer")
    	}
    	// 给timer绑定p
    	t.pp.set(pp)
    	i := len(pp.timers)
    	// 将此timer添加到p的timer集合中,放到堆的末尾
    	pp.timers = append(pp.timers, t)
    	// 堆内新增了元素,进行上推操作在保持堆的特性
    	siftupTimer(pp.timers, i)
    	if t == pp.timers[0] {
    		pp.timer0When.Store(t.when)
    	}
    	// p的timer计数器加1
    	pp.numTimers.Add(1)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    再来详细学习下4叉堆的siftup具体是如何操作的。
    siftupTimer方法中的t为堆的所有元素,i为要进行siftup元素的索引,也就是新增的元素索引。

    siftupTimer方法如下:

    func siftupTimer(t []*timer, i int) int {
    	// 判断新增元素的正确性
    	if i >= len(t) {
    		badTimer()
    	}
    	// 获取出新增元素的具体运行时间
    	when := t[i].when
    	if when <= 0 {
    		badTimer()
    	}
    	// 新增元素的值
    	tmp := t[i]
    	for i > 0 {
    	    // 获取出新增元素的父节点索引,四叉堆时父节点索引为(i-1)/4
    		p := (i - 1) / 4 // parent
    		if when >= t[p].when {
    		    // 新增元素的运行时间晚于父节点,则无需继续siftup
    			break
    		}
    		// 将原父节点位置往下降一级
    		t[i] = t[p]
    		// 新增元素的位置往上提升一级
    		i = p
    	}
    	if tmp != t[i] {
    	    // 新增元素的值在最后确定了位置后才赋值,而不是每次都进行交换
    		t[i] = tmp
    	}
    	return i
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    runtimer方法(执行/删除节点)

    当timer堆中维护好后,就可以准备执行timer堆中的timer了。
    此过程为持续从堆顶取出timer,判断timer是否达到了执行的条件(时间、状态),如果条件满足就执行此timer。

    执行timer的方法为time中的runtimer方法,执行时主要关注runOneTimer方法。源码如下:

    func runtimer(pp *p, now int64) int64 {
    	for {
    	    // 获取出当前p的堆顶timer
    		t := pp.timers[0]
    		if t.pp.ptr() != pp {
    			throw("runtimer: bad p")
    		}
    		// 对堆顶timer的状态进行判断
    		switch s := t.status.Load(); s {
    		case timerWaiting:
    			if t.when > now {
    				// Not ready to run.
    				return t.when
    			}
    
    			if !t.status.CompareAndSwap(s, timerRunning) {
    			    // 已在运行,不重复运行
    				continue
    			}
    			// Note that runOneTimer may temporarily unlock
    			// pp.timersLock.
    			runOneTimer(pp, t, now)
    			return 0
    
    		case timerDeleted:
    			if !t.status.CompareAndSwap(s, timerRemoving) {
    				continue
    			}
    			dodeltimer0(pp)
    			if !t.status.CompareAndSwap(timerRemoving, timerRemoved) {
    				badTimer()
    			}
    			pp.deletedTimers.Add(-1)
    			if len(pp.timers) == 0 {
    				return -1
    			}
    
    		case timerModifiedEarlier, timerModifiedLater:
    			if !t.status.CompareAndSwap(s, timerMoving) {
    				continue
    			}
    			t.when = t.nextwhen
    			dodeltimer0(pp)
    			doaddtimer(pp, t)
    			if !t.status.CompareAndSwap(timerMoving, timerWaiting) {
    				badTimer()
    			}
    
    		case timerModifying:
    			// Wait for modification to complete.
    			osyield()
    
    		case timerNoStatus, timerRemoved:
    			// Should not see a new or inactive timer on the heap.
    			badTimer()
    		case timerRunning, timerRemoving, timerMoving:
    			// These should only be set when timers are locked,
    			// and we didn't do it.
    			badTimer()
    		default:
    			badTimer()
    		}
    	}
    }
    
    // runOneTimer runs a single timer.
    // The caller must have locked the timers for pp.
    // This will temporarily unlock the timers while running the timer function.
    //
    //go:systemstack
    func runOneTimer(pp *p, t *timer, now int64) {
    	if raceenabled {
    		ppcur := getg().m.p.ptr()
    		if ppcur.timerRaceCtx == 0 {
    			ppcur.timerRaceCtx = racegostart(abi.FuncPCABIInternal(runtimer) + sys.PCQuantum)
    		}
    		raceacquirectx(ppcur.timerRaceCtx, unsafe.Pointer(t))
    	}
        
        // 取出timer中的function和参数
    	f := t.f
    	arg := t.arg
    	seq := t.seq
    
    	if t.period > 0 {
    	    // tick类型的timer,以实际运行时间和固定周期计算出下次运行时间
    		// Leave in heap but adjust next time to fire.
    		delta := t.when - now
    		t.when += t.period * (1 + -delta/t.period)
    		if t.when < 0 { // check for overflow.
    			t.when = maxWhen
    		}
    		// siftdown堆顶节点,重新调整堆
    		siftdownTimer(pp.timers, 0)
    		if !t.status.CompareAndSwap(timerRunning, timerWaiting) {
    			badTimer()
    		}
    		updateTimer0When(pp)
    	} else {
    	    // 非tick类型的timer,执行删除
    		// Remove from heap.
    		dodeltimer0(pp)
    		if !t.status.CompareAndSwap(timerRunning, timerNoStatus) {
    			badTimer()
    		}
    	}
    
    	if raceenabled {
    		// Temporarily use the current P's racectx for g0.
    		gp := getg()
    		if gp.racectx != 0 {
    			throw("runOneTimer: unexpected racectx")
    		}
    		gp.racectx = gp.m.p.ptr().timerRaceCtx
    	}
    
    	unlock(&pp.timersLock)
        // 执行timer的function和参数
    	f(arg, seq)
    
    	lock(&pp.timersLock)
    
    	if raceenabled {
    		gp := getg()
    		gp.racectx = 0
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127

    在删除堆顶节点时执行的是siftdownTimer方法。其源码如下:

    // siftdownTimer puts the timer at position i in the right place
    // in the heap by moving it down toward the bottom of the heap.
    func siftdownTimer(t []*timer, i int) {
    	n := len(t)
    	if i >= n {
    		badTimer()
    	}
    	// 获取出要调整节点的执行时间
    	when := t[i].when
    	if when <= 0 {
    		badTimer()
    	}
    	tmp := t[i]
    	for {
    	    // c为调整节点的最左子节点,从左往右第1个
    		c := i*4 + 1 // left child
    		// c3为调整节点的中间节点,从左往右第3个
    		c3 := c + 2  // mid child
    		if c >= n {
    			break
    		}
    		// 最左子节点的下次执行时间
    		w := t[c].when
    		// 左边第2个节点的执行时间比最左子节点执行时间更先执行
    		if c+1 < n && t[c+1].when < w {
    		    // 左边部分timer排序交换,最先执行的排左边
    			w = t[c+1].when
    			c++
    		}
    		// 判断中间节点是否存在
    		if c3 < n {
    		    // 中间子节点的timer执行时间
    			w3 := t[c3].when
    			if c3+1 < n && t[c3+1].when < w3 {
    			// 同上,将最先执行的往左排
    				w3 = t[c3+1].when
    				c3++
    			}
    			// 子节点整体做对比,左侧与右侧对比
    			if w3 < w {
    			    // 将最先执行的放在左边
    				w = w3
    				c = c3
    			}
    		}
    		if w >= when {
    		    // 堆已调整完毕
    			break
    		}
    		// 将最左的子节点向上升一级
    		t[i] = t[c]
    		// 原i向下降一级
    		i = c
    	}
    	if tmp != t[i] {
    	    // 将siftdown节点调整到最终确定的位置
    		t[i] = tmp
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    某一个timer运行时,会判断此timer是否为周期性timer,周期性timer会将堆顶节点进行移除,再计算出下次执行时间,并使用sift-down将此timer下沉到适当的位置,以整体满足堆的特性。

    dodeltimer0(临时性timer)

    从runOneTimer方法中可以看到有两个分支,分别为:

    • timer有period(周期性定时任务类型)
    • timer无period(仅计时类型)

    前面看的siftdownTimer是周期性定时任务会执行的方法。如果为临时性定时任务,如倒计时或time.sleep场景中,则最终运行的为dodeltimer0方法。

    源码如下:

    // dodeltimer0 removes timer 0 from the current P's heap.
    // We are locked on the P when this is called.
    // It reports whether it saw no problems due to races.
    // The caller must have locked the timers for pp.
    func dodeltimer0(pp *p) {
    	if t := pp.timers[0]; t.pp.ptr() != pp {
    		throw("dodeltimer0: wrong P")
    	} else {
    		t.pp = 0
    	}
    	// 获取到堆中的最后一个节点
    	last := len(pp.timers) - 1
    	if last > 0 {
    	    // 最后一个节点放到堆顶
    		pp.timers[0] = pp.timers[last]
    	}
    	// 删除堆中的原末尾节点
    	pp.timers[last] = nil
    	pp.timers = pp.timers[:last]
    	if last > 0 {
    	    // 对放到堆顶的原末尾节点进行siftdown操作
    		siftdownTimer(pp.timers, 0)
    	}
    	// 更新timer集合
    	updateTimer0When(pp)
    	n := pp.numTimers.Add(-1)
    	if n == 0 {
    		// If there are no timers, then clearly none are modified.
    		pp.timerModifiedEarliest.Store(0)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    从源码可以看出,当临时性timer触发后会将此节点删除不会再次入堆。这个过程咱们所了解的常规堆排序的过程是一致的,只是这里用的是四叉堆堆排序中用的是二叉堆。

    proc.checkTimers(运行检测)

    前面所提到的持续从堆顶取timer,并判断是否满足执行条件的步骤在proc.checkTimers方法中,也就是它才是timer执行的入口。此方法的上层调度可通过跟踪源码查看到,后期再详细深入探究。

    timer与robfig/cron对比

    由于参与的GO项目中有常看到另一个框架https://github.com/robfig/cron,看着源码不太多就浅浅看了下,总结出以下几点:

    1. cron是基于timer开发的,底层内部仍是使用的timer
    2. cron支持的任务最小周期为秒,timer的最小周期无限制
    3. cron中的某一任务是可能并行运行的,而timer.tick中的同一任务不会出现同时运行的情况

    比较关键的点为第3点,具体选择时看具体的应用场景

    总结

    • 在数据量不太大的情况下,四叉堆的综合性能比二叉堆更优
    • GO中time.timer和time.tick是使用四叉堆实现的
    • time.tick的任务每次运行后会重新入堆,time.timer的任务每次运行后会从堆顶删除
  • 相关阅读:
    latex:表格(包含表格标题)实例
    【Java】涉及到GUI、JAVASE、网络编程、多线程、数据库的聊天系统,非常适合大学Java课程的练手
    通过 js 给元素添加动画样式animation属性 ,以及 perspective 属性探究
    Selenium打开页面,出现弹窗需要登录账号密码,怎么解决?
    微服务拆分的思考
    【数据结构初阶】双链表
    05_不同路径2(带障碍物版)
    Servlet
    【面经】联想大数据开发面经
    【性能优化】聊聊性能优化那些事
  • 原文地址:https://blog.csdn.net/puhaiyang/article/details/133845588