• Golang 协程池 Ants 实现原理,附详细的图文说明和代码


    Golang 协程池 Ants 实现原理,附详细的图文说明和代码。

    1 前置知识点
    1.1 sync.Locker
    sync.Locker 是 go 标准库 sync 下定义的锁接口:

    // A Locker represents an object that can be locked and unlocked.
    type Locker interface {
        Lock()
        Unlock()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    任何实现了 Lock 和 Unlock 两个方法的类,都可以作为一种锁的实现,最常见的为 go 标准库实现的 sync.Mutex.

    在 ants 中,作者不希望使用 Mutex 这种重锁,而是自定义实现了一种轻量级的自旋锁:

    在这里插入图片描述
    该锁实现原理:

    • 通过一个整型状态值标识锁的状态:0-未加锁;1-加锁;

    • 加锁成功时,即把 0 改为 1;解锁时则把 1 改为 0;改写过程均通过 atomic 包保证并发安全;

    • 加锁通过 for 循环 + cas 操作实现自旋,无需操作系统介入执行 park 操作;

    • 通过变量 backoff 反映抢锁激烈度,每次抢锁失败,执行 backoff 次让 cpu 时间片动作;backoff 随失败次数逐渐升级,封顶 16.

    type spinLock uint32
    const maxBackoff = 16
    
    func (sl *spinLock) Lock() {
        backoff := 1
        for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
            for i := 0; i < backoff; i++ {
                runtime.Gosched()
            }
            if backoff < maxBackoff {
                backoff <<= 1
            }
        }
    }
    
    func (sl *spinLock) Unlock() {
        atomic.StoreUint32((*uint32)(sl), 0)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    1.2 sync.Cond
    sync.Cond 是 golang 标准库提供的并发协调器,用于实现在指定条件下阻塞和唤醒协程的操

    在这里插入图片描述
    1.2.1 数据结构与构造器方法

    type Cond struct {
        noCopy noCopy
    
        // L is held while observing or changing the condition
        L Locker
    
        notify  notifyList
        checker copyChecker
    }
    
    // NewCond returns a new Cond with Locker l.
    func NewCond(l Locker) *Cond {
        return &Cond{L:
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    • 成员变量 noCopy + checker 是一套组合拳,保证 Cond 在第一次使用后不允许被复制;

    • 核心变量 L,一把锁,用于实现阻塞操作;

    • 核心变量 notify,阻塞链表,分别存储了调用 Cond.Wait() 方法的次数、goroutine 被唤醒的次数、一把系统运行时的互斥锁以及链表的头尾节点.

    type notifyList struct {
        wait   uint32
        notify uint32
        lock   uintptr // key field of the mutex
        head   unsafe.Pointer
        tail   unsafe.Pointer
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    1.2.2 Cond.Wait

    func (c *Cond) Wait() {
        c.checker.check()
        t := runtime_notifyListAdd(&c.notify)
        c.L.Unlock()
        runtime_notifyListWait(&c.notify, t)
        c.L.Lock()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    • 检查 Cond 是否在使用过后被拷贝,是则 panic;

    • 该 Cond 阻塞链表 wait 统计数加 1;

    • 当前协程释放锁,因为接下来即将被操作系统 park;

    • 将当前协程包装成节点,添加到 Cond 的阻塞队列当中,并调用 park 操作将当前协程挂起;

    • 协程被唤醒后,重新尝试获取锁.

    1.2.3 Cond.Signal

    func (c *Cond) Signal() {
        c.checker.check()
        runtime_notifyListNotifyOne(&c.notify)
    }
    
    • 1
    • 2
    • 3
    • 4

    • 检查 Cond 是否在首次使用后被拷贝,是则 panic;

    • 该 Cond 阻塞链表 notify 统计数加 1;

    • 从头开始遍历阻塞链表,唤醒一个等待时间最长的 goroutine.

    1.2.4 Cond.BroadCast

    func (c *Cond) Broadcast() {
        c.checker.check()
        runtime_notifyListNotifyAll(&c.notify)
    }
    
    • 1
    • 2
    • 3
    • 4

    • 检查 Cond 是否在首次使用后被拷贝,是则 panic;

    • 取 wait 值赋值给 notify;

    • 唤醒阻塞链表所有节点.

    1.3 sync.Pool
    sync.Pool 是 golang 标准库下并发安全的对象池,适合用于有大量对象资源会存在被反复构造和回收的场景,可缓存资源进行复用,以提高性能并减轻 GC 压力.

    1.3.1 gmp 原理简述

    g:goroutine;

    m:类比内核线程;

    p:调取器,通常 p 的数量等于 cpu 核数.

    • p 为中枢,m 通过与 p 的结合,调度 g;

    • p 有本地 g 队列和全局 g 队列,前者取 g 不加锁,后者加锁;

    • 抢占式调度,g 因为阻塞或者时间片耗尽,可能回到等待队列,最终前后可能被不同的 g 和 m 执行.

    在这里插入图片描述
    1.3.2 数据结构

    type Pool struct {
        noCopy noCopy
    
        local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
        localSize uintptr        // size of the local array
    
        victim     unsafe.Pointer // local from previous cycle
        victimSize uintptr        // size of victims array
    
        // New optionally specifies a function to generate
        // a value when Get would otherwise return nil.
        // It may not be changed concurrently with calls to Get.
        New func() 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    • noCopy 防拷贝标志;

    • local 类型为 [P]poolLocal 的数组,数组容量 P 为 goroutine 处理器 P 的个数;

    • victim 为经过一轮 gc 回收,暂存的上一轮 local;

    • New 为用户指定的工厂函数,当 Pool 内存量元素不足时,会调用该函数构造新的元素.

    type poolLocal struct {
        poolLocalInternal
    }
    
    // Local per-P Pool appendix.
    type poolLocalInternal struct {
        private any       // Can be used only by the respective P.
        shared  poolChain // Local P can pushHead/popHead; any P can popTail.
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    • poolLocal 为 Pool 中对应于某个 P 的缓存数据;

    • poolLocalInternal.private:对应于某个 P 的私有元素,操作时无需加锁;

    • poolLocalInternal.shared: 某个 P 下的共享元素链表,由于各 P 都有可能访问,因此需要加锁.

    1.3.3 核心方法

    I Pool.pin

    func (p *Pool) pin() (*poolLocal, int) {
        pid := runtime_procPin()
        s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
        l := p.local                              // load-consume
        if uintptr(pid) < s {
            return indexLocal(l, pid), pid
        }
        return p.pinSlow()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    • pin 方法内部通过 native 方法 runtime_procPin 取出当前 P 的 index,并且将当前 goroutine 与 P 进行绑定,短暂处于不可抢占状态;

    • 如果是首次调用 pin 方法,则会走进 pinSlow 方法;

    • 在pinSlow 方法中,会完成 Pool.local 的初始化,并且将当前 Pool 添加到全局的 allPool 数组中,用于 gc 回收;

    II Pool.Get

    在这里插入图片描述

    func (p *Pool) Get() any {
        l, pid := p.pin()
        x := l.private
        l.private = nil
        if x == nil {
            x, _ = l.shared.popHead()
            if x == nil {
                x = p.getSlow(pid)
            }
        }
        runtime_procUnpin()
        if x == nil && p.New != nil {
            x = p.New()
        }
        return x
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    • 调用 Pool.pin 方法,绑定当前 goroutine 与 P,并且取得该 P 对应的缓存数据;

    • 尝试获取 P 缓存数据的私有元素 private;

    • 倘若前一步失败,则尝试取 P 缓存数据中共享元素链表的头元素;

    • 倘若前一步失败,则走入 Pool.getSlow 方法,尝试取其他 P 缓存数据中共享元素链表的尾元素;

    • 同样在 Pool.getSlow 方法中,倘若前一步失败,则尝试从上轮 gc 前缓存中取元素(victim);

    • 调用 native 方法解绑 当前 goroutine 与 P

    • 倘若(2)-(5)步均取值失败,调用用户的工厂方法,进行元素构造并返回.

    III Put

    // Put adds x to the pool.
    func (p *Pool) Put(x any) {
        if x == nil {
            return
        }
        l, _ := p.pin()
        if l.private == nil {
            l.private = x
        } else {
            l.shared.pushHead(x)
        }
        runtime_procUnpin()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    • 判断存入元素 x 非空;

    • 调用 Pool.pin 绑定当前 goroutine 与 P,并获取 P 的缓存数据;

    • 倘若 P 缓存数据中的私有元素为空,则将 x 置为其私有元素;

    • 倘若未走入(3)分支,则将 x 添加到 P 缓存数据共享链表的末尾;

    • 解绑当前 goroutine 与 P.

    1.3.4 回收机制

    存入 pool 的对象会不定期被 go 运行时回收,因此 pool 没有容量概念,即便大量存入元素,也不会发生内存泄露.

    具体回收时机是在 gc 时执行的:

    func init() {
        runtime_registerPoolCleanup(poolCleanup)
    }
    
    func poolCleanup() {
        for _, p := range oldPools {
            p.victim = nil
            p.victimSize = 0
        }
    
        for _, p := range allPools {
            p.victim = p.local
            p.victimSize = p.localSize
            p.local = nil
            p.localSize = 0
        }
    
        oldPools, allPools = allPools, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    • 每个 Pool 首次执行 Get 方法时,会在内部首次调用 pinSlow 方法内将该 pool 添加到迁居的 allPools 数组中;

    • 每次 gc 时,会将上一轮的 oldPools 清空,并将本轮 allPools 的元素赋给 oldPools,allPools 置空;

    • 新置入 oldPools 的元素统一将 local 转移到 victim,并且将 local 置为空.

    综上可以得见,最多两轮 gc,pool 内的对象资源将会全被回收.


    2 Ants

    2.1 基本信息
    ant 源码:https://github.com/panjf2000/ants

    2.2 为什么用协程池?
    • 提升性能:主要面向一类场景:大批量轻量级并发任务,任务执行成本与协程创建/销毁成本量级接近;

    • 有一个并发资源控制的概念:研发能够明确系统全局并发度以及各个模块的并发度上限;

    • 协程生命周期控制:实时查看当前全局并发的协程数量;有一个统一的紧急入口释放全局协程.

    2.3 核心数据结构

    在这里插入图片描述
    2.3.1 goWorker

    type goWorker struct {
        pool *Pool
        task chan func()
        recycleTime time.Time
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    goWorker 可以简单理解为一个长时间运行而不回收的协程,用于反复处理用户提交的异步任务,其核心字段包含:

    • pool:goWorker 所属的协程池;

    • task:goWorker 用于接收异步任务包的管道;

    • recycleTime:goWorker 回收到协程池的时间.

    2.3.2 Pool

    type Pool struct {
        capacity int32
        running int32
        lock sync.Locker
        workers workerArray
        state int32
        cond *sync.Cond
        workerCache sync.Pool
        waiting int32
        heartbeatDone int32
        stopHeartbeat context.CancelFunc
        options *Options
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    Pool 就是所谓的协程池,其包含的成员字段如下:

    • capacity:池子的容量

    • running:出于运行中的协程数量

    • lock:自制的自旋锁,保证取 goWorker 时并发安全

    • workers:goWorker 列表,即“真正意义上的协程池”

    • state:池子状态标识,0-打开;1-关闭

    • cond:并发协调器,用于阻塞模式下,挂起和唤醒等待资源的协程

    • workerCache:存放 goWorker 的对象池,用于缓存释放的 goworker 资源用于复用. 对象池需要区别于协程池,协程池中的 goWorker 仍存活,进入对象池的 goWorker 严格意义上已经销毁;

    • waiting:标识出于等待状态的协程数量;

    • heartbeatDone:标识回收协程是否关闭;

    • stopHeartbeat:用于关闭回收协程的控制器函数;

    • options:一些定制化的配置.

    2.3.3 options

    type Options struct {
        DisablePurge bool
        ExpiryDuration time.Duration
        MaxBlockingTasks int
        Nonblocking bool
        PanicHandler func(interface{})
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    协程池定制化参数集合,包含配置项如下:

    • DisablePurge:是否允许回收空闲 goWorker;

    • ExpiryDuration: 空闲 goWorker 回收时间间隔;仅当 DisablePurge 为 false 时有效;

    • Nonblocking:是否设置为非阻塞模式,若是,goWorker 不足时不等待,直接返回 err;

    • MaxBlockingTasks:阻塞模式下,最多阻塞等待的协程数量;仅当 Nonblocking 为 false 时有效;

    • PanicHandler:提交任务发生 panic 时的处理逻辑;

    2.3.4 workerArray

    type workerArray interface {
        len() int
        isEmpty() bool
        insert(worker *goWorker) error
        detach() *goWorker
        retrieveExpiry(duration time.Duration) []*goWorker
        reset()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    • workerArray 是一个 interface,其实现包含 stack 栈版本和 queue 队列包含;

    • 该 interface 主要定义了作为数据集合的几个通用 api,以及用于回收过期 goWorker 的 api.

    此处仅展示基于 stack 数据结构实现的 goWorker 列表:

    type workerStack struct {
        items  []*goWorker
        expiry []*goWorker
    }
    
    func newWorkerStack(size int) *workerStack {
        return &workerStack{
            items: make([]*goWorker, 0, size),
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    • items:存放的 goWorker 列表;

    • expire:用于临时存放已过期的 goWorker 集合;

    下面几个方法,是 workerStack 作为一个栈的数据结构所提供的一些能力,核心方法为 insert 和 detach 分别为对栈插入或者取出一个 goWorker.

    func (wq *workerStack) len() int {
        return len(wq.items)
    }
    
    func (wq *workerStack) isEmpty() bool {
        return len(wq.items) == 0
    }
    
    func (wq *workerStack) insert(worker *goWorker) error {
        wq.items = append(wq.items, worker)
        return nil
    }
    
    func (wq *workerStack) detach() *goWorker {
        l := wq.len()
        if l == 0 {
            return nil
        }
    
        w := wq.items[l-1]
        wq.items[l-1] = nil // avoid memory leaks
        wq.items = wq.items[:l-1]
        return w
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    下述的 retrieveExpire 方法是从 workerStack 中获取到已经过期的 goWorker 集合;其中 goWorker 的回收时间与入栈先后顺序相关,因此可以借助 binarySearch 方法基于二分法快速获取到目标集合.

    func (wq *workerStack) retrieveExpiry(duration time.Duration) []*goWorker {
        n := wq.len()
        if n == 0 {
            return nil
        }
    
        expiryTime := time.Now().Add(-duration)
        index := wq.binarySearch(0, n-1, expiryTime)
    
        wq.expiry = wq.expiry[:0]
        if index != -1 {
            wq.expiry = append(wq.expiry, wq.items[:index+1]...)
            m := copy(wq.items, wq.items[index+1:])
            for i := m; i < n; i++ {
                wq.items[i] = nil
            }
            wq.items = wq.items[:m]
        }
        return wq.expiry
    }
    
    func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
        var mid int
        for l <= r {
            mid = (l + r) / 2
            if expiryTime.Before(wq.items[mid].recycleTime) {
                r = mid - 1
            } else {
                l = mid + 1
            }
        }
        return 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    2.4 核心 api
    2.4.1 pool 构造器方法

    func NewPool(size int, options ...Option) (*Pool, error) {
        opts := loadOptions(options...)
        // 读取用户配置,做一些前置校验,默认值赋值等前处理动作...
    
        p := &Pool{
            capacity: int32(size),
            lock:     internal.NewSpinLock(),
            options:  opts,
        }
        p.workerCache.New = func() interface{} {
            return &goWorker{
                pool: p,
                task: make(chan func(), workerChanCap),
            }
        }
    
        p.workers = newWorkerArray(stackType, 0)
        p.cond = sync.NewCond(p.lock)
    
        var ctx context.Context
        ctx, p.stopHeartbeat = context.WithCancel(context.Background())
        go p.purgePeriodically(ctx)
        return p, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    • 读取用户传的配置参数,做一些校验和默认赋值的前处理动作;

    • 构造好 Pool 数据结构;

    • 构造好 goWorker 对象池 workerCache,声明好工厂函数;

    • 构造好 Pool 内部的 goWorker 列表;

    • 构造好 Pool 的并发协调器 cond;

    • 异步启动 goWorker 过期销毁协程.

    2.4.2 pool 提交任务

    func (p *Pool) Submit(task func()) error {
        var w *goWorker
        if w = p.retrieveWorker(); w == nil {
            return ErrPoolOverload
        }
        w.task <- task
        return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    • 从 Pool 中取出一个可用的 goWorker;

    • 将用户提交的任务包添加到 goWorker 的 channel 中.

    func (p *Pool) retrieveWorker() (w *goWorker) {
        spawnWorker := func() {
            w = p.workerCache.Get().(*goWorker)
            w.run()
        }
    
        p.lock.Lock()
    
        w = p.workers.detach()
        if w != nil { 
            p.lock.Unlock()
        } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
            p.lock.Unlock()
            spawnWorker()
        } else { 
            if p.options.Nonblocking {
                p.lock.Unlock()
                return
            }
        retry:
            if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
                p.lock.Unlock()
                return
            }
            p.addWaiting(1)
            p.cond.Wait() // block and wait for an available worker
            p.addWaiting(-1)
    
            var nw int
            if nw = p.Running(); nw == 0 { // awakened by the scavenger
                p.lock.Unlock()
                spawnWorker()
                return
            }
            if w = p.workers.detach(); w == nil {
                if nw < p.Cap() {
                    p.lock.Unlock()
                    spawnWorker()
                    return
                }
                goto retry
            }
            p.lock.Unlock()
        }
        return
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    • 声明了一个构造 goWorker 的函数 spawnWorker 用于兜底,内部实际上是从对象池 workerCache 中获取 goWorker;

    • 接下来的核心逻辑就是加锁,然后尝试从池子中取出 goWorker 执行任务;

    • 倘若池子容量超限,且池子为阻塞模式,则基于并发协调器 cond 挂起协程阻塞等待;

    • 倘若池子容量超限,且池子为非阻塞模式,直接抛回错误;

    • 倘若池子容量未超限,且未取到 goWorker,调用 spawnWorker 构造新的 goWorker 用于执行任务.

    2.4.3 goWorker 运行

    func (w *goWorker) run() {
        w.pool.addRunning(1)
        go func() {
            defer func() {
                w.pool.addRunning(-1)
                w.pool.workerCache.Put(w)
                if p := recover(); p != nil {
                    // panic 后处理
                }
                w.pool.cond.Signal()
            }()
    
            for f := range w.task {
                if f == nil {
                    return
                }
                f()
                if ok := w.pool.revertWorker(w); !ok {
                    return
                }
            }
        }()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    • 循环 + 阻塞等待,直到获取到用户提交的异步任务包 task 并执行;

    • 执行完成 task 后,会将自己交还给协程池;

    • 倘若回归协程池失败,或者用户提交了一个空的任务包,则该 goWorker 会被销毁,销毁方式是将自身放回协程池的对象池 workerCache. 并且会调用协调器 cond 唤醒一个阻塞等待的协程.

    2.4.4 pool 回收协程

    // revertWorker puts a worker back into free pool, recycling the goroutines.
    func (p *Pool) revertWorker(worker *goWorker) bool {
        worker.recycleTime = time.Now()
        p.lock.Lock()
        err := p.workers.insert(worker)
        if err != nil {
            p.lock.Unlock()
            return false
        }
    
        p.cond.Signal()
        p.lock.Unlock()
        return true
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    Pool.revertWorker 方法用于回收 goWorker 回到协程池:

    • 回收时更新 goWorker 小虎时间,用于 goWorker 的定期销毁;

    • 加锁后,将 goWorker 添加回协程池;

    • 通过协调器 cond 唤醒下一个阻塞等待的协程,并解锁.

    2.4.5 定期回收过期 goWorker

    func (p *Pool) purgePeriodically(ctx context.Context) {
        heartbeat := time.NewTicker(p.options.ExpiryDuration)
        defer func() {
            heartbeat.Stop()
            atomic.StoreInt32(&p.heartbeatDone, 1)
        }()
    
        for {
            select {
            case <-heartbeat.C:
            case <-ctx.Done():
                return
            }
    
            if p.IsClosed() {
                break
            }
    
            p.lock.Lock()
            expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
            p.lock.Unlock()
    
            for i := range expiredWorkers {
                expiredWorkers[i].task <- nil
                expiredWorkers[i] = nil
            }
    
            if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) {
                p.cond.Broadcast()
            }
    } 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    • purgePeriodically 方法开启了一个 ticker,按照用户预设的过期时间间隔轮询销毁过期的 goWorker;

    • 销毁的方式是往对应 goWorker 的 channel 中注入一个空值,goWorker 将会自动将自身放回协程池的对象池 workerCache 当中;

    • 倘若当前存在空闲的 goWorker 且有协程阻塞等待,会唤醒所有阻塞协程.

  • 相关阅读:
    VBA驱动SAP GUI自动化:查找页面元素FindAllByName
    Pytorch:神经网络过程代码详解
    elasticsearch 简单使用【php版本】
    一道题让你秒懂Java中静态代码块、构造代码块、构造方法、普通代码块、main函数的执行顺序
    ⭐北邮复试刷题LCR 012. 寻找数组的中心下标__前缀和思想 (力扣119经典题变种挑战)
    万字详解-MindArmour 小白教程!
    DEX 的现状和去中心化交易的未来
    Oracle与Mysql语法区别
    Django 实现连续请求
    GJB 5000B-2021下载-见文章结尾
  • 原文地址:https://blog.csdn.net/u014374009/article/details/133232094