• go的singleFlight学习


    Package singleflight provides a duplicate function call suppression mechanism
    “golang.org/x/sync/singleflight”

    原来底层是 waitGroup,我还以为等待的协程主动让出 cpu 了,没想到 waitGroup.Wait() 阻塞了
    doCall 不但返回值是 func 的 val 和 error,而且 doCall 内部也给 chan 写入了一遍
    这样外部的同步 Do 和异步 DoChan 都能复用了

    当 Do->doCall 执行 fn 发生 panic 时:
    对首发请求,直接在 defer 中把 fn 中捕获的 panic 进行回放
    对非首发请求,c.wg.Wait() 结束之后,对 c.err 进行断言,判断是否是一个 panic 错误,如是则回放
    这样就保证了首发请求和非首发请求都发生了 panic

    一个协程对waitGroup进行Add(1)操作后,多个协程都可以监听它的读

    package singleflight
    
    import (
    	"bytes"
    	"errors"
    	"fmt"
    	"runtime"
    	"runtime/debug"
    	"sync"
    )
    
    // errGoexit indicates the runtime.Goexit was called in
    // the user given function.
    // 用户给定的函数中,调用了 runtime.Goexit
    var errGoexit = errors.New("runtime.Goexit was called")
    
    // A panicError is an arbitrary value recovered from a panic
    // with the stack trace during the execution of given function.
    // 执行给定函数期间,panicError 是一个从 panic 中收到的任意值
    // 带有栈追踪
    type panicError struct {
    	// value 中存储 error
    	value interface{}
    	stack []byte
    }
    
    // Error implements error interface.
    func (p *panicError) Error() string {
    	return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
    }
    
    func (p *panicError) Unwrap() error {
    	err, ok := p.value.(error)
    	if !ok {
    		return nil
    	}
    
    	return err
    }
    
    func newPanicError(v interface{}) error {
    	stack := debug.Stack()
    
    	// The first line of the stack trace is of the form "goroutine N [status]:"
    	// but by the time the panic reaches Do the goroutine may no longer exist
    	// and its status will have changed. Trim out the misleading line.
    	// 去掉误导性的信息
    	// 栈帧第一行,是"goroutine N [status]:"的信息
    	if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
    		stack = stack[line+1:]
    	}
    	return &panicError{value: v, stack: stack}
    }
    
    // call is an in-flight or completed singleflight.Do call
    type call struct {
    	wg sync.WaitGroup
    
    	// These fields are written once before the WaitGroup is done
    	// and are only read after the WaitGroup is done.
    	// WaitGroup Done 之前,这两个字段只会被写入一次
    	// WaitGroup Done 之后,才能读取
    	val interface{}
    	err error
    
    	// These fields are read and written with the singleflight
    	// mutex held before the WaitGroup is done, and are read but
    	// not written after the WaitGroup is done.
    	// WaitGroup Done 之前,singleflight mutex 持有它的期间,这些字段被读取和写入
    	// WaitGroup Done 之后,仅用于读取,不再被写入
    	dups  int
    	chans []chan<- Result
    }
    
    // Group represents a class of work and forms a namespace in
    // which units of work can be executed with duplicate suppression.
    // Group 代表一个 work 类,形成一个 namespace
    // 在该命名空间中(in which),可以通过重复抑制(duplicate suppression)来执行工作单元
    type Group struct {
    	mu sync.Mutex       // protects m
    	m  map[string]*call // lazily initialized
    }
    
    // Result holds the results of Do, so they can be passed
    // on a channel.
    type Result struct {
    	Val    interface{}
    	Err    error
    	Shared bool
    }
    
    // Do executes and returns the results of the given function, making
    // sure that only one execution is in-flight for a given key at a
    // time. If a duplicate comes in, the duplicate caller waits for the
    // original to complete and receives the same results.
    // The return value shared indicates whether v was given to multiple callers.
    // 
    // duplicate caller 会等待在 singleFlight 上,等待最开始的任务执行结束
    // 返回的值 shared ,指示是否要将结果共享给其他 caller
    func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    	g.mu.Lock()
    	if g.m == nil {
    		g.m = make(map[string]*call)
    	}
    	if c, ok := g.m[key]; ok {
    		c.dups++
    		g.mu.Unlock()
    
    		// 等待 waitGroup Done
    		c.wg.Wait()
    
    		// 此时一定是 waitGroup Done 了
    	
    		// 发生了 panic ,不能吞掉panic错误
    		// 发生了 error 
    		if e, ok := c.err.(*panicError); ok {
    			panic(e)
    		} else if c.err == errGoexit {
    			// 优雅地退出 goroutine,防止对上游协程产生干扰
    			runtime.Goexit()
    		}
    		
    		// 返回最终结果
    		return c.val, c.err, true
    	}
    
    	// 第一次进来的时候,执行这里
    	c := new(call)
    	// waitGroup 计数从 0 -> 1
    	c.wg.Add(1)
    	g.m[key] = c
    	g.mu.Unlock()
    
    	g.doCall(c, key, fn)
    	return c.val, c.err, c.dups > 0
    }
    
    // doCall handles the single call for a key.
    func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    	normalReturn := false
    	recovered := false
    
    	// use double-defer to distinguish panic from runtime.Goexit,
    	// more details see https://golang.org/cl/134395
    	// double-defer 以区分panic 和runtime.Goexit
    	defer func() {
    		// the given function invoked runtime.Goexit
    		// 把当前的堆栈给记录下来
    		// normalReturn=true,正常结束
    		// normalReturn=false && recovered == true,panic,需要外部还原panic的堆栈
    		// normalReturn=false && recovered == false,go协程主动退出,需要制造一个err
    		if !normalReturn && !recovered {
    			c.err = errGoexit
    		}
    
    		g.mu.Lock()
    		defer g.mu.Unlock()
    		c.wg.Done()
    		if g.m[key] == c {
    			delete(g.m, key)
    		}
    
    		if e, ok := c.err.(*panicError); ok {
    			// In order to prevent the waiting channels from being blocked forever,
    			// needs to ensure that this panic cannot be recovered.
    			if len(c.chans) > 0 {
    				// goroutine的崩溃不会影响主goroutine或其他goroutine。
    				go panic(e)
    				// 能让panic爆出来
    				select {} // Keep this goroutine around so that it will appear in the crash dump.
    			} else {
    				panic(e)
    			}
    		} else if c.err == errGoexit {
    			// Already in the process of goexit, no need to call again
    		} else {
    			// Normal return
    			for _, ch := range c.chans {
    				ch <- Result{c.val, c.err, c.dups > 0}
    			}
    		}
    	}()
    
    	func() {
    		defer func() {
    			if !normalReturn {
    				// Ideally, we would wait to take a stack trace until we've determined
    				// whether this is a panic or a runtime.Goexit.
    				// 理想情况下,会等到确定是否是 panic/runtime.Goexit 后才进行堆栈跟踪
    				//
    				// Unfortunately, the only way we can distinguish the two is to see
    				// whether the recover stopped the goroutine from terminating, and by
    				// the time we know that, the part of the stack trace relevant to the
    				// panic has been discarded.
    				// 不幸的是,我们区分两者的唯一方法是查看 recover 是否阻止了 goroutine 终止
    				// 而当我们知道这一点时,与 panic 相关的堆栈跟踪部分已被丢弃。
    				// 把 recover 拦住之后,返回一个 error ,然后在外部再进行放大,杀人于无形,让外部不知道singleFlight
    				if r := recover(); r != nil {
    					c.err = newPanicError(r)
    				}
    			}
    		}()
    
    		c.val, c.err = fn()
    		normalReturn = true
    	}()
    
    	if !normalReturn {
    		recovered = true
    	}
    }
    
    // Forget tells the singleflight to forget about a key.  Future calls
    // to Do for this key will call the function rather than waiting for
    // an earlier call to complete.
    // 如果不想等之前的 singleflight 返回,则在 map[string]*call 中删除之前的 key 
    func (g *Group) Forget(key string) {
    	g.mu.Lock()
    	delete(g.m, key)
    	g.mu.Unlock()
    }
    

    时序分析
    首发请求,先在Do中制造call,然后 c.wg.Add(1),然后将其放到map中

    c.wg.Add(1)
    g.m[key] = c
    

    首发结束时,在doCall的defer中,先 c.wg.Done(),然后将任务从map中移除:delete(g.m, key)

    c.wg.Done()
    if g.m[key] == c {
    	delete(g.m, key)
    }
    

    首发请求和首发结束都在锁的操作下执行。
    所以抢到锁的时候,要么是首发请求执行请求的开始,要么是首发请求执行请求的结束

    附录

    一石激起千层浪

    sync.WaitGroup 反向运用

    func TestDemo333(t *testing.T) {
    	var wg sync.WaitGroup
    	wg.Add(1)
    
    	for i := 1; i <= 3; i++ {
    		go func(taskID int) {
    			fmt.Println(i, "before wait")
    			wg.Wait()
    			fmt.Println(i, "wait finish")
    		}(i)
    	}
    
    	time.Sleep(4 * time.Second)
    	fmt.Println("main before send done")
    	wg.Done() // 在协程结束时,调用Done方法
    	fmt.Println("main after send done")
    	select {}
    }
    

    debug.Stack()

    属于 runtime/debug 包
    用于获取当前程序的堆栈跟踪(stack trace),通常用于调试和错误处理

    当调用 stack := debug.Stack() 时,实际上是在获取当前程序的堆栈信息,并将其存储在一个字符串类型的变量 stack 中。
    这个字符串包含了程序在调用 debug.Stack() 时的调用栈信息,包括函数名、文件名和行号等。

    package main
    
    import (
        "fmt"
        "runtime/debug"
    )
    
    func main() {
        stack := debug.Stack()
        fmt.Println(string(stack))
    }
    
    func functionA() {
        functionB()
    }
    
    func functionB() {
        stack := debug.Stack()
        fmt.Println(string(stack))
    }
    

    示例中定义两个函数 functionA 和 functionB。
    在 functionB 中,调用了 debug.Stack() 并打印了堆栈信息。
    当运行这个程序时,你会看到类似以下的输出:

    goroutine 1 [running]:
    main.functionB()
        /path/to/your/project/main.go:14 +0x8e
    main.functionA()
        /path/to/your/project/main.go:7 +0x56
    main.main()
        /path/to/your/project/main.go:22 +0x4a
    runtime.main()
        /usr/local/go/src/runtime/proc.go:225 +0x235
    runtime.goexit()
        /usr/local/go/src/runtime/asm_amd64.s:1571 +0x1
    

    这个输出显示了程序在调用 debug.Stack() 时的堆栈跟踪信息。
    这对于调试程序和查找错误非常有用。

    runtime.Goexit()

    属于 runtime 包,用于退出当前的 goroutine。
    当调用 runtime.Goexit() 时,当前正在执行的 goroutine 会立即终止,但不会对其他 goroutine 产生影响。

    runtime.Goexit() 的使用场景通常包括:
    (1) 优雅地退出 goroutine:
    在某些情况下,可能需要在满足特定条件时退出 goroutine,而不是等待它自然完成。
    使用 runtime.Goexit() 可以实现这一点。
    (2) 避免 panic 引起的异常退出:
    如果 goroutine 中发生了 panic,它会向上传播并可能影响到其他 goroutine。
    在这种情况下,使用 runtime.Goexit() 可以优雅地退出当前 goroutine
    避免 panic 对其他 goroutine 的影响
    (3) 控制 goroutine 的生命周期:
    在某些复杂的并发场景中,可能需要手动控制 goroutine 的生命周期。
    通过在适当的时候调用 runtime.Goexit(),可以实现这一点。

    package main
    
    import (
        "fmt"
        "runtime"
        "time"
    )
    
    var someCondition = true
    
    func main() {
        go func() {
            for {
                fmt.Println("Running...")
                time.Sleep(1 * time.Second)
                if someCondition {
                    fmt.Println("Exiting...")
                    runtime.Goexit()
                }
            }
        }()
    
        time.Sleep(5 * time.Second)
        fmt.Println("Main function finished.")
    }
    

    在这个示例中,启动了一个 goroutine,它会每隔一秒钟打印 “Running…”。
    当 someCondition 为 true 时,goroutine 会打印 “Exiting…” 并调用 runtime.Goexit() 退出。
    主函数在等待 5 秒钟后结束。

    过度使用 runtime.Goexit() 可能会导致代码难以理解和维护。
    在大多数情况下,使用 channel 和其他同步机制来控制 goroutine 的生命周期是更好的选择。

    DoChan的学习

    // DoChan is like Do but returns a channel that will receive the
    // results when they are ready.
    //
    // The returned channel will not be closed.
    func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
    	ch := make(chan Result, 1)
    	g.mu.Lock()
    	if g.m == nil {
    		g.m = make(map[string]*call)
    	}
    	if c, ok := g.m[key]; ok {
    		c.dups++
    		c.chans = append(c.chans, ch)
    		g.mu.Unlock()
    		return ch
    	}
    	c := &call{chans: []chan<- Result{ch}}
    	c.wg.Add(1)
    	g.m[key] = c
    	g.mu.Unlock()
    
    	go g.doCall(c, key, fn)
    
    	return ch
    }
    
  • 相关阅读:
    【手把手带你学JavaSE】全方面带你了解异常
    【英语基础篇】新概念一册lesson 3 - sorry,sir
    华为云交换数据空间 EDS:“可信、可控、可证”能力实现你的数据你做主
    数据库技术及应用期末作业
    椭圆的矩阵表示法
    Java版企业电子招标采购系统源码—企业战略布局下的采购寻源
    数据治理:如何利用元数据管理数据资产
    占位,稍后补上
    MySQL 事务隔离级别与锁机制详解
    canal集群部署及使用
  • 原文地址:https://blog.csdn.net/wangkai6666/article/details/140053694