• go线程安全哈希表concurrent-map


    github.com/orcaman/concurrent-map

    这个哈希表是基于golang提供的map来实现线程安全的哈希表。map的key只能是string

    下面给一个多线程操作的例子,该map的value为string

    1. package main
    2. import (
    3. "fmt"
    4. "github.com/orcaman/concurrent-map/v2"
    5. "sync"
    6. )
    7. func main() {
    8. var wg sync.WaitGroup
    9. // Create a new map.
    10. m := cmap.New[string]()
    11. wg.Add(1)
    12. // Sets item within map, sets "bar" under key "foo"
    13. go func() {
    14. defer wg.Done()
    15. m.Set("foo", "bar1111")
    16. m.Set("foo1", "bar11112")
    17. m.Set("test1", "val1")
    18. }()
    19. wg.Add(1)
    20. go func() {
    21. defer wg.Done()
    22. fmt.Println("====================")
    23. m.Set("111", "2222")
    24. for item := range m.IterBuffered() {
    25. fmt.Println("==== kv :", item)
    26. }
    27. fmt.Println("====================")
    28. } ()
    29. // Retrieve item from map.
    30. go func () {
    31. bar, ok := m.Get("foo")
    32. fmt.Println("bar : ", bar, ", ok :", ok)
    33. // Removes item under key "foo"
    34. m.Remove("foo")
    35. }()
    36. wg.Add(1)
    37. go func() {
    38. defer wg.Done()
    39. m.Set("3333", "4444")
    40. fmt.Println("-----------------")
    41. for item := range m.IterBuffered() {
    42. fmt.Println("---- kv :", item)
    43. }
    44. fmt.Println("-----------------")
    45. }()
    46. wg.Wait()
    47. }

    运行结果:

    1. ====================
    2. bar : bar1111 , ok : true
    3. -----------------
    4. ==== kv : {3333 4444}
    5. ---- kv : {3333 4444}
    6. ---- kv : {111 2222}
    7. ==== kv : {111 2222}
    8. ---- kv : {foo1 bar11112}
    9. ---- kv : {test1 val1}
    10. ==== kv : {foo1 bar11112}
    11. -----------------
    12. ==== kv : {test1 val1}
    13. ====================
    14. Process finished with the exit code 0
    1. -----------------
    2. bar : bar1111 , ok : true
    3. ====================
    4. ---- kv : {3333 4444}
    5. ---- kv : {111 2222}
    6. ---- kv : {foo1 bar11112}
    7. ---- kv : {test1 val1}
    8. -----------------
    9. ==== kv : {3333 4444}
    10. ==== kv : {111 2222}
    11. ==== kv : {foo1 bar11112}
    12. ==== kv : {test1 val1}
    13. ====================
    14. Process finished with the exit code 0

    运行结果比较随机。

    下面是对该哈希表的源码分析:

    数据结构:

    1. /*
    2. *多少个子哈希表,这个可以修改,如果想减少冲突,可以参考之前的博客中的github.com/tidwall/shardmap哈希表的计算方式,即根据实际的物理CPU核心数来计算
    3. */
    4. var SHARD_COUNT = 32
    5. // A "thread" safe map of type string:Anything.
    6. // To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
    7. type ConcurrentMap[V any] []*ConcurrentMapShared[V]//对外暴露的哈希结构,是ConcurrentMapShared的指针数组,其中V any的用法类似于c++的模板,传入的结构就是value的结构
    8. // A "thread" safe string to anything map.
    9. type ConcurrentMapShared[V any] struct {
    10. items map[string]V//key为string就是在这里决定的,这里使用的是golang自带的哈希表,减少重复造轮子
    11. sync.RWMutex // Read Write mutex, guards access to internal map.//每个哈希表一个读写锁
    12. }

    创建哈希表cmap.New:

    1. // Creates a new concurrent map.
    2. func New[V any]() ConcurrentMap[V] {
    3. m := make(ConcurrentMap[V], SHARD_COUNT)//创建哈希表结构
    4. for i := 0; i < SHARD_COUNT; i++ {//循环创建子哈希表
    5. m[i] = &ConcurrentMapShared[V]{items: make(map[string]V)}
    6. }
    7. return m
    8. }

    添加元素Set:

    1. // Sets the given value under the specified key.
    2. func (m ConcurrentMap[V]) Set(key string, value V) {
    3. // Get map shard.
    4. shard := m.GetShard(key)//根据传入的key,计算此key对应在哪个子哈希表中,
    5. shard.Lock()//使用对应的锁进行加写锁
    6. shard.items[key] = value//赋值(如果已存在,替换)
    7. shard.Unlock()//解锁
    8. }

    哈希函数进行散列(fnv哈希算法):

    1. // GetShard returns shard under given key
    2. func (m ConcurrentMap[V]) GetShard(key string) *ConcurrentMapShared[V] {
    3. return m[uint(fnv32(key))%uint(SHARD_COUNT)]
    4. }
    5. func fnv32(key string) uint32 {
    6. hash := uint32(2166136261)
    7. const prime32 = uint32(16777619)
    8. keyLength := len(key)
    9. for i := 0; i < keyLength; i++ {
    10. hash *= prime32
    11. hash ^= uint32(key[i])
    12. }
    13. return hash
    14. }
    Get函数:
    1. // Get retrieves an element from map under given key.
    2. func (m ConcurrentMap[V]) Get(key string) (V, bool) {//基本逻辑和Set一样,不过因为是读操作,所以加读锁
    3. // Get shard
    4. shard := m.GetShard(key)
    5. shard.RLock()
    6. // Get item from shard.
    7. val, ok := shard.items[key]
    8. shard.RUnlock()
    9. return val, ok
    10. }

    代码中有个遍历哈希表操作

    for item := range m.IterBuffered() {......}

    具体代码实现如下:

    1. // Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
    2. type Tuple[V any] struct {//键值对结构
    3. Key string
    4. Val V
    5. }
    6. ......
    7. // IterBuffered returns a buffered iterator which could be used in a for range loop.
    8. func (m ConcurrentMap[V]) IterBuffered() <-chan Tuple[V] {
    9. chans := snapshot(m)//获取快照数据,其实这里就是将所有子哈希表当前数据的键值对信息,后面增加或者删除了元素不影响,
    10. total := 0
    11. for _, c := range chans {//遍历chan数组个数
    12. total += cap(c)//每个元素都是一个一维数组
    13. }
    14. ch := make(chan Tuple[V], total)//根据计算键值对的个数分配数组
    15. go fanIn(chans, ch)//并发去将chans的内容写入ch中,这里为什么不直接用chans,因为chans是二维数组,这里的操作是将二维数组转换成一维数组
    16. return ch
    17. }

    我们看一下快照函数的实现:

    1. // Returns a array of channels that contains elements in each shard,
    2. // which likely takes a snapshot of `m`.
    3. // It returns once the size of each buffered channel is determined,
    4. // before all the channels are populated using goroutines.
    5. func snapshot[V any](m ConcurrentMap[V]) (chans []chan Tuple[V]) {
    6. //When you access map items before initializing.
    7. if len(m) == 0 {
    8. panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
    9. }
    10. chans = make([]chan Tuple[V], SHARD_COUNT)//根据实际的子哈希表个数分配chan 数组
    11. wg := sync.WaitGroup{}
    12. wg.Add(SHARD_COUNT)//设置等待goroutine个数,因为我们需要对每个子表进行遍历操作,所以还是SHARD_COUNT
    13. // Foreach shard.
    14. for index, shard := range m {//遍历子表
    15. go func(index int, shard *ConcurrentMapShared[V]) {//启动goroutine进行操作,提升性能
    16. // Foreach key, value pair.
    17. shard.RLock()//使用对应的锁加读锁
    18. chans[index] = make(chan Tuple[V], len(shard.items))
    19. wg.Done()
    20. for key, val := range shard.items {//遍历子表,将对应键值对复制到对应的chans的子数组中
    21. chans[index] <- Tuple[V]{key, val}
    22. }
    23. shard.RUnlock()
    24. close(chans[index])
    25. }(index, shard)
    26. }
    27. wg.Wait()//这里会等待所有子表都遍历完,才会返回,所以如果散列的不够均匀,等待时间会长
    28. return chans
    29. }

    我们来看将二维数组聚合成一维数组的过程fanIn:

    1. // fanIn reads elements from channels `chans` into channel `out`
    2. func fanIn[V any](chans []chan Tuple[V], out chan Tuple[V]) {
    3. wg := sync.WaitGroup{}
    4. wg.Add(len(chans))
    5. for _, ch := range chans {//遍历每一个chan,一个子表一个chan
    6. go func(ch chan Tuple[V]) {
    7. for t := range ch {//遍历子表的键值对
    8. out <- t//全部赋值到out的一维数组中
    9. }
    10. wg.Done()
    11. }(ch)
    12. }
    13. wg.Wait()
    14. close(out)
    15. }

    其实之前还有一个老的Iter版本,注释说性能比较差。我们看看它的实现:

    1. // Iter returns an iterator which could be used in a for range loop.
    2. //
    3. // Deprecated: using IterBuffered() will get a better performence
    4. func (m ConcurrentMap[V]) Iter() <-chan Tuple[V] {
    5. chans := snapshot(m)
    6. ch := make(chan Tuple[V])
    7. go fanIn(chans, ch)
    8. return ch
    9. }

    唯一的区别就是没有指定ch的的大小。chan内部实现是一个环形队列,如果队列不够大,会等待处理,性能差。

    例子中还用到了Remove接口,其实和set大同小异,底层实现使用的是系统自带的delete函数。其它的函数后面用到了再来追加,基本逻辑都是一样。

    备注:

    里面有两个函数比较好一个是MSet,这个用于将一个map数据导入到这个并发安全的map中。

    第二个是Upsert,这个函数用于修改或添加键值对,提供了UpsertCb回调函数,也就是说,可以提供自己的逻辑来处理数据,比如在历史数据上面进行修改,比较方便。

    下面是一部分测试代码:

    1. ......//这是上面的测试用例代码,我们只需要在main函数中调用下UpdateTest()就可以了。
    2. func UpdateTest() {
    3. m := cmap.New[int64]()
    4. m.Set("test", 1)
    5. m.Upsert("test", 3, UpsertCb)
    6. v, ok :=m.Get("test")
    7. if ok {
    8. fmt.Println(v)
    9. }
    10. }
    11. func UpsertCb (exist bool, valueInMap int64, newValue int64) int64{
    12. if exist {
    13. return valueInMap + newValue
    14. } else {
    15. return newValue
    16. }
    17. }

    运行结果如下:

    1. ====================
    2. bar : bar1111 , ok : true
    3. -----------------
    4. ==== kv : {foo1 bar11112}
    5. ---- kv : {foo1 bar11112}
    6. ==== kv : {3333 4444}
    7. ---- kv : {3333 4444}
    8. ==== kv : {111 2222}
    9. ==== kv : {test1 val1}
    10. ====================
    11. ---- kv : {111 2222}
    12. ---- kv : {test1 val1}
    13. -----------------
    14. 4
    15. Process finished with the exit code 0

    运行结果为4,我们的测试代码就是将新旧值相互累加后返回,如果存在旧值的话。这里就能看到它比sync.Map好的地方,这样就能进行临界控制来修改旧值。

    同理RemoveCb也是一样的道理。这里就不一一分析了。

    这里还有个地方需要注意就是Keys函数,它在刚进入函数的时候就获取该哈希表的总的个数,然后分配了一个一维数组chan,个数刚好就是此时哈希表的总个数,然后循环遍历哈希表将所有的key,最后转换成一个一维数组。需要考虑一个问题,如果此时往该哈希表中添加一个元素会不会出问题?程序出来这么久,应该不会问题,整个哈希表遍历完,就调用close channel,就算数据少了也会直接返回,不会有问题,如果多了,channel也是能获取到新增的数据。

  • 相关阅读:
    LeetCode 1004.最大连续1的个数
    1024程序员节|基于Springboot实现爱心捐赠管理系统
    Visual Studio 2022 安装
    CAN bus总线静电保护方案
    【JAVA】总结线程Thread的基本用法
    【鸿蒙(HarmonyOS)】List列表、ArkUI资源组数据类型
    9、【Qlib】【主要组件】投资组合策略:投资组合管理
    子网划分总结和技巧
    优秀的网络工程师,需要具备什么?
    [车联网安全自学篇] 五十六. Android安全之APK应用程序是否签名异常的检测方法
  • 原文地址:https://blog.csdn.net/guoguangwu/article/details/126915586