• go-channel


    设计原理

    Go 提及的设计模式就是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。

    • 共享内存方式:多个协程共享同一块内存,但是多个协程中读写变量是操作同一块内存,会产生多线程问题的并发问题,所以需要使用互斥锁来实现临界区的互斥访问,会大大影响效率
    • 通信方式(go语言使用):channel通道当做通信的中间件队列,发送方 向channel

    先入先出

    channel收/发操作都遵循了先进先出的设计,它一共使用了3个队列来实现:

    • 发操作:先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;(使用写队列hchan.sendq)
      • 接收方会从缓冲区中读取数据,然后唤醒发送方,发送方会尝试向缓冲区写入数据,如果缓冲区已满会重新陷入休眠;
    • 读操作:先从 Channel 读取数据的 Goroutine 会先接收到数据;(使用读队列hchan.recvq)
      • 使用读队列:发送方会向缓冲区中写入数据,然后唤醒接收方,多个接收方会尝试从缓冲区中读取数据,如果没有读取到会重新陷入休眠;

    无锁channel(结构体内还是有锁,好像暂未实现)

    并发控制可由2种方式实现:

    • 乐观锁:CAS(compare and swap)就是一种乐观锁,默认没有其他线程在修改,当本线程保存数据到内存时判断数据和修改前的原数据是否相同。
    • 悲观锁:redis setnx就是一种悲观锁,默认有其他线程在修改,所以在其他线程拿数据前就阻塞,等待锁释放才能继续操作

    乐观锁并没有锁这个变量,而是对原数据进行比较,所以乐观锁只是一种思想无锁channel是使用了乐观锁思想实现的。

    数据结构

    runtime.hchan结构体

    type hchan struct {
    	qcount   uint
    	dataqsiz uint
    	buf      unsafe.Pointer
    	elemsize uint16
    	closed   uint32
    	elemtype *_type
    	sendx    uint
    	recvx    uint
    	recvq    waitq
    	sendq    waitq
    
    	lock mutex
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • qcount:channel里的元素个数
    • dataqsiz:Channel 中的循环队列的容量
    • buf: Channel 的缓冲区数据指针
    • elemsize:元素占内存的大小
    • closed:channel的关闭状态
    • elemtype:元素的类型元数据
    • sendx: Channel 的发送操作处理到的位置
    • recvx:Channel 的接收操作处理到的位置;
    • recvq:接受队列(读队列),当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表
    • sendq:发送队列(写队列),当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表
    • lock:操作通道的锁,同一个时刻只有一个协程可以操作这个chan

    队列中存的结构是runtime.sudog

    type sudog struct {
    	// The following fields are protected by the hchan.lock of the
    	// channel this sudog is blocking on. shrinkstack depends on
    	// this for sudogs involved in channel ops.
    
    	g *g
    
    	next *sudog
    	prev *sudog
    	elem unsafe.Pointer // data element (may point to stack)
    
    	// The following fields are never accessed concurrently.
    	// For channels, waitlink is only accessed by g.
    	// For semaphores, all fields (including the ones above)
    	// are only accessed when holding a semaRoot lock.
    
    	acquiretime int64
    	releasetime int64
    	ticket      uint32
    
    	// isSelect indicates g is participating in a select, so
    	// g.selectDone must be CAS'd to win the wake-up race.
    	isSelect bool
    
    	// success indicates whether communication over channel c
    	// succeeded. It is true if the goroutine was awoken because a
    	// value was delivered over channel c, and false if awoken
    	// because c was closed.
    	success bool
    
    	parent   *sudog // semaRoot binary tree
    	waitlink *sudog // g.waiting list or semaRoot
    	waittail *sudog // semaRoot
    	c        *hchan // channel
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • g:等待channel的goroutine指针
    • channel:等待的哪个channel
    • elem:等待发送/接收的缓冲区地址下标

    channel类型

    有缓冲区channel

    hchan.buff指向一个数组地址,能存放数据,尽量避免了所有协程有阻塞

    • 写操作:
      • 如果缓冲区内有空间hchan.qcount,将数据放入缓冲区,hchan.sendx指向下一个数组下标,唤醒读队列hchan.recvq头部的协程。当前协程不阻塞,继续向下执行代码。
      • 如果缓冲区内没有空间hchan.qcount>=hchan.dataqsiz,当前goroutine阻塞(被挂起_GWating),新创建一个sudogsudog.g指向当前goroutine,sudog变量塞进写队列hchan.sendq
    • 读操作:和写操作差不多,只是操作hchan.recvqhchan.recvx

    无缓冲区channel

    hchan.buff是个nil值,没有数据存储的区域,肯定会出现阻塞现象

    • 写操作:去hchan.recvq队列中去获取一个正阻塞的协程sudog结构变量
      • 如果hchan.recvq有数据,则根据sudog.g变量去唤醒协程,并向这个协程发送数据
      • 如果hchan.recvq有数据,则创建一个sudog结构体变量,sudog.g变量指向当前协程,放到hchan.sendq队列,当前goroutine阻塞(被挂起_GWating)
    • 读操作:和写操作差不多,只是操作hchan.recvq

    唤醒阻塞协程对channel做操作,都是由当前协程通知g0协程做调度

    多路select

    问题:为什么多个case被阻塞,说明当前g被加到了多个hchan.recvq或者hchan.sendq中,为什么只会执行一个case。

    func SendBlock2() {
    	c1 := make(chan int)
        c2 := make(chan int)
        // go不能放select后,因为执行顺序的问题,如果放后面在select就挂起了协程,导致没有创建这个协程,也就不可能唤醒当前协程,从而导致死锁
    	go func() {
    		time.Sleep(3 * time.Second)
    		a := <-c1
    		fmt.Println(a)
    	}()
        
    	select {
    	case c1 <- 2:
    		fmt.Println("case1")
    	case c2 <- 3:
    		fmt.Println("case2")
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    上例的现象是:打印了case1或者case2,并不会两个都打印。

    执行步骤:

    1. 执行case1时给c1加锁,执行case2时给c2加锁
    2. select乱序轮询
    3. g被加到c1和c2的 hchan.sendq中,c1和c2解锁允许其他协程操作这个channel,g被挂起等待
    4. 子协程命中唤醒主协程,命中case1,执行case1操作
    5. 再次对所有case的channel加锁(原因是下一步)
    6. 去c1,c2的recvqsendq遍历删除绑定了当前协程的sudog,因为删除了队列中的等待g,所以g不会被重新唤醒,case2就再也不命中。
    7. c1,c2再次解锁
    8. select结束

    使用语法

    创建channel

    // 方法1,没有分配地址,无法读写chan
    var c chan Type
    
    // 方法2,分配了地址,设置了size就是有缓冲channel,反之是无缓冲地址
    c := make(chan Type [, size])
    
    • 1
    • 2
    • 3
    • 4
    • 5

    写channel

    c := make(chan Type [, size])
    
    // 方法1
    c <- val
    
    // 方法2
    select {
        case c<-2:
        // 下一个
        case c<-2:
        //业务逻辑
        default:
        //可以避免阻塞
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    读channel

    c := make(chan Type [, size])
    
    // 方法1
    t := <-c
    
    // 方法2
    t,ok := <-c 	// ok==false表明,chan被关闭
    
    // 方法3
    select {
        case <-c:
        //业务逻辑
        default:
        //可以避免阻塞
    }
    
    // 方法4
    for element := range c {
        fmt.Println("chan element:", element)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    关闭channel

    close(c)
    
    • 1

    具体案例

    有缓冲区channel

    func SendBlock1() {
        // 创建缓冲区容量是3的通道
    	c := make(chan int, 3)
    	defer close(c)
        // 创建4个协程往通道里写,会有一个协程阻塞等待
    	for i := 0; i < 4; i++ {
    		
    		go func(i int) {
    			c <- i
    			fmt.Printf("i=%d成功插入chan\n", i)
    		}(i)	// 如果i不使用传参方式,而是使用闭包函数,那么就会发生数据逃逸,i会被存到堆中,栈帧上的i变成指针指向堆,导致协程里的i不一定打印0,1,2,3
    	}
    	time.Sleep(3 * time.Second)
    
    	//打印,2协程阻塞等待
    	//i=3成功插入chan
    	//i=0成功插入chan
    	//i=1成功插入chan
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    无缓冲区channel

    func SendBlock() {
    	c := make(chan int)
    	defer close(c)
    	for i := 0; i < 4; i++ {
    		// 如果i不使用传参方式,而是使用闭包函数,那么就会发生数据逃逸,i会被存到堆中,栈帧上的i变成指针指向堆,导致协程里的i不一定打印0,1,2,3
    		go func(i int) {
    			c <- i
    			fmt.Printf("i=%d成功插入chan\n", i)
    		}(i)
    	}
    	time.Sleep(3 * time.Second)
    
    	//没有任何打印,因为hchan.recvq没有协程可以唤醒
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    使用注意

    • 对一个关闭的channel发送值 panic
    • 对一个关闭的channel接收值,会一直读取成功,直到管道内数据为空
    • 对一个关闭的并且没有值的管道执行接收操作,会得到对应类型的空值
    • 关闭一个已关闭的通道会导致panic
    • 关闭一个chan,会向所有正在监听这个chan的协程都发送一个空元素(元素类型取决于你的chan类型)

    死锁:

    func f1(channel chan int) {
        time.Sleep(6 * time.Second)
    	channel <- 20
    	//close(channel)
    }
    
    func main() {
    	channel := make(chan int)
    
        //例1 不会死锁,因为读写都只进行了一次之后就结算了
    	go func() {
            time.Sleep(6 * time.Second)
    		channel <- 20
    	}()
    	fmt.Println(<-channel)	// 主协程会阻塞等待管道进入数据
        
        //例2 不会死锁,因为读写都只进行了一次之后就结算了
    	go f1(channel)
        fmt.Println(<-channel)
    
        //例3 不会死锁,因为读写都只进行了一次之后就结算了
    	go func(channel chan int) {
    		channel <- 20
    	}(channel)
    	fmt.Println(<-channel)
        
        //例4 会死锁,主协程会一直等待子进程写入,无法退出,此时需要在子协程加入close(channel),表明自己不会在对协程做操作了
        go func(channel chan int) {
    		channel <- 20
            // close(channel) 加上则不会死锁
    	}(channel)
        for element := range channel {
    		fmt.Println(element)
    	}
        
        
        //例子5,结果是等待3秒后,其中一个消费协程会被死锁,因为他一直在等待channel的数据进入
        channel := make(chan int, 3)
    
    	wg := sync.WaitGroup{}
    	wg.Add(3)
    	go func() {
    		defer wg.Done()
    		//fmt.Println("子协程1")
    		fmt.Println("子协程1抢到的" + strconv.Itoa(<-channel))
    	}()
    	go func() {
    		defer wg.Done()
    		//fmt.Println("子协程2")
    		fmt.Println("子协程2抢到的" + strconv.Itoa(<-channel))
    
    	}()
    	go func() {
    		defer wg.Done()
    		channel <- 20
    		for i := 0; i < 3; i++ {
    			time.Sleep(time.Second)
    		}
    	}()
    
    	//channel <- 21
    	wg.Wait()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
  • 相关阅读:
    酷开科技逐步为用户构建健全的智慧家庭生活场景
    redis内存淘汰策略
    基础算法之双指针
    护肤品微商怎么找人脉资源
    app运营:提高用户活跃度的技巧
    服务器搭建:从零开始创建自己的Spring Boot应用【含登录、注册功能】
    C++工资管理系统
    Java 反射机制到底是什么?
    去中心化应用程序(DApp),引领数字世界的未来
    JMeter笔记5 |Badboy使用和录制
  • 原文地址:https://blog.csdn.net/qiu18610714529/article/details/132903407