• golang中channel使用


    1 golang中channel使用

    1.1 channel介绍

    Go并发是源自CSP模型,通过channel来实现协程的同步,Go并发不通过共享内存来通信,而是通过通信来共享内存,Go内建channel实现了go协程之间数据的读写相关操作,通道(channel)是一种特殊的类型,在任何时候,同时只能有一个 goroutine 访问通道进行发送和获取数据。channel可以看作一个消息队列,遵循先进先出的原则,从而保证了收发数据的顺序性。

    channel有如下特点:

    • channel本身是一个队列,先进先出
    • 线程安全,不需要枷锁,但是如果传递数组指针或者其他非线程安全的指针或引用,需要额外做好保护
    • channel是引用类型,必须make之后才能使用,一旦初始化容量,就不会改变了
    • 当写满时,不可以写,取空时,不可以取,如果容量满则发送阻塞直到通道数据被取走,如果通道数据为空,则接收阻塞,可通过select方式非阻塞读写
    • 通道一次只能接收一个数据元素
    • 每个 channel 都有一个特殊的类型,也就是 channels 可发送数据的类型,比如chan int

    Go语言提倡使用channel的方法代替共享内存,当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个channel,并提供了确保同步交换数据的机制,声明channel时,需要指定将要被共享的数据的类型,可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针等。

    channel与goroutine交互示意图如下:
    chan

    1.2 channel使用

    1.2.1 channel声明和初始化

    golang中channel声明方式如下:
    var 通道变量 chan 通道类型

    例如:var chan1 chan int //通道传递int类型的数据

    通过此声明chan1为nil,需要配合make进行初始化后,才能使用

    var chan1 chan int
    chan1 = make(chan int) //无缓冲
    chan2 := make(chan int,3) //缓冲大小为3的通道
    type as struct{
        data []byte
        name string
    }
    a := &as{
        make([]byte,0,1024),"buf"
    }
    
    chan3 := make(chan *as,3) //缓冲大小为3,传输类型为as的指针
    a <- a  
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    1.2.2 channel数据发送与接收

    通道创建好后,根据make时是否创建缓冲分为有缓冲chan和无缓冲的chan

    1.2.2.1 无缓冲chan的数据传输

    如果创建时缓冲大小设置为0,或者未设置,此时把数据往通道中发送时,如果接收方一直都没有接收,那么发送操作将持续阻塞,如果接收时一直未有数据发送,则也一直阻塞,实例如下:

    package main
    
    import "fmt"
    
    func main() {
        chan1 := make(chan int)
        go gorun1(chan1)
        var a int = 3
        fmt.Println(a)
        chan1 <- a
        a++
        fmt.Println(a)
        chan1 <- a
        a++
        fmt.Println(a)
    }
    
    //传递的时chan引用
    func gorun1(c chan int) {
        v := <-c
        fmt.Println("gorun1 receive:", v)
    }
    /*打印结果
    3
    4
    gorun1 receive: 3
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    gorun1是在go程中运行,第一次发送数据3时,gorun1读取了数据3并打印,第二次发送数据4时,由于gorun1已经退出,无人接收,所以一直阻塞在chan1<-a
    无缓冲通道的chan比较常用于等待go程退出的场景,实例如下:

    package main
    
    import "fmt"
    
    var done chan any
    
    func main() {
     done = make(chan any)
     chan1 := make(chan int)
     go gorun1(chan1)
     go gorun2(chan1)
     a := <-done //阻塞,等待gorun1退出时执行close(done)
     fmt.Println(a)
    }
    
    func gorun2(c chan int) {
     c <- 3
     c <- 4
     c <- 5
     c <- 0
    }
    
    //传递的时chan引用
    func gorun1(c chan int) {
     defer close(done) //close chan时会触发向chan发送一个nil
     for {
      select {
      case v := <-c:
       fmt.Println("gorun1 receive:", v)
       if v == 0 {
        return
       }
      }
     }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    从例子中可以看出,直到gorun2向chan1发送0时,gorun1退出,整个进程才推出,实例还使用了select,实现了无缓冲非阻塞接收。这里done实际上实现了wait-set的功能。

    chan数据接收时,可通过如下方式判断是否关闭,当无缓冲时,其阻塞,直到关闭时ok为false,v为nil

    v,ok := <-chan1 //经过测试无缓冲时,无数据写入时,其是阻塞的,直到close或者写入数据时,才会被触发
    
    • 1

    数据接收时,还可使用for range方式循环遍历接收,实例如下:

    package main
    
    import (
     "fmt"
     "time"
    )
    
    var done chan any
    
    func main() {
     done = make(chan any)
     chan1 := make(chan int)
     go gorun1(chan1)
     go gorun2(chan1)
     a, ok := <-done //等待gorun1退出
     fmt.Println(a, ok)
    }
    
    func gorun2(c chan int) {
     c <- 3
     time.Sleep(time.Duration(2) * time.Second)
     c <- 4
     time.Sleep(time.Duration(2) * time.Second)
     c <- 5
     time.Sleep(time.Duration(2) * time.Second)
     c <- 0
     time.Sleep(time.Duration(2) * time.Second)
    }
    
    //传递的时chan引用
    func gorun1(c chan int) {
     defer close(done)  //close chan时会触发向chan发送一个nil
     for v := range c { //循环接收,如果无数据则阻塞直到数据到来
      fmt.Println("gorun1 receive:", v)
      if v == 0 {
       return
      }
     }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    1.2.2.2 有缓冲chan数据传输

    为了满足并发数据处理的需求,chan可以声明为带有缓冲区的通道,此时只有通道未满,数据都可写入缓冲器,接收端只要缓冲区不为空都可以接收数据,但是chan不支持批量读写,每次只能写入一个数据,每次只能读取一个数据,当缓冲区满时,发送端阻塞;当缓冲区为空时,接收端阻塞;可以通过select方式来避免阻塞,实例如下:

    package main
    
    import (
     "fmt"
    )
    
    func main() {
     chan1 := make(chan int, 3) //缓冲大小为3
     chan1 <- 1 //放入第一个数据
     fmt.Println(1)
     chan1 <- 2 //放入第二个数据
     fmt.Println(2)
     chan1 <- 3 //放入第三个数据
     fmt.Println(3)
     fmt.Println(<-chan1) //读取数据
     fmt.Println(<-chan1) //读取数据
     fmt.Println(<-chan1) //读取数据
     fmt.Println(<-chan1) //读取数据
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    实例声明了缓冲区大小为3的chan,在写入数据时,连续写入3个并未阻塞,连续接收三个也未阻塞,第四次读取时由于无数据,阻塞

    有关阻塞/非阻塞/无缓冲/缓冲,可参照此篇文章:
    https://blog.csdn.net/aggie4628/article/details/124646319

    1.3 channel的应用场景

    1.3.1 通过channel实现信号量功能

    前面已经介绍了此种场景,通过无缓冲的chan,接收阻塞直到另一个并发go程处理完程,并发送close(chan)或者发送任意值后,触发当前现场继续执行,实例如下:

    package main
    
    import (
     "fmt"
     "time"
    )
    
    var done chan any
    
    func main() {
     done = make(chan any)
     go task1()
     <-done//等待task1执行完成
     fmt.Println("task2")
    }
    
    func task1() {
     fmt.Println("task1")
     time.Sleep(time.Duration(5) * time.Second)
     done <- nil //执行完成后发送一个nil触发done解除阻塞
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    1.3.2 通过channel实现广播通知功能

    可利用已关闭的channel读取数据时总是非阻塞的特性,可以实现在一个协程中向其他多个协程广播某个事件发生的通知,例如定义一个exitp chan any来控制所有go程退出,实例如下:

    package main
    
    import (
     "fmt"
     "time"
    )
    
    var exitp chan any
    
    func main() {
     exitp = make(chan any)
     go task1()
     go task2()
     fmt.Println("task3")
     time.Sleep(time.Duration(3) * time.Second)
     close(exitp)
     time.Sleep(time.Duration(1) * time.Second)
    }
    
    func task1() {
     fmt.Println("task1")
     time.Sleep(time.Duration(1) * time.Second)
     <-exitp
     fmt.Println("exit task1")
    }
    
    func task2() {
     fmt.Println("task2")
     time.Sleep(time.Duration(1) * time.Second)
     <-exitp
     fmt.Println("exit task2")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    通过close(chan)的特性,只能通知一次,不能携带参数,如果需要携带参数通知,则需要建立一个总线channel的方式,分发到子chan中,例如

    package main
    
    import (
     "fmt"
     "time"
    )
    
    type aa struct {
     message chan any
     name    string
    }
    
    func (p *aa) Run() {
     defer close(p.message)
     for {
      select {
      case ss := <-p.message:
       fmt.Println(p.name, ss)
      case <-Exitp:
       fmt.Println("exit :", p.name)
       return
      }
     }
    }
    
    var Eventbus chan any
    var Exitp chan any
    var Reicevermap map[string]*aa
    
    func main() {
     Exitp = make(chan any) //控制go程退出
     Eventbus = make(chan any, 1)
     Reicevermap = make(map[string]*aa)
     a := &aa{make(chan any, 3), "aa"} //aa接收者
     go a.Run()                        //aa接收广播线程
     Reicevermap[a.name] = a
     b := &aa{make(chan any, 3), "bb"} //bb接收者
     go b.Run()                        //bb接收广播线程
     Reicevermap[b.name] = b
     fmt.Println("111")
     go broadcast()
     time.Sleep(time.Duration(1) * time.Second)
     Eventbus <- "第一个广播"
     time.Sleep(time.Duration(3) * time.Second)
     close(Exitp)
     time.Sleep(time.Duration(3) * time.Second)
    }
    
    func broadcast() {
     for event := range Eventbus {//不建议此种方式接收chan数据,建议用select
      for _, v := range Reicevermap {
       v.message <- event
      }
     }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    1.3.3 通过channel互斥量

    通过设置容量未1的chan来实现互斥的功能,这种用法较少,因为互斥量本来就可以用锁来实现,没必要滥用chan,这里就不做举例

    1.3.4 channel多写多读控制

    原则上尽量一个channel只有一个go程写,一个go程读取,这样最好控制,尽量避免多写多读的情况,此种情况如果无法避免,可以引入额外的chan来做分发,就如上述带参数的广播的例子,这里也不做举例

    1.4 channel使用注意事项

    • channel不用时最好close关闭,虽然说gc会回收,但是还是有序关闭为好
    • channel不能重复close否则会报恐慌错误
    • 涉及到大数据传输时,传输指针或者引用,来避免大内存拷贝
    • channel灵活使用,可让多进程并发通信变得很简单,但是也不能滥用
  • 相关阅读:
    [Qt]基础数据类型和信号槽
    C#:实现平方数序列算法(附完整源码)
    【微信小程序】解决分页this.setData数据量太大的限制问题
    java中集合框架的基础-1
    【计算机网络】网络层之IP协议
    uboot启动流程涉及reset汇编函数
    【python】用Pillow操作图片
    【Java】已解决java.nio.channels.FileLockInterruptionException异常
    去电脑维修店修电脑需要注意什么呢?装机之家晓龙
    5分钟带你了解什么是敏捷测试?难点显而易见!
  • 原文地址:https://blog.csdn.net/water1209/article/details/126638266