• cache2go-源码阅读


    简介

    cache2go 是非常简短的 go 开源项目了,很适合作为第一个读源码项目。

    如果你有一定的 go 开发经验,读起来会感觉到比较容易。

    如果你刚刚接触 go 语音,基础知识还不完全了解,希望阅读本文时,遇到一个不会的知识点,去攻克一个,带着为了看懂本文源码的目的去学习基础知识。比如:

    • time.Timer
    • defer
    • sync.RWMutex

    作者这样介绍:Concurrency-safe golang caching library with expiration capabilities,简单来说就是具有过期功能的并发安全 golang 缓存库,因此它具有两大特性:

    • 并发安全
    • 自动过期

    项目结构

    该项目非常简单,全部逻辑由三个文件实现:

    • cache.go:缓存多个表。
    • cachetable.go:缓存一个表。
    • cacheitem.go:缓存表中的一个条目。

    数据结构图:

    接下来会自下而上地分析源码。

    cacheitem.go

    该文件中包含两块重要内容:

    • 结构体 CacheItem,用来缓存表中的一个条目。
    • 函数 NewCacheItem,用来创建 CacheItem 实例。

    CacheItem

    CacheItem 用来缓存表中的一个条目,属性解释:

    • sync.RWMutex:读写锁,保证并发读写安全。
    • key:键。
    • value:值,即数据。
    • lifeSpan:该条目的存活周期,即过期时间。
    • createdOn:创建时间。
    • accessedOn:上次访问时间。
    • accessCount:访问次数。
    • aboutToExpire:从缓存中删除项目之前触发的回调方法,可以在删除之前做一些自定义操作。

    源码如下:

    // CacheItem is an individual cache item
    // Parameter data contains the user-set value in the cache.
    type CacheItem struct {
    	sync.RWMutex
    
    	// The item's key.
    	key interface{}
    	// The item's data.
    	data interface{}
    	// How long will the item live in the cache when not being accessed/kept alive.
    	lifeSpan time.Duration
    
    	// Creation timestamp.
    	createdOn time.Time
    	// Last access timestamp.
    	accessedOn time.Time
    	// How often the item was accessed.
    	accessCount int64
    
    	// Callback method triggered right before removing the item from the cache
    	aboutToExpire []func(key interface{})
    }
    

    Get 方法

    下面是一些比较简单的 Get 方法,一些有写场景的属性会多两行获取锁与释放锁的代码。

    // LifeSpan returns this item's expiration duration.
    func (item *CacheItem) LifeSpan() time.Duration {
    	// immutable
    	return item.lifeSpan
    }
    
    // AccessedOn returns when this item was last accessed.
    func (item *CacheItem) AccessedOn() time.Time {
    	item.RLock()
    	defer item.RUnlock()
    	return item.accessedOn
    }
    
    // CreatedOn returns when this item was added to the cache.
    func (item *CacheItem) CreatedOn() time.Time {
    	// immutable
    	return item.createdOn
    }
    
    // AccessCount returns how often this item has been accessed.
    func (item *CacheItem) AccessCount() int64 {
    	item.RLock()
    	defer item.RUnlock()
    	return item.accessCount
    }
    
    // Key returns the key of this cached item.
    func (item *CacheItem) Key() interface{} {
    	// immutable
    	return item.key
    }
    
    // Data returns the value of this cached item.
    func (item *CacheItem) Data() interface{} {
    	// immutable
    	return item.data
    }
    
    折叠

    KeepAlive

    保活函数:

    • 前两行代码表示:加锁保证并发安全读写。
    • 后两行代码表示:当被访问时,更新访问时间,同时访问次数加 1。
    // KeepAlive marks an item to be kept for another expireDuration period.
    func (item *CacheItem) KeepAlive() {
    	item.Lock()
    	defer item.Unlock()
    	item.accessedOn = time.Now()
    	item.accessCount++
    }
    

    AddAboutToExpireCallback

    新增回调函数,回调函数无返回值,仅有一个参数 interface{},即支持任意的参数。

    // AddAboutToExpireCallback appends a new callback to the AboutToExpire queue
    func (item *CacheItem) AddAboutToExpireCallback(f func(interface{})) {
    	item.Lock()
    	defer item.Unlock()
    	item.aboutToExpire = append(item.aboutToExpire, f)
    }
    

    SetAboutToExpireCallback

    设置回调函数需要完全替代,不同于新增,需要先清空,再覆盖。

    // SetAboutToExpireCallback configures a callback, which will be called right
    // before the item is about to be removed from the cache.
    func (item *CacheItem) SetAboutToExpireCallback(f func(interface{})) {
    	if len(item.aboutToExpire) > 0 {
    		item.RemoveAboutToExpireCallback()
    	}
    	item.Lock()
    	defer item.Unlock()
    	item.aboutToExpire = append(item.aboutToExpire, f)
    }
    

    RemoveAboutToExpireCallback

    通过直接置空,删除所有的回调函数。

    // RemoveAboutToExpireCallback empties the about to expire callback queue
    func (item *CacheItem) RemoveAboutToExpireCallback() {
    	item.Lock()
    	defer item.Unlock()
    	item.aboutToExpire = nil
    }
    

    NewCacheItem

    创建 CacheItem 实例

    func NewCacheItem(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
    	t := time.Now()
    	return &CacheItem{
    		key:           key,
    		lifeSpan:      lifeSpan,
    		createdOn:     t,
    		accessedOn:    t,
    		accessCount:   0,
    		aboutToExpire: nil,
    		data:          data,
    	}
    }
    

    cachetable.go

    该文件中总共有 3 个类:CacheTable、CacheItemPair 和 CacheItemPairList。

    下面由简单到复杂逐个分析。

    CacheItemPair

    CacheItemPair 用来记录缓存访问的次数。

    // CacheItemPair maps key to access counter
    type CacheItemPair struct {
    	Key         interface{}
    	AccessCount int64
    }
    

    CacheItemPairList

    CacheItemPairList 是 CacheItemPair 的切片,通过实现方法 Swap、Len 和 Less 实现了 sort.Interface,支持排序。

    需要注意方法 Less 的实现,是元素 i 大于元素 j,这种实现是为了降序排序。降序排序是为了方法 CacheTable.MostAccessed 返回访问次数最多的条目列表。

    // CacheItemPairList is a slice of CacheItemPairs that implements sort.
    // Interface to sort by AccessCount.
    type CacheItemPairList []CacheItemPair
    
    func (p CacheItemPairList) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
    func (p CacheItemPairList) Len() int           { return len(p) }
    func (p CacheItemPairList) Less(i, j int) bool { return p[i].AccessCount > p[j].AccessCount }
    

    CacheTable

    CacheTable 用来缓存一个表,属性解释:

    • sync.RWMutex:读写锁,保证并发读写安全。
    • name:表名。
    • items:表中的条目列表。
    • cleanupTimer:过期清除定时器。
    • cleanupInterval:过期清除的时间。
    • logger:打印日志的对象。
    • loadData:读取不存在 key 的回调函数,可以用来做初始化缓存的逻辑。
    • addedItem:新增条目时的回调函数,增加灵活性。
    • aboutToDeleteItem:删除条目前的回调函数,增加灵活性。

    源码如下:

    // CacheTable is a table within the cache
    type CacheTable struct {
    	sync.RWMutex
    
    	// The table's name.
    	name string
    	// All cached items.
    	items map[interface{}]*CacheItem
    
    	// Timer responsible for triggering cleanup.
    	cleanupTimer *time.Timer
    	// Current timer duration.
    	cleanupInterval time.Duration
    
    	// The logger used for this table.
    	logger *log.Logger
    
    	// Callback method triggered when trying to load a non-existing key.
    	loadData func(key interface{}, args ...interface{}) *CacheItem
    	// Callback method triggered when adding a new item to the cache.
    	addedItem []func(item *CacheItem)
    	// Callback method triggered before deleting an item from the cache.
    	aboutToDeleteItem []func(item *CacheItem)
    }
    

    下面会先介绍核心方法,再看简单的方法。

    Add 新增条目

    代码逻辑通过流程图描述了一下,其中的「过期检查」单独抽出来后面分析。

    NotFoundAdd 和 Add 核心逻辑是一样的,具体区别不做额外描述,源代码如下:

    // Add adds a key/value pair to the cache.
    // Parameter key is the item's cache-key.
    // Parameter lifeSpan determines after which time period without an access the item
    // will get removed from the cache.
    // Parameter data is the item's value.
    func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
    	item := NewCacheItem(key, lifeSpan, data)
    
    	// Add item to cache.
    	table.Lock()
    	table.addInternal(item)
    
    	return item
    }
    
    func (table *CacheTable) addInternal(item *CacheItem) {
    	// Careful: do not run this method unless the table-mutex is locked!
    	// It will unlock it for the caller before running the callbacks and checks
    	table.log("Adding item with key", item.key, "and lifespan of", item.lifeSpan, "to table", table.name)
    	table.items[item.key] = item
    
    	// Cache values so we don't keep blocking the mutex.
    	expDur := table.cleanupInterval
    	addedItem := table.addedItem
    	table.Unlock()
    
    	// Trigger callback after adding an item to cache.
    	if addedItem != nil {
    		for _, callback := range addedItem {
    			callback(item)
    		}
    	}
    
    	// If we haven't set up any expiration check timer or found a more imminent item.
    	if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) {
    		table.expirationCheck()
    	}
    }
    
    // NotFoundAdd checks whether an item is not yet cached. Unlike the Exists
    // method this also adds data if the key could not be found.
    func (table *CacheTable) NotFoundAdd(key interface{}, lifeSpan time.Duration, data interface{}) bool {
    	table.Lock()
    
    	if _, ok := table.items[key]; ok {
    		table.Unlock()
    		return false
    	}
    
    	item := NewCacheItem(key, lifeSpan, data)
    	table.addInternal(item)
    
    	return true
    }
    
    折叠

    expirationCheck 过期检查

    过期检查的处理,是一个值得学习的点,这里并不是我们印象中用循环定期扫描哪些 key 过期了,也不是给每个条目分别定义一个定时器。

    每次新增条目时,扫描得到最近过期条目的过期时间,仅定义一个定时器。该定时器触发时清除缓存,并生成下一个定时器,如此接力处理。

    过期检查中会调用方法 table.deleteInternal 来清除过期的 key,这块儿在讲 Delete 方法时会再详细分析。

    // Expiration check loop, triggered by a self-adjusting timer.
    func (table *CacheTable) expirationCheck() {
    	table.Lock()
    	if table.cleanupTimer != nil {
    		table.cleanupTimer.Stop()
    	}
    	if table.cleanupInterval > 0 {
    		table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
    	} else {
    		table.log("Expiration check installed for table", table.name)
    	}
    
    	// To be more accurate with timers, we would need to update 'now' on every
    	// loop iteration. Not sure it's really efficient though.
    	now := time.Now()
    	smallestDuration := 0 * time.Second
    	for key, item := range table.items {
    		// Cache values so we don't keep blocking the mutex.
    		item.RLock()
    		lifeSpan := item.lifeSpan
    		accessedOn := item.accessedOn
    		item.RUnlock()
    
    		if lifeSpan == 0 {
    			continue
    		}
    		if now.Sub(accessedOn) >= lifeSpan {
    			// Item has excessed its lifespan.
    			table.deleteInternal(key)
    		} else {
    			// Find the item chronologically closest to its end-of-lifespan.
    			if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration {
    				smallestDuration = lifeSpan - now.Sub(accessedOn)
    			}
    		}
    	}
    
    	// Setup the interval for the next cleanup run.
    	table.cleanupInterval = smallestDuration
    	if smallestDuration > 0 {
    		table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
    			go table.expirationCheck()
    		})
    	}
    	table.Unlock()
    }
    
    折叠

    Delete 方法

    从流程图可以看出,这块儿大部分逻辑是在加锁、释放锁,有这么多锁主要是有如下几个原因:

    • 一部分是表级别的,一部分是条目级别的;
    • 表级别锁出现两次获取与释放,这种实现主要是考虑到 deleteInternal 的复用性,同时支持 Delete 和 expirationCheck 的调用,做了一些锁回溯的逻辑。思考:假如 Mutex 是可重入锁,是不是不需要回溯处理了?

    // Delete an item from the cache.
    func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
    	table.Lock()
    	defer table.Unlock()
    
    	return table.deleteInternal(key)
    }
    
    func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) {
    	r, ok := table.items[key]
    	if !ok {
    		return nil, ErrKeyNotFound
    	}
    
    	// Cache value so we don't keep blocking the mutex.
    	aboutToDeleteItem := table.aboutToDeleteItem
    	table.Unlock()
    
    	// Trigger callbacks before deleting an item from cache.
    	if aboutToDeleteItem != nil {
    		for _, callback := range aboutToDeleteItem {
    			callback(r)
    		}
    	}
    
    	r.RLock()
    	defer r.RUnlock()
    	if r.aboutToExpire != nil {
    		for _, callback := range r.aboutToExpire {
    			callback(key)
    		}
    	}
    
    	table.Lock()
    	table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name)
    	delete(table.items, key)
    
    	return r, nil
    }
    
    折叠

    Value 取值

    取值本身是比较简单的,只不过这里要进行一些额外处理:

    • 取不值时,是否有自定义逻辑,比如降级查询后缓存进去。
    • 取到值时,更新访问时间,达到保活的目的。

    // Value returns an item from the cache and marks it to be kept alive. You can
    // pass additional arguments to your DataLoader callback function.
    func (table *CacheTable) Value(key interface{}, args ...interface{}) (*CacheItem, error) {
    	table.RLock()
    	r, ok := table.items[key]
    	loadData := table.loadData
    	table.RUnlock()
    
    	if ok {
    		// Update access counter and timestamp.
    		r.KeepAlive()
    		return r, nil
    	}
    
    	// Item doesn't exist in cache. Try and fetch it with a data-loader.
    	if loadData != nil {
    		item := loadData(key, args...)
    		if item != nil {
    			table.Add(key, item.lifeSpan, item.data)
    			return item, nil
    		}
    
    		return nil, ErrKeyNotFoundOrLoadable
    	}
    
    	return nil, ErrKeyNotFound
    }
    

    MostAccessed 最常访问的条目

    这个方法用到了前文提到的 CacheItemPair 和 CacheItemPairList。

    • 首先遍历条目,取出 key 和 accessCount 存储到 p 中,用来排序;
    • 接着用有序的 p 映射出所有条目的顺序,返回有序的条目。
    // MostAccessed returns the most accessed items in this cache table
    func (table *CacheTable) MostAccessed(count int64) []*CacheItem {
    	table.RLock()
    	defer table.RUnlock()
    
    	p := make(CacheItemPairList, len(table.items))
    	i := 0
    	for k, v := range table.items {
    		p[i] = CacheItemPair{k, v.accessCount}
    		i++
    	}
    	sort.Sort(p)
    
    	var r []*CacheItem
    	c := int64(0)
    	for _, v := range p {
    		if c >= count {
    			break
    		}
    
    		item, ok := table.items[v.Key]
    		if ok {
    			r = append(r, item)
    		}
    		c++
    	}
    
    	return r
    }
    
    折叠

    Foreach 方法

    为开发者提供更加丰富的自定义操作。

    // Foreach all items
    func (table *CacheTable) Foreach(trans func(key interface{}, item *CacheItem)) {
    	table.RLock()
    	defer table.RUnlock()
    
    	for k, v := range table.items {
    		trans(k, v)
    	}
    }
    

    清空缓存

    清空缓存的方法比较简单,一方面是数据的清空,另一方面是定时器的清空。

    // Flush deletes all items from this cache table.
    func (table *CacheTable) Flush() {
    	table.Lock()
    	defer table.Unlock()
    
    	table.log("Flushing table", table.name)
    
    	table.items = make(map[interface{}]*CacheItem)
    	table.cleanupInterval = 0
    	if table.cleanupTimer != nil {
    		table.cleanupTimer.Stop()
    	}
    }
    

    查询相关方法

    Count 和 Exists 方法是比较简单的,不用多说。

    // Count returns how many items are currently stored in the cache.
    func (table *CacheTable) Count() int {
    	table.RLock()
    	defer table.RUnlock()
    	return len(table.items)
    }
    
    // Exists returns whether an item exists in the cache. Unlike the Value method
    // Exists neither tries to fetch data via the loadData callback nor does it
    // keep the item alive in the cache.
    func (table *CacheTable) Exists(key interface{}) bool {
    	table.RLock()
    	defer table.RUnlock()
    	_, ok := table.items[key]
    
    	return ok
    }
    

    Set 相关方法

    下面这些 Set 方法比较简单,也不多做赘述。

    // SetDataLoader configures a data-loader callback, which will be called when
    // trying to access a non-existing key. The key and 0...n additional arguments
    // are passed to the callback function.
    func (table *CacheTable) SetDataLoader(f func(interface{}, ...interface{}) *CacheItem) {
    	table.Lock()
    	defer table.Unlock()
    	table.loadData = f
    }
    
    // SetAddedItemCallback configures a callback, which will be called every time
    // a new item is added to the cache.
    func (table *CacheTable) SetAddedItemCallback(f func(*CacheItem)) {
    	if len(table.addedItem) > 0 {
    		table.RemoveAddedItemCallbacks()
    	}
    	table.Lock()
    	defer table.Unlock()
    	table.addedItem = append(table.addedItem, f)
    }
    
    //AddAddedItemCallback appends a new callback to the addedItem queue
    func (table *CacheTable) AddAddedItemCallback(f func(*CacheItem)) {
    	table.Lock()
    	defer table.Unlock()
    	table.addedItem = append(table.addedItem, f)
    }
    
    // SetAboutToDeleteItemCallback configures a callback, which will be called
    // every time an item is about to be removed from the cache.
    func (table *CacheTable) SetAboutToDeleteItemCallback(f func(*CacheItem)) {
    	if len(table.aboutToDeleteItem) > 0 {
    		table.RemoveAboutToDeleteItemCallback()
    	}
    	table.Lock()
    	defer table.Unlock()
    	table.aboutToDeleteItem = append(table.aboutToDeleteItem, f)
    }
    
    // AddAboutToDeleteItemCallback appends a new callback to the AboutToDeleteItem queue
    func (table *CacheTable) AddAboutToDeleteItemCallback(f func(*CacheItem)) {
    	table.Lock()
    	defer table.Unlock()
    	table.aboutToDeleteItem = append(table.aboutToDeleteItem, f)
    }
    
    // SetLogger sets the logger to be used by this cache table.
    func (table *CacheTable) SetLogger(logger *log.Logger) {
    	table.Lock()
    	defer table.Unlock()
    	table.logger = logger
    }
    
    折叠

    删除相关方法

    过于简单,不做赘述

    // RemoveAddedItemCallbacks empties the added item callback queue
    func (table *CacheTable) RemoveAddedItemCallbacks() {
    	table.Lock()
    	defer table.Unlock()
    	table.addedItem = nil
    }
    
    // RemoveAboutToDeleteItemCallback empties the about to delete item callback queue
    func (table *CacheTable) RemoveAboutToDeleteItemCallback() {
    	table.Lock()
    	defer table.Unlock()
    	table.aboutToDeleteItem = nil
    }
    

    cache.go

    Cache 函数是该缓存库的入口函数,该函数存在一段双检逻辑,需要特别了解下原因:

    • mutex.Lock() 获取锁过程中,可能另一协程已经完成了初始化。因此,需要再次校验。
    // Cache returns the existing cache table with given name or creates a new one
    // if the table does not exist yet.
    func Cache(table string) *CacheTable {
    	mutex.RLock()
    	t, ok := cache[table]
    	mutex.RUnlock()
    
    	if !ok {
    		mutex.Lock()
    		t, ok = cache[table]
    		// Double check whether the table exists or not.
    		if !ok {
    			t = &CacheTable{
    				name:  table,
    				items: make(map[interface{}]*CacheItem),
    			}
    			cache[table] = t
    		}
    		mutex.Unlock()
    	}
    
    	return t
    }
    

    examples

    样例也比较简单,读者可以自行阅读下。

    引用

    1. https://github.com/muesli/cache2go
    2. https://mp.weixin.qq.com/s/6JjL0KVccW7nAQiKuDAl-w
    3. https://mp.weixin.qq.com/s/gIvNjn7GdOQUwg1pKtDTEQ
    4. https://mp.weixin.qq.com/s/898HtDyFTykvMu2-vvMy-A
  • 相关阅读:
    Oracle管理表(创建、修改、删除)
    matlab运行CLBP程序为什么没有图?
    2022年高教社杯全国大学生数学建模竞赛-【赛题解析篇】D题:气象报文信息卫星通信传输
    2022.11.4 英语背诵
    Java基础11——抽象类和接口
    隐语 Meetup 北京站|精彩时刻大盘点!新品发布、行业案例、专家解读......欢迎围观
    移动支付新时代——低代码如何对接支付宝和微信支付
    Linux环境搭建Nginx+Tomcat负载均衡集群
    图像和图像处理
    【Selenium】提高测试&爬虫效率:Selenium与多线程的完美结合
  • 原文地址:https://www.cnblogs.com/zhouweixin/p/16538769.html