• Golang并发编程——goroutine、channel、sync


    并发与并行#

    并发和并行是有区别的,并发不等于并行。

    并发#

    两个或多个事件在同一时间不同时间间隔发生。对应在Go中,就是指多个 goroutine 在单个CPU上的交替运行。

    并行#

    两个或者多个事件在同一时刻发生。对应在Go中,就是指多个 goroutine 在多个CPU上同时运行。

    goroutine#

    介绍#

    goroutine 是 Go 中一种轻量级线程。也称为用户态线程。由 Go 的 runtime 进行管理。Go 的程序会智能地将 goroutine 中的任务合理地分配给每个 CPU。

    在程序中,我们只要使用 go 关键字,就可以轻易开启一个 goroutine

    建议#

    在使用 goroutine 时,以下两个建议可以有效避免 goroutine 泄露。

    1. 调用者清楚 goroutine 什么时候结束
    2. 调用者可以控制 goroutine 的生命周期

    来看一个泄露的例子

    func leak() {
            ch := make(chan int)
    	go func() {
    		<-ch//leak 函数阻塞在接受 ch 
    		fmt.Println("receive a value")
    	}()
    }
    func main() {
    	leak(ch)//函数返回,
    }
    
    

    这个channel将无法被关闭,leak 函数里开启的 goroutine 也永远无法返回,当然,这个例子中 leak 函数返回了,main 函数结束,leak 函数里开启的 goroutine 也就返回了。

    1.调用者不清楚什么时候结束,也无法控制 goroutine 的生命周期。只能被动等待 channel 接受信号,然后执行函数逻辑,如你所见,造成的后果便是容易产生 goroutine 泄露。

    来看下面一个例子

    type Worker struct {
    	wg sync.WaitGroup
    }
    
    func (w *Worker) Do() {
    	w.wg.Add(1)
    
    	go func() {
    		defer w.wg.Done()
    		//do someting
    		time.Sleep(800 * time.Millisecond)
    		fmt.Println("finish")
    	}()
    
    }
    
    func (w *Worker) Shutdown(ctx context.Context) error {
    	ch := make(chan struct{})
    	go func() {
    		w.wg.Wait()
    		close(ch)
    	}()
    
    	select {
    	case <-ch:
    		return nil
    	case <-ctx.Done():
    		// time out
    		// close(ch)
    		return errors.New("time out")
    	}
    }
    
    func main() {
    	worker := &Worker{
    		wg: sync.WaitGroup{},
    	}
    	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1000*time.Millisecond))
    	defer cancel()
    	worker.Do()
    	if err := worker.Shutdown(ctx); err != nil {
    		fmt.Println(err)
    	}
    }
    
    
    折叠

    有一个 worker 对象,这个对象会做一些耗时操作。我们在 Do() 方法中使用 goroutine 来处理具体逻辑,在开启goroutine 之前调用 wg.Add(1), 然后在 goroutine 的 defer 函数中 调用 wg.Done(),在 Shutdown() 方法中使用 wg.Wait() 来等待 Do() 方法执行结束。在 Shutdown() 方法中,如果 goroutine 执行结束了,就会往 ch channel 中发送消息,底下 select {} 中收到 ch channel 消息后,Shutdown 方法就可以正常返回,函数到此执行结束。如果 Do() 方法执行太长超出了 ctx 的最长时间。Shutdown 会返回 "time out" 异常。返回之前可以进行资源的处理。

    在这个例子中调用者可以通过控制上下文控制来控制 Worker 对象的生命周期。

    sync.Mutex、_s_ync.RWMutex#

    Go 的 sync 包提供了 mutex、RwMutex,分别是互斥锁与读写锁。

    在需要共享内存的地方,如果有多个对象同时对这个地方进行读写操作,就会产生竞态条件。我们需要使用程序语言提供的同步原语对读写操作进行保护。互斥锁就是同一时刻一段代码只能被一个线程/协程运行。Mutex 在大量并发的情况下,会造成锁等待,对性能的影响比较大。在读多写少的场景下可以使用读写锁。读写锁主要遵循以下原则:

    1. 读写锁的读锁可以重入,在已经有读锁的情况下,可以继续加读锁。
    2. 在读锁没有全部解锁时,写操作会阻塞直到所有读锁解锁。
    3. 在写锁没有解锁时,其他协程的读写操作都会被阻塞,直到写锁解锁。

    下面是一个互斥锁简单示例。在需要访问共享资源的地方使用 Lock 和 Unlock 方法。表示这部分操作属于“原子操作”。使用时需要注意锁粒度。我们要尽可能的减小锁粒度。锁粒度小了,锁竞争就少。对程序的性能影响就小。

    var l sync.Mutex
    var a string
    
    func f() {
    	a = "hello, world"
    	l.Unlock()
    }
    
    func main() {
    	l.Lock()
    	go f()
    	l.Lock()
    	print(a)
    }
    
    

    sync/atomic#

    sync/atomic 提供了用于实现同步算法的底层原子内存原语

    copy-on-write 思路在微服务降级或者 local cache 经常使用。我们可以使用 atomic 来实现。atmic 依赖于原子 CPU 指令而不是依赖外部锁,性能不俗。

    type NumberArray struct {
    	array []int
    }
    
    func main() {
    	var atomic atomic.Value
    
    	go func() {
    		var i int
    		for {
    			i++
    			numArray := &NumberArray{
    				array: []int{i, i + 1, i + 2, i + 3},
    			}
    			atomic.Store(numArray)
    			time.Sleep(100 * time.Millisecond)
    		}
    	}()
    
    	time.Sleep(500 * time.Millisecond) //先让数据更新
    
    	var wg sync.WaitGroup
    	for n := 0; n < 100000; n++ {
    		wg.Add(1)
    		time.Sleep(100 * time.Millisecond)
    		go func() {
    			numArray := atomic.Load()
    			fmt.Println(numArray)
    			wg.Done()
    		}()
    	}
    	wg.Wait()
    }
    
    
    折叠

    errgroup#

    errgroup 为处理公共任务的子任务的 goroutine 组提供同步、错误传播和上下文取消。

    https://github.com/go-kratos/kratos/blob/main/app.go
    
    func (a *App) Run() error {
    	instance, err := a.buildInstance()
    	if err != nil {
    		return err
    	}
    	eg, ctx := errgroup.WithContext(NewContext(a.ctx, a))
    	wg := sync.WaitGroup{}
    	for _, srv := range a.opts.servers {
    		srv := srv
    		eg.Go(func() error {
    			<-ctx.Done() // wait for stop signal
    			stopCtx, cancel := context.WithTimeout(NewContext(a.opts.ctx, a), a.opts.stopTimeout)
    			defer cancel()
    			return srv.Stop(stopCtx)
    		})
    		wg.Add(1)
    		eg.Go(func() error {
    			wg.Done()
    			return srv.Start(NewContext(a.opts.ctx, a))
    		})
    	}
    	wg.Wait()
    	if a.opts.registrar != nil {
    		rctx, rcancel := context.WithTimeout(ctx, a.opts.registrarTimeout)
    		defer rcancel()
    		if err := a.opts.registrar.Register(rctx, instance); err != nil {
    			return err
    		}
    		a.lk.Lock()
    		a.instance = instance
    		a.lk.Unlock()
    	}
    	c := make(chan os.Signal, 1)
    	signal.Notify(c, a.opts.sigs...)
    	eg.Go(func() error {
    		for {
    			select {
    			case <-ctx.Done():
    				return ctx.Err()
    			case <-c:
    				if err := a.Stop(); err != nil {
    					a.opts.logger.Errorf("failed to stop app: %v", err)
    					return err
    				}
    			}
    		}
    	})
    	if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
    		return err
    	}
    	return nil
    }
    
    
    折叠

    channels#

    channel 是 Go 语言中一种类型安全的消息队列,充当两个 goroutine 之间的通道,通过它可以进行任意资源的的交换。同时通过 channel 实现 Go 的同步机制。

    无缓冲通道#

    当创建的 channel 没有缓冲时,称为无缓冲通道。无缓冲管道必须读写同时操作才会有效果,如果只进行读或者只进行写那么会被阻塞,等待另外一方的操作。

    缓冲通道#

    创建的 channel 具有缓冲时,称为缓冲通道。缓冲通道是固定容量的先进先出(FIFO)队列。容量在队列创建的时候就已经固定,运行是无法更改。消费者从队列中取出元素并处理它们。如果队列为空并且消费者无事可做,就会发生阻塞,直到生产者放入一个元素。如果队列已满,并且消费者未开始消费,则会发生阻塞,知道消费者消费一个元素。

    不论是无缓冲通道还是缓冲通道,都不能往一个已关闭的 channel 发送消息,否则程序会直接 panic ,因此,最好是由发送端进行关闭 channel。

    func main() {
    	ch := make(chan int)
    	close(ch)
    	fmt.Println(<-ch)//0
    	//close(ch)  //panic: close of closed channel
    	//ch <- 2  //panic: send on closed channel
    
    	chs := make(chan int, 2)
    	chs <- 1
    	chs <- 3
    	close(chs)
    	fmt.Println(<-chs)
    	fmt.Println(<-chs)
    	fmt.Println(<-chs)//0
    	// chs <- 2 //panic: send on closed channel
    }
    
    

    关于channel 还可以查看这篇文章 polarisxu:无缓冲和有缓冲通道

  • 相关阅读:
    微服务之流量控制
    前端开发:CSS阴影效果属性box-shadow详解
    iOS中runloop介绍
    【Vue】el 和 data短小精湛的细节!
    C++之虚函数与虚函数表
    工信部教考中心:什么是《研发效能(DevOps)工程师》认证,拿到证书之后有什么作用!(上篇)丨IDCF
    实战篇:单库单表变更成多库多表
    [java] 23种设计模式之代理模式
    【毕业设计】深度学习人体语义分割在弹幕防遮挡上的实现 - python
    如何用JavaScript完美地区分双击和单击事件
  • 原文地址:https://www.cnblogs.com/arvinhuang/p/16437905.html