• golang sync pool


    sync.Pool是内置对象池技术,可用于缓存临时对象,避免因频繁建立临时对象所带来的消耗以及对GC造成的压力

    在很多知名框架中都可以看到sync.Pool的大量使用。比如Gin中用sync.Pool来复用每个请求都会创建的gin.Context对象

    但是值得注意的是sync.Pool缓存的对象可能被无通知的清理

    基本用法

    sync.Pool在初始化时,需要用户提供对象构造函数New。用户使用Get来从对象池中获取对象,然后使用Put将对象归还对象池。

    type Item struct {
    	A int
    }
    
    func TestUsePool(t *testing.T) {
    	pool:=sync.Pool{
    		New: func() interface{} {
    			return &Item{
    				A:1,
    			}
    		},
    	}
    	i:=pool.Get().(*Item)
    	fmt.Println(i.A)
    
    	i1:=&Item{
    		A: 2,
    	}
    	pool.Put(i1)
    	i11:=pool.Get().(*Item)
    	fmt.Println(i11.A)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    底层实现

    以下基于go-1.16

    在Golang的GMP调度中,同一时间一个M(系统线程)上只能运行一个P。也就是说,从线程维度来看,在P上的逻辑都是单线程执行的。

    sync.Pool就是充分利用了GMP这一特点。对于同一个sync.Pool,每个P都有一个自己的本地对象池poollocal

    type pool struct{
      // 禁止拷贝检测方法
      noCopy noCopy
      
      // 元素类型为poolLocal的数组。储存各个P对应本地对象池
      local unsafe.Pointer // local fixed-size per-P pool
      // local数组长度
      localSize uintptr // size of the local array
      // 上一轮清理前的对象池
      victim unsafe.Pointer // local from previous cycle
      victimSize uintptr // size of victims array
      
      // 创建对象的方法
      New func() interface{}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    type poolLocal struct{
      poolLocalInternal
      pad	[128-unsafe.SizeOf(poolLocalInternal{}%128)]byte
    }
    
    type poolLocalInternal struct{
      // Get、Put操作优先存取private变量
      private interface{}
      // p本地对象池
      shared poolChain
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    poolChain实现

    // 池中的双端队列
    type poolDequeue struct {
      // 储存队列的头、尾
    	headTail uint64
      // 队列元素
    	vals []eface
    }
    
    // 链节点
    type poolChainElt struct {
    	poolDequeue
    	next, prev *poolChainElt
    }
    
    // 池链,指向头尾节点
    type poolChain struct {
    	head *poolChainElt
    	tail *poolChainElt
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    poolChain由图及代码可知是链表+ring buffer的结构。其中采用ringBuffer的理由如下

    • 预先分配内存(可能是为了在put的时候省去内存分配消耗),且分配内存项可不断复用
    • ringBuffer实质上是数组,是连续内存结构,非常利用CPU Cache。在访问poolDequeue某一项,其附近数据项都有可能统一加载到Cache Line,访问速度更快

    另一个值得注意的点是head与tail居然并不是独立两个变量。而使用一个64位变量,前32位为head,后32位为tail。

    这种打包操作是常见的lock free优化手段

    lock free是在多线程情况下访问共有内存时不阻塞彼此的编程手段

    对于poolDequeue来说,可能会被多个P同时访问,那么比如在ring buffer仅剩一个时,head-tail==1,同时访问,可能两个P都能获取到对象,而这并不符合预期。

    所以采用了CAS操作,是多个P都可能拿到对象,但只有一个P调用CAS成功

    Put

    Put方法将对象放入池中,按照以下顺序优先放入

    1. 当前P对应的本地缓存池的私有对象
    2. 当前P对应的本地缓存池的共享链表
    func (p *pool) Put(x interface{}) {
    	if x == nil {
    		return
    	}
    
    	// 获取池中当前P对应的本地缓存池
    	l,_ := p.pin()
    
    	// 优先设置private,若成功,将不会写入shared池
    	if l.private == nil {
    		l.private = x
    		x = nil
    	}
    
    	// 推入对象到当前P对应的本地缓存池共享链表
    	if x != nil {
    		l.shared.pushHead(x)
    	}
    
    	runtime_procUnpin()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    pushHead

    pushHead将对象推入链头部

    func (c *poolChain) pushHead(val any) {
      // 若链头节点为空,则初始化
    	d := c.head
    	if d == nil {
    		// Initialize the chain.
    		const initSize = 8 // Must be a power of 2
    		d = new(poolChainElt)
    		d.vals = make([]eface, initSize)
    		c.head = d
    		storePoolChainElt(&c.tail, d)
    	}
    
      // 将元素推入头节点双向队列中
    	if d.pushHead(val) {
    		return
    	}
    
      // 若当前双向队列满,则分配两倍于原队列的新队列
    	newSize := len(d.vals) * 2
    	if newSize >= dequeueLimit {
    		// Can't make it any bigger.
    		newSize = dequeueLimit
    	}
    
      // 使新队列为链头,并插入对象到新队列中
    	d2 := &poolChainElt{prev: d}
    	d2.vals = make([]eface, newSize)
    	c.head = d2
    	storePoolChainElt(&d.next, d2)
    	d2.pushHead(val)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    pin

    pin方法主要用于

    • 初始化或者重新创建local数组。当local数组为空,或者与当前runtime.GOMAXPROCS不一致,就会触发重新创建local数组以和P数量一致
    • 从当前P中取对应的本地缓存池poolLocal
    • 防止当前P被抢占。
    // pin pins the current goroutine to P, disables preemption and
    // returns poolLocal pool for the P and the P's id.
    // Caller must call runtime_procUnpin() when done with the pool.
    func (p *Pool) pin() (*poolLocal, int) {
      // 获取当前P的id,并禁止抢占
    	pid := runtime_procPin()
    
      // 若池的本地缓存池数量大于pid,说明P数量没有变化,可以直接取P所对应的本地缓存池
    	s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
    	l := p.local                              // load-consume
    	if uintptr(pid) < s {
        // 为什么是以pid去获取所在local的位置呢
    		return indexLocal(l, pid), pid
    	}
      
    	return p.pinSlow()
    }
    
    func indexLocal(l unsafe.Pointer, i int) *poolLocal {
    	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
    	return (*poolLocal)(lp)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    pinSlow
    func (p *Pool) pinSlow() (*poolLocal, int) {
      // 取消P的禁止抢占,使能加上全局池锁
    	runtime_procUnpin()
      // 全局池加锁后,先再次尝试直接获取
    	allPoolsMu.Lock()
    	defer allPoolsMu.Unlock()
      
    	pid := runtime_procPin()
    	// poolCleanup won't be called while we are pinned.
    	s := p.localSize
    	l := p.local
    	if uintptr(pid) < s {
    		return indexLocal(l, pid), pid
    	}
      
      // 若池的本地池为空,添加到全局中
    	if p.local == nil {
    		allPools = append(allPools, p)
    	}
      
    	// 新建与当前P数量一致的本地缓存池,并返回当前P的本地缓存池
    	size := runtime.GOMAXPROCS(0)
    	local := make([]poolLocal, size)
    	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
    	runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
    	return &local[pid], pid
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    runtime_procPin

    runtime_procPin是procPin的封装,主要是为了防止P被抢占以及返回P id

    func procPin() int {
      // 先获取当前goroutine
    	_g_ := getg()
      
      // 接着获取goroutine绑定的系统线程,并对该线程加锁
      // 加锁之后P便不会被抢占,使得不会被GC
    	mp := _g_.m
    	mp.locks++
      
      // 返回系统线程绑定的P id
    	return int(mp.p.ptr().id)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    Get

    获取对象的顺序如下

    1. 当前P对应的本地缓存池的私有对象
    2. 当前P对应的本地缓存池的共享链表
    3. 其他P对应的本地缓存池的共享链表
    4. 上轮GC幸存缓存池私有对象和共享链表
    5. New方法构造
    func (p *pool) Get() interface{} {
    	l, pid := p.pin()
    
      // 优先尝试获取当前P对应的本地缓存池的私有对象
    	x := l.private
    	l.private = nil
    
    	if x == nil {
        // 接着尝试当前P对应的本地缓存池的共享链表的头节点
    		x,_ = l.shared.popHead()
    
    		// 当无法从当前p缓存池获取数据,就会尝试从其他P缓存池获取
    		// 对于其他p的poolChain会调用popTail
    		// 若其他p也没有,那就尝试从victim中取数据
    		if x == nil {
    			x = p.getSlow(pid)
    		}
    	}
    	runtime_procUnpin()
    
      // 都获取不到的情况,重新构造
    	if x == nil && p.New != nil {
    		x = p.New()
    	}
    	
    	return x
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    getSlow是先从其他P窃取,然后从victim缓存中获取

    func (p *Pool) getSlow(pid int) any {
    	size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
    	locals := p.local                            // load-consume
      
      // 尝试从其他P池窃取对象
    	for i := 0; i < int(size); i++ {
    		l := indexLocal(locals, (pid+i+1)%int(size))
        // 仅被允许获取其他P的尾部对象
    		if x, _ := l.shared.popTail(); x != nil {
    			return x
    		}
    	}
    
      // 若未从其他P窃取到对象,还可以从上轮GC遗留的本地池中获取
    	size = atomic.LoadUintptr(&p.victimSize)
    	if uintptr(pid) >= size {
    		return nil
    	}
    	locals = p.victim
    	l := indexLocal(locals, pid)
    	if x := l.private; x != nil {
    		l.private = nil
    		return x
    	}
    	for i := 0; i < int(size); i++ {
    		l := indexLocal(locals, (pid+i)%int(size))
    		if x, _ := l.shared.popTail(); x != nil {
    			return x
    		}
    	}
    
    	// 获取不到,说明无幸存者
    	atomic.StoreUintptr(&p.victimSize, 0)
    
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    poolCleanup

    poolCleanup在GC之前将pool清空,通过victim将回收拆为了两步,防止GC大量清理导致的抖动

    func init() {
    	runtime_registerPoolCleanup(poolCleanup)
    }
    
    func poolCleanup() {
    	// 清理oldPools上的幸存对象
    	for _, p := range oldPools {
    		p.victim = nil
    		p.victimSize = 0
    	}
    
    	// 迁移池本地缓存到池victim
    	for _, p := range allPools {
    		p.victim = p.local
    		p.victimSize = p.localSize
    		p.local = nil
    		p.localSize = 0
    	}
    
    	// 全局池迁移到oldPools
    	oldPools, allPools = allPools, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    Ref

    1. https://www.cyhone.com/articles/think-in-sync-pool/
    2. https://www.cnblogs.com/gaochundong/p/lock_free_programming.html
    3. https://zhuanlan.zhihu.com/p/99710992
  • 相关阅读:
    最新超高效的Docker学习笔记,实战总结一步到位,“啃完”不是问题
    2D函数优化实战
    I2C 死锁原因及解决方法
    基于springboot+vue的疫情期间外出务工人员信息管理系统
    20-Java多线程1详解~
    软件方法(下)第8章Part14:不要因为偷懒或炫耀而定义组合
    SQL Server 防病毒软件配置
    WPF DataGrid详细列表手动显示与隐藏
    达梦数据库集群修改初始化级别的系统参数步骤
    吴恩达机器学习-可选实验室:可选实验:使用逻辑回归进行分类(Classification using Logistic Regression)
  • 原文地址:https://blog.csdn.net/iUcool/article/details/138189723