缓冲信道的重要应用之一就是实现工作池。
工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配,和python中的进程池,线程池一样。
我们会使用缓冲信道来实现工作池。我们工作池的任务是计算所输入数字的每一位的和。例如,如果输入 234,结果会是 9(即 2 + 3 + 4)。向工作池输入的是一列伪随机数。
我们工作池的核心功能如下:
第一步就是定义任务结构体和结果结构体。
// 1 定义一个任务结构体和结果结构体
type Job struct {
Id int
RandNum int
}
type Result struct {
job Job
total int
}
所有 Job
结构体变量都会有 id
和 RandNum
两个字段,RandNum
用于计算其每位数之和。
而 Result
结构体有一个 job
字段,表示所对应的作业,还有一个 total
字段,表示计算的结果(每位数字之和)。
第二步是分别创建用于接收作业和写入结果的缓冲信道。
//2 定义两个有缓冲信道,一个存放任务,一个存放计算结果
var jobsChan = make(chan Job, 10)
var resultChan = make(chan Result, 10)
工作协程(Worker Goroutine
)会监听缓冲信道 jobsChan
里更新的作业。一旦工作协程完成了作业,其结果会写入缓冲信道 resultChan
。
worker
任务是真正的工作任务,循环从任务信道中取出任务,然后计算整数的每一位之和,最后将计算结果放到结果信道中。为了模拟出计算过程中花费了一段时间,我们在函数内添加了1秒的休眠时间。
func worker(wg *sync.WaitGroup) {
// 从任务信道中取值计算,塞到结果信道中
for job := range jobsChan {
// 从job结构体中取出随机数字,每一位都累加
var total = 0 // 总和
var randNum = job.RandNum // 随机数字
for randNum != 0 {
total += randNum % 10 // 总和+随机数字对每位取余数
randNum /= 10 // 随机数字除以10
}
// 模拟一下延迟,方便后期查看开启多个工作池后,效率是否有提升
time.Sleep(1 * time.Second)
// 把结果塞到结果信道中
resultChan <- Result{job, total}
}
//如果jobsChan取完了,关闭了,任务就可以结束了
wg.Done()
}
上面的函数创建了一个工作者(Worker),读取 jobsChan
信道的数据,根据当前的 jobsChan
计算,并创建了一个 Result
结构体变量,然后将结果写入 results
缓冲信道。worker
函数接收了一个 WaitGroup
类型的 wg
作为参数,当所有的 jobsChan
完成的时候,调用了 Done()
方法。
createWorkPool
函数创建了一个 Go 协程的工作池。
func createWorkPool(num int) {
// 定义一个wg,控制所有工作池在完成所有任务后关闭
var wgPool sync.WaitGroup
for i := 0; i < num; i++ {
wgPool.Add(1)
// 真正的执行任务,把wgPool指针传入
go worker(&wgPool)
}
// 等待所有工作池完成
wgPool.Wait()
// 所有工作池都完成,表明resultChan信道用完了,可以关闭了
close(resultChan)
}
上面函数的参数是需要创建的工作协程的数量。在创建 Go 协程之前,它调用了 wg.Add(1)
方法,于是 WaitGroup
计数器递增。接下来,我们创建工作协程,并向 worker
函数传递 wg
的地址。创建了需要的工作协程后,函数调用 wg.Wait()
,等待所有的 Go 协程执行完毕。所有协程完成执行之后,函数会关闭 resultChan
信道。因为所有协程都已经执行完毕,于是不再需要向 resultChan
信道写入数据了。
现在我们已经有了工作池,我们继续编写一个函数,把作业分配给工作者,随机生成job,写入到 jobsChan
信道中
func genRandNum(num int) {
for i := 0; i < num; i++ {
// 将生成的随机数,塞到任务的缓冲信道中
jobsChan <- Job{i, rand.Intn(999)}
}
//全部塞进去以后,就可以关闭信道了
close(jobsChan)
}
上面的 genRandNum
函数接收所需创建的作业数量作为输入参数,生成了最大值为 998 的伪随机数,并使用该随机数创建了 Job
结构体变量。这个函数把 for 循环的计数器 i
作为 id,最后把创建的结构体变量写入 jobsChan
信道。当写入所有的 job
时,它关闭了 jobsChan
信道。
下一步是创建一个读取 results
信道和打印输出的函数。
func printResult() {
for result := range resultChan {
fmt.Printf("任务id为:%d,任务的随机数为:%d,结果为:%d\n", result.job.Id, result.job.RandNum, result.total)
}
}
result
函数读取 results
信道,并打印出 job
的 id
、输入的随机数、该随机数的每位数之和。
现在一切准备充分了。我们继续完成最后一步,在 main()
函数中调用上面所有的函数。
func main() {
start := time.Now()
// 开启协程,往任务信道中写任务,写100个随机数
go genRandNum(100)
// 开启协程,打印计算结果
go printResult()
// 创建工作池,注意:工作池一定要下载上面俩任务的下方
// 如果放在上面,内部有wgPool.Wait(),主协程一直挺在这,任务信道和结果信道都不会写入数据,造成死锁
createWorkPool(10) // 创建大小为10的工作池
end := time.Now()
fmt.Println("总共耗时:", end.Sub(start))
}
我们首先在 main
函数保存了程序的起始时间start,并在最后一行计算了 end
和 start
的差值,显示出程序运行的总时间。由于我们想要通过改变协程数量,来看程序运行时间。
我们把 工作池
设置为 10,接下来调用了 genRandNum
,生成100个job,向 jobsChan
信道添加作业。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
//工作池
// 有一批随机数---》数字每位之和---》10个人/100个干
// 第一步:定义任务结构体和结果结构体
type Job struct {
jobId int // 任务id
randNum int // 这个任务的随机数
}
type Result struct {
job Job // 把任务放进来
total int // 随机数每位之和
}
// 第二步:定义两个信道(有缓冲),分别存放 任务 结果
var jobChan = make(chan Job, 10)
var resultChan = make(chan Result, 10)
// 第三步:写一个任务,随机生成一批数字---》放到任务信道中
// n 表示生成多少个
func genRandNum(n int) {
for i := 0; i < n; i++ {
// 生成随机数,随机生成小于999的int类型数字
//rand.Intn(9999)
jobChan <- Job{jobId: i, randNum: rand.Intn(9999)} // 把生成的Job结构体对象放到任务信道中
}
// for循环结束,说明,任务全放进去了,可以关闭 任务信道
close(jobChan)
}
// 第四步:写一个真正执行任务的worker,函数
func worker(wg *sync.WaitGroup) { // worker 要放到协程中执行
for job := range jobChan { // 循环任务信道,从中取出任务执行
// 计算每位之和 job.randNum
num := job.randNum // 67 8
total := 0
for num != 0 {
total += num % 10
num /= 10
} // 计算total
// 模拟时间延迟 干这活需要1s时间
time.Sleep(1*time.Second)
// 结果放到 结果信道中
resultChan <- Result{job: job, total: total}
}
wg.Done()
}
// 第五步:创建工作池
func createWorkingPool(maxPool int) {
var wg sync.WaitGroup
for i := 0; i < maxPool; i++ {
wg.Add(1)
go worker(&wg) // 池多大,就有多少人工作,执行worker
}
wg.Wait() // 等待所有工作协程执行完成
//活干完了--->结果存储信道就可以关闭了
close(resultChan)
}
// 第六步:打印出 结果信道中所有的数据
func printResult() {
for result := range resultChan { // 从结果信道中取数据打印---》一旦结果信道关闭了--》表示任务完成了---》for循环结束
fmt.Printf("任务id为:%d,任务随机数为:%d,随机数结果为:%d\n", result.job.jobId, result.job.randNum, result.total)
}
}
// 第七步:main函数调用
func main() {
start:=time.Now()
// 1 生成100随机数---》放到任务队列中
go genRandNum(100)
// 2 在另一个协程中打印结果
go printResult()
// 3 创建工作池执行任务
createWorkingPool(5)
end:=time.Now()
fmt.Println(end.Sub(start)) // 统计程序运行时间 10个人干要10.032089437s 1个人干 要100s 100个人 1s多干完
}
程序总共会打印 100 行,对应着 100 项作业,然后最后会打印一行程序消耗的总时间。因为 Go 协程的运行顺序不一定,同样总时间也会因为硬件而不同。
如果把 main
函数里的 工作池
增加到 100。我们把工作者的数量加倍了。由于工作协程增加了(准确说来是两倍),因此程序花费的总时间会减少。
现在我们可以理解了,随着工作协程数量增加,完成作业的总时间会减少。总的来说,工作使用就是一个典型的生产者消费者模式例子。