This article takes a look at the cache in storagetapper.
storagetapper/pipe/cache.go
    type cacheEntry struct {
        pipe Pipe
        cfg  config.PipeConfig
    }

    var cache map[string]cacheEntry
    var lock sync.Mutex
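cache is a map from string keys to cacheEntry values. Each cacheEntry holds a Pipe together with the config.PipeConfig it was created from, and the package-level lock (a sync.Mutex) guards access to the map.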
storagetapper/pipe/cache.go
    // CacheGet returns an instance of pipe with specified config from cache or
    // creates new one if it's not in the cache yet
    func CacheGet(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
        lock.Lock()
        defer lock.Unlock()

        if cache == nil {
            cache = make(map[string]cacheEntry)
        }

        b, err := json.Marshal(cfg)
        if err != nil {
            return nil, err
        }

        h := sha256.New()
        _, _ = h.Write([]byte(pipeType + "$$$" + fmt.Sprintf("%p", db) + "$$$"))
        _, _ = h.Write(b)
        hs := fmt.Sprintf("%0x", h.Sum(nil))

        p, ok := cache[hs]
        if ok && reflect.DeepEqual(cfg, &p.cfg) {
            return p.pipe, nil
        }

        //FIXME: Implement proper collisions handling

        np, err := Create(pipeType, cfg, db)
        if err != nil {
            return nil, err
        }

        cache[hs] = cacheEntry{np, *cfg}

        log.Debugf("Created and cached new '%v' pipe (hash %v) with config: %+v. Cache size %v", pipeType, hs, *cfg, len(cache))

        return np, nil
    }
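CacheGet takes the lock before touching cache and lazily initializes the map if it is nil. The cache key is a SHA-256 hash computed over pipeType, the pointer address of db, and the JSON-serialized cfg, so the config is part of the key and not just pipeType and db. CacheGet then looks up the cacheEntry for that key. If an entry exists and its cfg is deeply equal to the requested cfg, the entry's pipe is returned. Otherwise a new Pipe is created via Create and stored in cache under that key, overwriting any existing entry; as the FIXME notes, hash collisions are not handled properly yet.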
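The key derivation described above can be sketched in isolation. This is a minimal illustration and not the storagetapper code itself: demoConfig is a hypothetical stand-in for config.PipeConfig, and cacheKey is a helper name introduced only for this example. It assumes, as in CacheGet, that the key is the hex-encoded SHA-256 of the pipe type, the address of the db handle, and the JSON form of the config.

```go
package main

import (
	"crypto/sha256"
	"database/sql"
	"encoding/json"
	"fmt"
)

// demoConfig is a hypothetical stand-in for config.PipeConfig.
type demoConfig struct {
	BaseDir     string
	MaxFileSize int64
}

// cacheKey (hypothetical helper) mirrors the key derivation in CacheGet:
// it hashes the pipe type, the address of the db handle, and the
// JSON-encoded config, and returns the digest as a hex string.
func cacheKey(pipeType string, cfg *demoConfig, db *sql.DB) (string, error) {
	b, err := json.Marshal(cfg)
	if err != nil {
		return "", err
	}
	h := sha256.New()
	// hash.Hash.Write never returns an error, so the results are ignored.
	h.Write([]byte(pipeType + "$$$" + fmt.Sprintf("%p", db) + "$$$"))
	h.Write(b)
	return fmt.Sprintf("%x", h.Sum(nil)), nil
}

func main() {
	a, _ := cacheKey("file", &demoConfig{BaseDir: "/tmp/a"}, nil)
	b, _ := cacheKey("file", &demoConfig{BaseDir: "/tmp/a"}, nil)
	c, _ := cacheKey("file", &demoConfig{BaseDir: "/tmp/b"}, nil)
	// Equal inputs give equal keys; a different config gives a different key.
	fmt.Println(a == b, a == c, len(a)) // prints: true false 64
}
```

Because the db handle enters the key through its address (%p), two distinct *sql.DB values pointing at the same database would produce different keys under this scheme.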
storagetapper/pipe/cache.go
    // CacheDestroy releases all resources associated with cached pipes
    func CacheDestroy() {
        lock.Lock()
        defer lock.Unlock()

        for h, p := range cache {
            log.Debugf("Closing %v pipe (hash %v) with config %+v", p.pipe.Type(), h, p.cfg)
            log.E(p.pipe.Close())
        }

        cache = nil
    }
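CacheDestroy takes the lock, iterates over cache, and calls pipe.Close() on each entry, passing the returned error to log.E. It then sets cache to nil, so the next CacheGet call rebuilds the map from scratch.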
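The get-or-create and destroy lifecycle can be sketched with a toy cache. This is only an illustration of the pattern under simplifying assumptions: resource is a hypothetical stand-in for the Pipe interface, and get and destroy are names made up for this example, keyed by a plain string instead of a hash.

```go
package main

import (
	"fmt"
	"sync"
)

// resource is a hypothetical stand-in for the Pipe interface.
type resource struct {
	name   string
	closed bool
}

// Close marks the resource as released.
func (r *resource) Close() error {
	r.closed = true
	return nil
}

var (
	mu    sync.Mutex
	items map[string]*resource
)

// get returns the cached resource for key, creating it on first use.
func get(key string) *resource {
	mu.Lock()
	defer mu.Unlock()
	if items == nil {
		items = make(map[string]*resource)
	}
	if r, ok := items[key]; ok {
		return r
	}
	r := &resource{name: key}
	items[key] = r
	return r
}

// destroy closes every cached resource and resets the map to nil,
// so that a later get lazily rebuilds the cache.
func destroy() {
	mu.Lock()
	defer mu.Unlock()
	for k, r := range items {
		if err := r.Close(); err != nil {
			fmt.Printf("closing %v: %v\n", k, err)
		}
	}
	items = nil
}

func main() {
	a := get("a")
	destroy()
	b := get("a")
	// a was closed by destroy, and b is a freshly created resource.
	fmt.Println(a.closed, a == b) // prints: true false
}
```

One consequence of this design is that callers still holding a resource obtained before destroy are left with a closed instance, so destroy is best called only when no such references are in use.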
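To summarize: storagetapper's cache is a map of cacheEntry values, each holding a Pipe and its config.PipeConfig. CacheGet looks up a pipe by a hash of pipeType, the db pointer, and the config, and calls Create when no matching entry is found. CacheDestroy takes the lock, iterates over cache, calls pipe.Close() on every entry, and resets cache to nil.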