这是一个很有意思的问题,我们知道,一般库提供的map都是非线程安全的,如果给你设计的话,你该如何去实现一个线程安全的map呢?留坑!!!
sync.map 适用于读多写少的场景。对于写多的场景,会导致read map 缓存失效,需要加锁,导致冲突变多;而且由于未命中read map 次数过多,导致 dirty map 提升为 read map, 这是一个O(N)的操作,会进一步降低性能。
type Map struct {
mu Mutex // 加锁
read atomic.Value // readOnly结构体
dirty map[interface{}]*entry //普通的map结构
misses int // 记录miss次数
}
这里插入一下对原子性的理解:我们把一个或者多个操作在 CPU 执行的过程中不被中断的特性称为原子性,这里的中断是指计算机组成原理中的中断概念。当一个进程在执行原子操作的时候,如果关闭了中断,这样就不会进行进程间的切换。
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
m map[interface{}]*entry
amended bool // true if the dirty map contains some key not in m.
}
type entry struct {
p unsafe.Pointer // *interface{}
}
其中read跟dirty用到的entry结构体包装的是一个p指针,这个p指针指向的是value。read跟dirty指向的都是同一个value。也就是说只要修改了这个entry,对read跟dirty都是可见的。
func (m *Map) Store(key, value any) {
// 首先从read中读取,如果read中有这个key并且尝试更新该值成功就直接return,因为read以及dirty指向的是同一个地址,所以更新了read也就更新了dirty,这里如果能在read中读到就直接更新,减少了加锁操作,提升了性能
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
// 否则的话说明read中没有这个key,或者该key存在但是已经是expunged,就开始尝试在dirty中操作
m.mu.Lock()
// double check 这是无锁操作的经典用法
read, _ = m.read.Load().(readOnly)
// 如果从尝试在read中读取到加锁这一步的过程中read中存在了该key
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// 如果该key之前是expunged,那么说明dirty中没有该key,首先对key的值由expunge改为nil,然后对dirty重新赋值
m.dirty[key] = e
}
// 更新值
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 如果在read中读不到,但是在dirty中存在,就直接更新
e.storeLocked(&value)
} else {
// 在read中读取不到,也在dirty中读取不到
// amended == false 说明read以及dirty不存在差异,那么说明该key是第一次插入dirty中
if !read.amended {
// dirtyLocked主要是将read中不为expunged的key浅拷贝给dirty,并且将p为nil的改为expunged
m.dirtyLocked()
// 更新amended为true表示dirty中有read中没有的key
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
// tryStore是典型的for循环加cas操作,在操作的过程中有其他goroutine将p改成了expunged则直接return false
func (e *entry) tryStore(i *any) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
func (m *Map) Load(key any) (value any, ok bool) {
// 第一步还是先尝试在read中读取,如果能读到直接返回,因为read跟dirty指向的是同一个地址
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
// 如果在read中不存在,并且dirty中有read中没有的key
// 先加锁,对dirty的操作都要有锁
m.mu.Lock()
// 还是做个double check
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// 如果在read中不存在,并且dirty中有read中没有的key
e, ok = m.dirty[key]
// missLocked主要是将miss字段++,如果miss已经大雨dirty的长度,则将dirty升级为read
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}
func (e *entry) delete() (value any, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*any)(p), true
}
}
}
delete的操作其实跟之前的store以及load差不多,第一步都是先读read,如果read中有,就直接调用e.delete,e.delete主要就是采用原子将p指针设为nil,否则说明dirty中,那就直接将dirty中的key做delete。这里有个问题就是两种操作不一样,第一种如果key存在read以及dirty中的话只是将p设为nil,而如果只在dirty中的话却是将key整个删除。设为nil的作用是为了下次访问的时候能命中read。并且我们在store中的时候,第一次创建dirty的时候会将nil设expunged。这也就意味着如果p的值为expunged则说明dirty一定不等于nil。
通过atomic.value可以实现写时复制来达到对共享数据的访问,其实很简单就是我们用atomic.store去存一个变量,每次读的时候直接用atomic.load去读取这个变量,但是写的时候我们要加锁,然后用atomic.load取出这个变量,然后将所有的数据再拷贝一份,并且加入新数据,在strore一下。
package main
import (
"sync"
"sync/atomic"
)
func main() {
type Map map[string]string
var m atomic.Value
m.Store(make(Map))
var mu sync.Mutex
read := func(key string) (val string) {
m1 := m.Load().(Map)
return m1[key]
}
insert := func(key, val string) {
mu.Lock()
defer mu.Unlock()
m1 := m.Load().(Map)
m2 := make(Map)
for k, v := range m1 {
m2[k] = v
}
m2[key] = val
m.Store(m2) // m1 会被垃圾回收
}
_, _ = read, insert
}