• go 并发编程之-工作池


    go 并发编程之-工作池

    什么是工作池

    缓冲信道的重要应用之一就是实现工作池。

    工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配,和python中的进程池,线程池一样。

    我们会使用缓冲信道来实现工作池。我们工作池的任务是计算所输入数字的每一位的和。例如,如果输入 234,结果会是 9(即 2 + 3 + 4)。向工作池输入的是一列伪随机数。

    我们工作池的核心功能如下:

    • 创建一个 Go 协程池,监听一个等待作业分配的输入型缓冲信道。
    • 将作业添加到该输入型缓冲信道中。
    • 作业完成后,再将结果写入一个输出型缓冲信道。
    • 从输出型缓冲信道读取并打印结果。

    在这里插入图片描述

    工作池的使用

    第一步就是定义任务结构体和结果结构体。

    // 1 定义一个任务结构体和结果结构体
    type Job struct {
    	Id      int
    	RandNum int
    }
    type Result struct {
    	job   Job
    	total int
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    所有 Job 结构体变量都会有 idRandNum 两个字段,RandNum 用于计算其每位数之和。

    Result 结构体有一个 job 字段,表示所对应的作业,还有一个 total 字段,表示计算的结果(每位数字之和)。

    第二步是分别创建用于接收作业和写入结果的缓冲信道。

    //2 定义两个有缓冲信道,一个存放任务,一个存放计算结果
    var jobsChan = make(chan Job, 10)
    var resultChan = make(chan Result, 10)
    
    • 1
    • 2
    • 3

    工作协程(Worker Goroutine)会监听缓冲信道 jobsChan 里更新的作业。一旦工作协程完成了作业,其结果会写入缓冲信道 resultChan

    worker 任务是真正的工作任务,循环从任务信道中取出任务,然后计算整数的每一位之和,最后将计算结果放到结果信道中。为了模拟出计算过程中花费了一段时间,我们在函数内添加了1秒的休眠时间。

    func worker(wg *sync.WaitGroup) {
    	// 从任务信道中取值计算,塞到结果信道中
    	for job := range jobsChan {
    		// 从job结构体中取出随机数字,每一位都累加
    		var total = 0             // 总和
    		var randNum = job.RandNum // 随机数字
    		for randNum != 0 {
    			total += randNum % 10 // 总和+随机数字对每位取余数
    			randNum /= 10         // 随机数字除以10
    		}
    		// 模拟一下延迟,方便后期查看开启多个工作池后,效率是否有提升
    		time.Sleep(1 * time.Second)
    		// 把结果塞到结果信道中
    		resultChan <- Result{job, total}
    	}
    	//如果jobsChan取完了,关闭了,任务就可以结束了
    	wg.Done()
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    上面的函数创建了一个工作者(Worker),读取 jobsChan 信道的数据,根据当前的 jobsChan 计算,并创建了一个 Result 结构体变量,然后将结果写入 results 缓冲信道。worker 函数接收了一个 WaitGroup 类型的 wg 作为参数,当所有的 jobsChan 完成的时候,调用了 Done() 方法。

    createWorkPool 函数创建了一个 Go 协程的工作池。

    func createWorkPool(num int) {
    	// 定义一个wg,控制所有工作池在完成所有任务后关闭
    	var wgPool sync.WaitGroup
    	for i := 0; i < num; i++ {
    		wgPool.Add(1)
    		// 真正的执行任务,把wgPool指针传入
    		go worker(&wgPool)
    	}
    	// 等待所有工作池完成
    	wgPool.Wait()
    	// 所有工作池都完成,表明resultChan信道用完了,可以关闭了
    	close(resultChan)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    上面函数的参数是需要创建的工作协程的数量。在创建 Go 协程之前,它调用了 wg.Add(1) 方法,于是 WaitGroup 计数器递增。接下来,我们创建工作协程,并向 worker 函数传递 wg 的地址。创建了需要的工作协程后,函数调用 wg.Wait(),等待所有的 Go 协程执行完毕。所有协程完成执行之后,函数会关闭 resultChan 信道。因为所有协程都已经执行完毕,于是不再需要向 resultChan 信道写入数据了。

    现在我们已经有了工作池,我们继续编写一个函数,把作业分配给工作者,随机生成job,写入到 jobsChan信道中

    func genRandNum(num int) {
    	for i := 0; i < num; i++ {
    		// 将生成的随机数,塞到任务的缓冲信道中
    		jobsChan <- Job{i, rand.Intn(999)}
    	}
    	//全部塞进去以后,就可以关闭信道了
    	close(jobsChan)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    上面的 genRandNum 函数接收所需创建的作业数量作为输入参数,生成了最大值为 998 的伪随机数,并使用该随机数创建了 Job 结构体变量。这个函数把 for 循环的计数器 i 作为 id,最后把创建的结构体变量写入 jobsChan 信道。当写入所有的 job 时,它关闭了 jobsChan 信道。

    下一步是创建一个读取 results 信道和打印输出的函数。

    func printResult() {
    	for result := range resultChan {
    		fmt.Printf("任务id为:%d,任务的随机数为:%d,结果为:%d\n", result.job.Id, result.job.RandNum, result.total)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    result 函数读取 results 信道,并打印出 jobid、输入的随机数、该随机数的每位数之和。

    现在一切准备充分了。我们继续完成最后一步,在 main() 函数中调用上面所有的函数。

    func main() {
    	start := time.Now()
    	// 开启协程,往任务信道中写任务,写100个随机数
    	go genRandNum(100)
    	// 开启协程,打印计算结果
    	go printResult()
    
    	// 创建工作池,注意:工作池一定要下载上面俩任务的下方
    	// 如果放在上面,内部有wgPool.Wait(),主协程一直挺在这,任务信道和结果信道都不会写入数据,造成死锁
    	createWorkPool(10) // 创建大小为10的工作池
    
    	end := time.Now()
    	fmt.Println("总共耗时:", end.Sub(start))
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    我们首先在 main 函数保存了程序的起始时间start,并在最后一行计算了 endstart 的差值,显示出程序运行的总时间。由于我们想要通过改变协程数量,来看程序运行时间。

    我们把 工作池 设置为 10,接下来调用了 genRandNum,生成100个job,向 jobsChan 信道添加作业。

    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"sync"
    	"time"
    )
    
    //工作池
    // 有一批随机数---》数字每位之和---》10个人/100个干
    
    // 第一步:定义任务结构体和结果结构体
    type Job struct {
    	jobId   int // 任务id
    	randNum int // 这个任务的随机数
    }
    type Result struct {
    	job   Job // 把任务放进来
    	total int // 随机数每位之和
    }
    
    // 第二步:定义两个信道(有缓冲),分别存放 任务  结果
    var jobChan = make(chan Job, 10)
    var resultChan = make(chan Result, 10)
    
    // 第三步:写一个任务,随机生成一批数字---》放到任务信道中
    // n 表示生成多少个
    func genRandNum(n int) {
    	for i := 0; i < n; i++ {
    		// 生成随机数,随机生成小于999的int类型数字
    		//rand.Intn(9999)
    		jobChan <- Job{jobId: i, randNum: rand.Intn(9999)} // 把生成的Job结构体对象放到任务信道中
    	}
    	// for循环结束,说明,任务全放进去了,可以关闭 任务信道
    	close(jobChan)
    }
    
    // 第四步:写一个真正执行任务的worker,函数
    func worker(wg *sync.WaitGroup) { // worker 要放到协程中执行
    	for job := range jobChan { // 循环任务信道,从中取出任务执行
    		// 计算每位之和  job.randNum
    		num := job.randNum // 67   8
    		total := 0
    		for num != 0 {
    			total += num % 10
    			num /= 10
    		} // 计算total
    		// 模拟时间延迟 干这活需要1s时间
    		time.Sleep(1*time.Second)
    		// 结果放到 结果信道中
    		resultChan <- Result{job: job, total: total}
    	}
    	wg.Done()
    
    }
    
    // 第五步:创建工作池
    func createWorkingPool(maxPool int) {
    	var wg sync.WaitGroup
    	for i := 0; i < maxPool; i++ {
    		wg.Add(1)
    		go worker(&wg) // 池多大,就有多少人工作,执行worker
    	}
    	wg.Wait() // 等待所有工作协程执行完成
    	//活干完了--->结果存储信道就可以关闭了
    	close(resultChan)
    }
    
    // 第六步:打印出 结果信道中所有的数据
    func printResult() {
    	for result := range resultChan { // 从结果信道中取数据打印---》一旦结果信道关闭了--》表示任务完成了---》for循环结束
    		fmt.Printf("任务id为:%d,任务随机数为:%d,随机数结果为:%d\n", result.job.jobId, result.job.randNum, result.total)
    	}
    }
    
    // 第七步:main函数调用
    func main() {
    	start:=time.Now()
    
    	// 1 生成100随机数---》放到任务队列中
    	go genRandNum(100)
    	// 2 在另一个协程中打印结果
    	go printResult()
    	// 3 创建工作池执行任务
    	createWorkingPool(5)
    
    	end:=time.Now()
    	fmt.Println(end.Sub(start))  // 统计程序运行时间  10个人干要10.032089437s   1个人干 要100s      100个人 1s多干完
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    程序总共会打印 100 行,对应着 100 项作业,然后最后会打印一行程序消耗的总时间。因为 Go 协程的运行顺序不一定,同样总时间也会因为硬件而不同。

    如果把 main 函数里的 工作池 增加到 100。我们把工作者的数量加倍了。由于工作协程增加了(准确说来是两倍),因此程序花费的总时间会减少。

    现在我们可以理解了,随着工作协程数量增加,完成作业的总时间会减少。总的来说,工作使用就是一个典型的生产者消费者模式例子。

  • 相关阅读:
    HTML5:七天学会基础动画网页6
    范数-空间范数
    【计算机网络学习之路】网络基础1
    行列式【线性代数系列(一)】
    整个文档怎么翻译?这些方法亲测实用
    hive窗口分析函数使用详解系列二之分组排序窗口函数
    安装Centos7
    关于yolo7和gpu
    计算机系统4-> 计组与体系结构1 | 基础概念与系统评估
    makefile template
  • 原文地址:https://blog.csdn.net/qq_55752792/article/details/125917846