• sync.map 源码学习


    如何设计一个线程安全的map

    这是一个很有意思的问题,我们知道,一般库提供的map都是非线程安全的,如果给你设计的话,你该如何去实现一个线程安全的map呢?留坑!!!

    sync.map

    sync.map 适用于读多写少的场景。对于写多的场景,会导致read map 缓存失效,需要加锁,导致冲突变多;而且由于未命中read map 次数过多,导致 dirty map 提升为 read map, 这是一个O(N)的操作,会进一步降低性能。

    type Map struct {
    	mu Mutex // 加锁
    	read atomic.Value // readOnly结构体
    	dirty map[interface{}]*entry //普通的map结构
    	misses int // 记录miss次数
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这里插入一下对原子性的理解:我们把一个或者多个操作在 CPU 执行的过程中不被中断的特性称为原子性,这里的中断是指计算机组成原理中的中断概念。当一个进程在执行原子操作的时候,如果关闭了中断,这样就不会进行进程间的切换。

    // readOnly is an immutable struct stored atomically in the Map.read field.
    type readOnly struct {
    	m       map[interface{}]*entry
    	amended bool // true if the dirty map contains some key not in m.
    }
    
    type entry struct {
    	p unsafe.Pointer // *interface{}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    其中read跟dirty用到的entry结构体包装的是一个p指针,这个p指针指向的是value。read跟dirty指向的都是同一个value。也就是说只要修改了这个entry,对read跟dirty都是可见的。
    在这里插入图片描述

    Stroe

    func (m *Map) Store(key, value any) {
    	// 首先从read中读取,如果read中有这个key并且尝试更新该值成功就直接return,因为read以及dirty指向的是同一个地址,所以更新了read也就更新了dirty,这里如果能在read中读到就直接更新,减少了加锁操作,提升了性能
    	read, _ := m.read.Load().(readOnly)
    	if e, ok := read.m[key]; ok && e.tryStore(&value) {
    		return
    	}
    	// 否则的话说明read中没有这个key,或者该key存在但是已经是expunged,就开始尝试在dirty中操作
    	m.mu.Lock()
    	// double check 这是无锁操作的经典用法
    	read, _ = m.read.Load().(readOnly)
    	// 如果从尝试在read中读取到加锁这一步的过程中read中存在了该key
    	if e, ok := read.m[key]; ok {
    		if e.unexpungeLocked() {
    			// 如果该key之前是expunged,那么说明dirty中没有该key,首先对key的值由expunge改为nil,然后对dirty重新赋值
    			m.dirty[key] = e
    		}
    		// 更新值
    		e.storeLocked(&value)
    	} else if e, ok := m.dirty[key]; ok {
    		// 如果在read中读不到,但是在dirty中存在,就直接更新
    		e.storeLocked(&value)
    	} else {
    		// 在read中读取不到,也在dirty中读取不到
    		// amended == false 说明read以及dirty不存在差异,那么说明该key是第一次插入dirty中
    		if !read.amended {
    			// dirtyLocked主要是将read中不为expunged的key浅拷贝给dirty,并且将p为nil的改为expunged
    			m.dirtyLocked()
    			// 更新amended为true表示dirty中有read中没有的key
    			m.read.Store(readOnly{m: read.m, amended: true})
    		}
    		m.dirty[key] = newEntry(value)
    	}
    	m.mu.Unlock()
    }
    
    func (m *Map) dirtyLocked() {
    	if m.dirty != nil {
    		return
    	}
    
    	read, _ := m.read.Load().(readOnly)
    	m.dirty = make(map[any]*entry, len(read.m))
    	for k, e := range read.m {
    		if !e.tryExpungeLocked() {
    			m.dirty[k] = e
    		}
    	}
    }
    
    func (e *entry) tryExpungeLocked() (isExpunged bool) {
    	p := atomic.LoadPointer(&e.p)
    	for p == nil {
    		if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
    			return true
    		}
    		p = atomic.LoadPointer(&e.p)
    	}
    	return p == expunged
    }
    
    // tryStore是典型的for循环加cas操作,在操作的过程中有其他goroutine将p改成了expunged则直接return false
    func (e *entry) tryStore(i *any) bool {
    	for {
    		p := atomic.LoadPointer(&e.p)
    		if p == expunged {
    			return false
    		}
    		if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
    			return true
    		}
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    在这里插入图片描述

    Load

    func (m *Map) Load(key any) (value any, ok bool) {
    	// 第一步还是先尝试在read中读取,如果能读到直接返回,因为read跟dirty指向的是同一个地址
    	read, _ := m.read.Load().(readOnly)
    	e, ok := read.m[key]
    	if !ok && read.amended {
    		// 	如果在read中不存在,并且dirty中有read中没有的key
    		// 先加锁,对dirty的操作都要有锁
    		m.mu.Lock()
    		// 还是做个double check
    		read, _ = m.read.Load().(readOnly)
    		e, ok = read.m[key]
    		if !ok && read.amended {
    			// 	如果在read中不存在,并且dirty中有read中没有的key
    			e, ok = m.dirty[key]
    			// missLocked主要是将miss字段++,如果miss已经大雨dirty的长度,则将dirty升级为read
    			m.missLocked()
    		}
    		m.mu.Unlock()
    	}
    	if !ok {
    		return nil, false
    	}
    	return e.load()
    }
    
    func (m *Map) missLocked() {
    	m.misses++
    	if m.misses < len(m.dirty) {
    		return
    	}
    	m.read.Store(readOnly{m: m.dirty})
    	m.dirty = nil
    	m.misses = 0
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    delete

    func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
    	read, _ := m.read.Load().(readOnly)
    	e, ok := read.m[key]
    	if !ok && read.amended {
    		m.mu.Lock()
    		read, _ = m.read.Load().(readOnly)
    		e, ok = read.m[key]
    		if !ok && read.amended {
    			e, ok = m.dirty[key]
    			delete(m.dirty, key)
    			// Regardless of whether the entry was present, record a miss: this key
    			// will take the slow path until the dirty map is promoted to the read
    			// map.
    			m.missLocked()
    		}
    		m.mu.Unlock()
    	}
    	if ok {
    		return e.delete()
    	}
    	return nil, false
    }
    func (e *entry) delete() (value any, ok bool) {
    	for {
    		p := atomic.LoadPointer(&e.p)
    		if p == nil || p == expunged {
    			return nil, false
    		}
    		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
    			return *(*any)(p), true
    		}
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    delete的操作其实跟之前的store以及load差不多,第一步都是先读read,如果read中有,就直接调用e.delete,e.delete主要就是采用原子将p指针设为nil,否则说明dirty中,那就直接将dirty中的key做delete。这里有个问题就是两种操作不一样,第一种如果key存在read以及dirty中的话只是将p设为nil,而如果只在dirty中的话却是将key整个删除。设为nil的作用是为了下次访问的时候能命中read。并且我们在store中的时候,第一次创建dirty的时候会将nil设expunged。这也就意味着如果p的值为expunged则说明dirty一定不等于nil。

    • 当** p == expunged** 说明这个key已经被删除,表示它已经不在 dirty 中了。
    • sync.map的缺陷在于读少写多的时候,会导致一直将read中的数据浅拷贝到dirty中,浅拷贝的时候是要遍历read的,这是O(N)的操作。

    写时复制技术+atomic.value

    通过atomic.value可以实现写时复制来达到对共享数据的访问,其实很简单就是我们用atomic.store去存一个变量,每次读的时候直接用atomic.load去读取这个变量,但是写的时候我们要加锁,然后用atomic.load取出这个变量,然后将所有的数据再拷贝一份,并且加入新数据,在strore一下。

    package main
    
    import (
    	"sync"
    	"sync/atomic"
    )
    
    func main() {
    	type Map map[string]string
    	var m atomic.Value
    	m.Store(make(Map))
    	var mu sync.Mutex
    	read := func(key string) (val string) {
    		m1 := m.Load().(Map)
    		return m1[key]
    	}
    	insert := func(key, val string) {
    		mu.Lock()
    		defer mu.Unlock()
    		m1 := m.Load().(Map)
    		m2 := make(Map)
    		for k, v := range m1 {
    			m2[k] = v
    		}
    		m2[key] = val
    		m.Store(m2) // m1 会被垃圾回收
    	}
    	_, _ = read, insert
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
  • 相关阅读:
    【HTML】前端网页开发工具Vscode中DOCTYPE和lang以及字符集的作用
    [附源码]计算机毕业设计JAVAjsp小微企业库存管理系统
    C#图片处理如何生成缩略图
    一步步实现知乎热榜采集:Scala与Sttp库的应用
    C#面对对象(用Dictionary字典实现银行取钱系统)
    【VMware vSAN】vSAN Data Protection Part 2:配置管理。
    【电路笔记】-电流源
    Folium 笔记:MarkerCluster
    ORB算法与opencv实现
    LINUX之进程管理
  • 原文地址:https://blog.csdn.net/leekerian/article/details/126651824