在 Go语言实战 中看到有些并发相关的例子,讲解得也比较详细,于是乎写来加深下印象。
无缓冲通道在接收前没有能力保存任何值。我自己找了书上的示例来加深一下印象。
- package main
-
- import (
- "fmt"
- "math/rand"
- "sync"
- "time"
- )
-
- var (
- wg sync.WaitGroup
- )
-
- func init() {
- rand.Seed(time.Now().UnixNano()) // 让每次运行生成的随机数不相同
- }
-
- func main() {
- count := make(chan int)
- wg.Add(2)
- go player("Nadal", count)
- go player("Looking", count)
-
- count <- 1
- wg.Wait()
- }
-
- func player(name string, count chan int) {
- defer wg.Done()
-
- for {
- ball, ok := <-count
- if !ok {
- fmt.Printf("Player %s Win\n", name)
- return
- }
- n := rand.Intn(100)
- if n%13 == 0 {
- fmt.Printf("Player %s Missed\n", name)
- close(count)
- return
- }
- fmt.Printf("Player %s Hit %d\n", name, ball)
- count <- ball
- }
- }
接力赛中,接力棒只能在一个人手中。
- package main
-
- import (
- "fmt"
- "sync"
- "time"
- )
-
- var (
- wg sync.WaitGroup
- )
-
-
- func main() {
- baton := make(chan int)
- wg.Add(1)
- go Runner(baton)
- baton <- 1
- wg.Wait()
- }
-
- func Runner(baton chan int) {
- var newRunner int
- runner := <- baton
- fmt.Printf("Runner %d Running with baton\n", runner)
- if runner != 4 {
- newRunner = runner + 1
- fmt.Printf("Runner %d to the line\n", newRunner)
- go Runner(baton)
- }
- time.Sleep(100 * time.Millisecond)
- if runner == 4 {
- fmt.Printf("Runner %d finished, Race over\n", runner)
- wg.Done()
- return
- }
- fmt.Printf("Runner %d Exchange with runner %d \n", runner, newRunner)
- baton <- newRunner
- }
- Runner 1 Running with baton
- Runner 2 to the line
- Runner 1 Exchange with runner 2
- Runner 2 Running with baton
- Runner 3 to the line
- Runner 2 Exchange with runner 3
- Runner 3 Running with baton
- Runner 4 to the line
- Runner 3 Exchange with runner 4
- Runner 4 Running with baton
- Runner 4 finished, Race over
缓冲通道不强制发送和接收同时完成。
当通道关闭后,goroutine 依旧可以从通道接收数据,但是不能再向通道里发送数据。能够从已经关闭的通道接收数据这一点非常重要,因为这允许通 道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。从一个已经关闭且没有数据的通道 里获取数据,总会立刻返回,并返回一个通道类型的零值。如果在获取通道时还加入了可选的标 志,就能得到通道的状态信息。
- package main
-
- import (
- "fmt"
- "math/rand"
- "sync"
- "time"
- )
-
- const (
- numberGoroutines = 4
- taskLoad = 10
- )
-
- var wg sync.WaitGroup
-
- func init() {
- rand.Seed(time.Now().UnixNano())
- }
-
- func main() {
- tasks := make(chan string, taskLoad)
- wg.Add(numberGoroutines)
- for gr := 1; gr <= numberGoroutines; gr++ {
- go worker(tasks, gr)
- }
- for post := 1; post <= taskLoad; post++ {
- tasks <- fmt.Sprintf("Task: %d", post)
- }
- close(tasks) // 任务发布完毕后关闭通道,关闭通道不影响其它Goroutine对已发布内容的正常接收
- wg.Wait()
- }
-
- func worker(tasks chan string, worker int) {
- defer wg.Done()
-
- for {
- task, ok := <-tasks
- if !ok {
- fmt.Printf("Worker: %d : Shutting down\n", worker)
- return
- }
- fmt.Printf("Worker: %d : Started %s\n", worker, task)
- sleep := rand.Int63n(100)
- time.Sleep(time.Duration(sleep) * time.Millisecond)
- fmt.Printf("Worker: %d : Completed %s\n", worker, task)
- }
- }
- Worker: 4 : Started Task: 4
- Worker: 2 : Started Task: 2
- Worker: 3 : Started Task: 3
- Worker: 1 : Started Task: 1
- Worker: 3 : Completed Task: 3
- Worker: 3 : Started Task: 5
- Worker: 2 : Completed Task: 2
- Worker: 2 : Started Task: 6
- Worker: 1 : Completed Task: 1
- Worker: 1 : Started Task: 7
- Worker: 1 : Completed Task: 7
- Worker: 1 : Started Task: 8
- Worker: 3 : Completed Task: 5
- Worker: 3 : Started Task: 9
- Worker: 4 : Completed Task: 4
- Worker: 4 : Started Task: 10
- Worker: 2 : Completed Task: 6
- Worker: 2 : Shutting down
- Worker: 3 : Completed Task: 9
- Worker: 3 : Shutting down
- Worker: 1 : Completed Task: 8
- Worker: 1 : Shutting down
- Worker: 4 : Completed Task: 10
- Worker: 4 : Shutting down
runner 包用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以
用runner 包来终止程序。当开发需要调度后台处理任务的程序的时候,这种模式会很有用。
runner/runner.go
- package runner
-
- import (
- "errors"
- "os"
- "os/signal"
- "time"
- )
-
- type Runner struct {
- interrupt chan os.Signal
- complete chan error
- timeout <-chan time.Time // 单向通道,只允许接收
- tasks []func(int)
- }
-
- var ErrTimeout = errors.New("received timeout")
- var ErrInterrupt = errors.New("received interrupt")
-
- func New(d time.Duration) *Runner {
- return &Runner{
- interrupt: make(chan os.Signal, 1),
- complete: make(chan error),
- timeout: time.After(d),
- }
- }
-
- func (r *Runner) Add(tasks ...func(int)) {
- r.tasks = append(r.tasks, tasks...)
- }
-
- func (r *Runner) Start() error {
- signal.Notify(r.interrupt, os.Interrupt) // 如果有中断,会将中断信号发送到 r.interrupt
- go func() {
- r.complete <- r.run()
- }()
- select {
- case err := <-r.complete:
- return err // 如果提前中断,err 是 ErrInterrupt,正常结束则是 nil
- case <-r.timeout:
- return ErrTimeout
- }
- }
-
- func (r *Runner) run() error {
- for id, task := range r.tasks {
- if r.gotInterrupt() {
- return ErrInterrupt
- }
- task(id)
- }
- return nil
- }
-
- func (r *Runner) gotInterrupt() bool {
- select {
- case <-r.interrupt:
- signal.Stop(r.interrupt)
- return true
- default:
- return false
- }
- }
main.go
- package main
-
- import (
- "github.com/test/runner"
- "log"
- "os"
- "time"
- )
-
- const timeout = 3 * time.Second
-
- func main() {
- log.Println("Starting work.")
- r := runner.New(timeout)
- r.Add(createTask(), createTask(), createTask())
-
- if err := r.Start(); err != nil {
- switch err {
- case runner.ErrTimeout:
- log.Println("Terminating due to timeout.")
- os.Exit(1)
- case runner.ErrInterrupt:
- log.Println("Terminating due to interrupt.")
- os.Exit(2)
- }
- }
- log.Println("Process ended.")
- }
-
- func createTask() func(int) {
- return func(id int) {
- log.Printf("Processor - Task #%d.", id)
- time.Sleep(time.Duration(id) * time.Second)
- }
- }
- 2024/04/21 17:26:30 Starting work.
- 2024/04/21 17:26:30 Processor - Task #0.
- 2024/04/21 17:26:30 Processor - Task #1.
- 2024/04/21 17:26:31 Processor - Task #2.
- 2024/04/21 17:26:33 Terminating due to timeout.
- 2024/04/21 17:28:18 Starting work.
- 2024/04/21 17:28:18 Processor - Task #0.
- 2024/04/21 17:28:18 Processor - Task #1.
- 2024/04/21 17:28:19 Terminating due to interrupt.
- 2024/04/21 17:30:40 Starting work.
- 2024/04/21 17:30:40 Processor - Task #0.
- 2024/04/21 17:30:40 Processor - Task #1.
- 2024/04/21 17:30:41 Processor - Task #2.
- 2024/04/21 17:30:43 Process ended.
pool 使用有缓冲通道实现资源池。
pool/pool.go
- package pool
-
- import (
- "errors"
- "io"
- "log"
- "sync"
- )
-
- type Pool struct {
- m sync.Mutex
- resources chan io.Closer
- factory func() (io.Closer, error)
- closed bool
- }
-
- var ErrPoolClosed = errors.New("pool has been closed")
-
- func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
- if size <= 0 {
- return nil, errors.New("size value too small")
- }
- return &Pool{
- resources: make(chan io.Closer, size),
- factory: fn,
- }, nil
- }
-
- func (p *Pool) Acquire() (io.Closer, error) {
- select {
- case r, ok := <-p.resources:
- log.Println("Acquire:", "Shared resource")
- if !ok {
- return nil, ErrPoolClosed
- }
- return r, nil
- default:
- log.Println("Acquire:", "New resource")
- return p.factory()
- }
- }
-
- func (p *Pool) Release(r io.Closer) {
- p.m.Lock()
- defer p.m.Unlock()
- if p.closed {
- r.Close()
- return
- }
- select {
- case p.resources <- r:
- log.Println("Release:", "In queue")
- default:
- log.Println("Release:", "Closing")
- r.Close()
- }
- }
-
- func (p *Pool) Close() {
- p.m.Lock()
- defer p.m.Unlock()
- if p.closed {
- return
- }
- p.closed = true
- close(p.resources)
- for r := range p.resources {
- r.Close()
- }
- }
main.go
- package main
-
- import (
- "github.com/test/pool"
- "io"
- "log"
- "math/rand"
- "sync"
- "sync/atomic"
- "time"
- )
-
- const (
- maxGoroutines = 5
- pooledResources = 2
- )
-
- type dbConnection struct {
- ID int32
- }
-
- func (dbConn *dbConnection) Close() error {
- log.Println("Close: Connection", dbConn.ID)
- return nil
- }
-
- var idCounter int32
-
- func createConnection() (io.Closer, error) {
- id := atomic.AddInt32(&idCounter, 1)
- log.Println("Create: New Connection", id)
-
- return &dbConnection{id}, nil
- }
-
- func main() {
- var wg sync.WaitGroup
- wg.Add(maxGoroutines)
-
- p, err := pool.New(createConnection, pooledResources)
- if err != nil {
- log.Println(err)
- }
-
- for query := 0; query < maxGoroutines; query++ {
- go func(q int) {
- defer wg.Done()
- performQueries(q, p)
- }(query)
- }
- wg.Wait()
- log.Println("Shutdown program")
- p.Close()
- }
-
- func performQueries(query int, p *pool.Pool) {
- conn, err := p.Acquire()
- if err != nil {
- log.Println(err)
- return
- }
- defer p.Release(conn)
- time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
- log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
- }
- 2024/04/21 20:29:20 Acquire: New resource
- 2024/04/21 20:29:20 Create: New Connection 1
- 2024/04/21 20:29:20 Acquire: New resource
- 2024/04/21 20:29:20 Acquire: New resource
- 2024/04/21 20:29:20 Create: New Connection 3
- 2024/04/21 20:29:20 Acquire: New resource
- 2024/04/21 20:29:20 Create: New Connection 4
- 2024/04/21 20:29:20 Create: New Connection 2
- 2024/04/21 20:29:20 Acquire: New resource
- 2024/04/21 20:29:20 Create: New Connection 5
- 2024/04/21 20:29:20 QID[0] CID[2]
- 2024/04/21 20:29:20 Release: In queue
- 2024/04/21 20:29:20 QID[2] CID[5]
- 2024/04/21 20:29:20 Release: In queue
- 2024/04/21 20:29:20 QID[4] CID[1]
- 2024/04/21 20:29:20 Release: Closing
- 2024/04/21 20:29:20 Close: Connection 1
- 2024/04/21 20:29:21 QID[1] CID[4]
- 2024/04/21 20:29:21 Release: Closing
- 2024/04/21 20:29:21 Close: Connection 4
- 2024/04/21 20:29:21 QID[3] CID[3]
- 2024/04/21 20:29:21 Release: Closing
- 2024/04/21 20:29:21 Close: Connection 3
- 2024/04/21 20:29:21 Shutdown program
- 2024/04/21 20:29:21 Close: Connection 2
- 2024/04/21 20:29:21 Close: Connection 5
work 使用无缓冲通道创建资源池。work 新建时就生成指定个数 goroutine 循环等待(无缓冲通道无数据阻塞)消费任务,然后主线程再分发相应任务到通道,消费 goroutine 再继续执行消费任务。
work/work.go
- package work
-
- import "sync"
-
- type Worker interface {
- Task()
- }
-
- type Pool struct {
- work chan Worker
- wg sync.WaitGroup
- }
-
- func New(maxGoroutines int) *Pool {
- p := Pool{
- work: make(chan Worker),
- }
- p.wg.Add(maxGoroutines)
- for i := 0; i < maxGoroutines; i++ {
- go func() {
- defer p.wg.Done()
- for w := range p.work { // 如果 work 关闭,for range 循环结束
- w.Task()
- }
- }()
- }
- return &p
- }
-
- func (p *Pool) Run(w Worker) {
- p.work <- w
- }
-
- func (p *Pool) Shutdown() {
- close(p.work) // 关闭通道,避免 New 产生的 goroutine 一直阻塞不退出
- p.wg.Wait()
- }
main.go
- package main
-
- import (
- "github.com/test/work"
- "log"
- "sync"
- "time"
- )
-
- var names = []string{
- "steve",
- "jason",
- "looking",
- }
-
- type namePrinter struct {
- name string
- }
-
- func (m *namePrinter) Task() {
- log.Println(m.name)
- time.Sleep(time.Second)
- }
-
- const times = 3
-
- func main() {
- p := work.New(2)
-
- var wg sync.WaitGroup
- wg.Add(times * len(names))
- for i := 0; i < times; i++ {
- for _, name := range names {
- np := namePrinter{name: name}
- go func() {
- defer wg.Done()
- p.Run(&np)
- }()
- }
- }
- wg.Wait()
- p.Shutdown()
- }
- 2024/04/21 21:39:28 steve
- 2024/04/21 21:39:28 looking
- 2024/04/21 21:39:29 jason
- 2024/04/21 21:39:29 looking
- 2024/04/21 21:39:30 steve
- 2024/04/21 21:39:30 jason
- 2024/04/21 21:39:31 looking
- 2024/04/21 21:39:31 steve
- 2024/04/21 21:39:32 jason