云原生训练营
模块二:Go 语言进阶
----------------------------
目录
-线程枷锁
-线程调度
-内存管理
-包引用与依赖管理
----------------------------
1.线程加锁
----------------------------
理解线程安全
线程1 CPU1 缓存key=value2
线程2 CPU2 缓存key=value1
内存 key=value1
每个CPU都存在L1 L2缓存,有缓存就有可见性的问题;
一个线程改了变量,另外一个线程还是读的原来的值;
如何解决?就是加锁:
----------------------------
锁
-Go 语言不仅仅提供基于 CSP 的通讯模型,也支持基于共享内存的多线程数据访问
-Sync 包提供了锁的基本原语
-sync.Mutex 互斥锁
-Lock()加锁,Unlock 解锁
-sync.RWMutex 读写分离锁
-不限制并发读,只限制并发写和并发读写
-sync.WaitGroup
- 等待一组 goroutine 返回
-sync.Once
- 保证某段代码只执行一次
-sync.Cond
- 让一组 goroutine 在满足特定条件时被唤醒
例子:module2/syncmap/main.go
conflictMap := map[int]int{}
for i := 0; i < 100; i++ {
go func() {
conflictMap[1] = i
}()
}
会出现并发写的错误,我的vscode确实没有出现。在命令行运行一下。
$ ./main
fatal error: concurrent map writes
fatal error: concurrent map writes
如果解决,增加了一个互斥锁
type SafeMap struct {
safeMap map[int]int
sync.Mutex
}
func (s *SafeMap) Write(k, v int) {
s.Lock()
defer s.Unlock()
s.safeMap[k] = v
}
func (s *SafeMap) Read(k int) (int, bool) {
s.Lock()
defer s.Unlock()
result, ok := s.safeMap[k]
return result, ok
}
重新运行程序就不出错了。
例子2:读写锁 module2/mutex/main.go
func rLock() {
lock := sync.RWMutex{}
for i := 0; i < 3; i++ {
lock.RLock()
defer lock.RUnlock()
fmt.Println("rLock:", i)
}
}
func wLock() {
lock := sync.RWMutex{}
for i := 0; i < 3; i++ {
lock.Lock()
defer lock.Unlock()
fmt.Println("wLock:", i)
}
}
./main
lock: 0
rLock: 0
rLock: 1
wLock: 0
rLock: 2
写锁被阻塞了
----------------------------
Mutex示例
Kubernetes 中的 informer factory
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}
----------------------------
WaitGroup示例
// CreateBatch create a batch of pods. All pods are created before
waiting.
func (c *PodClient) CreateBatch(pods []*v1.Pod) []*v1.Pod {
ps := make([]*v1.Pod, len(pods))
var wg sync.WaitGroup
for i, pod := range pods {
wg.Add(1)
go func(i int, pod *v1.Pod) {
defer wg.Done()
defer GinkgoRecover()
ps[i] = c.CreateSync(pod)
}(i, pod)
}
wg.Wait()
return ps
}
例子:module2/waitgroup/main.go
func waitByWG() {
wg := sync.WaitGroup{}
wg.Add(100)
for i := 0; i < 100; i++ {
go func(i int) {
fmt.Println(i)
wg.Done()
}(i)
}
wg.Wait()
}
----------------------------
Cond示例
Kubernetes 中的队列,标准的生产者消费者模式
cond: sync.NewCond(&sync.Mutex{}),
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if q.dirty.has(item) {
return
}
q.metrics.add(item)
q.dirty.insert(item)
if q.processing.has(item) {
return
}
q.queue = append(q.queue, item)
q.cond.Signal()
}
----------------------------
Cond 示例
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
例子:module2/condition/main.go
type Queue struct {
queue []string
cond *sync.Cond
}
q := Queue{
queue: []string{},
cond: sync.NewCond(&sync.Mutex{}),
}
func (q *Queue) Enqueue(item string) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.queue = append(q.queue, item)
fmt.Printf("putting %s to queue, notify all\n", item)
q.cond.Broadcast()
}
func (q *Queue) Dequeue() string {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 {
fmt.Println("no data, wait")
q.cond.Wait()
}
result := q.queue[0]
q.queue = q.queue[1:]
return result
}
----------------------------
单例模式
多个线程同时执行,最终只被执行一次
var once sync.Once --这种方式能初始化?
s := NewSlice()
// 看源代码理解once的行为
once.Do(func() {
s.Add(16)
})
once.Do(func() {
s.Add(16)
})
once.Do(func() {
s.Add(16)
})
once.go
type Once struct {
done uint32
m Mutex
}
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
----------------------------
2、线程调度
----------------------------
深入理解 Go 语言线程调度
-进程:资源分配的基本单位
-线程:调度的基本单位 pthread_create --没有独立资源的进程
-无论是线程还是进程,在 linux 中都以 task_struct 描述,从内核角度看,与进程无本质区别
-Glibc 中的 pthread 库提供 NPTL(Native POSIX Threading Library)支持
process:mm,fs,files,signal--thread 共享
k8s的设计思想和linux是一样的。
pthread_create(clone_flags)
CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM
----------------------------
Linux进程的内存使用
虚拟内存、物理内存、page
size main
objdump -x main
getconf PAGE_SIZE
----------------------------
CPU对内存的访问
• CPU 上有个 Memory Management Unit(MMU) 单元
• CPU 把虚拟地址给 MMU,MMU 去物理内存中查询页表,得到实际的物理地址
• CPU 维护一份缓存 Translation Lookaside Buffer(TLB),缓存虚拟地址和物理地址的映射关系
----------------------------
进程切换开销
• 直接开销
•切换页表全局目录(PGD)
•切换内核态堆栈
•切换硬件上下文(进程恢复前,必须装入寄存器的数据统称为硬件上下文)
•刷新 TLB
•系统调度器的代码执行
• 间接开销
•CPU 缓存失效导致的进程需要到内存直接访问的 IO 操作变多
----------------------------
线程切换开销
• 线程本质上只是一批共享资源的进程,线程切换本质上依然需要内核进行进程切换
• 一组线程因为共享内存资源,因此一个进程的所有线程共享虚拟地址空间,线程切换相比进程
切换,主要节省了虚拟地址空间的切换--TLB有效,仍然要系统调用,有开销
用户态、内核态
----------------------------
用户线程
无需内核帮助,应用程序在用户空间创建的可执行单元,创建销毁完全在用户态完成。
----------------------------
Goroutine
Go 语言基于 GMP 模型实现用户态线程
• G:表示 goroutine,每个 goroutine 都有自己的栈空间,定时器,
初始化的栈空间在 2k 左右,空间会随着需求增长。
• M:抽象化代表内核线程,记录内核线程栈信息,当 goroutine 调度
到线程时,使用该 goroutine 自己的栈信息。
• P:代表调度器,负责调度 goroutine,维护一个本地 goroutine 队
列,M 从 P 上获得 goroutine 并执行,同时还负责部分内存的管理。
----------------------------
MPG对应关系
KSE:kernel Scheduling Entity
KSE 对应一个M
M对多个P
每个P下面多个G
----------------------------
GMP 模型细节
M对应一个CPU:1对1
初始化P,Pidle, Prunning Psyscall--与M解绑定
每个P维护一个G队列,GFree列表,
两个队列: LRQ--P的队列 长度限制256,GRQ--全局队列
GRunning,Grunnable,Gwaiting阻塞队列,Gsyscall,Gdead
----------------------------
P的状态
Pidle
Prunning
Psyscall
Pgcstop
Pdead
----------------------------
G的状态
Gidle
Grunnable
Grunning
Gsyscall
Gwaiting
Gdead -- 未分配栈
Gcopystac --
Gscan
----------------------------
G的状态转换图
golang转换方法,是个状态机
----------------------------
G 所处的位置 --最早是没有P,只有M和G,后来加入了P
• 进程都有一个全局的 G 队列
• 每个 P 拥有自己的本地执行队列
• 有不在运行队列中的 G
•处于 channel 阻塞态的 G 被放在 sudog
•脱离 P 绑定在 M 上的 G,如系统调用
•为了复用,执行结束进入 P 的 gFree 列表中的 G
----------------------------
Goroutine 创建过程
• 获取或者创建新的 Goroutine 结构体
•从处理器的 gFree 列表中查找空闲的 Goroutine
•如果不存在空闲的 Goroutine,会通过 runtime.malg 创建一个栈大小足够的新结构体
• 将函数传入的参数移到 Goroutine 的栈上
• 更新 Goroutine 调度相关的属性,更新状态为_Grunnable
• 返回的 Goroutine 会存储到全局变量 allgs 中
----------------------------
将 Goroutine 放到运行队列上
•
Goroutine 设置到处理器的 runnext 作为下一个处理器
执行的任务
• 当处理器的本地运行队列已经没有剩余空间时,就会把
本地队列中的一部分 Goroutine 和待加入的 Goroutine
通过 runtime.runqputslow 添加到调度器持有的全局
运行队列上
----------------------------
调度器行为
• 为了保证公平,当全局运行队列中有待执行的 Goroutine 时,通过 schedtick 保证有一定
几率会从全局的运行队列中查找对应的 Goroutine
• 从处理器本地的运行队列中查找待执行的 Goroutine
• 如果前两种方法都没有找到 Goroutine,会通过 runtime.findrunnable 进行阻塞地查找
Goroutine
•从本地运行队列、全局运行队列中查找
•从网络轮询器中查找是否有 Goroutine 等待运行
•通过 runtime.runqsteal 尝试从其他随机的处理器中窃取待运行的 Goroutine
----------------------------