• sync.pool 源码学习


    sync.pool 的作用

    sync.pool可以认为是一个池子,里面保存着一些临时的对象。有了sync.pool可以解决对于重复分配以及回收内存而造成的cpu压力的问题。频繁的分配,回收内存会给GC带来一定的负担,严重的时候会引起CPU毛刺。sync.pool可以将不用的对象存放到池子里,下次使用的时候直接用,不需要重新分配内存。
    场景

    当多个 goroutine 都需要创建同⼀个对象的时候,如果 goroutine 数过多,导致对象的创建数⽬剧增,进⽽导致 GC 压⼒增大。形成 “并发⼤-占⽤内存⼤-GC 缓慢-处理并发能⼒降低-并发更⼤”这样的恶性循环。

    在这个时候,需要有⼀个对象池,每个 goroutine 不再⾃⼰单独创建对象,⽽是从对象池中获取出⼀个对象(如果池中已经有的话)。

    用法:

    package main
    import (
    	"fmt"
    	"sync"
    )
    
    var pool *sync.Pool
    
    type Person struct {
    	Name string
    }
    
    func initPool() {
    	pool = &sync.Pool {
    		New: func()interface{} {
    			fmt.Println("Creating a new Person")
    			return new(Person)
    		},
    	}
    }
    
    func main() {
    	initPool()
    
    	p := pool.Get().(*Person)
    	fmt.Println("首次从 pool 里获取:", p)
    
    	p.Name = "first"
    	fmt.Printf("设置 p.Name = %s\n", p.Name)
    
    	pool.Put(p)
    
    	fmt.Println("Pool 里已有一个对象:&{first},调用 Get: ", pool.Get().(*Person))
    	fmt.Println("Pool 没有对象了,调用 Get: ", pool.Get().(*Person))
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    output

    Creating a new Person
    首次从 pool 里获取: &{}
    设置 p.Name = first
    Pool 里已有一个对象:&{first},Get:  &{first}
    Creating a new Person
    Pool 没有对象了,Get:  &{}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    注意:在做put操作归还对象的时候,需要将对象的字段手动清除

    • Gin 框架中对sync.pool的使用
    engine.pool.New = func() interface{} {
    	return engine.allocateContext()
    }
    
    func (engine *Engine) allocateContext() *Context {
    	return &Context{engine: engine, KeysMutex: &sync.RWMutex{}}
    }
    
    // ServeHTTP conforms to the http.Handler interface.
    func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    	c := engine.pool.Get().(*Context)
    	c.writermem.reset(w)
    	c.Request = req
    	c.reset()
    
    	engine.handleHTTPRequest(c)
    
    	engine.pool.Put(c)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    ServeHTTP函数中对context的使用:先用get从池子中获取一个c,然后调用handleHTTPRequest,用完之后在put回去

    源码分析

    type Pool struct {
    	// noCopy用于防止Pool被复制,Pool不希望被复制
    	noCopy noCopy
    	
    	// 每个 P 的本地队列,实际类型为 [P]poolLocal
    	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    	//  [P]poolLocal的大小
    	localSize uintptr        // size of the local array
    
    	victim     unsafe.Pointer // local from previous cycle
    	victimSize uintptr        // size of victims array
    
    	// New optionally specifies a function to generate
    	// a value when Get would otherwise return nil.
    	// It may not be changed concurrently with calls to Get.
    	New func() any
    }
    
    func indexLocal(l unsafe.Pointer, i int) *poolLocal {
    	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
    	return (*poolLocal)(lp)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    这里可能会比较困惑local一个指针,怎么来表示一个slice呢?其实有了指向初始地址的指针,再加上大小,就可以根据运算将指针指到指定位置。local+poolLocal{}
    *n,这其实也算一种技巧,可以学习下。

    type poolLocal struct {
    	poolLocalInternal
    
    	// Prevents false sharing on widespread platforms with
    	// 128 mod (cache line size) = 0 .
    	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    每个poolLocal结构体中有一个poolLocalInternal结构体以及一个pad,pad的作用是补位,有了pad可以保证每个poolLocal刚好占满了一个cache line。这样数据加载到cache的时候刚好把一个poolLocal加载到cache中。程序即使只想读内存中的 1 个字节数据,也要同时把附近 63 节字加载到 cache 中,如果读取超个 64 字节,那么就要加载到多个 cache line 中。

    // any is an alias for interface{} and is equivalent to interface{} in all ways.
    type any = interface{}
    
    // Local per-P Pool appendix.
    type poolLocalInternal struct {
    	//	p的私有缓冲区
    	private any       // Can be used only by the respective P.
    	// 公共缓冲区,本地p可以pushHead/popHead,其他P只能popTail
    	shared  poolChain // Local P can pushHead/popHead; any P can popTail.
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这里将空的interface{}定义成any,可以借鉴下。

    type poolChain struct {
    	// head is the poolDequeue to push to. This is only accessed
    	// by the producer, so doesn't need to be synchronized.
    	head *poolChainElt
    
    	// tail is the poolDequeue to popTail from. This is accessed
    	// by consumers, so reads and writes must be atomic.
    	tail *poolChainElt
    }
    
    type poolChainElt struct {
    	poolDequeue
    
    	// next and prev link to the adjacent poolChainElts in this
    	// poolChain.
    	//
    	// next is written atomically by the producer and read
    	// atomically by the consumer. It only transitions from nil to
    	// non-nil.
    	//
    	// prev is written atomically by the consumer and read
    	// atomically by the producer. It only transitions from
    	// non-nil to nil.
    	next, prev *poolChainElt
    }
    
    type poolDequeue struct {
    	// headTail packs together a 32-bit head index and a 32-bit
    	// tail index. Both are indexes into vals modulo len(vals)-1.
    	//
    	// tail = index of oldest data in queue
    	// head = index of next slot to fill
    	//
    	// Slots in the range [tail, head) are owned by consumers.
    	// A consumer continues to own a slot outside this range until
    	// it nils the slot, at which point ownership passes to the
    	// producer.
    	//
    	// The head index is stored in the most-significant bits so
    	// that we can atomically add to it and the overflow is
    	// harmless.
    	headTail uint64
    
    	// vals is a ring buffer of interface{} values stored in this
    	// dequeue. The size of this must be a power of 2.
    	//
    	// vals[i].typ is nil if the slot is empty and non-nil
    	// otherwise. A slot is still in use until *both* the tail
    	// index has moved beyond it and typ has been set to nil. This
    	// is set to nil atomically by the consumer and read
    	// atomically by the producer.
    	vals []eface
    }
    
    type eface struct {
    	typ, val unsafe.Pointer
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    其中headTail是一个uint64类型,他实际是有32位的head指针以及32位的tail指针拼在一起的一个字段。
    poolDequeue 被实现为单生产者、多消费者的固定大小的无锁(atomic 实现) 环形队列(底层存储使用数组,使用两个指针标记 head、tail)。生产者可以从 head 插入、head 删除,而消费者仅可从 tail 删除。

    首先来看Get操作

    GET

    func (p *Pool) Get() any {
    	...
    	// 首先调用p.pin获取将当前groutine和P绑定,禁止被抢占,返回当前P对应的poolLocal,以及P的id
    	l, pid := p.pin()
    	// 获取poolLocal的private字段,并将private设置为nil
    	x := l.private
    	l.private = nil
    	// 如果x为nil,就从shared中pophead一个x出来
    	// 如果pop出来的还是nil,就调用getSlow试着从其他P的shared双端队列尾部偷一个对象出来
    	if x == nil {
    		// Try to pop the head of the local shard. We prefer
    		// the head over the tail for temporal locality of
    		// reuse.
    		x, _ = l.shared.popHead()
    		if x == nil {
    			x = p.getSlow(pid)
    		}
    	}
    	// 接触非抢占
    	runtime_procUnpin()
    	...
    	// 经过上面的尝试之后还是获取不到对象,就直接new一个
    	if x == nil && p.New != nil {
    		x = p.New()
    	}
    	return x
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    // pin() 将groutine与P绑定,并返回P的id
    // 调用方必须在完成取值后调用 runtime_procUnpin() 来取消抢占
    func (p *Pool) pin() (*poolLocal, int) {
    	pid := runtime_procPin()
    	s := atomic.LoadUintptr(&p.localSize) // load-acquire
    	l := p.local                          // load-consume
    	// 因为可能存在动态的 P(运行时调整 P 的个数)
    	if uintptr(pid) < s {
    		return indexLocal(l, pid), pid
    	}
    	return p.pinSlow()
    }
    
    func indexLocal(l unsafe.Pointer, i int) *poolLocal {
    	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
    	return (*poolLocal)(lp)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    首先要调用proPin禁止抢占P,如果P被抢占,那么groutine会被返回p的本地队列或者全局P队列,下次goutinue与P绑定的时候可能已经不是之前的P,会导致P的id发生变化,所以要先禁止抢占。
    原子操作取出 p.localSize 和 p.local,如果当前 pid 小于 p.localSize,则直接取 poolLocal 数组中的 pid 索引处的元素。否则,说明 Pool 还没有创建 poolLocal,调用 p.pinSlow() 完成创建工作。

    func (p *Pool) pinSlow() (*poolLocal, int) {
    	// Retry under the mutex.
    	// Can not lock the mutex while pinned.
    	runtime_procUnpin()
    	// 加锁
    	allPoolsMu.Lock()
    	defer allPoolsMu.Unlock()
    	pid := runtime_procPin()
    	// 已经枷锁了不需要在做原子操作
    	s := p.localSize
    	l := p.local
    	// 当执行到这里的时候有可能其他groutine已经做了执行,所以要再判断下
    	if uintptr(pid) < s {
    		return indexLocal(l, pid), pid
    	}
    	if p.local == nil {
    		allPools = append(allPools, p)
    	}
    	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
    	// 获取P的个数
    	size := runtime.GOMAXPROCS(0)
    	// 分配size个数的slice
    	local := make([]poolLocal, size)
    	// 用原子操作做set
    	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
    	runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
    	return &local[pid], pid
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    接下来看poolChain.popHead()

    func (c *poolChain) popHead() (interface{}, bool) {
    	d := c.head
    	for d != nil {
    		// 此处调用的是poolDequeue的pophead
    		if val, ok := d.popHead(); ok {
    			return val, ok
    		}
    		// There may still be unconsumed elements in the
    		// previous dequeue, so try backing up.
    		d = loadPoolChainElt(&d.prev)
    	}
    	return nil, false
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    poolDequeue.popHead()

    func (d *poolDequeue) popHead() (interface{}, bool) {
    	var slot *eface
    	for {
    		ptrs := atomic.LoadUint64(&d.headTail)
    		// unpack命名可以学习一下
    		head, tail := d.unpack(ptrs)
    		// 判断队列是否为空
    		if tail == head {
    			// Queue is empty.
    			return nil, false
    		}
    
    		// head 位置是队头的前一个位置,所以此处要先退一位。
    		// 在读出 slot 的 value 之前就把 head 值减 1,取消对这个 slot 的控制
    		head--
    		ptrs2 := d.pack(head, tail)
    		// 调用CompareAndSwapUint64比较headTail的值,判断是否旧值=ptrs,如果相等,那么就做交换,赋新值ptrs2
    		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
    			// We successfully took back slot.
    			slot = &d.vals[head&uint32(len(d.vals)-1)]
    			break
    		}
    	}
    
        // 取出 val
    	val := *(*interface{})(unsafe.Pointer(slot))
    	if val == dequeueNil(nil) {
    		val = nil
    	}
    	
    	// 重置 slot,typ 和 val 均为 nil
    	// 这里清空的方式与 popTail 不同,与 pushHead 没有竞争关系,所以不用太小心
    	*slot = eface{}
    	return val, true
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    PUT

    // Put 将 x 放回到池中
    // Put adds x to the pool.
    func (p *Pool) Put(x any) {
    	if x == nil {
    		return
    	}
    
    	l, _ := p.pin()
    	if l.private == nil {
    		l.private = x
    	} else {
    		l.shared.pushHead(x)
    	}
    	runtime_procUnpin()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    put操作比较简单,第一步都是先将groutine与P绑定,然后在优先放入p的private中,如果已经有值了就放入shared中

    ##GC
    在pool中,在gc的时候会清理未使用的对象,所以pool不适合做连接池。
    在init函数中,实现了gc发生时,如何清理pool的函数

    func init() {
    	runtime_registerPoolCleanup(poolCleanup)
    }
    
    • 1
    • 2
    • 3

    主要逻辑是poolCleanup函数

    func poolCleanup() {
    	for _, p := range oldPools {
    		p.victim = nil
    		p.victimSize = 0
    	}
    
    	// Move primary cache to victim cache.
    	for _, p := range allPools {
    		p.victim = p.local
    		p.victimSize = p.localSize
    		p.local = nil
    		p.localSize = 0
    	}
    
    	oldPools, allPools = allPools, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    主要逻辑就是将local的值存到victim中,这样不至于让gc把所有的pool都清空了,有victim在兜底
    整个gc的过程:

    1. 初始状态下,oldPools 和 allPools 均为 nil。
    2. 第 1 次调用 Get,由于 p.local 为 nil,将会在 pinSlow 中创建 p.local,然后将 p 放入 allPools,此时 allPools 长度为 1,oldPools 为 nil。
    3. 对象使用完毕,第 1 次调用 Put 放回对象。
    4. 第 1 次GC STW 阶段,allPools 中所有 p.local 将值赋值给 victim 并置为 nil。allPools 赋值给 oldPools,最后 allPools 为 nil,oldPools 长度为 1。
    5. 第 2 次调用 Get,由于 p.local 为 nil,此时会从 p.victim 里面尝试取对象。
    6. 对象使用完毕,第 2 次调用 Put 放回对象,但由于 p.local 为 nil,重新创建 p.local,并将对象放回,此时 allPools 长度为 1,oldPools 长度为 1。
    7. 第 2 次 GC STW 阶段,oldPools 中所有 p.victim 置 nil,前一次的 cache 在本次 GC 时被回收,allPools 所有 p.local 将值赋值给 victim 并置为nil,最后 allPools 为 nil,oldPools 长度为 1。

    由此基本明白 p.victim 的作用。它的定位是次级缓存,GC 时将对象放入其中,下一次 GC 来临之前如果有 Get 调用则会从 p.victim 中取,直到再一次 GC 来临时回收。

    https://www.cnblogs.com/qcrao-2018/p/12736031.html

  • 相关阅读:
    前端开发学习之【Vue】-上
    安装及管理docker
    【C++】多态
    CSS中如何实现文字渐变色效果(Text Gradient Color)?
    Mock工具之Moco使用
    基于JAVA学校运动会信息管理系统计算机毕业设计源码+系统+mysql数据库+lw文档+部署
    Springboot整合Mybatis-Plus
    Html5API(自定义属性、媒体元素、canvas画布)(一)
    43 干货系列从零用Rust编写负载均衡及代理,内网穿透方案完整部署
    傅里叶级数与傅里叶变换
  • 原文地址:https://blog.csdn.net/leekerian/article/details/126533872