目录
众所周知,goroutine相比于线程来说,更加轻量、资源占用更少、无线程上下文切换等优势,但是也不能无节制的创建使用,如果系统中开启的goroutine过多而没有及时回收,也会造成系统内存资源耗尽。
ants是一款高性能的协程管理池,实现了协程的创建、缓存、复用、刷新、停止等功能,同时允许开发者设置线程池中worker的数量、线程池本身的个数以及workder中的任务,从而实现更加高效的运行效果。
ants的使用有四种方式,分别如下:

这四种使用方式,前两种最常用,基本能满足日常系统的开发需要,第四种默认池,简单的场景也可以用,但不推荐,多池的情况还没想到特别合适的应用场景。
ants在启动时,会默认初始化一个协程池,这部分代码位于ants.go文件中:
- var (
- // ErrLackPoolFunc will be returned when invokers don't provide function for pool.
- ErrLackPoolFunc = errors.New("must provide function for pool")
-
- // ErrInvalidPoolExpiry will be returned when setting a negative number as the periodic duration to purge goroutines.
- ErrInvalidPoolExpiry = errors.New("invalid expiry for pool")
-
- // ErrPoolClosed will be returned when submitting task to a closed pool.
- ErrPoolClosed = errors.New("this pool has been closed")
-
- // ErrPoolOverload will be returned when the pool is full and no workers available.
- ErrPoolOverload = errors.New("too many goroutines blocked on submit or Nonblocking is set")
-
- // ErrInvalidPreAllocSize will be returned when trying to set up a negative capacity under PreAlloc mode.
- ErrInvalidPreAllocSize = errors.New("can not set up a negative capacity under PreAlloc mode")
-
- // ErrTimeout will be returned after the operations timed out.
- ErrTimeout = errors.New("operation timed out")
-
- // ErrInvalidPoolIndex will be returned when trying to retrieve a pool with an invalid index.
- ErrInvalidPoolIndex = errors.New("invalid pool index")
-
- // ErrInvalidLoadBalancingStrategy will be returned when trying to create a MultiPool with an invalid load-balancing strategy.
- ErrInvalidLoadBalancingStrategy = errors.New("invalid load-balancing strategy")
-
- // workerChanCap determines whether the channel of a worker should be a buffered channel
- // to get the best performance. Inspired by fasthttp at
- // https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
- workerChanCap = func() int {
- // Use blocking channel if GOMAXPROCS=1.
- // This switches context from sender to receiver immediately,
- // which results in higher performance (under go1.5 at least).
- if runtime.GOMAXPROCS(0) == 1 {
- return 0
- }
-
- // Use non-blocking workerChan if GOMAXPROCS>1,
- // since otherwise the sender might be dragged down if the receiver is CPU-bound.
- return 1
- }()
-
- // log.Lmsgprefix is not available in go1.13, just make an identical value for it.
- logLmsgprefix = 64
- defaultLogger = Logger(log.New(os.Stderr, "[ants]: ", log.LstdFlags|logLmsgprefix|log.Lmicroseconds))
-
- // Init an instance pool when importing ants.
- defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
- )
使用起来就比较简单了,直接往池子里提交任务即可。
- package main
-
- import (
- "fmt"
- "sync"
- "time"
-
- "github.com/panjf2000/ants/v2"
- )
-
- func add(d int) {
- sum := 0
- for i := 0; i < d; i++ {
- sum += i
- }
- }
- func main() {
- var wg sync.WaitGroup
- now := time.Now()
- for i := 0; i < 5; i++ {
- wg.Add(1)
- ants.Submit(func() {
- add(10000000000)
- wg.Done()
- })
- }
- wg.Wait()
- fmt.Println(time.Since(now))
- now = time.Now()
- for i := 0; i < 5; i++ {
- add(10000000000)
- }
- fmt.Println(time.Since(now))
- }
运行结果:

普通模式和使用默认池非常类似,但是需要自己创建一个线程池:
p, _ := ants.NewPool(5)
函数参数为协程池协程个数,测试代码如下:
- package main
-
- import (
- "fmt"
- "sync"
- "time"
-
- "github.com/panjf2000/ants/v2"
- )
-
- func add(d int) {
- sum := 0
- for i := 0; i < d; i++ {
- sum += i
- }
- }
- func main() {
- var wg sync.WaitGroup
- now := time.Now()
- p, _ := ants.NewPool(5)
- for i := 0; i < 5; i++ {
- wg.Add(1)
- p.Submit(func() {
- add(10000000000)
- wg.Done()
- })
- }
- wg.Wait()
- fmt.Println("协程池运行:", time.Since(now))
- now = time.Now()
- for i := 0; i < 5; i++ {
- add(10000000000)
- }
- fmt.Println("循环运行:", time.Since(now))
- }
带参函数,重点是往worker中传递函数执行的参数,每个workder中都是同一个执行函数。
- package main
-
- import (
- "fmt"
- "sync"
- "time"
-
- "github.com/panjf2000/ants/v2"
- )
-
- func add(d int) {
- sum := 0
- for i := 0; i < d; i++ {
- sum += i
- }
- fmt.Println("the sum is: ", sum)
- }
- func main() {
- var wg sync.WaitGroup
- now := time.Now()
- p, _ := ants.NewPoolWithFunc(5, func(i interface{}) {
- add(i.(int))
- wg.Done()
- })
- for i := 0; i < 5; i++ {
- wg.Add(1)
- p.Invoke(1000000000)
- }
- wg.Wait()
- fmt.Println("循环运行:", time.Since(now))
- }
运行结果:
- liupeng@liupengdeMacBook-Pro ants_study % go run thread_default.go
- the sum is: 499999999500000000
- the sum is: 499999999500000000
- the sum is: 499999999500000000
- the sum is: 499999999500000000
- the sum is: 499999999500000000
- 循环运行: 352.447333ms
这种模式,就是声明了多个协程池,每个池子里有多个协程在跑。
- package main
-
- import (
- "fmt"
- "sync"
- "time"
-
- "github.com/panjf2000/ants/v2"
- )
-
- func add(d int) {
- sum := 0
- for i := 0; i < d; i++ {
- sum += i
- }
- fmt.Println("the sum is: ", sum)
- }
- func main() {
- var wg sync.WaitGroup
- runTimes := 20
- now := time.Now()
- mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) {
- add(i.(int))
- wg.Done()
- }, ants.LeastTasks)
- for i := 0; i < runTimes; i++ {
- wg.Add(1)
- mpf.Invoke(1000000000)
- }
- wg.Wait()
- fmt.Println("循环运行:", time.Since(now))
- }
运行记录:

以上就是ants协程池所有的使用方式,3.2、3.3章节介绍的两种方式比较常用,也是推荐的使用方式,使用协程池,可以有效的控制系统硬件资源的使用,防止机器被打满,对于高并发服务非常推荐使用。
后面会学习一下ants的源码,并整理成文档发出来,欢迎围观。