sync.Pool是内置对象池技术,可用于缓存临时对象,避免因频繁建立临时对象所带来的消耗以及对GC造成的压力
在很多知名框架中都可以看到sync.Pool的大量使用。比如Gin中用sync.Pool来复用每个请求都会创建的gin.Context
对象
但是值得注意的是sync.Pool缓存的对象可能被无通知的清理
sync.Pool在初始化时,需要用户提供对象构造函数New。用户使用Get来从对象池中获取对象,然后使用Put将对象归还对象池。
type Item struct {
A int
}
func TestUsePool(t *testing.T) {
pool:=sync.Pool{
New: func() interface{} {
return &Item{
A:1,
}
},
}
i:=pool.Get().(*Item)
fmt.Println(i.A)
i1:=&Item{
A: 2,
}
pool.Put(i1)
i11:=pool.Get().(*Item)
fmt.Println(i11.A)
}
以下基于go-1.16
在Golang的GMP调度中,同一时间一个M(系统线程)上只能运行一个P。也就是说,从线程维度来看,在P上的逻辑都是单线程执行的。
sync.Pool就是充分利用了GMP这一特点。对于同一个sync.Pool,每个P都有一个自己的本地对象池poollocal
type pool struct{
// 禁止拷贝检测方法
noCopy noCopy
// 元素类型为poolLocal的数组。储存各个P对应本地对象池
local unsafe.Pointer // local fixed-size per-P pool
// local数组长度
localSize uintptr // size of the local array
// 上一轮清理前的对象池
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// 创建对象的方法
New func() interface{}
}
type poolLocal struct{
poolLocalInternal
pad [128-unsafe.SizeOf(poolLocalInternal{}%128)]byte
}
type poolLocalInternal struct{
// Get、Put操作优先存取private变量
private interface{}
// p本地对象池
shared poolChain
}
// 池中的双端队列
type poolDequeue struct {
// 储存队列的头、尾
headTail uint64
// 队列元素
vals []eface
}
// 链节点
type poolChainElt struct {
poolDequeue
next, prev *poolChainElt
}
// 池链,指向头尾节点
type poolChain struct {
head *poolChainElt
tail *poolChainElt
}
poolChain由图及代码可知是链表+ring buffer的结构。其中采用ringBuffer的理由如下
另一个值得注意的点是head与tail居然并不是独立两个变量。而使用一个64位变量,前32位为head,后32位为tail。
这种打包操作是常见的lock free优化手段
lock free是在多线程情况下访问共有内存时不阻塞彼此的编程手段
对于poolDequeue来说,可能会被多个P同时访问,那么比如在ring buffer仅剩一个时,head-tail==1,同时访问,可能两个P都能获取到对象,而这并不符合预期。
所以采用了CAS操作,是多个P都可能拿到对象,但只有一个P调用CAS成功
Put方法将对象放入池中,按照以下顺序优先放入
func (p *pool) Put(x interface{}) {
if x == nil {
return
}
// 获取池中当前P对应的本地缓存池
l,_ := p.pin()
// 优先设置private,若成功,将不会写入shared池
if l.private == nil {
l.private = x
x = nil
}
// 推入对象到当前P对应的本地缓存池共享链表
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
}
pushHead将对象推入链头部
func (c *poolChain) pushHead(val any) {
// 若链头节点为空,则初始化
d := c.head
if d == nil {
// Initialize the chain.
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
// 将元素推入头节点双向队列中
if d.pushHead(val) {
return
}
// 若当前双向队列满,则分配两倍于原队列的新队列
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
// Can't make it any bigger.
newSize = dequeueLimit
}
// 使新队列为链头,并插入对象到新队列中
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
pin方法主要用于
// pin pins the current goroutine to P, disables preemption and
// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() (*poolLocal, int) {
// 获取当前P的id,并禁止抢占
pid := runtime_procPin()
// 若池的本地缓存池数量大于pid,说明P数量没有变化,可以直接取P所对应的本地缓存池
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
// 为什么是以pid去获取所在local的位置呢
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
func (p *Pool) pinSlow() (*poolLocal, int) {
// 取消P的禁止抢占,使能加上全局池锁
runtime_procUnpin()
// 全局池加锁后,先再次尝试直接获取
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 若池的本地池为空,添加到全局中
if p.local == nil {
allPools = append(allPools, p)
}
// 新建与当前P数量一致的本地缓存池,并返回当前P的本地缓存池
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}
runtime_procPin是procPin的封装,主要是为了防止P被抢占以及返回P id
func procPin() int {
// 先获取当前goroutine
_g_ := getg()
// 接着获取goroutine绑定的系统线程,并对该线程加锁
// 加锁之后P便不会被抢占,使得不会被GC
mp := _g_.m
mp.locks++
// 返回系统线程绑定的P id
return int(mp.p.ptr().id)
}
获取对象的顺序如下
func (p *pool) Get() interface{} {
l, pid := p.pin()
// 优先尝试获取当前P对应的本地缓存池的私有对象
x := l.private
l.private = nil
if x == nil {
// 接着尝试当前P对应的本地缓存池的共享链表的头节点
x,_ = l.shared.popHead()
// 当无法从当前p缓存池获取数据,就会尝试从其他P缓存池获取
// 对于其他p的poolChain会调用popTail
// 若其他p也没有,那就尝试从victim中取数据
if x == nil {
x = p.getSlow(pid)
}
}
runtime_procUnpin()
// 都获取不到的情况,重新构造
if x == nil && p.New != nil {
x = p.New()
}
return x
}
getSlow是先从其他P窃取,然后从victim缓存中获取
func (p *Pool) getSlow(pid int) any {
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 尝试从其他P池窃取对象
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
// 仅被允许获取其他P的尾部对象
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 若未从其他P窃取到对象,还可以从上轮GC遗留的本地池中获取
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 获取不到,说明无幸存者
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
poolCleanup在GC之前将pool清空,通过victim将回收拆为了两步,防止GC大量清理导致的抖动
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
func poolCleanup() {
// 清理oldPools上的幸存对象
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// 迁移池本地缓存到池victim
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// 全局池迁移到oldPools
oldPools, allPools = allPools, nil
}