• threadSafeMap代码解析


    1. // threadSafeMap implements ThreadSafeStore
    2. type threadSafeMap struct {
    3. lock sync.RWMutex
    4. items map[string]interface{}
    5. // indexers maps a name to an IndexFunc
    6. indexers Indexers
    7. // indices maps a name to an Index
    8. indices Indices
    9. }
    10. // Index maps the indexed value to a set of keys in the store that match on that value
    11. type Index map[string]sets.String
    12. // Indexers maps a name to an IndexFunc
    13. type Indexers map[string]IndexFunc
    14. // Indices maps a name to an Index
    15. type Indices map[string]Index
    16. // IndexFunc knows how to compute the set of indexed values for an object.
    17. type IndexFunc func(obj interface{}) ([]string, error)
    • lock用于加锁,使操作原子化
    • items是实际存储的数据,它的key可以认为是普通数据库的主键,k8s一般对应资源的gvk
    • Index可以认为是非主键索引,它保存的是 indexed value 到 相关的keys的映射
    • Indexers是非主键索引的集合,每个非主键索引用indexName标识
    • indexers是Indexers类型,即map[string]IndexFunc
    • IndexFunc是将obj转换为indexed values的函数
    • 一个indexName(非主键索引)对应一个IndexFunc, Index

    那么现在可以尝试解读一下函数:

    1. // Index returns a list of items that match the given object on the index function.
    2. // Index is thread-safe so long as you treat all items as immutable.
    3. func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
    4. c.lock.RLock()
    5. defer c.lock.RUnlock()
    6. indexFunc := c.indexers[indexName]
    7. if indexFunc == nil {
    8. return nil, fmt.Errorf("Index with name %s does not exist", indexName)
    9. }
    10. indexedValues, err := indexFunc(obj)
    11. if err != nil {
    12. return nil, err
    13. }
    14. index := c.indices[indexName]
    15. var storeKeySet sets.String
    16. if len(indexedValues) == 1 {
    17. // In majority of cases, there is exactly one value matching.
    18. // Optimize the most common path - deduping is not needed here.
    19. storeKeySet = index[indexedValues[0]]
    20. } else {
    21. // Need to de-dupe the return list.
    22. // Since multiple keys are allowed, this can happen.
    23. storeKeySet = sets.String{}
    24. for _, indexedValue := range indexedValues {
    25. for key := range index[indexedValue] {
    26. storeKeySet.Insert(key)
    27. }
    28. }
    29. }
    30. list := make([]interface{}, 0, storeKeySet.Len())
    31. for storeKey := range storeKeySet {
    32. list = append(list, c.items[storeKey])
    33. }
    34. return list, nil
    35. }

    该函数用于根据某个indexName(非主键索引)中某个obj对应的所有items。

    1. 利用indexName查询indexFunc
    2. 利用indexFunc计算obj的索引值(indexed values), 是一个[]string类型
    3. 遍历indexed values, 并根据每个indexed value去对应的Index查找到真实的keys
    4. 最后通过这些keys到items中查找到所有结果

    该过成加速了查询,但是使更新变得麻烦, 更新的时候不仅要更新items中的数据,还要更新所有indices中的数据。

    1. // updateIndices modifies the objects location in the managed indexes:
    2. // - for create you must provide only the newObj
    3. // - for update you must provide both the oldObj and the newObj
    4. // - for delete you must provide only the oldObj
    5. // updateIndices must be called from a function that already has a lock on the cache
    6. func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
    7. var oldIndexValues, indexValues []string
    8. var err error
    9. for name, indexFunc := range c.indexers {
    10. if oldObj != nil {
    11. oldIndexValues, err = indexFunc(oldObj)
    12. } else {
    13. oldIndexValues = oldIndexValues[:0]
    14. }
    15. if err != nil {
    16. panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
    17. }
    18. if newObj != nil {
    19. indexValues, err = indexFunc(newObj)
    20. } else {
    21. indexValues = indexValues[:0]
    22. }
    23. if err != nil {
    24. panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
    25. }
    26. index := c.indices[name]
    27. if index == nil {
    28. index = Index{}
    29. c.indices[name] = index
    30. }
    31. if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
    32. // We optimize for the most common case where indexFunc returns a single value which has not been changed
    33. continue
    34. }
    35. for _, value := range oldIndexValues {
    36. c.deleteKeyFromIndex(key, value, index)
    37. }
    38. for _, value := range indexValues {
    39. c.addKeyToIndex(key, value, index)
    40. }
    41. }
    42. }

    以上就是k8s client-go 使用的缓存的最底层结构, 它用在deltafifo和store里面

  • 相关阅读:
    传奇出现黑屏卡屏不动是怎么回事
    解决因d3dx9_30.dll丢失程序无法运行,电脑缺失d3dx9_30.dll报错解决方案
    数据结构与算法(java版)第二季 - 1 冒泡、选择、堆排序
    GPT-人工智能如何改变我们的编码方式
    tiup update
    OpenCV(十三):图像中绘制直线、圆形、椭圆形、矩形、多边形和文字
    Elasticsearch查询
    智慧燃气,如何能够防患于未“燃”!
    java类的动态加载
    linux读取U盘操作
  • 原文地址:https://blog.csdn.net/uestczshen/article/details/133993470