• Golang源码分析:本地缓存库cache2go


    1.背景

    1.1.项目介绍

    cache2go是一款由golang实现的本地缓存库,提供并发安全的读写操作,具有过期时间控制等特性。项目地址:https://github.com/muesli/cache2go

    1.2.使用方法

    go get github.com/muesli/cache2go

    • 核心操作API:
      • Cache:创建Cache Table
      • Add:添加Cache
      • Value:读取Cache
      • Delete:删除指定Key Cache
      • Flush:清空整个Cache Table
    package main
    
    import (
      "github.com/muesli/cache2go"
      "log"
      "time"
    )
    
    type Item struct {
      Name   string `json:"name"`
      Prices int64  `json:"prices"`
      Stocks int64  `json:"stocks"`
    }
    
    func basicOpTest() {
      // 初始化itemCache本地缓存
      itemCache := cache2go.Cache("itemCache")
      item := &Item{
         Name:   "MacBookPro",
         Prices: 10000,
         Stocks: 1,
      }
    
      // 添加item1缓存,过期时间为5秒钟
      itemCache.Add("item1", 5*time.Second, item)
    
      // 读取item1缓存
      if v, err := itemCache.Value("item1"); err != nil {
         log.Printf("item1 err = %v", err)
      } else {
         log.Printf("读取item1缓存:%#v", v.Data())
      }
    
      // 睡眠6s后读取
      time.Sleep(6 * time.Second)
      if v, err := itemCache.Value("item1"); err != nil {
         log.Printf("item1 err = %v", err)
      } else {
         log.Printf("6s后读取item1缓存:%#v", v.Data())
      }
    
      // 添加item2,不设置过期时间
      itemCache.Add("item2", 0, item)
    
      // 读取item2缓存
      if v, err := itemCache.Value("item2"); err != nil {
         log.Printf("item2 err = %v", err)
      } else {
         log.Printf("读取item2缓存:%#v", v.Data())
      }
    
      // 删除掉item2缓存
      itemCache.Delete("item2")
    
      // 再读取item2缓存
      if v, err := itemCache.Value("item2"); err != nil {
         log.Printf("item2 err = %v", err)
      } else {
         log.Printf("读取item2缓存:%#v", v.Data())
      }
    
      // 添加item3缓存,并删除所有缓存
      itemCache.Add("item3", 0, item)
      itemCache.Flush()
    
      // 读取item3缓存
      if v, err := itemCache.Value("item3"); err != nil {
         log.Printf("item3 err = %v", err)
      } else {
         log.Printf("读取item3缓存:%#v", v.Data())
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    运行结果:
    2022/10/17 20:52:00 读取item1缓存:&main.Item{Name:"MacBookPro", Prices:10000, Stocks:1}
    2022/10/17 20:52:06 item1 err = Key not found in cache
    2022/10/17 20:52:06 读取item2缓存:&main.Item{Name:"MacBookPro", Prices:10000, Stocks:1}
    2022/10/17 20:52:06 item2 err = Key not found in cache
    2022/10/17 20:52:06 item3 err = Key not found in cache
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 操作回调API:
      • AddAddedItemCallback:新增Cache回调函数
      • AddAboutToDeleteItemCallback:删除Cache回调函数
      • AddAboutToExpireCallback:过期CacheItem回调函数
      • 以上三个方法分别再对应着RemoveXXX,表示删去对应操作的全部回调函数
    func callBackTest() {
       // 初始化itemCache本地缓存
       itemCache := cache2go.Cache("itemCache")
    
       // 设置各操作回调函数
       itemCache.AddAddedItemCallback(func(item *cache2go.CacheItem) {
          log.Printf("added callback, item = %#v", item)
       })
       itemCache.AddAboutToDeleteItemCallback(func(item *cache2go.CacheItem) {
          log.Printf("deleted callback, item = %#v", item)
       })
       item := itemCache.Add("expire_item", 1*time.Second, Item{
          Name:   "expire_item",
          Prices: 1,
          Stocks: 1,
       })
       item.AddAboutToExpireCallback(func(item interface{}) {
          log.Printf("expired callback, item = %#v", item)
       })
       // 执行基本操作
       basicOpTest()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    输出结果
    2022/10/17 21:12:09 added callback, item = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"item1", data:(*main.Item)(0xc00008c040), lifeSpan:5000000000, createdOn:time.Time{wall:0xc0cb730a55e5a2f8, ext:426392, loc:(*time.Location)(0x1187880)}, accessedOn:time.Time{wall:0xc0cb730a55e5a2f8, ext:426392, loc:(*time.Location)(0x1187880)}, accessCount:0, aboutToExpire:[]func(interface {})(nil)}
    2022/10/17 21:12:09 读取item1缓存:&main.Item{Name:"MacBookPro", Prices:10000, Stocks:1}
    2022/10/17 21:12:10 deleted callback, item = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"expire_item", data:main.Item{Name:"expire_item", Prices:1, Stocks:1}, lifeSpan:1000000000, createdOn:time.Time{wall:0xc0cb730a55e4d7d8, ext:374551, loc:(*time.Location)(0x1187880)}, accessedOn:time.Time{wall:0xc0cb730a55e4d7d8, ext:374551, loc:(*time.Location)(0x1187880)}, accessCount:0, aboutToExpire:[]func(interface {}){(func(interface {}))(0x10a0530)}}
    2022/10/17 21:12:10 expired callback, item = "expire_item"
    2022/10/17 21:12:14 deleted callback, item = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"item1", data:(*main.Item)(0xc00008c040), lifeSpan:5000000000, createdOn:time.Time{wall:0xc0cb730a55e5a2f8, ext:426392, loc:(*time.Location)(0x1187880)}, accessedOn:time.Time{wall:0xc0cb730a55eaa820, ext:755728, loc:(*time.Location)(0x1187880)}, accessCount:1, aboutToExpire:[]func(interface {})(nil)}
    // ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 设置自定义缓存加载器:SetDataLoader
    func dataLoaderTest() {
       // 初始化itemCache本地缓存
       redisItemCache := cache2go.Cache("redisItemCache")
    
       // 设置自定义的cache加载逻辑
       redisItemCache.SetDataLoader(func(key interface{}, args ...interface{}) *cache2go.CacheItem {
          // 如果是redis开头的key,先从redis中获取
          if strings.HasPrefix(key.(string), "redis") {
             return cache2go.NewCacheItem(key, 0, Item{
                Name: "redis_item",
             })
          }
          return nil
       })
    
       // 写入一条数据
       redisItemCache.Add("item1", 0, Item{
          Name: "item1",
       })
    
       item1, _ := redisItemCache.Value("item1")
       log.Printf("item1 = %#v", item1)
    
       redisItem, _ := redisItemCache.Value("redis_item")
       log.Printf("redisItem = %#v", redisItem)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    输出结果
    2022/10/17 21:59:37 item1 = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"item1", data:main.Item{Name:"item1", Prices:0, Stocks:0}, lifeSpan:0, createdOn:time.Time{wall:0xc0cb75d2601954b8, ext:492934, loc:(*time.Location)(0x11858c0)}, accessedOn:time.Time{wall:0xc0cb75d260196840, ext:497913, loc:(*time.Location)(0x11858c0)}, accessCount:1, aboutToExpire:[]func(interface {})(nil)}
    2022/10/17 21:59:37 redisItem = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"redis_item", data:main.Item{Name:"redis_item", Prices:0, Stocks:0}, lifeSpan:0, createdOn:time.Time{wall:0xc0cb75d2601d34e8, ext:746274, loc:(*time.Location)(0x11858c0)}, accessedOn:time.Time{wall:0xc0cb75d2601d34e8, ext:746274, loc:(*time.Location)(0x11858c0)}, accessCount:0, aboutToExpire:[]func(interface {})(nil)}
    
    • 1
    • 2
    • 3

    2.源码分析

    2.1.项目结构

    在这里插入图片描述

    核心代码文件为:

    • cachetable.go:封装了CacheTable结构体,实现Cache列表相关操作API
    • cacheitem.go:封装了CacheItem结构体,实现了Cache对象相关操作API
    • cache.go:提供cache全局map,存储了CacheTable结构体与table名称映射

    2.2.数据结构

    • CacheTable
    type CacheTable struct {
       sync.RWMutex
    
       // The table's name.
       name string
       // All cached items.
       items map[interface{}]*CacheItem
    
       // Timer responsible for triggering cleanup.
       cleanupTimer *time.Timer
       // Current timer duration.
       cleanupInterval time.Duration
    
       // The logger used for this table.
       logger *log.Logger
    
       // Callback method triggered when trying to load a non-existing key.
       loadData func(key interface{}, args ...interface{}) *CacheItem
       // Callback method triggered when adding a new item to the cache.
       addedItem []func(item *CacheItem)
       // Callback method triggered before deleting an item from the cache.
       aboutToDeleteItem []func(item *CacheItem)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • CacheItem
    type CacheItem struct {
       sync.RWMutex
    
       // The item's key.
       key interface{}
       // The item's data.
       data interface{}
       // How long will the item live in the cache when not being accessed/kept alive.
       lifeSpan time.Duration
    
       // Creation timestamp.
       createdOn time.Time
       // Last access timestamp.
       accessedOn time.Time
       // How often the item was accessed.
       accessCount int64
    
       // Callback method triggered right before removing the item from the cache
       aboutToExpire []func(key interface{})
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    2.3.API代码流程

    1.Cache

    位于cache.go文件,维护了全局CacheTable Map。

    var (
       // 全局cache map
       cache = make(map[string]*CacheTable)
       // cache map 读写锁
       mutex sync.RWMutex
    )
    
    // 从cache map中获取对应的CacheTable,不存在则创建新的
    func Cache(table string) *CacheTable {
       // 先上读锁,获取cacheTable
       mutex.RLock()
       t, ok := cache[table]
       mutex.RUnlock()
    
       // 不存在,则新建
       if !ok {
          // 写操作需要上写锁
          mutex.Lock()
          t, ok = cache[table]
          // 双重校验是否存在
          if !ok {
             // 不存在则新建cacheTable
             t = &CacheTable{
                name:  table,
                items: make(map[interface{}]*CacheItem),
             }
             cache[table] = t
          }
          mutex.Unlock()
       }
       return t
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    2.Add

    位于cachetable.go文件,是CacheTable结构体的方法之一,实现了添加KV缓存的逻辑。

    func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
       // 封装一个item
       item := NewCacheItem(key, lifeSpan, data)
    
       // 锁表,将item添加进去
       table.Lock()
       table.addInternal(item)
    
       return item
    }
    
    func (table *CacheTable) addInternal(item *CacheItem) {
       // 添加kv值到map中
       table.items[item.key] = item
    
       expDur := table.cleanupInterval
       addedItem := table.addedItem
       // 添加完成解除写锁
       table.Unlock()
    
       // 触发Add回调函数
       if addedItem != nil {
          for _, callback := range addedItem {
             callback(item)
          }
       }
    
       // 如果一个item有设置过期时间,且比检查失效间隔小,则进行过期key清理(懒加载思想,只有存在这类Key才会启动清理,而不是定时任务)
       if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) {
          table.expirationCheck()
       }
    }
    
    func (table *CacheTable) expirationCheck() {
       table.Lock()
       if table.cleanupTimer != nil {
          table.cleanupTimer.Stop()
       }
       if table.cleanupInterval > 0 {
          table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
       } else {
          table.log("Expiration check installed for table", table.name)
       }
       
       now := time.Now()
       smallestDuration := 0 * time.Second
       for key, item := range table.items {
          // 遍历该table下的所有items
          item.RLock()
          lifeSpan := item.lifeSpan
          accessedOn := item.accessedOn
          item.RUnlock()
    
          if lifeSpan == 0 {
             continue
          }
          if now.Sub(accessedOn) >= lifeSpan {
             // 该item已超出存活时间,删除key
             table.deleteInternal(key)
          } else {
             // 找到最小的需要过期的item,计算最优时间间隔
             if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration {
                smallestDuration = lifeSpan - now.Sub(accessedOn)
             }
          }
       }
    
       // 在最优时间间隔后启动定时任务检查table的过期key
       table.cleanupInterval = smallestDuration
       if smallestDuration > 0 {
          table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
             go table.expirationCheck()
          })
       }
       table.Unlock()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    3.Value

    Value用于读取Key匹配的CacheItem。

    func (table *CacheTable) Value(key interface{}, args ...interface{}) (*CacheItem, error) {
       table.RLock()
       // 先尝试从items中获取该item
       r, ok := table.items[key]
       loadData := table.loadData
       table.RUnlock()
    
       if ok {
          // 如果存在,则更新该item的accessOn时间和accessCount计数
          r.KeepAlive()
          return r, nil
       }
    
       // 如果不存在,则从loadData自定义加载函数中尝试获取
       if loadData != nil {
          item := loadData(key, args...)
          if item != nil {
             // 如果自定义加载函数中存在该item,则添加到table中并返回
             table.Add(key, item.lifeSpan, item.data)
             return item, nil
          }
    
          return nil, ErrKeyNotFoundOrLoadable
       }
    
       return nil, ErrKeyNotFound
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    4.Delete

    Delete函数用于删除指定Key的Item。

    func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
       table.Lock()
       defer table.Unlock()
    
       return table.deleteInternal(key)
    }
    
    func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) {
       // 判断key是否存在
       r, ok := table.items[key]
       if !ok {
          return nil, ErrKeyNotFound
       }
       
       aboutToDeleteItem := table.aboutToDeleteItem
       table.Unlock()
    
       // 先触发删除回调函数
       if aboutToDeleteItem != nil {
          for _, callback := range aboutToDeleteItem {
             callback(r)
          }
       }
    
       r.RLock()
       defer r.RUnlock()
       // 触发item的过期回调函数
       if r.aboutToExpire != nil {
          for _, callback := range r.aboutToExpire {
             callback(key)
          }
       }
    
       table.Lock()
       table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name)
       // 将item从table中删除
       delete(table.items, key)
    
       return r, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    5.Flush

    清空整个table的cache。

    func (table *CacheTable) Flush() {
       table.Lock()
       defer table.Unlock()
    
       table.log("Flushing table", table.name)
    
       // 直接将items重新初始化
       table.items = make(map[interface{}]*CacheItem)
       table.cleanupInterval = 0
       if table.cleanupTimer != nil {
          // 如果此时还有清理过期定时器,则终止其运行
          table.cleanupTimer.Stop()
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3.总结

    cache2go这个项目写得很精简,本质上就是使用到了map来作为本地缓存kv存储结构,但是有一些值得学习的地方:

    • 使用map时,由于其是非线程安全的,所以在并发场景下需要上锁,可以选择RWMutex读写锁来控制并发的读和串行写,避免panic
    • 对于需要清理过期Key的场景,如果使用定时任务定时遍历整个集合来做清理,会耗费较多时间和资源,可以由写入时判断是否存在需要清理的Key,再启动定时任务来做清理,避免频繁遍历
    • 可以利用golang函数式的特性,方便地实现各操作回调函数,比如添加、删除、失效操作回调等
    • 另外个人觉得这个项目可以优化的空间:由于使用的是本地缓存,为了避免内存oom可以在创建时指定限制key的最大数量,以及在内存不足时的写入策略(如直接报错或者随机清理掉一批Key等)。
  • 相关阅读:
    [Codeforces] games (R1200) Part.3
    JavaScript系列之赋值运算符
    Git - 入门到熟悉_分支管理
    dropwizard中上传和下载文件
    redis
    【Keras】重用预训练层
    【深度学习】QA机器人的实现
    大数据量一次性导入MongoDB
    手机无线投屏到windows11电脑
    大学生简单个人静态HTML网页设计作品 DIV布局个人介绍网页模板代码 DW学生个人网站制作成品下载
  • 原文地址:https://blog.csdn.net/pbrlovejava/article/details/127405406