• Go工作池


    前言

            我们使用Go语言开发项目,常常会使用到goroutine;goroutine太多会造成系统占用过高或其他系统异常,我们可以将goroutine控制指定数量,且减少goroutine的创建,这就运用到Go工作池,下面就介绍和使用一下。


    一、概念

            我们可以将工作池理解为线程池。线程池的创建和销毁非常消耗资源,所以专门写一个pool,每次用过的线程池再放回pool中而不是销毁。不过在Go语言中不会使用系统的线程,而是使用goroutine。gorotine的创建和销毁比系统线程的消耗要小的多,而且goroutine没有标号。所以goroutine的pool就不再时线程池,而是work pool(工作池)。

            虽然goroutine的系统消耗较小,但也不能随意在编码时使用go func(),如果程序频繁启动goroutine,会造成极其不可控性能问题。对于可以提前预知的大量异步处理的任务就要考虑使用工作池。

            工作池的作用控制goroutine的规模,或者说是goroutine的数量。在Go语言中,控制goroutine的数量最好方式就是使用缓存通道。

    二、实例

    1.简单示例

            下面是Go语言解决工作池的经典用法。

    1. func worker(id int, jobs <-chan int, results chan<- int) {
    2. for job := range jobs {
    3. fmt.Printf("worker(%d) start to do job(%d)\n", id, job)
    4. time.Sleep(time.Second)
    5. fmt.Printf("worker(%d) finished job(%d)\n", id, job)
    6. results <- job
    7. }
    8. }
    9. func main() {
    10. // 为了使用我们的工作池,我们需要发送工作和接受工作的结果,
    11. // 这里我们定义两个通道,一个jobs,一个results
    12. jobs := make(chan int, 100)
    13. results := make(chan int, 100)
    14. // 开启3个goroutine
    15. for id := 1; id <= 3; id++ {
    16. go worker(id, jobs, results)
    17. }
    18. // 创建5个任务
    19. for job := 1; job <= 5; job++ {
    20. jobs <- job
    21. }
    22. close(jobs)
    23. // 输出结果
    24. for i := 1; i <= 5; i++ {
    25. <-results
    26. }
    27. }

            上述代码工作池思想主要体现在jobs的通道上,因为定义了一个缓存长度为100的通道,所以在通道到100以后,新任务就会阻塞,只有等worker从通道取走一个工作以后才能继续分配新工作。

            本案例较为简单,如果worker的数量较大,业务执行时间较长的话,我们需要在程序设计上将jobs和worker的模式进行优化,每个worker处理一项工作,工作池可以自定义最大数量的worker;这样可以保证goroutine的最大数量,可程序更加可控,避免代码消耗压垮系统。

    2.读入数据

            下面时改良之后代码

    1. 1package main
    2. import (
    3. "fmt"
    4. "reflect"
    5. "time"
    6. )
    7. // Job 任务内容
    8. type Job struct {
    9. ID int
    10. Name string
    11. }
    12. // Worker 工作
    13. type Worker struct {
    14. id int // id
    15. WorkerPool chan chan Job // 工作者池(通道的通道),每个元素都是一个job通道, 公共的job
    16. JobChannel chan Job // 工作通道,每个元素是一个job,worker私有的job
    17. exit chan bool // 结束信号
    18. }
    19. var (
    20. MaxWorker = 5 // 最大worker数量
    21. JobQueue = make(chan Job, 5) // 工作通道,模拟需处理的工作
    22. )
    23. // Scheduler 排程中心
    24. type Scheduler struct {
    25. WorkerPool chan chan Job // 工作池
    26. WorkerMaxNum int // 最大工作者数
    27. Workers []*Worker // worker队列
    28. }
    29. // NewScheduler 创建排程中心
    30. func NewScheduler(workerMaxNum int) *Scheduler {
    31. workerPool := make(chan chan Job, workerMaxNum) // 工作池
    32. return &Scheduler{WorkerPool: workerPool, WorkerMaxNum: workerMaxNum}
    33. }
    34. // Start 工作池开始
    35. func (s *Scheduler) Start() {
    36. Workers := make([]*Worker, s.WorkerMaxNum)
    37. for i := 0; i < s.WorkerMaxNum; i++ {
    38. worker := NewWorker(s.WorkerPool, i)
    39. worker.Start()
    40. Workers[i] = &worker
    41. }
    42. s.Workers = Workers
    43. go s.schedule()
    44. }
    45. // Stop 工作池的关闭
    46. func (s *Scheduler) Stop() {
    47. Workers := s.Workers
    48. for _, w := range Workers {
    49. w.Stop()
    50. }
    51. time.Sleep(time.Second)
    52. close(s.WorkerPool)
    53. }
    54. func NewWorker(WorkerPool chan chan Job, id int) Worker {
    55. fmt.Printf("new a worker(%d)\n", id)
    56. return Worker{
    57. id: id,
    58. WorkerPool: WorkerPool,
    59. JobChannel: make(chan Job),
    60. exit: make(chan bool),
    61. }
    62. }
    63. // Start 监听任务和结束信号
    64. func (w Worker) Start() {
    65. go func() {
    66. for {
    67. select {
    68. case job := <-w.JobChannel: // 收到任务
    69. fmt.Println("get a job from private w.JobChannel")
    70. fmt.Println(job)
    71. case <-w.exit: // 收到结束信号
    72. fmt.Println("worker exit", w)
    73. return
    74. }
    75. }
    76. }()
    77. }
    78. func (w Worker) Stop() {
    79. go func() {
    80. w.exit <- true
    81. }()
    82. }
    83. // 排程
    84. func (s *Scheduler) schedule() {
    85. for {
    86. select {
    87. case job := <-JobQueue:
    88. fmt.Println("get a job from JobQueue")
    89. go func(job Job) {
    90. //从WorkerPool获取jobChannel,忙时阻塞
    91. jobChannel := <-s.WorkerPool
    92. fmt.Println("get a private jobChannel from public s.WorkerPool", reflect.TypeOf(jobChannel))
    93. jobChannel <- job
    94. fmt.Println("worker's private jobChannel add one job")
    95. }(job)
    96. }
    97. }
    98. }
    99. func main() {
    100. scheduler := NewScheduler(MaxWorker)
    101. scheduler.Start()
    102. jobQueue()
    103. scheduler.Stop()
    104. }
    105. // 模拟Job任务
    106. func jobQueue() {
    107. for i := 1; i <= 30; i++ {
    108. JobQueue <- Job{ID: i, Name: fmt.Sprintf("Job【%d】", i)}
    109. fmt.Printf("jobQueue add %d job\n", i)
    110. }
    111. }

            定义了两个结构体:Task任务和Job工作,Task并没有实质性的内容,这里仅仅定义了一个整型变量;

            定义两个全局变量:MaxWorker是最大的worker数量;JobQueue是Job的通道。这两个变量都用于后面的模拟,在真实场景中可以不设置这两个变量。

            定义了一个Worker结构体,与上一个简单工作池的示例不同,本例的Worker不再是简单的一个goroutine,而是一个结构体。结构体内定义了如下四个变量。▪id:worker编号。▪exit:这是一个bool类型的通道,当有数据写入时worker结束运行。▪JobChannel:Job类型的通道,该通道是专属于当前worker的私有工作队列。▪WorkerPool:注意看,定义的时候使用了两个Channel,每一个元素是一个Job通道,其实每一个元素是一个JobChannel。

            NewWorker方法用于创建一个新的worker,要注意该方法的参数workerPool用于创建worker时传入,这就说明每个worker与其他worker的WorkerPool是共享的,或者说多个worker使用一个WorkerPool。这一点很重要,这是本示例代码在上一个简单示例代码基础上的优化。而JobChannel和exit变量则是随着Worker的新建而新建的。

            Worker的Start方法,该方法用于监听任务或者结束信号。Start方法一开始就用goroutine运行一个匿名函数,而函数内部是一个无限循环。在循环内部,首先是把当前的JobChannel注册到WorkerPool里,一旦注册进去也就说明该worker可以接收任务了。然后通过select判断JobChannel是否可以读取,也就是其中是否有Job,或者exit通道是否可以读取。如果JobChannel可读取,证明有Job,后续开始处理Job;而如果exit可读,则结束当前的无限循环。所以,后面的代码中要特别注意对WorkerPool的操作,Worker是从WorkerPool领取工作的。Worker的Stop方法,用于为exit通道写入数据,在Start方法内Worker会读取到写入的数据,进而结束无限循环。

            NewScheduler函数用于创建一个Scheduler,可以看到函数内部的WorkerPool是通过make函数新建的,NewWorker函数一样靠参数传入。注意WorkerPool是有缓存通道的,缓存长度是MaxWorkers。

            Scheduler的Create方法,该方法根据MaxWorkers最大数创建Worker,并且把引用存入Workers切片。创建好Worker后,马上调用Worker的Start方法,最后通过goroutine运行Schedule方法。Scheduler的Shutdown方法,用于关闭工作池,调用所有worker的Stop方法并且关闭WorkerPool工作池。

            Scheduler的Schedule方法,该方法内也是一个无限循环,循环内部就是不停地读取JobQueue,然后运行一个goroutine。在新运行的goroutine内从s.WorkerPool读取一个JobChannel,注意,Worker注册到WorkerPool以后此处才可以读取到,如果WorkerPool的缓存通道内没有JobChannel,则会阻塞,直到读取到JobChannel,才把Job写入。  

    备注:此文内容来自《Go微服务实战》

  • 相关阅读:
    php脚本执行timeout
    信创之国产浪潮电脑+统信UOS操作系统体验2:安装visual studio code和cmake搭建C++开发环镜
    为什么低代码CRM越来越受欢迎?
    SAS学习2(data步,input语句,从文件中读取数据)
    MobaXterm工具软件使用介绍
    电池使用时间报告
    java计算机毕业设计vue架构云餐厅美食订餐系统MyBatis+系统+LW文档+源码+调试部署
    Hive数据仓库行转列
    【广州华锐互动】AR技术为气象站远程监控及在线指导维修提供极大便利
    如何使用Stable Diffusion生成艺术二维码?
  • 原文地址:https://blog.csdn.net/qq_34272964/article/details/127034590