cache2go是一款由golang实现的本地缓存库,提供并发安全的读写操作,具有过期时间控制等特性。项目地址:https://github.com/muesli/cache2go
go get github.com/muesli/cache2go
package main
import (
"github.com/muesli/cache2go"
"log"
"time"
)
// Item is the sample payload the demo functions store in the cache.
type Item struct {
	// Name is serialized under the JSON key "name".
	Name string `json:"name"`
	// Prices is serialized under the JSON key "prices".
	Prices int64 `json:"prices"`
	// Stocks is serialized under the JSON key "stocks".
	Stocks int64 `json:"stocks"`
}
// basicOpTest walks through the basic operations of the "itemCache" table:
// adding items with and without a lifespan, reading them back, deleting a
// single key and flushing the whole table. The outcome of every read is logged.
func basicOpTest() {
	table := cache2go.Cache("itemCache")
	product := &Item{Name: "MacBookPro", Prices: 10000, Stocks: 1}

	// show looks key up in the table and logs either the lookup error or the
	// cached payload, using the format strings supplied by the caller.
	show := func(key, missFormat, hitFormat string) {
		entry, err := table.Value(key)
		if err != nil {
			log.Printf(missFormat, err)
			return
		}
		log.Printf(hitFormat, entry.Data())
	}

	// item1 gets a 5 second lifespan; read it right away and again after
	// sleeping for 6 seconds.
	table.Add("item1", 5*time.Second, product)
	show("item1", "item1 err = %v", "读取item1缓存:%#v")
	time.Sleep(6 * time.Second)
	show("item1", "item1 err = %v", "6s后读取item1缓存:%#v")

	// item2 is added with a zero lifespan, read, deleted, then read again.
	table.Add("item2", 0, product)
	show("item2", "item2 err = %v", "读取item2缓存:%#v")
	table.Delete("item2")
	show("item2", "item2 err = %v", "读取item2缓存:%#v")

	// item3 is added and the whole table is flushed before reading it.
	table.Add("item3", 0, product)
	table.Flush()
	show("item3", "item3 err = %v", "读取item3缓存:%#v")
}
运行结果:
2022/10/17 20:52:00 读取item1缓存:&main.Item{Name:"MacBookPro", Prices:10000, Stocks:1}
2022/10/17 20:52:06 item1 err = Key not found in cache
2022/10/17 20:52:06 读取item2缓存:&main.Item{Name:"MacBookPro", Prices:10000, Stocks:1}
2022/10/17 20:52:06 item2 err = Key not found in cache
2022/10/17 20:52:06 item3 err = Key not found in cache
// callBackTest registers the table-level "added" and "about to delete"
// callbacks on the "itemCache" table, adds an item with a one second lifespan
// that carries its own "about to expire" callback, and then runs basicOpTest
// so the callbacks fire during the basic operations.
func callBackTest() {
	table := cache2go.Cache("itemCache")

	// Table-level callbacks: both simply log the item they are handed.
	onAdded := func(item *cache2go.CacheItem) {
		log.Printf("added callback, item = %#v", item)
	}
	onAboutToDelete := func(item *cache2go.CacheItem) {
		log.Printf("deleted callback, item = %#v", item)
	}
	table.AddAddedItemCallback(onAdded)
	table.AddAboutToDeleteItemCallback(onAboutToDelete)

	payload := Item{
		Name:   "expire_item",
		Prices: 1,
		Stocks: 1,
	}
	expiring := table.Add("expire_item", 1*time.Second, payload)
	// Item-level callback: the argument it receives is the item's key.
	expiring.AddAboutToExpireCallback(func(key interface{}) {
		log.Printf("expired callback, item = %#v", key)
	})

	// Run the basic operations with the callbacks installed.
	basicOpTest()
}
输出结果
2022/10/17 21:12:09 added callback, item = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"item1", data:(*main.Item)(0xc00008c040), lifeSpan:5000000000, createdOn:time.Time{wall:0xc0cb730a55e5a2f8, ext:426392, loc:(*time.Location)(0x1187880)}, accessedOn:time.Time{wall:0xc0cb730a55e5a2f8, ext:426392, loc:(*time.Location)(0x1187880)}, accessCount:0, aboutToExpire:[]func(interface {})(nil)}
2022/10/17 21:12:09 读取item1缓存:&main.Item{Name:"MacBookPro", Prices:10000, Stocks:1}
2022/10/17 21:12:10 deleted callback, item = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"expire_item", data:main.Item{Name:"expire_item", Prices:1, Stocks:1}, lifeSpan:1000000000, createdOn:time.Time{wall:0xc0cb730a55e4d7d8, ext:374551, loc:(*time.Location)(0x1187880)}, accessedOn:time.Time{wall:0xc0cb730a55e4d7d8, ext:374551, loc:(*time.Location)(0x1187880)}, accessCount:0, aboutToExpire:[]func(interface {}){(func(interface {}))(0x10a0530)}}
2022/10/17 21:12:10 expired callback, item = "expire_item"
2022/10/17 21:12:14 deleted callback, item = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"item1", data:(*main.Item)(0xc00008c040), lifeSpan:5000000000, createdOn:time.Time{wall:0xc0cb730a55e5a2f8, ext:426392, loc:(*time.Location)(0x1187880)}, accessedOn:time.Time{wall:0xc0cb730a55eaa820, ext:755728, loc:(*time.Location)(0x1187880)}, accessCount:1, aboutToExpire:[]func(interface {})(nil)}
// ...
// dataLoaderTest demonstrates a custom data loader on the "redisItemCache"
// table: a key that is already cached is served from the table, while a
// missing key is handed to the loader, which may produce an item for it.
func dataLoaderTest() {
	redisItemCache := cache2go.Cache("redisItemCache")

	// Install the custom loading logic used for keys missing from the table.
	redisItemCache.SetDataLoader(func(key interface{}, args ...interface{}) *cache2go.CacheItem {
		// Only string keys that start with "redis" are loadable. The comma-ok
		// assertion keeps a non-string key from panicking the loader.
		name, ok := key.(string)
		if !ok || !strings.HasPrefix(name, "redis") {
			return nil
		}
		// Stand-in for a real Redis lookup: fabricate the item directly.
		return cache2go.NewCacheItem(key, 0, Item{
			Name: "redis_item",
		})
	})

	// Write one entry directly into the table.
	redisItemCache.Add("item1", 0, Item{
		Name: "item1",
	})

	// item1 is already cached, so this read does not involve the loader.
	if item1, err := redisItemCache.Value("item1"); err != nil {
		log.Printf("item1 err = %v", err)
	} else {
		log.Printf("item1 = %#v", item1)
	}

	// redis_item is not cached, so this read goes through the loader.
	if redisItem, err := redisItemCache.Value("redis_item"); err != nil {
		log.Printf("redis_item err = %v", err)
	} else {
		log.Printf("redisItem = %#v", redisItem)
	}
}
输出结果
2022/10/17 21:59:37 item1 = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"item1", data:main.Item{Name:"item1", Prices:0, Stocks:0}, lifeSpan:0, createdOn:time.Time{wall:0xc0cb75d2601954b8, ext:492934, loc:(*time.Location)(0x11858c0)}, accessedOn:time.Time{wall:0xc0cb75d260196840, ext:497913, loc:(*time.Location)(0x11858c0)}, accessCount:1, aboutToExpire:[]func(interface {})(nil)}
2022/10/17 21:59:37 redisItem = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"redis_item", data:main.Item{Name:"redis_item", Prices:0, Stocks:0}, lifeSpan:0, createdOn:time.Time{wall:0xc0cb75d2601d34e8, ext:746274, loc:(*time.Location)(0x11858c0)}, accessedOn:time.Time{wall:0xc0cb75d2601d34e8, ext:746274, loc:(*time.Location)(0x11858c0)}, accessCount:0, aboutToExpire:[]func(interface {})(nil)}
核心代码文件为:
// CacheTable is a named table of cached items. The embedded RWMutex guards
// the table's fields.
type CacheTable struct {
	sync.RWMutex

	// name is the table's name.
	name string
	// items holds every cached item, indexed by its key.
	items map[interface{}]*CacheItem

	// cleanupTimer is the timer that triggers the next expiration check.
	cleanupTimer *time.Timer
	// cleanupInterval is the duration the current cleanup timer was set to.
	cleanupInterval time.Duration

	// logger is the logger used for this table.
	logger *log.Logger

	// loadData is called when a key that is not in the table is requested.
	loadData func(key interface{}, args ...interface{}) *CacheItem
	// addedItem callbacks run when a new item is added to the table;
	// aboutToDeleteItem callbacks run before an item is deleted from it.
	addedItem, aboutToDeleteItem []func(item *CacheItem)
}
// CacheItem is a single cache entry. The embedded RWMutex guards the
// item's fields.
type CacheItem struct {
	sync.RWMutex

	// key is the key the item is stored under.
	key interface{}
	// data is the cached payload.
	data interface{}

	// lifeSpan is how long the item lives in the cache when it is not being
	// accessed or kept alive.
	lifeSpan time.Duration
	// createdOn is the creation timestamp; accessedOn is the timestamp of
	// the last access.
	createdOn, accessedOn time.Time
	// accessCount counts how often the item was accessed.
	accessCount int64

	// aboutToExpire callbacks are triggered right before the item is removed
	// from the cache.
	aboutToExpire []func(key interface{})
}
Cache函数位于cache.go文件,该文件维护了全局的CacheTable Map;Cache函数按表名获取对应的CacheTable,不存在时则新建。
// cache is the global registry of cache tables, keyed by table name.
var cache = map[string]*CacheTable{}

// mutex is the read/write lock protecting cache.
var mutex sync.RWMutex
// Cache returns the CacheTable registered under the given name in the global
// cache map, creating and registering a new, empty table when none exists.
func Cache(table string) *CacheTable {
	// Fast path: look the table up under the read lock only.
	mutex.RLock()
	existing, found := cache[table]
	mutex.RUnlock()
	if found {
		return existing
	}

	// Slow path: creating a table is a write, so take the write lock.
	mutex.Lock()
	defer mutex.Unlock()

	// Look again: another goroutine may have registered the table between
	// releasing the read lock and acquiring the write lock.
	if existing, found = cache[table]; found {
		return existing
	}

	created := &CacheTable{
		name:  table,
		items: make(map[interface{}]*CacheItem),
	}
	cache[table] = created
	return created
}
Add方法位于cachetable.go文件,是CacheTable结构体的方法之一,实现了添加KV缓存的逻辑。
// Add wraps data in a new CacheItem with the given key and lifeSpan, stores it
// in the table and returns the new item.
func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
	entry := NewCacheItem(key, lifeSpan, data)

	// addInternal expects the table to be write-locked on entry and releases
	// the lock itself, so there is deliberately no Unlock here.
	table.Lock()
	table.addInternal(entry)

	return entry
}
// addInternal stores item in the table, runs the table's added-item callbacks
// and, when needed, triggers an expiration check.
//
// The caller must hold the table's write lock; addInternal releases it before
// any callback runs.
func (table *CacheTable) addInternal(item *CacheItem) {
	table.items[item.key] = item

	// Snapshot what is needed after unlocking while the lock is still held.
	interval, callbacks := table.cleanupInterval, table.addedItem
	table.Unlock()

	// Ranging over a nil slice is a no-op, so no explicit nil check is needed.
	for _, notify := range callbacks {
		notify(item)
	}

	// Items without a lifespan never need an expiration check.
	if item.lifeSpan <= 0 {
		return
	}
	// Run the check only when no cleanup is scheduled yet or this item's
	// lifespan is shorter than the currently scheduled interval. Cleanup is
	// therefore started on demand rather than by a permanently running job.
	if interval == 0 || item.lifeSpan < interval {
		table.expirationCheck()
	}
}
// expirationCheck deletes every item whose lifespan has elapsed since its last
// access and schedules the next check for the moment the soonest remaining
// item is due to expire. When no item with a lifespan is left, no timer is
// scheduled and the cleanup interval is reset to zero.
func (table *CacheTable) expirationCheck() {
	table.Lock()
	if table.cleanupTimer != nil {
		table.cleanupTimer.Stop()
	}
	if table.cleanupInterval <= 0 {
		table.log("Expiration check installed for table", table.name)
	} else {
		table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
	}

	now := time.Now()
	// nextCheck is the shortest remaining lifetime seen so far; zero means
	// "none found yet".
	var nextCheck time.Duration
	for key, item := range table.items {
		item.RLock()
		lifeSpan, accessedOn := item.lifeSpan, item.accessedOn
		item.RUnlock()

		// A zero lifespan means the item is never expired by this check.
		if lifeSpan == 0 {
			continue
		}

		idle := now.Sub(accessedOn)
		if idle >= lifeSpan {
			// The item has outlived its lifespan: remove it.
			table.deleteInternal(key)
			continue
		}

		if remaining := lifeSpan - idle; nextCheck == 0 || remaining < nextCheck {
			nextCheck = remaining
		}
	}

	// Arm the timer so the next check runs when the soonest item expires.
	table.cleanupInterval = nextCheck
	if nextCheck > 0 {
		table.cleanupTimer = time.AfterFunc(nextCheck, func() {
			go table.expirationCheck()
		})
	}
	// Unlocked explicitly rather than with defer: deleteInternal temporarily
	// releases the table lock while callbacks run.
	table.Unlock()
}
Value用于读取Key匹配的CacheItem。
// Value returns the item cached under key. A cached item has its access time
// and access count refreshed via KeepAlive before being returned.
//
// When the key is not cached and a data loader is installed, the loader is
// called with key and args. An item it returns is added to the table and
// returned; a nil result yields ErrKeyNotFoundOrLoadable. Without a loader a
// missing key yields ErrKeyNotFound.
func (table *CacheTable) Value(key interface{}, args ...interface{}) (*CacheItem, error) {
	table.RLock()
	cached, hit := table.items[key]
	loader := table.loadData
	table.RUnlock()

	switch {
	case hit:
		cached.KeepAlive()
		return cached, nil
	case loader == nil:
		return nil, ErrKeyNotFound
	}

	loaded := loader(key, args...)
	if loaded == nil {
		return nil, ErrKeyNotFoundOrLoadable
	}

	// Add builds its own CacheItem from the loaded lifespan and data; the item
	// returned to the caller is the one produced by the loader.
	table.Add(key, loaded.lifeSpan, loaded.data)
	return loaded, nil
}
Delete函数用于删除指定Key的Item。
// Delete removes the item stored under key from the table and returns it, or
// returns ErrKeyNotFound when the key is not present.
func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
	// deleteInternal expects the write lock to be held on entry. It releases
	// the lock while callbacks run and re-acquires it before returning, so the
	// deferred Unlock pairs with that final Lock.
	table.Lock()
	defer table.Unlock()
	return table.deleteInternal(key)
}
// deleteInternal removes the item stored under key and returns it, or returns
// ErrKeyNotFound when the key is not in the table.
//
// The caller must hold the table's write lock. The lock is released while the
// table's about-to-delete callbacks and the item's about-to-expire callbacks
// run, and is held again when the function returns after a successful delete.
func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) {
	victim, found := table.items[key]
	if !found {
		return nil, ErrKeyNotFound
	}

	// Snapshot the callbacks under the lock, then release it so they run
	// without the table being locked.
	callbacks := table.aboutToDeleteItem
	table.Unlock()

	// Table-level callbacks fire first and receive the item itself.
	for _, notify := range callbacks {
		notify(victim)
	}

	// The item's read lock is held until the function returns.
	victim.RLock()
	defer victim.RUnlock()

	// Item-level callbacks fire next and receive the key.
	for _, notify := range victim.aboutToExpire {
		notify(key)
	}

	// Re-acquire the table lock for the actual removal; the caller unlocks it.
	table.Lock()
	table.log("Deleting item with key", key, "created on", victim.createdOn, "and hit", victim.accessCount, "times from table", table.name)
	delete(table.items, key)

	return victim, nil
}
Flush用于清空整个table的cache。
// Flush removes every item from the table, resets the cleanup interval and
// stops any pending cleanup timer.
func (table *CacheTable) Flush() {
	table.Lock()
	defer table.Unlock()

	table.log("Flushing table", table.name)

	// Stop a still-pending expiration timer, if there is one.
	if timer := table.cleanupTimer; timer != nil {
		timer.Stop()
	}
	table.cleanupInterval = 0
	// Drop all entries at once by replacing the map with a fresh, empty one.
	table.items = map[interface{}]*CacheItem{}
}
cache2go这个项目写得很精简,本质上就是使用到了map来作为本地缓存kv存储结构,但是有一些值得学习的地方: