• Go语言协程


    一: 协程的简单使用

    • 1: 在一个函数前面使用go关键字标识,则该函数就是协程。
    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func testGoRoutine(routine_num int) {
    	for i := 1; i < 5; i++ {
    		fmt.Printf("routine_num is %d ; i is %d\n", routine_num, i)
    		time.Sleep(10 * time.Millisecond)
    	}
    }
    
    func main() {
    	go testGoRoutine(1)
    	go testGoRoutine(2)
    	time.Sleep(100 * time.Millisecond)
    }
    // routine_num is 1 ; i is 1
    //routine_num is 2 ; i is 1
    //routine_num is 2 ; i is 2
    //routine_num is 1 ; i is 2
    //routine_num is 1 ; i is 3
    //routine_num is 2 ; i is 3
    //routine_num is 2 ; i is 4
    //routine_num is 1 ; i is 4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    二:信道

    • 作用: 一个可以让一个 goroutine 与另一个 goroutine 传输信息的通道。
    • 信道定义: pipline := make(chan 信道类型)
    • 信道关闭: close(pipline)
    • 发送数据到信道: pipline<- 数据
    • 信道中取数据: mydata := <-pipline
    • 关闭信道后,接收方仍然可以从信道中取到数据,只是接收到的会永远是 0。
    • 当从信道中读取数据时,可以有多个返回值,其中第二个可以表示 信道是否被关闭,如果已经被关闭,ok 为 false,若还没被关闭,ok 为true。
      • x, ok := <-pipline
    • 缓冲信道和无缓冲信道
      • 无缓冲信道
        • 特点:接收端必须先于发送端准备好
      • 缓冲信道

    2.1:信道的简单使用

    package main
    
    import "fmt"
    
    // 发送信息到信道中
    func send_data(pipline chan int, data int) {
    	pipline <- data
    }
    
    // 在信道中获取信息
    func get_data(pipline chan int) int {
    	return <-pipline
    }
    
    func main() {
    	// 定义一个信道
    	pipline := make(chan int)
    	go send_data(pipline, 200)
    	resData := get_data(pipline)
    	fmt.Println(resData)
    }
    // 200
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    2.2: 单向信道

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    // Sender 定义只写信道类型
    type Sender = chan<- int
    
    // Receiver 定义只读信道类型
    type Receiver = <-chan int
    
    func main() {
    	var pipline = make(chan int)
    
    	go func() {
    		var sender Sender = pipline
    		fmt.Println("准备发送数据: 100")
    		sender <- 100
    	}()
    
    	go func() {
    		var receiver Receiver = pipline
    		num := <-receiver
    		fmt.Printf("接收到的数据是: %d", num)
    	}()
    	// 主函数sleep,使得上面两个goroutine有机会执行
    	time.Sleep(time.Second)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    2.3: 遍历信道

    • 遍历信道,要确保信道是处于关闭状态,否则循环会阻塞。
    package main
    
    import "fmt"
    
    func addDataToChan(myChan chan int) {
    	for i := 1; i <= 5; i++ {
    		myChan <- i
    	}
    	// 关闭信道
    	close(myChan)
    }
    
    func main() {
    	// 定义一个信道
    	myChan := make(chan int, 5)
    
    	// 信道中加入值, 然后关闭信道
    	addDataToChan(myChan)
    
    	// 遍历信道
    	for myChanData := range myChan {
    		fmt.Printf("myChan is %d\n", myChanData)
    	}
    
    }
    //myChan is 1
    //myChan is 2
    //myChan is 3
    //myChan is 4
    //myChan is 5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    2.4: 信道做锁

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func increment(ch chan bool, x *int) {
    	// 当前协程在信道存上值,其他的协程就一直阻塞中。
    	ch <- true
    	*x = *x + 1
    	<-ch
    }
    
    func main() {
    	// 注意要设置容量为 1 的缓冲信道
    	pipline := make(chan bool, 1)
    
    	var x int
    	for i := 0; i < 1000; i++ {
    		go increment(pipline, &x)
    	}
    	
    	time.Sleep(time.Second)
    	fmt.Println("x 的值:", x)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    三: select-case的用法

    • 使用范围: 仅支持通道使用
    • 作用: 在运行 select 时,会遍历所有(如果有机会的话)的 case 表达式,只要有一个信道有接收到数据,那么 select 就结束。
    • 特性: select中的case是随机执行的,而不是顺序执行的。
    • 注意: 尽量都要写default语句, 如果没写, 又没有命中case则会抛出异常。
    • 基本使用案例:
    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    // 定义两个信道,开两个协程, 往信道中追加数据, 测试结果。
    func sendDataToChan(inputChan chan int, inputData int, wg *sync.WaitGroup) {
    	defer wg.Done()
    	inputChan <- inputData
    }
    
    func main() {
    	chan01 := make(chan int, 1)
    	chan02 := make(chan int, 1)
    
    	var wg sync.WaitGroup
    	wg.Add(2)
    
    	go sendDataToChan(chan01, 1, &wg)
    	go sendDataToChan(chan02, 2, &wg)
    
    	wg.Wait()
    
    	select {
    	case outData := <-chan01:
    		fmt.Printf("outData is %d\n", outData)
    	case outData := <-chan02:
    		fmt.Printf("outData is %d\n", outData)
    	default:
    		fmt.Printf("no data\n")
    	}
    
    }
    
    //outData is 2
    //outData is 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 设置信道超时时间
    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func sendDataToChan(inputChan chan int, inputData int) {
    	// 增加延时
    	time.Sleep(20 * time.Millisecond)
    	inputChan <- inputData
    }
    
    func delayTime(timeChan chan bool, count int) {
    	// 设置最大延时时间
    	time.Sleep(time.Duration(count) * time.Millisecond)
    	// 通道追加真值
    	timeChan <- true
    }
    
    func main() {
    	chan01 := make(chan int, 1)
    	chan02 := make(chan int, 1)
    	timeChan := make(chan bool, 1)
    
    	go sendDataToChan(chan01, 1)
    	go sendDataToChan(chan02, 2)
    
    	// 设置
    	go delayTime(timeChan, 10)
    
    	select {
    	case outData := <-chan01:
    		fmt.Printf("outData is %d\n", outData)
    	case outData := <-chan02:
    		fmt.Printf("outData is %d\n", outData)
    	case <-timeChan:
    		fmt.Printf("time out!!!\n")
    	}
    }
    
    //time out!!!
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 信道的关闭, 不影响select-case语句的判断
    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func sendDataToChan(inputChan chan int, inputData int) {
    	inputChan <- inputData
    }
    
    func main() {
    	chan01 := make(chan int, 1)
    	chan02 := make(chan int, 1)
    
    	go sendDataToChan(chan01, 1)
    	go sendDataToChan(chan02, 2)
    
    	// 阻塞主函数, 保证执行完成
    	time.Sleep(time.Duration(30) * time.Millisecond)
    
    	// 关闭通道
    	close(chan01)
    	close(chan02)
    
    	select {
    	case outData := <-chan01:
    		fmt.Printf("outData is %d\n", outData)
    	case outData := <-chan02:
    		fmt.Printf("outData is %d\n", outData)
    	default:
    		fmt.Printf("default\n")
    	}
    }
    
    //outData is 1
    //outData is 2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    四: Go协程池的实现

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    // Pool 定义一个协程池
    type Pool struct {
    	work chan func()   // 任务
    	sem  chan struct{} // 数量
    }
    
    // New 用于创建协程池对象
    func New(size int) *Pool {
    	return &Pool{
    		work: make(chan func()),
    		sem:  make(chan struct{}, size),
    	}
    }
    
    // worker 用于执行协程任务
    func (p *Pool) worker(task func()) {
    	// 如果某个协程发生了异常, 则此时协程池中就少了一个协程。因此数量上需要移除一个协程。
    	defer func() {
    		<-p.sem
    	}()
    	for {
    		task()
    		task = <-p.work
    	}
    }
    
    // NewTask 协程中增加任务
    func (p *Pool) NewTask(task func()) {
    	select {
    	case p.work <- task:
    	case p.sem <- struct{}{}:
    		go p.worker(task)
    	}
    }
    
    func task() {
    	time.Sleep(2 * time.Second)
    	fmt.Println(time.Now())
    }
    
    func main() {
    	pool := New(3)
    	for i := 1; i <= 6; i++ {
    		pool.NewTask(task)
    	}
    	// 保证所有的协程都执行完毕
    	time.Sleep(6 * time.Second)
    }
    
    // 分析:
    // 开启第一个协程
    // 无缓冲通道work不会走, 只会走sem通道, 开启一个协程A, 执行第一个协程。
    
    // 开启第二个协程
    // 无缓冲通道work不会走, 只会走sem通道, 开启一个协程B, 执行第二个协程。
    
    // 开启第三个协程
    // 无缓冲通道work不会走, 只会走sem通道, 开启一个协程C, 执行第三个协程。
    
    // 开启第四个协程
    // sem通道达到最大值了,两个通道都不会执行, 而是一直阻塞。
    // 当有协程执行完成,则会在无缓冲通道work中, 获取到该任务, 然后执行该任务。
    
    // 依次类推。。。。
    
    //2022-09-06 14:00:34.546411 +0800 CST m=+2.000235481
    //2022-09-06 14:00:34.546422 +0800 CST m=+2.000246776
    //2022-09-06 14:00:34.546444 +0800 CST m=+2.000268540
    //2022-09-06 14:00:36.547245 +0800 CST m=+4.001160624
    //2022-09-06 14:00:36.547282 +0800 CST m=+4.001197624
    //2022-09-06 14:00:36.54726 +0800 CST m=+4.001175658
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    五: Context上下文的使用

    • 作用: 控制协程退出
    package main
    
    import (
    	"context"
    	"fmt"
    	"time"
    )
    
    func monitor(ctx context.Context, number int) {
    	for {
    		select {
    		// 上下文关闭
    		case v := <-ctx.Done():
    			fmt.Printf("监控器%v,接收到通道值为:%v,监控结束。\n", number, v)
    			return
    		default:
    			fmt.Printf("监控器%v,正在监控中...\n", number)
    			time.Sleep(2 * time.Second)
    		}
    	}
    }
    
    func main() {
    	// 实例化上下文
    	ctx, cancel := context.WithCancel(context.Background())
    
    	// 开启多个协程
    	for i := 1; i <= 5; i++ {
    		go monitor(ctx, i)
    	}
    
    	time.Sleep(1 * time.Second)
    	// 关闭所有 goroutine
    	cancel()
    	time.Sleep(5 * time.Second)
    	fmt.Println("主程序退出!!")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    六: WaitGroup

    • 作用: 主函数, 等一堆协程结束后再执行。
    • 作用: 用于协程之间的流程控制。
    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    func worker(x int, wg *sync.WaitGroup) {
    	// 计数器 -1
    	defer wg.Done()
    	for i := 0; i < 5; i++ {
    		fmt.Printf("worker %d: %d\n", x, i)
    	}
    }
    
    func main() {
    	var wg sync.WaitGroup
    	// 计数器 +2
    	wg.Add(2)
    	go worker(1, &wg)
    	go worker(2, &wg)
    	//阻塞, 直到计数器归0
    	wg.Wait()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    七:互斥锁与读写锁

    • 互斥锁:
    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    func add(count *int, wg *sync.WaitGroup, lock *sync.Mutex) {
    	lock.Lock()
    	*count = *count + 1
    	lock.Unlock()
    	wg.Done()
    }
    
    func main() {
    	var wg sync.WaitGroup
    	// 定义一个互斥锁
    	lock := &sync.Mutex{}
    	count := 0
    	wg.Add(10000)
    	// 开1000个协程
    	for i := 1; i <= 10000; i++ {
    		go add(&count, &wg, lock)
    	}
    	wg.Wait()
    	fmt.Println("count 的值为:", count)
    }
    
    //count 的值为: 10000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 读写锁
    • 读锁开启的条件: 所有协程的写锁必须关闭。
    • 写锁开启的条件: 所有协程的读锁全部关闭。
    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    func main() {
        lock := &sync.RWMutex{}
        lock.Lock()
    
        for i := 0; i < 4; i++ {
            go func(i int) {
                fmt.Printf("第 %d 个协程准备开始... \n", i)
                lock.RLock()
                fmt.Printf("第 %d 个协程获得读锁, sleep 1s 后,释放锁\n", i)
                time.Sleep(time.Second)
                lock.RUnlock()
            }(i)
        }
    
        time.Sleep(time.Second * 2)
    
        fmt.Println("准备释放写锁,读锁不再阻塞")
        // 写锁一释放,读锁就自由了
        lock.Unlock()
    
        // 由于会等到读锁全部释放,才能获得写锁
        // 因为这里一定会在上面 4 个协程全部完成才能往下走
        lock.Lock()
        fmt.Println("程序退出...")
        lock.Unlock()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
  • 相关阅读:
    在MySQL中使用VARCHAR字段进行日期筛选
    sql编写踩坑总结-join篇
    《网络是怎样连接的》Charpter 1 笔记
    Git窗口打开vim后如何退出编辑(IDEA/Goland等编辑器)
    树叶识别系统python+Django网页界面+TensorFlow+算法模型+数据集+图像识别分类
    mysql双主互从
    深入探讨进程间通信的重要性:理解不同的通信机制(上)
    jvm双亲委派机制详解
    面试提问:为什么不建议在MySQL中使用 utf8?
    技术分享 | orchestrator--运维--配置集群自动切换&测试
  • 原文地址:https://blog.csdn.net/qq_41341757/article/details/126727745