• goroutine摘要


    使用goroutine的原则

    • Keep yourself busy or do the work yourself
      如果你的 goroutine 在从另一个 goroutine 获得结果之前无法取得进展,那么通常情况下,你自己去做这项工作比委托它( go func() )更简单。这通常消除了将结果从 goroutine 返回到其启动器所需的大量状态跟踪和 chan 操作。
      就是说,能前台做的操作,不要放到后台的goroutine中,能简化很多操作。
    • Leave concurrency to the caller
      尽量让调用者来决定要不要并发, 比如如果一个函数内部启动了goroutine,则容易导致goroutine泄露(比如写入chan却外部无人接收、chan满写不进去等)
      例如下面的函数, 使用chan很容易写出这种函数。在函数内容另开了goroutine来填充chan,外部来消耗chan中数据。但有两个问题:1. 当目录遍历完或者中间出错时,需要关闭chan,此时关闭操作就具备了两个含义 2. 函数内的goroutine依靠外部的读取chan,如果外部只想读取到某个目录就结束,则内部goroutine写阻塞,因此无论如何外部都必须讲chan中数据消耗完。
      一个更好的方法,是函数本身接收一个回调函数,是告知调用方显示停止其内部goroutine的方法。
    func ListDirectory(dir string) ([]string, error)
    
    func ListDirectory(dir string) chan string
    
    func ListDirectory(dir string, fn func(string))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • Never start a goroutine without knowning when it will stop
      永远不要开启一个不知道何时结束的goroutine,比如下面这个例子,也是在leak中内部启动了goroutine
    func leak() {
    	ch := make(chan int)
    	go func() {
    		val := <- ch
    		fmt.Println("We received a value:", val)
    	}()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    因此启动goroutine必须想清楚两个问题:

    1. When will it terminate(它何时结束?)
    2. What could prevent it from terminating(如何使它结束?)

    下面的两个例子,都是体现出这两个问题的方案。

    两个http服务同时运行

    有两个http后台服务,要求一个结束时,另一个也要结束,采用如下方法:

    package main
    
    import (
    	"context"
    	"fmt"
    	"net/http"
    )
    
    func Serve(addr string, handler http.Handler, stop <-chan struct{}) error {
    	server := http.Server{
    		Addr: addr,
    		Handler: handler,
    	}
    	go func() {
    		<- stop
    		server.Shutdown(context.Background())
    	}()
    	return server.ListenAndServe()
    }
    
    type MyStruct struct {
    
    }
    
    func (m *MyStruct) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
    	path := request.URL.Path
    	writer.Write([]byte(fmt.Sprintf("Welcome %s", path)))
    }
    
    func main()  {
    	stop := make(chan struct{}) // 是否要分配容量?
    	done := make(chan error, 2)   // 是否要分配容量?
    	handler := new(MyStruct)
    
    	// 开启两个服务
    	go func() {
    		done <- Serve(":8012", handler, stop)
    	}()
    
    	go func() {
    		done <- Serve(":9000", handler, stop)
    	}()
    
    	var stopped bool
    	for i := 0; i < cap(done); i++ {
    		if err := <- done; err != nil {
    			fmt.Printf("error: %v\n", err)
    		}
    		if !stopped {
    			stopped = true
    			close(stop) // close只做一次
    		}
    	}
    
    	fmt.Println("main goroutine end")
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    埋点服务

    一个埋点服务,假设每个http请求过来时,都需要做埋点的Event上报处理。

    package main
    
    import (
    	"context"
    	"fmt"
    	"time"
    )
    
    type Tracker struct {
    	ch   chan string
    	stop chan struct{}
    }
    
    func NewTracker() *Tracker {
    	return &Tracker{
    		ch: make(chan string, 10), // 容量10, 作为一个buffer使用
    	}
    }
    
    // Event 把原来真正的任务操作,简化为放入buf中,而任务操作由后台固定数量线程执行
    func (t *Tracker) Event(ctx context.Context, data string) error {
    	select {
    	case t.ch <- data:
    		return nil
    	case <-ctx.Done():
    		return ctx.Err()
    	}
    }
    
    // Run 后台线程 从buf中取数据进行执行,使用for的写法,则只允许启动一个goroutine执行Run函数
    func (t *Tracker) Run() {
    	for data := range t.ch {
    		// 这里可以写成多个goroutine
    		time.Sleep(10 * time.Second)
    		fmt.Println(data)
    	}
    	t.stop <- struct{}{}
    }
    
    func (t *Tracker) Shutdown(ctx context.Context) error {
    	close(t.ch)  // 关闭ch,则不能再往里写
    	select {
    	case <-t.stop:
    		return nil
    	case <-ctx.Done():
    		return ctx.Err()
    	}
    }
    
    // 一个埋点服务
    func main() {
    	tr := NewTracker()
    	go tr.Run()    		// 后台线程不断消耗buf数据
    	_ = tr.Event(context.Background(), "test1") 	// 往chan中写
    	_ = tr.Event(context.Background(), "test2")
    	_ = tr.Event(context.Background(), "test3")
    	time.Sleep(3 * time.Second)
    
    	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
    	defer cancel()
    	tr.Shutdown(ctx)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
  • 相关阅读:
    Pluma 插件管理框架
    神经网络一词在英文中是,网络词神经质什么意思
    Spring-Boot-Starter 学习笔记(1)
    1412. 查找成绩处于中游的学生
    01-图数据库 Nebula Graph 简介
    JS实现简易观察者模式
    eBPF学习笔记(一)—— eBPF介绍&内核编译
    关于跑腿小哥DMA的介绍
    使用C++的CCF-CSP满分解决方案 202012-2 期末预测之最佳阈值
    四、RocketMq本地集群搭建
  • 原文地址:https://blog.csdn.net/Zerore/article/details/125596092