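cache2go is a very short open-source Go project, which makes it a good first project for learning to read source code.
If you already have some Go development experience, you should find it fairly easy going.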
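If you are new to Go and not yet comfortable with the basics, try to tackle each unfamiliar concept as you run into it while reading, learning the fundamentals with the concrete goal of understanding the source code discussed here. Concepts you will meet include sync.RWMutex, defer, interface{}, time.AfterFunc and sort.Interface.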
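The author describes it as "Concurrency-safe golang caching library with expiration capabilities", that is, a concurrency-safe Go caching library whose entries can expire. It therefore has two key features: concurrency safety and expiration.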
The project is very small: all of its logic lives in three files, cacheitem.go, cachetable.go and cache.go.
Data structure diagram:
The rest of this article walks through the source code bottom-up.
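cacheitem.go contains two important parts: the CacheItem struct and its methods.
CacheItem represents one entry in a cache table. The comment on each field in the source below explains what that field holds: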
```go
// CacheItem is an individual cache item
// Parameter data contains the user-set value in the cache.
type CacheItem struct {
	sync.RWMutex

	// The item's key.
	key interface{}
	// The item's data.
	data interface{}
	// How long will the item live in the cache when not being accessed/kept alive.
	lifeSpan time.Duration

	// Creation timestamp.
	createdOn time.Time
	// Last access timestamp.
	accessedOn time.Time
	// How often the item was accessed.
	accessCount int64

	// Callback method triggered right before removing the item from the cache
	aboutToExpire []func(key interface{})
}
```
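Below are the simple getters. Fields that can change after the item is created (accessedOn and accessCount) are read under the item's read lock, which adds two lines to acquire and release it. The immutable fields are returned directly.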
```go
// LifeSpan returns this item's expiration duration.
func (item *CacheItem) LifeSpan() time.Duration {
	// immutable
	return item.lifeSpan
}

// AccessedOn returns when this item was last accessed.
func (item *CacheItem) AccessedOn() time.Time {
	item.RLock()
	defer item.RUnlock()
	return item.accessedOn
}

// CreatedOn returns when this item was added to the cache.
func (item *CacheItem) CreatedOn() time.Time {
	// immutable
	return item.createdOn
}

// AccessCount returns how often this item has been accessed.
func (item *CacheItem) AccessCount() int64 {
	item.RLock()
	defer item.RUnlock()
	return item.accessCount
}

// Key returns the key of this cached item.
func (item *CacheItem) Key() interface{} {
	// immutable
	return item.key
}

// Data returns the value of this cached item.
func (item *CacheItem) Data() interface{} {
	// immutable
	return item.data
}
```
The keep-alive method refreshes the last-access time and increments the access counter:
```go
// KeepAlive marks an item to be kept for another expireDuration period.
func (item *CacheItem) KeepAlive() {
	item.Lock()
	defer item.Unlock()
	item.accessedOn = time.Now()
	item.accessCount++
}
```
Adding a callback: callbacks return nothing and take a single interface{} parameter (the item's key), so keys of any type are supported.
```go
// AddAboutToExpireCallback appends a new callback to the AboutToExpire queue
func (item *CacheItem) AddAboutToExpireCallback(f func(interface{})) {
	item.Lock()
	defer item.Unlock()
	item.aboutToExpire = append(item.aboutToExpire, f)
}
```
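Setting a callback is different from adding one because it replaces the whole queue. Any existing callbacks are removed first, and then the new one is appended.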
```go
// SetAboutToExpireCallback configures a callback, which will be called right
// before the item is about to be removed from the cache.
func (item *CacheItem) SetAboutToExpireCallback(f func(interface{})) {
	if len(item.aboutToExpire) > 0 {
		item.RemoveAboutToExpireCallback()
	}
	item.Lock()
	defer item.Unlock()
	item.aboutToExpire = append(item.aboutToExpire, f)
}
```
Removing callbacks sets the slice to nil, which discards all of them at once.
```go
// RemoveAboutToExpireCallback empties the about to expire callback queue
func (item *CacheItem) RemoveAboutToExpireCallback() {
	item.Lock()
	defer item.Unlock()
	item.aboutToExpire = nil
}
```
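A small model of the aboutToExpire queue makes the difference between adding, setting and removing callbacks concrete. The type expiryCallbacks and its methods are hypothetical. There is one deliberate deviation from the original: set clears and appends under a single lock. cache2go's SetAboutToExpireCallback instead checks the length without holding the lock and then acquires the lock twice.

```go
package main

import (
	"fmt"
	"sync"
)

// expiryCallbacks is a hypothetical model of CacheItem's aboutToExpire queue.
type expiryCallbacks struct {
	sync.Mutex
	fns []func(key interface{})
}

// add appends, like AddAboutToExpireCallback.
func (c *expiryCallbacks) add(f func(interface{})) {
	c.Lock()
	defer c.Unlock()
	c.fns = append(c.fns, f)
}

// set replaces the whole queue with one callback, like
// SetAboutToExpireCallback, but within one critical section.
func (c *expiryCallbacks) set(f func(interface{})) {
	c.Lock()
	defer c.Unlock()
	c.fns = []func(interface{}){f}
}

// remove drops every callback, like RemoveAboutToExpireCallback.
func (c *expiryCallbacks) remove() {
	c.Lock()
	defer c.Unlock()
	c.fns = nil
}

// fire runs the callbacks in registration order, passing the item's key.
func (c *expiryCallbacks) fire(key interface{}) {
	c.Lock()
	fns := c.fns
	c.Unlock()
	for _, f := range fns {
		f(key)
	}
}

func main() {
	var log []string
	record := func(tag string) func(interface{}) {
		return func(key interface{}) { log = append(log, fmt.Sprintf("%s:%v", tag, key)) }
	}

	var c expiryCallbacks
	c.add(record("a"))
	c.add(record("b"))
	c.fire("k1") // a and b both run
	c.set(record("c"))
	c.fire("k2")     // only c runs
	fmt.Println(log) // [a:k1 b:k1 c:k2]
}
```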
Creating a CacheItem instance:
```go
func NewCacheItem(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
	t := time.Now()
	return &CacheItem{
		key:           key,
		lifeSpan:      lifeSpan,
		createdOn:     t,
		accessedOn:    t,
		accessCount:   0,
		aboutToExpire: nil,
		data:          data,
	}
}
```
cachetable.go defines three types: CacheTable, CacheItemPair and CacheItemPairList.
They are covered below from simplest to most complex.
CacheItemPair pairs a key with its access count.
```go
// CacheItemPair maps key to access counter
type CacheItemPair struct {
	Key         interface{}
	AccessCount int64
}
```
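CacheItemPairList is a slice of CacheItemPair. It implements Swap, Len and Less, so it satisfies sort.Interface and can be sorted.
Note that Less returns true when element i is greater than element j. This inverted comparison produces a descending sort, which lets CacheTable.MostAccessed return the most-accessed entries first.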
```go
// CacheItemPairList is a slice of CacheItemPairs that implements sort.
// Interface to sort by AccessCount.
type CacheItemPairList []CacheItemPair

func (p CacheItemPairList) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
func (p CacheItemPairList) Len() int           { return len(p) }
func (p CacheItemPairList) Less(i, j int) bool { return p[i].AccessCount > p[j].AccessCount }
```
CacheTable represents one cache table. The field comments in the source below explain what each field is for:
```go
// CacheTable is a table within the cache
type CacheTable struct {
	sync.RWMutex

	// The table's name.
	name string
	// All cached items.
	items map[interface{}]*CacheItem

	// Timer responsible for triggering cleanup.
	cleanupTimer *time.Timer
	// Current timer duration.
	cleanupInterval time.Duration

	// The logger used for this table.
	logger *log.Logger

	// Callback method triggered when trying to load a non-existing key.
	loadData func(key interface{}, args ...interface{}) *CacheItem
	// Callback method triggered when adding a new item to the cache.
	addedItem []func(item *CacheItem)
	// Callback method triggered before deleting an item from the cache.
	aboutToDeleteItem []func(item *CacheItem)
}
```
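The core methods are covered first, followed by the simpler ones.
The logic is described with a flowchart. The expiration check is pulled out and analyzed separately afterwards.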
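NotFoundAdd and Add share the same core logic, addInternal. The only difference is that NotFoundAdd inserts the item only when the key is not already cached, and it reports whether it did so. The source: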
```go
// Add adds a key/value pair to the cache.
// Parameter key is the item's cache-key.
// Parameter lifeSpan determines after which time period without an access the item
// will get removed from the cache.
// Parameter data is the item's value.
func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
	item := NewCacheItem(key, lifeSpan, data)

	// Add item to cache.
	table.Lock()
	table.addInternal(item)

	return item
}

func (table *CacheTable) addInternal(item *CacheItem) {
	// Careful: do not run this method unless the table-mutex is locked!
	// It will unlock it for the caller before running the callbacks and checks
	table.log("Adding item with key", item.key, "and lifespan of", item.lifeSpan, "to table", table.name)
	table.items[item.key] = item

	// Cache values so we don't keep blocking the mutex.
	expDur := table.cleanupInterval
	addedItem := table.addedItem
	table.Unlock()

	// Trigger callback after adding an item to cache.
	if addedItem != nil {
		for _, callback := range addedItem {
			callback(item)
		}
	}

	// If we haven't set up any expiration check timer or found a more imminent item.
	if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) {
		table.expirationCheck()
	}
}

// NotFoundAdd checks whether an item is not yet cached. Unlike the Exists
// method this also adds data if the key could not be found.
func (table *CacheTable) NotFoundAdd(key interface{}, lifeSpan time.Duration, data interface{}) bool {
	table.Lock()

	if _, ok := table.items[key]; ok {
		table.Unlock()
		return false
	}

	item := NewCacheItem(key, lifeSpan, data)
	table.addInternal(item)

	return true
}
```
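Two details of Add and NotFoundAdd are easy to miss. First, the caller takes the lock and addInternal releases it. Second, NotFoundAdd performs its existence check and its insertion within the same critical section, so concurrent calls for one key insert exactly once. The hypothetical miniTable below keeps only these two details, with no lifespans and no expiration check, so that they can be observed directly.

```go
package main

import (
	"fmt"
	"sync"
)

// miniTable is a hypothetical reduction of CacheTable: a map, a lock and
// an added-item callback list.
type miniTable struct {
	sync.RWMutex
	items map[string]string
	added []func(key string) // plays the role of CacheTable.addedItem
}

// addInternal must be called with the table locked. Like cache2go's
// version, it releases the lock before invoking callbacks.
func (t *miniTable) addInternal(key, val string) {
	t.items[key] = val
	callbacks := t.added // copy while still locked
	t.Unlock()
	for _, cb := range callbacks {
		cb(key)
	}
}

// Add always inserts or overwrites.
func (t *miniTable) Add(key, val string) {
	t.Lock()
	t.addInternal(key, val)
}

// NotFoundAdd checks and inserts under one lock, so no other writer can
// slip in between the check and the insert.
func (t *miniTable) NotFoundAdd(key, val string) bool {
	t.Lock()
	if _, ok := t.items[key]; ok {
		t.Unlock()
		return false
	}
	t.addInternal(key, val)
	return true
}

func main() {
	t := &miniTable{items: map[string]string{}}
	var wg sync.WaitGroup
	var mu sync.Mutex
	wins := 0
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if t.NotFoundAdd("k", fmt.Sprint(i)) {
				mu.Lock()
				wins++
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	fmt.Println(wins) // 1
}
```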
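The expiration check is well worth studying. It does not scan for expired keys in a periodic loop, as one might expect, and it does not create a separate timer for each entry either.
Instead, only one timer exists at a time. A check runs when an item is added whose lifespan is shorter than the current check interval, or when no check is scheduled yet. The check scans all items, removes the expired ones, and arms a single timer for the entry that will expire soonest. When that timer fires, it cleans up again and arms the next timer, passing the baton along like a relay.
The expiration check calls table.deleteInternal to remove expired keys. That method is analyzed in detail in the section on Delete.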
```go
// Expiration check loop, triggered by a self-adjusting timer.
func (table *CacheTable) expirationCheck() {
	table.Lock()
	if table.cleanupTimer != nil {
		table.cleanupTimer.Stop()
	}
	if table.cleanupInterval > 0 {
		table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
	} else {
		table.log("Expiration check installed for table", table.name)
	}

	// To be more accurate with timers, we would need to update 'now' on every
	// loop iteration. Not sure it's really efficient though.
	now := time.Now()
	smallestDuration := 0 * time.Second
	for key, item := range table.items {
		// Cache values so we don't keep blocking the mutex.
		item.RLock()
		lifeSpan := item.lifeSpan
		accessedOn := item.accessedOn
		item.RUnlock()

		if lifeSpan == 0 {
			continue
		}
		if now.Sub(accessedOn) >= lifeSpan {
			// Item has excessed its lifespan.
			table.deleteInternal(key)
		} else {
			// Find the item chronologically closest to its end-of-lifespan.
			if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration {
				smallestDuration = lifeSpan - now.Sub(accessedOn)
			}
		}
	}

	// Setup the interval for the next cleanup run.
	table.cleanupInterval = smallestDuration
	if smallestDuration > 0 {
		table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
			go table.expirationCheck()
		})
	}
	table.Unlock()
}
```
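As the flowchart shows, most of this logic consists of acquiring and releasing locks. There are several reasons for this. deleteInternal is always called with the table's write lock already held, either by Delete or by expirationCheck. It releases that lock before running the aboutToDeleteItem and aboutToExpire callbacks, so that slow callbacks do not block the table and callbacks that call back into the table do not deadlock. It holds the item's read lock while reading the item's callback queue. Finally, it re-acquires the table lock before deleting the key, so the caller can release the lock as usual.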
```go
// Delete an item from the cache.
func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
	table.Lock()
	defer table.Unlock()

	return table.deleteInternal(key)
}

func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) {
	r, ok := table.items[key]
	if !ok {
		return nil, ErrKeyNotFound
	}

	// Cache value so we don't keep blocking the mutex.
	aboutToDeleteItem := table.aboutToDeleteItem
	table.Unlock()

	// Trigger callbacks before deleting an item from cache.
	if aboutToDeleteItem != nil {
		for _, callback := range aboutToDeleteItem {
			callback(r)
		}
	}

	r.RLock()
	defer r.RUnlock()
	if r.aboutToExpire != nil {
		for _, callback := range r.aboutToExpire {
			callback(key)
		}
	}

	table.Lock()
	table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name)
	delete(table.items, key)

	return r, nil
}
```
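Fetching a value is simple in itself, but it involves some extra handling. On a hit, KeepAlive refreshes the item's access time and access count. On a miss, the data-loader callback, if one is set, is asked to produce the item, which is then added to the cache. In that case the method returns the loader's item, while the cache stores a new CacheItem that Add builds from the item's lifeSpan and data.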
```go
// Value returns an item from the cache and marks it to be kept alive. You can
// pass additional arguments to your DataLoader callback function.
func (table *CacheTable) Value(key interface{}, args ...interface{}) (*CacheItem, error) {
	table.RLock()
	r, ok := table.items[key]
	loadData := table.loadData
	table.RUnlock()

	if ok {
		// Update access counter and timestamp.
		r.KeepAlive()
		return r, nil
	}

	// Item doesn't exist in cache. Try and fetch it with a data-loader.
	if loadData != nil {
		item := loadData(key, args...)
		if item != nil {
			table.Add(key, item.lifeSpan, item.data)
			return item, nil
		}

		return nil, ErrKeyNotFoundOrLoadable
	}

	return nil, ErrKeyNotFound
}
```
MostAccessed uses the CacheItemPair and CacheItemPairList types introduced earlier.
```go
// MostAccessed returns the most accessed items in this cache table
func (table *CacheTable) MostAccessed(count int64) []*CacheItem {
	table.RLock()
	defer table.RUnlock()

	p := make(CacheItemPairList, len(table.items))
	i := 0
	for k, v := range table.items {
		p[i] = CacheItemPair{k, v.accessCount}
		i++
	}
	sort.Sort(p)

	var r []*CacheItem
	c := int64(0)
	for _, v := range p {
		if c >= count {
			break
		}

		item, ok := table.items[v.Key]
		if ok {
			r = append(r, item)
		}
		c++
	}

	return r
}
```
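MostAccessed boils down to three steps: snapshot the pairs, sort them in descending order, and take the first count entries. The hypothetical standalone helper topN below follows the same steps with sort.Slice. It breaks ties on the key so that the result is deterministic, whereas cache2go's version makes no ordering promise for equal counts.

```go
package main

import (
	"fmt"
	"sort"
)

// topN is a hypothetical helper mirroring MostAccessed: snapshot
// (key, count) pairs, sort by count descending, keep the first n.
func topN(counts map[string]int64, n int) []string {
	type pair struct {
		key   string
		count int64
	}
	pairs := make([]pair, 0, len(counts))
	for k, c := range counts {
		pairs = append(pairs, pair{k, c})
	}
	sort.Slice(pairs, func(i, j int) bool {
		if pairs[i].count != pairs[j].count {
			return pairs[i].count > pairs[j].count // descending by count
		}
		return pairs[i].key < pairs[j].key // deterministic tie-break
	})
	if n < 0 {
		n = 0
	}
	if n > len(pairs) {
		n = len(pairs)
	}
	out := make([]string, 0, n)
	for _, p := range pairs[:n] {
		out = append(out, p.key)
	}
	return out
}

func main() {
	counts := map[string]int64{"a": 3, "b": 10, "c": 1, "d": 3}
	fmt.Println(topN(counts, 3))  // [b a d]
	fmt.Println(topN(counts, 10)) // [b a d c]
}
```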
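Foreach gives developers a hook for running arbitrary custom operations on every item. The callback runs while the table's read lock is held.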
```go
// Foreach all items
func (table *CacheTable) Foreach(trans func(key interface{}, item *CacheItem)) {
	table.RLock()
	defer table.RUnlock()

	for k, v := range table.items {
		trans(k, v)
	}
}
```
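Because Foreach holds the read lock while calling trans, the callback must not call write methods such as Delete or Add on the same table: Lock would wait forever for the read lock that Foreach is still holding. A safe pattern is to collect the keys first and mutate afterwards, sketched here with a hypothetical fTable type and deleteWhere helper.

```go
package main

import (
	"fmt"
	"sync"
)

// fTable is a hypothetical reduction of CacheTable.
type fTable struct {
	sync.RWMutex
	items map[string]int
}

// Foreach mirrors cache2go: the read lock is held for the whole iteration.
func (t *fTable) Foreach(fn func(key string, v int)) {
	t.RLock()
	defer t.RUnlock()
	for k, v := range t.items {
		fn(k, v)
	}
}

func (t *fTable) Delete(key string) {
	t.Lock()
	defer t.Unlock()
	delete(t.items, key)
}

// deleteWhere collects matching keys inside Foreach and deletes them only
// after Foreach has released its read lock. Calling t.Delete inside fn
// instead would deadlock.
func deleteWhere(t *fTable, pred func(v int) bool) int {
	var doomed []string
	t.Foreach(func(k string, v int) {
		if pred(v) {
			doomed = append(doomed, k)
		}
	})
	for _, k := range doomed {
		t.Delete(k)
	}
	return len(doomed)
}

func main() {
	t := &fTable{items: map[string]int{"a": 1, "b": 20, "c": 30}}
	n := deleteWhere(t, func(v int) bool { return v >= 20 })
	fmt.Println(n, len(t.items)) // 2 1
}
```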
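Flushing the cache is straightforward: it clears the data and stops the cleanup timer. Because it simply replaces the map, no aboutToDeleteItem or aboutToExpire callbacks are triggered.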
```go
// Flush deletes all items from this cache table.
func (table *CacheTable) Flush() {
	table.Lock()
	defer table.Unlock()

	table.log("Flushing table", table.name)

	table.items = make(map[interface{}]*CacheItem)
	table.cleanupInterval = 0
	if table.cleanupTimer != nil {
		table.cleanupTimer.Stop()
	}
}
```
Count and Exists are simple and need little explanation.
```go
// Count returns how many items are currently stored in the cache.
func (table *CacheTable) Count() int {
	table.RLock()
	defer table.RUnlock()
	return len(table.items)
}

// Exists returns whether an item exists in the cache. Unlike the Value method
// Exists neither tries to fetch data via the loadData callback nor does it
// keep the item alive in the cache.
func (table *CacheTable) Exists(key interface{}) bool {
	table.RLock()
	defer table.RUnlock()
	_, ok := table.items[key]

	return ok
}
```
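The following setters are also simple. As with CacheItem, the Set* callback methods replace the existing queue, while the Add* methods append to it.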
```go
// SetDataLoader configures a data-loader callback, which will be called when
// trying to access a non-existing key. The key and 0...n additional arguments
// are passed to the callback function.
func (table *CacheTable) SetDataLoader(f func(interface{}, ...interface{}) *CacheItem) {
	table.Lock()
	defer table.Unlock()
	table.loadData = f
}

// SetAddedItemCallback configures a callback, which will be called every time
// a new item is added to the cache.
func (table *CacheTable) SetAddedItemCallback(f func(*CacheItem)) {
	if len(table.addedItem) > 0 {
		table.RemoveAddedItemCallbacks()
	}
	table.Lock()
	defer table.Unlock()
	table.addedItem = append(table.addedItem, f)
}

//AddAddedItemCallback appends a new callback to the addedItem queue
func (table *CacheTable) AddAddedItemCallback(f func(*CacheItem)) {
	table.Lock()
	defer table.Unlock()
	table.addedItem = append(table.addedItem, f)
}

// SetAboutToDeleteItemCallback configures a callback, which will be called
// every time an item is about to be removed from the cache.
func (table *CacheTable) SetAboutToDeleteItemCallback(f func(*CacheItem)) {
	if len(table.aboutToDeleteItem) > 0 {
		table.RemoveAboutToDeleteItemCallback()
	}
	table.Lock()
	defer table.Unlock()
	table.aboutToDeleteItem = append(table.aboutToDeleteItem, f)
}

// AddAboutToDeleteItemCallback appends a new callback to the AboutToDeleteItem queue
func (table *CacheTable) AddAboutToDeleteItemCallback(f func(*CacheItem)) {
	table.Lock()
	defer table.Unlock()
	table.aboutToDeleteItem = append(table.aboutToDeleteItem, f)
}

// SetLogger sets the logger to be used by this cache table.
func (table *CacheTable) SetLogger(logger *log.Logger) {
	table.Lock()
	defer table.Unlock()
	table.logger = logger
}
```
The Remove methods are too simple to need comment:
```go
// RemoveAddedItemCallbacks empties the added item callback queue
func (table *CacheTable) RemoveAddedItemCallbacks() {
	table.Lock()
	defer table.Unlock()
	table.addedItem = nil
}

// RemoveAboutToDeleteItemCallback empties the about to delete item callback queue
func (table *CacheTable) RemoveAboutToDeleteItemCallback() {
	table.Lock()
	defer table.Unlock()
	table.aboutToDeleteItem = nil
}
```
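The Cache function in cache.go is the library's entry point. It uses double-checked locking, and the reason is worth understanding. The first lookup runs under the cheap read lock. If the table is missing, the function takes the write lock and looks again, because another goroutine may have created the same table between RUnlock and Lock. Without the second check, that table, and everything already cached in it, would be silently replaced.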
```go
var (
	cache = make(map[string]*CacheTable)
	mutex sync.RWMutex
)

// Cache returns the existing cache table with given name or creates a new one
// if the table does not exist yet.
func Cache(table string) *CacheTable {
	mutex.RLock()
	t, ok := cache[table]
	mutex.RUnlock()

	if !ok {
		mutex.Lock()
		t, ok = cache[table]
		// Double check whether the table exists or not.
		if !ok {
			t = &CacheTable{
				name:  table,
				items: make(map[interface{}]*CacheItem),
			}
			cache[table] = t
		}
		mutex.Unlock()
	}

	return t
}
```
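The double-checked pattern can be tested in isolation: many goroutines asking for the same name must all receive the same table. The registry type below is a hypothetical stand-in for the package-level cache map and mutex.

```go
package main

import (
	"fmt"
	"sync"
)

type namedTable struct{ name string }

// registry is a hypothetical stand-in for cache2go's package-level
// cache map and mutex.
type registry struct {
	mu     sync.RWMutex
	tables map[string]*namedTable
}

func (r *registry) get(name string) *namedTable {
	// Fast path: most calls find an existing table under the read lock.
	r.mu.RLock()
	t, ok := r.tables[name]
	r.mu.RUnlock()
	if ok {
		return t
	}

	r.mu.Lock()
	defer r.mu.Unlock()
	// Second check: another goroutine may have created the table between
	// RUnlock and Lock. Without it, that table would be overwritten.
	if t, ok = r.tables[name]; !ok {
		t = &namedTable{name: name}
		r.tables[name] = t
	}
	return t
}

func main() {
	r := &registry{tables: map[string]*namedTable{}}
	var wg sync.WaitGroup
	results := make([]*namedTable, 100)
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = r.get("users")
		}(i)
	}
	wg.Wait()

	same := true
	for _, t := range results {
		same = same && t == results[0]
	}
	fmt.Println(same, len(r.tables)) // true 1
}
```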
The examples are also simple; readers can go through them on their own.