• cloudenative2-1-go进阶


    云原生训练营
    模块二:Go 语言进阶
    ----------------------------
    目录
    -线程枷锁
    -线程调度
    -内存管理
    -包引用与依赖管理
    ----------------------------
    1.线程加锁
    ----------------------------
    理解线程安全
    线程1 CPU1 缓存key=value2
    线程2 CPU2 缓存key=value1
    内存 key=value1
    每个CPU都存在L1 L2缓存,有缓存就有可见性的问题;
    一个线程改了变量,另外一个线程还是读的原来的值;
    如何解决?就是加锁:
    ----------------------------

    -Go 语言不仅仅提供基于 CSP 的通讯模型,也支持基于共享内存的多线程数据访问
    -Sync 包提供了锁的基本原语
    -sync.Mutex 互斥锁
     -Lock()加锁,Unlock 解锁
    -sync.RWMutex 读写分离锁
     -不限制并发读,只限制并发写和并发读写
    -sync.WaitGroup
     - 等待一组 goroutine 返回
    -sync.Once
     - 保证某段代码只执行一次
    -sync.Cond
     - 让一组 goroutine 在满足特定条件时被唤醒
    例子:module2/syncmap/main.go
        conflictMap := map[int]int{}
        for i := 0; i < 100; i++ {
            go func() {
                conflictMap[1] = i
            }()
        }
    会出现并发写的错误,我的vscode确实没有出现。在命令行运行一下。
    $ ./main
    fatal error: concurrent map writes
    fatal error: concurrent map writes
    如果解决,增加了一个互斥锁
    type SafeMap struct {
        safeMap map[int]int
        sync.Mutex
    }
    func (s *SafeMap) Write(k, v int) {
        s.Lock()
        defer s.Unlock()
        s.safeMap[k] = v
    }
    func (s *SafeMap) Read(k int) (int, bool) {
        s.Lock()
        defer s.Unlock()
        result, ok := s.safeMap[k]
        return result, ok
    }
    重新运行程序就不出错了。
    例子2:读写锁 module2/mutex/main.go
    func rLock() {
        lock := sync.RWMutex{}
        for i := 0; i < 3; i++ {
            lock.RLock()
            defer lock.RUnlock()
            fmt.Println("rLock:", i)
        }
    }
    func wLock() {
        lock := sync.RWMutex{}
        for i := 0; i < 3; i++ {
            lock.Lock()
            defer lock.Unlock()
            fmt.Println("wLock:", i)
        }
    }
    ./main
    lock: 0
    rLock: 0
    rLock: 1
    wLock: 0
    rLock: 2
    写锁被阻塞了
    ----------------------------
    Mutex示例
    Kubernetes 中的 informer factory
    // Start initializes all requested informers.
    func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
        f.lock.Lock()
        defer f.lock.Unlock()
        for informerType, informer := range f.informers {
            if !f.startedInformers[informerType] {
                go informer.Run(stopCh)
                f.startedInformers[informerType] = true
            }
        }
    }
    ----------------------------
    WaitGroup示例
    // CreateBatch create a batch of pods. All pods are created before
    waiting.
    func (c *PodClient) CreateBatch(pods []*v1.Pod) []*v1.Pod {
        ps := make([]*v1.Pod, len(pods))
        var wg sync.WaitGroup
        for i, pod := range pods {
            wg.Add(1)
            go func(i int, pod *v1.Pod) {
                defer wg.Done()
                defer GinkgoRecover()
                ps[i] = c.CreateSync(pod)
            }(i, pod)
        }
        wg.Wait()
        return ps
    }
    例子:module2/waitgroup/main.go
    func waitByWG() {
        wg := sync.WaitGroup{}
        wg.Add(100)
        for i := 0; i < 100; i++ {
            go func(i int) {
                fmt.Println(i)
                wg.Done()
            }(i)
        }
        wg.Wait()
    }
    ----------------------------
    Cond示例
    Kubernetes 中的队列,标准的生产者消费者模式
    cond: sync.NewCond(&sync.Mutex{}),

    // Add marks item as needing processing.
    func (q *Type) Add(item interface{}) {
        q.cond.L.Lock()
        defer q.cond.L.Unlock()
        if q.shuttingDown {
            return
        }
        if q.dirty.has(item) {
            return
        }
        q.metrics.add(item)
        q.dirty.insert(item)
        if q.processing.has(item) {
            return
        }
        q.queue = append(q.queue, item)
        q.cond.Signal()
    }
    ----------------------------
    Cond 示例
    // Get blocks until it can return an item to be processed. If shutdown = true,
    // the caller should end their goroutine. You must call Done with item when you
    // have finished processing it.
    func (q *Type) Get() (item interface{}, shutdown bool) {
        q.cond.L.Lock()
        defer q.cond.L.Unlock()
        for len(q.queue) == 0 && !q.shuttingDown {
            q.cond.Wait()
        }
        if len(q.queue) == 0 {
            // We must be shutting down.
            return nil, true
        }
        item, q.queue = q.queue[0], q.queue[1:]
        q.metrics.get(item)
        q.processing.insert(item)
        q.dirty.delete(item)
        return item, false
    }
    例子:module2/condition/main.go
    type Queue struct {
        queue []string
        cond  *sync.Cond
    }
        q := Queue{
            queue: []string{},
            cond:  sync.NewCond(&sync.Mutex{}),
        }
    func (q *Queue) Enqueue(item string) {
        q.cond.L.Lock()
        defer q.cond.L.Unlock()
        q.queue = append(q.queue, item)
        fmt.Printf("putting %s to queue, notify all\n", item)
        q.cond.Broadcast()
    }

    func (q *Queue) Dequeue() string {
        q.cond.L.Lock()
        defer q.cond.L.Unlock()
        for len(q.queue) == 0 {
            fmt.Println("no data, wait")
            q.cond.Wait()
        }
        result := q.queue[0]
        q.queue = q.queue[1:]
        return result
    }
    ----------------------------
    单例模式
    多个线程同时执行,最终只被执行一次
        var once sync.Once   --这种方式能初始化?
        s := NewSlice()
        // 看源代码理解once的行为
        once.Do(func() {
            s.Add(16)
        })
        once.Do(func() {
            s.Add(16)
        })
        once.Do(func() {
            s.Add(16)
        })
    once.go    
    type Once struct {
        done uint32
        m    Mutex
    }
    func (o *Once) Do(f func()) {
        if atomic.LoadUint32(&o.done) == 0 {
            o.doSlow(f)
        }
    }
    func (o *Once) doSlow(f func()) {
        o.m.Lock()
        defer o.m.Unlock()
        if o.done == 0 {
            defer atomic.StoreUint32(&o.done, 1)
            f()
        }
    }
    ----------------------------
    2、线程调度
    ----------------------------
    深入理解 Go 语言线程调度
    -进程:资源分配的基本单位
    -线程:调度的基本单位 pthread_create --没有独立资源的进程
    -无论是线程还是进程,在 linux 中都以 task_struct 描述,从内核角度看,与进程无本质区别
    -Glibc 中的 pthread 库提供 NPTL(Native POSIX Threading Library)支持
    process:mm,fs,files,signal--thread 共享
    k8s的设计思想和linux是一样的。
    pthread_create(clone_flags)
    CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM
    ----------------------------
    Linux进程的内存使用
    虚拟内存、物理内存、page
    size main
    objdump -x main
    getconf PAGE_SIZE
    ----------------------------
    CPU对内存的访问
    • CPU 上有个 Memory Management Unit(MMU) 单元
    • CPU 把虚拟地址给 MMU,MMU 去物理内存中查询页表,得到实际的物理地址
    • CPU 维护一份缓存 Translation Lookaside Buffer(TLB),缓存虚拟地址和物理地址的映射关系
    ----------------------------
    进程切换开销
    • 直接开销
    •切换页表全局目录(PGD)
    •切换内核态堆栈
    •切换硬件上下文(进程恢复前,必须装入寄存器的数据统称为硬件上下文)
    •刷新 TLB
    •系统调度器的代码执行
    • 间接开销
    •CPU 缓存失效导致的进程需要到内存直接访问的 IO 操作变多
    ----------------------------
    线程切换开销
    • 线程本质上只是一批共享资源的进程,线程切换本质上依然需要内核进行进程切换
    • 一组线程因为共享内存资源,因此一个进程的所有线程共享虚拟地址空间,线程切换相比进程
    切换,主要节省了虚拟地址空间的切换--TLB有效,仍然要系统调用,有开销
    用户态、内核态
    ----------------------------
    用户线程
    无需内核帮助,应用程序在用户空间创建的可执行单元,创建销毁完全在用户态完成。
    ----------------------------
    Goroutine
    Go 语言基于 GMP 模型实现用户态线程
    • G:表示 goroutine,每个 goroutine 都有自己的栈空间,定时器,
    初始化的栈空间在 2k 左右,空间会随着需求增长。
    • M:抽象化代表内核线程,记录内核线程栈信息,当 goroutine 调度
    到线程时,使用该 goroutine 自己的栈信息。
    • P:代表调度器,负责调度 goroutine,维护一个本地 goroutine 队
    列,M 从 P 上获得 goroutine 并执行,同时还负责部分内存的管理。
    ----------------------------
    MPG对应关系
    KSE:kernel Scheduling Entity
    KSE 对应一个M
    M对多个P
    每个P下面多个G
    ----------------------------
    GMP 模型细节
    M对应一个CPU:1对1
    初始化P,Pidle, Prunning Psyscall--与M解绑定
    每个P维护一个G队列,GFree列表,
    两个队列: LRQ--P的队列 长度限制256,GRQ--全局队列
    GRunning,Grunnable,Gwaiting阻塞队列,Gsyscall,Gdead
    ----------------------------
    P的状态
    Pidle
    Prunning
    Psyscall
    Pgcstop
    Pdead
    ----------------------------
    G的状态
    Gidle
    Grunnable
    Grunning
    Gsyscall
    Gwaiting
    Gdead -- 未分配栈
    Gcopystac -- 
    Gscan
    ----------------------------
    G的状态转换图
    golang转换方法,是个状态机
    ----------------------------
    G 所处的位置 --最早是没有P,只有M和G,后来加入了P
    • 进程都有一个全局的 G 队列
    • 每个 P 拥有自己的本地执行队列
    • 有不在运行队列中的 G
        •处于 channel 阻塞态的 G 被放在 sudog
        •脱离 P 绑定在 M 上的 G,如系统调用
        •为了复用,执行结束进入 P 的 gFree 列表中的 G
    ----------------------------
    Goroutine 创建过程
    • 获取或者创建新的 Goroutine 结构体
        •从处理器的 gFree 列表中查找空闲的 Goroutine
        •如果不存在空闲的 Goroutine,会通过 runtime.malg 创建一个栈大小足够的新结构体
    • 将函数传入的参数移到 Goroutine 的栈上
    • 更新 Goroutine 调度相关的属性,更新状态为_Grunnable
    • 返回的 Goroutine 会存储到全局变量 allgs 中
    ----------------------------
    将 Goroutine 放到运行队列上

    Goroutine 设置到处理器的 runnext 作为下一个处理器
    执行的任务
    • 当处理器的本地运行队列已经没有剩余空间时,就会把
    本地队列中的一部分 Goroutine 和待加入的 Goroutine
    通过 runtime.runqputslow 添加到调度器持有的全局
    运行队列上
    ----------------------------
    调度器行为
    • 为了保证公平,当全局运行队列中有待执行的 Goroutine 时,通过 schedtick 保证有一定
    几率会从全局的运行队列中查找对应的 Goroutine
    • 从处理器本地的运行队列中查找待执行的 Goroutine
    • 如果前两种方法都没有找到 Goroutine,会通过 runtime.findrunnable 进行阻塞地查找
    Goroutine
        •从本地运行队列、全局运行队列中查找
        •从网络轮询器中查找是否有 Goroutine 等待运行
        •通过 runtime.runqsteal 尝试从其他随机的处理器中窃取待运行的 Goroutine
    ----------------------------
     

  • 相关阅读:
    智慧人社解决方案-最新全套文件
    vue3项目,vite+vue3+ts+pinia(10)-elementplus布局
    Vue双向绑定的原理
    YOLOV5学习笔记(七)——训练自己数据集
    m基于梯度优化的混沌PSO磁悬浮球系统模型优化的matlab仿真
    实战Node.js原理对于阻塞和EventEmitter及其继承的运用心得
    基于SSM+Vue的物流管理系统的设计与实现
    Worthington果胶酶的特性及测定方案
    知识蒸馏方法汇总
    C++之特殊函数成员
  • 原文地址:https://blog.csdn.net/hb_zxl/article/details/126716120